In [18]:
# --------- IMPORTS ---------
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score, classification_report
import numpy as np
import pandas as pd
import optuna
from datasets import load_dataset
from classes import *
from CNN import CNN
from sklearn.metrics import f1_score, classification_report
import pandas as pd

In [19]:
# --------- CONFIGURATION ---------
# Dataset identifier and configuration passed to `load_dataset`.
DATASET_NAME = 'go_emotions'
DATASET_CONFIG = 'simplified'
# Pick the compute device in priority order: Apple MPS, then CUDA, then CPU.
DEVICE = torch.device("mps" if torch.backends.mps.is_available()
                      else "cuda" if torch.cuda.is_available()
                      else "cpu")
# Batch size shared by the train/validation/test DataLoaders.
BATCH_SIZE = 32
# Single preprocessor instance shared by the whole notebook: it cleans text, owns the
# vocabulary (`word2idx`) and encodes batches. 'name' is passed as an extra stopword.
# NOTE(review): presumably 'name' is dropped because of anonymisation placeholders
# in the dataset — confirm.
PREPROCESSOR = TextPreprocessor(extra_stopwords={'name'})

In [3]:
# Docstrings generated from Anysphere. (2025). Cursor [Large language model]. https://cursor.com/en

In [20]:
# --------- DATA PREPARATION ---------
def prepare_dataset_arrays():
    """Load GoEmotions, strip the "neutral" label, and return arrays per split.

    The "neutral" class is removed from every multi-label annotation. Examples
    that would be left without any label are dropped, and the remaining label
    ids are re-indexed so that they point into `new_labels` regardless of where
    "neutral" sits in the original label list.

    Returns:
        Tuple[List[str], np.ndarray, List[str], np.ndarray, List[str], np.ndarray, List[str]]:
            `(X_train, y_train, X_val, y_val, X_test, y_test, new_labels)` where
            each `X_*` is a list of raw strings and each `y_*` is a multi-hot
            numpy array whose columns are aligned to `new_labels`.
    """
    ds = load_dataset(DATASET_NAME, DATASET_CONFIG)

    label_names = ds["train"].features["labels"].feature.names
    neutral_idx = label_names.index("neutral")
    new_labels = [name for name in label_names if name != "neutral"]

    def has_non_neutral_label(ex):
        # Keep only examples that still carry a label once "neutral" is removed.
        return any(label_id != neutral_idx for label_id in ex["labels"])

    def remove_neutral(ex):
        # Ids above the removed class shift down by one so they keep indexing
        # the right entry of `new_labels`.
        ex["labels"] = [
            label_id - 1 if label_id > neutral_idx else label_id
            for label_id in ex["labels"]
            if label_id != neutral_idx
        ]
        return ex

    for split in ["train", "validation", "test"]:
        ds[split] = ds[split].filter(has_non_neutral_label).map(remove_neutral)

    # `classes` is given explicitly, so every split is binarized to the same
    # column layout (one column per entry of `new_labels`).
    mlb = MultiLabelBinarizer(classes=range(len(new_labels)))
    mlb.fit(ds["train"]["labels"])

    def to_arrays(split):
        X = list(ds[split]["text"])  # materialise as a plain list of strings
        y = mlb.transform(ds[split]["labels"])
        return X, y

    return (
        *to_arrays("train"),
        *to_arrays("validation"),
        *to_arrays("test"),
        new_labels
    )

In [21]:
def preprocess_data(X_train, y_train, X_val, y_val, X_test, y_test):
    """Turn raw text splits into encoded `TextDataset` objects.

    Every text is cleaned with `PREPROCESSOR.preprocess`; texts that are blank
    after cleaning are discarded together with their label rows. The vocabulary
    is built from the cleaned training texts only, after which all three splits
    are encoded with `PREPROCESSOR.encode_batch`.

    Args:
        X_train, X_val, X_test (List[str]): Raw texts per split.
        y_train, y_val, y_test (array-like): Multi-hot label rows aligned to the texts.

    Returns:
        Tuple[TextDataset, TextDataset, TextDataset, int]: Train, validation and
        test datasets followed by the vocabulary size.
    """
    def drop_blank(docs, targets):
        """Keep only (text, label) pairs whose text is not blank."""
        kept = [(doc, target) for doc, target in zip(docs, targets) if doc.strip()]
        return [doc for doc, _ in kept], [target for _, target in kept]

    # Clean every split first (train, then validation, then test).
    train_docs = [PREPROCESSOR.preprocess(raw) for raw in X_train]
    val_docs = [PREPROCESSOR.preprocess(raw) for raw in X_val]
    test_docs = [PREPROCESSOR.preprocess(raw) for raw in X_test]

    train_docs, train_targets = drop_blank(train_docs, y_train)
    val_docs, val_targets = drop_blank(val_docs, y_val)
    test_docs, test_targets = drop_blank(test_docs, y_test)

    # The vocabulary must only see training text to avoid leaking val/test tokens.
    PREPROCESSOR.build_vocab(train_docs)
    vocab_size = len(PREPROCESSOR.word2idx)

    train_ids = PREPROCESSOR.encode_batch(train_docs)
    val_ids = PREPROCESSOR.encode_batch(val_docs)
    test_ids = PREPROCESSOR.encode_batch(test_docs)

    train_set = TextDataset(train_ids, train_targets)
    val_set = TextDataset(val_ids, val_targets)
    test_set = TextDataset(test_ids, test_targets)
    return train_set, val_set, test_set, vocab_size

