In [None]:
! pip install datasets

# Imports

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import torch.optim as optim
from datasets import load_dataset
import tqdm
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import matplotlib.pyplot as plt
from huggingface_hub import notebook_login
from collections import Counter
import re

In [None]:
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('punkt_tab')

In [None]:
notebook_login()

# Dataset

## Download dataset

In [None]:
dataset = load_dataset("imdb")

## show dataset

In [None]:
print(dataset)

In [None]:
print(dataset["train"][0]['text'])
print(dataset["train"][0]['label'])

## Index to label

In [None]:
idx2label = {0: "Negative", 1: "Positive"}

In [None]:
dataset["train"] = dataset["train"].map(lambda x: {"label_name": idx2label[x["label"]]})
dataset["test"] = dataset["test"].map(lambda x: {"label_name": idx2label[x["label"]]})

## Show class distribution

In [None]:
label_counts = Counter(dataset['train']['label_name'])
labels = list(label_counts.keys())
counts = list(label_counts.values())


plt.bar(labels, counts)
plt.title('Distribution of Labels')
plt.xlabel('Label')
plt.ylabel('Count')
plt.tight_layout()
plt.show()

## Preprocessing

### Remove HTML tags

In [None]:
def remove_html(text):
    clean = re.compile('<.*?>')
    return re.sub(clean, '', text)

### Tokenize and remove stop words

In [None]:
stop_words = set(stopwords.words('english'))

In [None]:
def tokenize_and_remove_stopwords(example):
    tokens = word_tokenize(example['text'])
    filtered_tokens = [word for word in tokens if word.lower() not in stop_words]
    example['tokens'] = filtered_tokens[:200]
    return example

### Preprocessing function

In [None]:
def preprocess(example):
    example['text'] = remove_html(example['text'])
    example = tokenize_and_remove_stopwords(example)
    return example

### Apply preprocesing

In [None]:
dataset = dataset.map(preprocess)

In [None]:
dataset

### Get Sequence lengths

In [None]:
dataset["train"] = dataset["train"].map(lambda x: {"len": len(x['tokens'])})

In [None]:
dataset["test"] = dataset["test"].map(lambda x: {"len": len(x['tokens'])})

In [None]:
print(dataset)

## build vocab

In [None]:
all_tokens = [token for example in dataset['train'] for token in example['tokens']]
vocab_counter = Counter(all_tokens)
vocab = {'<PAD>': 0, '<UNK>': 1}
vocab.update({token: idx + 2 for idx, (token, _) in enumerate(vocab_counter.most_common(20000))})

In [None]:
vocab_size = len(vocab)
print(f"Vocabulary size: {vocab_size}")

## tokens to ids

In [None]:
def tokens_to_ids(example):
    example['input_ids'] = [vocab.get(token, vocab['<UNK>']) for token in example['tokens']]
    return example

dataset = dataset.map(tokens_to_ids)


## Show sequence length distribution

In [None]:
plt.hist(dataset['train']['len'])
plt.title('Distribution of sequence length')
plt.xlabel('sequence length')
plt.ylabel('Count')
plt.tight_layout()
plt.show()

## Dataset class

In [None]:
class SentimentDataset(torch.utils.data.Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
    def __len__(self):
        return len(self.dataset)
    def __getitem__(self, idx):
        item = self.dataset[idx]
        return torch.tensor(item['input_ids']), item['label'], item['len']

In [None]:
def collate_fn(batch):
    input_ids, labels, lengths_list = zip(*batch)

    lengths = torch.tensor(lengths_list)
    labels = torch.tensor(labels)


    lengths, idx = lengths.sort(descending=True)
    input_ids = [input_ids[i] for i in idx]
    labels = labels[idx]

    padded_inputs = pad_sequence(input_ids, batch_first=True, padding_value=vocab['<PAD>'])

    return padded_inputs, labels, lengths

In [None]:
train_dataset = SentimentDataset(dataset['train'])
test_dataset = SentimentDataset(dataset['test'].select(range(1000)))

## Dataloaders

In [None]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, num_workers=2, shuffle=True, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn)

# LSTM model class

In [None]:
class SentimentClassifier(nn.Module):
  def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout, pad_idx):
    super().__init__()
    self.embed = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
    self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers, bidirectional=bidirectional, dropout= dropout if n_layers > 0 else 0, batch_first=True)
    self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(dropout)
  def forward(self, text, lengths):
      embedded = self.embed(text)
      packed = pack_padded_sequence(embedded, lengths, batch_first=True)
      packed_output, (hidden, cell) = self.lstm(packed)

      if self.lstm.bidirectional:
          hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
      else:
          hidden = hidden[-1]

      hidden = self.dropout(hidden)
      hidden = self.relu(hidden)
      return self.fc(hidden)


# Training loop

