<a href="https://colab.research.google.com/github/WillN202/NLU_CW/blob/main/NLU_Task_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## NLU Task 2

In [27]:
!pip install torchmetrics
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [28]:
import torch
from torch.utils.data import Dataset, DataLoader
import torchmetrics
import os.path
import pandas as pd
import numpy as np
from gensim.models import Word2Vec
import string
import re
TRAINING_DATASET_LOCATION = "/content/drive/MyDrive/train.csv"
DEV_DATASET_LOCATION = "/content/drive/MyDrive/dev.csv"
TEST_DATASET_LOCATION = "/content/drive/MyDrive/AV_trial.csv"
WORD2VEC_EMBEDDINGS = "/content/drive/MyDrive/word2vec_embeddings.model"
EPOCHS = 4

DEVICE = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(DEVICE)
torch.set_default_device(DEVICE)

cuda


In [29]:
def generic_preprocessor(sentence):
  sentence = sentence.lower()

  return sentence

class AVDataset(Dataset):
  def __init__(self, csv_file, pre_processor=None):
    self.samples = pd.read_csv(csv_file)
    self.pre_processor = pre_processor

  def __len__(self):
      return len(self.samples)

  def __getitem__(self, index):
    sample = self.samples.iloc[index]
    # sample[0] = re.sub(f"[{re.escape(string.punctuation)}]", "", sample[0])
    # sample[0] = re.sub(f"[{re.escape(string.punctuation)}]", "", sample[1])
    sample_text = f"{sample[0]} <sep> {sample[1]}"
    return (self.pre_processor(sample_text), sample[2])




In [33]:
training_samples = AVDataset(TRAINING_DATASET_LOCATION, pre_processor=generic_preprocessor)
dev_samples = AVDataset(DEV_DATASET_LOCATION, pre_processor=generic_preprocessor)
test_samples = AVDataset(TEST_DATASET_LOCATION, pre_processor=generic_preprocessor)
training_loader = DataLoader(training_samples, batch_size=64, shuffle=True, generator=torch.Generator(device=DEVICE))
dev_loader = DataLoader(dev_samples, batch_size=64, shuffle=True, generator=torch.Generator(device=DEVICE))
test_loader =  DataLoader(test_samples, batch_size=64, shuffle=True, generator=torch.Generator(device=DEVICE))

In [32]:
# TODO - Fix reading from saved embeddings
phrases = pd.read_csv(TRAINING_DATASET_LOCATION)
phrases = phrases.loc[:, "text_1":"text_2"].to_numpy().flatten().tolist()
#phrases = [re.sub(f"[{re.escape(string.punctuation)}]", "", str(phrase)).split() for phrase in phrases]
phrases = [str(phrase).split() for phrase in phrases]

embeddings = Word2Vec(sentences=phrases, workers=100, min_count=1, vector_size=256)
embeddings.wv["<UNK>"] = np.random.rand(256)
embeddings.wv["<sep>"] = np.random.rand(256)
embeddings.wv["<pad>"] = np.random.rand(256)
# embeddings.save("/content/drive/MyDrive/word2vec_embeddings.model")
# WORD2VEC_EMBEDDINGS = "/content/drive/MyDrive/word2vec_embeddings.model"

#embeddings = Word2Vec.load(WORD2VEC_EMBEDDINGS)


In [34]:
class BaseGruRNN(torch.nn.Module):
  def __init__(self, embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers=1, is_bidirectional=False):
    super(BaseGruRNN, self).__init__()
    self.vocab = vocab
    self.get_embedding = torch.nn.Embedding.from_pretrained(embeddings)
    self.GRU_Layer = torch.nn.GRU(embedding_size, hidden_size, batch_first=True, num_layers=rnn_layers, dropout=0.1, bidirectional=is_bidirectional)

  def forward(self, x, linear_layer):
    unk_embedding = self.vocab["<UNK>"]
    x = [re.sub(f"[{re.escape(string.punctuation)}]", "", sentence) for sentence in x]
    x = [sentence.split() for sentence in x]
    x = [[self.vocab.get(word, unk_embedding) for word in sentence ] for sentence in x]
    # TODO -> instead of padding, use pack sequence instead. Note this may break the output from the lstm (woo)
    max_len = max([len(words) for words in x])
    x = [([self.vocab["<pad>"]] * (max_len -  len(words))) + words for words in x]
    x = torch.tensor(x)

    embeddings = self.get_embedding(x)
    GRU_values = self.GRU_Layer(embeddings)[1]
    GRU_values = GRU_values[0] #Get last hidden state(s)
    result = linear_layer(GRU_values)
    return result

class OneLinearLayerGruRNN(BaseGruRNN):
    def __init__(self, embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers=1, is_bidirectional=False):
      self.base = super(OneLinearLayerGruRNN, self).__init__(embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers, is_bidirectional)
      self.linear_layer = torch.nn.Sequential(
          torch.nn.Linear(hidden_size, output_size),
      )

    def forward(self, x):
      return self.base.forward(x, self.linear_layer)

