<a href="https://colab.research.google.com/github/aburkov/theLMbook/blob/main/emotion_classifier_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Import required libraries
import torch                             # PyTorch for tensor operations and deep learning
import torch.nn as nn                    # Neural network module from PyTorch
import numpy as np                       # NumPy for numerical operations
import re                                # Regular expressions for text processing (if needed)
import urllib.request                    # For downloading files from URLs
import gzip                              # For handling compressed files
import json                              # For parsing JSON data
import requests                          # For making HTTP requests to download data
import random                            # For shuffling data and setting random seeds
import pickle                            # For saving and loading serialized objects
import os                                # For file system operations
from tqdm import tqdm                    # For displaying progress bars during loops
from torch.utils.data import Dataset, DataLoader  # For creating custom datasets and data loaders in PyTorch


def set_seed(seed):
    """
    Sets random seeds for reproducibility across different libraries.

    Seeds Python's built-in ``random`` module, NumPy's global generator and
    PyTorch (CPU and every CUDA device), then configures cuDNN for
    deterministic behavior.

    Args:
        seed (int): Seed value for random number generation.
    """
    random.seed(seed)                      # Set seed for Python's built-in random module
    # NumPy only accepts seeds in [0, 2**32); wrap so that any integer accepted
    # by the other generators is accepted here as well.
    np.random.seed(seed % (2 ** 32))       # Set seed for NumPy's global generator
    torch.manual_seed(seed)                # Set seed for CPU operations in PyTorch
    torch.cuda.manual_seed_all(seed)       # Set seed for all GPUs
    torch.backends.cudnn.deterministic = True  # Use deterministic algorithms for cuDNN
    torch.backends.cudnn.benchmark = False     # Disable cuDNN auto-tuner for consistent behavior


class Tokenizer:
    """
    Minimal tokenizer: a token is any maximal run of non-whitespace characters.
    """
    def tokenize(self, text):
        """
        Breaks a string into whitespace-separated tokens.

        Args:
            text (str): Text to tokenize.

        Returns:
            list: The tokens in their original order; empty when the text
                contains no non-whitespace characters.
        """
        return text.split()


class Embedder:
    """
    Maps token sequences to fixed-size stacks of word vectors.

    Attributes:
        embeddings (dict): Lookup table from word to its vector.
        emb_dim (int): Length of every word vector.
        seq_len (int): Number of rows every embedded sequence is cut or padded to.
    """
    def __init__(self, embeddings, emb_dim, seq_len):
        """
        Stores the lookup table and the two size parameters.

        Args:
            embeddings (dict): Pre-loaded word embeddings.
            emb_dim (int): Dimension of the embeddings.
            seq_len (int): Maximum number of tokens to consider.
        """
        self.embeddings = embeddings
        self.emb_dim = emb_dim
        self.seq_len = seq_len

    def embed(self, tokens):
        """
        Builds the embedding matrix for a token list.

        Only the first ``seq_len`` tokens are used. Each token is looked up
        as-is first and then lower-cased; tokens missing under both spellings
        become zero rows. Short sequences are padded at the end with zero rows.

        Args:
            tokens (list): List of token strings.

        Returns:
            torch.Tensor: Tensor of shape (seq_len, emb_dim).
        """
        table = self.embeddings
        rows = []
        for token in tokens[:self.seq_len]:
            # Prefer the exact spelling; fall back to the lower-cased form
            key = token if token in table else token.lower()
            if key in table:
                rows.append(torch.tensor(table[key]))
            else:
                rows.append(torch.zeros(self.emb_dim))

        # Fill the remaining positions (if any) with zero vectors
        missing = self.seq_len - len(rows)
        if missing > 0:
            rows.extend(torch.zeros(self.emb_dim) for _ in range(missing))

        return torch.stack(rows)