In [None]:
def training_loop(model, train_loader, optimizer, criterion, epochs, device):
    model.train()
    losses = []
    accuracies = []

    for epoch in range(epochs):
        running_loss = 0.0
        total_correct = 0
        total_samples = 0

        progress_bar = tqdm.tqdm(train_loader, desc=f"Epoch {epoch + 1} / {epochs}")
        for inputs, labels, lengths in progress_bar:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs, lengths)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * labels.size(0)  # accumulate total loss
            preds = torch.argmax(outputs, dim=1)
            total_correct += (preds == labels).sum().item()
            total_samples += labels.size(0)

            avg_batch_loss = running_loss / total_samples
            batch_accuracy = total_correct / total_samples
            progress_bar.set_postfix(loss=f"{avg_batch_loss:.4f}", accuracy=f"{batch_accuracy:.4f}")

        epoch_loss = running_loss / total_samples
        epoch_accuracy = total_correct / total_samples
        losses.append(epoch_loss)
        accuracies.append(epoch_accuracy)

    return losses, accuracies

# Model Training

## Hyperparameters

In [None]:
embedding_dim = 100
hidden_dim = 256
pad_idx = vocab['<PAD>']
output_dim = 2

### Model 1
- uni direction
- 1 layer

In [None]:
n_layers = 1
bidirectional = False
dropout = 0.5


In [None]:
model = SentimentClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout, pad_idx)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
epochs = 10

In [None]:
losses_1, accuracies_1 = training_loop(model,
                                   train_loader,
                                   optimizer,
                                   criterion,
                                   epochs,
                                   device)

### Model 2
- bi-directional
- 1 layer

In [None]:
n_layers = 1
bidirectional = True
dropout = 0.5

In [None]:
model_2 = SentimentClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout, pad_idx)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_2 = model_2.to(device)
optimizer = torch.optim.Adam(model_2.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
epochs = 10

In [None]:
losses_2, accuracies_2 = training_loop(model_2,
                                   train_loader,
                                   optimizer,
                                   criterion,
                                   epochs,
                                   device)

### Model 3
- uni direction
- 2 layer

In [None]:
n_layers = 2
bidirectional = False
dropout = 0.5


In [None]:
model_3 = SentimentClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout, pad_idx)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_3 = model_3.to(device)
optimizer = torch.optim.Adam(model_3.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
epochs = 10

In [None]:
losses_3, accuracies_3 = training_loop(model_3,
                                   train_loader,
                                   optimizer,
                                   criterion,
                                   epochs,
                                   device)

# Results

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(18, 5))
ax[0].plot(losses_1)
ax[0].plot(losses_2)
ax[0].plot(losses_3)
ax[0].set_title("loss Reslut")
ax[0].legend(['Model 1', 'Model 2', 'Model 3'])

ax[1].plot(accuracies_1)
ax[1].plot(accuracies_2)
ax[1].plot(accuracies_3)
ax[1].set_title("Training Reslut")
ax[1].legend(['Model 1', 'Model 2', 'Model 3'])

plt.tight_layout()
plt.show()

# Inference

In [None]:
def inference_loop(model, data_loader, device, idx_to_label=None, max_examples=20):
    model.eval()
    predictions = []
    true_labels = []
    examples_shown = 0
    correct = 0
    total = 0

    with torch.inference_mode():
        for inputs, labels, lengths in data_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs, lengths)
            preds = torch.argmax(outputs, dim=1)

            predictions_batch = preds.cpu().tolist()
            true_batch = labels.cpu().tolist()
            predictions.extend(predictions_batch)
            true_labels.extend(true_batch)

            correct += sum(p == t for p, t in zip(predictions_batch, true_batch))
            total += len(true_batch)

      
            if examples_shown < max_examples:
                for i in range(len(labels)):
                    pred_label = preds[i].item()
                    true_label = labels[i].item()
                    print(f"[Example {examples_shown + 1}]")
                    print(f"Prediction: {idx_to_label[pred_label] if idx_to_label else pred_label}")
                    print(f"Ground Truth: {idx_to_label[true_label] if idx_to_label else true_label}")
                    print("-" * 40)
                    examples_shown += 1
                    if examples_shown >= max_examples:
                        break

    accuracy = correct / total if total > 0 else 0
    print(f" Inference Accuracy: {accuracy * 100:.2f}%")

    return predictions, true_labels, accuracy

In [None]:
predictions, true_labels, accuracy = inference_loop(model_3, test_loader, device, idx_to_label=idx2label, max_examples=10)

In [None]:
predictions, true_labels, accuracy = inference_loop(model_2, test_loader, device, idx_to_label=idx2label, max_examples=10)

In [None]:
predictions, true_labels, accuracy = inference_loop(model, test_loader, device, idx_to_label=idx2label, max_examples=10)