In [None]:
import pandas as pd
import time, datetime, numpy as np

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch
from torch.utils.data import Dataset, DataLoader

import torch.nn as nn

from google.colab import drive

import requests
import tensorflow as tf
!pip install tensorflow-text==2.15.*
import tensorflow_text as tf_text
!pip install transformers
from transformers import AdamW, get_linear_schedule_with_warmup
from transformers import BertTokenizer, BertForSequenceClassification
import torchtext


# Mount Google Drive so the dataset CSVs under MyDrive are readable from Colab.
drive.mount('/content/drive')

# Load the e-SNLI train/test splits and drop rows containing missing values.
# `dropna` has to be *called*: without the parentheses `df` and `test` would be
# bound methods rather than DataFrames, and the first `df[...]` would fail.
df = pd.read_csv('/content/drive/MyDrive/CS4248/dataset/esnli_train.csv').dropna()
test = pd.read_csv('/content/drive/MyDrive/CS4248/dataset/esnli_test.csv').dropna()

Utility functions

In [None]:
def format_time(elapsed):
    '''
    Convert a duration in seconds into the string form of a
    `datetime.timedelta` (e.g. 3661.4 -> '1:01:01').

    The value is rounded to the nearest whole second first, so the result
    never carries a fractional part.
    '''
    whole_seconds = int(round(elapsed))
    duration = datetime.timedelta(seconds=whole_seconds)
    return str(duration)

def select_cols(df, col_list):
    '''
    Return the part of `df` addressed by `col_list`, using plain
    bracket indexing (`df[col_list]`).
    '''
    subset = df[col_list]
    return subset

def combine_sentences(df, col_list):
    '''
    Return a copy of `df` with an extra 'combined_text' column.

    For every row, the values of `col_list` are cast to str, joined with
    '[SEP]' and prefixed with '[CLS]'. The input frame is left untouched.
    '''
    joined = df[col_list].astype(str).agg('[SEP]'.join, axis=1)
    results_df = df.copy()
    results_df['combined_text'] = '[CLS]' + joined
    return results_df

In [None]:
# Columns kept from the raw frames. 'Explanation_1' has to be carried along:
# the tokenization step reads X_train['Explanation_1'] and
# X_test['Explanation_1'], which raises KeyError if it is dropped here.
target_cols = ['Sentence1', 'Sentence2', 'Explanation_1', 'gold_label']
# Model inputs only (everything in target_cols except the label).
target_cols2 = ['Sentence1', 'Sentence2', 'Explanation_1']

# .copy() so that adding the 'labels' column below writes to an independent
# frame instead of a column-subset of the original (SettingWithCopyWarning).
df = select_cols(df, target_cols).copy()
test_df = select_cols(test, target_cols).copy()

# gold_label string -> integer class id used by the classifier.
lables = {
    'entailment': 0,
    'neutral': 1,
    'contradiction': 2
}

df['labels'] = df['gold_label'].map(lables)
test_df['labels'] = test_df['gold_label'].map(lables)

X = select_cols(df, target_cols2)
y = df['labels']

# Hold out 10% of the training data for validation; fixed seed for repeatability.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.1, random_state=42)

X_test = select_cols(test_df, target_cols2)
y_test = test_df['labels']

class NliDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with integer labels.

    `encodings` is a mapping from field name to a per-example sequence and
    `labels` is a sequence with one entry per example. Each item is a dict
    holding every encoding field as a tensor plus a 'labels' tensor.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The dataset size is defined by the labels, not by the encodings.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# To reuse encodings cached by an earlier run, load them instead of tokenizing again:
# train_encodings_1 = torch.load('/content/drive/MyDrive/CS4248/encodings/base_train_encodings1.pt')
# train_encodings_2 = torch.load('/content/drive/MyDrive/CS4248/encodings/base_train_encodings2.pt')
# train_encodings_3 = torch.load('/content/drive/MyDrive/CS4248/encodings/base_train_encodings3.pt')
# test_encodings_1 = torch.load('/content/drive/MyDrive/CS4248/encodings/base_test_encodings1.pt')
# test_encodings_2 = torch.load('/content/drive/MyDrive/CS4248/encodings/base_test_encodings2.pt')
# test_encodings_3 = torch.load('/content/drive/MyDrive/CS4248/encodings/base_test_encodings3.pt')


