In [3]:
# Import necessary libraries
import torch
from transformers import BertTokenizer, BertModel
from sentence_transformers import SentenceTransformer, util, models
from nltk.tokenize import word_tokenize
from spacy.matcher import Matcher
import spacy
import spacy_transformers
import random
import pandas as pd
from tqdm import tqdm
import math

In [4]:
import re

In [5]:
# Specify the BERT model variant you want to use
model_name = "bert_model/"

# Load BERT tokenizer and model
tokenizer_w = BertTokenizer.from_pretrained(model_name)
model_w = BertModel.from_pretrained(model_name, output_attentions=True)
word_embedding_model = models.Transformer(model_name)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(), 'cls')
model_s = SentenceTransformer(modules=[word_embedding_model, pooling_model])

Some weights of the model checkpoint at bert_model/ were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.decoder.weight', 'cls.predictions.decoder.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertModel were not initialized from the model checkpoint at bert_model/ and are newly initialized: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
You should probably

In [6]:
def get_attention_matrix(sentence, target_words, tokenizer, model):
    # Initialize tokenizer and model

    # Tokenize input and obtain outputs
    inputs = tokenizer(sentence, return_tensors="pt", truncation=True, padding=True, max_length=512)
    outputs = model(**inputs)

    # Get attention weights from outputs
    attentions = outputs.attentions  # List of attention tensors for each layer

    # Define weights for the last four layers
    weights = torch.tensor([0.1, 0.2, 0.3, 0.4], device=attentions[0].device)

    # Extract the last four layers and apply weights
    last_four_layers = torch.stack(attentions[-4:])
    weighted_layers = last_four_layers * weights[:, None, None, None, None]

    # Sum across the weighted layers and then average over the heads
    weighted_sum = torch.sum(weighted_layers, dim=0)
    avg_attention = torch.mean(weighted_sum, dim=1)[0]

    # Aggregate subword attentions for whole words
    tokens = tokenizer.tokenize(sentence)
    word_attention_list = []
    word_list = []
    i = 0
    while i < len(tokens):
        if not tokens[i].startswith("##"):
            word = tokens[i]
            if i == len(tokens) - 1 or not tokens[i+1].startswith("##"):
                word_attention_list.append(avg_attention[i].tolist())
            else:
                subword_count = 1
                subword_attention = avg_attention[i].clone()
                word += tokens[i+1][2:]
                while i + subword_count < len(tokens) and tokens[i + subword_count].startswith("##"):
                    subword_attention += avg_attention[i + subword_count]
                    word += tokens[i + subword_count + 1][2:] if i + subword_count + 1 < len(tokens) and tokens[i + subword_count + 1].startswith("##") else ""
                    subword_count += 1
                word_attention_list.append((subword_attention / subword_count).tolist())
                i += subword_count - 1
            word_list.append(word)
        i += 1

   # Convert attention to dictionary form for whole words
    attention_dict = {}
    for i, word in enumerate(word_list):
        attention_dict[word] = {word_list[j]: word_attention_list[i][j] for j in range(len(word_list))}

    # Compute importance scores for all words
    all_importance_scores = {}
    for word, weights in attention_dict.items():
        all_importance_scores[word] = sum(weights.values())

    # Extract importance scores for target words
    importance_scores = {word: all_importance_scores[word] for word in target_words if word in all_importance_scores}

    # Normalize the importance scores so they sum to 1 for the target words
    total_score = sum(importance_scores.values())
    for word in importance_scores:
        importance_scores[word] /= total_score

    return attention_dict, importance_scores


In [7]:
def dependency_importance(joke, target_words):
    return []


