# Embedding + LSTM Classifier
## Strongly recommended to run on a GPU
https://colab.research.google.com/drive/14hS4hSftIV2NOlO0AXXMhJ5oGgJgXsak?usp=sharing

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence
from transformers import BertTokenizer
from datasets import load_dataset

# Load the IMDB dataset through the Hugging Face `datasets` library.
dataset = load_dataset("imdb")

# Fix the torch and numpy random seeds.
torch.manual_seed(0)
np.random.seed(0)

# Number of examples that make up 5% of each split.
train_size = int(len(dataset["train"]) * 0.05)
test_size = int(len(dataset["test"]) * 0.05)

# Shuffle each split with a fixed seed, then keep only its first 5% of rows.
train_data = (
    dataset["train"]
    .shuffle(seed=42)
    .select(range(train_size))
)
test_data = (
    dataset["test"]
    .shuffle(seed=42)
    .select(range(test_size))
)

# Pretrained tokenizer used to turn review text into token ids.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Batch size for the DataLoaders and the tokenizer's padding-token id.
BATCH_SIZE = 64
PAD_IDX = tokenizer.pad_token_id

Using the latest cached version of the dataset since imdb couldn't be found on the Hugging Face Hub
Found the latest cached dataset configuration 'plain_text' at /Users/kobkrit/.cache/huggingface/datasets/imdb/plain_text/0.0.0/e6281661ce1c48d982bc483cf8a173c1bbeb5d31 (last modified on Fri May 24 19:42:16 2024).


