In [None]:
# Pick data/model locations depending on where the notebook runs.
# Only a missing `google.colab` package means "not on Colab"; any other
# failure (e.g. the Drive mount being refused) is allowed to surface instead
# of silently falling back to local paths that do not exist on Colab.
try:
    from google.colab import drive
except ImportError:
    # Local run: data and checkpoints live next to the notebook.
    data_dir = './data/'
    bert_dir = './BERT/BERT'
    roberta_dir = './RoBERTa/RoBERTa'
else:
    # Colab run: data and checkpoints live on the mounted Google Drive.
    drive.mount('/content/drive')
    data_dir = '/content/drive/MyDrive/data/task-1'
    bert_dir = '/content/drive/MyDrive/BERT'
    roberta_dir = '/content/drive/MyDrive/RoBERTa'

In [None]:
!pip install ekphrasis

In [None]:
!pip install transformers

In [None]:
!python -m spacy download en_core_web_lg

### Setting up

In [None]:
# Imports

import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import codecs
import re
import spacy
import nltk
import torch.nn.functional as F
import matplotlib.pyplot as plt
import seaborn as sns

from tqdm import tqdm
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from torch.utils.data import Dataset, random_split
from ekphrasis.classes.preprocessor import TextPreProcessor
from ekphrasis.classes.tokenizer import SocialTokenizer
from transformers import BertForSequenceClassification, BertTokenizer, RobertaForSequenceClassification, RobertaTokenizer

In [None]:
# Reproducibility: one fixed seed for the CPU and CUDA generators, and
# deterministic cuDNN kernels.
SEED = 1

torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# Run on the first GPU when CUDA is available, otherwise on the CPU.
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

### Loading the data

In [None]:
# Load the test split; `data_dir` is chosen by the environment-detection cell
# at the top of the notebook.
test_df = pd.read_csv(f'{data_dir}/test.csv')

In [None]:
# spaCy pipeline used by capitalisation_by_ner() to read each token's entity type.
nlp = spacy.load('en_core_web_lg')

In [None]:
# Quick look at the first rows; preprocessor() reads the `original`, `edit`
# and `meanGrade` columns.
test_df.head()

### Preprocessing functions

In [None]:
def capitalisation_by_ner(sentence, entities=['GPE', 'ORG', 'NORP', 'PERSON']):
    """Lower-case a sentence while keeping the casing of selected entity tokens.

    The sentence is parsed with the module-level spaCy pipeline `nlp`. A token
    whose entity type is listed in `entities` keeps its text unchanged; every
    other token is lower-cased. Tokens are re-joined with single spaces, so the
    original spacing of `sentence` is not preserved.

    Args:
        sentence: text to normalise.
        entities: spaCy entity labels whose tokens keep their casing.

    Returns:
        The space-joined, selectively lower-cased sentence.
    """
    doc = nlp(sentence)
    return ' '.join(
        token.text if token.ent_type_ in entities else token.text.lower()
        for token in doc
    )

In [None]:
# Word replacement
# Join the contractions
# Tokenize
# remove stop words
# remove punct EXCEPT ! ? #
# Twitter handles

