In [1]:
#### FUNCTION TO CREATE VOCAB

from pathlib import Path
from typing import List, Set, Tuple

import spacy

from utils.io import load_json, save_json
from utils.text_processing import get_lemmas_flat_set, get_lemma_word

# Switch the working directory: the relative paths used later in this notebook
# ("data/legacy/...", "dataset_scripts/...") are resolved against it.
# NOTE(review): this is a machine-specific absolute path — adjust it when running elsewhere.
%cd /home/alessandro/work/repo/task-testbed
%pwd

# Module-level settings read by find_vocabulary_for_label below.
# remove_stopwords: when True, stop-word tokens are left out of the negative-example vocabulary
# and the flag is also forwarded to the utils.text_processing helpers.
remove_stopwords = True
# join_composed_kw: forwarded to the helpers as join_result / join_composed_expressions.
join_composed_kw = False
# spaCy pipeline shared by every vocabulary-extraction call.
nlp = spacy.load("en_core_web_lg")


def find_vocabulary_for_label(examples: List, target_label: int) -> Set[str]:
    """Collect the keyword vocabulary of the examples labelled ``target_label``.

    Only the labels 1 and 0 are handled; examples whose label differs from
    ``target_label`` are skipped, and any other ``target_label`` yields an empty set.

    * Label 1: keywords are taken from the example's ``w_p``, ``w_a`` and ``c_w``
      fields through ``get_lemma_word`` / ``get_lemmas_flat_set``. Each helper
      returns a pair and both members are merged into the result (presumably
      lemmas and surface forms — confirm in ``utils.text_processing``).
    * Label 0: keywords are the lemma and the stripped surface form of every
      token of the lower-cased ``text`` that is non-blank, is not punctuation
      and, when ``remove_stopwords`` is set, is not a stop word.

    The function reads the module-level ``nlp``, ``remove_stopwords`` and
    ``join_composed_kw`` settings.

    Args:
        examples: iterable of dict-like examples with a ``label`` key, plus
            ``w_p``/``w_a``/``c_w`` (label 1) or ``text`` (label 0).
        target_label: label whose vocabulary should be collected.

    Returns:
        The set of collected keywords (empty when nothing matches).
    """
    keyword_set: Set[str] = set()
    for example in examples:
        label = example["label"]
        if label != target_label:
            continue

        if label == 1:
            # Helper calls are kept in the original order: w_p, then w_a, then c_w.
            pun_lemmas, pun_words = get_lemma_word(example['w_p'], nlp, remove_stopwords,
                                                   join_result=join_composed_kw)
            alt_lemmas, alt_words = get_lemma_word(example['w_a'], nlp, remove_stopwords,
                                                   join_result=join_composed_kw)
            context_lemmas, context_words = get_lemmas_flat_set(example['c_w'], nlp, remove_stopwords,
                                                                join_composed_expressions=join_composed_kw)
            # set.update accepts several iterables, so no side-effect comprehensions are needed.
            keyword_set.update(alt_lemmas, alt_words, pun_lemmas, pun_words, context_lemmas, context_words)
        elif label == 0:
            # "text" is only required here, so it is looked up only for negative examples.
            for token in nlp(example["text"].lower()):
                surface = token.text.strip()
                if not surface or token.is_punct:
                    continue
                if remove_stopwords and token.is_stop:
                    continue
                keyword_set.add(token.lemma_)
                keyword_set.add(surface)
    return keyword_set

  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


/home/alessandro/work/repo/task-testbed


In [2]:
#### CREATE VOCAB WITH TRAINING KEYWORDS (CONTEXT + PUN WORD + ALT WORD)

NEW_DATASET = "data/legacy/puns/puneval_split"

# Load the train split first, then the test split, from the same dataset folder.
train_examples, test_examples = (
    load_json(Path(NEW_DATASET) / split_file)
    for split_file in ("train_puns_original.json", "test_puns.json")
)
print(len(train_examples), len(test_examples))

# Vocabulary built from the positive (label 1) training examples only;
# the label-0 counterpart is intentionally not computed here.
train_kws = find_vocabulary_for_label(train_examples, target_label=1)
print(len(train_kws))

1248 1341
2274


In [15]:
from typing import Union
##### FIND TOKENS IN TRAINING SET THAT HAVE HIGH CORRELATION WITH POSITIVE OR NEGATIVE EXAMPLES IN THE TEST