In [None]:
# Dataset class
class IMDBDataset(Dataset):
    """Map-style dataset that tokenizes one review each time it is indexed.

    Args:
        data: Indexable collection whose items provide ``"text"`` and
            ``"label"`` entries.
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, max_length=...)`` that returns a
            mapping with an ``"input_ids"`` entry. When ``None`` (the default)
            the module-level ``tokenizer`` is looked up at access time.
        max_length: Upper bound on the number of token ids kept per review.
    """

    def __init__(self, data, tokenizer=None, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of examples in the wrapped collection."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` for the example at ``idx``.

        ``token_ids`` is a 1-D ``torch.long`` tensor holding the (truncated but
        unpadded) token ids; ``label`` is a 0-D ``torch.float`` tensor.
        """
        # Fetch the row once instead of indexing the backing store per field.
        row = self.data[idx]
        tokenize = self.tokenizer if self.tokenizer is not None else tokenizer
        # Deliberately no padding='max_length' here: padding every sample up to
        # max_length makes len(token_ids) identical for all samples, so the
        # lengths computed at collate time no longer describe the real text and
        # pack_padded_sequence cannot skip the pad positions. Padding is left
        # to the batch collate function.
        tokens = tokenize(row["text"], truncation=True, max_length=self.max_length)
        token_ids = torch.tensor(tokens["input_ids"], dtype=torch.long)
        return token_ids, torch.tensor(row["label"], dtype=torch.float)

# Wrap the sampled train and test subsets in IMDBDataset instances.
train_dataset, test_dataset = IMDBDataset(train_data), IMDBDataset(test_data)

In [None]:
test_dataset.data

Dataset({
    features: ['text', 'label'],
    num_rows: 1250
})

In [None]:
test_dataset.data.to_pandas()

Unnamed: 0,text,label
0,<br /><br />When I unsuspectedly rented A Thou...,1
1,This is the latest entry in the long series of...,1
2,This movie was so frustrating. Everything seem...,0
3,"I was truly and wonderfully surprised at ""O' B...",1
4,This movie spends most of its time preaching t...,0
...,...,...
1245,This movie was really stupid and I thought tha...,0
1246,This is an example of why the majority of acti...,0
1247,"I am a fan of a few of the Vacation films, but...",0
1248,I loved Complete Savages! Why did they cancel ...,1


In [None]:
# Collate function: turn a list of samples into padded batch tensors
def collate_batch(batch):
    """Assemble ``(token_ids, label)`` samples into batch tensors.

    Args:
        batch: Sequence of ``(token_ids, label)`` pairs.

    Returns:
        A ``(padded_text, lengths, labels)`` tuple: the sequences padded with
        ``PAD_IDX`` into one batch-first tensor, the pre-padding length of each
        sequence as a ``torch.long`` tensor, and the labels as a
        ``torch.float`` tensor.
    """
    sequences = [sample_text for sample_text, _ in batch]
    targets = [sample_label for _, sample_label in batch]
    seq_lengths = [len(seq) for seq in sequences]

    labels = torch.tensor(targets, dtype=torch.float)
    padded = pad_sequence(sequences, batch_first=True, padding_value=PAD_IDX)
    return padded, torch.tensor(seq_lengths, dtype=torch.long), labels

# Batch both splits with collate_batch; only the training split is shuffled.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_batch,
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_batch,
)

In [None]:
# Define LSTM Model Class
class LSTMClassifier(nn.Module):
    """Embedding -> (optionally bidirectional) LSTM -> linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: LSTM hidden size per direction.
        output_dim: Number of output units of the final linear layer.
        n_layers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability applied to the embeddings, between LSTM
            layers, and to the final hidden state.
        pad_idx: Token id passed to the embedding as ``padding_idx``.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
                 bidirectional, dropout, pad_idx):
        super().__init__()
        # Remembered so forward() can pick the matching final hidden state(s).
        self.bidirectional = bidirectional
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            # nn.LSTM applies dropout only between stacked layers and warns
            # when a non-zero value is given for a single layer.
            dropout=dropout if n_layers > 1 else 0.0,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        """Return one row of ``output_dim`` raw scores per sequence.

        Args:
            text: Batch-first tensor of token ids, padded to a common length.
            text_lengths: Length of each sequence in ``text`` before padding.
        """
        embedded = self.dropout(self.embedding(text))
        # Packing needs the lengths on the CPU; enforce_sorted=False lets the
        # batch arrive in any order.
        packed_embedded = nn.utils.rnn.pack_padded_sequence(
            embedded, text_lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        # Only the final hidden state is used, so the per-step outputs are not
        # unpacked.
        _, (hidden, _) = self.lstm(packed_embedded)
        if self.bidirectional:
            # The last two entries are the top layer's forward and backward
            # final states.
            final_hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            # Unidirectional: the top layer's state is the last entry, which
            # matches the hidden_dim-wide input that self.fc was built with.
            final_hidden = hidden[-1, :, :]
        return self.fc(self.dropout(final_hidden))

In [None]:
    
# Model hyperparameters
INPUT_DIM = tokenizer.vocab_size  # size of the embedding table
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1  # one raw score per review
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.5

# Build the classifier from the hyperparameters above.
model = LSTMClassifier(
    vocab_size=INPUT_DIM,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    output_dim=OUTPUT_DIM,
    n_layers=N_LAYERS,
    bidirectional=BIDIRECTIONAL,
    dropout=DROPOUT,
    pad_idx=PAD_IDX,
)

In [None]:
model

LSTMClassifier(
  (embedding): Embedding(30522, 100, padding_idx=0)
  (lstm): LSTM(100, 256, num_layers=2, batch_first=True, dropout=0.5, bidirectional=True)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)

In [None]:
# Binary cross-entropy on raw scores, optimized with Adam (default settings).
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters())

# Pick CUDA when it is available, otherwise the CPU, and move the model and
# the loss module there.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model, criterion = model.to(device), criterion.to(device)
print(device)

cpu


In [None]:

