In [3]:
import os, random, sys, copy
import torch, torch.nn as nn, numpy as np
from tqdm.notebook import tqdm
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from nltk.tokenize import word_tokenize
from sklearn.metrics import f1_score

from transformers import AutoTokenizer, AutoModel
import pandas as pd
import emoji

## Data Preprocessing

In this section, we load and process data using the `SarcasmDataset` class. To create a `SarcasmDataset`, pass the path of the data csv file, the tokenizer, and the maximum sequence length. Later, use PyTorch to create a dataloader for the dataset (in the main script).

In [4]:
class SarcasmDataset(torch.utils.data.Dataset):
    '''
    Dataset of (text, label) rows read from a csv file.

    Each item is returned as three tensors: token ids padded/truncated to
    `max_len`, the matching attention mask, and the label.
    '''

    def __init__(self, data_path, tokenizer, max_len):
        '''
        data_path: path to csv file
        tokenizer: tokenizer to use, likely load from AutoTokenizer
        max_len: max length of input sequence
        '''
        self.data_path = data_path
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.data = self.load_data()

    def load_data(self):
        '''
        Read the csv file and return a list of [text, label] pairs.
        '''
        # use pandas to read csv file
        df = pd.read_csv(self.data_path)
        # only need the 2nd and 3rd col (text, label)
        df = df.iloc[:, 1:3]
        # replace nan with empty string
        df = df.fillna('')
        # convert to np array
        data = df.values
        # convert possible emoji to text
        data = [[emoji.demojize(text), label] for text, label in data]
        return data

    def __len__(self):
        return len(self.data)

    def _special_token(self, attr, fallback):
        '''
        Return the tokenizer's own special token (e.g. `cls_token`), or
        `fallback` when the tokenizer does not define one.
        '''
        token = getattr(self.tokenizer, attr, None)
        return token if token is not None else fallback

    def __getitem__(self, idx):
        '''
        Convert text to tokens, add special tokens, and create attention mask
        return: input_ids, attention_mask, label
        '''
        text, label = self.data[idx]

        # Ask the tokenizer for its special tokens instead of hard-coding the
        # BERT spellings: other tokenizers (e.g. RoBERTa) use different ones,
        # and unknown strings would be mapped to the unknown-token id.
        cls_token = self._special_token('cls_token', '[CLS]')
        sep_token = self._special_token('sep_token', '[SEP]')
        pad_token = self._special_token('pad_token', '[PAD]')

        tokens = [cls_token] + self.tokenizer.tokenize(text) + [sep_token]
        if len(tokens) > self.max_len:
            # Truncate but keep the end-of-sequence marker in the last slot.
            tokens = tokens[:self.max_len - 1] + [sep_token]

        # Build the mask from counts rather than by comparing token strings, so
        # text that happens to contain the pad token is not masked out.
        num_real = len(tokens)
        num_pad = self.max_len - num_real
        tokens = tokens + [pad_token] * num_pad
        attention_mask = [1] * num_real + [0] * num_pad

        input_ids = self.tokenizer.convert_tokens_to_ids(tokens)
        return torch.tensor(input_ids), torch.tensor(attention_mask), torch.tensor(label)

## Model

In this section, we define the model. We use an ensemble: several models are trained and their outputs are combined to get the final prediction. The models are: 1. `Custom LSTM model` 2. `RoBERTa model` 3. `BERT model` (the 2nd and 3rd models are pretrained, so they can be swapped for other pretrained models in the future if needed).

The custom LSTM model is a simple LSTM model defined in the `CustomLSTM` class. The pretrained models are wrapped by the `PretrainedModelPlus` class, which can take in any pretrained model and add a hidden layer and an output layer on top of it.

### Architecture

#### Custom LSTM

- Embedding layer: 50d GloVe embedding
- LSTM layer: bidirectional LSTM 
- Hidden layer: two linear layers with non-linear activation function
- Output layer: linear with output size 1

