In [9]:
import os
import gzip
import pickle
import random
import wget
import re
from typing import List, Tuple, Dict

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import time

In [3]:
random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x109994170>

In [2]:
IMDB_URL = 'http://dlvu.github.io/data/imdb.{}.pkl.gz'
IMDB_FILE = 'imdb.{}.pkl.gz'

PAD, START, END, UNK = '.pad', '.start', '.end', '.unk'

def load_imdb(final=False, val=5000, seed=0, voc=None, char=False):

    cst = 'char' if char else 'word'

    imdb_url = IMDB_URL.format(cst)
    imdb_file = IMDB_FILE.format(cst)

    if not os.path.exists(imdb_file):
        wget.download(imdb_url)

    with gzip.open(imdb_file) as file:
        sequences, labels, i2w, w2i = pickle.load(file)

    if voc is not None and voc < len(i2w):
        nw_sequences = {}

        i2w = i2w[:voc]
        w2i = {w: i for i, w in enumerate(i2w)}

        mx, unk = voc, w2i['.unk']
        for key, seqs in sequences.items():
            nw_sequences[key] = []
            for seq in seqs:
                seq = [s if s < mx else unk for s in seq]
                nw_sequences[key].append(seq)

        sequences = nw_sequences

    if final:
        return (sequences['train'], labels['train']), (sequences['test'], labels['test']), (i2w, w2i), 2

    # Make a validation split
    random.seed(seed)

    x_train, y_train = [], []
    x_val, y_val = [], []

    val_ind = set( random.sample(range(len(sequences['train'])), k=val) )
    for i, (s, l) in enumerate(zip(sequences['train'], labels['train'])):
        if i in val_ind:
            x_val.append(s)
            y_val.append(l)
        else:
            x_train.append(s)
            y_train.append(l)

    return (x_train, y_train), \
           (x_val, y_val), \
           (i2w, w2i), 2

In [4]:
(x_train, y_train), (x_val, y_val), (i2w, w2i), numcls = load_imdb(final=False)

Number of Training Samples: 20000
Number of Validation Samples: 5000
Vocabulary Size: 99430
Number of Classes: 2



In [5]:
def pad_and_convert(sequences: List[List[int]], w2i: Dict[str, int],
                   max_length: int = None) -> torch.Tensor:
    """
    Pads a list of sequences to a fixed length and converts them to a PyTorch tensor.

    Args:
        sequences (List[List[int]]): A batch of sequences, where each sequence is a list of integer indices.
        w2i (Dict[str, int]): A dictionary mapping words to their integer indices.
        max_length (int, optional): The length to pad the sequences to. If None, uses the length of the longest sequence in the batch.

    Returns:
        torch.Tensor: A tensor of shape (batch_size, max_length) containing the padded sequences.
    """
    # Retrieve the padding index from the w2i dictionary
    pad_idx = w2i.get('.pad')
    if pad_idx is None:
        raise ValueError("The padding token '.pad' is not found in the w2i dictionary.")

    # Determine the maximum length for padding
    if max_length is None:
        max_length = max(len(seq) for seq in sequences)

    # Initialize a list to hold the padded sequences
    padded_sequences = []

    for seq in sequences:
        # Calculate the number of padding tokens needed
        padding_needed = max_length - len(seq)

        if padding_needed < 0:
            raise ValueError("A sequence is longer than the specified max_length.")

        # Pad the sequence with pad_idx
        padded_seq = seq + [pad_idx] * padding_needed
        padded_sequences.append(padded_seq)

    # Convert the list of padded sequences to a PyTorch tensor with dtype torch.long
    batch_tensor = torch.tensor(padded_sequences, dtype=torch.long)

    return batch_tensor

def create_batches(sequences: List[List[int]], labels: List[int],
                  batch_size: int, w2i: Dict[str, int]) -> List[Tuple[torch.Tensor, torch.Tensor]]:
    """
    Splits the data into batches, pads each batch, and converts them to tensors.

    Args:
        sequences (List[List[int]]): List of all sequences.
        labels (List[int]): Corresponding labels for each sequence.
        batch_size (int): Number of samples per batch.
        w2i (Dict[str, int]): Dictionary mapping words to their integer indices.

    Returns:
        List[Tuple[torch.Tensor, torch.Tensor]]: A list of tuples, each containing padded sequences and their labels as tensors.
    """
    batches = []
    total_samples = len(sequences)
    num_batches = (total_samples + batch_size - 1) // batch_size  # Ceiling division

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = min(start_idx + batch_size, total_samples)
        batch_sequences = sequences[start_idx:end_idx]
        batch_labels = labels[start_idx:end_idx]

        # Pad and convert sequences
        padded_sequences = pad_and_convert(batch_sequences, w2i)

        # Convert labels to tensor
        labels_tensor = torch.tensor(batch_labels, dtype=torch.long)

        batches.append((padded_sequences, labels_tensor))

    return batches

