# Lab 07-03: Sentiment Analysis with Bidirectional LSTM

**Dataset:** IMDB Movie Reviews (25,000 training + 25,000 test)  
**Task:** Binary classification (Positive/Negative sentiment)  

## Part 1: Setup and Import Libraries

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import re
import os
import urllib.request
import tarfile
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Seed the PyTorch and NumPy random number generators with a fixed value
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Select the CUDA device when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
print(f"PyTorch version: {torch.__version__}")

## Part 2: Data Preparation

In [None]:
# Download and load the IMDB dataset (copied from the solution notebook)
def download_imdb_dataset(data_dir='./data'):
    """Download and extract the IMDB archive unless it is already extracted.

    Args:
        data_dir: Directory that holds (or will hold) the ``aclImdb`` folder.
            It is created when missing.

    Returns:
        Path of the ``aclImdb`` directory inside ``data_dir``.
    """
    url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
    filepath = os.path.join(data_dir, "aclImdb_v1.tar.gz")
    extracted_dir = os.path.join(data_dir, 'aclImdb')

    # exist_ok avoids the separate existence check (and its race)
    os.makedirs(data_dir, exist_ok=True)

    if not os.path.exists(extracted_dir):
        print("Downloading IMDB dataset...")
        urllib.request.urlretrieve(url, filepath)
        print("Extracting...")
        with tarfile.open(filepath, 'r:gz') as tar:
            # The archive comes from the network: use the 'data' extraction
            # filter where the running Python provides it, so that members
            # cannot escape data_dir (absolute paths, '..', unsafe links).
            if hasattr(tarfile, 'data_filter'):
                tar.extractall(data_dir, filter='data')
            else:
                tar.extractall(data_dir)
        # The compressed archive is no longer needed once extracted
        os.remove(filepath)
        print(" Done!")
    else:
        print(" Dataset already exists.")

    return extracted_dir

def load_imdb_data(data_path, split='train'):
    """Read every ``.txt`` review of one split into memory.

    Args:
        data_path: Root directory containing ``<split>/pos`` and ``<split>/neg``.
        split: Name of the sub-directory to read (``'train'`` by default).

    Returns:
        Tuple ``(texts, labels)``: the raw file contents and the matching
        labels (1 for files under ``pos``, 0 for files under ``neg``).
        Positive reviews come first; within each class files are read in
        sorted filename order.
    """
    texts = []
    labels = []

    for label_type in ['pos', 'neg']:
        dir_path = os.path.join(data_path, split, label_type)
        label = 1 if label_type == 'pos' else 0

        # os.listdir() returns entries in arbitrary order; sorting makes the
        # sample order (and therefore seeded shuffling) reproducible.
        for filename in sorted(os.listdir(dir_path)):
            if filename.endswith('.txt'):
                with open(os.path.join(dir_path, filename), 'r', encoding='utf-8') as f:
                    texts.append(f.read())
                labels.append(label)

    return texts, labels

def preprocess_text(text):
    """Lower-case a review, strip markup and punctuation, and tokenize it.

    Args:
        text: Raw review string.

    Returns:
        List of tokens made only of the characters ``a-z`` and ``0-9``.
    """
    text = text.lower()
    # Replace HTML tags such as <br /> with a space
    text = re.sub(r'<[^>]+>', ' ', text)
    # Replace everything that is not a letter, digit or whitespace.
    # In a raw string the whitespace class is written \s; the previous
    # pattern used \\s, which matched a literal backslash and the letter
    # "s" instead and therefore left backslashes inside tokens.
    text = re.sub(r'[^a-z0-9\s]', ' ', text)
    return text.split()

# Download the archive if needed, then read both splits into memory
data_path = download_imdb_dataset()
# "\n" (not "\\n") so that a blank line is printed instead of a literal \n
print("\nLoading data...")
train_texts, train_labels = load_imdb_data(data_path, 'train')
test_texts, test_labels = load_imdb_data(data_path, 'test')
print(f" Train: {len(train_texts)}, Test: {len(test_texts)}")

