# Implement and train a LSTM for sentiment analysis

## Step 0: set up the environment

In [1]:
import functools
import sys
import numpy as np
import pandas as pd
import random
import re
import matplotlib.pyplot as plt
import tqdm
import nltk
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset

nltk.download('stopwords')

torch.backends.cudnn.benchmark = True

import os
os.makedirs("resources", exist_ok=True)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Hyperparameters. No need to touch.

In [2]:
class HyperParams:
    def __init__(self):
        # Constance hyperparameters. They have been tested and don't need to be tuned.
        self.PAD_INDEX = 0
        self.UNK_INDEX = 1
        self.PAD_TOKEN = '<pad>'
        self.UNK_TOKEN = '<unk>'
        self.STOP_WORDS = set(stopwords.words('english'))
        self.MAX_LENGTH = 256
        self.BATCH_SIZE = 96
        self.EMBEDDING_DIM = 1
        self.HIDDEN_DIM = 100
        self.OUTPUT_DIM = 2
        self.N_LAYERS = 1
        self.DROPOUT_RATE = 0.0
        self.LR = 0.001
        self.N_EPOCHS = 5
        self.WD = 0
        self.SEED = 12
        self.BIDIRECTIONAL = False

In [3]:
#############################################
# GPU check
device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device =='cuda':
    print("Run on GPU...")
else:
    print("Run on CPU...")
#############################################

Run on GPU...


In [4]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

import os


Mounted at /content/drive


## Lab 1(a) Implement your own data loader function.  
First, you need to read the data from the dataset file on the local disk.
Then, split the dataset into three sets: train, validation and test by 7:1:2 ratio.
Finally return x_train, x_valid, x_test, y_train, y_valid, y_test where x represents reviews and y represent labels.  

In [5]:
#def load_imdb(base_csv:str = './IMDBDataset.csv'):

def load_imdb(base_csv:str = '/content/drive/MyDrive/2023 - Duke/2025-01/ECE661_ComputerEngineeringMachineLearning&DeepNeuralNets/Assigments/HW3/IMDBDataset.csv'):
    """
    Load the IMDB dataset
    :param base_csv: the path of the dataset file.
    :return: train, validation and test set.
    """
    # Add your code here.

    df = pd.read_csv(base_csv)
    X = df['review']
    y = df["sentiment"]
    #y = np.where(df['sentiment'] == 'positive', 1, 0)

    x_train, x_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
    x_valid, x_test, y_valid, y_test = train_test_split(x_temp, y_temp, test_size=(0.66666666666), random_state=42)

    x_train = x_train.reset_index(drop=True)
    x_valid = x_valid.reset_index(drop=True)
    x_test = x_test.reset_index(drop=True)
    y_train = y_train.reset_index(drop=True)
    y_valid = y_valid.reset_index(drop=True)
    y_test = y_test.reset_index(drop=True)

    print(f'shape of train data is {x_train.shape}')
    print(f'shape of test data is {x_test.shape}')
    print(f'shape of valid data is {x_valid.shape}')
    return x_train, x_valid, x_test, y_train, y_valid, y_test



In [6]:
data_path = '/content/drive/MyDrive/2023 - Duke/2025-01/ECE661_ComputerEngineeringMachineLearning&DeepNeuralNets/Assigments/HW3/IMDBDataset.csv'

X_train, X_valid, X_test, y_train, y_valid, y_test = load_imdb(data_path)


shape of train data is (35000,)
shape of test data is (10000,)
shape of valid data is (5000,)


## Lab 1(b): Implement your function to build a vocabulary based on the training corpus.
Implement the build_vocab function to build a vocabulary based on the training corpus.
You should first compute the frequency of all the words in the training corpus. Remove the words
that are in the STOP_WORDS. Then filter the words by their frequency (≥ min_freq) and finally
generate a corpus variable that contains a list of words.