In [8]:
def get_semantic_weights(sentence, matcher):

    doc = nlp(sentence)
    matches = matcher(doc)

    chunks = []

    for match in matches:
        match_id, start, end = match
        string_id = nlp.vocab.strings[match_id]
        span = doc[start:end]
        # print(span.text)
        chunks.append((start, end))

    chunk_phrases = [str(doc[start:end]) for start, end in chunks]
    s_embedding = model_s.encode(sentence, convert_to_tensor=True)

    weights = []
    for chunk in chunks:

        start, end = chunk
        chunk_phrase = doc[start:end]

        new_sent = ' '.join([doc[:start].text, doc[end:].text])
        # sentence.replace(chunk, '')
        new_embedding = model_s.encode(new_sent, convert_to_tensor=True)
        # print(chunk_phrase, ": ", new_sent)
        cosine_score = util.cos_sim(s_embedding, new_embedding)
        weights.append(((start, end), 1-cosine_score.cpu().squeeze().numpy()))

    total = sum([score for (chunk, score) in weights])
    weights = [(chunk, score/total) for (chunk, score) in weights]

    weights.sort(key = lambda x : x[1], reverse = True)
    sem_weight_scores = {}
    for (start, end), weight in weights:
      sem_weight_scores[str(doc[start:end])] = weight
    return sem_weight_scores

In [9]:
def get_semantic_weights(sentence, matcher):

    doc = nlp(sentence)
    matches = matcher(doc)

    chunks = []

    for match in matches:
        match_id, start, end = match
        string_id = nlp.vocab.strings[match_id]
        span = doc[start:end]
        # print(span.text)
        chunks.append((start, end))

    chunk_phrases = [str(doc[start:end]) for start, end in chunks]
    attention_dict, importance_scores = get_attention_matrix(sentence, chunk_phrases, tokenizer_w, model_w)
    dependency_scores = dependency_importance(sentence, chunk_phrases)
    s_embedding = model_s.encode(sentence, convert_to_tensor=True)

    weights = []
    for chunk in chunks:

        start, end = chunk
        chunk_phrase = doc[start:end]

        new_sent = ' '.join([doc[:start].text, doc[end:].text])
        # sentence.replace(chunk, '')
        new_embedding = model_s.encode(new_sent, convert_to_tensor=True)
        # print(chunk_phrase, ": ", new_sent)
        cosine_score = util.cos_sim(s_embedding, new_embedding)
        weights.append(((start, end), 1-cosine_score.cpu().squeeze().numpy()))

    total = sum([score for (chunk, score) in weights])
    weights = [(chunk, score/total) for (chunk, score) in weights]

    weights.sort(key = lambda x : x[1], reverse = True)
    sem_weight_scores = {}
    for (start, end), weight in weights:
      sem_weight_scores[str(doc[start:end])] = weight
    return sem_weight_scores, importance_scores, dependency_scores

In [10]:
nlp = spacy.load("en_core_web_trf")

pattern = [{"POS": {"IN": ["NOUN", "PROPN","ADJ","ADV"]}}]

matcher = Matcher(nlp.vocab)
matcher.add("pattern",[pattern])

In [11]:
sample = "why is the fish fishing?"

In [12]:
def mask_function(document, words_to_mask):
    """
    Returns a list of 3-tuples indicating positions of masked words.

    Parameters:
    - document (str): The input document.
    - words_to_mask (list): List of words that need to be masked.

    Returns:
    - list: List of 3-tuples (infilling type, span offset, span length).
    """

    masked_positions = []

    for word in words_to_mask:
        offset = document.find(word)
        while offset != -1:
            masked_positions.append(("mask", offset, len(word)))
            offset = document.find(word, offset + len(word))

    # Sort by offsets to ensure the list is in order
    return sorted(masked_positions, key=lambda x: x[1])