In [None]:
# Build Vocabulary (copy from solution)
class Vocabulary:
    """Word <-> index mapping built from token frequencies.

    Index 0 is reserved for ``<PAD>`` and index 1 for ``<UNK>``.
    """

    def __init__(self, max_vocab_size=25000, min_freq=2):
        """Store the size/frequency limits and seed the special tokens.

        Args:
            max_vocab_size: Upper bound on the vocabulary size, the two
                special tokens included.
            min_freq: Minimum number of occurrences a word needs to be kept.
        """
        self.max_vocab_size = max_vocab_size
        self.min_freq = min_freq
        self.word2idx = {'<PAD>': 0, '<UNK>': 1}
        self.idx2word = {0: '<PAD>', 1: '<UNK>'}
        self.word_freq = Counter()

    def build_vocab(self, texts):
        """Count tokens over ``texts`` and index the most frequent ones.

        Args:
            texts: Iterable of raw strings; each is tokenized with
                ``preprocess_text``.
        """
        print("Building vocabulary...")
        for review in tqdm(texts):
            self.word_freq.update(preprocess_text(review))

        # Keep words that are frequent enough, most frequent first
        eligible = (
            token for token, count in self.word_freq.items()
            if count >= self.min_freq
        )
        ranked = sorted(eligible, key=self.word_freq.get, reverse=True)

        # Two slots are already taken by <PAD> and <UNK>
        capacity = self.max_vocab_size - 2
        for offset, token in enumerate(ranked[:capacity]):
            index = offset + 2
            self.word2idx[token] = index
            self.idx2word[index] = token

        print(f"✓ Vocabulary: {len(self.word2idx):,} words")

    def text_to_indices(self, text):
        """Tokenize ``text`` and map each token to its index.

        Tokens that are not in the vocabulary map to the ``<UNK>`` index.
        """
        unknown = self.word2idx['<UNK>']
        return [self.word2idx.get(token, unknown) for token in preprocess_text(text)]

    def __len__(self):
        """Return the number of indexed entries, special tokens included."""
        return len(self.word2idx)

# Build the vocabulary from the training reviews only (the test split is
# not passed to build_vocab here), then report its size.
vocab = Vocabulary(max_vocab_size=25000, min_freq=2)
vocab.build_vocab(train_texts)
print(f"Vocabulary size: {len(vocab)}")

In [None]:
# Create Dataset and DataLoader
class IMDBDataset(Dataset):
    """Dataset of index-encoded reviews paired with their labels."""

    def __init__(self, texts, labels, vocab, max_length=256):
        """Encode every text once, up front.

        Args:
            texts: Iterable of raw review strings.
            labels: Sequence of labels, indexed in step with ``texts``.
            vocab: Object providing ``text_to_indices(text)``.
            max_length: Encoded reviews are cut to at most this many indices.
        """
        self.labels = labels
        self.vocab = vocab
        self.max_length = max_length

        # Slicing leaves shorter sequences unchanged and truncates longer ones
        self.sequences = [
            vocab.text_to_indices(text)[:max_length]
            for text in tqdm(texts, desc="Converting texts")
        ]

    def __len__(self):
        """Return the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(indices, label)`` as a long tensor and a float tensor."""
        token_ids = torch.tensor(self.sequences[idx], dtype=torch.long)
        target = torch.tensor(self.labels[idx], dtype=torch.float)
        return token_ids, target

def collate_fn(batch):
    """Merge ``(sequence, label)`` samples into padded batch tensors.

    Args:
        batch: List of ``(sequence, label)`` tensor pairs.

    Returns:
        Tuple ``(padded_sequences, labels, lengths)``: sequences are
        right-padded with 0 to the longest one in the batch, and
        ``lengths`` holds each sequence's length before padding.
    """
    seqs, targets = zip(*batch)
    original_lengths = torch.tensor(list(map(len, seqs)))
    return (
        pad_sequence(seqs, batch_first=True, padding_value=0),
        torch.stack(targets),
        original_lengths,
    )

# Create datasets
# MAX_LENGTH: IMDBDataset truncates each encoded review to this many tokens
# BATCH_SIZE: number of reviews per DataLoader batch
MAX_LENGTH = 256
BATCH_SIZE = 64

print("Creating datasets...")
train_dataset = IMDBDataset(train_texts, train_labels, vocab, MAX_LENGTH)
test_dataset = IMDBDataset(test_texts, test_labels, vocab, MAX_LENGTH)

