# Introduction

based on : https://github.com/bentrevett/pytorch-sentiment-analysis

Task : Binary Classification(sentimental analysis)  
Method : Long-Short Term Memory(LSTM)    
Dataset : IMDB

# 0. Set Environment

In [63]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
import torchtext
torchtext.disable_torchtext_deprecation_warning()
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.tensorboard import SummaryWriter

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from pprint import pprint

import subprocess
import os
import sys

import datasets

In [64]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Selected device:", device)

Selected device: cuda


In [65]:
seed = 42

np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True

In [66]:
model_dir = './models/Custom_LSTM_Classifier_model.pth'
pretrained_embedding_dir = './models/Glove_pretrained.pth'

# 1. Data processing

## 1-1. Get Data

In [67]:
train_data, test_data = datasets.load_dataset("imdb", split=["train", "test"])

## 1-2. Tokenize

In [68]:
tokenizer = get_tokenizer("basic_english")

In [69]:
def tokenize_example(example, tokenizer, max_length):
    tokens = tokenizer(example["text"])[:max_length]
    length = len(tokens)
    return {"tokens": tokens, "length": length}

In [70]:
max_length = 256

train_data = train_data.map(
    tokenize_example, fn_kwargs={"tokenizer": tokenizer, "max_length": max_length}
)
test_data = test_data.map(
    tokenize_example, fn_kwargs={"tokenizer": tokenizer, "max_length": max_length}
)

## 1-3. Build Vocab 

In [71]:
min_freq = 5
special_tokens = ["<unk>", "<pad>"]

vocab = build_vocab_from_iterator(train_data['tokens'],
                                  min_freq = min_freq,
                                  specials = special_tokens)

In [72]:
unk_index = vocab["<unk>"]
pad_index = vocab["<pad>"]

vocab.set_default_index(unk_index)

## 1-4. Numericalize Text

In [73]:
def numericalize_example(example, vocab):
    ids = vocab.lookup_indices(example["tokens"])
    return {"ids": ids}

In [74]:
train_data = train_data.map(numericalize_example, fn_kwargs={"vocab": vocab})
test_data = test_data.map(numericalize_example, fn_kwargs={"vocab": vocab})

In [75]:
train_data = train_data.with_format(type="torch", columns=["ids", "label", "length"])
test_data = test_data.with_format(type="torch", columns=["ids", "label", "length"])

## 1-5. Word Embedding

In [76]:
if not os.path.exists(pretrained_embedding_dir):
    vectors = torchtext.vocab.GloVe()
    pretrained_embedding = vectors.get_vecs_by_tokens(vocab.get_itos())
    torch.save(pretrained_embedding, pretrained_embedding_dir)
else:
    pretrained_embedding = torch.load(pretrained_embedding_dir)

## 1-6. Prepare for Data Loading

In [77]:
class CustomDataset(Dataset):
    def __init__(self, data):
        self.data = data
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        item = self.data[idx]
        return item

In [78]:
train_dataset = CustomDataset(train_data)
test_dataset = CustomDataset(test_data)

In [79]:
def custom_collate_fn(batch):
    
    batch_inputs = [sample['ids'] for sample in batch]
    batch_labels = [sample['label'] for sample in batch]
    
    collate_inputs = pad_sequence(batch_inputs, 
                                  padding_value = pad_index, 
                                  batch_first = True)
    collate_labels = torch.tensor(batch_labels)
    
    return collate_inputs, collate_labels

In [80]:
batch_size = 128
pad_index = pad_index
shuffle = True

trainloader = DataLoader(dataset = train_dataset,
                         batch_size = batch_size,
                         collate_fn = custom_collate_fn,
                         shuffle = shuffle)
testloader = DataLoader(dataset = test_dataset,
                         batch_size = batch_size,
                         collate_fn = custom_collate_fn,
                         shuffle = shuffle)

# 2. Define Model

## 2-1. Model Structure

