# LSTM Model
ECE590 Final


In [None]:
import functools
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tqdm
import nltk
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
# Download the NLTK stopword corpus that `nltk.corpus.stopwords` reads from.
nltk.download('stopwords')
# Let cuDNN benchmark candidate kernels and reuse the fastest one.
torch.backends.cudnn.benchmark = True

import os
# Make sure the "llm_models" directory exists; exist_ok avoids an error on re-runs.
os.makedirs("llm_models", exist_ok=True)

In [None]:
!pip install datasets #needed for dataset

## Load Data

In [None]:
from datasets import load_dataset

# Load the "dair-ai/emotion" dataset; later cells read its 'train',
# 'validation' and 'test' splits.
dataset = load_dataset("dair-ai/emotion")

In [None]:
# Bare expression: the notebook displays the loaded dataset object.
dataset

In [5]:
# Pull the raw text and the labels out of each split, one (x, y) pair per split.
train_x, train_y = dataset['train']['text'], dataset['train']['label']
test_x, test_y = dataset['test']['text'], dataset['test']['label']
val_x, val_y = dataset['validation']['text'], dataset['validation']['label']

In [None]:
# Sanity check: number of samples in the train, test and validation splits.
print(len(train_x), len(test_x), len(val_x))

## Function to build a vocabulary based on the training corpus.

In [6]:
def build_vocab(x_train:list, min_freq: int=20) -> dict:
    """
    Build a vocabulary based on the training corpus.

    Each sample is lower-cased and split on whitespace. Every word that occurs
    at least ``min_freq`` times receives an index starting at 2, in order of
    first appearance. Indices 0 and 1 are reserved for the ``<pad>`` and
    ``<unk>`` tokens.

    :param x_train:  List. Each sample in the list is a string of text.
    :param min_freq: Int. The frequency threshold for selecting words.
    :return: dictionary {word:index}
    """
    counts = Counter(word for sent in x_train for word in sent.lower().split())
    kept_words = [word for word, freq in counts.items() if freq >= min_freq]
    vocab = {word: index for index, word in enumerate(kept_words, start=2)}

    # Accommodate the padding and out-of-vocabulary (OOV) tokens.
    vocab['<pad>'] = 0
    vocab['<unk>'] = 1
    return vocab


## Tokenize Function.
For each word in the example, look up its index in the vocabulary; words that are not in the vocabulary map to the `<unk>` index (1).
Returns a list of ints: the vocabulary indices of the words in the example.

In [7]:
def tokenize(vocab: dict, example: str)-> list:
    """
    Convert a string of text into a list of vocabulary indices.

    The text is lower-cased and split on whitespace. Words that are missing
    from ``vocab`` are mapped to index 1, the out-of-vocabulary index.

    :param vocab: dict, the vocabulary.
    :param example: a string of text.
    :return: a list of token indices.
    """
    unk_index = 1  # index used for unknown (out-of-vocabulary) words
    # dict.get handles the missing-word case without a catch-all `except`.
    return [vocab.get(word, unk_index) for word in example.lower().split()]

In [8]:
# Example: build the vocabulary with the default min_freq (20) and tokenize
# one sentence; the notebook displays the resulting list of indices.
vocab = build_vocab(train_x)
tokenize(vocab, "i feel burdened to share it")

[2, 4, 43, 12, 268, 36]

## Data Class - initialize and getitem
- `__getitem__` returns a dict containing the tokenized text (`ids`), its length in tokens (`length`), and its corresponding label (`label`)

In [9]:
class Data(Dataset):
    """Dataset that tokenizes a text sample on access and pairs it with its label."""

    def __init__(self, x, y, vocab, max_length=300) -> None:
        """
        Store the samples, labels and tokenization settings.
        :param x: list of text samples
        :param y: list of labels, aligned with x
        :param vocab: vocabulary dictionary {word:index}.
        :param max_length: the maximum sequence length; a falsy value disables truncation.
        """
        self.x, self.y = x, y
        self.vocab = vocab
        self.max_length = max_length

    def __len__(self) -> int:
        """Number of samples, taken from the length of x."""
        return len(self.x)

    def __getitem__(self, idx: int):
        """
        Tokenize the sample at the given index and return it with its label.
        :param idx: index of the sample.
        :return: a dictionary containing three keys: 'ids', 'length', 'label'
        """
        ids = tokenize(self.vocab, self.x[idx])
        limit = self.max_length
        # Truncate only when a limit is set.
        ids = ids[:limit] if limit else ids
        sample = {"ids": ids, "length": len(ids), "label": self.y[idx]}
        return sample

