In [6]:
# Import libraries
import pandas as pd
import numpy as np
import re
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from collections import Counter

# Seed both RNG sources once, from a single constant, so repeated runs draw
# the same random numbers from NumPy and PyTorch.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)


Part 1: Data Loading & Preprocessing

In [7]:
# Part 1: Data Loading & Preprocessing

print("PART 1: DATA LOADING & PREPROCESSING")

# Read the raw IMDB reviews; the relative path is resolved against the
# current working directory.
DATA_PATH = 'IMDB Dataset.csv'
df = pd.read_csv(DATA_PATH)

# Report the number of rows and how many reviews carry each sentiment value
n_samples = len(df)
print(f"\nDataset loaded: {n_samples} samples")
sentiment_counts = df['sentiment'].value_counts()
print(sentiment_counts)

# Stopwords dropped by clean_text. Built once at definition time instead of
# on every call (the function is applied to every review in the dataset).
_STOPWORDS = frozenset({
    'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
    'of', 'with', 'is', 'was', 'are', 'were', 'been', 'be', 'have', 'has',
    'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may',
    'might', 'can', 'it', 'its', 'this', 'that', 'these', 'those',
})

# Patterns used by clean_text, applied in this order.
_BR_TAG_RE = re.compile(r'<br\s*/?>')         # line-break tags -> one space
_HTML_TAG_RE = re.compile(r'<[^>]+>')         # any other tag -> removed
_NON_ALNUM_RE = re.compile(r'[^a-z0-9\s]')    # everything but a-z, 0-9, whitespace


def clean_text(text):
    """
    Normalise one raw review into a space-separated string of tokens.

    Steps, in order:
    - Convert to lowercase
    - Replace <br> tags with a space and strip all other HTML tags
    - Remove every character that is not a-z, 0-9 or whitespace
    - Collapse whitespace and drop the stopwords listed in _STOPWORDS

    Args:
        text: The raw review. Anything that is not a ``str`` (for example the
            NaN pandas uses for a missing cell, or None) is treated as an
            empty review.

    Returns:
        The cleaned review; an empty string when nothing survives cleaning.
    """
    # Missing cells arrive as float NaN / None and have no .lower()
    if not isinstance(text, str):
        return ''

    text = text.lower()
    text = _BR_TAG_RE.sub(' ', text)
    text = _HTML_TAG_RE.sub('', text)
    text = _NON_ALNUM_RE.sub('', text)

    # str.split() with no argument already collapses runs of whitespace and
    # ignores leading/trailing whitespace, so no separate normalisation pass
    # is needed before filtering.
    return ' '.join(word for word in text.split() if word not in _STOPWORDS)

# Clean every review and store the result alongside the raw text
df['cleaned_review'] = df['review'].map(clean_text)

# Turn the sentiment strings into integer class ids
sentiment_to_label = {'positive': 1, 'negative': 0}
df['label'] = df['sentiment'].map(sentiment_to_label)

# Stratified 70/15/15 split: hold out 30% first, then halve that hold-out
# into validation and test sets.
train_df, temp_df = train_test_split(
    df,
    test_size=0.3,
    random_state=42,
    stratify=df['label'],
)
val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    random_state=42,
    stratify=temp_df['label'],
)

split_summary = f"Train: {len(train_df)}, Val: {len(val_df)}, Test: {len(test_df)}"
print(f"\n{split_summary}")

# Build vocabulary from training data
class Vocabulary:
    """Word-level vocabulary mapping whitespace-separated tokens to integer ids.

    Ids 0 and 1 are reserved for the ``<PAD>`` and ``<UNK>`` markers. All
    other tokens receive consecutive ids, starting at 2, when ``build_vocab``
    is called.

    Attributes:
        min_freq: Minimum number of occurrences a token needs to get an id.
        token2idx: Mapping from token to id.
        idx2token: Mapping from id back to token.
    """

    PAD_TOKEN = '<PAD>'
    UNK_TOKEN = '<UNK>'

    def __init__(self, min_freq=2):
        self.min_freq = min_freq
        self._reset()

    def _reset(self):
        """Restore both mappings to just the two reserved tokens."""
        self.token2idx = {self.PAD_TOKEN: 0, self.UNK_TOKEN: 1}
        self.idx2token = {0: self.PAD_TOKEN, 1: self.UNK_TOKEN}

    def build_vocab(self, texts):
        """Build the vocabulary from an iterable of whitespace-tokenised texts.

        Any previously built vocabulary is discarded first. Without that, a
        second call would restart numbering at 2 on top of the old entries,
        leaving several tokens sharing one id and ``idx2token`` out of sync
        with ``token2idx``.

        Tokens are numbered in order of first appearance; tokens seen fewer
        than ``min_freq`` times are left out.
        """
        self._reset()

        word_counts = Counter()
        for text in texts:
            word_counts.update(text.split())

        for word, count in word_counts.items():
            # Never let a corpus token that happens to be spelled like a
            # reserved marker overwrite that marker's id.
            if count >= self.min_freq and word not in self.token2idx:
                idx = len(self.token2idx)
                self.token2idx[word] = idx
                self.idx2token[idx] = word

    def encode(self, text, max_length=100):
        """Encode text as a list of exactly ``max_length`` token ids.

        Unknown tokens map to the ``<UNK>`` id. Shorter texts are right-padded
        with the ``<PAD>`` id and longer ones are truncated.
        """
        unk_idx = self.token2idx[self.UNK_TOKEN]
        pad_idx = self.token2idx[self.PAD_TOKEN]

        tokens = text.split()[:max_length]
        indices = [self.token2idx.get(token, unk_idx) for token in tokens]

        # A non-positive multiplier yields an empty list, so full-length
        # sequences pass through unchanged.
        indices.extend([pad_idx] * (max_length - len(indices)))
        return indices