def preprocessor(df):
    """Build the cleaned sentences that are fed to the models.

    For every row, the ``<.../>`` placeholder in ``original`` is replaced by the
    ``edit`` word, and the resulting sentence is normalised step by step (see
    the comments on the chain below).

    Args:
        df: DataFrame with ``original``, ``edit`` and ``meanGrade`` columns.

    Returns:
        A new DataFrame on ``df``'s index with two columns:
        ``edited_sentences`` (cleaned text) and ``meanGrade`` (copied from ``df``).
    """
    _df = pd.DataFrame(index=df.index, columns=['edited_sentences', 'meanGrade'])

    _df['meanGrade'] = df.meanGrade

    text_processor = TextPreProcessor(
        fix_html=True,  # fix HTML tokens

        # corpus from which the word statistics are going to be used
        # for word segmentation
        segmenter="english",

        # corpus from which the word statistics are going to be used
        # for spell correction
        corrector="english",

        unpack_hashtags=False,  # perform word segmentation on hashtags
        unpack_contractions=False,  # Unpack contractions (can't -> can not)
        # NOTE(review): ekphrasis documents `spell_correct_elong` / `spell_correction`;
        # verify that `spell_correct` is actually honoured. Left unchanged so the
        # text stays identical to what the checkpoints were presumably trained on.
        spell_correct=True,  # spell correction for elongated words
    )

    # Raw string: same characters as before, without invalid-escape warnings.
    punct = r"[\.,:;\(\)\[\]@\-\$£]"

    nltk.download('stopwords')
    # A set gives O(1) membership tests in the per-word filter below.
    stops = set(stopwords.words('english'))

    def _apply_edit(row):
        # Substitute the edit word for the "<.../>" placeholder. A callable
        # replacement inserts the word verbatim, so a backslash in the edit is
        # never interpreted as a regex replacement template.
        return re.sub("<.*/>", lambda _match: row['edit'], row['original'])

    def _drop_stopwords(sentence):
        return " ".join([w for w in sentence.split(" ") if w not in stops])

    # NOTE: need to deal with ' '
    # NOTE: All stop words are removed. The authors analysed the sentiment of the
    # stop words in the training set to check that removing them would not hurt
    # the results (any word carrying sentiment could affect the funniness score);
    # since stop words carry no sentiment, they are dropped.
    # NOTE: "@" is stripped as punctuation (so handles lose their marker), "#" is
    # kept and separated from the tag by a space, and digits are removed by the
    # final step.
    #
    # Every `.str.replace` states `regex=` explicitly: the pandas default changed
    # from True to False in 2.0, which would turn the pattern-based steps into
    # literal no-ops and make the callable replacement raise.
    _df['edited_sentences'] = (
        df[['original', 'edit']]
        .apply(_apply_edit, axis=1)
        .apply(capitalisation_by_ner)
        # Re-attach apostrophe tokens (e.g. "n't") that the space-joined
        # tokenisation left detached from the preceding word.
        .str.replace(r" (?P<one>\w*'\w+)", lambda m: m.group("one"), regex=True)
        .apply(text_processor.pre_process_doc)
        .str.replace("#", "# ", regex=False)
        # Normalise curly apostrophes to the ASCII one.
        .str.replace("[‘’]", "'", regex=True)
        .str.replace("'s", "", regex=False)
        .str.replace(punct, "", regex=True)
        .apply(_drop_stopwords)
        .str.replace("[0-9]", "", regex=True)
    )

    return _df

### Setting up the models and the evaluation functions

In [None]:
def model_eval(data_loader, model):
  """Run `model` over `data_loader` without gradients and collect its predictions.

  The model is moved to the module-level `device` and switched to eval mode.
  After the loop, the MSE/RMSE line is printed via `model_performance`.

  Args:
    data_loader: iterable of dict batches holding 'input_ids',
      'attention_mask' and 'labels' tensors.
    model: model whose output exposes `.logits`; dimension 1 of the logits is
      squeezed away (assumes a single regression output per example — TODO confirm).

  Returns:
    (preds, targets): two numpy arrays with one entry per example.
  """
  model = model.to(device)
  model.eval()

  preds = []
  targets = []

  with torch.no_grad():
    for batch in data_loader:
      input_ids = batch['input_ids'].to(device)
      attention_mask = batch['attention_mask'].to(device)
      labels = batch['labels'].to(device)
      outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
      # No .detach() needed: nothing tracks gradients inside torch.no_grad().
      preds.extend(outputs.logits.squeeze(1).cpu().numpy())
      targets.extend(labels.cpu().numpy())

  preds = np.array(preds)
  targets = np.array(targets)
  model_performance(preds, targets, print_output=True)
  return preds, targets

In [None]:
# How we print the model performance
def model_performance(output, target, print_output=False):
    """
    Compute the squared-error metrics between predictions and targets.

    Args:
        output: predictions (numpy array or any array-like, e.g. a list).
        target: ground-truth values, broadcastable against `output`.
        print_output: when True, print the MSE and the RMSE (two decimals).

    Returns:
        (sse, mse): the sum and the mean of the squared errors.
    """
    # Accept plain sequences too; existing numpy arrays pass through unchanged.
    output = np.asarray(output)
    target = np.asarray(target)

    sq_error = (output - target) ** 2

    sse = np.sum(sq_error)
    mse = np.mean(sq_error)

    if print_output:
        # RMSE is only needed for the printed report.
        rmse = np.sqrt(mse)
        print(f'| MSE: {mse:.2f} | RMSE: {rmse:.2f} |')

    return sse, mse