In [7]:
def build_vocab(x_train:list, min_freq: int=5, hparams=None) -> dict:
    """
    build a vocabulary based on the training corpus.
    :param x_train:  List. The training corpus. Each sample in the list is a string of text.
    :param min_freq: Int. The frequency threshold for selecting words.
    :return: dictionary {word:index}
    """
    # Add your code here. Your code should assign corpus with a list of words.

    corpus = Counter()

    for review in x_train:
        words = review.split()
        words = [word.lower() for word in words]
        words = [word for word in words if word not in hparams.STOP_WORDS]

        corpus.update(words)

    corpus_ = [word for word, freq in corpus.items() if freq >= min_freq]
    # creating a dict
    vocab = {w:i+2 for i, w in enumerate(corpus_)}
    vocab[hparams.PAD_TOKEN] = hparams.PAD_INDEX
    vocab[hparams.UNK_TOKEN] = hparams.UNK_INDEX
    return vocab


In [8]:
import itertools
hparams = HyperParams()

vocab = build_vocab(X_train, min_freq=5, hparams=hparams)
vocab

for key, value in itertools.islice(vocab.items(), 5):
    print(key, value)

much 2
love 3
trains, 4
stomach 5
movie. 6


## Lab 1(c): Implement your tokenize function.
For each word, find its index in the vocabulary.
Return a list of int that represents the indices of words in the example.

In [9]:
def tokenize(vocab: dict, example: str)-> list:
    """
    Tokenize the give example string into a list of token indices.
    :param vocab: dict, the vocabulary.
    :param example: a string of text.
    :return: a list of token indices.
    """
    # Your code here.

    words = example.split()
    words = [word.lower() for word in words]
    words = [word for word in words if word not in hparams.STOP_WORDS]

    token_indices = [vocab.get(word, vocab[hparams.UNK_TOKEN]) for word in words]


    return token_indices

In [10]:
'''
vocab
{'much': 2,
 'love': 3,
 'trains,': 4,
 'stomach': 5,
 'movie.': 6,
 'premise': 7,
 'one': 8,
 ....
'''

example_text = "I love the premise of the movie."

token_indices = tokenize(vocab, example_text)
token_indices

[3, 7, 6]

## Lab 1 (d): Implement the __getitem__ function. Given an index i, you should return the i-th review and label.
The review is originally a string. Please tokenize it into a sequence of token indices.
Use the max_length parameter to truncate the sequence so that it contains at most max_length tokens.
Convert the label string ('positive'/'negative') to a binary index. 'positive' is 1 and 'negative' is 0.
Return a dictionary containing three keys: 'ids', 'length', 'label' which represent the list of token ids, the length of the sequence, the binary label.

In [11]:
class IMDB(Dataset):
    def __init__(self, x, y, vocab, max_length=256) -> None:
        """
        :param x: list of reviews
        :param y: list of labels
        :param vocab: vocabulary dictionary {word:index}.
        :param max_length: the maximum sequence length.
        """
        self.x = x
        self.y = y
        self.vocab = vocab
        self.max_length = max_length

    def __getitem__(self, idx: int):
        """
        Return the tokenized review and label by the given index.
        :param idx: index of the sample.
        :return: a dictionary containing three keys: 'ids', 'length', 'label' which represent the list of token ids, the length of the sequence, the binary label.
        """
        # Add your code here.
        review = self.x[idx]
        label = self.y[idx]

        token_indices = tokenize(self.vocab, review)
        token_indices = token_indices[:self.max_length]
        label = 1 if label == 'positive' else 0

        return {'ids': token_indices, 'length': len(token_indices), 'label': label}


    def __len__(self) -> int:
        return len(self.x)


def collate(batch, pad_index):
    batch_ids = [torch.LongTensor(i['ids']) for i in batch]
    batch_ids = nn.utils.rnn.pad_sequence(batch_ids, padding_value=pad_index, batch_first=True)
    batch_length = torch.Tensor([i['length'] for i in batch])
    batch_label = torch.LongTensor([i['label'] for i in batch])
    batch = {'ids': batch_ids, 'length': batch_length, 'label': batch_label}
    return batch

collate_fn = collate

## Lab 1 (e): Implement the LSTM model for sentiment analysis.
Q(a): Implement the initialization function.
Your task is to create the model by stacking several necessary layers including an embedding layer, a lstm cell, a linear layer, and a dropout layer.
You can call functions from Pytorch's nn library. For example, nn.Embedding, nn.LSTM, nn.Linear.<br>
Q(b): Implement the forward function.
    Decide where to apply dropout.
    The sequences in the batch have different lengths. Write/call a function to pad the sequences into the same length.
    Apply a fully-connected (fc) layer to the output of the LSTM layer.
    Return the output features which is of size [batch size, output dim].

