# 🏴‍☠️ AN2DL25 Challenge 1 — Pirate Pain Classification

This notebook implements a full deep-learning pipeline for multivariate time-series classification of the Pirate Pain dataset. It is inspired by the Lecture 4 notebook (`Timeseries Classification (1).ipynb`) but adapted for the competition setting, including data preparation, model training (RNN/GRU/LSTM variants), evaluation, and test-time inference.



In [None]:
# %%capture
# !pip install -q -r requirements.txt




In [None]:
import os
import random
import math
import copy
from pathlib import Path
from typing import Tuple, Dict, Optional, List
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.metrics import (
    f1_score,
    classification_report,
    confusion_matrix,
    precision_recall_fscore_support,
)
from sklearn.model_selection import StratifiedKFold, train_test_split
import matplotlib.pyplot as plt
import seaborn as sns

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torch.cuda.amp import autocast, GradScaler



In [None]:
# Global seed reused by every random number generator seeded below.
SEED = 42

# Colab is detected by attempting its Colab-only import.
try:
    from google.colab import drive  # type: ignore
    IN_COLAB = True
except ImportError:  # pragma: no cover
    IN_COLAB = False

if IN_COLAB:
    drive.mount('/content/drive', force_remount=False)
    # The folder name contains a plain space. A shell-style "\ " escape is not
    # interpreted inside a Python string: it would leave a literal backslash in
    # the path (and is flagged as an invalid escape sequence).
    BASE_DIR = Path('/content/drive/MyDrive/[2025-2026] A2NDL/Challenge')
else:
    BASE_DIR = Path('/Users/md101ta/Desktop/Pirates')

DATA_DIR = (BASE_DIR / 'data').resolve()
OUTPUT_DIR = (BASE_DIR / 'outputs').resolve()
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

# Reproducibility: seed Python, NumPy and PyTorch (CPU and, if present, CUDA).
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)
    DEVICE = torch.device('cuda')
    # Ask cuDNN for deterministic kernels and disable its autotuner.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
else:
    DEVICE = torch.device('cpu')

print(f'Running in Colab: {IN_COLAB}')
print(f'Device: {DEVICE}')
print(f'Data dir: {DATA_DIR}')
print(f'Output dir: {OUTPUT_DIR}')


In [None]:
# TensorBoard logs and model checkpoints are kept in sub-folders of OUTPUT_DIR.
LOG_DIR = (OUTPUT_DIR / 'logs').resolve()
CHECKPOINT_DIR = (OUTPUT_DIR / 'checkpoints').resolve()
for _artifact_dir in (LOG_DIR, CHECKPOINT_DIR):
    _artifact_dir.mkdir(parents=True, exist_ok=True)