def load_embeddings(url, filename="vectors.dat"):
    """
    Downloads and loads word embeddings from a gzipped file.

    If the file does not exist locally, it is downloaded from the provided URL.
    The file is expected to have a text header line with the vocabulary size and
    the embedding dimension, followed by one record per word: the UTF-8 word,
    a single space, and ``emb_dim`` float32 values in binary form.

    Args:
        url (str): URL to download the embeddings from.
        filename (str): Local filename to save/load the embeddings.

    Returns:
        tuple: A tuple containing:
            - vectors (dict): Mapping from words to their embedding vectors (as NumPy arrays).
            - emb_dim (int): Dimensionality of the embedding vectors.

    Raises:
        EOFError: If the file ends before all ``vocab_size`` records were read.
    """
    # Check if the embeddings file exists locally
    if not os.path.exists(filename):
        # Download under a temporary name and rename only on success, so an
        # interrupted transfer never leaves a truncated file that later runs
        # would mistake for a complete download.
        partial_name = f"{filename}.part"
        with tqdm(unit="B", unit_scale=True, unit_divisor=1024, desc="Downloading") as progress_bar:
            def report_hook(count, block_size, total_size):
                if total_size != -1:
                    progress_bar.total = total_size
                # The hook is first invoked with count == 0 before any data
                # has been transferred; do not count that call as a block.
                if count:
                    progress_bar.update(block_size)
            urllib.request.urlretrieve(url, partial_name, reporthook=report_hook)
        os.replace(partial_name, filename)
    else:
        print(f"File {filename} already exists. Skipping download.")

    # Open the gzipped embeddings file in binary read mode
    with gzip.open(filename, "rb") as f:
        # Read header line to get vocabulary size and embedding dimension
        header = f.readline()
        vocab_size, emb_dim = map(int, header.split())

        vectors = {}
        # Calculate the number of bytes for each embedding vector
        binary_len = np.dtype("float32").itemsize * emb_dim

        # Read each word and its corresponding embedding vector with a progress bar
        with tqdm(total=vocab_size, desc="Loading word vectors") as pbar:
            for _ in range(vocab_size):
                word_bytes = []
                # Read characters one by one until a space is encountered (indicating end of word)
                while True:
                    ch = f.read(1)
                    if not ch:
                        # read() returns b"" at end of file; without this check
                        # the loop would never terminate on a short file.
                        raise EOFError(
                            f"{filename}: unexpected end of file after "
                            f"{len(vectors)} of {vocab_size} word vectors"
                        )
                    if ch == b" ":
                        break
                    # Newlines separating records are not part of the word
                    if ch != b"\n":
                        word_bytes.append(ch)
                word = b"".join(word_bytes).decode("utf-8")

                # Read the binary vector data and convert it into a NumPy array of type float32
                raw_vector = f.read(binary_len)
                if len(raw_vector) != binary_len:
                    raise EOFError(
                        f"{filename}: truncated vector for word {word!r} "
                        f"(expected {binary_len} bytes, got {len(raw_vector)})"
                    )
                vectors[word] = np.frombuffer(raw_vector, dtype="float32")
                pbar.update(1)

    return vectors, emb_dim