In [22]:
def prepare_data_loaders(train_dataset, val_dataset, test_dataset):
    """Wrap the three encoded datasets in DataLoaders of size `BATCH_SIZE`.

    Args:
        train_dataset, val_dataset, test_dataset (TextDataset): Encoded datasets.

    Returns:
        Tuple[DataLoader, DataLoader, DataLoader]: Train, validation and test
        loaders, in that order.
    """
    # Only the training split is shuffled; evaluation splits keep their order.
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)
    return train_loader, val_loader, test_loader

In [23]:
def compute_pos_weight(y_train):
    """Derive mean-normalized positive-class weights for BCEWithLogitsLoss.

    For every class the negatives-to-positives ratio is computed (classes with
    no positives are treated as having one), capped at 5.0, and the resulting
    vector is divided by its own mean.

    Args:
        y_train (array-like): Multi-hot training labels of shape `(N, C)`.

    Returns:
        torch.Tensor: Weight vector of shape `(C,)`.
    """
    label_matrix = torch.tensor(y_train, dtype=torch.float32)
    num_samples, _ = label_matrix.shape

    positives = label_matrix.sum(dim=0)
    negatives = num_samples - positives

    # Guard the division against empty classes, then cap extreme ratios.
    ratio = negatives / torch.clamp(positives, min=1)
    capped = torch.clamp(ratio, max=5.0)
    return capped / capped.mean()

In [24]:
def tune_thresholds(probs, targets, low=0.1, high=0.9, steps=81):
    """Pick, for each class independently, the threshold with the best binary F1.

    An evenly spaced grid of `steps` thresholds between `low` and `high` is
    scanned for every class. The lowest threshold reaching the highest F1 wins;
    a class whose F1 is 0 for every candidate keeps the default of 0.5.

    Args:
        probs (np.ndarray): Predicted probabilities, shape `(N, C)`.
        targets (np.ndarray): Binary ground-truth array, shape `(N, C)`.
        low (float): Lower bound for threshold sweep (inclusive).
        high (float): Upper bound for threshold sweep (inclusive).
        steps (int): Number of thresholds to evaluate between `low` and `high`.

    Returns:
        np.ndarray: Selected threshold per class, shape `(C,)`.
    """
    num_classes = probs.shape[1]
    candidates = np.linspace(low, high, steps)
    chosen = np.full(num_classes, 0.5)

    for col in range(num_classes):
        truth = targets[:, col]
        scores = [
            f1_score(truth, (probs[:, col] >= cut).astype(int), zero_division=0)
            for cut in candidates
        ]
        if not scores:
            continue
        # argmax returns the first maximum, i.e. the lowest threshold among ties.
        winner = int(np.argmax(scores))
        if scores[winner] > 0.0:
            chosen[col] = candidates[winner]

    return chosen