def collate(batch, pad_index):
    """
    Merge a list of samples into a single padded batch.

    :param batch: list of dicts, each with the keys 'ids' (list of token
                  indices), 'length' (int) and 'label' (int).
    :param pad_index: value used to right-pad shorter sequences.
    :return: dict with 'ids' (LongTensor [batch size, longest seq len]),
             'length' (LongTensor [batch size]) and
             'label' (LongTensor [batch size]).
    """
    batch_ids = nn.utils.rnn.pad_sequence(
        [torch.LongTensor(sample['ids']) for sample in batch],
        padding_value=pad_index,
        batch_first=True,
    )
    # Lengths are counts: keep them as int64 rather than the float32 that
    # torch.Tensor(...) would create, which is what pack_padded_sequence expects.
    batch_length = torch.LongTensor([sample['length'] for sample in batch])
    batch_label = torch.LongTensor([sample['label'] for sample in batch])
    return {'ids': batch_ids, 'length': batch_length, 'label': batch_label}

# Stable alias: later cells rebind the name `collate` to a functools.partial.
collate_fn = collate

## LSTM Model

In [10]:
class LSTM(nn.Module):
    """Text classifier: embedding -> LSTM -> linear layer on the last layer's final hidden state."""

    def __init__(
        self,
        vocab_size: int,
        embedding_dim: int,
        hidden_dim: int,
        output_dim: int,
        n_layers: int,
        dropout_rate: float
        ):
        """
        :param vocab_size: number of rows in the embedding table.
        :param embedding_dim: size of each word embedding.
        :param hidden_dim: size of the LSTM hidden state.
        :param output_dim: number of output classes.
        :param n_layers: number of stacked LSTM layers.
        :param dropout_rate: dropout probability applied to the embeddings,
                             between stacked LSTM layers, and to the final hidden state.
        """
        super().__init__()
        # Index 0 is the padding token, so its embedding is kept at zero.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # nn.LSTM only applies dropout between stacked layers and warns when a
        # non-zero rate is requested for a single layer, so pass 0 in that case.
        inter_layer_dropout = dropout_rate if n_layers > 1 else 0.0
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
                            dropout=inter_layer_dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, ids:torch.Tensor, length:torch.Tensor):
        """
        Feed the given token ids to the model.
        :param ids: [batch size, seq len] batch of token ids.
        :param length: [batch size] batch of length of the token ids
                       (tensor on any device, or a list of ints).
        :return: prediction of size [batch size, output dim].
        """
        embedded = self.dropout(self.embedding(ids))
        # pack_padded_sequence needs the lengths as an int64 tensor on the CPU.
        lengths = torch.as_tensor(length, dtype=torch.int64, device='cpu')
        packed_embedded = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )
        _, (hidden, _) = self.lstm(packed_embedded)
        # hidden[-1] is the final hidden state of the top LSTM layer.
        return self.fc(self.dropout(hidden[-1]))

## Training, Validation, and Testing

In [11]:
def evaluate(dataloader, model, criterion, device):
    """
    Score the model on every batch of a dataloader without tracking gradients.

    :param dataloader: iterable of batches with the keys 'ids', 'length', 'label'.
    :param model: model called as model(ids, length); it is switched to eval mode.
    :param criterion: loss function called as criterion(prediction, label).
    :param device: device the ids and labels are moved to.
    :return: (list of per-batch losses, list of per-batch accuracies)
    """
    model.eval()
    losses, accs = [], []

    with torch.no_grad():
        progress = tqdm.tqdm(dataloader, desc='evaluating...', file=sys.stdout)
        for batch in progress:
            token_ids = batch['ids'].to(device)
            targets = batch['label'].to(device)
            # The lengths are passed through as they come from the batch.
            logits = model(token_ids, batch['length'])
            batch_loss = criterion(logits, targets)
            batch_acc = get_accuracy(logits, targets)
            losses.append(batch_loss.item())
            accs.append(batch_acc.item())

    return losses, accs

def get_accuracy(prediction, label):
    """
    Fraction of rows whose highest-scoring class matches the label.

    :param prediction: 2-D tensor of class scores, one row per sample.
    :param label: tensor of target class indices, one per sample.
    :return: scalar tensor holding the accuracy for this batch.
    """
    n_samples, _ = prediction.shape
    hits = (prediction.argmax(dim=-1) == label).sum()
    return hits / n_samples