In [None]:
def load_data(data_dir: Path) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Read the three competition CSV files located in ``data_dir``.

    Args:
        data_dir: Folder containing ``pirate_pain_train.csv``,
            ``pirate_pain_train_labels.csv`` and ``pirate_pain_test.csv``.

    Returns:
        ``(train_features, train_labels, test_features)`` as DataFrames,
        exactly as parsed by ``pd.read_csv`` with default options.
    """
    file_names = (
        'pirate_pain_train.csv',
        'pirate_pain_train_labels.csv',
        'pirate_pain_test.csv',
    )
    train_features, train_labels, test_features = [
        pd.read_csv(data_dir / file_name) for file_name in file_names
    ]
    return train_features, train_labels, test_features


# Load the raw tables once and print the shape of each of them.
X_train_raw, y_train, X_test_raw = load_data(DATA_DIR)
print(*[table.shape for table in (X_train_raw, y_train, X_test_raw)])


In [None]:
# Columns that hold categories and are replaced by integer codes.
CATEGORICAL_COLUMNS = ['n_legs', 'n_hands', 'n_eyes']
CATEGORY_MAPPINGS: Dict[str, Dict[str, int]] = {}

for column in CATEGORICAL_COLUMNS:
    # Codes are built from the non-missing values seen in train and test
    # together, so both frames share the same encoding.
    observed = pd.concat([X_train_raw[column], X_test_raw[column]]).dropna().unique()
    codes = dict(zip(sorted(observed), range(len(observed))))
    CATEGORY_MAPPINGS[column] = codes
    for frame in (X_train_raw, X_test_raw):
        frame[column] = frame[column].map(codes).astype(np.int32)

# Every column except the sample identifier and the time index is a feature.
_NON_FEATURE_COLUMNS = ('sample_index', 'time')
FEATURE_COLUMNS = [name for name in X_train_raw.columns if name not in _NON_FEATURE_COLUMNS]
TIME_STEPS = X_train_raw['time'].nunique()
NUM_FEATURES = len(FEATURE_COLUMNS)
NUM_CLASSES = y_train['label'].nunique()
print(f'Time steps: {TIME_STEPS} | Features: {NUM_FEATURES} | Classes: {NUM_CLASSES}')
print('Category mappings:', CATEGORY_MAPPINGS)

# Bar chart of how many samples each class has.
class_counts = y_train['label'].value_counts()
class_counts.plot(kind='bar', title='Class distribution')
plt.show()



In [None]:
# Class names are sorted so the label <-> integer encoding is deterministic.
_class_names = sorted(y_train['label'].unique())
LABEL2IDX = dict(zip(_class_names, range(len(_class_names))))
IDX2LABEL = dict(enumerate(_class_names))
print('Label mapping:', LABEL2IDX)


def pivot_timeseries(df: pd.DataFrame) -> np.ndarray:
    """Turn the long-format table into an array of per-sample sequences.

    The frame is pivoted to one row per ``sample_index`` with one column per
    (feature, time) pair, rows are sorted by ``sample_index`` and columns by
    the ``time`` level, and the values are reshaped to
    ``(-1, TIME_STEPS, NUM_FEATURES)``.

    NOTE(review): ``sort_index(level=1)`` also sorts the remaining
    (feature-name) level by default, so the feature axis presumably follows
    sorted column names rather than ``FEATURE_COLUMNS`` order -- confirm before
    indexing features by position.
    """
    wide = df.pivot(index='sample_index', columns='time', values=FEATURE_COLUMNS)
    wide = wide.sort_index(axis=0)
    wide = wide.sort_index(axis=1, level=1)
    values = wide.to_numpy()
    return values.reshape(-1, TIME_STEPS, NUM_FEATURES)


X_train_np = pivot_timeseries(X_train_raw)
X_test_np = pivot_timeseries(X_test_raw)

# pivot_timeseries sorts its rows by sample_index, so the labels must be looked
# up in that same sorted order. Using the order of first appearance in the CSV
# would silently misalign labels whenever the file is not already sorted.
_train_sample_ids = pd.Index(X_train_raw['sample_index'].unique()).sort_values()
y_train_idx = (
    y_train.set_index('sample_index')
    .loc[_train_sample_ids, 'label']
    .map(LABEL2IDX)
    .to_numpy()
)

print(X_train_np.shape, y_train_idx.shape, X_test_np.shape)



In [None]:
def compute_normalization_stats(data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Compute per-feature mean and standard deviation.

    All leading axes are flattened, so statistics are taken over every row of
    the last (feature) axis. The feature count is read from ``data.shape[-1]``
    instead of a module-level constant, so the function works on any array
    whose last axis indexes features.

    Args:
        data: Array whose last axis is the feature axis, e.g. ``(N, T, F)``.

    Returns:
        ``(mean, std)``, each of shape ``(F,)``. ``1e-6`` is added to the
        standard deviation so constant features do not cause division by zero.
    """
    flat = data.reshape(-1, data.shape[-1])
    mean = flat.mean(axis=0)
    std = flat.std(axis=0) + 1e-6
    return mean, std


def normalize(data: np.ndarray, mean: np.ndarray, std: np.ndarray) -> np.ndarray:
    """Standardise ``data`` as ``(data - mean) / std`` using NumPy broadcasting."""
    centered = data - mean
    return centered / std


