# Welcome to our Machine Learning Project for IT1244

We are group <group number> and we would like to investigate the effect of using bigrams and trigrams on the accuracy of predicting the sentiments of movie reviews.

## Why bigrams and trigrams

N-grams are a fundamental concept in Natural Language Processing. An N-gram is a contiguous sequence of n items in a body of text. More specifically, bigrams are pairs of consecutive words, while trigrams are groups of 3 consecutive words. N-grams are used to capture local context, which helps machines with tasks such as generating text and predicting sentiment.
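
As a small illustration of the definition above, the sketch below splits a sentence on whitespace and lists its bigrams and trigrams. The helper name `ngrams` and the sample sentence are our own, made up for illustration; they are not part of the project code.

```python
def ngrams(text, n):
    """Return every group of n consecutive words in `text` (split on whitespace)."""
    tokens = text.split()
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


sentence = "this movie is not good"
bigrams = ngrams(sentence, 2)
trigrams = ngrams(sentence, 3)
print(bigrams)   # ['this movie', 'movie is', 'is not', 'not good']
print(trigrams)  # ['this movie is', 'movie is not', 'is not good']
```

Note how the bigram `not good` keeps the negation attached to the word it modifies, which single words on their own cannot do.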

### Our approach

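We decided to use a feed-forward neural network rather than a recurrent neural network (RNN) as our model. RNNs can struggle to retain information across long sequences, whereas our network averages the embeddings of all n-grams in a review, so every part of the review contributes to the prediction regardless of its position. The trade-off is that word order is only captured within each n-gram.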

First, let us import the modules we need to train our model. We set the manual seed to 0 (any fixed value would do) so that random operations such as weight initialisation, batch shuffling, and dropout are reproducible across runs.


In [8]:
import datetime

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(0)
torch.cuda.manual_seed(0)
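
To see what the seed buys us, here is a minimal sketch (run on the CPU, with a toy input of our own) that applies dropout twice after resetting the seed to the same value. The variable names are made up for illustration.

```python
import torch
import torch.nn as nn

dropout = nn.Dropout(0.2)   # a freshly created module is in training mode, so dropout is active
x = torch.ones(10)

torch.manual_seed(0)
first = dropout(x)

torch.manual_seed(0)        # reset the random generator to the same state
second = dropout(x)

same_mask = torch.equal(first, second)
print(same_mask)  # True
```

Without the second call to `torch.manual_seed`, the two dropout masks would generally differ.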


## First Step: Setting up the Dataset class

PyTorch has a `Dataset` class that allows us to use data in batches, rather than using the data wholesale. We want to implement a `SentDataset` class that inherits from the `Dataset` class in PyTorch.

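Some more information about the class methods:
- `generate_bigrams` generates a list of bigrams from a movie review
- `generate_trigrams` generates a list of trigrams from a movie review
- `generate_bigrams_or_trigrams` calls one of the two, depending on the type specified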

In [9]:
class SentDataset(Dataset):
    """
    A PyTorch dataset class that accepts a text path, and optionally a label path (only during the training phase) and
    a vocabulary (only during the testing phase). This class holds all the data and implements
    a __getitem__ method to be used by a Python generator object or other classes that need it.

    """
    def __init__(self, train_path, type, label_path=None, vocab=None):
        """
        Read the texts (and labels, if given) and build or reuse the vocabulary
        Args:
            train_path (string): Path to the text file, one review per line.
            type (string): "bigram" to use bigrams; any other value uses trigrams.
            label_path (string, optional): Path to the label file (training only).
            vocab (dict, optional): Mapping from n-gram to index (testing only).
        """
        self.label_path = label_path
        self.type = type
        self.texts = []
        self.labels = []
        with open(train_path, encoding='utf-8') as f:
            self.texts = [line for line in f.readlines() if line.strip()]
        if label_path:
            with open(label_path, encoding='utf-8') as f:
                self.labels = [line for line in f.readlines() if line.strip()]
        if not vocab:
            self.vocabulary = {}
            curr_idx = 0
            for text in self.texts:
                ngrams = self.generate_bigrams_or_trigrams(text, type)
                for ngram in ngrams:
                    if ngram in self.vocabulary:
                        continue
                    else:
                        self.vocabulary[ngram] = curr_idx
                        curr_idx += 1
        else: 
            self.vocabulary = vocab
        

    def generate_bigrams(self, text):
        """
        Function to generate bigrams from a text (string)
        Bigrams are defined as a grouping of a text into a list of 2 consecutive words
        """
        tokens = text.split()
        bigrams = []
        for i in range(len(tokens) - 1):
            bigram = f"{tokens[i]} {tokens[i + 1]}"
            bigrams.append(bigram)
        return bigrams
    
    def generate_trigrams(self, text):
        """
        Function to generate trigrams from a text (string)
        Trigrams are defined as a grouping of a text into a list of 3 consecutive words
        """
        tokens = text.split()
        trigrams = []
        for i in range(len(tokens) - 2):
            trigram = f"{tokens[i]} {tokens[i + 1]} {tokens[i + 2]}"
            trigrams.append(trigram)
        return trigrams
    
    def generate_bigrams_or_trigrams(self, text, type):
        """
        Function to determine if bigrams or trigrams should be generated, depending on type specified
        """
        if type == "bigram":
            return self.generate_bigrams(text)
        else: 
            return self.generate_trigrams(text)


    def vocab_size(self):
        """
        A function to inform the vocab size. The function returns one number:
            num_vocab: size of the vocabulary
        """
        return len(self.vocabulary)

    
    def __len__(self):
        """
        Return the number of instances in the data
        """
        return len(self.texts)

    def __getitem__(self, i):
        """
        Return the i-th instance in the format of:
            (text, label)
        Text and label is encoded according to the vocab (word_id).

        """
        if self.label_path: # training
            text = self.texts[i]
            label = int(self.labels[i])
            indices = []
            ngrams_in_text = self.generate_bigrams_or_trigrams(text, self.type)
            for ngram in ngrams_in_text:
                index = self.vocabulary.get(ngram)
                indices.append(index)
    
            # force an integer dtype so that an empty list does not become a float tensor
            indices_tensor = torch.tensor(indices, dtype=torch.long)
            return indices_tensor, label
        
        else: # testing 
            text = self.texts[i]
            indices = []
            ngrams_in_text = self.generate_bigrams_or_trigrams(text, self.type)
            for ngram in ngrams_in_text:
                if ngram in self.vocabulary:
                    index = self.vocabulary.get(ngram)
                    indices.append(index)
            # force an integer dtype so that an empty list does not become a float tensor
            indices_tensor = torch.tensor(indices, dtype=torch.long)
            return indices_tensor
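
The vocabulary logic of the class can be sketched without PyTorch. The plain-Python version below is a simplified illustration under our own assumptions: the helper names `build_vocab` and `encode` and the toy reviews are hypothetical, and unknown n-grams are skipped in the same way as in the testing branch of `__getitem__`.

```python
def build_vocab(texts, n):
    """Map every distinct n-gram in `texts` to the next free integer index."""
    vocab = {}
    for text in texts:
        tokens = text.split()
        for i in range(len(tokens) - n + 1):
            ngram = " ".join(tokens[i:i + n])
            if ngram not in vocab:
                vocab[ngram] = len(vocab)
    return vocab


def encode(text, vocab, n):
    """Encode `text` as a list of indices, skipping n-grams missing from `vocab`."""
    tokens = text.split()
    ngrams = (" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1))
    return [vocab[g] for g in ngrams if g in vocab]


toy_reviews = ["a great movie", "a great cast"]
vocab = build_vocab(toy_reviews, 2)
encoded = encode("a great cast indeed", vocab, 2)
print(vocab)    # {'a great': 0, 'great movie': 1, 'great cast': 2}
print(encoded)  # [0, 2]
```

The bigram `cast indeed` never appeared in the toy training reviews, so it is dropped from the encoded review.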

## Second step: Setting up the model

As mentioned earlier, we decided to use a neural network as our model. We also came up with a custom sequence of layers to pass our data through.

An embedding layer is a lookup table that maps each n-gram index to a dense vector whose values are learned during training.

These layers are (in order):
- an embedding layer sized by the vocabulary size (the number of distinct n-grams), with the embedding dimension as a hyperparameter
- a mean over the n-gram embeddings of each review, giving one fixed-size vector per review
- a first linear layer whose output dimension is a hyperparameter
- a ReLU activation function after the first linear layer
- a dropout layer whose drop probability is a hyperparameter, to reduce overfitting
- a second linear layer with output dimension 1
- a final sigmoid layer that outputs a value between 0 and 1, representing the probability that the movie review is positive (1) rather than negative (0)


In [10]:
class Model(nn.Module):
    """
    A feed-forward classifier over averaged n-gram embeddings
    """
    def __init__(self, num_vocab):
        super().__init__()
        # define model attributes 
        self.embedding_dim = 8 # define embedding dimensions (hyperparameter)
        self.embedding = nn.Embedding(num_vocab, self.embedding_dim) # transform words into embeddings
        self.first_layer_dim = 24 # define first layer dimension (hyperparameter)
        self.linear_layer_1 = nn.Linear(self.embedding_dim, self.first_layer_dim) # linear layer
        self.relu = nn.ReLU() # ReLU activation function
        self.dropout = nn.Dropout(0.2) # dropout of 0.2 probability (hyperparameter) to reduce overfitting
        self.linear_layer_2 = nn.Linear(self.first_layer_dim, 1) # last linear layer
        self.sigmoid = nn.Sigmoid() # Sigmoid function to determine probabilities


    def forward(self, x):
        x = self.embedding(x)
        x = torch.mean(x, dim=1)
        x = self.linear_layer_1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear_layer_2(x)
        x = self.sigmoid(x)

        return x
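
To make the flow of data through the model easier to follow, the sketch below traces the tensor shapes step by step using standalone layers. The layer sizes match the model above, while the vocabulary size of 100 and the batch of random indices are made up for illustration. Dropout is left out because it does not change the shape.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
embedding = nn.Embedding(100, 8)           # toy vocabulary of 100 n-grams, embedding dimension 8
linear_1 = nn.Linear(8, 24)
linear_2 = nn.Linear(24, 1)

batch = torch.randint(0, 100, (4, 15))     # 4 reviews, 15 n-gram indices each
embedded = embedding(batch)                # (4, 15, 8): one vector per n-gram
pooled = embedded.mean(dim=1)              # (4, 8): average over the n-grams of each review
hidden = torch.relu(linear_1(pooled))      # (4, 24)
probs = torch.sigmoid(linear_2(hidden))    # (4, 1): one probability per review

print(embedded.shape, pooled.shape, hidden.shape, probs.shape)
```

The averaging step is what turns reviews of any length into vectors of the same size, which the linear layers require.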

## Third step: Define a collator function

A collator function is used whenever the `DataLoader` assembles a batch, during both training and testing. During training it returns a pair of tensors: the texts in the batch and their labels. During testing there are no labels, so it returns only the tensor of texts.

Note that the texts may differ in length, so we pad the shorter ones with zeros to fit them all into one rectangular tensor. Index 0 is also assigned to the first n-gram in the vocabulary, so padded positions share that n-gram's embedding and are included in the average computed by the model.

In [11]:
def collator(batch):
    """
    A function that receives a list of (text, label) pair
    and return a pair of tensors:
        texts: a tensor that combines all the text in the mini-batch, pad with 0
        labels: a tensor that combines all the labels in the mini-batch
    """
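    # check the item type rather than its length: a test text with exactly 2 n-grams also has length 2
    if isinstance(batch[0], tuple):  # training: each item is an (indices, label) pair
        texts, labels = zip(*batch)
        # pad the index tensors to the length of the longest text in the batch
        texts_tensor = nn.utils.rnn.pad_sequence(list(texts), batch_first=True, padding_value=0)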
        labels_tensor = torch.tensor(labels, dtype=torch.float32)
        return texts_tensor, labels_tensor
    else:
        texts_tensor = nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=0)
        return texts_tensor
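
The padding step can be seen in isolation with two toy index tensors of different lengths. The tensor values below are made up for illustration; `pad_sequence` is the same PyTorch function used in `collator`.

```python
import torch
from torch.nn.utils.rnn import pad_sequence

short_review = torch.tensor([5, 2])
long_review = torch.tensor([7, 1, 4, 9])

padded = pad_sequence([short_review, long_review], batch_first=True, padding_value=0)
print(padded)
# tensor([[5, 2, 0, 0],
#         [7, 1, 4, 9]])
```

The shorter review is extended with zeros until it matches the longest review in the batch.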

## Fourth step: Define the training function

Next, we have to train the model. We use the `DataLoader` class provided by PyTorch to load data in batches, with the help of the `collator` function we just defined.

The loss function we have chosen is the Binary Cross Entropy Loss, as the labels are in binary form (0 or 1). The optimiser we chose is Adam.
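
To give a feel for the Binary Cross Entropy Loss, here is a plain-Python sketch of the formula, averaged over the batch as `nn.BCELoss` does by default. The function name and the toy probabilities are our own, for illustration only.

```python
import math


def binary_cross_entropy(probs, labels):
    """Mean of -[y*log(p) + (1-y)*log(1-p)] over all (p, y) pairs."""
    losses = [-(y * math.log(p) + (1 - y) * math.log(1 - p))
              for p, y in zip(probs, labels)]
    return sum(losses) / len(losses)


confident_right = binary_cross_entropy([0.9, 0.1], [1, 0])
undecided = binary_cross_entropy([0.5, 0.5], [1, 0])
confident_wrong = binary_cross_entropy([0.1, 0.9], [1, 0])
print(round(confident_right, 3), round(undecided, 3), round(confident_wrong, 3))
# 0.105 0.693 2.303
```

A model that always outputs 0.5 scores about 0.693, which is close to the first loss values printed in the training log further down.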

In each epoch, we train the model using batches of data. For each batch, we do forward propagation, calculate the loss, backpropagate to obtain the gradients, and finally update the weights of the model.

After training, we save the state of the model and the optimizer into a checkpoint file. This file is used later during testing.

In [12]:
def train(model, dataset, batch_size, learning_rate, num_epoch, device='cpu', model_path=None):
    """
    Train the model with Binary Cross Entropy loss and the Adam optimiser,
    using the given batch size, learning rate, and number of epochs,
    then save a checkpoint to model_path.
    """
    # instantiate the data loader which loads data in batches
    data_loader = DataLoader(dataset, batch_size=batch_size, collate_fn=collator, shuffle=True)

    # loss function is Binary Cross Entropy Loss
    criterion = nn.BCELoss()
    # optimiser is Adam's optimiser
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    start = datetime.datetime.now()
    for epoch in range(num_epoch):
        model.train()
        running_loss = 0.0
        for step, data in enumerate(data_loader, 0):
            # get the inputs; data is a list of [inputs, labels]
            texts = data[0].to(device)
            labels = data[1].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # do forward propagation
            outputs = model(texts)

            # calculate the loss
            loss = criterion(outputs, labels.reshape((outputs.shape[0], 1)))

            # do backward propagation to compute the gradients
            loss.backward()

            # do the parameter optimization
            optimizer.step()

            # accumulate the loss for the running average
            running_loss += loss.item()

            # print loss value every 100 iterations and reset running loss
            if step % 100 == 99:
                print('[%d, %5d] loss: %.3f' %
                    (epoch + 1, step + 1, running_loss / 100))
                running_loss = 0.0

    end = datetime.datetime.now()
    
    # make the checkpoint of the model and save it to the model path
    # contains current state of the model, optimiser, number of epochs, and current vocabulary
    checkpoint = {
        'model_state': model.state_dict(),
        'optimizer_state': optimizer.state_dict(),
        'epoch': num_epoch,
        'vocab': dataset.vocabulary
    }
    torch.save(checkpoint, model_path)

    print('Model saved in ', model_path)
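    # total_seconds() covers the whole duration, unlike .seconds which drops days and microseconds
    print('Training finished in {} minutes.'.format((end - start).total_seconds() / 60.0))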
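
The checkpoint round trip can be tried on a tiny scale. The sketch below is a toy example under our own assumptions: a single linear layer stands in for the real model, a two-entry dictionary stands in for `dataset.vocabulary`, and the file is written to a temporary directory.

```python
import os
import tempfile

import torch
import torch.nn as nn

toy_model = nn.Linear(3, 1)                      # stand-in for the real model
toy_vocab = {"a great": 0, "great movie": 1}     # stand-in for dataset.vocabulary

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "toy_model.pt")
    torch.save({"model_state": toy_model.state_dict(), "vocab": toy_vocab}, path)
    restored = torch.load(path)

fresh_model = nn.Linear(3, 1)                    # starts with different random weights
fresh_model.load_state_dict(restored["model_state"])

weights_match = torch.equal(fresh_model.weight, toy_model.weight)
print(weights_match, restored["vocab"] == toy_vocab)  # True True
```

Storing the vocabulary next to the weights matters because each row of the embedding layer only makes sense together with the n-gram that was assigned to that index.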

## Fifth step: define the testing function

With the same logic as in the training function, we load batches from the test dataset and predict their labels. Using 0.5 as the threshold, outputs above 0.5 are labelled as positive sentiment, while outputs of 0.5 or below are labelled as negative sentiment.

In [13]:
def test(model, dataset, thres=0.5, device='cpu'):
    model.eval()
    data_loader = DataLoader(dataset, batch_size=20, collate_fn=collator, shuffle=False)
    labels = []
    with torch.no_grad():
        for data in data_loader:
            texts = data.to(device)
            results = model(texts)
            # threshold the probabilities and flatten the (batch, 1) output into a flat list
            pred_labels = (results > thres).int().flatten().tolist()
            labels.extend(pred_labels)

    return [str(x) for x in labels]
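
The thresholding rule inside `test` can be illustrated with plain numbers. The helper `to_labels` and the probabilities below are made up for illustration; the comparison mirrors the `results > thres` check in the function above.

```python
def to_labels(probs, thres=0.5):
    """Label a probability 1 only when it is strictly greater than `thres`."""
    return [1 if p > thres else 0 for p in probs]


labels_at_half = to_labels([0.91, 0.50, 0.49, 0.08])
print(labels_at_half)  # [1, 0, 0, 0]
```

A probability of exactly 0.5 falls on the negative side because the comparison is strict.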

## Almost there

Let's initialise variables useful to us such as the path to training dataset, the testing dataset, and the model checkpoint.

In [14]:
x_train = "x_train.txt"
x_test = "x_test.txt"
y_train = "y_train.txt"
y_test = "y_test.txt"
output_path = "out.txt"
model_checkpoint = "model.pt"

## Finally: train the model!

We have now initialised all the functions that we needed. Now it is time to run the code. 

During the training phase, we initialise a `SentDataset` object with the paths of the training texts and labels. Then, we pass the vocabulary size as a parameter into the model. We also specify hyperparameters such as the batch size, learning rate, and number of epochs.

The cell below trains the model using bigrams.

In [15]:
if torch.cuda.is_available():
    device_str = 'cuda:{}'.format(0)
else:
    device_str = 'cpu'
# create the device outside the if/else so it is defined on both GPU and CPU machines
device = torch.device(device_str)

dataset = SentDataset(x_train, "bigram", y_train)
num_vocab = dataset.vocab_size()
model = Model(num_vocab).to(device)

# specify hyper-parameters
batch_size = 48
learning_rate = 0.01
num_epochs = 10

train(model, dataset, batch_size, learning_rate, num_epochs, device, model_checkpoint)

[1,   100] loss: 0.695
[1,   200] loss: 0.691
[1,   300] loss: 0.660
[1,   400] loss: 0.496
[1,   500] loss: 0.382
[2,   100] loss: 0.166
[2,   200] loss: 0.147
[2,   300] loss: 0.136
[2,   400] loss: 0.138
[2,   500] loss: 0.119
[3,   100] loss: 0.027
[3,   200] loss: 0.023
[3,   300] loss: 0.027
[3,   400] loss: 0.042
[3,   500] loss: 0.020
[4,   100] loss: 0.010
[4,   200] loss: 0.012
[4,   300] loss: 0.008
[4,   400] loss: 0.029
[4,   500] loss: 0.009
[5,   100] loss: 0.004
[5,   200] loss: 0.005
[5,   300] loss: 0.005
[5,   400] loss: 0.005
[5,   500] loss: 0.005
[6,   100] loss: 0.002
[6,   200] loss: 0.005
[6,   300] loss: 0.003
[6,   400] loss: 0.003
[6,   500] loss: 0.002
[7,   100] loss: 0.004
[7,   200] loss: 0.004
[7,   300] loss: 0.004
[7,   400] loss: 0.001
[7,   500] loss: 0.001
[8,   100] loss: 0.090
[8,   200] loss: 0.008
[8,   300] loss: 0.006
[8,   400] loss: 0.003
[8,   500] loss: 0.002
[9,   100] loss: 0.003
[9,   200] loss: 0.002
[9,   300] loss: 0.001
[9,   400] 

Run the code excerpt below to train the model using trigrams.

In [None]:
if torch.cuda.is_available():
    device_str = 'cuda:{}'.format(0)
else:
    device_str = 'cpu'
# create the device outside the if/else so it is defined on both GPU and CPU machines
device = torch.device(device_str)

dataset = SentDataset(x_train, "trigram", y_train)
num_vocab = dataset.vocab_size()
model = Model(num_vocab).to(device)

# specify hyper-parameters
batch_size = 48
learning_rate = 0.01
num_epochs = 10

train(model, dataset, batch_size, learning_rate, num_epochs, device, model_checkpoint)

## Test the model

Now that you have finished training the model, it is time to test the model. 

We first load the model checkpoint saved during the training phase. As in the training phase, we pass our testing dataset into our `SentDataset` class, this time together with the vocabulary stored in the checkpoint. Then, we restore the model weights from the checkpoint.

After that, we run the model on the testing dataset and write the predicted labels to the output path specified earlier.

Run the code excerpt below if you have trained your model using `bigrams`.

In [16]:
# load the checkpoint
checkpoint = torch.load(model_checkpoint)

# create the test dataset object using SentDataset class
dataset = SentDataset(x_test, "bigram" , vocab=checkpoint["vocab"])

# initialize and load the model
num_vocab = dataset.vocab_size()
model = Model(num_vocab).to(device)
model.load_state_dict(checkpoint["model_state"])

# run the prediction
preds = test(model, dataset, 0.5, device)
# write the output
with open(output_path, 'w', encoding='utf-8') as f:
    f.write('\n'.join(preds))

Run the code excerpt below if you have trained your model using `trigrams`.

**Note**: run the testing cell that corresponds to your method of training. For example, if you trained your model using bigrams, test it using bigrams. The vocabulary saved in the checkpoint only contains n-grams of the type used during training, so n-grams of the other type would not be found in it.

In [None]:
# load the checkpoint
checkpoint = torch.load(model_checkpoint)

# create the test dataset object using SentDataset class
dataset = SentDataset(x_test, "trigram", vocab=checkpoint["vocab"])

# initialize and load the model
num_vocab = dataset.vocab_size()
model = Model(num_vocab).to(device)
model.load_state_dict(checkpoint["model_state"])

# run the prediction
preds = test(model, dataset, 0.5, device)
# write the output
with open(output_path, 'w', encoding='utf-8') as f:
    f.write('\n'.join(preds))
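
The note above about matching the n-gram type to the training method can be checked with a toy vocabulary. In the sketch below, the vocabulary and the review are made up for illustration; it looks up both bigrams and trigrams of the same review in a vocabulary built from trigrams.

```python
trigram_vocab = {"a great movie": 0, "great movie overall": 1}   # toy trigram vocabulary

review = "a great movie overall"
tokens = review.split()
bigrams = [" ".join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]
trigrams = [" ".join(tokens[i:i + 3]) for i in range(len(tokens) - 2)]

bigram_hits = [g for g in bigrams if g in trigram_vocab]
trigram_hits = [g for g in trigrams if g in trigram_vocab]
print(bigram_hits)   # []
print(trigram_hits)  # ['a great movie', 'great movie overall']
```

A bigram is two words joined by one space while every key in the trigram vocabulary is three words joined by two spaces, so a bigram can never match one of those keys.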

## Evaluate your results!

Finally, it is time to evaluate the accuracy of the model. To do this, we compare the predicted labels against the actual test labels.

In [17]:
with open(output_path, encoding='utf-8') as f:
    preds = [l.strip() for l in f.readlines()]
with open(y_test, encoding='utf-8') as f:
    labels = [l.strip() for l in f.readlines()]
assert len(preds) == len(labels), "Length of predictions ({}) and labels ({}) are not the same"\
    .format(len(preds), len(labels))

correct = 0
for pred, label in zip(preds, labels):
    if pred == label:
        correct += 1
print('Accuracy: {:.2f}%'.format((100.0 * correct) / len(labels)))

Accuracy: 90.91%
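
For readers who want to check the accuracy computation by hand, the same calculation can be run on a toy example. The function name and the four predictions and labels below are made up for illustration and are unrelated to the result reported above.

```python
def accuracy(preds, labels):
    """Percentage of positions where the prediction equals the label."""
    correct = sum(p == l for p, l in zip(preds, labels))
    return 100.0 * correct / len(labels)


toy_preds = ["1", "0", "1", "1"]
toy_labels = ["1", "0", "0", "1"]
toy_accuracy = accuracy(toy_preds, toy_labels)
print('Accuracy: {:.2f}%'.format(toy_accuracy))  # Accuracy: 75.00%
```

Three of the four toy predictions agree with their labels, which gives 75%.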