#### RoBERTa

- RoBERTa model: pretrained RoBERTa model with frozen weights
- Hidden layer: two linear layers with non-linear activation function
- Output layer: linear with output size 1

#### BERT

- BERT model: pretrained BERT model with frozen weights
- Hidden layer: two linear layers with non-linear activation function
- Output layer: linear with output size 1


### Ensembling

The models are trained separately and their outputs are combined by averaging the predicted probabilities. This is implemented in the `predict` function.


In [None]:
class CustomLSTM(nn.Module):
  def __init__(self, embedding_matrix, lstm_hidden_size=50, num_lstm_layers=1, bidirectional=True, activation='ReLU'):
    """
    Initializes the overall structure of the Sarcasm Model

    param embedding_matrix: matrix of pretrained Glove embeddings, shape (vocab_size, embedding_dim)
      (dataset doesn't come with vocab so this is easier, but may require extra processing for emojis)
    param lstm_hidden_size: size of the hidden layer of the lstm
    param num_lstm_layers: number of lstm layers
    param bidirectional: whether the sentence embedding is bidirectional or not
    param activation: name of the `torch.nn` activation class applied between the two linear layers
    raises ValueError: if `activation` does not name a `torch.nn` module class
    """
    super().__init__()
    # Resolve the activation by name so the argument is actually honoured.
    activation_cls = getattr(nn, activation, None) if isinstance(activation, str) else None
    if not (isinstance(activation_cls, type) and issubclass(activation_cls, nn.Module)):
      raise ValueError(f"Unknown torch.nn activation: {activation!r}")

    self.bidirectional = bidirectional
    self.embedding = nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix))
    self.lstm = nn.LSTM(input_size = embedding_matrix.shape[1],
                            hidden_size = lstm_hidden_size,
                            num_layers = num_lstm_layers,
                            bidirectional = bidirectional,
                            batch_first = True)
    # A bidirectional LSTM yields one final hidden state per direction.
    num_directions = 2 if bidirectional else 1
    self.hidden_1 = nn.Linear(lstm_hidden_size * num_directions, lstm_hidden_size)
    self.hidden_2 = nn.Linear(lstm_hidden_size, 1)
    self.activation_function = activation_cls()

  def forward(self, input_batch, input_lengths):
    """
    Encode a padded batch of token ids and score each sentence.

    param input_batch: integer tensor of token ids, shape (batch, seq_len)
    param input_lengths: number of real (non-padding) tokens per row; list or 1-D tensor
    return: tensor of shape (batch, 1) holding the raw output of the last linear layer
      (no sigmoid is applied here)
    """
    embedded_input = self.embedding(input_batch)

    # pack_padded_sequence needs CPU int64 lengths of at least 1.
    lengths = torch.as_tensor(input_lengths, dtype=torch.int64).cpu().clamp(min=1)
    packed_input = pack_padded_sequence(embedded_input, lengths,
                                        batch_first=True, enforce_sorted=False)
    # Packing makes the LSTM skip padding, so the final hidden state is the
    # state after the last real token of every sentence.
    _, (final_hidden, _) = self.lstm(packed_input)

    if self.bidirectional:
      # Last layer: forward direction is at [-2], backward direction at [-1].
      sentence_encoding = torch.cat((final_hidden[-2], final_hidden[-1]), dim=1)
    else:
      sentence_encoding = final_hidden[-1]

    hidden = self.activation_function(self.hidden_1(sentence_encoding))
    return self.hidden_2(hidden)