# Build vocabulary from the training split only, so validation/test reviews
# contribute no tokens.
vocab = Vocabulary(min_freq=2)
vocab.build_vocab(train_df['cleaned_review'].values)
print(f"Vocabulary size: {len(vocab.token2idx)}")

# Encode sequences
MAX_LENGTH = 100


def _with_encoded(frame):
    """Return a new frame with an 'encoded' column of MAX_LENGTH-long id lists."""
    encoded = frame['cleaned_review'].apply(lambda review: vocab.encode(review, MAX_LENGTH))
    return frame.assign(encoded=encoded)


# The split frames were produced by train_test_split from `df`. Assigning a
# column onto them in place is the pattern that can raise pandas'
# SettingWithCopyWarning (on pandas versions without copy-on-write), so each
# name is rebound to a fresh frame instead.
train_df = _with_encoded(train_df)
val_df = _with_encoded(val_df)
test_df = _with_encoded(test_df)
print("Text encoding complete!")

PART 1: DATA LOADING & PREPROCESSING

Dataset loaded: 50000 samples
sentiment
positive    25000
negative    25000
Name: count, dtype: int64

Train: 35000, Val: 7500, Test: 7500
Vocabulary size: 63441
Text encoding complete!


Part 2: CNN Model

In [8]:
# Part 2: CNN Model

# Banner marking where the CNN section starts in the cell output
print("PART 2: CNN MODEL")


# Custom Dataset
class IMDBDataset(Dataset):
    """Map-style dataset pairing encoded reviews with their labels.

    Args:
        encodings: Sequence of token-id sequences, one entry per review.
        labels: Sequence of integer class labels, one entry per review.

    Raises:
        ValueError: If ``encodings`` and ``labels`` have different lengths.
            Without this check the mismatch would only show up later, as an
            IndexError in the middle of an epoch or as silently unused rows.
    """

    def __init__(self, encodings, labels):
        if len(encodings) != len(labels):
            raise ValueError(
                f"encodings and labels must have the same length, "
                f"got {len(encodings)} and {len(labels)}"
            )
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of (review, label) pairs."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``idx``-th review and label as ``torch.long`` tensors."""
        tokens = torch.tensor(self.encodings[idx], dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return tokens, label

# Create datasets
def _make_dataset(frame):
    """Wrap one split's 'encoded' and 'label' columns in an IMDBDataset."""
    return IMDBDataset(frame['encoded'].tolist(), frame['label'].tolist())


train_dataset = _make_dataset(train_df)
val_dataset = _make_dataset(val_df)
test_dataset = _make_dataset(test_df)

# Create dataloaders: only the training batches are shuffled, validation and
# test batches keep the dataset order.
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=batch_size, shuffle=False)
    for split_dataset in (val_dataset, test_dataset)
)

print(f"DataLoaders created with batch size: {batch_size}")

