# Implement and train an LSTM for sentiment analysis

## Step 0: set up the environment

In [57]:
%pip install nltk

import functools
import sys
import numpy as np
import pandas as pd
import random
import re
import matplotlib.pyplot as plt
import tqdm
import nltk
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
import string

# Download the NLTK 'stopwords' package (used via `stopwords.words('english')` below).
nltk.download('stopwords')

# Turn on cuDNN benchmark mode.
# NOTE(review): batches here have varying sequence lengths and the run is seeded;
# confirm that benchmark mode is actually wanted for this workload.
torch.backends.cudnn.benchmark = True

import os
# Make sure a local "resources" directory exists; no error if it is already there.
os.makedirs("resources", exist_ok=True)

Note: you may need to restart the kernel to use updated packages.


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\18208\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Hyperparameters. No need to touch.

In [58]:
class HyperParams:
    """
    Bundle of the fixed hyperparameters used by the data pipeline, the model and the training loop.
    """
    def __init__(self):
        # Constant hyperparameters. They have been tested and don't need to be tuned.

        # --- Vocabulary / preprocessing ---
        self.PAD_TOKEN = '<pad>'
        self.PAD_INDEX = 0
        self.UNK_TOKEN = '<unk>'
        self.UNK_INDEX = 1
        self.STOP_WORDS = set(stopwords.words('english'))
        self.MAX_LENGTH = 256  # reviews are truncated to this many tokens

        # --- Model architecture ---
        self.EMBEDDING_DIM = 1
        self.HIDDEN_DIM = 100
        self.OUTPUT_DIM = 2
        self.N_LAYERS = 1
        self.BIDIRECTIONAL = False
        self.DROPOUT_RATE = 0.0

        # --- Optimisation ---
        self.BATCH_SIZE = 96
        self.N_EPOCHS = 5
        self.LR = 0.001
        self.WD = 0  # weight decay

        # --- Reproducibility ---
        self.SEED = 12

## Lab 1(a) Implement your own data loader function.  
First, you need to read the data from the dataset file on the local disk.
Then, split the dataset into three sets: train, validation and test by 7:1:2 ratio.
Finally, return x_train, x_valid, x_test, y_train, y_valid, y_test, where x represents reviews and y represents labels.  

In [59]:
def load_imdb(base_csv:str = './IMDBDataset.csv'):
    """
    Read the IMDB review CSV and split it into train / validation / test sets (7:1:2).
    :param base_csv: the path of the dataset file; it must provide 'review' and 'sentiment' columns.
    :return: x_train, x_valid, x_test, y_train, y_valid, y_test.
    """
    frame = pd.read_csv(base_csv)
    reviews, sentiments = frame['review'], frame['sentiment']

    # First hold out 20% of everything as the test set ...
    x_rest, x_test, y_rest, y_test = train_test_split(
        reviews, sentiments, test_size=0.2, random_state=12)
    # ... then take 12.5% of the remaining 80% (10% of the total) for validation.
    x_train, x_valid, y_train, y_valid = train_test_split(
        x_rest, y_rest, test_size=0.125, random_state=12)

    print(f'shape of train data is {x_train.shape}')
    print(f'shape of test data is {x_test.shape}')
    print(f'shape of valid data is {x_valid.shape}')
    return x_train, x_valid, x_test, y_train, y_valid, y_test

# Load and split the dataset from the default CSV path; load_imdb prints the size of each split.
x_train, x_valid, x_test, y_train, y_valid, y_test = load_imdb()

shape of train data is (35000,)
shape of test data is (10000,)
shape of valid data is (5000,)


## Lab 1(b): Implement your function to build a vocabulary based on the training corpus.
Implement the build_vocab function to build a vocabulary based on the training corpus.
You should first compute the frequency of all the words in the training corpus. Remove the words
that are in the STOP_WORDS. Then filter the words by their frequency (≥ min_freq) and finally
generate a corpus variable that contains a list of words.