def _encode_column(frame, column):
    """Tokenize one text column of `frame` (cast to str) with truncation and padding."""
    texts = frame[column].astype(str).tolist()
    return tokenizer(texts, truncation=True, padding=True)


# One encoding per input: 1 = Sentence1, 2 = Sentence2, 3 = Explanation_1.
train_encodings_1 = _encode_column(X_train, 'Sentence1')
train_encodings_2 = _encode_column(X_train, 'Sentence2')
train_encodings_3 = _encode_column(X_train, 'Explanation_1')
test_encodings_1 = _encode_column(X_test, 'Sentence1')
test_encodings_2 = _encode_column(X_test, 'Sentence2')
test_encodings_3 = _encode_column(X_test, 'Explanation_1')

# Each input gets its own dataset; all three share the same labels per split.
train_dataset_1 = NliDataset(train_encodings_1, y_train.tolist())
train_dataset_2 = NliDataset(train_encodings_2, y_train.tolist())
train_dataset_3 = NliDataset(train_encodings_3, y_train.tolist())
test_dataset_1 = NliDataset(test_encodings_1, y_test.tolist())
test_dataset_2 = NliDataset(test_encodings_2, y_test.tolist())
test_dataset_3 = NliDataset(test_encodings_3, y_test.tolist())

# The loaders are left unshuffled: they are consumed with zip(), so batch i of
# every loader has to cover the same rows.
train_loader_1 = DataLoader(train_dataset_1, batch_size=16)
train_loader_2 = DataLoader(train_dataset_2, batch_size=16)
train_loader_3 = DataLoader(train_dataset_3, batch_size=16)
test_loader_1 = DataLoader(test_dataset_1, batch_size=16)
test_loader_2 = DataLoader(test_dataset_2, batch_size=16)
test_loader_3 = DataLoader(test_dataset_3, batch_size=16)

In [None]:
# Cache every encoding on Drive as a {field: tensor} dict so later runs can
# torch.load() them instead of tokenizing again.
_encoding_cache_targets = [
    (train_encodings_1, '/content/drive/MyDrive/CS4248/encodings/base_train_encodings1.pt'),
    (train_encodings_2, '/content/drive/MyDrive/CS4248/encodings/base_train_encodings2.pt'),
    (train_encodings_3, '/content/drive/MyDrive/CS4248/encodings/base_train_encodings3.pt'),
    (test_encodings_1, '/content/drive/MyDrive/CS4248/encodings/base_test_encodings1.pt'),
    (test_encodings_2, '/content/drive/MyDrive/CS4248/encodings/base_test_encodings2.pt'),
    (test_encodings_3, '/content/drive/MyDrive/CS4248/encodings/base_test_encodings3.pt'),
]

for _encodings, _cache_path in _encoding_cache_targets:
    _as_tensors = {}
    for key, val in _encodings.items():
        _as_tensors[key] = torch.tensor(val)
    torch.save(_as_tensors, _cache_path)

MODEL ARCHITECTURES