class TwoLinearLayerGruRNN(BaseGruRNN):
    def __init__(self, embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers=1, is_bidirectional=False):
      super().__init__(embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers, is_bidirectional)
      self.linear_layer = torch.nn.Sequential(
          torch.nn.Linear(hidden_size, hidden_size),
          torch.nn.LeakyReLU(),
          torch.nn.Dropout(p=0.1),
          torch.nn.Linear(hidden_size, output_size)
      )

    def forward(self, x):
      return super().forward(x, self.linear_layer)

class ThreeLinearLayerGruRNN(BaseGruRNN):
    def __init__(self, embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers=1, is_bidirectional=False):
      super(ThreeLinearLayerGruRNN, self).__init__(embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers, is_bidirectional)
      self.linear_layer = torch.nn.Sequential(
          torch.nn.Linear(hidden_size, hidden_size),
          torch.nn.LeakyReLU(),
          torch.nn.Dropout(p=0.1),
          torch.nn.Linear(hidden_size, hidden_size),
          torch.nn.LeakyReLU(),
          torch.nn.Dropout(p=0.1),
          torch.nn.Linear(hidden_size, output_size)
      )

class FourLinearLayerGruRNN(BaseGruRNN):
    def __init__(self, embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers=1, is_bidirectional=False):
      super(FourLinearLayerGruRNN, self).__init__(embedding_size, output_size, hidden_size, embeddings, vocab, rnn_layers, is_bidirectional)
      self.linear_layer = torch.nn.Sequential(
          torch.nn.Linear(hidden_size, hidden_size),
          torch.nn.LeakyReLU(),
          torch.nn.Dropout(p=0.1),
          torch.nn.Linear(hidden_size, hidden_size),
          torch.nn.LeakyReLU(),
          torch.nn.Dropout(p=0.1),
          torch.nn.Linear(hidden_size, hidden_size),
          torch.nn.LeakyReLU(),
          torch.nn.Dropout(p=0.1),
          torch.nn.Linear(hidden_size, output_size)
      )

coded_embeddings = torch.FloatTensor(embeddings.wv.vectors).to(DEVICE)
vocab = embeddings.wv.key_to_index

## Model Training

In [37]:
# TODO figure out why default device isn't working
model = TwoLinearLayerGruRNN(256, 1, 256, coded_embeddings, vocab).to(DEVICE)
loss_function = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.00001)
torch.set_grad_enabled(True)

print(f"Epochs: {EPOCHS}")

for epoch in range(0, EPOCHS):
    model.train()
    running_loss = 0.0

    # Training
    for index, value in enumerate(training_loader):
        optimizer.zero_grad()
        data, labels = value
        labels = labels.reshape(-1,1)
        labels = labels.type(torch.FloatTensor)
        labels = labels.to(DEVICE)
        outputs = model(data)

        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Validation to ensure the model is learning
    model.eval()
    torch.set_grad_enabled(False)
    running_loss = 0
    num_correct = 0
    for index, value in enumerate(dev_loader):
        data, labels = value
        labels = labels.reshape(-1,1)
        labels = labels.type(torch.FloatTensor)
        labels = labels.to(DEVICE)

        outputs = model(data)
        loss = loss_function(outputs, labels)
        running_loss += loss

        for answer, standard in zip(outputs, labels):
            normalised_answer = torch.round(torch.sigmoid(answer[0]))
            num_correct += 1 if normalised_answer == standard[0] else 0

    torch.set_grad_enabled(True)
    print(f"---------------------EPOCH {epoch+1} / {EPOCHS}---------------------")
    print(f"Batch Loss {running_loss / len(dev_loader)}")
    print(f"Accuracy {num_correct / len(dev_loader.dataset)}")



Epochs: 4
---------------------EPOCH 1 / 4---------------------
Batch Loss 0.6928998231887817
Accuracy 0.509
---------------------EPOCH 2 / 4---------------------
Batch Loss 0.6925622224807739
Accuracy 0.5111666666666667
---------------------EPOCH 3 / 4---------------------
Batch Loss 0.6925020813941956
Accuracy 0.5185
---------------------EPOCH 4 / 4---------------------
Batch Loss 0.6935755014419556
Accuracy 0.5128333333333334


## Model Testing

In [39]:
model.eval()
torch.set_grad_enabled(False)
running_loss = 0
num_correct = 0

loader = test_loader

for index, value in enumerate(loader):
    data, labels = value
    labels = labels.reshape(-1,1)
    labels = labels.type(torch.FloatTensor)
    labels = labels.to(DEVICE)

    outputs = model(data)
    loss = loss_function(outputs, labels)
    running_loss += loss

    for answer, standard in zip(outputs, labels):
        normalised_answer = torch.round(torch.sigmoid(answer[0]))
        num_correct += 1 if normalised_answer == standard[0] else 0

torch.set_grad_enabled(True)
print(f"Accuracy {num_correct / len(loader.dataset)}")

Accuracy 0.56
