In [40]:
!pip install transformers #installing because we want to use pre_trained models, both from huggingface and our own models

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [41]:
import pickle
import torch
import re
import transformers
from transformers import RobertaTokenizer, RobertaForSequenceClassification, DebertaTokenizer, DebertaForSequenceClassification
from transformers import AutoModel
from torch.utils.data import TensorDataset, DataLoader
import random
from sklearn.model_selection import train_test_split
import numpy as np
from transformers import get_linear_schedule_with_warmup
from torch.optim import NAdam
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
import math

In [42]:
import spacy
# Download spaCy's 'en_core_web_lg' English pipeline; it is loaded later with spacy.load('en_core_web_lg').
# NOTE(review): `!python` runs whichever interpreter is first on the shell PATH - confirm it is the kernel's.
!python -m spacy download en_core_web_lg

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting en-core-web-lg==3.5.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-3.5.0/en_core_web_lg-3.5.0-py3-none-any.whl (587.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m587.7/587.7 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_lg')


In [43]:
import warnings
warnings.filterwarnings('ignore', category=UserWarning) #ignoring UserWarning from colab because I know what I am doing here
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=DeprecationWarning)

In [44]:
from google.colab import drive
# Mount Google Drive inside the Colab VM; force_remount = True requests a fresh mount on every run.
drive.mount('/content/drive', force_remount = True)
# Drive folder that the cells below read 'tag_dict.pickle' and the two trained model directories from.
base_dir = '/content/drive/My Drive/ESC324projectdrive/dav/'

Mounted at /content/drive


In [45]:
def load_pickle(file_path):
    """Read and return the single object pickled in the file at `file_path`.

    The file is opened in binary mode and closed again before returning.
    Unpickling can execute arbitrary code, so only use this on trusted files.
    """
    with open(file_path, mode='rb') as handle:
        return pickle.load(handle)

In [46]:
# Seed every random number generator the notebook touches: Python, NumPy, PyTorch (CPU and CUDA).
seed = 42
for seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    seed_rng(seed)

# spaCy pipeline with the parser and NER disabled; the 'sentencizer' component is added
# so that `doc.sents` is still available.
nlp = spacy.load('en_core_web_lg', disable=['parser', 'ner'])
nlp.add_pipe('sentencizer')

# Mapping from POS tag to a relative-frequency weight; the predictors multiply it into the model scores.
rel_freq = load_pickle(base_dir + 'tag_dict.pickle')
print(rel_freq)

# Stock tokenizers from the Hugging Face hub.
roberta_tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
deberta_tokenizer = DebertaTokenizer.from_pretrained('microsoft/deberta-base')

# Our own trained sequence-classification checkpoints, stored under base_dir.
roberta_model = RobertaForSequenceClassification.from_pretrained(base_dir + 'trained_roberta_model')
deberta_model = DebertaForSequenceClassification.from_pretrained(base_dir + 'trained_deberta_model')