In [25]:
def evaluate_with_thresholds(trainer, val_loader, test_loader, labels, save_csv=True):
    """Fit per-class thresholds on validation data and report val/test metrics.

    Thresholds are tuned on the validation predictions only and then applied
    unchanged to the test predictions. For each split a classification report
    is printed and a per-label F1 table is built.

    Args:
        trainer (MyCnnFunctions): Provides `evaluate` to get probabilities/labels.
        val_loader, test_loader (DataLoader): Validation and test splits.
        labels (List[str]): Human-readable label names for reports/CSVs.
        save_csv (bool): If True, writes the validation table to
            'val_metrics.csv' and the test table to 'cnn_metrics.csv'.

    Returns:
        Tuple[np.ndarray, pd.DataFrame, pd.DataFrame]: `(thresholds, val_df, test_df)`.
    """
    def report_split(header, probs, targets, cutoffs, csv_path):
        """Threshold one split, print its report and return the per-label F1 table."""
        decisions = (probs >= cutoffs[None, :]).astype(int)
        print(header)
        print(classification_report(targets, decisions,
              zero_division=0, target_names=labels))
        per_label_f1 = f1_score(
            targets, decisions, average=None, zero_division=0).round(2)
        table = pd.DataFrame({'Label': labels, 'F1-score': per_label_f1})
        if save_csv:
            table.to_csv(csv_path, index=False)
        return table

    # Thresholds are chosen on validation data only.
    probs_val, targets_val = trainer.evaluate(val_loader)
    best_thresholds = tune_thresholds(probs_val, targets_val)
    f1_df_val = report_split("\n=== Validation Set ===", probs_val, targets_val,
                             best_thresholds, 'val_metrics.csv')

    # The test split reuses them as-is.
    probs_test, targets_test = trainer.evaluate(test_loader)
    f1_df_test = report_split("\n=== Test Set ===", probs_test, targets_test,
                              best_thresholds, 'cnn_metrics.csv')

    return best_thresholds, f1_df_val, f1_df_test

In [26]:
# --------- TRAINING & TUNING ---------
def train_and_tune_model(vocab_size, labels, norm_weight, train_loader, val_loader, test_loader,
                         n_trials=30, tuning_epochs=5, final_epochs=50, patience=2,
                         model_path="cnn_model.pth"):
    """Optimize hyperparameters, train the final model, evaluate and plot it.

    Runs an Optuna study (maximizing the `best_val_f1` reported by the trainer),
    retrains a fresh model with the best configuration, saves its weights,
    evaluates it with tuned thresholds and plots the training curves.

    Args:
        vocab_size (int): Size of the token vocabulary.
        labels (List[str]): Label names for reporting and head size.
        norm_weight (torch.Tensor): Normalized positive class weights.
        train_loader, val_loader, test_loader (DataLoader): Data splits.
        n_trials (int): Number of Optuna trials.
        tuning_epochs (int): Maximum epochs per Optuna trial.
        final_epochs (int): Maximum epochs for the final training run.
        patience (int): Patience value forwarded to `trainer.model_trainer`.
        model_path (str): File the final model's `state_dict` is saved to.
    """
    def build_run(params):
        """Create model, trainer, loss, optimizer and scheduler for one configuration.

        `params` uses the same keys as the Optuna study ('embed_dim', 'dropout',
        'lr', 'weight_decay', 'num_filter', 'fc1_size', 'weight_scale').
        """
        model = CNN(vocab_size, params['embed_dim'], len(labels), params['num_filter'],
                    params['dropout'], fc1_size=params['fc1_size']).to(DEVICE)
        optimizer = Adam(model.parameters(), lr=params['lr'],
                         weight_decay=params['weight_decay'])
        # Scale the class weights by the tuned factor, still capped at 5.0.
        scaled = torch.clamp(norm_weight * params['weight_scale'], max=5.0).to(DEVICE)
        loss_func = nn.BCEWithLogitsLoss(pos_weight=scaled)
        scheduler = ReduceLROnPlateau(optimizer, 'min')
        trainer = MyCnnFunctions(model, DEVICE, multi_label=True)
        return model, trainer, loss_func, optimizer, scheduler

    def fit(trainer, loss_func, optimizer, scheduler, epochs):
        """Run the trainer on the train/validation loaders for at most `epochs` epochs."""
        return trainer.model_trainer(epochs=epochs, train_loader=train_loader,
                                     val_loader=val_loader, loss_func=loss_func,
                                     optimizer=optimizer, scheduler=scheduler,
                                     patience=patience)

    def objective(trial):
        # Dict order fixes the order of the suggest calls (same as the search space listing).
        params = {
            'embed_dim': trial.suggest_int('embed_dim', 50, 300),
            'dropout': trial.suggest_float('dropout', 0.2, 0.7),
            'lr': trial.suggest_float('lr', 1e-4, 1e-2, log=True),
            'weight_decay': trial.suggest_float('weight_decay', 1e-5, 1e-2, log=True),
            'num_filter': trial.suggest_int('num_filter', 8, 256),
            'fc1_size': trial.suggest_categorical("fc1_size", [64, 128, 256]),
            'weight_scale': trial.suggest_float("weight_scale", 0.25, 2.0),
        }
        _, trainer, loss_func, optimizer, scheduler = build_run(params)
        result = fit(trainer, loss_func, optimizer, scheduler, tuning_epochs)
        return float(result["best_val_f1"])

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)
    best_params = study.best_params
    print("Best parameters:", best_params)

    # Final training: a fresh model built from the best trial's parameters.
    model, trainer, loss_func, optimizer, scheduler = build_run(best_params)
    result = fit(trainer, loss_func, optimizer, scheduler, final_epochs)

    torch.save(model.state_dict(), model_path)
    print(f"Model saved to {model_path}")

    # Evaluate
    best_thresholds, _, _ = evaluate_with_thresholds(
        trainer, val_loader, test_loader, labels)
    print(f'BEST THRESHOLDS: {best_thresholds}')
    trainer.plot_training(result["train_losses"], result["val_losses"],
                          result["train_accuracies"], result["val_f1s"])