In [None]:
class BiLSTMEncoder(nn.Module):
  """Sentence encoder: embedding -> linear+ReLU projection -> BiLSTM -> max-pool over time.

  Args:
    hidden_dim: size of the projected token vectors and of each LSTM direction.
      The returned sentence vector has 2 * hidden_dim features.
    maxpool: stored on the instance only; pooling is always max-over-time.
    batch_size: stored on the instance only; the real batch size is read from
      the input tensor.
    vocab_size: number of rows in the embedding table. It must exceed the
      largest token id fed in. The default (30522) is the vocabulary size of
      the 'bert-base-uncased' tokenizer used in this notebook; a 30000-row
      table raises an index error on the higher ids.
    pad_idx: token id used for right-padding (0 for the BERT tokenizer).
      Padded positions are skipped by the LSTM and ignored by the pooling.
  """

  def __init__(self, hidden_dim, maxpool=False, batch_size=64,
               vocab_size=30522, pad_idx=0):

    super(BiLSTMEncoder, self).__init__()

    self.batch_size = batch_size
    self.hidden_dim = hidden_dim
    self.maxpool = maxpool
    self.pad_idx = pad_idx
    # Kept for backward compatibility; forward() follows the input's device.
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    self.emb_dim = 300

    self.embedding = nn.Embedding(vocab_size, self.emb_dim)
    self.linear = nn.Linear(self.emb_dim, self.hidden_dim)
    self.relu = nn.ReLU()
    self.projection = nn.Sequential(self.embedding, self.linear, self.relu)

    self.lstm = nn.LSTM(input_size=self.hidden_dim,
                        hidden_size=self.hidden_dim,
                        bidirectional=True,
                        batch_first=True)

  def forward(self, x):
    """Encode a batch of right-padded token ids.

    Args:
      x: integer tensor of shape (batch, seq_len) holding token ids, padded
        on the right with `pad_idx`.

    Returns:
      Float tensor of shape (batch, 2 * hidden_dim): the element-wise maximum
      of the BiLSTM outputs over the non-padding time steps.
    """
    # True sequence lengths. `len(row)` on a padded tensor would always be the
    # padded width, which makes the packing below a no-op. clamp(min=1) keeps
    # an all-padding row valid for pack_padded_sequence.
    lengths = (x != self.pad_idx).sum(dim=1).clamp(min=1)
    x = self.projection(x)

    # Initial states live on the input's device so the module also works when
    # the model sits on a different device than the one CUDA availability implies.
    h0 = torch.zeros(2, x.shape[0], self.hidden_dim, device=x.device, dtype=x.dtype)
    c0 = torch.zeros_like(h0)

    # pack_padded_sequence needs the lengths on the CPU; enforce_sorted=False
    # because batches are not ordered by length.
    packed = nn.utils.rnn.pack_padded_sequence(x, lengths.cpu(), batch_first=True,
                                               enforce_sorted=False)
    packed_output, _ = self.lstm(packed, (h0, c0))
    output, _ = nn.utils.rnn.pad_packed_sequence(packed_output,
                                                 batch_first=True)

    # pad_packed_sequence fills padded steps with zeros; mask them with -inf so
    # they cannot win the max against negative activations.
    steps = torch.arange(output.shape[1], device=output.device)
    is_padding = steps.unsqueeze(0) >= lengths.unsqueeze(1)
    output = output.masked_fill(is_padding.unsqueeze(-1), float('-inf'))

    embed = output.max(dim=1).values

    return embed

###############################################################################
## Classifier MLP model
###############################################################################
class Classifier(nn.Module):
  """Three-layer MLP over interaction features of three sentence vectors.

  forward() concatenates eight equally sized chunks along dim 1, so
  `input_dim` must be 8x the width of a single sentence vector.
  """

  def __init__(self, input_dim, hidden_dim, out_dim):
    super().__init__()
    self.lin1 = nn.Linear(input_dim, hidden_dim)
    self.lin2 = nn.Linear(hidden_dim, hidden_dim)
    self.lin3 = nn.Linear(hidden_dim, out_dim)
    self.relu = nn.ReLU()

    layers = [self.lin1, self.relu, self.lin2, self.relu, self.lin3]
    self.net = nn.Sequential(*layers)

  def forward(self, premise, hypothesis, explanation):
    # Feature order matters: it fixes which input columns of lin1 see which chunk.
    features = [
        premise,
        hypothesis,
        torch.abs(premise - hypothesis),
        torch.abs(premise - explanation),
        torch.abs(hypothesis - explanation),
        premise * explanation,
        hypothesis * explanation,
        premise * hypothesis,
    ]
    return self.net(torch.cat(features, 1))


###############################################################################
## InferSent model to bring it all together
###############################################################################