{'': 0.0003436426116838488, "''": 0.0003436426116838488, ',': 0.0003436426116838488, '-LRB-': 0.0003436426116838488, '-RRB-': 0.0003436426116838488, '.': 0.0003436426116838488, ':': 0.0003436426116838488, 'CC': 0.0003436426116838488, 'CD': 0.0010309278350515464, 'DT': 0.0003436426116838488, 'EX': 0.0003436426116838488, 'FW': 0.0006872852233676976, 'HYPH': 0.0003436426116838488, 'IN': 0.0024054982817869417, 'JJ': 0.11134020618556702, 'JJR': 0.0013745704467353953, 'JJS': 0.0013745704467353953, 'LS': 0.0003436426116838488, 'MD': 0.0013745704467353953, 'NFP': 0.0003436426116838488, 'NN': 0.3518900343642612, 'NNP': 0.07835051546391752, 'NNPS': 0.0013745704467353953, 'NNS': 0.09725085910652921, 'PDT': 0.0003436426116838488, 'POS': 0.0003436426116838488, 'PRP': 0.0006872852233676976, 'PRP$': 0.0003436426116838488, 'RB': 0.06632302405498282, 'RBR': 0.0006872852233676976, 'RBS': 0.0003436426116838488, 'RP': 0.001718213058419244, 'TO': 0.0003436426116838488, 'UH': 0.002061855670103093, 'VB': 0.0

In [47]:
def sentence_processor(sentence):
  """Clean up the punctuation spacing of `sentence` and POS-tag it with the global spaCy pipeline `nlp`.

  Returns a two-element list `[tagged, raw]`:
    tagged: one "<lower-cased lemma>/<tag>" string per token
    raw:    the matching original token texts
  """
  # collapse every run of spaces into a single space (regex)
  cleaned = re.sub(' +', ' ', sentence)

  # glue clitics back onto their apostrophe (e.g. "' m" -> "'m"); applied in this exact order
  clitic_fixes = (
      ("' m", "'m"),
      ("' s", "'s"),
      ("' ve", "'ve"),
      ("' re", "'re"),
      ("' d", "'d"),
      ("' ll", "'ll"),
      ("' t", "'t"),
  )
  for spaced, joined in clitic_fixes:
    cleaned = cleaned.replace(spaced, joined)

  # two consecutive single quotes become one double quote
  cleaned = cleaned.replace("''", "\"")

  # strip surrounding whitespace (including a trailing newline) before tagging
  doc = nlp(cleaned.strip())

  tagged_tokens = []
  raw_tokens = []
  for span in doc.sents:
    for tok in span:
      # each entry is the lower-cased lemma followed by "/TAG"
      lowered_lemma = tok.lemma_.lower()
      tagged_tokens.append(lowered_lemma + "/" + tok.tag_)
      raw_tokens.append(tok.text)
  return [tagged_tokens, raw_tokens]

In [48]:
def roberta_predictor(input_list, print_output):
  """Pick the most likely pun word of one sentence using the trained RoBERTa classifier.

  Args:
    input_list: `[pos_sentence, sentence]` as returned by `sentence_processor`;
      `pos_sentence` holds "lemma/TAG" strings, `sentence` the matching token texts.
    print_output: unused; kept so existing callers keep working.

  Returns:
    `(pred_pun, model_confidence)`: the chosen token text and its score in percent
    (softmax probability of that position multiplied by the `rel_freq` weight of its tag).

  Raises:
    ValueError: if `sentence` contains no tokens.
  """
  pos_sentence, sentence = input_list[0], input_list[1]
  if len(sentence) == 0:
    raise ValueError("sentence must contain at least one token")

  max_len = 79 #dont change
  # NOTE(review): the model is assumed to emit one logit per position (max_len of them) - confirm.

  # Per-position weight from the POS tag. Position 0 is the start token and gets weight 0.
  # rsplit keeps the tag intact when the lemma itself contains "/" (e.g. "and/or/CC").
  tag_weights = [rel_freq.get(tagged.rsplit("/", 1)[-1], 0) for tagged in pos_sentence]
  rel_freq_heuristic = ([0] + tag_weights)[:max_len]  # long sentences must not exceed the logit width
  rel_freq_heuristic += [0]*(max_len - len(rel_freq_heuristic))
  rel_freq_heuristic = torch.tensor(rel_freq_heuristic, dtype = torch.float)

  encoded_data = roberta_tokenizer.encode_plus(
          sentence,
          add_special_tokens=True,
          max_length = max_len, #we are padding all sentences to max_len elements (words + punctuation)
          padding = 'max_length', #replaces the deprecated pad_to_max_length = True
          return_attention_mask = False,
          return_tensors='pt',
          truncation = True
      )
  input_id = encoded_data['input_ids'][0].unsqueeze(0)

  # Drop padding using the tokenizer's real pad id; the last id is the end token, not padding,
  # when the sentence fills max_len.
  all_ids = input_id[0].numpy()
  token_ids = all_ids[all_ids != roberta_tokenizer.pad_token_id]

  multiplier = 0.5 #you can change this value to affect how the test model works. This is a hyperparameter which changes the max number of clusters for kmeans
  #multiplier has been set to 0.5 because RoBERTa was fine-tuned by us with this value
  num_clusters = max(math.ceil(len(token_ids)*multiplier), min(5, len(sentence)))
  num_clusters = min(num_clusters, len(token_ids))  # KMeans rejects n_clusters > n_samples

  kmeans = KMeans(n_clusters = num_clusters, n_init = 10)
  kmeans.fit(token_ids.reshape(-1, 1))
  att_labels = np.array(kmeans.labels_)

  # Order the clusters by size (ascending).
  unique, count = np.unique(att_labels, return_counts = True)
  idx = np.argsort(count)
  count, unique = count[idx], unique[idx]

  # Attend to: the largest cluster, every single-member cluster, and the second-smallest cluster.
  chosen = [unique[-1]]
  chosen.extend(unique[count == 1])
  if len(unique) > 1:
    chosen.append(unique[1])
  attention_mask = torch.zeros(max_len, dtype = torch.long)
  attention_mask[:len(att_labels)] = torch.from_numpy(np.isin(att_labels, chosen).astype(np.int64))

  #use fine-tuned model; inference only, so no autograd graph is needed
  with torch.no_grad():
    logits = roberta_model(input_id, attention_mask = attention_mask.unsqueeze(0))["logits"]
  model_output = torch.softmax(logits, dim = 1) * rel_freq_heuristic

  # Position p corresponds to sentence[p - 1]. Fall back to the first word when the
  # arg-max is not a word position (happens when every weighted score is 0).
  position = torch.argmax(model_output, dim = 1).item()
  if not 1 <= position <= len(sentence):
    position = 1
  pred_pun = sentence[position - 1]
  model_confidence = model_output[0, position].item()*100
  return pred_pun, model_confidence

In [49]:
def deberta_predictor(input_list, print_output):
  """Pick the most likely pun word of one sentence using the trained DeBERTa classifier.

  Args:
    input_list: `[pos_sentence, sentence]` as returned by `sentence_processor`;
      `pos_sentence` holds "lemma/TAG" strings, `sentence` the matching token texts.
    print_output: unused; kept so existing callers keep working.

  Returns:
    `(pred_pun, model_confidence)`: the chosen token text and its score in percent
    (softmax probability of that position multiplied by the `rel_freq` weight of its tag).

  Raises:
    ValueError: if `sentence` contains no tokens.
  """
  pos_sentence, sentence = input_list[0], input_list[1]
  if len(sentence) == 0:
    raise ValueError("sentence must contain at least one token")

  max_len = 79 #dont change
  # NOTE(review): the model is assumed to emit one logit per position (max_len of them) - confirm.

  # Per-position weight from the POS tag. Position 0 is the start token and gets weight 0.
  # rsplit keeps the tag intact when the lemma itself contains "/" (e.g. "and/or/CC").
  tag_weights = [rel_freq.get(tagged.rsplit("/", 1)[-1], 0) for tagged in pos_sentence]
  rel_freq_heuristic = ([0] + tag_weights)[:max_len]  # long sentences must not exceed the logit width
  rel_freq_heuristic += [0]*(max_len - len(rel_freq_heuristic))
  rel_freq_heuristic = torch.tensor(rel_freq_heuristic, dtype = torch.float)

  encoded_data = deberta_tokenizer.encode_plus(
          sentence,
          add_special_tokens=True,
          max_length = max_len, #we are padding all sentences to max_len elements (words + punctuation)
          padding = 'max_length', #replaces the deprecated pad_to_max_length = True
          return_attention_mask = False,
          return_tensors='pt',
          truncation = True
      )
  input_id = encoded_data['input_ids'][0].unsqueeze(0)

  # Drop padding using the tokenizer's real pad id; the last id is the end token, not padding,
  # when the sentence fills max_len.
  all_ids = input_id[0].numpy()
  token_ids = all_ids[all_ids != deberta_tokenizer.pad_token_id]

  multiplier = 1 #you can change this value to affect how the test model works. This is a hyperparameter which changes the max number of clusters for kmeans
  #multiplier has been set to 1 because deberta was fine-tuned by us with this value
  num_clusters = max(math.ceil(len(token_ids)*multiplier), min(5, len(sentence)))
  num_clusters = min(num_clusters, len(token_ids))  # KMeans rejects n_clusters > n_samples

  kmeans = KMeans(n_clusters = num_clusters, n_init = 10)
  kmeans.fit(token_ids.reshape(-1, 1))
  att_labels = np.array(kmeans.labels_)

  # Order the clusters by size (ascending).
  unique, count = np.unique(att_labels, return_counts = True)
  idx = np.argsort(count)
  count, unique = count[idx], unique[idx]

  # Attend to: the largest cluster, every single-member cluster, and the second-smallest cluster.
  chosen = [unique[-1]]
  chosen.extend(unique[count == 1])
  if len(unique) > 1:
    chosen.append(unique[1])
  attention_mask = torch.zeros(max_len, dtype = torch.long)
  attention_mask[:len(att_labels)] = torch.from_numpy(np.isin(att_labels, chosen).astype(np.int64))

  #use fine-tuned model; inference only, so no autograd graph is needed
  with torch.no_grad():
    logits = deberta_model(input_id, attention_mask = attention_mask.unsqueeze(0))["logits"]
  model_output = torch.softmax(logits, dim = 1) * rel_freq_heuristic

  # Position p corresponds to sentence[p - 1]. Fall back to the first word when the
  # arg-max is not a word position (happens when every weighted score is 0).
  position = torch.argmax(model_output, dim = 1).item()
  if not 1 <= position <= len(sentence):
    position = 1
  pred_pun = sentence[position - 1]
  model_confidence = model_output[0, position].item()*100
  return pred_pun, model_confidence

In [50]:
def PunPal(input_sentence = "", model_name = "ensemble", print_output = True):
  """This function acts as an API to test the models trained by us on any input sentence.
  Input:
      input_sentence: <your sentence>
      model_name: roberta / deberta / ensemble
      print_output (optional): True / False
  Output:
      (pred, conf): predicted pun word and its confidence in percent, taken from the
      requested model; for "ensemble", from whichever model is more confident
      (DeBERTa wins ties).
      (None, None) when the sentence is empty/blank or the model name is unknown.
  """
  # Reject None, empty and whitespace-only input before any expensive work.
  if not input_sentence or not input_sentence.strip():
    print("Invalid sentence")
    return None, None

  # Validate the model name up front instead of after both models have already run.
  if model_name not in ("roberta", "deberta", "ensemble"):
    print("Invalid Model name")
    return None, None

  #POS tagging
  output_list = sentence_processor(input_sentence)
  if not output_list[1]:
    # the tagger produced no tokens, so there is nothing to predict on
    print("Invalid sentence")
    return None, None

  # Both models are always run so that print_output can report both predictions.
  roberta_pred, roberta_conf = roberta_predictor(output_list, print_output)
  deberta_pred, deberta_conf = deberta_predictor(output_list, print_output)

  if print_output:
    print("RoBERTa predicts that the Pun word is '", roberta_pred, "' with confidence = ", roberta_conf, "%")
    print("DeBERTa predicts that the Pun word is '", deberta_pred, "' with confidence = ", deberta_conf, "%")

  if model_name == "roberta":
    return roberta_pred, roberta_conf
  if model_name == "deberta":
    return deberta_pred, deberta_conf

  # ensemble: keep the more confident prediction
  if roberta_conf > deberta_conf:
    return roberta_pred, roberta_conf
  return deberta_pred, deberta_conf

In [51]:
#model = roberta / deberta / ensemble
#input_text = <your sentence>
#print_output (optional) = True / False

model = "roberta"
# named input_text (not `input`) so the builtin input() is not shadowed for the rest of the session
input_text = "When the glassblower inhaled he got a pane in the stomach."
pred, conf = PunPal(input_sentence = input_text, model_name = model)

RoBERTa predicts that the Pun word is ' pane ' with confidence =  35.05114158813896 %
DeBERTa predicts that the Pun word is ' pane ' with confidence =  34.76151194359429 %