In [None]:
# --------- MAIN ---------
def main():
    """Run the full CNN emotion-classification pipeline end to end.

    Stages, in order:
    1. Dataset preparation: load GoEmotions, remove neutral labels, build arrays.
    2. Preprocessing: clean texts, build the vocabulary, encode the splits.
    3. DataLoader creation for the train/validation/test splits.
    4. Positive-class weight computation from the training labels returned by
       stage 1.
    5. Optuna hyperparameter search followed by a final training run.
    6. Threshold tuning and evaluation on validation and test data.
    7. Plotting of the training curves.

    Outputs (all produced inside `train_and_tune_model`):
        - Trained weights saved to 'cnn_model.pth'
        - Validation metrics saved to 'val_metrics.csv'
        - Test metrics saved to 'cnn_metrics.csv'
        - Classification reports, tuned thresholds and training plots

    Configuration:
        Uses global constants: DATASET_NAME, DATASET_CONFIG, DEVICE, BATCH_SIZE, PREPROCESSOR

    Returns:
        None
    """
    done_banner = '---DONE---'

    print('---PREPARING DATASETS---')
    *split_arrays, labels = prepare_dataset_arrays()
    y_train = split_arrays[1]
    print(done_banner)

    print('---PREPROCESSING DATA---')
    *encoded_splits, vocab_size = preprocess_data(*split_arrays)
    train_loader, val_loader, test_loader = prepare_data_loaders(*encoded_splits)
    pos_weight = compute_pos_weight(y_train)
    print(done_banner)

    print('---TRAINING, TUNING, AND EVALUATING MODEL---')
    train_and_tune_model(vocab_size, labels, pos_weight,
                         train_loader, val_loader, test_loader)
    print(done_banner)

In [27]:
if __name__ == '__main__':
    # Data-only part of the pipeline: the names created here (arrays, datasets,
    # loaders, `labels`, `vocab_size`, `pos_weight`) are reused by the cells below.
    print('---PREPARING DATASETS---')
    (X_train, y_train,
     X_val, y_val,
     X_test, y_test,
     labels) = prepare_dataset_arrays()
    print('---DONE---')

    print('---PREPROCESSING DATA---')
    (train_dataset, val_dataset,
     test_dataset, vocab_size) = preprocess_data(X_train, y_train,
                                                 X_val, y_val,
                                                 X_test, y_test)
    (train_loader,
     val_loader,
     test_loader) = prepare_data_loaders(train_dataset, val_dataset, test_dataset)
    pos_weight = compute_pos_weight(y_train)
    print('---DONE---')

---PREPARING DATASETS---
---DONE---
---PREPROCESSING DATA---
---DONE---


In [51]:
# Binarizer over the label indices 0..len(labels)-1, kept at module level so multi-hot
# prediction rows can be mapped back to label indices with `inverse_transform`.
# NOTE(review): `y_train` is already a multi-hot matrix here, not lists of label ids;
# since `classes` is given explicitly the fitted classes should not depend on it — confirm.
mlb = MultiLabelBinarizer(classes=range(len(labels)))
mlb.fit(y_train)

In [None]:
# Per-class decision thresholds (27 values), compared column-wise against the
# predicted probabilities in `predict_cnn`.
# NOTE(review): presumably copied from the "BEST THRESHOLDS" printout of an earlier
# tuning run — confirm they still match the model trained in this notebook.
thresholds = [0.56, 0.34, 0.4, 0.24, 0.26, 0.19, 0.38, 0.36, 0.26, 0.27, 0.27, 0.41, 0.24, 0.25,
              0.54, 0.79, 0.5,  0.31, 0.62, 0.5,  0.39, 0.1,  0.34, 0.1,  0.12, 0.43, 0.53]