# Statistics are computed on the training array and reused for the test array.
feat_mean, feat_std = compute_normalization_stats(X_train_np)
X_train_np, X_test_np = [
    normalize(split, feat_mean, feat_std) for split in (X_train_np, X_test_np)
]



In [None]:
class TimeSeriesDataset(Dataset):
    """In-memory dataset of sequences with optional integer labels.

    ``data`` is copied into a ``float32`` tensor and ``labels`` (when given)
    into an ``int64`` tensor. Without labels, indexing yields only the sample;
    with labels it yields a ``(sample, label)`` pair.
    """

    def __init__(self, data: np.ndarray, labels: Optional[np.ndarray] = None):
        self.data = torch.tensor(data, dtype=torch.float32)
        if labels is not None:
            self.labels = torch.tensor(labels, dtype=torch.long)
        else:
            self.labels = None

    def __len__(self) -> int:
        # Number of samples = size of the first axis.
        return self.data.shape[0]

    def __getitem__(self, idx: int):
        sample = self.data[idx]
        if self.labels is not None:
            return sample, self.labels[idx]
        return sample



In [None]:
def create_dataloaders(X: np.ndarray, y: np.ndarray, valid_size: float = 0.2, batch_size: int = 64):
    """Split ``(X, y)`` into stratified train/validation sets and wrap them in loaders.

    The split uses ``train_test_split`` with ``random_state=SEED`` and
    ``stratify=y``. Only the training loader shuffles; neither loader drops
    the last incomplete batch.

    Returns:
        ``(train_loader, valid_loader, (X_train, y_train, X_valid, y_valid))``.
    """
    split_arrays = train_test_split(
        X,
        y,
        test_size=valid_size,
        random_state=SEED,
        stratify=y,
    )
    X_tr, X_va, y_tr, y_va = split_arrays

    train_dataset = TimeSeriesDataset(X_tr, y_tr)
    valid_dataset = TimeSeriesDataset(X_va, y_va)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=False)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, drop_last=False)
    return train_loader, valid_loader, (X_tr, y_tr, X_va, y_va)


# Default split and loaders built with the default validation fraction.
BATCH_SIZE = 64
_default_split = create_dataloaders(X_train_np, y_train_idx, batch_size=BATCH_SIZE)
train_loader, valid_loader, _split_arrays = _default_split
X_train_split, y_train_split, X_valid_split, y_valid_split = _split_arrays



In [None]:
class RecurrentBackbone(nn.Module):
    """Recurrent encoder (RNN / GRU / LSTM) followed by an MLP classifier head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden units per direction; also the width of the head's
            hidden layer.
        num_layers: Number of stacked recurrent layers.
        dropout: Dropout probability used in the head and, when
            ``num_layers > 1``, between recurrent layers.
        rnn_type: One of ``'rnn'``, ``'gru'``, ``'lstm'`` (case-insensitive).
            Any other value raises ``KeyError``.
        bidirectional: Whether the recurrent stack runs in both directions.
        num_classes: Number of output logits. ``None`` (default) falls back to
            the module-level ``NUM_CLASSES``.
    """

    def __init__(
        self,
        input_size: int,
        hidden_size: int = 128,
        num_layers: int = 2,
        dropout: float = 0.2,
        rnn_type: str = 'lstm',
        bidirectional: bool = True,
        num_classes: Optional[int] = None,
    ):
        super().__init__()
        rnn_cls = {
            'rnn': nn.RNN,
            'gru': nn.GRU,
            'lstm': nn.LSTM,
        }[rnn_type.lower()]
        self.rnn_type = rnn_type.lower()
        self.bidirectional = bidirectional
        self.rnn = rnn_cls(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            # Inter-layer dropout only exists when layers are stacked.
            dropout=dropout if num_layers > 1 else 0.0,
            batch_first=True,
            bidirectional=bidirectional,
        )
        if num_classes is None:
            num_classes = NUM_CLASSES
        proj_input = hidden_size * (2 if bidirectional else 1)
        self.head = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(proj_input, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, num_classes),
        )

    def forward(self, x):
        """Return class logits for a batch-first input of shape (batch, time, features)."""
        out, _ = self.rnn(x)
        if self.bidirectional:
            # The last axis of `out` is [forward | backward]. The forward
            # direction has read the whole sequence at the last step, while the
            # backward direction has read it at the FIRST step; its value at
            # the last step has seen a single input only.
            hidden = self.rnn.hidden_size
            forward_final = out[:, -1, :hidden]
            backward_final = out[:, 0, hidden:]
            summary = torch.cat((forward_final, backward_final), dim=1)
        else:
            summary = out[:, -1, :]
        return self.head(summary)



