In [52]:

import torch                             # PyTorch for tensor operations and deep learning
import torch.nn as nn                    # Neural network module from PyTorch
import numpy as np                       # NumPy for numerical operations
import re                                # Regular expressions for text processing (if needed)
import urllib.request                    # For downloading files from URLs
import gzip                              # For handling compressed files
import json                              # For parsing JSON data
import requests                          # For making HTTP requests to download data
import random                            # For shuffling data and setting random seeds
import pickle                            # For saving and loading serialized objects
import os                                # For file system operations
from tqdm import tqdm                    # For displaying progress bars during loops
from torch.utils.data import Dataset, DataLoader  # For creating custom datasets and data loaders in PyTorch


In [56]:
class Tokenizer:
    """Minimal tokenizer that cuts text on runs of whitespace."""

    def tokenize(self, text):
        """Return the whitespace-delimited pieces of ``text`` as a list.

        Punctuation stays attached to the neighbouring word and the case of
        each piece is left unchanged. An empty or all-whitespace string yields
        an empty list.
        """
        pieces = text.split()
        return pieces

class Embedder:
    """Turns a list of tokens into a fixed-height stack of word vectors."""

    def __init__(self, embeddings, em_dim, seq_len):
        """Remember the lookup table and the output geometry.

        Args:
            embeddings: Container supporting ``in`` and ``[]`` that maps a word
                to vector data accepted by ``torch.tensor``.
            em_dim: Length of the all-zero vector used for unknown words and
                for padding.
            seq_len: Number of rows in every tensor returned by ``embed``.
        """
        self.embeddings = embeddings
        self.em_dim = em_dim
        self.seq_len = seq_len

    def _vector_for(self, word):
        """Look up ``word`` exactly, then lower-cased; fall back to zeros."""
        table = self.embeddings
        if word in table:
            key = word
        else:
            lowered = word.lower()
            if lowered not in table:
                return torch.zeros(self.em_dim)
            key = lowered
        return torch.tensor(table[key])

    def embed(self, tokens):
        """Embed ``tokens`` into a tensor with exactly ``seq_len`` rows.

        Tokens beyond ``seq_len`` are dropped; shorter inputs are padded at the
        end with zero vectors of length ``em_dim``.

        Returns:
            The result of ``torch.stack`` over the per-token vectors.
        """
        rows = [self._vector_for(token) for token in tokens[:self.seq_len]]
        missing = self.seq_len - len(rows)
        if missing > 0:
            rows += [torch.zeros(self.em_dim) for _ in range(missing)]
        return torch.stack(rows)

def load_embeddings(url, filename="vectors.dat"):
    """Download (if necessary) and parse a gzip-compressed binary word-vector file.

    The parser expects this layout inside the gzip stream: a header line with
    two integers (vocabulary size and vector dimension), followed, for each
    vocabulary entry, by the UTF-8 word terminated by a single space and then
    ``emb_dim`` float32 values. Newline bytes encountered while reading a word
    are skipped.

    Args:
        url: Location to download from when ``filename`` does not exist yet.
        filename: Local path of the (cached) compressed vector file.

    Returns:
        A ``(vectors, emb_dim)`` tuple where ``vectors`` maps each word to a
        1-D float32 NumPy array created with ``np.frombuffer``.

    Raises:
        ValueError: If the file ends in the middle of a word or a vector.
    """
    if not os.path.exists(filename):
        # Download under a temporary name so an interrupted transfer is never
        # mistaken for a complete file by the existence check on the next run.
        partial_path = f"{filename}.part"
        with tqdm(unit="B", unit_scale=True, unit_divisor=1024, desc="Downloading") as progress_bar:
            def report_hook(count, block_size, total_size):
                if total_size != -1:
                    progress_bar.total = total_size
                # The hook is first called with count == 0, before any block
                # has been read, so that call must not advance the bar.
                if count > 0:
                    progress_bar.update(block_size)
            try:
                urllib.request.urlretrieve(url, partial_path, reporthook=report_hook)
            except BaseException:
                # Drop the incomplete download so the next call starts fresh.
                if os.path.exists(partial_path):
                    os.remove(partial_path)
                raise
            os.replace(partial_path, filename)
    else:
        print(f"File {filename} already exists. Skipping download.")

    with gzip.open(filename, "rb") as f:
        header = f.readline()
        vocab_size, emb_dim = map(int, header.split())
        vectors = {}
        binary_len = np.dtype("float32").itemsize * emb_dim
        with tqdm(total=vocab_size, desc="Loading word vectors") as pbar:
            for index in range(vocab_size):
                word_bytes = bytearray()
                while True:
                    ch = f.read(1)
                    if not ch:
                        # read() returns b"" at end of file; without this check
                        # the loop would never terminate on a truncated file.
                        raise ValueError(
                            f"{filename} ended while reading word {index + 1} of {vocab_size}"
                        )
                    if ch == b" ":
                        break
                    if ch != b"\n":
                        word_bytes += ch
                word = word_bytes.decode("utf-8")
                raw_vector = f.read(binary_len)
                if len(raw_vector) != binary_len:
                    raise ValueError(
                        f"{filename} ended while reading the vector for {word!r}"
                    )
                vectors[word] = np.frombuffer(raw_vector, dtype="float32")
                pbar.update(1)
    return vectors, emb_dim