# Create dataloaders: only the training loader shuffles; both pad each batch
# through collate_fn
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)

print(f" Train batches: {len(train_loader)}, Test batches: {len(test_loader)}")

## Part 3: BiLSTM Model

### 3.1. BiLSTM Architecture

Key differences from a unidirectional LSTM:
1. **bidirectional=True** in `nn.LSTM`
2. **hidden_dim * 2** input features for the FC layer (the forward and backward states are concatenated)
3. **Concatenate the forward and backward hidden states**: `torch.cat((hidden[-2], hidden[-1]), dim=1)`

In [None]:
class BiLSTMClassifier(nn.Module):
    """Bidirectional LSTM sentiment classifier.

    Pipeline: embedding -> (stacked) bidirectional LSTM -> dropout ->
    linear layer -> sigmoid.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, dropout=0.5, pad_idx=0):
        """Create the layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Size of each embedding vector.
            hidden_dim: Hidden size of the LSTM, per direction.
            output_dim: Number of output features of the linear layer.
            n_layers: Number of stacked LSTM layers.
            dropout: Dropout probability, used between LSTM layers (only
                when ``n_layers > 1``) and before the linear layer.
            pad_idx: Index of the padding token in the embedding table.
        """
        super().__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # Inter-layer dropout is only requested for a stacked LSTM
        inter_layer_dropout = dropout if n_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=True,
            dropout=inter_layer_dropout,
        )

        self.dropout = nn.Dropout(dropout)

        # Two directions are concatenated, hence twice the hidden size
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, text, text_lengths):
        """Compute one probability per sequence.

        Args:
            text: Padded token indices, ``[batch_size, seq_len]``.
            text_lengths: Unpadded length of each sequence, ``[batch_size]``.

        Returns:
            Sigmoid outputs of shape ``[batch_size]``.
        """
        # [batch_size, seq_len, embedding_dim]
        embedded = self.embedding(text)

        # Packing makes the LSTM skip padded positions; lengths must be on CPU
        packed = pack_padded_sequence(
            embedded, text_lengths.cpu(), batch_first=True, enforce_sorted=False
        )

        # hidden: [n_layers * 2, batch_size, hidden_dim]
        _, (hidden, _) = self.lstm(packed)

        # The last two entries are the forward and backward final states
        forward_state, backward_state = hidden[-2], hidden[-1]
        # [batch_size, hidden_dim * 2]
        features = torch.cat((forward_state, backward_state), dim=1)

        probabilities = torch.sigmoid(self.fc(self.dropout(features)))
        return probabilities.squeeze(1)

# Model hyperparameters
VOCAB_SIZE = len(vocab)
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1
N_LAYERS = 2
DROPOUT = 0.5
PAD_IDX = vocab.word2idx['<PAD>']

# Create BiLSTM model on the selected device
bilstm_model = BiLSTMClassifier(
    VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM,
    N_LAYERS, DROPOUT, PAD_IDX
).to(device)

print("BiLSTM Model")
print(bilstm_model)
# "\n" (not "\\n") so that a blank line is printed instead of a literal \n
print(f"\nTotal parameters: {sum(p.numel() for p in bilstm_model.parameters()):,}")

## Part 4: Training

In [None]:
# Training functions (copy from solution)
def train_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Module called as ``model(sequences, lengths)``.
        dataloader: Iterable yielding ``(sequences, labels, lengths)`` batches.
        optimizer: Optimizer updating the model's parameters.
        criterion: Loss called as ``criterion(predictions, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(mean loss per batch, fraction of correct predictions)``.
    """
    model.train()
    running_loss = 0
    n_correct = 0
    n_seen = 0

    for sequences, labels, lengths in tqdm(dataloader, desc="Training"):
        sequences = sequences.to(device)
        labels = labels.to(device)
        lengths = lengths.to(device)

        optimizer.zero_grad()
        predictions = model(sequences, lengths)
        loss = criterion(predictions, labels)
        loss.backward()

        # Clip the global gradient norm to 1.0 before the update
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        running_loss += loss.item()
        # Outputs of at least 0.5 count as the positive class
        hits = (predictions >= 0.5).float() == labels
        n_correct += hits.sum().item()
        n_seen += labels.size(0)

    mean_loss = running_loss / len(dataloader)
    return mean_loss, n_correct / n_seen