In [60]:
def build_vocab(x_train:list, min_freq: int=5, hparams=None) -> dict:
    """
    Build a vocabulary based on the training corpus.

    Each sample is lower-cased, stripped of ASCII punctuation and split on whitespace.
    Words that are stop words or occur fewer than ``min_freq`` times are dropped; the
    remaining words get consecutive indices starting at 2 (in first-seen order), and the
    PAD / UNK tokens are added with the indices given by ``hparams``.

    :param x_train:  List. The training corpus. Each sample in the list is a string of text.
    :param min_freq: Int. The frequency threshold for selecting words.
    :param hparams: Object providing PAD_TOKEN, PAD_INDEX, UNK_TOKEN, UNK_INDEX and
                    (optionally) STOP_WORDS. Defaults to a fresh ``HyperParams()``.
    :return: dictionary {word:index}
    """
    if hparams is None:
        # The PAD/UNK entries below need a hyperparameter object.
        hparams = HyperParams()

    # Resolve the stop-word set ONCE. Re-reading the NLTK list for every vocabulary
    # entry (as a list, with linear membership tests) is what made this step slow.
    stop_words = getattr(hparams, 'STOP_WORDS', None)
    if stop_words is None:
        stop_words = stopwords.words('english')
    stop_words = set(stop_words)

    # Build the punctuation-stripping table once instead of once per sample.
    strip_punctuation = str.maketrans('', '', string.punctuation)

    # Count incrementally so the full token list never has to be held in memory.
    frequencies = Counter()
    for sample in x_train:
        frequencies.update(sample.lower().translate(strip_punctuation).split())

    corpus = [
        word for word, freq in frequencies.items()
        if freq >= min_freq and word not in stop_words
    ]

    # Indices 0 and 1 are reserved for the PAD / UNK tokens, so real words start at 2.
    vocab = {word: index for index, word in enumerate(corpus, start=2)}
    vocab[hparams.PAD_TOKEN] = hparams.PAD_INDEX
    vocab[hparams.UNK_TOKEN] = hparams.UNK_INDEX
    return vocab

## Lab 1(c): Implement your tokenize function.
For each word, find its index in the vocabulary.
Return a list of int that represents the indices of words in the example.

In [61]:
def tokenize(vocab: dict, example: str)-> list:
    """
    Tokenize the given example string into a list of token indices.

    The text is lower-cased, stripped of ASCII punctuation and split on whitespace,
    mirroring the preprocessing used when the vocabulary was built.

    :param vocab: dict, the vocabulary {word:index}.
    :param example: a string of text.
    :return: a list of token indices; words missing from the vocabulary map to the
             index of the '<unk>' entry (or 1 if the vocabulary has no such entry).
    """
    cleaned = example.lower().translate(str.maketrans('', '', string.punctuation))

    # The vocabulary stores the unknown token as lower-case '<unk>'; looking up
    # '<UNK>' never matched and silently relied on the hard-coded fallback.
    unk_index = vocab.get('<unk>', 1)

    return [vocab.get(word, unk_index) for word in cleaned.split()]

## Lab 1 (d): Implement the __getitem__ function. Given an index i, you should return the i-th review and label.
The review is originally a string. Please tokenize it into a sequence of token indices.
Use the max_length parameter to truncate the sequence so that it contains at most max_length tokens.
Convert the label string ('positive'/'negative') to a binary index. 'positive' is 1 and 'negative' is 0.
Return a dictionary containing three keys: 'ids', 'length', 'label' which represent the list of token ids, the length of the sequence, the binary label.

In [73]:
class IMDB(Dataset):
    """
    Dataset of (review, sentiment) pairs that tokenizes each review on access.
    """
    def __init__(self, x, y, vocab, max_length=256) -> None:
        """
        :param x: list of reviews
        :param y: list of labels
        :param vocab: vocabulary dictionary {word:index}.
        :param max_length: the maximum sequence length.
        """
        # Materialise both inputs as plain lists so they can be indexed by position.
        self.x, self.y = list(x), list(y)
        self.vocab = vocab
        self.max_length = max_length

    def __len__(self) -> int:
        return len(self.x)

    def __getitem__(self, idx: int):
        """
        Return the tokenized review and label by the given index.
        :param idx: index of the sample.
        :return: a dictionary with the keys 'ids' (list of token ids, at most max_length long),
                 'length' (number of token ids) and 'label' (1 for 'positive', otherwise 0).
        """
        # Tokenize, keeping only the leading max_length tokens.
        ids = tokenize(self.vocab, self.x[idx])[:self.max_length]

        # Any label other than the string "positive" is encoded as 0.
        is_positive = self.y[idx] == "positive"

        return {'ids': ids, 'length': len(ids), 'label': int(is_positive)}

