In [1]:
from sentence_transformers import SentenceTransformer, util
from nltk.tokenize import word_tokenize
from spacy.matcher import Matcher
import spacy
import random
import pandas as pd
from tqdm import tqdm

2023-03-30 00:32:20.532429: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-30 00:32:34.633731: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-03-30 00:32:34.634203: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2023-03-30 00:32:54.045157: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudnn.so.8'; dlerror: libc

### Semantic Weights

In [2]:
model = SentenceTransformer('all-MiniLM-L6-v2')

In [3]:
def get_semantic_weights(sentence, matcher):

    doc = nlp(sentence)
    matches = matcher(doc)
    
    chunks = []

    for match in matches:
        match_id, start, end = match
        string_id = nlp.vocab.strings[match_id]
        span = doc[start:end]
        # print(span.text)
        chunks.append((start, end))

    s_embedding = model.encode(sentence, convert_to_tensor=True)
    
    weights = []
    for chunk in chunks:
        
        start, end = chunk
        chunk_phrase = doc[start:end]
        
        new_sent = ' '.join([doc[:start].text, doc[end:].text])
        # sentence.replace(chunk, '')
        new_embedding = model.encode(new_sent, convert_to_tensor=True)
        # print(chunk_phrase, ": ", new_sent)
        cosine_score = util.cos_sim(s_embedding, new_embedding)
        weights.append(((start, end), 1-cosine_score.cpu().squeeze().numpy()))
        
    total = sum([score for (chunk, score) in weights])
    weights = [(chunk, score/total) for (chunk, score) in weights]

    weights.sort(key = lambda x : x[1], reverse = True)
    return doc, weights

### Chunk extraction

In [4]:
nlp = spacy.load("en_core_web_trf")

pattern1 = [{'POS': 'ADV', 'OP': '*'}, {'POS': 'ADJ', 'OP': '*'}]
pattern2 = [{'POS': 'PDT', 'OP': '?'}, {'POS': 'DET', 'OP': '?'},{'POS': 'ADJ', 'OP': '*'},
           {"POS": {"IN": ["NOUN", "PROPN"]}, 'OP': '*'}]
pattern3 = [{'POS': 'PDT', 'OP': '?'},{'POS': 'DET', 'OP': '?'},{'POS': 'ADV', 'OP': '*'},
           {'POS': 'VERB'}]


matcher = Matcher(nlp.vocab)
matcher.add("pattern",[pattern1,pattern2,pattern3])

In [5]:
with open('test.txt', 'r') as f:
    data = f.readlines()

In [6]:
idx = random.randint(0, len(data))
sample = data[idx]
print(sample)

What did the fish say when he hit a wall? Dam.



In [7]:
doc, weights = get_semantic_weights(sample, matcher)
for (start, end), weight in weights:
    print((start,end), doc[start:end], ":", weight)

(17, 18) eating : 0.25141154590183307
(2, 3) friend : 0.23998979890151587
(1, 3) a friend : 0.20324643739075807
(6, 7) problems : 0.12165270235807099
(16, 17) tried : 0.0853775961484633
(3, 4) tells : 0.07110016359078666
(1, 2) a : 0.02722175570857203


In [10]:
def get_masked_template(sentence, matcher, masking_type='upto_tau', tau=0.4, n=0.5):
    
    doc, weights = get_semantic_weights(sentence, matcher)
    
    # Masking sentence chunks till words with upto tau cumulative weight
    # have been masked, ignoring overlapping chunks
    cumulative_weight = 0
    masked = [0 for _ in range(len(doc))]
    
    if masking_type == 'upto_tau':
        marker = 1
        for (start, end), weight in weights:

            if cumulative_weight + weight >= tau:
                break

            marked = False
            for j in range(start, end):
                if masked[j] == 0:
                    masked[j] = marker
                    marked = True

            if marked:
                marker += 1
                cumulative_weight += weight
    
    elif masking_type == 'top_n':
        
        num_tokens_to_mask = n * len(doc)
        num_masked = 0
        
        marker = 1
        for (start, end), weight in weights:
            marked = False
            for j in range(start, end):
                if masked[j] == 0:
                    masked[j] = marker
                    marked = True

            if marked:
                marker += 1
                num_masked += end-start
            
            if num_masked >= num_tokens_to_mask:
                break
            
    
    # Building target template
    sent = []
    maskon = 0
    for i in range(len(masked)):
        elem = masked[i]
        if elem == 0:
            sent.append(doc[i])
            maskon = 0
        else:
            if elem == maskon:
                continue
            maskon = elem
            sent.append('[MASK]')
            
    return ''.join([token.text_with_ws if type(token) != str else token + ' ' for token in sent])

In [21]:
idx = random.randint(0, len(data))
sample = data[idx]
print(sample)