# CNN Model
class CNNModel(nn.Module):
    """Embedding -> Conv1d -> ReLU -> MaxPool1d -> Linear text classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        num_filters: Number of convolution output channels.
        kernel_size: Width of the convolution window, in tokens.
        num_classes: Number of output logits.
        max_length: Length of the token sequences the model will be fed. The
            fully connected layer is sized from it, so inputs must have
            exactly this many tokens. When omitted, the notebook-level
            ``MAX_LENGTH`` constant is used, as before.
    """

    def __init__(self, vocab_size, embedding_dim=128, num_filters=100, kernel_size=3,
                 num_classes=2, max_length=None):
        super(CNNModel, self).__init__()

        if max_length is None:
            # Backward-compatible default: the length used when encoding reviews
            max_length = MAX_LENGTH

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # 1D Convolutional layer
        # Input: [batch, seq_len, embedding_dim]
        # Need to transpose to [batch, embedding_dim, seq_len] for Conv1d
        # This padding preserves the sequence length for odd kernel sizes;
        # an even kernel size yields seq_len + 1 positions.
        padding = kernel_size // 2
        self.conv1d = nn.Conv1d(in_channels=embedding_dim,
                                out_channels=num_filters,
                                kernel_size=kernel_size,
                                padding=padding)

        # ReLU activation
        self.relu = nn.ReLU()

        # Max pooling layer (stride defaults to the pooling window)
        pool_size = 2
        self.pool = nn.MaxPool1d(kernel_size=pool_size)

        # Derive the flattened feature size from the actual layer arithmetic
        # rather than assuming the convolution keeps the length unchanged.
        conv_length = max_length + 2 * padding - kernel_size + 1
        pooled_length = conv_length // pool_size

        # Fully connected layer
        self.fc = nn.Linear(num_filters * pooled_length, num_classes)

    def forward(self, x):
        """Map token ids of shape [batch, seq_len] to logits [batch, num_classes]."""
        x = self.embedding(x)

        # Transpose for Conv1d: [batch, seq_len, emb] -> [batch, emb, seq_len]
        x = x.transpose(1, 2)

        # Convolution
        x = self.conv1d(x)
        x = self.relu(x)

        # Pooling
        x = self.pool(x)

        # Flatten all filter responses into one feature vector per sample
        x = x.view(x.size(0), -1)

        # Fully connected
        x = self.fc(x)

        return x

# Initialize CNN model on the GPU when one is available, otherwise on the CPU
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

cnn_model = CNNModel(
    vocab_size=len(vocab.token2idx),
    embedding_dim=128,
    num_filters=100,
)
cnn_model = cnn_model.to(device)

print(f"\nCNN Model Architecture:")
print(cnn_model)
print(f"\nDevice: {device}")

# Loss and optimizer: cross-entropy over the class logits, Adam on every
# parameter of the CNN.
criterion = nn.CrossEntropyLoss()
cnn_parameters = cnn_model.parameters()
optimizer = optim.Adam(cnn_parameters, lr=1e-3)

