## Transformer - Sentiment Analysis

In [1]:
#%pip install torchtext
#%pip install torchdata
#%pip install torchdata==0.7.1
#%pip install portalocker
#%pip install datasets


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from datasets import load_dataset
import random
import numpy as np
from collections import Counter
import torch
import math
from torchtext.vocab import GloVe


# set device
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# set seed
def set_seed(seed: int = 42):
    random.seed(seed)                           # Python random
    np.random.seed(seed)                        # NumPy
    torch.manual_seed(seed)                     # PyTorch CPU
    torch.cuda.manual_seed_all(seed)            # PyTorch GPU (if using)
    torch.backends.cudnn.deterministic = True   # For reproducibility
    torch.backends.cudnn.benchmark = False      # Disable auto-tuning (slower but stable)

set_seed(42)

  from .autonotebook import tqdm as notebook_tqdm


Using device: cpu


In [3]:
# ========== Hyperparameters ==========
EMBEDDING_DIM = 50
HIDDEN_DIM = 128
MAX_LEN = 300
BATCH_SIZE = 64
EPOCHS = 5
LEARNING_RATE = 0.005

# ========== Load IMDB Dataset ==========
dataset = load_dataset("imdb")
train_data = dataset["train"]
test_data = dataset["test"]



In [4]:
# ========== Tokenizer and GloVe ==========
tokenizer = get_tokenizer("basic_english")
glove = GloVe(name="6B", dim=EMBEDDING_DIM)

def yield_glove_tokens(data_iter):
    for example in data_iter:
        tokens = tokenizer(example["text"])
        yield [token for token in tokens if token in glove.stoi]

# ========== Build Vocab (only GloVe tokens) ==========
vocab = build_vocab_from_iterator(yield_glove_tokens(train_data), specials=["<pad>", "<unk>"])
vocab.set_default_index(vocab["<unk>"])

# ========== GloVe Coverage Check ==========
known = sum(1 for token in vocab.get_itos() if token in glove.stoi)
print(f"GloVe coverage: {known / len(vocab):.2%}")

# ========== Preprocessing ==========
def preprocess(example):
    tokens = tokenizer(example["text"])
    input_ids = vocab(tokens)[:MAX_LEN]
    label = int(example["label"])
    return {"input_ids": input_ids, "label": label}

train_data = train_data.map(preprocess)
test_data = test_data.map(preprocess)
train_data.set_format(type="python", columns=["input_ids", "label"])
test_data.set_format(type="python", columns=["input_ids", "label"])

# ========== Collate Function ==========
def collate_batch(batch):
    texts = [torch.tensor(sample["input_ids"], dtype=torch.int64) for sample in batch]
    labels = [torch.tensor(sample["label"], dtype=torch.float32) for sample in batch]
    texts_padded = pad_sequence(texts, batch_first=True, padding_value=vocab["<pad>"])
    return texts_padded.to(device), torch.stack(labels).to(device)

# ========== DataLoaders ==========
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, 
                          collate_fn=collate_batch, num_workers=0, pin_memory=True)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, 
                         collate_fn=collate_batch, num_workers=0, pin_memory=True)

# ========== Check Class Balance ==========
print("Label distribution in training set:")
print(Counter([sample["label"] for sample in train_data]))

# ========== Build Embedding Matrix ==========
embedding_matrix = torch.zeros(len(vocab), EMBEDDING_DIM)

for idx, token in enumerate(vocab.get_itos()):
    if token in glove.stoi:
        embedding_matrix[idx] = glove[token]
    else:
        embedding_matrix[idx] = torch.randn(EMBEDDING_DIM) * 0.6  # small random vector for unknowns

.vector_cache/glove.6B.zip: 862MB [02:45, 5.21MB/s]                               
100%|█████████▉| 399999/400000 [00:10<00:00, 38053.34it/s]


GloVe coverage: 100.00%
Label distribution in training set:
Counter({0: 12500, 1: 12500})


In [5]:
#Crearte the Transformer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # shape: (1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return x

class TransformerSentimentClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads, hidden_dim, num_layers, max_len, dropout=0.1):
        super(TransformerSentimentClassifier, self).__init__()
        
        self.embedding = nn.Embedding.from_pretrained(embedding_matrix, freeze=False, padding_idx=vocab["<pad>"]) # embedding layer
        self.positional_encoding = PositionalEncoding(embed_dim, max_len) # Positional Encoding

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim,
            dropout=dropout,
            activation='relu',
            batch_first=True 
        ) # Transformer Encoder Layer

        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) # Transformer Encoder

        self.classifier = nn.Sequential( # Final classification layer
            nn.Linear(embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        mask = (x == vocab["<pad>"]).to(torch.bool)
        
        x = self.embedding(x)  # Embed tokens
        x = self.positional_encoding(x) # Add positional encoding

        x = self.transformer_encoder(x, src_key_padding_mask=mask) # Transformer Encoder
        x = x.mean(dim=1)  # Global average pooling

        return self.classifier(x).squeeze()
    
# Initialize model, loss function, and optimizer
model = TransformerSentimentClassifier(
    vocab_size=len(vocab),
    embed_dim=EMBEDDING_DIM,
    num_heads=1,  
    hidden_dim=HIDDEN_DIM,
    num_layers=1,
    max_len=MAX_LEN
).to(device)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2, factor=0.5)

# Initialize weights
def init_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        nn.init.zeros_(m.bias)

model.apply(init_weights)



TransformerSentimentClassifier(
  (embedding): Embedding(63925, 50, padding_idx=0)
  (positional_encoding): PositionalEncoding()
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=50, out_features=50, bias=True)
        )
        (linear1): Linear(in_features=50, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=128, out_features=50, bias=True)
        (norm1): LayerNorm((50,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((50,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (classifier): Sequential(
    (0): Linear(in_features=50, out_features=128, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.1, inplace=False)
    (3): Linear(in_fea

In [6]:
from tqdm import tqdm

# Training loop
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}")
    
    for texts, labels in loop:
        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        loop.set_postfix(loss=total_loss / (loop.n + 1))
    
    # Step the scheduler once per epoch
    scheduler.step(total_loss / len(train_loader))  # at end of each epoch

    print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}")

Epoch 1/5:  88%|████████▊ | 343/391 [01:12<00:10,  4.75it/s, loss=0.408]


KeyboardInterrupt: 

In [None]:
model.eval()
y_true, y_pred = [], []

with torch.no_grad():
    for texts, labels in test_loader:
        outputs = model(texts)
        predicted = (torch.sigmoid(outputs) >= 0.50).float()  # only apply sigmoid here
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

# Now compute metrics on all collected predictions
acc = accuracy_score(y_true, y_pred)
precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average="binary")

print(f"Accuracy:  {acc:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")


Accuracy:  0.8018
Precision: 0.8152
Recall:    0.7804
F1 Score:  0.7974


In [None]:
print("Unique predictions:", torch.unique(torch.tensor(y_pred), return_counts=True))

Unique predictions: (tensor([0., 1.]), tensor([13034, 11966]))


In [None]:
Counter([sample["label"] for sample in test_data])

Counter({0: 12500, 1: 12500})