In [24]:
class PretrainedModelPlus(nn.Module):
    '''
    A frozen pretrained encoder followed by a small trainable head:
    linear -> ReLU -> linear -> sigmoid.
    '''

    def __init__(self, pretrained_model, num_classes, linear_layer_size):
        '''
        pretrained_model: encoder module exposing `config.hidden_size`
        num_classes: size of the output layer
        linear_layer_size: size of the hidden linear layer
        '''
        super().__init__()
        self.pretrained_model = pretrained_model
        # Add a linear layer on top of the pretrained model
        self.linear = nn.Linear(self.pretrained_model.config.hidden_size, linear_layer_size)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(linear_layer_size, num_classes)
        # Add a sigmoid layer to get the probabilities
        self.sigmoid = nn.Sigmoid()
        # Define the loss function
        self.loss = nn.BCELoss()
        # Define the number of classes
        self.num_classes = num_classes

        # Freeze the pretrained model
        for param in self.pretrained_model.parameters():
            param.requires_grad = False

    def forward(self, x, attention_mask, labels=None):
        '''
        x: input token ids
        attention_mask: attention mask matching `x`
        labels: optional targets; when omitted (inference) no loss is computed
        return: (loss, probs) where loss is None if `labels` is None
        '''
        pretrained_outputs = self.pretrained_model(input_ids=x, attention_mask=attention_mask)

        # Use the pooled output when the encoder provides one; otherwise fall
        # back to the hidden state of the first token.
        pooled = getattr(pretrained_outputs, 'pooler_output', None)
        if pooled is None:
            pooled = pretrained_outputs.last_hidden_state[:, 0]

        linear_outputs = self.linear(pooled)
        activation_outputs = self.relu(linear_outputs)
        output = self.linear2(activation_outputs)
        probs = self.sigmoid(output)

        loss = None
        if labels is not None:
            # Flatten both sides so (batch,) and (batch, 1) labels are accepted.
            loss = self.loss(probs.view(-1), labels.float().view(-1))
        return loss, probs

In [2]:
def trian(model, dataloader, epochs=5, learning_rate=1e-5):
    ''' Train a model
    model: model to train; called as model(input_ids, attention_mask, labels) and
           expected to return (loss, probs)
    dataloader: data loader to use, yielding (input_ids, attention_mask, labels)
    epochs: number of epochs to train
    learning_rate: learning rate to use
    return: trained model
    '''
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0.0
        num_batches = 0
        for input_ids, attention_mask, labels in dataloader:
            # Clear gradients left over from the previous step; otherwise they
            # accumulate and every update uses the sum of all earlier batches.
            optimizer.zero_grad()
            loss, _ = model(input_ids, attention_mask, labels)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            num_batches += 1
        if num_batches == 0:
            # Nothing to report (and no `loss` to read) for an empty loader.
            print(f"Epoch {epoch+1}/{epochs}, no batches to train on")
            continue
        # Report the mean loss over the epoch rather than the last batch only.
        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss / num_batches}")
    return model

In [18]:
def predict(model1, model2, model3, data_loader):
    ''' Combine the predictions of 3 models
    model1, model2, model3: models to use; each is called as
        model(input_ids, attention_mask, labels) and must return (loss, probs)
    data_loader: data loader to use, yielding (input_ids, attention_mask, labels)
    return: flat list of 0/1 predictions, one per example, in loader order
    '''
    models = (model1, model2, model3)
    # Remember each model's mode so it can be restored after inference.
    previous_modes = [model.training for model in models]
    for model in models:
        model.eval()

    preds = []
    try:
        # No gradients are needed for inference.
        with torch.no_grad():
            for input_ids, attention_mask, labels in data_loader:
                # assumes each model emits one probability per example — TODO confirm for num_classes > 1
                batch_probs = [model(input_ids, attention_mask, labels)[1].reshape(-1)
                               for model in models]
                mean_probs = torch.stack(batch_probs).mean(dim=0)
                # Threshold element-wise so batches larger than one work.
                preds.extend((mean_probs > 0.5).long().tolist())
    finally:
        for model, was_training in zip(models, previous_modes):
            model.train(was_training)
    return preds

## Evaluation Metrics

We use the F1 score as the evaluation metric.