In [81]:
class CustomLSTMCell(nn.Module):
    def __init__(self, input_dim, hidden_dim, bias = True):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.bias = bias

        self.Wi = nn.Linear(input_dim + hidden_dim, hidden_dim, bias = bias)
        self.Wf = nn.Linear(input_dim + hidden_dim, hidden_dim, bias = bias)
        self.Wo = nn.Linear(input_dim + hidden_dim, hidden_dim, bias = bias)
        self.Wh = nn.Linear(input_dim + hidden_dim, hidden_dim, bias = bias)
        
    def forward(self, input, hidden):
        h, c = hidden
        concat_ih = torch.cat((input, h), 1)

        input_gate = F.sigmoid(self.Wi(concat_ih))
        forget_gate = F.sigmoid(self.Wf(concat_ih))
        output_gate = F.sigmoid(self.Wo(concat_ih))
        cell_gate = F.tanh(self.Wh(concat_ih))

        c_new = forget_gate * c + input_gate * cell_gate
        h_new = output_gate * F.tanh(c)

        return h_new, c_new

In [82]:
class CustomLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.Layers = nn.ModuleList([CustomLSTMCell(input_dim, hidden_dim)])
        for _ in range(1, num_layers):
            self.Layers.append(CustomLSTMCell(hidden_dim, hidden_dim))

    def forward(self, inputs, hidden):
        
        batch_size = inputs.size(0)
        seq_length = inputs.size(1)

        h0, c0 = hidden

        output_h = torch.zeros(self.num_layers, batch_size, seq_length, self.hidden_dim).to(device)
        output_c = torch.zeros(self.num_layers, batch_size, seq_length, self.hidden_dim).to(device)

        for layer_idx, layer in enumerate(self.Layers):

            if layer_idx == 0:
                layer_inputs = inputs
            else:
                layer_inputs = output_h[layer_idx - 1, :, :, :]
            
            h , c = h0[layer_idx, : :], c0[layer_idx, : :]
            for t in range(seq_length):
                h, c = layer(layer_inputs[:, t, :], (h, c))
                output_h[layer_idx, :, t, :] = h
                output_c[layer_idx, :, t, :] = c

        return output_h[-1, :, :, :], (output_h[:, :, -1, :], output_c[:, :, -1, :])

In [83]:
class CustomLSTMClassifier(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, num_layers, pad_index):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_layers = num_layers
        
        self.embedding = nn.Embedding(input_dim, embedding_dim, padding_idx = pad_index)
        self.embedding.requires_grad_(False)
        
        self.lstm = CustomLSTM(embedding_dim, hidden_dim, num_layers)

        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, inputs):

        batch_size = inputs.size(0)
        seq_length = inputs.size(1)

        x = self.embedding(inputs)

        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(device)

        output, (h, c) = self.lstm(x, (h0, c0))

        logit = self.fc(h[-1, :, :])

        return logit

## 2-2. Hyperparameter & functions

In [84]:
vocab_size = len(vocab)
embedding_dim = 300
hidden_dim = 128
num_layers = 3
output_dim = 1
pad_index = pad_index
lr = 5e-4

model = CustomLSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers, pad_index)

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

## 2-3. Weight Initialization

In [85]:
def initialize_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_normal_(m.weight)
        nn.init.zeros_(m.bias)
    elif isinstance(m, CustomLSTMCell):
        for name, param in m.named_parameters():
            if "bias" in name:
                nn.init.zeros_(param)
            else:
                nn.init.orthogonal_(param)

In [86]:
model.apply(initialize_weights)

CustomLSTMClassifier(
  (embedding): Embedding(24897, 300, padding_idx=1)
  (lstm): CustomLSTM(
    (Layers): ModuleList(
      (0): CustomLSTMCell(
        (Wi): Linear(in_features=428, out_features=128, bias=True)
        (Wf): Linear(in_features=428, out_features=128, bias=True)
        (Wo): Linear(in_features=428, out_features=128, bias=True)
        (Wh): Linear(in_features=428, out_features=128, bias=True)
      )
      (1-2): 2 x CustomLSTMCell(
        (Wi): Linear(in_features=256, out_features=128, bias=True)
        (Wf): Linear(in_features=256, out_features=128, bias=True)
        (Wo): Linear(in_features=256, out_features=128, bias=True)
        (Wh): Linear(in_features=256, out_features=128, bias=True)
      )
    )
  )
  (fc): Linear(in_features=128, out_features=1, bias=True)
)

In [87]:
model.embedding.weight.data = pretrained_embedding