# Training function
def train(model, iterator, optimizer, criterion):
    """Run one optimization pass over ``iterator`` and return epoch metrics.

    Args:
        model: Module called as ``model(text, text_lengths)`` that returns one
            raw score per sample with shape ``(batch, 1)``.
        iterator: Iterable of ``(text, text_lengths, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(predictions, labels)``; it is
            expected to return the mean loss over the batch (the default
            reduction of the loss modules), since it is re-weighted by batch
            size below.

    Returns:
        ``(loss, accuracy)`` averaged over all samples seen in the pass.

    Raises:
        ZeroDivisionError: If ``iterator`` yields no samples.
    """
    # Send batches to wherever the model lives instead of relying on a
    # module-level device variable.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    total_correct = 0.0
    total_samples = 0
    model.train()
    for text, text_lengths, labels in iterator:
        optimizer.zero_grad()
        text = text.to(device)
        text_lengths = text_lengths.to(device)
        labels = labels.to(device)

        predictions = model(text, text_lengths).squeeze(1)
        loss = criterion(predictions, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        # Accuracy is bookkeeping only, so keep it out of the autograd graph.
        with torch.no_grad():
            rounded_preds = torch.round(torch.sigmoid(predictions))
            total_correct += (rounded_preds == labels).sum().item()
        # Weight each batch by its size: a plain mean of per-batch means would
        # over-weight a smaller final batch.
        total_loss += loss.item() * batch_size
        total_samples += batch_size
    return total_loss / total_samples, total_correct / total_samples

In [None]:

# Evaluation function
def evaluate(model, iterator, criterion):
    """Score ``model`` on ``iterator`` without updating its weights.

    Args:
        model: Module called as ``model(text, text_lengths)`` that returns one
            raw score per sample with shape ``(batch, 1)``.
        iterator: Iterable of ``(text, text_lengths, labels)`` batches.
        criterion: Loss called as ``criterion(predictions, labels)``; it is
            expected to return the mean loss over the batch (the default
            reduction of the loss modules), since it is re-weighted by batch
            size below.

    Returns:
        ``(loss, accuracy)`` averaged over all samples seen.

    Raises:
        ZeroDivisionError: If ``iterator`` yields no samples.
    """
    # Send batches to wherever the model lives instead of relying on a
    # module-level device variable.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    total_correct = 0.0
    total_samples = 0
    model.eval()
    with torch.no_grad():
        for text, text_lengths, labels in iterator:
            text = text.to(device)
            text_lengths = text_lengths.to(device)
            labels = labels.to(device)

            predictions = model(text, text_lengths).squeeze(1)
            loss = criterion(predictions, labels)
            rounded_preds = torch.round(torch.sigmoid(predictions))

            batch_size = labels.size(0)
            # Weight each batch by its size: a plain mean of per-batch means
            # would over-weight a smaller final batch.
            total_loss += loss.item() * batch_size
            total_correct += (rounded_preds == labels).sum().item()
            total_samples += batch_size
    return total_loss / total_samples, total_correct / total_samples

In [None]:
# Training loop
# Trains for N_EPOCHS epochs, saving the model weights to 'tut4-model.pt'
# whenever the loss returned by evaluate() is lower than any seen so far.
N_EPOCHS = 10
best_valid_loss = float('inf')  # lowest evaluation loss observed so far

for epoch in range(N_EPOCHS):
    print(f"Epoch {epoch+1}/{N_EPOCHS}")
    train_loss, train_acc = train(model, train_dataloader, optimizer, criterion)
    # NOTE(review): the "validation" metrics are computed on test_dataloader,
    # the same data used for the final test report below, so the checkpoint is
    # selected on the test split. Consider holding out a separate validation
    # split.
    valid_loss, valid_acc = evaluate(model, test_dataloader, criterion)

    # Checkpoint only when the evaluation loss improves.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut4-model.pt')

    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

# Loading the best model
# Restores the weights saved at the epoch with the lowest evaluation loss.
model.load_state_dict(torch.load('tut4-model.pt'))

# Evaluate on test data
test_loss, test_acc = evaluate(model, test_dataloader, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Epoch 1/10


KeyboardInterrupt: 