In [None]:
# Train the final model with fixed hyperparameters instead of re-running the Optuna search.
# Positional CNN arguments follow the call order used in `train_and_tune_model`:
# vocab_size, embed_dim (169), number of labels, num_filter (131), dropout (0.274).
# NOTE(review): the literals are presumably the best trial of an earlier study — confirm.
model = CNN(vocab_size, 169, len(labels),
            131, 0.274, fc1_size=256).to(DEVICE)
optimizer = Adam(model.parameters(
), lr=1.319e-3, weight_decay=5.256e-5)
# 1.779 plays the role of the tuned `weight_scale`; scaled weights are capped at 5.0.
scaled = torch.clamp(
    pos_weight * 1.779, max=5.0).to(DEVICE)
loss_func = nn.BCEWithLogitsLoss(pos_weight=scaled)
scheduler = ReduceLROnPlateau(optimizer, 'min')
trainer = MyCnnFunctions(model, DEVICE, multi_label=True)

# Up to 50 epochs; `patience=2` is forwarded to the trainer (the log below stops at epoch 5).
result = trainer.model_trainer(epochs=50, train_loader=train_loader, val_loader=val_loader,
                               loss_func=loss_func, optimizer=optimizer, scheduler=scheduler, patience=2)

# Persist the weights of the model as it is after training returns.
torch.save(model.state_dict(), "cnn_model.pth")
print("Model saved to cnn_model.pth")

Epoch 0   | Train Loss: 0.2073 | Train Acc: 0.9616 | Val Loss: 0.1788 | Val Micro F1: 0.4413 | Wait: 0
Epoch 1   | Train Loss: 0.1709 | Train Acc: 0.9644 | Val Loss: 0.1627 | Val Micro F1: 0.5018 | Wait: 0
Epoch 2   | Train Loss: 0.1578 | Train Acc: 0.9653 | Val Loss: 0.1567 | Val Micro F1: 0.5327 | Wait: 0
Epoch 3   | Train Loss: 0.1532 | Train Acc: 0.9655 | Val Loss: 0.1542 | Val Micro F1: 0.5392 | Wait: 0
Epoch 4   | Train Loss: 0.1501 | Train Acc: 0.9656 | Val Loss: 0.1524 | Val Micro F1: 0.5371 | Wait: 1
Epoch 5   | Train Loss: 0.1472 | Train Acc: 0.9659 | Val Loss: 0.1522 | Val Micro F1: 0.5337 | Wait: 2
Early stopping at epoch 5
Model saved to cnn_model.pth


In [None]:
# Device on which the trained model's parameters live.
# NOTE(review): `predict_cnn` looks the device up again itself, so this global appears unused.
device = next(model.parameters()).device

# Demonstration

In [None]:
def predict_cnn(texts):
    """Predict emotion labels for one or more raw texts and print them.

    Relies on notebook globals: `model` (trained CNN), `PREPROCESSOR` (with a
    built vocabulary), `labels` (label names in output-column order) and
    `thresholds` (per-class cut-offs, or None to use 0.5 everywhere).

    Args:
        texts (str | Iterable[str]): A single text or a collection of texts.

    Returns:
        None: For every input text, the text and its predicted label names are printed.
    """
    # `F` is not imported in the notebook's import cell, so bring it in locally.
    import torch.nn.functional as F

    if isinstance(texts, str):
        texts = [texts]
    texts = list(texts)
    if not texts:
        return

    # Mirror the training pipeline (see `preprocess_data`): clean first, then encode.
    cleaned = [PREPROCESSOR.preprocess(text) for text in texts]
    X = PREPROCESSOR.encode_batch(cleaned)
    device = next(model.parameters()).device  # run on whatever device the model lives on
    X = X.to(device)

    # The sequence must be at least as long as the widest convolution kernel.
    min_len = max(conv.kernel_size[0] for conv in model.convs)
    if X.size(1) < min_len:
        X = F.pad(X, (0, min_len - X.size(1)))

    # Inference must run in eval mode (e.g. dropout disabled); restore the previous mode after.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            probs = torch.sigmoid(model(X)).cpu().numpy()
    finally:
        model.train(was_training)

    cutoffs = 0.5 if thresholds is None else np.asarray(thresholds)
    preds = (probs >= cutoffs).astype(int)

    # Report every text of the batch, translating active columns to label names.
    for text, row in zip(texts, preds):
        predicted = [labels[idx] for idx in np.flatnonzero(row)]
        print(f"Text: {text}")
        print(f"Labels: {predicted}")

In [None]:
# Read one text from the user and print the emotions predicted for it.
text = input("Enter:")

predict_cnn(text)