def collate(batch, pad_index):
    """
    Merge a list of samples into one padded batch.
    :param batch: list of dicts with the keys 'ids', 'length' and 'label'.
    :param pad_index: value used to right-pad shorter id sequences.
    :return: dict with 'ids' (int64, padded to the longest sample, batch first),
             'length' (float32, one entry per sample) and 'label' (int64).
    """
    id_tensors, lengths, labels = [], [], []
    for sample in batch:
        id_tensors.append(torch.LongTensor(sample['ids']))
        lengths.append(sample['length'])
        labels.append(sample['label'])

    padded_ids = nn.utils.rnn.pad_sequence(
        id_tensors, padding_value=pad_index, batch_first=True)

    return {
        'ids': padded_ids,
        'length': torch.Tensor(lengths),  # default (float) tensor type
        'label': torch.LongTensor(labels),
    }

collate_fn = collate

## Lab 1 (e): Implement the LSTM model for sentiment analysis.
Q(a): Implement the initialization function.
Your task is to create the model by stacking several necessary layers including an embedding layer, an LSTM layer, a linear layer, and a dropout layer.
You can call functions from Pytorch's nn library. For example, nn.Embedding, nn.LSTM, nn.Linear.<br>
Q(b): Implement the forward function.
    Decide where to apply dropout.
    The sequences in the batch have different lengths. Write/call a function to pad the sequences into the same length.
    Apply a fully-connected (fc) layer to the output of the LSTM layer.
    Return the output features which is of size [batch size, output dim].

In [75]:
def init_weights(m):
    """
    Initialise the parameters of a single module; intended for ``nn.Module.apply``.

    * ``nn.Embedding``: Xavier-normal weights, with the ``padding_idx`` row reset to zero.
    * ``nn.Linear``: Xavier-normal weights, zero bias.
    * ``nn.LSTM`` / ``nn.GRU``: orthogonal weight matrices, zero biases.

    Any other module type is left untouched.

    :param m: the module to initialise in place.
    """
    if isinstance(m, nn.Embedding):
        nn.init.xavier_normal_(m.weight)
        if m.padding_idx is not None:
            # xavier_normal_ overwrites the whole table, including the padding row that
            # nn.Embedding had zeroed; restore it so the pad vector is zero again.
            with torch.no_grad():
                m.weight[m.padding_idx].fill_(0)
    elif isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
        nn.init.zeros_(m.bias)
    elif isinstance(m, (nn.LSTM, nn.GRU)):
        for name, param in m.named_parameters():
            if 'bias' in name:
                nn.init.zeros_(param)
            elif 'weight' in name:
                nn.init.orthogonal_(param)