In [13]:
# ... [rest of your code above]
# w1 - semantic, w2 - attention, w3 - dependency
def get_masked_template(sentence, n=0.5, w1=0.5, w2=0.5, w3=0):
    if sentence.count('?'):
        # 1. Split the joke into setup and punchline.
        setup, punchline = sentence.rsplit("?", 1)

        # 2. Calculate semantic and attention scores for the entire sentence.
        sem_weight_scores, attention_scores, dependency_scores = get_semantic_weights(sentence, matcher)

        # Create a final_score dictionary just as before.
        final_score = {}
        for k in sem_weight_scores:
            try:
                final_score[k] = w1*sem_weight_scores[k] + w2*attention_scores[k] + w3*dependency_scores[k]
            except Exception as e:
                # print(e)
                final_score[k] = sem_weight_scores[k]
        # 3. Determine words to be masked for setup and punchline.
        setup_words = re.findall(r'\b\w+\b', setup)
        punchline_words = re.findall(r'\b\w+\b', punchline)

        setup_scores = {word: final_score[word] for word in setup_words if word in final_score}
        punchline_scores = {word: final_score[word] for word in punchline_words if word in final_score}

        setup_masked_words = [k for k, v in sorted(setup_scores.items(), key=lambda item: item[1], reverse=True)]
        punchline_masked_words = [k for k, v in sorted(punchline_scores.items(), key=lambda item: item[1], reverse=True)]
        total_candidates = setup_masked_words + punchline_masked_words
        #print(setup_masked_words, punchline_masked_words)
        
        final_setup_masked_words = setup_masked_words[:math.ceil(len(setup_masked_words)*n)]
        final_punchline_masked_words = punchline_masked_words[:math.ceil(len(punchline_masked_words)*n)]
        #print(final_punchline_masked_words, final_setup_masked_words)

        # 4. Mask the setup and punchline separately.
        masked_setup = mask_function(setup, final_setup_masked_words)
        masked_punchline = mask_function(punchline, final_punchline_masked_words)
        adjustment = len(setup) + 1  # +1 to account for the question mark
        masked_punchline = [(typ, offset + adjustment, length) for typ, offset, length in masked_punchline]

        # 5. Combine the masked setup and punchline.
        masked_joke_spans = masked_setup + masked_punchline

        return sorted(masked_joke_spans, key=lambda x: x[1]), total_candidates
    else:
        sem_weight_scores, attention_scores, dependency_scores = get_semantic_weights(sentence, matcher)
        final_score = {}
        for k in sem_weight_scores:
            try:
                final_score[k] = final_score[k] = w1*sem_weight_scores[k] + w2*attention_scores[k] + w3*dependency_scores[k]
            except Exception as e:
                # print(e)
                final_score[k] = sem_weight_scores[k]
        masked_words = [k for k, v in sorted(final_score.items(), key=lambda item: item[1], reverse=True)]
        final_masked_words = masked_words[:math.ceil(len(masked_words)*n)]
        return mask_function(sentence, final_masked_words), masked_words


In [14]:
get_masked_template("i am fishing for fishies because i love food")

([('mask', 17, 7)], ['fishies', 'food'])

In [15]:
def apply_mask(sentence):
    """
    Masks the specified spans in the sentence.

    Parameters:
    - sentence (str): The input sentence.
    - mask_spans (list): List of 3-tuples specifying spans to mask.

    Returns:
    - str: Sentence with specified spans replaced by [MASK].
    """

    # Reverse the list so that we can mask from the end of the sentence.
    # This ensures that the earlier offsets don't change.
    mask_spans,_ = get_masked_template(sentence)
    mask_spans.reverse()

    for _, offset, length in mask_spans:
        sentence = sentence[:offset] + '[MASK]' + sentence[offset + length:]

    return sentence


In [16]:
apply_mask("What is drake's favorite drug? quack ")

"What is [MASK]'s favorite [MASK]? quack "

In [17]:
f = open('jokegen/test.txt')

In [18]:
import random

In [19]:
data = f.readlines()
data = [i.strip() for i in data if len(i)<80]
random.shuffle(data)

In [20]:
len(data)

12783

In [21]:
from tqdm import tqdm
import random

In [22]:
masked_data = []
random.shuffle(data)
for i in tqdm(data[:200]):
    masked_data.append((i, apply_mask(i)))

100%|█████████████████████████████████████████| 200/200 [00:28<00:00,  6.97it/s]


In [23]:
import pickle
with open("masked_templates_test.pkl","wb") as f:
    pickle.dump(masked_data, f)

## Mask Filling

In [24]:
from transformers import pipeline
from transformers import AutoModelForMaskedLM, DistilBertForMaskedLM
from transformers import AutoTokenizer, DistilBertTokenizer

In [25]:
filling_model = DistilBertForMaskedLM.from_pretrained('distilbert_model/')
filling_tokenizer = DistilBertTokenizer.from_pretrained('distilbert_model/')