In [None]:
def evaluate(test_file, model_generated_file):
  '''
  Inputs a test file and file generated by the model and returns the f1 score using f1_score from sklearn.metrics
  :param test_file: path to a csv of shape(num_samples, num_classifications), or an in-memory sequence of labels
  :param model_generated_file: path to a csv of shape(num_samples, num_classifications), or an in-memory sequence of labels
  :return: f1_score of test_file and model_generated_file of shape(1)
  '''
  def _as_labels(source):
    # Paths are read from disk; anything else is treated as already-loaded labels.
    if isinstance(source, (str, os.PathLike)):
      source = np.loadtxt(source, delimiter=",", dtype=str) # from https://www.geeksforgeeks.org/how-to-read-csv-files-with-numpy/#
    # Convert to integers: f1_score's default positive label is the integer 1,
    # which string labels such as '1' would not match.
    return np.asarray(source).astype(float).astype(int)

  # Use the arguments themselves, not the literal strings "test_file" / "model_generated_file".
  return f1_score(_as_labels(test_file), _as_labels(model_generated_file))

## Main Script

**Instructions for running the main script:**

1. Download the data from [here](https://github.com/iabufarha/iSarcasmEval).

2. Create the dataset and dataloader for each of the models.

3. Create and Train model

4. Predict and evaluate f1 score on test set




In [21]:
# Create dataset #2
dataset_roberta = SarcasmDataset(data_path='iSarcasmEval/train/train.En.csv',
                                 tokenizer=AutoTokenizer.from_pretrained('roberta-base'),
                                 max_len=128)
# Create data loader #2
dataloader_roberta = torch.utils.data.DataLoader(dataset_roberta, batch_size=32, shuffle=True)

# Create dataset #3
# 'bert-base' is not a published checkpoint id; use the full name of the base BERT checkpoint.
dataset_bert = SarcasmDataset(data_path='iSarcasmEval/train/train.En.csv',
                              tokenizer=AutoTokenizer.from_pretrained('bert-base-uncased'),
                              max_len=128)
# Create data loader #3
dataloader_bert = torch.utils.data.DataLoader(dataset_bert, batch_size=32, shuffle=True)


In [22]:
# Sanity check: both training datasets must contain the expected 3468 rows
assert len(dataset_roberta) == len(dataset_bert) == 3468

In [None]:
# Create models
model2 = PretrainedModelPlus(pretrained_model=AutoModel.from_pretrained('roberta-base'), num_classes=1, linear_layer_size=100)
# 'bert-base' is not a published checkpoint id; use the full name of the base BERT checkpoint.
model3 = PretrainedModelPlus(pretrained_model=AutoModel.from_pretrained('bert-base-uncased'), num_classes=1, linear_layer_size=100)

In [None]:
# Train each pretrained-model head on its own dataloader (RoBERTa first, then BERT)
model2, model3 = (
    trian(net, loader, epochs=5, learning_rate=1e-5)
    for net, loader in ((model2, dataloader_roberta), (model3, dataloader_bert))
)

In [None]:
# Test on test set
# NOTE(review): every model in the ensemble receives this BERT-tokenized batch,
# including the RoBERTa-based model2, which was trained on RoBERTa token ids — confirm this is intended.
test_dataset = SarcasmDataset(data_path='iSarcasmEval/test/test.En.csv',
                                tokenizer=AutoTokenizer.from_pretrained('bert-base-uncased'),
                                max_len=128)
# Keep the file order: shuffling would misalign the predictions with the labels below.
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

# Get predictions
# NOTE(review): model1 (the CustomLSTM member of the ensemble) is not created in any
# cell shown above; it must be built and trained before this cell can run.
preds = predict(model1, model2, model3, test_dataloader)

# F1 score
# Labels come from the dataset that produced the predictions, so both sequences share one order.
test_labels = [int(label) for _, label in test_dataset.data]
# Store the result under its own name; assigning to `f1_score` would shadow the sklearn function.
test_f1 = f1_score(test_labels, preds)
test_f1