In [12]:
def init_weights(m):
    if isinstance(m, nn.Embedding):
        nn.init.xavier_normal_(m.weight)
    elif isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
        nn.init.zeros_(m.bias)
    elif isinstance(m, nn.LSTM) or isinstance(m, nn.GRU):
        for name, param in m.named_parameters():
            if 'bias' in name:
                nn.init.zeros_(param)
            elif 'weight' in name:
                nn.init.orthogonal_(param)

class LSTM(nn.Module):
    def __init__(
        self,
        vocab_size: int,
        embedding_dim: int,
        hidden_dim: int,
        output_dim: int,
        n_layers: int,
        dropout_rate: float,
        pad_index: int,
        bidirectional: bool,
        **kwargs):
        """
        Create a LSTM model for classification.
        :param vocab_size: size of the vocabulary
        :param embedding_dim: dimension of embeddings
        :param hidden_dim: dimension of hidden features
        :param output_dim: dimension of the output layer which equals to the number of labels.
        :param n_layers: number of layers.
        :param dropout_rate: dropout rate.
        :param pad_index: index of the padding token.we
        """
        super().__init__()
        # Add your code here. Initializing each layer by the given arguments. (you can use nn.LSTM, nn.Embedding, nn.Linear, nn.Dropout)
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_index)

        self.lstm = nn.LSTM(
                  embedding_dim,
                  hidden_dim,
                  num_layers=n_layers,
                  bidirectional=bidirectional,
                  dropout=dropout_rate if n_layers > 1 else 0,
                  batch_first=True
              )


        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout_rate)

        self.pad_index = pad_index

        if "weight_init_fn" not in kwargs:
            self.apply(init_weights)
        else:
            self.apply(kwargs["weight_init_fn"])


    def forward(self, ids:torch.Tensor, length:torch.Tensor):
        """
        Feed the given token ids to the model.
        :param ids: [batch size, seq len] batch of token ids.
        :param length: [batch size] batch of length of the token ids.
        :return: prediction of size [batch size, output dim].
        """
        # Add your code here.

        embedded = self.dropout(self.embedding(ids))
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, length.cpu().int(), batch_first=True, enforce_sorted=False)
        packed_output, (hidden, cell) = self.lstm(packed_embedded)

        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)
        else:
            hidden = hidden[-1,:,:]

        hidden = self.dropout(hidden)
        prediction = self.fc(hidden)

        return prediction

## Training Code (do not modify)

In [13]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