# Training function
def train_model(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Module to optimise; it is switched to training mode.
        train_loader: Iterable of ``(inputs, labels)`` tensor batches. It does
            not need to support ``len()``; batches are counted while iterating.
        criterion: Callable computing the loss from ``(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device each batch is moved to before the forward pass.

    Returns:
        The sum of the batch losses divided by the number of batches, or 0.0
        when ``train_loader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass (gradients from the previous batch are cleared first)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Avoid dividing by zero on an empty loader
    if num_batches == 0:
        return 0.0

    return total_loss / num_batches

# Evaluation function
def evaluate_model(model, data_loader, device):
    """Compute classification accuracy, in percent, over a data loader.

    Args:
        model: Module to evaluate; it is switched to evaluation mode and left
            in that mode.
        data_loader: Iterable of ``(inputs, labels)`` tensor batches.
        device: Device each batch is moved to before the forward pass.

    Returns:
        ``100 * correct / total`` where a prediction is the index of the
        largest logit, or 0.0 when ``data_loader`` yields no samples.
    """
    model.eval()
    correct = 0
    total = 0

    # No gradients are needed for scoring
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # Predicted class = position of the largest logit in each row
            predicted = outputs.max(dim=1).indices
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Avoid dividing by zero on an empty loader
    if total == 0:
        return 0.0

    return 100 * correct / total

# Train CNN model
num_epochs = 3
print("\nTraining CNN Model...")

for epoch in range(num_epochs):
    # One pass over the training split, then score the validation split
    train_loss = train_model(
        model=cnn_model,
        train_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
    )
    val_accuracy = evaluate_model(model=cnn_model, data_loader=val_loader, device=device)

    print(f"Epoch {epoch+1}/{num_epochs} - Training Loss: {train_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

# Test CNN model on the held-out test split, once training is finished
test_accuracy = evaluate_model(model=cnn_model, data_loader=test_loader, device=device)
print(f"\nCNN Test Accuracy: {test_accuracy:.2f}%")

PART 2: CNN MODEL
DataLoaders created with batch size: 64

CNN Model Architecture:
CNNModel(
  (embedding): Embedding(63441, 128)
  (conv1d): Conv1d(128, 100, kernel_size=(3,), stride=(1,), padding=(1,))
  (relu): ReLU()
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc): Linear(in_features=5000, out_features=2, bias=True)
)

Device: cpu

Training CNN Model...
Epoch 1/3 - Training Loss: 0.5609, Validation Accuracy: 80.33%
Epoch 2/3 - Training Loss: 0.3024, Validation Accuracy: 80.53%
Epoch 3/3 - Training Loss: 0.1428, Validation Accuracy: 81.69%

CNN Test Accuracy: 82.32%


Part 3: LSTM Model

In [9]:
# Part 3: LSTM Model

# Banner marking where the LSTM section starts in the cell output
print("PART 3: LSTM MODEL")

# LSTM Model
class LSTMModel(nn.Module):
    """Embedding -> single-layer (bi)LSTM -> Linear text classifier.

    The classifier reads the LSTM's final hidden state (both directions
    concatenated when ``bidirectional`` is true).

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the LSTM hidden state, per direction.
        num_classes: Number of output logits.
        bidirectional: Whether to run the LSTM in both directions.
        padding_idx: Token id used for right-padding, or None (the default)
            to treat every position as a real token, which is the original
            behaviour. When set, sequences are packed so the LSTM stops at
            the last real token instead of also stepping through the padding.
            The real length is taken as the number of non-padding ids, so
            padding is assumed to appear only at the end of a sequence.
    """

    def __init__(self, vocab_size, embedding_dim=128, hidden_dim=128, num_classes=2,
                 bidirectional=True, padding_idx=None):
        super(LSTMModel, self).__init__()

        self.padding_idx = padding_idx

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)

        # LSTM layer
        self.lstm = nn.LSTM(input_size=embedding_dim,
                           hidden_size=hidden_dim,
                           num_layers=1,
                           batch_first=True,
                           bidirectional=bidirectional)

        # Fully connected layer
        # If bidirectional, hidden state is concatenated from both directions
        fc_input_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.fc = nn.Linear(fc_input_dim, num_classes)

    def forward(self, x):
        """Map token ids of shape [batch, seq_len] to logits [batch, num_classes]."""
        embedded = self.embedding(x)

        if self.padding_idx is None:
            _, (hidden, _) = self.lstm(embedded)
        else:
            # Real length of each sequence; an all-padding row is treated as
            # length 1 because packing rejects zero-length sequences.
            lengths = (x != self.padding_idx).sum(dim=1).clamp(min=1)
            packed = nn.utils.rnn.pack_padded_sequence(
                embedded,
                lengths.cpu(),  # pack_padded_sequence expects CPU lengths
                batch_first=True,
                enforce_sorted=False,
            )
            _, (hidden, _) = self.lstm(packed)

        # Use final hidden state
        if self.lstm.bidirectional:
            # Concatenate forward and backward hidden states
            hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)
        else:
            hidden = hidden[-1]

        # Fully connected
        return self.fc(hidden)

# Initialize LSTM model (bidirectional, same embedding size as the CNN)
lstm_model = LSTMModel(
    vocab_size=len(vocab.token2idx),
    embedding_dim=128,
    hidden_dim=128,
    bidirectional=True,
)
lstm_model = lstm_model.to(device)

print(f"\nLSTM Model Architecture:")
print(lstm_model)

# Loss and optimizer: a fresh criterion and an Adam optimizer bound to the
# LSTM's parameters.
criterion = nn.CrossEntropyLoss()
lstm_parameters = lstm_model.parameters()
optimizer = optim.Adam(lstm_parameters, lr=1e-3)

# Train LSTM model
num_epochs = 3
print("\nTraining LSTM Model...")

for epoch in range(num_epochs):
    # One pass over the training split, then score the validation split
    train_loss = train_model(
        model=lstm_model,
        train_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
    )
    val_accuracy = evaluate_model(model=lstm_model, data_loader=val_loader, device=device)

    print(f"Epoch {epoch+1}/{num_epochs} - Training Loss: {train_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

# Test LSTM model on the held-out test split, once training is finished
test_accuracy = evaluate_model(model=lstm_model, data_loader=test_loader, device=device)
print(f"\nLSTM Test Accuracy: {test_accuracy:.2f}%")

PART 3: LSTM MODEL

LSTM Model Architecture:
LSTMModel(
  (embedding): Embedding(63441, 128)
  (lstm): LSTM(128, 128, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=256, out_features=2, bias=True)
)

Training LSTM Model...
Epoch 1/3 - Training Loss: 0.5673, Validation Accuracy: 77.97%
Epoch 2/3 - Training Loss: 0.3659, Validation Accuracy: 82.92%
Epoch 3/3 - Training Loss: 0.2495, Validation Accuracy: 83.77%

LSTM Test Accuracy: 84.01%