In [26]:
def train_and_test_model(vocab, epochs: int = 50, batch_size: int = 32, lr: float = 0.001):
    """
    Train an LSTM classifier, report validation metrics after every epoch,
    and finally evaluate on the test split.

    Reads the module-level ``train_x``/``train_y``, ``val_x``/``val_y`` and
    ``test_x``/``test_y`` lists. The test metrics are computed with the weights
    from the last epoch; no checkpoint selection is done. All reported losses
    and accuracies are means of the per-batch values.

    :param vocab: vocabulary dictionary {word:index}; its size sets the
                  size of the embedding table.
    :param epochs: number of passes over the training data.
    :param batch_size: number of samples per batch for all three dataloaders.
    :param lr: learning rate of the Adam optimizer.
    :return: (trained model, list of mean training loss per epoch,
              list of mean validation loss per epoch)
    """
    vocab_size = len(vocab)
    print("Vocab Size = ", vocab_size)

    train_data = Data(train_x, train_y, vocab, max_length=300)
    valid_data = Data(val_x, val_y, vocab, max_length=300)
    test_data = Data(test_x, test_y, vocab, max_length=300)
    collate = functools.partial(collate_fn, pad_index=0)
    # Only the training data is shuffled.
    train_dataloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, collate_fn=collate, shuffle=True)
    valid_dataloader = torch.utils.data.DataLoader(valid_data, batch_size=batch_size, collate_fn=collate)
    test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, collate_fn=collate)

    # Create Model
    model = LSTM(
        vocab_size,
        embedding_dim=50,
        hidden_dim=10,
        output_dim=6,
        n_layers=1,
        dropout_rate=.2,
        )

    num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f'The model has {num_params:,} trainable parameters')
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss().to(device)

    # Baseline before any training: with 6 output classes the accuracy is
    # expected to be roughly chance level (about 1/6).
    valid_loss, valid_acc = evaluate(valid_dataloader, model, criterion, device)
    print("before training:", np.mean(valid_loss), np.mean(valid_acc))

    # Start training
    all_train_l = []
    all_val_l = []
    for epoch in range(epochs):
        # evaluate() leaves the model in eval mode, so switch back every epoch.
        model.train()
        train_losses = []
        train_accs = []

        for batch in tqdm.tqdm(train_dataloader, desc='training...', file=sys.stdout):
            ids = batch['ids'].to(device)
            length = batch['length']
            label = batch['label'].to(device)
            prediction = model(ids, length)
            loss = criterion(prediction, label)
            accuracy = get_accuracy(prediction, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())
            train_accs.append(accuracy.item())

        epoch_train_loss = np.mean(train_losses)
        epoch_train_acc = np.mean(train_accs)
        all_train_l.append(epoch_train_loss)

        valid_loss, valid_acc = evaluate(valid_dataloader, model, criterion, device)
        epoch_valid_loss = np.mean(valid_loss)
        epoch_valid_acc = np.mean(valid_acc)
        all_val_l.append(epoch_valid_loss)

        print(f'epoch: {epoch+1}')
        print(f'train_loss: {epoch_train_loss:.3f}, train_acc: {epoch_train_acc:.3f}')
        print(f'valid_loss: {epoch_valid_loss:.3f}, valid_acc: {epoch_valid_acc:.3f}')

    test_loss, test_acc = evaluate(test_dataloader, model, criterion, device)
    epoch_test_loss = np.mean(test_loss)
    epoch_test_acc = np.mean(test_acc)
    print(f'test_loss: {epoch_test_loss:.3f}, test_acc: {epoch_test_acc:.3f}')

    return model, all_train_l, all_val_l

In [27]:
# Rebuild the vocabulary for the training run with a frequency threshold of 10
# instead of build_vocab's default of 20.
vocab = build_vocab(train_x, min_freq= 10)

In [None]:
# Train the model; keep it and the per-epoch train/validation losses for the cells below.
model, train_loss, val_loss = train_and_test_model(vocab)

In [None]:
# Plot the per-epoch training and validation losses on the same axes.
plt.plot(list(range(len(train_loss))), train_loss, label="train loss")
plt.plot(list(range(len(val_loss))), val_loss, label="val loss")
plt.title("LSTM: Cross Entropy Loss by Epoch")
plt.xlabel("Epoch")
plt.ylabel("Cross Entropy Loss")
plt.legend()


In [None]:
## COMPUTE CONFUSION MATRIX
from sklearn.metrics import confusion_matrix

test_data = Data(test_x, test_y, vocab, max_length = 300)
batch_size = 32
collate = functools.partial(collate_fn, pad_index=0)
# No shuffling, so the predictions stay aligned with test_y.
test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=batch_size, collate_fn=collate)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Not used by the loop below; kept so that
# evaluate(test_dataloader, model, criterion, device) can still be run afterwards.
criterion = nn.CrossEntropyLoss().to(device)

model.eval()
preds = []
with torch.no_grad():
    for batch in tqdm.tqdm(test_dataloader, desc='evaluating...', file=sys.stdout):
        ids = batch['ids'].to(device)
        length = batch['length']
        prediction = model(ids, length)
        # Collect the predicted class of every sample as plain Python ints.
        preds.extend(prediction.argmax(dim=-1).cpu().tolist())

# normalize='true' scales each row (true class) to sum to 1.
print(confusion_matrix(test_y, preds, normalize='true'))


# End