def evaluate(model, dataloader, criterion, device):
    """Score ``model`` on ``dataloader`` without computing gradients.

    Args:
        model: Module called as ``model(sequences, lengths)``.
        dataloader: Iterable yielding ``(sequences, labels, lengths)`` batches.
        criterion: Loss called as ``criterion(predictions, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(mean loss per batch, accuracy, predictions, labels)``; the
        last two are flat lists collected over all batches.
    """
    model.eval()
    running_loss = 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for sequences, labels, lengths in tqdm(dataloader, desc="Evaluating"):
            sequences = sequences.to(device)
            labels = labels.to(device)
            lengths = lengths.to(device)

            predictions = model(sequences, lengths)
            running_loss += criterion(predictions, labels).item()

            # Outputs of at least 0.5 count as the positive class
            hard_labels = (predictions >= 0.5).float()
            all_preds.extend(hard_labels.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    mean_loss = running_loss / len(dataloader)
    return mean_loss, accuracy, all_preds, all_labels

In [None]:
# Training loop
N_EPOCHS = 5
LEARNING_RATE = 0.001

optimizer = optim.Adam(bilstm_model.parameters(), lr=LEARNING_RATE)
# BCELoss takes probabilities, which the model's final sigmoid produces
criterion = nn.BCELoss()

# Per-epoch metrics, appended to once per epoch
history = {
    'train_loss': [],
    'train_acc': [],
    'test_loss': [],
    'test_acc': []
}

best_acc = 0

print("Training Bidirectional LSTM Model")

for epoch in range(N_EPOCHS):
    # "\n" (not "\\n") so that a blank line is printed instead of a literal \n
    print(f"\nEpoch {epoch+1}/{N_EPOCHS}")

    # Train
    train_loss, train_acc = train_epoch(bilstm_model, train_loader, optimizer, criterion, device)

    # Evaluate
    test_loss, test_acc, _, _ = evaluate(bilstm_model, test_loader, criterion, device)

    # Save history
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['test_loss'].append(test_loss)
    history['test_acc'].append(test_acc)

    # Print results
    print(f"\nTrain Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%")
    print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc*100:.2f}%")

    # Save the weights whenever the test accuracy improves
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(bilstm_model.state_dict(), 'bilstm_best.pt')
        print(f" Saved best model (Acc: {best_acc*100:.2f}%)")

print(f"Training Complete! Best Test Accuracy: {best_acc*100:.2f}%")

## Part 5: Visualization and Analysis

In [None]:
# Plot training history: loss on the left panel, accuracy on the right
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# x-axis values: epochs numbered from 1
epochs = range(1, N_EPOCHS + 1)

# Loss (train in blue, test in red)
ax = axes[0]
ax.plot(epochs, history['train_loss'], 'b-', label='Train Loss', linewidth=2)
ax.plot(epochs, history['test_loss'], 'r-', label='Test Loss', linewidth=2)
ax.set_xlabel('Epoch', fontsize=12)
ax.set_ylabel('Loss', fontsize=12)
ax.set_title('Training & Test Loss', fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)

# Accuracy, converted from fractions to percentages
ax = axes[1]
ax.plot(epochs, [acc*100 for acc in history['train_acc']], 'b-', label='Train Acc', linewidth=2)
ax.plot(epochs, [acc*100 for acc in history['test_acc']], 'r-', label='Test Acc', linewidth=2)
ax.set_xlabel('Epoch', fontsize=12)
ax.set_ylabel('Accuracy (%)', fontsize=12)
ax.set_title('Training & Test Accuracy', fontsize=14, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
# plt.savefig('bilstm_history.png', dpi=150)
plt.show()

In [None]:
# Confusion Matrix
# Reload the best checkpoint; map_location places the tensors on the current
# device, so a checkpoint saved on a GPU also loads on a CPU-only machine.
bilstm_model.load_state_dict(torch.load('bilstm_best.pt', map_location=device))
_, test_acc, predictions, true_labels = evaluate(bilstm_model, test_loader, criterion, device)

cm = confusion_matrix(true_labels, predictions)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Negative', 'Positive'],
            yticklabels=['Negative', 'Positive'])