In [26]:
#mask_filler = pipeline("fill-mask", model=model_w, tokenizer=tokenizer_w)
mask_filler = pipeline("fill-mask", model=filling_model, tokenizer=filling_tokenizer)

In [27]:
from transformers import pipeline
# mask_filler = pipeline(task="fill-mask", model="bert_finetuning/final_model/")

def fill_in_the_blanks(sent, model, tokenizer, mask_filler):
  # print(sent)
  sent = sent.replace("[MASK]",f"{tokenizer.mask_token}")
  c = sent.count(f"{tokenizer.mask_token}")
  for i in range(c):
    s_embedding = model_s.encode(sent, convert_to_tensor=True)
    sent = mask_filler(sent)
    min_cosine_score = 1
    for mask_candidates in sent:
      if type(mask_candidates) is list:
        for replacement in mask_candidates:
          new_sent = replacement['sequence']
          n_embedding = model_s.encode(new_sent, convert_to_tensor=True)
          cosine_score = util.cos_sim(s_embedding, n_embedding)
          if cosine_score < min_cosine_score:
            min_cosine_score = cosine_score
            best_candidate = new_sent
      else:
        new_sent = mask_candidates['sequence']
        n_embedding = model_s.encode(new_sent, convert_to_tensor=True)
        cosine_score = util.cos_sim(s_embedding, n_embedding)
        if cosine_score < min_cosine_score:
          min_cosine_score = cosine_score
          best_candidate = new_sent
    sent = best_candidate
  # print(sent)
  return sent

In [28]:
import pickle
with open("masked_templates_test.pkl","rb") as f:
    masked_data = pickle.load(f)

In [29]:
bert_outputs = []

In [30]:
c = 1
for i,j in masked_data:
    generated = fill_in_the_blanks(j, model_w, tokenizer_w, mask_filler)
    bert_outputs.append({"original": i, "masked": j, "bert":generated})

In [31]:
bert_outputs[:5]