def load_and_split_data(url, test_ratio=0.1):
    """Download a gzip-compressed JSON Lines dataset and split it in two.

    The response body is gunzipped and decoded as text, every non-blank line is
    parsed as one JSON document, the records are shuffled with the ``random``
    module's global generator, and the list is cut into a leading and a
    trailing part.

    Args:
        url: Address of the gzip-compressed dataset.
        test_ratio: Fraction of the records placed in the second (test) list.

    Returns:
        A ``(train, test)`` tuple of lists of parsed JSON records.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.get(url, timeout=60)
    # Fail with a clear HTTP error instead of trying to gunzip an error page.
    response.raise_for_status()
    content = gzip.decompress(response.content).decode()
    # Blank lines are skipped so json.loads only sees actual documents.
    data = [json.loads(line) for line in content.splitlines() if line.strip()]
    random.shuffle(data)
    split_index = int(len(data) * (1 - test_ratio))
    return data[:split_index], data[split_index:]

def download_and_prepare_data(data_url, vectors_url, seq_len, batch_size):
    """Fetch the dataset and word vectors and wire them into data loaders.

    Steps, in order: split the dataset (10% held out), load the word vectors,
    derive the label mappings from the training split, then build the loaders
    around a fresh ``Tokenizer`` and ``Embedder``.

    Args:
        data_url: Passed to ``load_and_split_data``.
        vectors_url: Passed to ``load_embeddings``.
        seq_len: Sequence length handed to the ``Embedder``.
        batch_size: Batch size handed to ``create_data_loaders``.

    Returns:
        ``(train_loader, test_loader, id_to_label, num_classes, emb_dim)``.
    """
    train_split, test_split = load_and_split_data(data_url, test_ratio=0.1)
    word_vectors, emb_dim = load_embeddings(vectors_url)
    label_to_id, id_to_label, num_classes = create_label_mappings(train_split)

    loaders = create_data_loaders(
        train_split,
        test_split,
        Tokenizer(),
        Embedder(word_vectors, emb_dim, seq_len),
        label_to_id,
        batch_size,
    )
    train_loader, test_loader = loaders
    return train_loader, test_loader, id_to_label, num_classes, emb_dim


class TextClassificationDataset(Dataset):
    """Dataset that embeds each text on access and pairs it with its label id."""

    def __init__(self, data, tokenizer, embedder, label_to_id):
        """Extract texts and numeric labels from ``data``.

        Args:
            data: Records with ``"text"`` and ``"label"`` keys.
            tokenizer: Object exposing ``tokenize(text)``.
            embedder: Object exposing ``embed(tokens)``.
            label_to_id: Mapping from a record's label to its integer id.
        """
        texts = []
        for record in data:
            texts.append(record["text"])
        label_ids = []
        for record in data:
            label_ids.append(label_to_id[record["label"]])

        self.texts = texts
        self.label_ids = label_ids
        self.tokenizer = tokenizer
        self.embedder = embedder

    def __len__(self):
        """Number of stored texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``(embedded_text, label)`` for position ``idx``.

        The label is a ``torch.long`` tensor built from the stored id.
        """
        text, label_id = self.texts[idx], self.label_ids[idx]
        features = self.embedder.embed(self.tokenizer.tokenize(text))
        target = torch.tensor(label_id, dtype=torch.long)
        return features, target


In [57]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm

class CNNTextClassifier(nn.Module):
    """Two 1-D convolutions followed by two linear layers producing class logits."""

    def __init__(self, emb_dim, num_classes, seq_len, id_to_label):
        """Build the layers and keep the constructor arguments in ``self.config``.

        Args:
            emb_dim: Number of input channels of the first convolution.
            num_classes: Size of the output of the last linear layer.
            seq_len: Sequence length; fixes the input width of ``fc1``.
            id_to_label: Mapping stored in ``config`` alongside the other
                arguments (``save_model`` writes ``config`` next to the weights).
        """
        super().__init__()
        self.config = dict(
            emb_dim=emb_dim,
            num_classes=num_classes,
            seq_len=seq_len,
            id_to_label=id_to_label,
        )
        # kernel_size=3 with padding=1 keeps the sequence length unchanged,
        # which is why fc1 can assume 256 * seq_len flattened features.
        self.conv1 = nn.Conv1d(in_channels=emb_dim, out_channels=512, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=512, out_channels=256, kernel_size=3, padding=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(in_features=256 * seq_len, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Compute logits for a batch.

        The last two axes of ``x`` are swapped before the convolutions, so the
        input is expected as (batch, seq_len, emb_dim) and the output is
        (batch, num_classes).
        """
        features = x.permute(0, 2, 1)
        for conv in (self.conv1, self.conv2):
            features = self.relu(conv(features))
        # NOTE(review): fc1 feeds fc2 with no activation in between, so the two
        # layers compose into a single linear map - confirm this is intended.
        return self.fc2(self.fc1(self.flatten(features)))