def train(dataloader, model, criterion, optimizer, scheduler, device):
    model.train()
    epoch_losses = []
    epoch_accs = []

    for batch in tqdm.tqdm(dataloader, desc='training...', file=sys.stdout):
        ids = batch['ids'].to(device)
        length = batch['length']
        label = batch['label'].to(device)
        prediction = model(ids, length)
        loss = criterion(prediction, label)
        accuracy = get_accuracy(prediction, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_losses.append(loss.item())
        epoch_accs.append(accuracy.item())
        scheduler.step()

    return epoch_losses, epoch_accs

def evaluate(dataloader, model, criterion, device):
    model.eval()
    epoch_losses = []
    epoch_accs = []

    with torch.no_grad():
        for batch in tqdm.tqdm(dataloader, desc='evaluating...', file=sys.stdout):
            ids = batch['ids'].to(device)
            length = batch['length']
            label = batch['label'].to(device)
            prediction = model(ids, length)
            loss = criterion(prediction, label)
            accuracy = get_accuracy(prediction, label)
            epoch_losses.append(loss.item())
            epoch_accs.append(accuracy.item())

    return epoch_losses, epoch_accs

def get_accuracy(prediction, label):
    batch_size, _ = prediction.shape
    predicted_classes = prediction.argmax(dim=-1)
    correct_predictions = predicted_classes.eq(label).sum()
    accuracy = correct_predictions / batch_size
    return accuracy

def predict_sentiment(text, model, vocab, device):
    UNK_INDEX = 1
    tokens = tokenize(vocab, text)
    ids = [vocab[t] if t in vocab else UNK_INDEX for t in tokens]
    length = torch.LongTensor([len(ids)])
    tensor = torch.LongTensor(ids).unsqueeze(dim=0).to(device)
    prediction = model(tensor, length).squeeze(dim=0)
    probability = torch.softmax(prediction, dim=-1)
    predicted_class = prediction.argmax(dim=-1).item()
    predicted_probability = probability[predicted_class].item()
    return predicted_class, predicted_probability

### Learning rate warmup. DO NOT TOUCH!

In [14]:
class ConstantWithWarmup(torch.optim.lr_scheduler._LRScheduler):
    def __init__(
        self,
        optimizer,
        num_warmup_steps: int,
    ):
        self.num_warmup_steps = num_warmup_steps
        super().__init__(optimizer)

    def get_lr(self):
        if self._step_count <= self.num_warmup_steps:
            # warmup
            scale = 1.0 - (self.num_warmup_steps - self._step_count) / self.num_warmup_steps
            lr = [base_lr * scale for base_lr in self.base_lrs]
            self.last_lr = lr
        else:
            lr = self.base_lrs
        return lr

### Implement the training / validation iteration here.

In [15]:
def train_and_test_model_with_hparams(hparams, model_type="lstm", **kwargs):
    # Seeding. DO NOT TOUCH! DO NOT TOUCH hparams.SEED!
    # Set the random seeds.
    torch.manual_seed(hparams.SEED)
    random.seed(hparams.SEED)
    np.random.seed(hparams.SEED)

    x_train, x_valid, x_test, y_train, y_valid, y_test = load_imdb()
    vocab = build_vocab(x_train, hparams=hparams)
    vocab_size = len(vocab)
    print(f'Length of vocabulary is {vocab_size}')

    train_data = IMDB(x_train, y_train, vocab, hparams.MAX_LENGTH)
    valid_data = IMDB(x_valid, y_valid, vocab, hparams.MAX_LENGTH)
    test_data = IMDB(x_test, y_test, vocab, hparams.MAX_LENGTH)

    collate = functools.partial(collate_fn, pad_index=hparams.PAD_INDEX)

    train_dataloader = torch.utils.data.DataLoader(
        train_data, batch_size=hparams.BATCH_SIZE, collate_fn=collate, shuffle=True)
    valid_dataloader = torch.utils.data.DataLoader(
        valid_data, batch_size=hparams.BATCH_SIZE, collate_fn=collate)
    test_dataloader = torch.utils.data.DataLoader(
        test_data, batch_size=hparams.BATCH_SIZE, collate_fn=collate)

    # Model

    model = LSTM(
            vocab_size,
            hparams.EMBEDDING_DIM,
            hparams.HIDDEN_DIM,
            hparams.OUTPUT_DIM,
            hparams.N_LAYERS,
            hparams.DROPOUT_RATE,
            hparams.PAD_INDEX,
            hparams.BIDIRECTIONAL,
            **kwargs)

    num_params = count_parameters(model)
    print(f'The model has {num_params:,} trainable parameters')


    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # DO NOT TOUCH optimizer-specific hyperparameters! (e.g., eps, momentum)
    # DO NOT change optimizer implementations!

    optimizer = optim.Adam(model.parameters(), lr=hparams.LR, weight_decay=hparams.WD, eps=1e-6)


    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)

    # Start training
    best_valid_loss = float('inf')

    # Warmup Scheduler. DO NOT TOUCH!
    WARMUP_STEPS = 200
    lr_scheduler = ConstantWithWarmup(optimizer, WARMUP_STEPS)

    for epoch in range(hparams.N_EPOCHS):

        # Your code: implement the training process and save the best model.
        train_loss, train_acc = train(train_dataloader, model, criterion, optimizer, lr_scheduler, device)
        valid_loss, valid_acc = evaluate(valid_dataloader, model, criterion, device)



        epoch_train_loss = np.mean(train_loss)
        epoch_train_acc = np.mean(train_acc)
        epoch_valid_loss = np.mean(valid_loss)
        epoch_valid_acc = np.mean(valid_acc)

        # Save the model that achieves the smallest validation loss.
        if epoch_valid_loss < best_valid_loss:
            # Your code: save the best model somewhere (no need to submit it to Sakai)
            best_valid_loss = epoch_valid_loss
            torch.save(model.state_dict(), f'{model_type}.pt')


        print(f'epoch: {epoch+1}')
        print(f'train_loss: {epoch_train_loss:.3f}, train_acc: {epoch_train_acc:.3f}')
        print(f'valid_loss: {epoch_valid_loss:.3f}, valid_acc: {epoch_valid_acc:.3f}')


    # Your Code: Load the best model's weights.
    model.load_state_dict(torch.load(f'{model_type}.pt'))

    # Your Code: evaluate test loss on testing dataset (NOT Validation)
    test_loss, test_acc = evaluate(test_dataloader, model, criterion, device)

    epoch_test_loss = np.mean(test_loss)
    epoch_test_acc = np.mean(test_acc)
    print(f'test_loss: {epoch_test_loss:.3f}, test_acc: {epoch_test_acc:.3f}')

    # Your Code: select one of the entries in test set and predict its sentiment, print out the text, prediction and the probability.
    sample = x_test.iloc[0]
    predicted_class, predicted_probability = predict_sentiment(sample, model, vocab, device)
    print(f'Text: {sample}')
    print(f'Predicted sentiment: {"Positive" if predicted_class == 1 else "Negative"}')
    print(f'Confidence: {predicted_probability:.4f}')


    # Free memory for later usage.
    del model
    torch.cuda.empty_cache()
    return {
        'num_params': num_params,
        "test_loss": epoch_test_loss,
        "test_acc": epoch_test_acc,
    }