class InferSent(nn.Module):
  """Shared BiLSTM sentence encoder followed by a 3-way MLP classifier.

  The same encoder instance encodes premise, hypothesis and explanation; the
  classifier consumes the three resulting vectors.
  """

  def __init__(self, enc_hidden_dim, cls_hidden_dim):
    super().__init__()
    self.encoder = BiLSTMEncoder(enc_hidden_dim, maxpool=True)
    # Two LSTM directions per sentence vector, eight feature chunks in the classifier.
    self.cls_input_dim = enc_hidden_dim * 2 * 8
    self.classifier = Classifier(self.cls_input_dim, cls_hidden_dim, out_dim=3)

  def encode(self, sent):
    """Encode one batch of sentences with the shared encoder."""
    return self.encoder(sent)

  def forward(self, batch):
    """`batch` is a (premise, hypothesis, explanation) triple of encoder inputs."""
    premise, hypothesis, explanation = batch
    encoded = [self.encoder(tokens) for tokens in (premise, hypothesis, explanation)]
    return self.classifier(*encoded)

In [None]:
# Pick the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Encoder hidden size 1028, classifier hidden size 512.
# NOTE(review): 1028 may have been meant as 1024 - confirm before retraining.
model = InferSent(1028, 512)

if device.type == "cuda":
    print("CUDA is available. Using GPU:", torch.cuda.get_device_name(0))
else:
    print("CUDA is not available. Using CPU.")

model.to(device)

In [None]:
# Number of passes over the training data. The linear schedule below has to
# span exactly the epochs that are actually trained (the training cell runs
# 2 epochs); sizing it for 5 would stop the decay at 60% of the initial rate.
num_epochs = 2

# torch.optim.AdamW replaces the deprecated transformers.AdamW.
# weight_decay=0.0 reproduces the transformers default (torch defaults to 0.01).
optimizer = torch.optim.AdamW(model.parameters(),
                              lr = 5e-5,
                              eps = 1e-8,
                              weight_decay = 0.0
                             )

# One scheduler step is taken per batch, so total steps = batches per epoch * epochs.
total_steps = len(train_loader_1) * num_epochs

# Linear decay from the initial learning rate to 0 with no warm-up phase.
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

In [None]:
# Training loop
loss_tracker = []  # mean training loss of every finished epoch
criterion = nn.CrossEntropyLoss()
num_epochs = 2
for epoch in range(num_epochs):
    print("")
    print('======== Epoch {:} / {:} ========'.format(epoch + 1, num_epochs))
    print('Training...')

    # time taken for each epoch
    t0 = time.time()
    model.train()  # Set the model to training mode
    running_loss = 0.0

    # The three loaders are unshuffled and built from the same rows, so zipping
    # them yields the premise / hypothesis / explanation batches of the same examples.
    for step, (batch1, batch2, batch3) in enumerate(zip(train_loader_1, train_loader_2, train_loader_3)):
        if step % 1000 == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_loader_1), elapsed))

        # Only the token ids are consumed by the model; the labels are identical
        # in all three loaders, so they are taken from the first one.
        b_input_ids = batch1['input_ids'].to(device)
        b_input_ids2 = batch2['input_ids'].to(device)
        b_input_ids3 = batch3['input_ids'].to(device)
        b_labels = batch1['labels'].to(device)

        optimizer.zero_grad()  # Zero the parameter gradients

        # Forward pass. InferSent.forward unpacks a (premise, hypothesis,
        # explanation) triple, so all three inputs must be passed.
        outputs = model((b_input_ids, b_input_ids2, b_input_ids3))

        # Calculate the loss
        loss = criterion(outputs, b_labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        scheduler.step()

        running_loss += loss.item()

    avg_loss = running_loss / len(train_loader_1)
    loss_tracker.append(avg_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss}')

In [None]:
# Score the test set (explanation input: Explanation_1 loaders built earlier).
model.eval()  # evaluation mode
logit_chunks, label_chunks = [], []
with torch.no_grad():
    for premise_batch, hypothesis_batch, explanation_batch in zip(test_loader_1, test_loader_2, test_loader_3):
        token_ids = tuple(
            part['input_ids'].to(device)
            for part in (premise_batch, hypothesis_batch, explanation_batch)
        )
        logit_chunks.append(model(token_ids).cpu().numpy())
        # Labels are the same in all three loaders; read them from the first.
        label_chunks.append(premise_batch['labels'].cpu().numpy())

# Predicted class = index of the largest logit per example.
predictions = np.concatenate(logit_chunks, axis=0).argmax(axis=1)
true_labels = np.concatenate(label_chunks, axis=0)

accuracy = accuracy_score(true_labels, predictions)
print("Accuracy:", accuracy)

precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='weighted')
print(f"Precision: {precision}\nRecall: {recall}\nF1 Score: {f1}")