In [None]:
def compute_classification_metrics(preds: np.ndarray, targets: np.ndarray) -> Dict[str, float]:
    """Summarise predictions with accuracy and macro-averaged precision/recall/F1.

    Args:
        preds: Predicted class indices.
        targets: Ground-truth class indices, same length as ``preds``.

    Returns:
        Dict with float values under ``accuracy``, ``precision``, ``recall``
        and ``f1``. Classes with an undefined score contribute 0
        (``zero_division=0``).
    """
    hit_rate = (preds == targets).mean()
    macro_scores = precision_recall_fscore_support(
        targets,
        preds,
        average='macro',
        zero_division=0,
    )
    # The fourth element (support) is not needed.
    macro_precision, macro_recall, macro_f1 = macro_scores[:3]
    return {
        'accuracy': float(hit_rate),
        'precision': float(macro_precision),
        'recall': float(macro_recall),
        'f1': float(macro_f1),
    }


def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    optimizer: torch.optim.Optimizer,
    scaler: GradScaler,
    max_grad_norm: Optional[float] = 5.0,
) -> Tuple[float, Dict[str, float]]:
    """Run one optimisation pass over ``loader``.

    Args:
        model: Network to train; switched to train mode.
        loader: Yields ``(inputs, targets)`` batches.
        criterion: Loss applied to ``(logits, targets)``.
        optimizer: Optimizer stepped once per batch through ``scaler``.
        scaler: Gradient scaler used for the backward pass and optimizer step.
        max_grad_norm: Gradient-norm clipping threshold; ``None`` disables
            clipping.

    Returns:
        ``(avg_loss, metrics)`` where ``avg_loss`` is the sample-weighted mean
        loss over ``loader.dataset`` and ``metrics`` comes from
        ``compute_classification_metrics`` on the epoch's predictions.
    """
    model.train()
    use_amp = DEVICE.type == 'cuda'
    running_loss = 0.0
    preds_all, targets_all = [], []

    for inputs, targets in loader:
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)
        optimizer.zero_grad(set_to_none=True)

        # torch.autocast is the device-generic context manager;
        # torch.cuda.amp.autocast does not accept a ``device_type`` argument.
        with torch.autocast(device_type=DEVICE.type, enabled=use_amp):
            logits = model(inputs)
            loss = criterion(logits, targets)

        scaler.scale(loss).backward()
        if max_grad_norm is not None:
            # Gradients must be unscaled before their norm is measured/clipped.
            scaler.unscale_(optimizer)
            nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        scaler.step(optimizer)
        scaler.update()

        # Weight the batch-mean loss by batch size so the epoch mean is exact.
        running_loss += loss.item() * inputs.size(0)
        preds_all.append(torch.argmax(logits.detach(), dim=1).cpu())
        targets_all.append(targets.cpu())

    preds_np = torch.cat(preds_all).numpy()
    targets_np = torch.cat(targets_all).numpy()
    metrics = compute_classification_metrics(preds_np, targets_np)
    avg_loss = running_loss / len(loader.dataset)
    return avg_loss, metrics