In [6]:
batch_size = 64
train_batches = create_batches(x_train, y_train, batch_size, w2i)
val_batches = create_batches(x_val, y_val, batch_size, w2i)

In [12]:
device = torch.device("mps")
print(f'Using device: {device}')

Using device: mps


In [14]:
class Seq2SeqModel(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int = 300, hidden_size: int = 300, num_classes: int = 2):
        """
        Initializes the Sequence-to-Sequence Model.
        
        Args:
            vocab_size (int): Number of unique tokens in the vocabulary.
            embedding_dim (int, optional): Dimension of the embedding vectors. Defaults to 300.
            hidden_size (int, optional): Dimension of the hidden layer. Defaults to 300.
            num_classes (int, optional): Number of output classes. Defaults to 2.
        """
        super(Seq2SeqModel, self).__init__()
        
        # 1) Embedding layer: Converts integer indices to embedding vectors
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        
        # 2) Linear layer: Maps each embedding vector to a hidden representation
        self.linear = nn.Linear(in_features=embedding_dim, out_features=hidden_size)
        
        # 3) ReLU activation: Introduces non-linearity
        self.relu = nn.ReLU()
        
        # 5) Final Linear layer: Projects the pooled representation to the number of classes
        self.output_linear = nn.Linear(in_features=hidden_size, out_features=num_classes)
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Defines the forward pass of the model.
        
        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, time_steps), dtype=torch.long
        
        Returns:
            torch.Tensor: Output tensor of shape (batch_size, num_classes), dtype=torch.float
        """
        # 1) Embedding: (batch, time) -> (batch, time, embedding_dim)
        embeds = self.embedding(x)
        
        # 2) Linear layer: (batch, time, embedding_dim) -> (batch, time, hidden_size)
        linear_out = self.linear(embeds)
        
        # 3) ReLU activation: (batch, time, hidden_size) -> (batch, time, hidden_size)
        relu_out = self.relu(linear_out)
        
        # 4) Global max pool along the time dimension: (batch, time, hidden_size) -> (batch, hidden_size)
        # torch.max returns a tuple (values, indices). We take the first element (values).
        pooled_out, _ = torch.max(relu_out, dim=1)
        
        # 5) Final Linear layer: (batch, hidden_size) -> (batch, num_classes)
        output = self.output_linear(pooled_out)
        
        # 6) Output tensor: (batch, num_classes)
        return output

In [15]:
# Model Initialization
vocab_size = len(i2w)  # Vocabulary size from load_imdb
embedding_dim = 300
hidden_size = 300
num_classes = 2
pad_idx = w2i.get('.pad', 0)  # Default to 0 if '.pad' not found

model = Seq2SeqModel(vocab_size=vocab_size,
                           embedding_dim=embedding_dim,
                           hidden_size=hidden_size,
                           num_classes=num_classes).to(device)

print(model)

Seq2SeqModel(
  (embedding): Embedding(99430, 300)
  (linear): Linear(in_features=300, out_features=300, bias=True)
  (relu): ReLU()
  (output_linear): Linear(in_features=300, out_features=2, bias=True)
)


In [16]:
# Loss Function and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Accuracy Calculation
def calculate_accuracy(preds, labels):
    _, predicted = torch.max(preds, dim=1)
    correct = (predicted == labels).sum().item()
    accuracy = correct / labels.size(0) * 100
    return accuracy

# Epoch Time Calculation
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [17]:
# Training Function
def train_model(model, train_batches, val_batches, criterion, optimizer, device, num_epochs=10):
    best_val_loss = float('inf')
    patience = 3
    counter = 0

    for epoch in range(1, num_epochs + 1):
        model.train()
        epoch_loss = 0
        epoch_acc = 0
        start_time = time.time()

        for batch_idx, (batch_sequences, batch_labels) in enumerate(train_batches):
            # Move data to device
            batch_sequences = batch_sequences.to(device)
            batch_labels = batch_labels.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(batch_sequences)

            # Compute loss
            loss = criterion(outputs, batch_labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Accumulate loss and accuracy
            epoch_loss += loss.item()
            epoch_acc += calculate_accuracy(outputs, batch_labels)

            if (batch_idx + 1) % 50 == 0 or (batch_idx + 1) == len(train_batches):
                print(f'Epoch [{epoch}/{num_epochs}], Batch [{batch_idx + 1}/{len(train_batches)}], '
                      f'Loss: {loss.item():.4f}, Accuracy: {calculate_accuracy(outputs, batch_labels):.2f}%')

        # Calculate average loss and accuracy for the epoch
        avg_loss = epoch_loss / len(train_batches)
        avg_acc = epoch_acc / len(train_batches)

        # Validation
        model.eval()
        val_loss = 0
        val_acc = 0

        with torch.no_grad():
            for batch_sequences, batch_labels in val_batches:
                # Move data to device
                batch_sequences = batch_sequences.to(device)
                batch_labels = batch_labels.to(device)

                # Forward pass
                outputs = model(batch_sequences)

                # Compute loss
                loss = criterion(outputs, batch_labels)
                val_loss += loss.item()
                val_acc += calculate_accuracy(outputs, batch_labels)

        avg_val_loss = val_loss / len(val_batches)
        avg_val_acc = val_acc / len(val_batches)

        end_time = time.time()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        print(f'Epoch [{epoch}/{num_epochs}] completed in {epoch_mins}m {epoch_secs}s')
        print(f'Training Loss: {avg_loss:.4f}, Training Accuracy: {avg_acc:.2f}%')
        print(f'Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {avg_val_acc:.2f}%\n')

        # Early Stopping
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            counter = 0
            # Save the best model
            torch.save(model.state_dict(), 'best_simple_seq2seq_model.pth')
        else:
            counter += 1
            if counter >= patience:
                print("Early stopping triggered!")
                break

In [18]:
# Start Training
num_epochs = 2
train_model(model, train_batches, val_batches, criterion, optimizer, device, num_epochs)

# Load the Best Model (Optional)
best_model = Seq2SeqModel(vocab_size=vocab_size,
                                embedding_dim=embedding_dim,
                                hidden_size=hidden_size,
                                num_classes=num_classes).to(device)
best_model.load_state_dict(torch.load('best_simple_seq2seq_model.pth'))
print('Best model loaded for evaluation.')

Epoch [1/2], Batch [50/313], Loss: 0.4294, Accuracy: 79.69%
Epoch [1/2], Batch [100/313], Loss: 0.4173, Accuracy: 81.25%
Epoch [1/2], Batch [150/313], Loss: 0.4022, Accuracy: 81.25%
Epoch [1/2], Batch [200/313], Loss: 0.2980, Accuracy: 85.94%
Epoch [1/2], Batch [250/313], Loss: 0.4032, Accuracy: 81.25%
Epoch [1/2], Batch [300/313], Loss: 0.4539, Accuracy: 82.81%
Epoch [1/2], Batch [313/313], Loss: 0.2485, Accuracy: 90.62%
Epoch [1/2] completed in 0m 21s
Training Loss: 0.3876, Training Accuracy: 81.90%
Validation Loss: 0.3227, Validation Accuracy: 86.37%

Epoch [2/2], Batch [50/313], Loss: 0.1777, Accuracy: 92.19%
Epoch [2/2], Batch [100/313], Loss: 0.2695, Accuracy: 89.06%
Epoch [2/2], Batch [150/313], Loss: 0.2708, Accuracy: 89.06%
Epoch [2/2], Batch [200/313], Loss: 0.1761, Accuracy: 95.31%
Epoch [2/2], Batch [250/313], Loss: 0.2935, Accuracy: 87.50%
Epoch [2/2], Batch [300/313], Loss: 0.3206, Accuracy: 87.50%
Epoch [2/2], Batch [313/313], Loss: 0.1509, Accuracy: 90.62%
Epoch [2/2] c

  best_model.load_state_dict(torch.load('best_simple_seq2seq_model.pth'))


In [20]:
def evaluate_model(model, test_batches, criterion, device):
    """
    Evaluates the model on the test dataset.
    
    Args:
        model (nn.Module): The trained sequence model.
        test_batches (List[Tuple[torch.Tensor, torch.Tensor]]): Test data batches.
        criterion (nn.Module): Loss function.
        device (torch.device): Device to run the evaluation on.
    
    Returns:
        None
    """
    model.eval()  # Set the model to evaluation mode
    test_loss = 0
    test_acc = 0
    
    with torch.no_grad():  # Disable gradient computation
        for batch_sequences, batch_labels in test_batches:
            # Move data to the appropriate device
            batch_sequences = batch_sequences.to(device)
            batch_labels = batch_labels.to(device)
            
            # Forward pass
            outputs = model(batch_sequences)
            
            # Compute loss
            loss = criterion(outputs, batch_labels)
            test_loss += loss.item()
            
            # Calculate accuracy
            test_acc += calculate_accuracy(outputs, batch_labels)
    
    # Calculate average loss and accuracy
    avg_test_loss = test_loss / len(test_batches)
    avg_test_acc = test_acc / len(test_batches)
    
    print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {avg_test_acc:.2f}%')

In [21]:
# Define Test Batches
# For example purposes, let's assume you have a function similar to create_batches
(x_test, y_test), _, _, _ = load_imdb(final= True)  # Modify as needed
test_batches = create_batches(x_test, y_test, batch_size, w2i)

# Evaluate on Test Set
evaluate_model(best_model, test_batches, criterion, device)

Test Loss: 0.1946, Test Accuracy: 92.56%