# -------------------- Utility Functions -------------------- #

def calculate_accuracy(model, dataloader, device):
    """Return the fraction of samples in ``dataloader`` that ``model`` classifies correctly.

    The model is switched to evaluation mode for the duration of the call and
    is put back into whichever mode (train or eval) it was in beforehand, even
    if evaluation raises.

    Args:
        model: Module mapping a batch of inputs to per-class scores.
        dataloader: Iterable of ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``correct / total`` as a float, or ``0.0`` when the dataloader yields
        no samples.
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for embeddings, label in dataloader:
                embeddings = embeddings.to(device)
                label = label.to(device)
                # The predicted class is the index of the largest score.
                predicted = model(embeddings).argmax(dim=1)
                total += label.size(0)
                correct += (predicted == label).sum().item()
    finally:
        # Restore the caller's mode instead of always forcing train mode.
        model.train(was_training)
    # Guard against an empty dataloader, which would otherwise divide by zero.
    return correct / total if total else 0.0

def create_label_mappings(train_dataset):
    """Build the label <-> id lookup tables from the records' ``"label"`` values.

    Ids are assigned in sorted label order, starting at 0.

    Returns:
        ``(label_to_id, id_to_label, number_of_distinct_labels)``.
    """
    ordered_labels = sorted({record["label"] for record in train_dataset})
    id_to_label = dict(enumerate(ordered_labels))
    label_to_id = {name: index for index, name in id_to_label.items()}
    return label_to_id, id_to_label, len(ordered_labels)

def create_data_loaders(train_split, test_split, tokenizer, embedder, label_to_id, batch_size):
    """Wrap both splits in ``TextClassificationDataset`` and ``DataLoader`` objects.

    Only the training loader shuffles; the test loader keeps the default order.

    Returns:
        ``(train_loader, test_loader)``.
    """
    train_dataset, test_dataset = [
        TextClassificationDataset(split, tokenizer, embedder, label_to_id)
        for split in (train_split, test_split)
    ]
    return (
        DataLoader(train_dataset, batch_size=batch_size, shuffle=True),
        DataLoader(test_dataset, batch_size=batch_size),
    )

def save_model(model, prefix):
    """Write the model's weights and its ``config`` to ``"{prefix}_model.pth"``.

    The file holds a dict with the keys ``"state_dict"`` and ``"config"``.
    """
    checkpoint = {"state_dict": model.state_dict(), "config": model.config}
    destination = f"{prefix}_model.pth"
    torch.save(checkpoint, destination)

def load_model(prefix):
    """Rebuild a ``CNNTextClassifier`` from ``"{prefix}_model.pth"``.

    The checkpoint is loaded onto the CPU, the network is re-created from the
    stored ``config`` and the stored weights are applied.

    Returns:
        The restored model, already switched to evaluation mode.
    """
    path = f"{prefix}_model.pth"
    # NOTE(review): torch.load unpickles the file; depending on the installed
    # PyTorch version's ``weights_only`` default this can run arbitrary code,
    # so only load checkpoints from a trusted source.
    checkpoint = torch.load(path, map_location=torch.device("cpu"))
    config = checkpoint["config"]

    init_kwargs = {}
    for key in ("emb_dim", "num_classes", "seq_len", "id_to_label"):
        init_kwargs[key] = config[key]

    model = CNNTextClassifier(**init_kwargs)
    model.load_state_dict(checkpoint["state_dict"])
    model.eval()
    return model

def test_model(model, test_input, tokenizer=None, embedder=None):
    if tokenizer is None:
        tokenizer = Tokenizer()
    if embedder is None:
        embeddings, emb_dim = load_embeddings(vectors_url)
        embedder = Embedder(embeddings, emb_dim, model.config["seq_len"])

    device = next(model.parameters()).device
    model.eval()
    with torch.no_grad():
        tokens = tokenizer.tokenize(test_input)
        embeddings = embedder.embed(tokens).unsqueeze(0).to(device)
        outputs = model(embeddings)
        _, predicted = torch.max(outputs.data, 1)
        predicted_label = model.config["id_to_label"][predicted.item()]
    print(f"Input: {test_input}")
    print(f"Predicted emotion: {predicted_label}")

def set_hyperparameters():
    """Return the training hyperparameters.

    Returns:
        ``(num_epochs, seq_len, batch_size, learning_rate)``.
    """
    hyperparameters = {
        "num_epochs": 2,
        "seq_len": 100,
        "batch_size": 32,
        "learning_rate": 0.001,
    }
    # Dicts preserve insertion order, so the tuple matches the order above.
    return tuple(hyperparameters.values())


In [58]:
if __name__ == "__main__":
    # Seed the generators this notebook draws from so the train/test split
    # (random.shuffle), the weight initialisation and the batch order (torch)
    # are repeatable across Restart & Run All.
    seed = 42
    random.seed(seed)
    torch.manual_seed(seed)

    # Prefer CUDA, then Apple's MPS backend, then fall back to the CPU. The
    # getattr guard covers PyTorch builds that have no ``torch.backends.mps``.
    mps_backend = getattr(torch.backends, "mps", None)
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif mps_backend is not None and mps_backend.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")
    print(f"Using device: {device}")

    data_url = "https://www.thelmbook.com/data/emotions"
    vectors_url = "https://www.thelmbook.com/data/word-vectors"

    num_epochs, seq_len, batch_size, learning_rate = set_hyperparameters()

    (train_loader, test_loader, id_to_label, num_classes, emb_dim) = download_and_prepare_data(
        data_url, vectors_url, seq_len, batch_size
    )

    model = CNNTextClassifier(emb_dim, num_classes, seq_len, id_to_label)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        num_batches = 0
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
        for batch_embeddings, batch_labels in progress_bar:
            batch_embeddings = batch_embeddings.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()
            outputs = model(batch_embeddings)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1
            # Running mean of the loss over the batches seen so far this epoch.
            progress_bar.set_postfix({"Loss": total_loss / num_batches})

        # Mean training loss for the epoch; NaN instead of a ZeroDivisionError
        # when the training loader produced no batches.
        avg_loss = total_loss / num_batches if num_batches else float("nan")
        test_acc = calculate_accuracy(model, test_loader, device)
        print(f"Epoch [{epoch+1}/{num_epochs}], Test Accuracy: {test_acc:.4f}")

    model_name = "CNN_classifier"
    save_model(model, model_name)


Using device: mps
File vectors.dat already exists. Skipping download.


Loading word vectors: 100%|██████████████████████████████████████████████████████████████| 3000000/3000000 [00:18<00:00, 160181.55it/s]
Epoch 1/2: 100%|█████████████████████████████████████████████████████████████████████████| 563/563 [00:15<00:00, 36.06it/s, Loss=0.774]


Epoch [1/2], Test Accuracy: 0.8450


Epoch 2/2: 100%|█████████████████████████████████████████████████████████████████████████| 563/563 [00:12<00:00, 44.44it/s, Loss=0.247]


Epoch [2/2], Test Accuracy: 0.8905


In [59]:
if __name__ == "__main__":
    # ``model_name`` and ``vectors_url`` are defined by the training cell, so
    # that cell has to run before this one.
    loaded_model = load_model(model_name)
    embeddings, emb_dim = load_embeddings(vectors_url)
    tokenizer = Tokenizer()
    # Take the sequence length from the checkpoint rather than from the
    # notebook-level ``seq_len``: fc1's input width was fixed when the model
    # was trained, so the embedder has to produce exactly that many rows.
    embedder = Embedder(embeddings, emb_dim, loaded_model.config["seq_len"])
    test_input = "I'm so happy to be able to train a text classifier!"
    test_model(loaded_model, test_input, tokenizer, embedder)


File vectors.dat already exists. Skipping download.


Loading word vectors: 100%|██████████████████████████████████████████████████████████████| 3000000/3000000 [00:18<00:00, 159378.42it/s]


Input: I'm so happy to be able to train a text classifier!
Predicted emotion: joy