def evaluate_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
) -> Tuple[float, Dict[str, float], np.ndarray, np.ndarray]:
    """Evaluate ``model`` on ``loader`` without updating any parameter.

    Args:
        model: Network to evaluate; switched to eval mode.
        loader: Yields ``(inputs, targets)`` batches.
        criterion: Loss applied to ``(logits, targets)``.

    Returns:
        ``(avg_loss, metrics, preds, targets)``: the sample-weighted mean loss
        over ``loader.dataset``, the dict from
        ``compute_classification_metrics``, and the predicted / true class
        indices as NumPy arrays in loader order.
    """
    model.eval()
    use_amp = DEVICE.type == 'cuda'
    running_loss = 0.0
    preds_all, targets_all = [], []

    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(DEVICE)
            targets = targets.to(DEVICE)
            # torch.autocast is the device-generic context manager;
            # torch.cuda.amp.autocast does not accept ``device_type``.
            with torch.autocast(device_type=DEVICE.type, enabled=use_amp):
                logits = model(inputs)
                loss = criterion(logits, targets)

            running_loss += loss.item() * inputs.size(0)
            preds_all.append(torch.argmax(logits, dim=1).cpu())
            targets_all.append(targets.cpu())

    preds_np = torch.cat(preds_all).numpy()
    targets_np = torch.cat(targets_all).numpy()
    metrics = compute_classification_metrics(preds_np, targets_np)
    avg_loss = running_loss / len(loader.dataset)
    return avg_loss, metrics, preds_np, targets_np