In [None]:
# Re-run the test evaluation with Explanation_2 as the explanation input.
target_cols3 = ['Sentence1', 'Sentence2', 'Explanation_2']
X_test = select_cols(test, target_cols3)

# Tokenize the three inputs in column order, then wrap each in a dataset and loader.
test_encodings_1, test_encodings_2, test_encodings_3 = [
    tokenizer(X_test[column].astype(str).tolist(), truncation=True, padding=True)
    for column in target_cols3
]
test_dataset_1, test_dataset_2, test_dataset_3 = [
    NliDataset(encodings, y_test.tolist())
    for encodings in (test_encodings_1, test_encodings_2, test_encodings_3)
]
test_loader_1, test_loader_2, test_loader_3 = [
    DataLoader(dataset, batch_size=16)
    for dataset in (test_dataset_1, test_dataset_2, test_dataset_3)
]

logit_chunks, label_chunks = [], []
with torch.no_grad():
    for premise_batch, hypothesis_batch, explanation_batch in zip(test_loader_1, test_loader_2, test_loader_3):
        token_ids = tuple(
            part['input_ids'].to(device)
            for part in (premise_batch, hypothesis_batch, explanation_batch)
        )
        logit_chunks.append(model(token_ids).cpu().numpy())
        # Labels are the same in all three loaders; read them from the first.
        label_chunks.append(premise_batch['labels'].cpu().numpy())

# Predicted class = index of the largest logit per example.
predictions = np.concatenate(logit_chunks, axis=0).argmax(axis=1)
true_labels = np.concatenate(label_chunks, axis=0)

accuracy = accuracy_score(true_labels, predictions)
print("Accuracy:", accuracy)

precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='weighted')
print(f"Precision: {precision}\nRecall: {recall}\nF1 Score: {f1}")

In [None]:
# Re-run the test evaluation with Explanation_3 as the explanation input.
target_cols3 = ['Sentence1', 'Sentence2', 'Explanation_3']
X_test = select_cols(test, target_cols3)

# Tokenize the three inputs in column order, then wrap each in a dataset and loader.
test_encodings_1, test_encodings_2, test_encodings_3 = [
    tokenizer(X_test[column].astype(str).tolist(), truncation=True, padding=True)
    for column in target_cols3
]
test_dataset_1, test_dataset_2, test_dataset_3 = [
    NliDataset(encodings, y_test.tolist())
    for encodings in (test_encodings_1, test_encodings_2, test_encodings_3)
]
test_loader_1, test_loader_2, test_loader_3 = [
    DataLoader(dataset, batch_size=16)
    for dataset in (test_dataset_1, test_dataset_2, test_dataset_3)
]

logit_chunks, label_chunks = [], []
with torch.no_grad():
    for premise_batch, hypothesis_batch, explanation_batch in zip(test_loader_1, test_loader_2, test_loader_3):
        token_ids = tuple(
            part['input_ids'].to(device)
            for part in (premise_batch, hypothesis_batch, explanation_batch)
        )
        logit_chunks.append(model(token_ids).cpu().numpy())
        # Labels are the same in all three loaders; read them from the first.
        label_chunks.append(premise_batch['labels'].cpu().numpy())

# Predicted class = index of the largest logit per example.
predictions = np.concatenate(logit_chunks, axis=0).argmax(axis=1)
true_labels = np.concatenate(label_chunks, axis=0)

accuracy = accuracy_score(true_labels, predictions)
print("Accuracy:", accuracy)

precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predictions, average='weighted')
print(f"Precision: {precision}\nRecall: {recall}\nF1 Score: {f1}")

In [None]:
# Persist the trained weights and the optimizer state to Drive as state dicts.
model_save_path = "/content/drive/MyDrive/CS4248/models/base/model.pth"
optimizer_save_path = "/content/drive/MyDrive/CS4248/models/base/optimizer.pth"

for _stateful, _destination in ((model, model_save_path), (optimizer, optimizer_save_path)):
    torch.save(_stateful.state_dict(), _destination)