import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from typing import Optional, List
from tqdm import tqdm

# Maximum number of n-grams kept (by 'Combined Score') after the per-family results are merged below.
TOP_N_KEYWORDS = 20


def find_high_correlation_tokens(train_examples, test_examples, top_k_coeffs: float, ngram_range: Tuple[int, int],
                                 min_df: Union[float, int], max_features: Optional[int] = None,
                                 stop_words=None,
                                 vocabulary: Optional[List[str]] = None, min_diff: int = 1) -> Tuple[
    pd.DataFrame, float]:
    """Rank n-grams by how strongly a logistic regression ties them to the positive label.

    A binary bag-of-n-grams model is fitted on ``train_examples``; its
    coefficients are then paired with how often each n-gram occurs in the
    positive (label 1) and negative (label 0) ``test_examples``.

    Args:
        train_examples: examples (mappings with ``"text"`` and ``"label"``) used to fit the model.
        test_examples: examples used for the accuracy and for the occurrence counts.
        top_k_coeffs: fraction in [0, 1]; only rows whose coefficient is at or above
            the ``1 - top_k_coeffs`` quantile (computed after the support filter) are kept.
        ngram_range: ``(min_n, max_n)`` passed to ``CountVectorizer``.
        min_df: minimum document frequency passed to ``CountVectorizer``.
        max_features: vocabulary size cap passed to ``CountVectorizer``.
        stop_words: stop-word setting passed to ``CountVectorizer``.
        vocabulary: optional fixed vocabulary. Note that scikit-learn ignores
            ``min_df`` and ``max_features`` when a vocabulary is supplied.
        min_diff: minimum *Support* (number of test examples containing the n-gram)
            a row needs to be kept. Despite its name, the filter is applied to
            the ``Support`` column, not to ``Diff``.

    Returns:
        ``(combined_df, acc)`` where ``combined_df`` has the columns ``Keyword``,
        ``Coefficient``, ``Support``, ``Positive Count``, ``Negative Count``,
        ``Diff``, ``Ratio`` and ``Combined Score`` (sorted by descending
        coefficient), and ``acc`` is the model's accuracy on the test examples.
    """
    train_texts = [example["text"] for example in train_examples]
    test_texts = [example["text"] for example in test_examples]

    # Step 1: binary (presence/absence) feature matrix for the training examples.
    vectorizer = CountVectorizer(vocabulary=vocabulary, binary=True, ngram_range=ngram_range, analyzer="word",
                                 min_df=min_df, max_features=max_features, stop_words=stop_words)
    X_train = vectorizer.fit_transform(train_texts).toarray()
    y_train = np.array([example["label"] for example in train_examples])

    # Step 2: fit the classifier whose coefficients are inspected below.
    model = LogisticRegression(solver="liblinear")
    model.fit(X_train, y_train)

    # Column j of the feature matrix belongs to the term mapped to j in ``vocabulary_``.
    # Ordering the terms by that index is correct whether the vocabulary was learned
    # or supplied by the caller, and does not rely on the caller's container type.
    term_to_column = vectorizer.vocabulary_
    feature_names = sorted(term_to_column, key=term_to_column.get)

    # Step 3: one coefficient per feature column.
    coefficients = model.coef_[0]

    # Step 4: binary feature matrix and accuracy for the test examples.
    X_test = vectorizer.transform(test_texts).toarray()
    y_test = np.array([example["label"] for example in test_examples])
    acc = model.score(X_test, y_test)

    # Step 5: per-feature occurrence counts in the test set, overall and per label.
    combined_df = pd.DataFrame({
        'Keyword': feature_names,
        'Coefficient': coefficients,
        'Support': X_test.sum(axis=0),
        'Positive Count': X_test[y_test == 1].sum(axis=0),
        'Negative Count': X_test[y_test == 0].sum(axis=0)
    })

    combined_df = combined_df.sort_values(by='Coefficient', ascending=False)
    combined_df['Diff'] = combined_df['Positive Count'] - combined_df['Negative Count']

    # Drop rarely seen features first, so the quantile below is computed on the survivors only.
    combined_df = combined_df[combined_df['Support'] >= min_diff]

    # Keep only the rows whose coefficient lies in the top-k fraction.
    threshold = combined_df['Coefficient'].quantile(1 - top_k_coeffs)
    # .copy() makes the result an independent frame, so the column assignments
    # below are not chained assignments on a filtered view.
    combined_df = combined_df[combined_df['Coefficient'] >= threshold].copy()

    # Ratio is |pos - neg| / max(pos, neg); it is NaN for a row with no test occurrences,
    # which can only survive the support filter when min_diff <= 0.
    combined_df['Ratio'] = combined_df['Diff'].abs() / combined_df[
        ['Positive Count', 'Negative Count']].max(axis=1)
    combined_df['Combined Score'] = combined_df['Coefficient'].abs() * combined_df['Ratio']

    return combined_df, acc