class LSTM(nn.Module):
    """
    Sequence classifier: embedding -> (optionally bidirectional) LSTM -> dropout -> linear.
    """
    def __init__(
        self,
        vocab_size: int,
        embedding_dim: int,
        hidden_dim: int,
        output_dim: int,
        n_layers: int,
        dropout_rate: float,
        pad_index: int,
        bidirectional: bool = False,
        **kwargs):
        """
        Create an LSTM model for classification.
        :param vocab_size: Size of the vocabulary.
        :param embedding_dim: Dimension of word embeddings.
        :param hidden_dim: Dimension of hidden features in LSTM.
        :param output_dim: Number of output classes.
        :param n_layers: Number of LSTM layers.
        :param dropout_rate: Dropout rate.
        :param pad_index: Index of the padding token.
        :param bidirectional: Whether to use a bidirectional LSTM.
        :param kwargs: optional ``weight_init_fn``: a callable applied to every sub-module
                       (via ``nn.Module.apply``) instead of the default ``init_weights``.
        """
        super().__init__()

        # 1. Embedding layer; the padding row receives no gradient updates.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=pad_index,
        )

        # 2. LSTM layer operating on batch-first input.
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=bidirectional,
            # Inter-layer dropout is only applied when there is more than one layer.
            dropout=dropout_rate if n_layers > 1 else 0,
        )

        # 3. Fully connected layer; a bidirectional LSTM yields two hidden states per sample.
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(lstm_output_dim, output_dim)

        # 4. Dropout applied to the final hidden state before classification.
        self.dropout = nn.Dropout(dropout_rate)

        # Initialize weights
        if "weight_init_fn" not in kwargs:
            self.apply(init_weights)
        else:
            self.apply(kwargs["weight_init_fn"])

    def forward(self, ids: torch.Tensor, length: torch.Tensor):
        """
        Forward pass of LSTM model.
        :param ids: [batch_size, seq_len] Tokenized input sequences.
        :param length: [batch_size] Length of each sequence before padding.
        :return: [batch_size, output_dim] Predicted logits.
        """
        # 1. Embedding lookup -> [batch_size, seq_len, embedding_dim]
        embedded = self.embedding(ids)

        # 2. Pack so the LSTM skips padded positions; the lengths are moved to the CPU
        #    before packing, and enforce_sorted=False allows any ordering of the batch.
        packed_embedded = nn.utils.rnn.pack_padded_sequence(
            embedded, length.cpu(), batch_first=True, enforce_sorted=False)

        # 3. Only the final hidden state is needed for classification, so the per-step
        #    outputs are discarded rather than unpacked.
        _, (hidden, _) = self.lstm(packed_embedded)

        # 4. Select the top layer's final hidden state(s).
        if self.lstm.bidirectional:
            # The last two entries are the top layer's two directions.
            hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
        else:
            hidden = hidden[-1]

        # 5. Dropout, then project to class logits -> [batch_size, output_dim]
        return self.fc(self.dropout(hidden))


## Training Code (do not modify)