In [None]:
def fit_model(
    config: Dict,
    train_loader: DataLoader,
    valid_loader: DataLoader,
    run_name: str,
    tensorboard: bool = True,
) -> Tuple[nn.Module, Dict[str, List[float]], Dict]:
    """Train a ``RecurrentBackbone`` with early stopping on validation macro F1.

    Args:
        config: Hyper-parameters. Required keys: ``hidden_size``,
            ``num_layers``, ``dropout``, ``rnn_type``, ``lr``, ``epochs``.
            Optional keys (default): ``bidirectional`` (False),
            ``weight_decay`` (0.0), ``scheduler_factor`` (0.5),
            ``scheduler_patience`` (3), ``patience`` (10),
            ``max_grad_norm`` (5.0), ``min_improvement`` (0.0).
        train_loader: Batches used for optimisation.
        valid_loader: Batches used for model selection.
        run_name: Names the TensorBoard sub-folder and the checkpoint file.
        tensorboard: Whether to write scalars to ``LOG_DIR / run_name``.

    Returns:
        ``(model, history, best_state)``. ``model`` carries the weights of the
        best epoch, ``history`` holds per-epoch curves, and ``best_state``
        bundles the best epoch's states, metrics, validation predictions and
        run metadata.

    Raises:
        ValueError: If ``config['epochs']`` is smaller than 1.
    """
    # Without at least one epoch there are no metrics to select a model from.
    if config['epochs'] < 1:
        raise ValueError(f"config['epochs'] must be >= 1, got {config['epochs']!r}")

    model = RecurrentBackbone(
        input_size=NUM_FEATURES,
        hidden_size=config['hidden_size'],
        num_layers=config['num_layers'],
        dropout=config['dropout'],
        rnn_type=config['rnn_type'],
        bidirectional=config.get('bidirectional', False),
    ).to(DEVICE)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config['lr'],
        weight_decay=config.get('weight_decay', 0.0),
    )
    # mode='max' because the monitored quantity (validation F1) should grow.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer,
        mode='max',
        factor=config.get('scheduler_factor', 0.5),
        patience=config.get('scheduler_patience', 3),
    )
    scaler = GradScaler(enabled=(DEVICE.type == 'cuda'))

    metric_names = ('accuracy', 'f1', 'precision', 'recall')
    history: Dict[str, List[float]] = {
        'train_loss': [],
        'train_accuracy': [],
        'train_f1': [],
        'train_precision': [],
        'train_recall': [],
        'valid_loss': [],
        'valid_accuracy': [],
        'valid_f1': [],
        'valid_precision': [],
        'valid_recall': [],
        'lr': [],
    }

    run_log_dir = (LOG_DIR / run_name).resolve()
    writer = SummaryWriter(run_log_dir.as_posix()) if tensorboard else None

    best_metric = -np.inf
    best_state: Optional[Dict] = None
    patience = config.get('patience', 10)
    patience_counter = 0
    checkpoint_path = (CHECKPOINT_DIR / f'{run_name}.pt').resolve()

    def snapshot(epoch_idx: int) -> Dict:
        # Captures the most recent epoch's states and metrics at call time.
        return {
            'epoch': epoch_idx,
            'model_state': copy.deepcopy(model.state_dict()),
            'optimizer_state': copy.deepcopy(optimizer.state_dict()),
            'metrics': valid_metrics,
            'train_metrics': train_metrics,
            'preds': preds,
            'targets': targets,
        }

    # try/finally guarantees the TensorBoard writer is closed even when
    # training raises or is interrupted from the notebook.
    try:
        for epoch in range(1, config['epochs'] + 1):
            train_loss, train_metrics = train_one_epoch(
                model,
                train_loader,
                criterion,
                optimizer,
                scaler,
                max_grad_norm=config.get('max_grad_norm', 5.0),
            )
            valid_loss, valid_metrics, preds, targets = evaluate_epoch(
                model,
                valid_loader,
                criterion,
            )
            scheduler.step(valid_metrics['f1'])

            current_lr = optimizer.param_groups[0]['lr']

            history['train_loss'].append(train_loss)
            history['valid_loss'].append(valid_loss)
            for metric_name in metric_names:
                history[f'train_{metric_name}'].append(train_metrics[metric_name])
                history[f'valid_{metric_name}'].append(valid_metrics[metric_name])
            history['lr'].append(current_lr)

            if writer is not None:
                writer.add_scalar('Loss/train', train_loss, epoch)
                writer.add_scalar('Loss/valid', valid_loss, epoch)
                writer.add_scalar('F1/train', train_metrics['f1'], epoch)
                writer.add_scalar('F1/valid', valid_metrics['f1'], epoch)
                writer.add_scalar('Accuracy/train', train_metrics['accuracy'], epoch)
                writer.add_scalar('Accuracy/valid', valid_metrics['accuracy'], epoch)
                writer.add_scalar('LearningRate', current_lr, epoch)

            msg = (
                f"Epoch {epoch:03d} | "
                f"train_loss={train_loss:.4f} acc={train_metrics['accuracy']:.3f} f1={train_metrics['f1']:.3f} | "
                f"valid_loss={valid_loss:.4f} acc={valid_metrics['accuracy']:.3f} f1={valid_metrics['f1']:.3f} | "
                f"lr={current_lr:.2e}"
            )
            print(msg)

            if valid_metrics['f1'] > best_metric + config.get('min_improvement', 0.0):
                best_metric = valid_metrics['f1']
                patience_counter = 0
                best_state = snapshot(epoch)
                torch.save(best_state['model_state'], checkpoint_path)
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping triggered at epoch {epoch}. Best f1={best_metric:.4f}.")
                    break
    finally:
        if writer is not None:
            writer.close()

    # Defensive fallback: no epoch ever counted as an improvement, so keep the
    # final weights instead of returning nothing.
    if best_state is None:
        best_state = snapshot(config['epochs'])
        torch.save(best_state['model_state'], checkpoint_path)

    model.load_state_dict(best_state['model_state'])
    best_state.update(
        {
            'run_name': run_name,
            'config': copy.deepcopy(config),
            'history': history,
            'checkpoint_path': checkpoint_path,
            'best_f1': best_metric,
        }
    )
    return model, history, best_state



In [None]:
def prepare_config(name: str, overrides: Dict) -> Dict:
    """Build an experiment configuration from defaults plus ``overrides``.

    Args:
        name: Stored under ``'run_name'`` (unless ``overrides`` replaces it).
        overrides: Entries that replace or extend the defaults.

    Returns:
        A new dict; ``overrides`` itself is left untouched.
    """
    defaults = {
        # Identification
        'run_name': name,
        # Architecture
        'rnn_type': 'lstm',
        'hidden_size': 192,
        'num_layers': 2,
        'dropout': 0.4,
        'bidirectional': True,
        # Optimisation
        'lr': 2e-3,
        'weight_decay': 1e-4,
        'epochs': 80,
        'batch_size': 128,
        'valid_size': 0.2,
        'patience': 10,
        'max_grad_norm': 5.0,
        'scheduler_factor': 0.5,
        'scheduler_patience': 3,
        'min_improvement': 1e-4,
        # Logging
        'tensorboard': True,
    }
    merged = copy.deepcopy(defaults)
    merged.update(overrides)
    return merged