Why didn't the sun go to college? Because it had a million degrees.



In [26]:
template = get_masked_template(sample, matcher, masking_type='top_n', tau=0.4, n=0.6)
print(template)

Why didn't [MASK] [MASK] to [MASK] ? Because it [MASK] [MASK] million [MASK] .



In [11]:
# template_data = {'Joke':[], 'Template':[]}
# for joke in tqdm(data):
#     template = get_masked_template(joke, matcher, tau=0.45)
#     template_data['Joke'].append(joke)
#     template_data['Template'].append(template)

In [12]:
# df = pd.DataFrame(template_data)

In [13]:
# df.to_csv('test_templates.csv')

### Mask Filling

In [14]:
from transformers import pipeline
from transformers import AutoModelForMaskedLM, DistilBertForMaskedLM
from transformers import AutoTokenizer, DistilBertTokenizer

In [15]:
filling_model = DistilBertForMaskedLM.from_pretrained('./finetuned-distilbert')
filling_tokenizer = DistilBertTokenizer.from_pretrained('./finetuned-distilbert/')

In [16]:
mask_filler = pipeline("fill-mask", model=filling_model, tokenizer=filling_tokenizer)

In [17]:
filled = template
while '[MASK]' in filled:
    try:
        filled = mask_filler(filled)[0][0]['sequence']
        filled = filled.replace('[CLS]', '')
        filled = filled.replace('[SEP]', '')
    except:
        filled = mask_filler(filled)[0]['sequence']
        filled = filled.replace('[CLS]', '')
        filled = filled.replace('[SEP]', '')
    print(filled)

 Because you will do anything for [MASK] [MASK] of [MASK]. 
 Because you will do anything for a [MASK] of [MASK]. 
 Because you will do anything for a lot of [MASK]. 
Because you will do anything for a lot of money.


In [22]:
from transformers import pipeline
mask_filler = pipeline(task="fill-mask", model="./finetuned-distilbert")

def fill_in_the_blanks(sent, model, tokenizer, mask_filler):
  # print(sent)
  sent = sent.replace("[MASK]",f"{tokenizer.mask_token}")
  c = sent.count(f"{tokenizer.mask_token}")
  for i in range(c):
    s_embedding = model.encode(sent, convert_to_tensor=True)
    sent = mask_filler(sent)
    min_cosine_score = 1
    for mask_candidates in sent:
      if type(mask_candidates) is list:
        for replacement in mask_candidates:
          new_sent = replacement['sequence']
          n_embedding = model.encode(new_sent, convert_to_tensor=True)
          cosine_score = util.cos_sim(s_embedding, n_embedding)
          if cosine_score < min_cosine_score:
            min_cosine_score = cosine_score
            best_candidate = new_sent
      else:
        new_sent = mask_candidates['sequence']
        n_embedding = model.encode(new_sent, convert_to_tensor=True)
        cosine_score = util.cos_sim(s_embedding, n_embedding)
        if cosine_score < min_cosine_score:
          min_cosine_score = cosine_score
          best_candidate = new_sent
    sent = best_candidate
  # print(sent)
  return sent

In [28]:
joke_data = {'type':[], 'body':[], 'template':[]}

for _ in tqdm(range(500)):
    idx = random.randint(0, len(data))
    sample = data[idx]
    joke_data['type'].append('human')
    joke_data['body'].append(sample)
    joke_data['template'].append('N/A')
    
for _ in tqdm(range(500)):
    idx = random.randint(0, len(data))
    sample = data[idx]
    template = get_masked_template(sample, matcher, masking_type='top_n', tau=0.4, n=0.3)
    generated = fill_in_the_blanks(template, model, filling_tokenizer, mask_filler)
    joke_data['type'].append('generated')
    joke_data['body'].append(generated)
    joke_data['template'].append(template)
    

100%|██████████████████████████████████████████| 500/500 [00:00<00:00, 331042.15it/s]
100%|██████████████████████████████████████████████| 500/500 [06:42<00:00,  1.24it/s]


In [29]:
eval_df = pd.DataFrame(joke_data)

In [31]:
eval_df.to_csv('human_eval_jokes.csv')

In [150]:
idx = random.randint(0, len(data))
sample = data[idx]
print(sample)

template = get_masked_template(sample, matcher, masking_type='top_n', tau=0.4, n=0.3)
print(template)

fill_in_the_blanks(template, model, filling_tokenizer, mask_filler)

What do you call a tea blend that was deceptive about its ingredients? A poly tea sham.

What do you call a tea blend that was deceptive about its ingredients? [MASK] [MASK] .

What do you call a tea blend that was deceptive about its ingredients? [MASK] [MASK] .

What do you call a tea blend that was deceptive about its ingredients? Orange cream.


'What do you call a tea blend that was deceptive about its ingredients? Orange cream.'