In [76]:
def count_parameters(model):
    """
    Count the scalar parameters of ``model`` that have ``requires_grad`` set.
    :param model: an nn.Module (anything exposing ``parameters()``).
    :return: int, total number of trainable elements.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


def train(dataloader, model, criterion, optimizer, scheduler, device):
    """
    Run one training pass over ``dataloader``.

    For every batch: forward, loss, backward, optimizer step, then one scheduler step.
    :param dataloader: yields dicts with the keys 'ids', 'length' and 'label'.
    :param model: called as ``model(ids, length)``; put into train mode here.
    :param criterion: loss function called as ``criterion(prediction, label)``.
    :param optimizer: optimizer stepped once per batch.
    :param scheduler: learning-rate scheduler stepped once per batch.
    :param device: device the ids and labels are moved to (lengths are passed as-is).
    :return: (per-batch losses, per-batch accuracies) as two lists of floats.
    """
    model.train()
    batch_losses, batch_accs = [], []

    progress = tqdm.tqdm(dataloader, desc='training...', file=sys.stdout)
    for batch in progress:
        ids = batch['ids'].to(device)
        label = batch['label'].to(device)

        prediction = model(ids, batch['length'])
        loss = criterion(prediction, label)
        accuracy = get_accuracy(prediction, label)

        # Parameter update followed by the per-step learning-rate update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        batch_losses.append(loss.item())
        batch_accs.append(accuracy.item())

    return batch_losses, batch_accs

def evaluate(dataloader, model, criterion, device):
    """
    Run one evaluation pass over ``dataloader`` without tracking gradients.
    :param dataloader: yields dicts with the keys 'ids', 'length' and 'label'.
    :param model: called as ``model(ids, length)``; put into eval mode here.
    :param criterion: loss function called as ``criterion(prediction, label)``.
    :param device: device the ids and labels are moved to (lengths are passed as-is).
    :return: (per-batch losses, per-batch accuracies) as two lists of floats.
    """
    model.eval()
    batch_losses, batch_accs = [], []

    progress = tqdm.tqdm(dataloader, desc='evaluating...', file=sys.stdout)
    with torch.no_grad():
        for batch in progress:
            ids = batch['ids'].to(device)
            label = batch['label'].to(device)

            prediction = model(ids, batch['length'])

            batch_losses.append(criterion(prediction, label).item())
            batch_accs.append(get_accuracy(prediction, label).item())

    return batch_losses, batch_accs

def get_accuracy(prediction, label):
    """
    Fraction of rows whose arg-max class equals the label.
    :param prediction: 2-D tensor of per-class scores, one row per sample.
    :param label: tensor of class indices, one per sample.
    :return: 0-dim tensor holding correct / number_of_rows.
    """
    n_samples, _ = prediction.shape  # also enforces a 2-D prediction tensor
    hits = (prediction.argmax(dim=-1) == label).sum()
    return hits / n_samples

def predict_sentiment(text, model, vocab, device):
    """
    Predict the class of a single piece of text.
    :param text: raw review string.
    :param model: classifier called as ``model(ids, length)``; the caller is expected
                  to have put it in eval mode and on ``device``.
    :param vocab: vocabulary dictionary {word:index} understood by ``tokenize``.
    :param device: device the id tensor is moved to.
    :return: (predicted class index, probability assigned to that class).
    """
    # tokenize() already returns vocabulary indices (unknown words included), so the
    # result must not be mapped through `vocab` a second time.
    ids = tokenize(vocab, text)
    if not ids:
        # A text without any tokens cannot be packed for the LSTM (zero length);
        # represent it by a single unknown token instead.
        ids = [vocab.get('<unk>', 1)]

    length = torch.LongTensor([len(ids)])
    tensor = torch.LongTensor(ids).unsqueeze(dim=0).to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        prediction = model(tensor, length).squeeze(dim=0)

    probability = torch.softmax(prediction, dim=-1)
    predicted_class = prediction.argmax(dim=-1).item()
    predicted_probability = probability[predicted_class].item()
    return predicted_class, predicted_probability

### Learning rate warmup. DO NOT TOUCH!

In [77]:
class ConstantWithWarmup(torch.optim.lr_scheduler._LRScheduler):
    """
    Learning-rate schedule with a linear warmup followed by the unscaled base rates.

    While ``_step_count`` is at most ``num_warmup_steps`` every base learning rate is
    multiplied by ``1 - (num_warmup_steps - _step_count) / num_warmup_steps``; after
    that the base learning rates are returned unchanged.
    """
    def __init__(
        self,
        optimizer,
        num_warmup_steps: int,
    ):
        """
        :param optimizer: the optimizer whose learning rates are scheduled.
        :param num_warmup_steps: number of steps over which the scale ramps up to 1.
        """
        # Set before calling the parent constructor.
        # NOTE(review): presumably because the parent __init__ already calls get_lr() — confirm.
        self.num_warmup_steps = num_warmup_steps
        super().__init__(optimizer)

    def get_lr(self):
        """
        :return: list of learning rates, one per entry of ``self.base_lrs``.
        """
        # `_step_count` and `base_lrs` are not defined in this class.
        # NOTE(review): presumably maintained by the parent scheduler — confirm.
        if self._step_count <= self.num_warmup_steps:
            # warmup
            scale = 1.0 - (self.num_warmup_steps - self._step_count) / self.num_warmup_steps
            lr = [base_lr * scale for base_lr in self.base_lrs]
            # Remember the most recent warmup rates on the instance.
            self.last_lr = lr
        else:
            lr = self.base_lrs
        return lr

### Implement the training / validation iteration here.

In [78]:
def train_and_test_model_with_hparams(hparams, model_type="lstm", **kwargs):
    """
    Train an LSTM classifier with the given hyperparameters and evaluate it on the test set.

    Steps: seed the RNGs, load and split the data, build the vocabulary and data loaders,
    train for ``hparams.N_EPOCHS`` epochs while checkpointing the weights with the lowest
    validation loss to 'best_model.pt', reload that checkpoint, report test loss/accuracy
    and print the prediction for one randomly chosen test review.

    Training and evaluation passes are delegated to ``train`` / ``evaluate``, which show
    a progress bar per pass. Epoch metrics are the mean of the per-batch values.

    :param hparams: hyperparameter object (see ``HyperParams``).
    :param model_type: free-form tag; accepted for compatibility but not read.
    :param kwargs: forwarded to the ``LSTM`` constructor (e.g. ``weight_init_fn``).
    :return: dict with 'num_params', 'test_loss' and 'test_acc'.
    """
    # Seeding. DO NOT TOUCH! DO NOT TOUCH hparams.SEED!
    # Set the random seeds.
    torch.manual_seed(hparams.SEED)
    random.seed(hparams.SEED)
    np.random.seed(hparams.SEED)

    x_train, x_valid, x_test, y_train, y_valid, y_test = load_imdb()
    vocab = build_vocab(x_train, hparams=hparams)
    vocab_size = len(vocab)
    print(f'Length of vocabulary is {vocab_size}')

    train_data = IMDB(x_train, y_train, vocab, hparams.MAX_LENGTH)
    valid_data = IMDB(x_valid, y_valid, vocab, hparams.MAX_LENGTH)
    test_data = IMDB(x_test, y_test, vocab, hparams.MAX_LENGTH)

    collate_batch = functools.partial(collate_fn, pad_index=hparams.PAD_INDEX)

    train_dataloader = torch.utils.data.DataLoader(
        train_data, batch_size=hparams.BATCH_SIZE, collate_fn=collate_batch, shuffle=True)
    valid_dataloader = torch.utils.data.DataLoader(
        valid_data, batch_size=hparams.BATCH_SIZE, collate_fn=collate_batch)
    test_dataloader = torch.utils.data.DataLoader(
        test_data, batch_size=hparams.BATCH_SIZE, collate_fn=collate_batch)

    # Model
    model = LSTM(
            vocab_size,
            hparams.EMBEDDING_DIM,
            hparams.HIDDEN_DIM,
            hparams.OUTPUT_DIM,
            hparams.N_LAYERS,
            hparams.DROPOUT_RATE,
            hparams.PAD_INDEX,
            hparams.BIDIRECTIONAL,
            **kwargs)

    num_params = count_parameters(model)
    print(f'The model has {num_params:,} trainable parameters')

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # DO NOT TOUCH optimizer-specific hyperparameters! (e.g., eps, momentum)
    # DO NOT change optimizer implementations!
    optimizer = optim.Adam(model.parameters(), lr=hparams.LR, weight_decay=hparams.WD, eps=1e-6)

    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)

    # Checkpoint bookkeeping.
    best_model_path = 'best_model.pt'
    best_valid_loss = float('inf')
    checkpoint_saved = False

    # Warmup Scheduler. DO NOT TOUCH!
    WARMUP_STEPS = 200
    lr_scheduler = ConstantWithWarmup(optimizer, WARMUP_STEPS)

    for epoch in range(hparams.N_EPOCHS):
        # One pass over the training data, then one over the validation data.
        train_losses, train_accs = train(
            train_dataloader, model, criterion, optimizer, lr_scheduler, device)
        valid_losses, valid_accs = evaluate(valid_dataloader, model, criterion, device)

        epoch_train_loss = np.mean(train_losses)
        epoch_train_acc = np.mean(train_accs)
        epoch_valid_loss = np.mean(valid_losses)
        epoch_valid_acc = np.mean(valid_accs)

        # Save the model that achieves the smallest validation loss.
        if epoch_valid_loss < best_valid_loss:
            best_valid_loss = epoch_valid_loss
            torch.save(model.state_dict(), best_model_path)
            checkpoint_saved = True

        print(f'epoch: {epoch+1}')
        print(f'train_loss: {epoch_train_loss:.3f}, train_acc: {epoch_train_acc:.3f}')
        print(f'valid_loss: {epoch_valid_loss:.3f}, valid_acc: {epoch_valid_acc:.3f}')

    # Load the best model's weights. Only reload a checkpoint written by THIS run
    # (none exists if no epoch ran or the validation loss was never finite).
    # weights_only=True restricts unpickling to tensors/state dicts; map_location keeps
    # the load working when the checkpoint was written on a different device.
    if checkpoint_saved:
        model.load_state_dict(
            torch.load(best_model_path, map_location=device, weights_only=True))

    # Evaluate test loss on testing dataset (NOT Validation)
    test_losses, test_accs = evaluate(test_dataloader, model, criterion, device)
    epoch_test_loss = np.mean(test_losses)
    epoch_test_acc = np.mean(test_accs)
    print(f'test_loss: {epoch_test_loss:.3f}, test_acc: {epoch_test_acc:.3f}')

    # Select one of the entries in test set and predict its sentiment, print out the text, prediction and the probability.
    idx = random.randint(0, len(test_data) - 1)
    sample = test_data[idx]
    text = x_test.iloc[idx]
    label = y_test.iloc[idx]

    model.eval()
    with torch.no_grad():
        ids = torch.LongTensor(sample['ids']).unsqueeze(0).to(device)
        # The model moves lengths to the CPU before packing, so keep them there.
        length = torch.LongTensor([sample['length']])
        output = model(ids, length)
        probs = torch.softmax(output, dim=1)
        prediction = probs.argmax(dim=1).item()
        probability = probs[0][prediction].item()

    sentiment = 'positive' if prediction == 1 else 'negative'
    print(f'\nSample Text: {text}')
    print(f'Actual Sentiment: {label}')
    print(f'Predicted Sentiment: {sentiment}, Probability: {probability:.4f}')

    # Free memory for later usage.
    del model
    torch.cuda.empty_cache()
    return {
        'num_params': num_params,
        "test_loss": epoch_test_loss,
        "test_acc": epoch_test_acc,
    }

### Lab 1 (f): Train the LSTM model.

Train the model with default hyperparameter settings.

In [80]:
# Train and evaluate the LSTM with the default hyperparameters; the returned metrics are discarded.
# NOTE(review): the tag string is bound to `model_type`, which the function never reads.
# It also says "e32" while HyperParams sets EMBEDDING_DIM = 1 — confirm the intended label.
org_hyperparams = HyperParams()
_ = train_and_test_model_with_hparams(org_hyperparams, "lstm_1layer_base_adam_e32_h100")

shape of train data is (35000,)
shape of test data is (10000,)
shape of valid data is (5000,)
Length of vocabulary is 38034
The model has 79,436 trainable parameters
epoch: 1
train_loss: 0.689, train_acc: 0.541
valid_loss: 0.676, valid_acc: 0.563
epoch: 2
train_loss: 0.558, train_acc: 0.716
valid_loss: 0.389, valid_acc: 0.835
epoch: 3
train_loss: 0.314, train_acc: 0.871
valid_loss: 0.363, valid_acc: 0.850
epoch: 4
train_loss: 0.220, train_acc: 0.917
valid_loss: 0.349, valid_acc: 0.873
epoch: 5
train_loss: 0.172, train_acc: 0.940
valid_loss: 0.401, valid_acc: 0.862


  model.load_state_dict(torch.load('best_model.pt'))


test_loss: 0.320, test_acc: 0.873

Sample Text: Deanna Durbin really did save Universal from bankruptcy and enabled it to remain a big studio. By the mid 30s most of the big directors that had been at Universal eg Milestone, Browning and Wyler had gone. Only James Whale remained but his prestigious horror films were behind him. Deanna and Judy Garland appeared in a short "Every Sunday" and initially Garland was suggested for the role of Penny in "Three Smart Girls". When Garland was unavailable Universal switched to Durbin. Initially she had been definitely a supporting player but her potential was so vivid that the script was rewritten to make her the star. Directed by Henry Koster the film had a European touch.<br /><br />The film starts with a beautiful panorama of a lake in "Switzerland". The "three smart girls" of the title - three sisters, Joan (Nan Grey), Kay (Barbara Read) and Penny (Deanna Durbin) are sailing with Penny giving her glorious voice to "My Heart is Singing". All i