def load_and_split_data(url, test_ratio=0.1, timeout=60):
    """
    Downloads, decompresses, and splits the dataset into training and testing sets.

    The dataset is expected to be a gzipped file where each line is a JSON object.
    Blank lines are ignored. The examples are shuffled with Python's ``random``
    module before splitting, so the split depends on the current random seed.

    Args:
        url (str): URL to download the dataset from.
        test_ratio (float): Proportion of data to be used as the test set.
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        tuple: A tuple containing:
            - train_data (list): List of training examples.
            - test_data (list): List of testing examples.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # Download the dataset from the provided URL
    response = requests.get(url, timeout=timeout)
    # Fail here with a clear HTTP error instead of later, when an error page
    # would be handed to gzip.decompress
    response.raise_for_status()
    # Decompress the gzipped content and decode it to a string
    content = gzip.decompress(response.content).decode("utf-8")
    # Parse each non-empty line as a JSON object
    data = [json.loads(line) for line in content.splitlines() if line.strip()]
    # Shuffle the data to ensure a random distribution
    random.shuffle(data)
    # Determine the split index based on the test_ratio
    split_index = int(len(data) * (1 - test_ratio))
    return data[:split_index], data[split_index:]


def download_and_prepare_data(data_url, vectors_url, seq_len, batch_size):
    """
    Builds everything the training script needs from the two download URLs.

    The steps are, in order: fetch and split the labelled text data, load the
    word vectors, derive the label/ID mappings from the training split, and
    wrap both splits in data loaders that tokenize and embed on the fly.

    Args:
        data_url (str): URL to download the text dataset.
        vectors_url (str): URL to download the word embeddings.
        seq_len (int): Fixed sequence length for token embeddings.
        batch_size (int): Batch size for the data loaders.

    Returns:
        tuple: A tuple containing:
            - train_loader (DataLoader): DataLoader for training data.
            - test_loader (DataLoader): DataLoader for testing data.
            - id_to_label (dict): Mapping from label IDs to label names.
            - num_classes (int): Number of unique classes.
            - emb_dim (int): Dimensionality of the word embeddings.
    """
    # 90% / 10% train/test split of the downloaded examples
    train_examples, test_examples = load_and_split_data(data_url, test_ratio=0.1)

    # Word vectors plus their dimensionality
    word_vectors, emb_dim = load_embeddings(vectors_url)

    # Label vocabulary comes from the training examples only
    label_to_id, id_to_label, num_classes = create_label_mappings(train_examples)

    # Text-processing pipeline shared by both loaders
    loaders = create_data_loaders(
        train_examples, test_examples,
        Tokenizer(), Embedder(word_vectors, emb_dim, seq_len),
        label_to_id, batch_size,
    )
    train_loader, test_loader = loaders

    return train_loader, test_loader, id_to_label, num_classes, emb_dim


class TextClassificationDataset(Dataset):
    """
    Map-style PyTorch dataset over labelled text examples.

    Texts and label IDs are extracted once at construction time; tokenization
    and embedding happen lazily, each time an example is requested.

    Args:
        data (list): List of dictionaries containing "text" and "label" keys.
        tokenizer (Tokenizer): Object whose ``tokenize(text)`` returns a token list.
        embedder (Embedder): Object whose ``embed(tokens)`` returns the embedding tensor.
        label_to_id (dict): Mapping from label strings to numeric IDs.
    """
    def __init__(self, data, tokenizer, embedder, label_to_id):
        self.tokenizer = tokenizer
        self.embedder = embedder
        # Raw texts, and the numeric ID of each example's label
        self.texts = [example["text"] for example in data]
        self.label_ids = [label_to_id[example["label"]] for example in data]

    def __len__(self):
        """
        Number of examples held by the dataset.
        """
        return len(self.texts)

    def __getitem__(self, idx):
        """
        Produces the model input and target for one example.

        Args:
            idx (int): Index of the example to retrieve.

        Returns:
            tuple: A tuple containing:
                - embeddings (torch.Tensor): Output of the embedder for the example's tokens.
                - label (torch.Tensor): Scalar ``torch.long`` tensor holding the label ID.
        """
        example_text = self.texts[idx]
        # text -> tokens -> embedding matrix
        embedded = self.embedder.embed(self.tokenizer.tokenize(example_text))
        label = torch.tensor(self.label_ids[idx], dtype=torch.long)
        return embedded, label


class CNNTextClassifier(nn.Module):
    """
    1D convolutional text classifier operating on pre-embedded token sequences.

    Architecture: Conv1d(emb_dim -> 512) + ReLU, Conv1d(512 -> 256) + ReLU,
    flatten, Linear(256 * seq_len -> 128), Linear(128 -> num_classes).
    Both convolutions use kernel size 3 with padding 1, so the sequence length
    is unchanged by them.

    Args:
        emb_dim (int): Dimensionality of the input embeddings.
        num_classes (int): Number of target classes for classification.
        seq_len (int): Fixed sequence length of the input text.
        id_to_label (dict): Mapping from label IDs to label names.
    """
    def __init__(self, emb_dim, num_classes, seq_len, id_to_label):
        super().__init__()
        # Constructor arguments are kept so the model can be rebuilt from a checkpoint
        self.config = dict(
            emb_dim=emb_dim,
            num_classes=num_classes,
            seq_len=seq_len,
            id_to_label=id_to_label,
        )
        # Convolutional feature extractor
        self.conv1 = nn.Conv1d(in_channels=emb_dim, out_channels=512, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=512, out_channels=256, kernel_size=3, padding=1)
        # Classification head on top of the flattened (256 x seq_len) feature map
        self.fc1 = nn.Linear(in_features=256 * seq_len, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=num_classes)
        # Shared activation applied after each convolution
        self.relu = nn.ReLU()

    def forward(self, x):
        """
        Computes class logits for a batch of embedded sequences.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, seq_len, emb_dim).

        Returns:
            torch.Tensor: Output logits tensor of shape (batch_size, num_classes).
        """
        # Conv1d expects channels first: (batch_size, emb_dim, seq_len)
        features = x.permute(0, 2, 1)
        for conv in (self.conv1, self.conv2):
            features = self.relu(conv(features))
        # Collapse channel and position axes into one feature vector per example
        flat = features.flatten(start_dim=1)
        # Note: fc1 feeds fc2 directly; no activation is applied between them
        hidden = self.fc1(flat)
        return self.fc2(hidden)


def calculate_accuracy(model, dataloader, device):
    """
    Evaluates the model's accuracy on the provided dataset.

    The model is switched to evaluation mode for the duration of the call and
    is put back into whichever mode (training or evaluation) it was in before,
    even if evaluation raises.

    Args:
        model (nn.Module): Trained model.
        dataloader (DataLoader): Iterable of (embeddings, labels) batches to evaluate.
        device: Device on which computations are performed.

    Returns:
        float: Accuracy as a fraction of correct predictions; 0.0 when the
            dataloader yields no examples.
    """
    was_training = model.training      # Remember the caller's mode so it can be restored
    model.eval()                       # Set model to evaluation mode
    correct = 0
    total = 0
    try:
        with torch.no_grad():          # Disable gradient calculations for efficiency
            for embeddings, labels in dataloader:
                embeddings = embeddings.to(device)
                labels = labels.to(device)
                outputs = model(embeddings)           # Forward pass
                predicted = outputs.argmax(dim=1)     # Get predicted class indices
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        model.train(was_training)      # Restore the mode the model had on entry
    # Guard against an empty dataloader instead of dividing by zero
    return correct / total if total else 0.0


def create_label_mappings(train_dataset):
    """
    Derives the label vocabulary from the training examples.

    Distinct labels are sorted and numbered consecutively from zero, so the
    numbering does not depend on the order of the examples.

    Args:
        train_dataset (list): List of training examples with a "label" field.

    Returns:
        tuple: A tuple containing:
            - label_to_id (dict): Mapping from label string to numeric ID.
            - id_to_label (dict): Mapping from numeric ID to label string.
            - num_classes (int): Total number of unique classes.
    """
    # Distinct labels in sorted order
    unique_labels = sorted({example["label"] for example in train_dataset})
    # Position in the sorted list is the label's ID
    id_to_label = dict(enumerate(unique_labels))
    # Inverse lookup
    label_to_id = {label: label_id for label_id, label in id_to_label.items()}
    return label_to_id, id_to_label, len(unique_labels)


def create_data_loaders(train_split, test_split, tokenizer, embedder, label_to_id, batch_size):
    """
    Wraps the two data splits in datasets and batching data loaders.

    Both splits share the same tokenizer, embedder and label mapping. Only
    the training loader shuffles its examples.

    Args:
        train_split (list): List of training examples.
        test_split (list): List of testing examples.
        tokenizer (Tokenizer): Tokenizer instance for processing text.
        embedder (Embedder): Embedder instance for converting tokens to embeddings.
        label_to_id (dict): Mapping from label strings to numeric IDs.
        batch_size (int): Batch size for the DataLoaders.

    Returns:
        tuple: A tuple containing:
            - train_loader (DataLoader): DataLoader for the training dataset.
            - test_loader (DataLoader): DataLoader for the testing dataset.
    """
    # One dataset per split, built with identical processing components
    train_dataset, test_dataset = (
        TextClassificationDataset(split, tokenizer, embedder, label_to_id)
        for split in (train_split, test_split)
    )
    # Training batches are reshuffled; test batches keep the split's order
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)
    return train_loader, test_loader


def save_model(model, prefix):
    """
    Writes a checkpoint for the model to ``<prefix>_model.pth``.

    The checkpoint is a dictionary with the model's weights under
    "state_dict" and its ``config`` attribute under "config".

    Args:
        model (nn.Module): Trained model to be saved.
        prefix (str): Prefix for the saved file name.
    """
    checkpoint = {
        "state_dict": model.state_dict(),
        "config": model.config,
    }
    checkpoint_path = f"{prefix}_model.pth"
    torch.save(checkpoint, checkpoint_path)


def load_model(prefix):
    """
    Loads a saved model from disk and prepares it for evaluation.

    Reads ``<prefix>_model.pth``, rebuilds a CNNTextClassifier from the stored
    configuration and restores its weights. Tensors are mapped to the CPU.

    Args:
        prefix (str): Prefix used during model saving.

    Returns:
        nn.Module: The loaded CNNTextClassifier model in evaluation mode.
    """
    # Load checkpoint containing model state and configuration.
    # The checkpoint holds only tensors and plain Python containers, so the
    # restricted unpickler is sufficient; it avoids executing arbitrary pickled
    # code and silences the FutureWarning about the changing default.
    checkpoint = torch.load(
        f"{prefix}_model.pth",
        map_location=torch.device("cpu"),
        weights_only=True,
    )
    config = checkpoint["config"]
    # Reinitialize the model using the saved configuration
    model = CNNTextClassifier(
        emb_dim=config["emb_dim"],
        num_classes=config["num_classes"],
        seq_len=config["seq_len"],
        id_to_label=config["id_to_label"]
    )
    # Load the saved weights into the model
    model.load_state_dict(checkpoint["state_dict"])
    model.eval()  # Set the model to evaluation mode

    return model


def test_model(model, test_input, tokenizer=None, embedder=None):
    """
    Tests the model on a single input text and prints the predicted label.

    Args:
        model (nn.Module): Trained text classification model.
        test_input (str): Input text to classify.
        tokenizer (Tokenizer, optional): Tokenizer instance. If None, a new one is created.
        embedder (Embedder, optional): Embedder instance. If None, embeddings are loaded and a new embedder is created.

    Notes:
        This function prints the input text along with the predicted emotion.
    """
    # Initialize tokenizer if not provided
    if not tokenizer:
        tokenizer = Tokenizer()
    # Initialize embedder if not provided by loading embeddings using global vectors_url and seq_len
    if not embedder:
        embeddings, emb_dim = load_embeddings(vectors_url)
        embedder = Embedder(embeddings, emb_dim, seq_len)

    # Determine the device from the model's parameters
    device = next(model.parameters()).device
    model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        # Tokenize the input text
        tokens = tokenizer.tokenize(test_input)
        # Convert tokens into embeddings
        embeddings = embedder.embed(tokens)
        # Convert embeddings to a float tensor, add batch dimension, and move to the correct device
        embeddings = torch.tensor(embeddings, dtype=torch.float32).unsqueeze(0).to(device)

        # Perform a forward pass through the model to get predictions
        outputs = model(embeddings)
        _, predicted = torch.max(outputs.data, 1)

        # Map the predicted numeric label to the actual label string
        predicted_label = model.config["id_to_label"][predicted.item()]

    print(f"Input: {test_input}")
    print(f"Predicted emotion: {predicted_label}")


def set_hyperparameters():
    """
    Provides the fixed training configuration used by the script.

    Returns:
        tuple: A tuple containing, in this order:
            - num_epochs (int): Number of training epochs.
            - seq_len (int): Fixed sequence length for input text.
            - batch_size (int): Batch size for training.
            - learning_rate (float): Learning rate for the optimizer.
    """
    return (
        2,      # num_epochs
        100,    # seq_len
        32,     # batch_size
        0.001,  # learning_rate
    )


# Training script: prepares the data, trains the CNN classifier for a fixed
# number of epochs, reports test accuracy after every epoch and saves a checkpoint.
# The names assigned here at module level (notably vectors_url, seq_len and
# model_name) are read again by the inference cell further down.
if __name__ == "__main__":
    # Set random seeds for reproducibility
    set_seed(42)
    # Determine computation device: use GPU if available, otherwise CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # URLs for downloading the dataset and word embeddings
    data_url = "https://www.thelmbook.com/data/emotions"
    vectors_url = "https://www.thelmbook.com/data/word-vectors"

    # Set training hyperparameters
    num_epochs, seq_len, batch_size, learning_rate = set_hyperparameters()

    # Download and prepare data loaders, label mappings, and embedding dimensions
    train_loader, test_loader, id_to_label, num_classes, emb_dim = \
        download_and_prepare_data(data_url, vectors_url, seq_len, batch_size)

    # Initialize the CNN text classifier model with the embedding dimension and label mappings
    model = CNNTextClassifier(emb_dim, num_classes, seq_len, id_to_label)
    model = model.to(device)  # Move model to the appropriate device

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()                # Cross-entropy loss for classification
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)  # AdamW optimizer

    # Training loop over multiple epochs
    for epoch in range(num_epochs):
        model.train()                  # Set model to training mode
        # Running sum of batch losses and batch count, reset every epoch
        total_loss = 0
        num_batches = 0
        # Initialize progress bar for the current epoch
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
        for batch in progress_bar:
            batch_embeddings, batch_labels = batch
            batch_embeddings = batch_embeddings.to(device)
            batch_labels = batch_labels.to(device)
            optimizer.zero_grad()      # Reset gradients before backpropagation
            outputs = model(batch_embeddings)   # Forward pass through the model
            loss = criterion(outputs, batch_labels)  # Compute loss
            loss.backward()            # Backpropagation
            optimizer.step()           # Update model parameters

            total_loss += loss.item()
            num_batches += 1

            # Update progress bar with the current average loss
            progress_bar.set_postfix({"Loss": total_loss / num_batches})

        # Mean training loss of the epoch; computed here but not used below
        # (the same value is already shown in the progress bar)
        avg_loss = total_loss / num_batches

        # Evaluate model accuracy on the test set after each epoch
        test_acc = calculate_accuracy(model, test_loader, device)
        print(f"Epoch [{epoch+1}/{num_epochs}], Test Accuracy: {test_acc:.4f}")

    # Save the trained model to disk with the specified prefix
    model_name = "CNN_classifier"
    save_model(model, model_name)

Using device: cuda


Downloading: 1.53GB [04:39, 5.89MB/s]                            
Loading word vectors: 100%|██████████| 3000000/3000000 [00:55<00:00, 54420.35it/s]
Epoch 1/2: 100%|██████████| 563/563 [00:07<00:00, 72.08it/s, Loss=0.759]


Epoch [1/2], Test Accuracy: 0.8830


Epoch 2/2: 100%|██████████| 563/563 [00:06<00:00, 85.55it/s, Loss=0.248]


Epoch [2/2], Test Accuracy: 0.9055


In [2]:
# Inference script: reloads the saved classifier and labels one sample sentence.
# It relies on model_name, vectors_url and seq_len having been defined by the
# training cell above.
if __name__ == "__main__":
    # Load the previously saved model using the specified model name prefix.
    # The 'load_model' function reads the checkpoint and reconstructs the CNNTextClassifier.
    loaded_model = load_model(model_name)

    # Load the word embeddings from the provided URL.
    # This returns a dictionary mapping words to their embedding vectors and the embedding dimension.
    # If the embeddings file is already on disk, the download is skipped and the file is only re-read.
    embeddings, emb_dim = load_embeddings(vectors_url)

    # Initialize the tokenizer.
    # The Tokenizer here simply splits text into tokens based on whitespace.
    tokenizer = Tokenizer()

    # Create an embedder instance using the loaded embeddings, embedding dimension, and fixed sequence length.
    # The Embedder will convert tokenized text into a tensor of embeddings suitable for the model.
    embedder = Embedder(embeddings, emb_dim, seq_len)

    # Define a sample input text to classify.
    test_input = "I'm so happy to be able to train a text classifier!"

    # Use the test_model function to evaluate the loaded model on the sample input.
    # This function tokenizes the text, converts it to embeddings, performs a forward pass through the model,
    # and prints out the predicted label.
    test_model(loaded_model, test_input, tokenizer, embedder)


  checkpoint = torch.load(f"{prefix}_model.pth", map_location=torch.device("cpu"))


File vectors.dat already exists. Skipping download.


Loading word vectors: 100%|██████████| 3000000/3000000 [00:54<00:00, 54728.27it/s]
  embeddings = torch.tensor(embeddings, dtype=torch.float32).unsqueeze(0).to(device)


Input: I'm so happy to be able to train a text classifier!
Predicted emotion: joy