[{'original': "What's Louis C.K.'s favorite type of meat other than his own? Jerkey",
  'masked': "What's Louis C.K.'s [MASK] [MASK] of [MASK] other than his own? [MASK]",
  'bert': "What's Louis C. K.'s favourite kind of music other than his own?!"},
 {'original': 'Did you hear about the failed Origami shop? If folded.',
  'masked': 'Did you hear about the failed Origami [MASK]? If folded.',
  'bert': 'Did you hear about the failed Origami team? If folded.'},
 {'original': 'How much does a dead elephant weigh? A skele**ton**.',
  'masked': 'How much does a [MASK] [MASK] weigh? A skele**ton**.',
  'bert': 'How much does a capital whale weigh? A skele * * ton * *.'},
 {'original': 'How did the hipster burn his tongue? He ate his pizza BEFORE it was cool.',
  'masked': 'How did the [MASK] burn his tongue? He ate his pizza BEFORE it was [MASK].',
  'bert': 'How did the vampire burn his tongue? He ate his pizza BEFORE it was terrible.'},
 {'original': "What do you call a basement full of S

In [32]:
with open("bert_outputs.pkl","wb") as f:
    pickle.dump(bert_outputs, f)

In [33]:
len(bert_outputs)

200

In [None]:
generated = fill_in_the_blanks("Why did the [MASK] cross the [MASK]? To get to [MASK].", filling_model, filling_tokenizer, mask_filler)

In [None]:
generated

In [None]:
def mask_words_in_sentence(sentence, masked_template):
    """
    Mask words in a sentence based on specified indices and offsets.

    Parameters:
    sentence (str): The input sentence.
    masked_template (list): A list of tuples, each containing the word 'mask',
                            a starting index, and an offset amount.

    Returns:
    list: A list of masked words extracted from the original sentence.
    """
    masked_words = []

    # We'll work with the sentence as a mutable list of characters for easier processing.
    sentence_chars = list(sentence)

    # Processing each mask operation in the template.
    for mask_operation in masked_template:
        # Each operation should be a tuple: ('mask', start_index, offset)
        if len(mask_operation) == 3 and mask_operation[0] == 'mask':
            start_index = mask_operation[1]
            offset = mask_operation[2]

            # Bound checking to avoid index errors.
            if start_index >= 0 and start_index + offset <= len(sentence_chars):
                # Extract the word to be masked based on the start index and offset.
                word_to_mask = sentence_chars[start_index : start_index + offset]

                # Convert the characters back to a string.
                word_to_mask_str = ''.join(word_to_mask)

                # Add the extracted word to the list of masked words.
                masked_words.append(word_to_mask_str)

                # Mask the word in the original sentence characters list with a placeholder.
                for i in range(start_index, start_index + offset):
                    sentence_chars[i] = '*'  # Using '*' to denote masked characters.

    return masked_words

In [None]:
import pickle

In [None]:
with open("masking_sample_set.pkl","rb") as f:
    samples = pickle.load(f)


In [None]:
with open("gpt4-selected-words.pkl","rb") as f:
    ground = pickle.load(f)

In [None]:
samples[1]

In [None]:
ground[1]

In [None]:
w1, w2, w3 = 1,0,0

In [None]:
model_outputs, candidate_list = [], []
for i in samples:
    temp,candidates = get_masked_template(i,w1=w1, w2=w2, w3=w3)
    model_out = mask_words_in_sentence(i,temp)
    model_outputs.append(sorted(model_out))
    candidate_list.append(candidates)

In [None]:
fin_data, err_data = [],[]

In [None]:
for sent, ground_t, candidates, model_output in zip(samples, ground, candidate_list, model_outputs):
    gr_set, ca_set = set(ground_t), set(candidates)
    if gr_set.issubset(ca_set):
        fin_data.append((sent, ground_t, candidates, model_output))
    else:
        err_data.append((sent, ground_t, candidates, model_output))

In [None]:
len(fin_data)

In [None]:
len(err_data)

In [None]:
def calculate_f1_score(actual_keywords, predicted_keywords):
    """
    Calculate the F1 score based on lists of actual and predicted keywords.
    
    Parameters:
    actual_keywords (list of str): The list of actual keywords.
    predicted_keywords (list of str): The list of keywords predicted by the model.

    Returns:
    float: The F1 score.
    """

    # Convert lists to sets to eliminate duplicates and allow for set operations
    actual_set = set(actual_keywords)
    predicted_set = set(predicted_keywords)

    # Calculate true positive, false positive, and false negative
    true_positive = len(actual_set.intersection(predicted_set))
    false_positive = len(predicted_set.difference(actual_set))
    false_negative = len(actual_set.difference(predicted_set))

    # Calculate precision and recall
    precision = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0
    recall = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0

    # Calculate F1 score
    if precision + recall == 0:  # Handle the case where both precision and recall are zero
        f1_score = 0
    else:
        f1_score = 2 * (precision * recall) / (precision + recall)

    return precision, recall, f1_score


In [None]:
t = []
for s,g,c,m in fin_data:
    if len(g)!=len(m):
        continue
    t.append(calculate_f1_score(g,m))
p = [i[0] for i in t]
r = [i[1] for i in t]
f = [i[2] for i in t]
n = len(p)
x,y,z = sum(p)/n, sum(r)/n, sum(f)/n
print(x,y,z)

In [None]:
random_els = []
for s,g,c,m in fin_data:
    words = []
    for x in g:
        words.append(x)
    for x in range(len(c)-len(g)):
        words.append('0')
    random_els.append(random.sample(words, k=len(m)))

In [None]:
t = []
for val,r in zip(fin_data, random_els):
    s,g,c,m = val
    if len(g)!=len(r):
        print(g,r)
        continue
    t.append(calculate_f1_score(g,r))
p = [i[0] for i in t]
r = [i[1] for i in t]
f = [i[2] for i in t]
n = len(p)
x,y,z = sum(p)/n, sum(r)/n, sum(f)/n
print(x,y,z)

# Experiments

## 1.

0.33 0.33 0.33

0.6395476190476188 0.6080476190476188 0.6209790764790761

## 2.

0.5 0.5 0

0.6448809523809522 0.6128809523809522 0.6260822510822508

## 3

3 2 1

0.6498809523809522 0.6192142857142856 0.6317886002886