def run_experiment(config: Dict) -> Dict:
    """Split the training data, fit one model and return its result bundle.

    A falsy or missing ``'run_name'`` is replaced by
    ``<RNN_TYPE>_<YYYYmmdd_HHMMSS>``. The caller's ``config`` is not modified;
    a deep copy carrying the resolved run name is used instead.

    Returns:
        The ``best_state`` dict from ``fit_model`` extended with
        ``'data_split'`` (the train/validation arrays), ``'model'`` and
        ``'history'``.
    """
    run_name = config.get('run_name')
    if not run_name:
        prefix = config['rnn_type'].upper()
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        run_name = f"{prefix}_{timestamp}"

    settings = copy.deepcopy(config)
    settings['run_name'] = run_name

    loaders = create_dataloaders(
        X_train_np,
        y_train_idx,
        valid_size=settings.get('valid_size', 0.2),
        batch_size=settings['batch_size'],
    )
    train_loader, valid_loader, (X_tr, y_tr, X_val, y_val) = loaders

    model, history, best_state = fit_model(
        settings,
        train_loader,
        valid_loader,
        run_name=run_name,
        tensorboard=settings.get('tensorboard', True),
    )

    best_state.update(
        data_split={
            'X_train': X_tr,
            'y_train': y_tr,
            'X_valid': X_val,
            'y_valid': y_val,
        },
        model=model,
        history=history,
    )
    return best_state



In [None]:
# (run name, overrides) pairs; each one is expanded by prepare_config.
_EXPERIMENT_SPECS = (
    ('LSTM_BASE', {}),
    ('GRU_BASE', {'rnn_type': 'gru', 'bidirectional': True}),
)
EXPERIMENT_CONFIGS = [prepare_config(name, overrides) for name, overrides in _EXPERIMENT_SPECS]

# Run every configuration in order and keep each result bundle.
experiment_results: List[Dict] = []
for cfg in EXPERIMENT_CONFIGS:
    banner = f"\n=== Running experiment: {cfg['run_name']} ({cfg['rnn_type'].upper()}) ==="
    print(banner)
    result = run_experiment(cfg)
    experiment_results.append(result)
    f1_part = f"Best validation F1: {result['best_f1']:.4f} at epoch {result['epoch']} | "
    accuracy_part = f"Accuracy: {result['metrics']['accuracy']:.4f}"
    print(f1_part + accuracy_part)



In [None]:
def _summary_row(run: Dict) -> Dict:
    """Flatten one experiment result into a single row of the summary table."""
    run_config = run['config']
    run_metrics = run['metrics']
    return {
        'run_name': run['run_name'],
        'rnn_type': run_config['rnn_type'],
        'bidirectional': run_config.get('bidirectional', False),
        'hidden_size': run_config['hidden_size'],
        'num_layers': run_config['num_layers'],
        'dropout': run_config['dropout'],
        'best_epoch': run['epoch'],
        'best_f1': run['best_f1'],
        'accuracy': run_metrics['accuracy'],
        'precision': run_metrics['precision'],
        'recall': run_metrics['recall'],
        'checkpoint': str(run['checkpoint_path']),
    }


summary_rows = [_summary_row(run) for run in experiment_results]

# Best run first.
summary_df = pd.DataFrame(summary_rows)
summary_table = summary_df.sort_values(by='best_f1', ascending=False).reset_index(drop=True)
summary_table



In [None]:
if len(experiment_results) == 0:
    raise RuntimeError('No experiments have been executed. Run the experiment cell above first.')

# max() keeps the first run among ties on best_f1.
best_run = max(experiment_results, key=lambda run: run['best_f1'])
best_model, best_history = best_run['model'], best_run['history']
selection_message = (
    f"Selected best run: {best_run['run_name']} | "
    f"F1={best_run['best_f1']:.4f} | Accuracy={best_run['metrics']['accuracy']:.4f}"
)
print(selection_message)