In [88]:
pprint("Model's state_dict:")
for name, param in model.named_parameters():
    print(f"Parameter name: {name}")
    print(f"    Size : {param.size()}")
    print(f"    Value: {param}")

"Model's state_dict:"
Parameter name: embedding.weight
    Size : torch.Size([24897, 300])
    Value: Parameter containing:
tensor([[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.2720, -0.0620, -0.1884,  ...,  0.1302, -0.1832,  0.1323],
        ...,
        [ 0.6701, -0.2717,  0.4766,  ...,  0.2786,  0.3312,  0.0230],
        [-0.1503,  0.5624, -0.5622,  ..., -0.4224, -0.6836,  0.0726],
        [ 1.1741, -0.4386,  0.3310,  ...,  0.3193, -0.2292, -0.0887]])
Parameter name: lstm.Layers.0.Wi.weight
    Size : torch.Size([128, 428])
    Value: Parameter containing:
tensor([[ 0.0131,  0.0123, -0.0348,  ...,  0.0095, -0.0612, -0.0559],
        [ 0.0962,  0.0589,  0.0271,  ..., -0.1056, -0.0373, -0.0010],
        [ 0.0557,  0.0242, -0.0937,  ..., -0.0347, -0.0239, -0.0508],
        ...,
        [ 0.0345, -0.0402, -0.0242,  ..., -0.0160,  0.0406, -0.0089],
        [-0.0125,  0.0060, -0.0612,  ..., 

In [89]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


print(f"The model has {count_parameters(model):,} trainable parameters")

The model has 482,945 trainable parameters


In [90]:
model.to(device)
criterion.to(device)

BCEWithLogitsLoss()

# 3. Train Model

In [92]:
def train_model(model, criterion, optimizer, trainloader, num_epochs):
    print("-----Training Started------")
    for epoch in range(num_epochs):
        
        model.train()
        
        running_loss = 0.0
        
        for inputs, labels in tqdm(trainloader):
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs.view(-1), labels.float())
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / len(trainloader.dataset)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss: .4f}")

        torch.save(model.state_dict(), model_dir)
    
    print("-----Training Completed-----")

In [93]:
num_epochs = 16

train_model(model, criterion, optimizer, trainloader, num_epochs)

-----Training Started------


100%|██████████| 196/196 [04:28<00:00,  1.37s/it]


Epoch [1/16], Loss:  0.6455


100%|██████████| 196/196 [03:36<00:00,  1.10s/it]


Epoch [2/16], Loss:  0.6274


100%|██████████| 196/196 [02:43<00:00,  1.20it/s]


Epoch [3/16], Loss:  0.6625


100%|██████████| 196/196 [02:41<00:00,  1.22it/s]


Epoch [4/16], Loss:  0.6932


100%|██████████| 196/196 [02:41<00:00,  1.21it/s]


Epoch [5/16], Loss:  0.6826


100%|██████████| 196/196 [02:41<00:00,  1.22it/s]


Epoch [6/16], Loss:  0.6428


100%|██████████| 196/196 [02:40<00:00,  1.22it/s]


Epoch [7/16], Loss:  0.6936


100%|██████████| 196/196 [02:41<00:00,  1.21it/s]


Epoch [8/16], Loss:  0.6935


100%|██████████| 196/196 [02:41<00:00,  1.21it/s]


Epoch [9/16], Loss:  0.6931


100%|██████████| 196/196 [02:41<00:00,  1.22it/s]


Epoch [10/16], Loss:  0.6923


100%|██████████| 196/196 [02:41<00:00,  1.21it/s]


Epoch [11/16], Loss:  0.6801


100%|██████████| 196/196 [02:40<00:00,  1.22it/s]


Epoch [12/16], Loss:  0.6111


100%|██████████| 196/196 [02:40<00:00,  1.22it/s]


Epoch [13/16], Loss:  0.5735


100%|██████████| 196/196 [02:41<00:00,  1.21it/s]


Epoch [14/16], Loss:  0.6920


100%|██████████| 196/196 [02:41<00:00,  1.21it/s]


Epoch [15/16], Loss:  0.6882


100%|██████████| 196/196 [02:40<00:00,  1.22it/s]

Epoch [16/16], Loss:  0.6789
-----Training Completed-----





# 4. Test

In [94]:
def test_model(model, testloader):
    model.eval()
    correct = 0
    total = 0
    TP = 0  # True Positives
    TN = 0  # True Negatives
    FP = 0  # False Positives
    FN = 0  # False Negatives

    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            predictions = torch.round(F.sigmoid(outputs))
            total += labels.size(0)

            predictions, labels = predictions.view(-1).cpu(), labels.cpu()
            
            correct += (predictions == labels).sum().item()

            TP += ((predictions == 1) & (labels == 1)).sum().item()
            TN += ((predictions == 0) & (labels == 0)).sum().item()
            FP += ((predictions == 1) & (labels == 0)).sum().item()
            FN += ((predictions == 0) & (labels == 1)).sum().item()

    accuracy = correct / total
    precision = TP / (TP + FP) if TP + FP != 0 else 0
    recall = TP / (TP + FN) if TP + FN != 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if precision + recall != 0 else 0

    print(f"Accuracy on test set: {accuracy:.4f}")
    print(f"Precision on test set: {precision:.4f}")
    print(f"Recall on test set: {recall:.4f}")
    print(f"F1 Score on test set: {f1:.4f}")


In [95]:
model.load_state_dict(torch.load(model_dir))
model.to(device)

CustomLSTMClassifier(
  (embedding): Embedding(24897, 300, padding_idx=1)
  (lstm): CustomLSTM(
    (Layers): ModuleList(
      (0): CustomLSTMCell(
        (Wi): Linear(in_features=428, out_features=128, bias=True)
        (Wf): Linear(in_features=428, out_features=128, bias=True)
        (Wo): Linear(in_features=428, out_features=128, bias=True)
        (Wh): Linear(in_features=428, out_features=128, bias=True)
      )
      (1-2): 2 x CustomLSTMCell(
        (Wi): Linear(in_features=256, out_features=128, bias=True)
        (Wf): Linear(in_features=256, out_features=128, bias=True)
        (Wo): Linear(in_features=256, out_features=128, bias=True)
        (Wh): Linear(in_features=256, out_features=128, bias=True)
      )
    )
  )
  (fc): Linear(in_features=128, out_features=1, bias=True)
)

In [96]:
test_model(model, testloader)

Accuracy on test set: 0.6440
Precision on test set: 0.6854
Recall on test set: 0.5323
F1 Score on test set: 0.5992


# 5. Inference

In [97]:
def inference(text, model, tokenizer, vocab):
    tokens = tokenizer(text)
    ids = vocab.lookup_indices(tokens)
    tensor = torch.LongTensor(ids).unsqueeze(dim=0).to(device)

    output = model(tensor)

    prediction = torch.round(F.sigmoid(output))

    return prediction

In [98]:
model.load_state_dict(torch.load(model_dir))
model.to(device)

CustomLSTMClassifier(
  (embedding): Embedding(24897, 300, padding_idx=1)
  (lstm): CustomLSTM(
    (Layers): ModuleList(
      (0): CustomLSTMCell(
        (Wi): Linear(in_features=428, out_features=128, bias=True)
        (Wf): Linear(in_features=428, out_features=128, bias=True)
        (Wo): Linear(in_features=428, out_features=128, bias=True)
        (Wh): Linear(in_features=428, out_features=128, bias=True)
      )
      (1-2): 2 x CustomLSTMCell(
        (Wi): Linear(in_features=256, out_features=128, bias=True)
        (Wf): Linear(in_features=256, out_features=128, bias=True)
        (Wo): Linear(in_features=256, out_features=128, bias=True)
        (Wh): Linear(in_features=256, out_features=128, bias=True)
      )
    )
  )
  (fc): Linear(in_features=128, out_features=1, bias=True)
)

In [99]:
text = 'This film is terrible!'

prediction = inference(text, model, tokenizer, vocab)

print(prediction.item())

1.0


In [100]:
text = 'This film is great!'

prediction = inference(text, model, tokenizer, vocab)

print(prediction.item())

1.0


In [101]:
text = 'The best film I have ever seen!'

prediction = inference(text, model, tokenizer, vocab)

print(prediction.item())

0.0


In [102]:
text = "This film is not terrible, it's great!"

prediction = inference(text, model, tokenizer, vocab)

print(prediction.item())

0.0


# 6. Visualize Results

# Limitation

padding 토큰 연산 제외 / dropout / validation