In [None]:
class Task1Dataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with float labels.

    Args:
        train_data: mapping from field name (e.g. 'input_ids') to a per-example
            indexable container (tensor, list, ...); only `.items()` is required.
        labels: per-example labels; a pandas Series or any integer-indexable
            sequence.
    """

    def __init__(self, train_data, labels):
        self.x_train = train_data
        self.y_train = labels

    def __len__(self):
        return len(self.y_train)

    @staticmethod
    def _as_tensor(value):
        # Copy an existing tensor with clone().detach(): torch.tensor(tensor)
        # emits a UserWarning recommending exactly this form.
        if isinstance(value, torch.Tensor):
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, idx):
        item = {key: self._as_tensor(val[idx]) for key, val in self.x_train.items()}
        labels = self.y_train
        # Positional lookup: Series[idx] is label-based and would fail (or pick
        # the wrong row) when the Series does not carry a default 0..n-1 index.
        label = labels.iloc[idx] if hasattr(labels, 'iloc') else labels[idx]
        item['labels'] = torch.tensor(label, dtype=torch.float)
        return item

In [None]:
# Clean the test frame; the result holds `edited_sentences` and `meanGrade`.
clean_test_df = preprocessor(test_df)

In [None]:
# The cleaned sentences are the text handed to both tokenizers below.
test_data = clean_test_df['edited_sentences']

In [None]:
# Load the BERT sequence-classification checkpoint saved under `bert_dir`.
bert_model = BertForSequenceClassification.from_pretrained(bert_dir)

In [None]:
# Load the RoBERTa sequence-classification checkpoint saved under `roberta_dir`.
roberta_model = RobertaForSequenceClassification.from_pretrained(roberta_dir)

In [None]:
# Tokenizers are loaded by stock model name rather than from the checkpoint folders.
# NOTE(review): assumes the checkpoints were trained with these same vocabularies — confirm.
tokenizer_bert = BertTokenizer.from_pretrained('bert-base-uncased')
tokenizer_roberta = RobertaTokenizer.from_pretrained('roberta-base')

### Evaluating the BERT model on unseen test data

In [None]:
# Tokenise all cleaned sentences in one call, padded to a common length, as PyTorch tensors.
# NOTE(review): add_special_tokens=False leaves out the [CLS]/[SEP] markers; presumably
# this mirrors the tokenisation used at training time — confirm before changing it.
test_X = tokenizer_bert(test_data.to_list(), add_special_tokens=False, padding=True, return_tensors="pt")

In [None]:
# Pair the BERT encodings with the `meanGrade` labels from the raw test frame.
test_dataset = Task1Dataset(test_X, test_df['meanGrade'])

In [None]:
# Move the model to the selected device up front (model_eval() repeats this move).
bert_model = bert_model.to(device)

In [None]:
# Batch the BERT test set for evaluation; no shuffling is requested.
BATCH_SIZE = 32
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE)

print("Dataloaders created.")


In [None]:
# model_eval() already prints the MSE/RMSE line, so the metrics are recomputed
# silently here; the returned (SSE, MSE) pair is shown as the cell output.
predictions, target = model_eval(test_loader, bert_model)
model_performance(predictions, target)

In [None]:
# Store the BERT predictions next to the cleaned sentences and grades for plotting.
clean_test_df['predictions_bert'] = predictions

In [None]:
# BERT: predicted score against the annotated mean grade, shown as a box plot
# (left) and a scatter plot (right) over the same two columns.
bert_plot_args = dict(x='meanGrade', y='predictions_bert', data=clean_test_df)

fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(20, 10))
fig.suptitle('Final Test Dataset Analysis')

sns.boxplot(ax=ax1, **bert_plot_args)
sns.scatterplot(ax=ax2, **bert_plot_args)
plt.show()

### Evaluating the RoBERTa model on unseen test data

In [None]:
# Tokenise all cleaned sentences with the RoBERTa tokenizer, padded to a common length.
# NOTE(review): add_special_tokens=False leaves out the <s>/</s> markers; presumably
# this mirrors the tokenisation used at training time — confirm before changing it.
test_X_roberta = tokenizer_roberta(test_data.to_list(), add_special_tokens=False, padding=True, return_tensors="pt")

In [None]:
# Pair the RoBERTa encodings with the `meanGrade` labels from the raw test frame.
test_dataset_roberta = Task1Dataset(test_X_roberta, test_df['meanGrade'])

In [None]:
# Move the model to the selected device up front (model_eval() repeats this move).
roberta_model = roberta_model.to(device)

In [None]:
# Batch the RoBERTa test set for evaluation; no shuffling is requested.
BATCH_SIZE = 32
test_loader_roberta = torch.utils.data.DataLoader(test_dataset_roberta, batch_size=BATCH_SIZE)

print("Dataloaders created.")


In [None]:
# model_eval() already prints the MSE/RMSE line, so the metrics are recomputed
# silently here; the returned (SSE, MSE) pair is shown as the cell output.
predictions_r, target_r = model_eval(test_loader_roberta, roberta_model)
model_performance(predictions_r, target_r)

In [None]:
# Store the RoBERTa predictions in the same frame, then preview it.
clean_test_df['predictions_roberta'] = predictions_r
clean_test_df.head()

In [None]:
# RoBERTa: predicted score against the annotated mean grade, shown as a box
# plot (left) and a scatter plot (right) over the same two columns.
roberta_plot_args = dict(x='meanGrade', y='predictions_roberta', data=clean_test_df)

fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(20, 10))
fig.suptitle('Final Test Dataset Analysis for RoBERTa model')

sns.boxplot(ax=ax1, **roberta_plot_args)
sns.scatterplot(ax=ax2, **roberta_plot_args)
plt.show()