In [None]:
def plot_history(history: Dict[str, List[float]], title: str = 'Learning Curves'):
    """Draw loss, accuracy and macro-F1 curves for train and validation.

    Args:
        history: Per-epoch lists keyed by ``train_loss``/``valid_loss``,
            ``train_accuracy``/``valid_accuracy`` and ``train_f1``/``valid_f1``.
        title: Figure-level title.
    """
    epochs = range(1, len(history['train_loss']) + 1)
    fig, axs = plt.subplots(1, 3, figsize=(18, 4))

    # One entry per panel: (panel title, train key, valid key, train label, valid label).
    panels = (
        ('Loss', 'train_loss', 'valid_loss', 'Train', 'Valid'),
        ('Accuracy', 'train_accuracy', 'valid_accuracy', 'Train', 'Valid'),
        ('Macro F1', 'train_f1', 'valid_f1', 'Train F1', 'Valid F1'),
    )
    for ax, (panel_title, train_key, valid_key, train_label, valid_label) in zip(axs, panels):
        ax.plot(epochs, history[train_key], label=train_label)
        ax.plot(epochs, history[valid_key], label=valid_label)
        ax.set_title(panel_title)
        ax.set_xlabel('Epoch')
        ax.legend()

    plt.suptitle(title)
    plt.tight_layout()
    plt.show()


# Learning curves of the selected run (title separator restored to an em dash).
plot_history(best_history, title=f"Learning Curves — {best_run['run_name']}")



In [None]:
# Validation predictions / targets stored at the best epoch of the chosen run.
best_preds = best_run['preds']
best_targets = best_run['targets']

# Passing the full label set keeps the report and the matrix aligned with
# class_names even if some class is missing from both targets and predictions.
class_ids = list(range(NUM_CLASSES))
class_names = [IDX2LABEL[i] for i in class_ids]

print(f"Best validation macro F1: {best_run['best_f1']:.3f}")
print(
    classification_report(
        best_targets,
        best_preds,
        labels=class_ids,
        target_names=class_names,
    )
)

cf = confusion_matrix(best_targets, best_preds, labels=class_ids)
plt.figure(figsize=(6, 5))
sns.heatmap(
    cf,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title(f"Validation Confusion Matrix — {best_run['run_name']}")
plt.show()



In [None]:
# Batched inference on the test sequences with the selected model.
test_loader = DataLoader(TimeSeriesDataset(X_test_np), batch_size=256, shuffle=False)

best_model.eval()
test_preds = []
with torch.no_grad():
    for inputs in test_loader:
        inputs = inputs.to(DEVICE)
        logits = best_model(inputs)
        test_preds.append(torch.argmax(logits, dim=1).cpu().numpy())

test_preds = np.concatenate(test_preds)
test_labels = [IDX2LABEL[idx] for idx in test_preds]

# X_test_np rows follow ascending sample_index (pivot_timeseries sorts them),
# so the identifiers must be sorted too; order of first appearance in the CSV
# would pair predictions with the wrong samples if the file is not sorted.
test_sample_ids = pd.Index(X_test_raw['sample_index'].unique()).sort_values()

submission = pd.DataFrame({
    'sample_index': test_sample_ids,
    'label': test_labels,
})
submission_filename = OUTPUT_DIR / f"submission_{best_run['run_name'].lower()}.csv"
submission.to_csv(submission_filename, index=False)
print(f"Saved submission to {submission_filename}")
submission.head()



## Next Steps

- Aggiungere nuovi esperimenti modificando `EXPERIMENT_CONFIGS` (es. diverse profondità, dropout, versioni mono/bidirezionali).
- Integrare una procedura di `StratifiedKFold` o `GroupKFold` per mediare su più split e ridurre la varianza della validazione.
- Loggare e confrontare i risultati in `summary_table` (esportarli in CSV per il report finale).
- Provare tecniche di regolarizzazione aggiuntive (label smoothing, mixup temporale) o layer di attenzione per migliorare l'F1.
- Consolidare un ensemble (media delle probabilità di più checkpoint) prima della submission Kaggle definitiva.