plt.xlabel('Predicted', fontsize=12)
plt.ylabel('True', fontsize=12)
plt.title(f'Confusion Matrix (Accuracy: {test_acc*100:.2f}%)', fontsize=14, fontweight='bold')
plt.tight_layout()
# plt.savefig('bilstm_confusion_matrix.png', dpi=150)
plt.show()

# "\n" (not "\\n") so that a blank line is printed instead of a literal \n
print("\nClassification Report:")
print(classification_report(true_labels, predictions, target_names=['Negative', 'Positive']))

## Part 6: Comparison with Unidirectional LSTM


In [None]:
# Unidirectional LSTM for comparison
class LSTMClassifier(nn.Module):
    """Unidirectional LSTM sentiment classifier (comparison baseline).

    Pipeline: embedding -> (stacked) LSTM -> dropout -> linear -> sigmoid.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
                 n_layers=2, dropout=0.5, pad_idx=0):
        """Create the layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Size of each embedding vector.
            hidden_dim: Hidden size of the LSTM.
            output_dim: Number of output features of the linear layer.
            n_layers: Number of stacked LSTM layers.
            dropout: Dropout probability, used between LSTM layers (only
                when ``n_layers > 1``) and before the linear layer.
            pad_idx: Index of the padding token in the embedding table.
        """
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # Inter-layer dropout is only requested for a stacked LSTM
        inter_layer_dropout = dropout if n_layers > 1 else 0
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=False,
            dropout=inter_layer_dropout,
        )

        self.dropout = nn.Dropout(dropout)
        # Single direction, so the classifier input is just hidden_dim wide
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, text, text_lengths):
        """Return one sigmoid output per sequence, shape ``[batch_size]``.

        Args:
            text: Padded token indices, ``[batch_size, seq_len]``.
            text_lengths: Unpadded length of each sequence, ``[batch_size]``.
        """
        # Packing makes the LSTM skip padded positions; lengths must be on CPU
        packed = pack_padded_sequence(
            self.embedding(text), text_lengths.cpu(),
            batch_first=True, enforce_sorted=False
        )
        _, (hidden, _) = self.lstm(packed)

        # Final hidden state of the top layer
        top_state = hidden[-1]
        probabilities = torch.sigmoid(self.fc(self.dropout(top_state)))
        return probabilities.squeeze(1)

# Create and train LSTM with the same hyperparameters as the BiLSTM
lstm_model = LSTMClassifier(
    VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM,
    N_LAYERS, DROPOUT, PAD_IDX
).to(device)

print("Model Comparison")
print(f"BiLSTM parameters: {sum(p.numel() for p in bilstm_model.parameters()):,}")
print(f"LSTM parameters: {sum(p.numel() for p in lstm_model.parameters()):,}")
# "\n" (not "\\n") so that a blank line is printed instead of a literal \n
print(f"\nBiLSTM has ~{sum(p.numel() for p in bilstm_model.parameters()) / sum(p.numel() for p in lstm_model.parameters()):.2f}x more parameters")

# Train LSTM
print("Training Unidirectional LSTM")

lstm_optimizer = optim.Adam(lstm_model.parameters(), lr=LEARNING_RATE)
lstm_history = {'train_loss': [], 'train_acc': [], 'test_loss': [], 'test_acc': []}

for epoch in range(N_EPOCHS):
    print(f"\nEpoch {epoch+1}/{N_EPOCHS}")
    train_loss, train_acc = train_epoch(lstm_model, train_loader, lstm_optimizer, criterion, device)
    test_loss, test_acc, _, _ = evaluate(lstm_model, test_loader, criterion, device)

    # Record this epoch's metrics for the comparison plots
    lstm_history['train_loss'].append(train_loss)
    lstm_history['train_acc'].append(train_acc)
    lstm_history['test_loss'].append(test_loss)
    lstm_history['test_acc'].append(test_acc)

    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%")
    print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc*100:.2f}%")

In [None]:
# Compare BiLSTM vs LSTM: test loss on the left panel, test accuracy on the right
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

epochs = range(1, N_EPOCHS + 1)

# Test Loss comparison
ax = axes[0]
ax.plot(epochs, history['test_loss'], 'b-', label='BiLSTM', linewidth=2, marker='o')
ax.plot(epochs, lstm_history['test_loss'], 'r-', label='LSTM', linewidth=2, marker='s')
ax.set_xlabel('Epoch', fontsize=12)
ax.set_ylabel('Test Loss', fontsize=12)
ax.set_title('Test Loss Comparison', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)

# Test Accuracy comparison (fractions converted to percentages)
ax = axes[1]
ax.plot(epochs, [acc*100 for acc in history['test_acc']], 'b-',
        label='BiLSTM', linewidth=2, marker='o')
ax.plot(epochs, [acc*100 for acc in lstm_history['test_acc']], 'r-',
        label='LSTM', linewidth=2, marker='s')
ax.set_xlabel('Epoch', fontsize=12)
ax.set_ylabel('Test Accuracy (%)', fontsize=12)
ax.set_title('Test Accuracy Comparison', fontsize=14, fontweight='bold')
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)

plt.tight_layout()
# plt.savefig('bilstm_vs_lstm.png', dpi=150)
plt.show()

# Summary table
print("FINAL COMPARISON SUMMARY")
print(f"{'Model':<15} {'Parameters':>15} {'Best Test Acc':>15}")
print(f"{'BiLSTM':<15} {sum(p.numel() for p in bilstm_model.parameters()):>15,} "
      f"{max(history['test_acc'])*100:>14.2f}%")
print(f"{'LSTM':<15} {sum(p.numel() for p in lstm_model.parameters()):>15,} "
      f"{max(lstm_history['test_acc'])*100:>14.2f}%")

# Difference between the best test accuracies, in percentage points
improvement = (max(history['test_acc']) - max(lstm_history['test_acc'])) * 100
# The "+" format flag prints the real sign, so a negative difference shows
# as "-0.50%" rather than "+-0.50%"; "\n" replaces the literal "\\n".
print(f"\n BiLSTM improvement: {improvement:+.2f}%")

## Part 7: Prediction on Custom Text

In [None]:
def predict_sentiment(model, text, vocab, device):
    """Predict the sentiment of a single text.

    Args:
        model: Classifier called as ``model(sequence, length)`` and returning
            a one-element tensor holding the positive-class probability.
        text: Raw input string.
        vocab: Object providing ``text_to_indices(text)`` and a ``word2idx``
            mapping that contains ``'<UNK>'``.
        device: Device the input tensors are moved to.

    Returns:
        Tuple ``(sentiment, confidence)``: ``sentiment`` is ``"Positive"``
        when the probability is at least 0.5, else ``"Negative"``;
        ``confidence`` is the probability of the returned class.
    """
    model.eval()

    indices = vocab.text_to_indices(text)
    if not indices:
        # Empty or punctuation-only text yields no tokens, and
        # pack_padded_sequence rejects zero-length sequences. Feed a single
        # <UNK> token so the call returns a prediction instead of raising.
        indices = [vocab.word2idx['<UNK>']]

    # Batch of one: [1, seq_len] plus its length
    sequence = torch.tensor(indices, dtype=torch.long).unsqueeze(0).to(device)
    length = torch.tensor([len(indices)]).to(device)

    with torch.no_grad():
        prediction = model(sequence, length)

    prob = prediction.item()
    if prob >= 0.5:
        return "Positive", prob
    return "Negative", 1 - prob

# Test with custom reviews
test_reviews = [
    "This movie was absolutely fantastic! I loved every minute of it.",
    "Terrible film. Waste of time and money. Do not watch.",
    "The acting was good but the plot was confusing and boring.",
    "Best movie I've seen this year! Highly recommended!",
    "Not bad, but not great either. Just average."
]

print("Sample Predictions")

for i, review in enumerate(test_reviews, 1):
    sentiment, confidence = predict_sentiment(bilstm_model, review, vocab, device)

    # "\n" (not "\\n") so that a blank line is printed instead of a literal \n
    print(f"\n[{i}] Review: {review}")
    print(f"    Prediction: {sentiment} (confidence: {confidence*100:.2f}%)")