### Lab 1 (f): Train LSTM model .

Train the model with default hyperparameter settings.

In [16]:
org_hyperparams = HyperParams()
_ = train_and_test_model_with_hparams(org_hyperparams, "lstm_1layer_base_adam_e32_h100")

shape of train data is (35000,)
shape of test data is (10000,)
shape of valid data is (5000,)
Length of vocabulary is 56473
The model has 97,875 trainable parameters
training...: 100%|██████████| 365/365 [00:09<00:00, 36.74it/s]
evaluating...: 100%|██████████| 53/53 [00:00<00:00, 74.20it/s]
epoch: 1
train_loss: 0.675, train_acc: 0.556
valid_loss: 0.627, valid_acc: 0.627
training...: 100%|██████████| 365/365 [00:09<00:00, 40.10it/s]
evaluating...: 100%|██████████| 53/53 [00:01<00:00, 46.64it/s]
epoch: 2
train_loss: 0.331, train_acc: 0.863
valid_loss: 0.277, valid_acc: 0.889
training...: 100%|██████████| 365/365 [00:09<00:00, 39.09it/s]
evaluating...: 100%|██████████| 53/53 [00:00<00:00, 70.54it/s]
epoch: 3
train_loss: 0.166, train_acc: 0.940
valid_loss: 0.290, valid_acc: 0.889
training...: 100%|██████████| 365/365 [00:10<00:00, 33.31it/s]
evaluating...: 100%|██████████| 53/53 [00:00<00:00, 72.49it/s]
epoch: 4
train_loss: 0.096, train_acc: 0.969
valid_loss: 0.441, valid_acc: 0.875
traini

  model.load_state_dict(torch.load(f'{model_type}.pt'))



epoch: 5
train_loss: 0.054, train_acc: 0.984
valid_loss: 0.465, valid_acc: 0.878
evaluating...: 100%|██████████| 105/105 [00:01<00:00, 71.05it/s]
test_loss: 0.278, test_acc: 0.888
Text: The biggest National Lampoon hit remains "Animal House", and rightly so. It was funny, raucous and good-natured.<br /><br />The exact opposite of every other National Lampoon film. Including "Class Reunion".<br /><br />PLEASE do not be fooled by the inclusion of Stephen Furst ("Flounder") from "Animal House". Or by the fact that John Hughes wrote this jumbled mess. This reunion is about as hilarious as root canal and twice as painful.<br /><br />One star, and that's being generous. Then again, I always thought most of my old classmates were demons, vampires and serial killers, too.
Predicted sentiment: Negative
Confidence: 0.5196