# Run the selection once per feature family; every run but the last prints a blank separator line.
top_train_kw, acc = find_high_correlation_tokens(
    train_examples, test_examples, 0.1,
    ngram_range=(1, 1), min_df=10, vocabulary=list(train_kws), stop_words="english", min_diff=10,
)
print(f"Selected {len(top_train_kw)} keywords. Acc={acc}.")
print()

top_train_unigrams, acc = find_high_correlation_tokens(
    train_examples, test_examples, 0.1,
    ngram_range=(1, 1), min_df=10, stop_words="english", min_diff=10,
)
print(f"Selected {len(top_train_unigrams)} uni-grams. Acc={acc}.")
print()

top_train_bigrams, acc = find_high_correlation_tokens(
    train_examples, test_examples, 0.2,
    ngram_range=(2, 2), min_df=3, min_diff=3,
)
print(f"Selected {len(top_train_bigrams)} bi-grams. Acc={acc}.")
print()

top_train_trigrams, acc = find_high_correlation_tokens(
    train_examples, test_examples, 0.3,
    ngram_range=(3, 3), min_df=3, min_diff=3,
)
print(f"Selected {len(top_train_trigrams)} tri-grams. Acc={acc}.")
print()

top_train_4grams, acc = find_high_correlation_tokens(
    train_examples, test_examples, 0.3,
    ngram_range=(4, 4), min_df=3, min_diff=3,
)
print(f"Selected {len(top_train_4grams)} 4-grams. Acc={acc}.")

# Merge the keyword, bi-, tri- and 4-gram tables (the plain uni-gram table is not merged),
# keep the best-scoring row per keyword, then retain the TOP_N_KEYWORDS highest scores.
top_ngrams = (
    pd.concat([top_train_kw, top_train_bigrams, top_train_trigrams, top_train_4grams], ignore_index=True)
    .sort_values(by='Combined Score', ascending=False)
    .drop_duplicates(subset='Keyword', keep='first')
    .nlargest(TOP_N_KEYWORDS, 'Combined Score')
)
top_k_ngrams = top_ngrams
print(f"Selected {len(top_k_ngrams)} n-grams")

# Export only the n-grams that occur more often in positive than in negative test examples.
words = []
tot_g = 0
for keyword, diff in tqdm(zip(top_k_ngrams["Keyword"], top_k_ngrams["Diff"]), total=len(top_k_ngrams)):
    surplus = int(diff)
    if surplus <= 0:
        continue
    words.append({"expression": str(keyword), "count": surplus})
    tot_g += surplus
save_json(Path("dataset_scripts/shortcuts_v2.json"), words)
print(tot_g)

top_ngrams

Selected 5 keywords. Acc=0.6823266219239373.

Selected 4 uni-grams. Acc=0.5734526472781506.

Selected 30 bi-grams. Acc=0.6964951528709918.

Selected 5 tri-grams. Acc=0.5831469052945563.

Selected 2 4-grams. Acc=0.5764354958985831.
Selected 20 n-grams


100%|██████████| 20/20 [00:00<00:00, 18047.78it/s]

701





Unnamed: 0,Keyword,Coefficient,Support,Positive Count,Negative Count,Diff,Ratio,Combined Score
40,never die they just,2.779405,58,58,0,58,1.0,2.779405
0,tom,2.617024,61,61,0,61,1.0,2.617024
5,said tom,2.527773,24,24,0,24,1.0,2.527773
6,it was,1.819988,23,23,0,23,1.0,1.819988
35,never die they,1.81857,60,60,0,60,1.0,1.81857
7,he was,1.744455,18,18,0,18,1.0,1.744455
1,old,1.798727,75,70,5,65,0.928571,1.670247
36,die they just,1.604793,58,58,0,58,1.0,1.604793
37,she was only,1.570773,19,19,0,19,1.0,1.570773
2,said,1.703012,50,46,4,42,0.913043,1.554924
