## 1. Environment Setup

In [None]:
# Mount Google Drive at /content/drive so that files stored on Drive are
# reachable from this runtime. Requires the google.colab package, i.e. this
# cell is meant to be run inside a Colab session.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Set working directory - modify to your project path
# Relative paths used later in this notebook ("data/", "checkpoints/" and the
# saved .pth / .png / .csv files) are resolved against this directory.
import os
PROJECT_PATH = '/content/drive/MyDrive/aml-2025-mistake-detection-gp'  # Change to your path
os.chdir(PROJECT_PATH)  # raises if the path does not exist (e.g. Drive is not mounted)
print(f"Current working directory: {os.getcwd()}")

In [None]:
# Install dependencies
!pip install wandb torcheval tqdm -q

In [None]:
# Import required libraries
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, accuracy_score

# Report the runtime environment so results can be tied to the PyTorch build
# and to the accelerator (if any) that was available when the notebook ran.
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # Only device index 0 is queried; additional GPUs are not listed.
    print(f"GPU: {torch.cuda.get_device_name(0)}")

## 2. Define New LSTM/GRU Models

This is the core of Task 2.b: implementing two new recurrent baseline models (a bidirectional LSTM and a bidirectional GRU).

In [None]:
# Import existing project modules
from core.models.blocks import fetch_input_dim, MLP
from constants import Constants as const

In [None]:
class ErLSTM(nn.Module):
    """
    Bidirectional LSTM baseline for error recognition (Task 2.b).

    A stacked LSTM encodes the input features. The last layer's final hidden
    state of each direction is concatenated, passed through dropout and fed
    to an ``MLP`` head that produces the classification output.

    Args:
        config: Experiment configuration. It is passed to ``fetch_input_dim``
            to obtain the feature dimension and kept as ``self.config``.
        *args, **kwargs: Forwarded unchanged to ``nn.Module.__init__``.
    """

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.config = config

        # Fixed recurrent hyper-parameters of this baseline.
        self.hidden_size = 512
        self.num_layers = 2
        self.bidirectional = True

        # Dropout between stacked recurrent layers is only requested when
        # there is more than one layer.
        inter_layer_dropout = 0.3 if self.num_layers > 1 else 0

        # Recurrent encoder; inputs are laid out as (batch, seq, feature).
        self.lstm = nn.LSTM(
            input_size=fetch_input_dim(config),
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
            bidirectional=self.bidirectional,
        )

        # The head consumes one hidden vector per direction.
        num_directions = 2 if self.bidirectional else 1
        self.decoder = MLP(self.hidden_size * num_directions, 256, 1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, input_data):
        """
        Encode ``input_data`` and return the decoder output.

        Args:
            input_data: Tensor of shape (batch, seq, feature), or a 2-D
                tensor (batch, feature) which is expanded to a sequence of
                length one.

        Returns:
            The output of ``self.decoder`` for each batch element.
        """
        # Replace NaN / +inf / -inf so they cannot propagate through the LSTM.
        features = torch.nan_to_num(input_data, nan=0.0, posinf=1.0, neginf=-1.0)

        # NOTE(review): a 2-D input becomes a batch of length-1 sequences, so
        # rows are encoded independently of each other - confirm this matches
        # what the data loader's collate_fn produces.
        if features.dim() == 2:
            features = features.unsqueeze(1)

        # Only the final hidden state is used; per-step outputs and the cell
        # state are discarded.
        _, (last_hidden, _) = self.lstm(features)

        if self.bidirectional:
            # The last two entries belong to the top layer: forward direction
            # first, then backward direction.
            summary = torch.cat((last_hidden[-2], last_hidden[-1]), dim=1)
        else:
            summary = last_hidden[-1]

        return self.decoder(self.dropout(summary))


class ErGRU(nn.Module):
    """
    Bidirectional GRU baseline for error recognition (Task 2.b).

    Same layout as the LSTM baseline but with a GRU encoder: the last layer's
    final hidden state of each direction is concatenated, passed through
    dropout and fed to an ``MLP`` head.

    Args:
        config: Experiment configuration. It is passed to ``fetch_input_dim``
            to obtain the feature dimension and kept as ``self.config``.
        *args, **kwargs: Forwarded unchanged to ``nn.Module.__init__``.
    """

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.config = config

        # Fixed recurrent hyper-parameters of this baseline.
        self.hidden_size = 512
        self.num_layers = 2
        self.bidirectional = True

        # Dropout between stacked recurrent layers is only requested when
        # there is more than one layer.
        inter_layer_dropout = 0.3 if self.num_layers > 1 else 0

        # Recurrent encoder; inputs are laid out as (batch, seq, feature).
        self.gru = nn.GRU(
            input_size=fetch_input_dim(config),
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
            bidirectional=self.bidirectional,
        )

        # The head consumes one hidden vector per direction.
        num_directions = 2 if self.bidirectional else 1
        self.decoder = MLP(self.hidden_size * num_directions, 256, 1)
        self.dropout = nn.Dropout(0.3)

    def forward(self, input_data):
        """
        Encode ``input_data`` and return the decoder output.

        Args:
            input_data: Tensor of shape (batch, seq, feature), or a 2-D
                tensor (batch, feature) which is expanded to a sequence of
                length one.

        Returns:
            The output of ``self.decoder`` for each batch element.
        """
        # Replace NaN / +inf / -inf so they cannot propagate through the GRU.
        features = torch.nan_to_num(input_data, nan=0.0, posinf=1.0, neginf=-1.0)

        # NOTE(review): a 2-D input becomes a batch of length-1 sequences, so
        # rows are encoded independently of each other - confirm this matches
        # what the data loader's collate_fn produces.
        if features.dim() == 2:
            features = features.unsqueeze(1)

        # Only the final hidden state is used; per-step outputs are discarded.
        _, last_hidden = self.gru(features)

        if self.bidirectional:
            # The last two entries belong to the top layer: forward direction
            # first, then backward direction.
            summary = torch.cat((last_hidden[-2], last_hidden[-1]), dim=1)
        else:
            summary = last_hidden[-1]

        return self.decoder(self.dropout(summary))


print("LSTM and GRU models defined successfully")

## 3. Update Model Loading Function

Define `fetch_model_extended`, an extended version of the project's `fetch_model` function that also supports the new LSTM and GRU models.

In [None]:
# Add new constants
# The variant names are attached to the imported Constants class at runtime
# (instead of editing constants.py) so that fetch_model_extended can dispatch
# on them. They must equal the strings used for config.variant.
const.LSTM_VARIANT = "LSTM"
const.GRU_VARIANT = "GRU"

# Import existing models
from core.models.er_former import ErFormer

def fetch_model_extended(config):
    """
    Extended model loading function with LSTM and GRU support.

    Builds the model selected by ``config.variant`` and moves it to
    ``config.device``.

    Args:
        config: Experiment configuration. ``config.variant`` must equal one
            of ``const.MLP_VARIANT``, ``const.TRANSFORMER_VARIANT``,
            ``const.LSTM_VARIANT`` or ``const.GRU_VARIANT``;
            ``config.device`` is the target device.

    Returns:
        The constructed model, already moved to ``config.device``.

    Raises:
        ValueError: If ``config.variant`` does not name a supported model.
            (An explicit raise is used instead of ``assert`` so the check
            still runs when Python is started with ``-O``.)
    """
    variant = config.variant

    if variant == const.MLP_VARIANT:
        model = MLP(fetch_input_dim(config), 512, 1)
    elif variant == const.TRANSFORMER_VARIANT:
        model = ErFormer(config)
    elif variant == const.LSTM_VARIANT:
        model = ErLSTM(config)
    elif variant == const.GRU_VARIANT:
        model = ErGRU(config)
    else:
        raise ValueError(f"Model not found: {config.variant}")

    model.to(config.device)
    return model

print("Model loading function extended successfully")

## 4. Configure Experiment Parameters

In [None]:
from dataclasses import dataclass
from typing import Optional, List

@dataclass
class ExperimentConfig:
    """
    Experiment configuration dataclass.

    Bundles the model, data and training settings that are read from the
    ``config`` object elsewhere in this notebook. All fields have defaults,
    so ``ExperimentConfig()`` yields a ready-to-use configuration.
    """
    # Model settings
    backbone: str = "omnivore"  # omnivore or slowfast
    variant: str = "LSTM"       # MLP, Transformer, LSTM, GRU
    # None is a sentinel meaning "use the default"; __post_init__ replaces it
    # with ["video"].
    modality: Optional[List[str]] = None

    # Data settings
    split: str = "recordings"   # recordings, person, environment
    segment_features_directory: str = "data/"

    # Training settings
    num_epochs: int = 20
    batch_size: int = 1
    lr: float = 1e-3
    weight_decay: float = 1e-3

    # Other settings
    # The default is evaluated once, when the class is defined.
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    seed: int = 42
    task_name: str = "error_recognition"
    ckpt_directory: str = "checkpoints/"
    model_name: Optional[str] = None
    enable_wandb: bool = False  # Can disable for Colab

    def __post_init__(self):
        # dataclasses reject a list literal as a field default, so the
        # default list is created here; every instance gets its own list.
        if self.modality is None:
            self.modality = ["video"]

# Instantiate the default configuration and echo its key settings
config = ExperimentConfig()
print(
    f"\nExperiment Configuration:",
    f"  Model: {config.variant}",
    f"  Backbone: {config.backbone}",
    f"  Split: {config.split}",
    f"  Epochs: {config.num_epochs}",
    f"  Device: {config.device}",
    sep="\n",
)

## 5. Training Functions

In [None]:
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm.notebook import tqdm
from torcheval.metrics.functional import binary_auprc

from dataloader.CaptainCookStepDataset import CaptainCookStepDataset, collate_fn


def train_and_evaluate(config, model_variant):
    """
    Train and evaluate a model.

    Trains for ``config.num_epochs`` epochs, evaluates on the validation set
    after every epoch, keeps the weights with the best validation F1 in
    ``best_<variant>_model.pth`` (current working directory) and finally
    reports metrics of those weights on the test set.

    Side effects: sets ``config.variant`` to ``model_variant``, seeds the
    torch and NumPy random generators with ``config.seed`` and writes the
    checkpoint file mentioned above.

    Args:
        config: Experiment configuration
        model_variant: Model type (MLP, Transformer, LSTM, GRU)

    Returns:
        test_metrics: Dictionary of test metrics
        history: Training history dictionary with the per-epoch lists
            'train_loss', 'val_loss', 'val_f1' and 'val_auc'
    """
    config.variant = model_variant
    print(f"\n{'='*60}")
    print(f"Training {model_variant} model")
    print(f"{'='*60}")

    # Set random seeds for reproducibility
    torch.manual_seed(config.seed)
    np.random.seed(config.seed)

    # Load datasets
    print("Loading datasets...")
    train_dataset = CaptainCookStepDataset(config, 'train', config.split)
    val_dataset = CaptainCookStepDataset(config, 'val', config.split)
    test_dataset = CaptainCookStepDataset(config, 'test', config.split)

    train_loader = DataLoader(train_dataset, batch_size=config.batch_size,
                              shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=config.batch_size,
                            collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=config.batch_size,
                             collate_fn=collate_fn)

    print(f"Training set: {len(train_dataset)} samples")
    print(f"Validation set: {len(val_dataset)} samples")
    print(f"Test set: {len(test_dataset)} samples")

    # Create model
    model = fetch_model_extended(config)
    print(f"\nModel parameters: {sum(p.numel() for p in model.parameters()):,}")

    # Optimizer and loss function
    optimizer = torch.optim.Adam(model.parameters(), lr=config.lr,
                                 weight_decay=config.weight_decay)
    # The positive class is up-weighted by a fixed factor of 2.5.
    criterion = nn.BCEWithLogitsLoss(
        pos_weight=torch.tensor([2.5], dtype=torch.float32).to(config.device)
    )
    # mode='max' because the monitored quantity is the validation F1.
    # The `verbose` argument is deprecated in recent PyTorch releases, so
    # learning-rate changes are reported manually after scheduler.step().
    scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1,
                                  patience=5)

    # Training history
    history = {'train_loss': [], 'val_loss': [], 'val_f1': [], 'val_auc': []}

    # Start below any reachable F1 so the first epoch always writes a
    # checkpoint. Starting at 0 with a strict '>' meant a run whose
    # validation F1 stayed at 0 never saved anything, and the final load
    # then failed or silently picked up a stale file from an earlier run.
    best_val_f1 = float('-inf')
    checkpoint_path = f'best_{model_variant.lower()}_model.pth'
    checkpoint_saved = False

    # Training loop
    for epoch in range(1, config.num_epochs + 1):
        # Training phase
        model.train()
        train_losses = []

        pbar = tqdm(train_loader, desc=f'Epoch {epoch}/{config.num_epochs}')
        for data, target in pbar:
            data, target = data.to(config.device), target.to(config.device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)

            # Skip batches with a NaN loss instead of corrupting the weights.
            if torch.isnan(loss):
                continue

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            train_losses.append(loss.item())
            pbar.set_postfix({'loss': f'{loss.item():.4f}'})

        # Every batch may have been skipped; avoid the mean of an empty list.
        avg_train_loss = np.mean(train_losses) if train_losses else float('nan')
        history['train_loss'].append(avg_train_loss)

        # Validation phase
        val_metrics = evaluate_model(model, val_loader, criterion, config.device)
        history['val_loss'].append(val_metrics['loss'])
        history['val_f1'].append(val_metrics['f1'])
        history['val_auc'].append(val_metrics['auc'])

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(val_metrics['f1'])
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after != lr_before:
            print(f"Epoch {epoch}: reducing learning rate "
                  f"from {lr_before:.2e} to {lr_after:.2e}")

        print(f"Epoch {epoch}: Train Loss={avg_train_loss:.4f}, "
              f"Val Loss={val_metrics['loss']:.4f}, "
              f"Val F1={val_metrics['f1']:.4f}, "
              f"Val AUC={val_metrics['auc']:.4f}")

        # Save best model
        if val_metrics['f1'] > best_val_f1:
            best_val_f1 = val_metrics['f1']
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True

    # Test phase
    if checkpoint_saved:
        print(f"\nLoading best model for testing...")
        model.load_state_dict(
            torch.load(checkpoint_path, map_location=config.device)
        )
    else:
        # Only reachable when no epoch ran (e.g. num_epochs == 0); never fall
        # back to a checkpoint that this run did not write.
        print("\nNo checkpoint was written in this run; "
              "testing the current weights.")
    test_metrics = evaluate_model(model, test_loader, criterion, config.device)

    print(f"\n{'='*40}")
    print(f"{model_variant} Test Results:")
    print(f"{'='*40}")
    for metric, value in test_metrics.items():
        print(f"  {metric}: {value:.4f}")

    return test_metrics, history


def evaluate_model(model, data_loader, criterion, device, threshold=0.5):
    """
    Evaluate model on a dataset.

    The model is switched to eval mode and run without gradient tracking.
    Its outputs are passed through a sigmoid, so they are treated as logits;
    a sample is predicted positive when its probability is >= ``threshold``.

    Args:
        model: PyTorch model
        data_loader: DataLoader for evaluation, yielding (data, target) pairs
        criterion: Loss function, called as ``criterion(output, target)``
        device: Device to use
        threshold: Classification threshold

    Returns:
        metrics: Dictionary of evaluation metrics with the keys 'loss' (mean
            of the per-batch losses), 'accuracy', 'precision', 'recall',
            'f1' and 'auc' (0.5 when the targets contain a single class,
            because ROC AUC is undefined in that case).

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    model.eval()
    pred_chunks = []
    target_chunks = []
    prob_chunks = []
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)

            total_loss += criterion(output, target).item()
            num_batches += 1

            probs = torch.sigmoid(output)
            preds = (probs >= threshold).float()

            # Keep one flat array per batch and join them once at the end.
            prob_chunks.append(probs.cpu().numpy().flatten())
            pred_chunks.append(preds.cpu().numpy().flatten())
            target_chunks.append(target.cpu().numpy().flatten())

    # An empty loader used to surface as a bare ZeroDivisionError in the loss
    # average; report the actual problem instead.
    if num_batches == 0:
        raise ValueError("evaluate_model received an empty data_loader; "
                         "there is nothing to evaluate")

    all_preds = np.concatenate(pred_chunks)
    all_targets = np.concatenate(target_chunks)
    all_probs = np.concatenate(prob_chunks)

    # Calculate metrics
    has_both_classes = len(np.unique(all_targets)) > 1
    metrics = {
        'loss': total_loss / num_batches,
        'accuracy': accuracy_score(all_targets, all_preds),
        'precision': precision_score(all_targets, all_preds, zero_division=0),
        'recall': recall_score(all_targets, all_preds, zero_division=0),
        'f1': f1_score(all_targets, all_preds, zero_division=0),
        'auc': roc_auc_score(all_targets, all_probs) if has_both_classes else 0.5,
    }

    return metrics

print("Training and evaluation functions defined successfully")

## 6. Run Experiment: Train All Baseline Models

In [None]:
# Store results for all models
# Both dictionaries are keyed by model name and are filled by the training
# cell below; the comparison, plotting and summary cells read from them.
all_results = {}
all_histories = {}

# Models to compare
# Each name is passed to train_and_evaluate as the model variant.
models_to_compare = ['MLP', 'Transformer', 'LSTM', 'GRU']

# If you only want to test new models, train only LSTM and GRU
# models_to_compare = ['LSTM', 'GRU']

In [None]:
# Train all models
# A failure in one model must not abort the remaining ones, but it also must
# not be reported as a success: failures are recorded and summarised below.
import traceback

failed_models = {}
for model_name in models_to_compare:
    try:
        results, history = train_and_evaluate(config, model_name)
    except Exception as e:
        print(f"Error training {model_name}: {e}")
        traceback.print_exc()
        # Store only the text: keeping the exception object would keep its
        # traceback (and everything referenced from it) alive.
        failed_models[model_name] = repr(e)
        # Drop results left over from an earlier run of this cell so the
        # comparison below cannot show stale numbers for a failed model.
        all_results.pop(model_name, None)
        all_histories.pop(model_name, None)
        continue
    all_results[model_name] = results
    all_histories[model_name] = history

print("\n" + "="*60)
if failed_models:
    print(f"Training finished with failures: {', '.join(failed_models)}")
else:
    print("All models trained successfully!")
print("="*60)

## 7. Results Comparison Analysis

In [None]:
# Build the comparison table: one row per model, one column per metric,
# rounded to four decimals
results_df = pd.DataFrame(all_results).T.round(4)

# Percentage view for display; the loss column is left unscaled
results_pct = results_df.copy()
for metric_name in ('accuracy', 'precision', 'recall', 'f1', 'auc'):
    if metric_name not in results_pct.columns:
        continue
    results_pct[metric_name] = results_pct[metric_name].mul(100).round(2)

rule = "="*70
print("\n" + rule)
print("Model Comparison Results (Test Set, %)")
print(rule)
print(results_pct.to_string())
print(rule)

In [None]:
# Bar-chart comparison: one panel per metric, one bar per model
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
metrics = ['accuracy', 'precision', 'recall', 'f1', 'auc', 'loss']
colors = ['#2ecc71', '#3498db', '#e74c3c', '#9b59b6']

# axes.flat walks the 2x3 grid row by row, matching the metric order above
for ax, metric in zip(axes.flat, metrics):
    # A metric without a column (e.g. no model finished) leaves its panel empty
    if metric not in results_df.columns:
        continue

    # Models without a result row are drawn as a zero-height bar
    values = []
    for m in models_to_compare:
        values.append(results_df.loc[m, metric] if m in results_df.index else 0)

    bars = ax.bar(models_to_compare, values, color=colors[:len(models_to_compare)])
    ax.set_title(metric.upper(), fontsize=12, fontweight='bold')

    # Score metrics share a fixed axis; the loss axis scales with its data
    if metric != 'loss':
        upper = 1.1
    else:
        upper = max(values) * 1.2
    ax.set_ylim([0, upper])
    ax.grid(axis='y', alpha=0.3)

    # Add value labels
    for bar, val in zip(bars, values):
        label_x = bar.get_x() + bar.get_width()/2
        label_y = bar.get_height() + 0.02
        ax.text(label_x, label_y, f'{val:.3f}',
                ha='center', va='bottom', fontsize=10)

plt.suptitle(f'Baseline Model Comparison ({config.backbone}, {config.split})',
             fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig('baseline_comparison.png', dpi=300, bbox_inches='tight')
plt.show()

print("\nChart saved as baseline_comparison.png")

In [None]:
# Training curves: losses on the left panel, validation F1 on the right
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax1, ax2 = axes

for model_name, history in all_histories.items():
    # Solid line = training loss, dashed line = validation loss
    ax1.plot(history['train_loss'], label=f'{model_name} (train)', linestyle='-')
    ax1.plot(history['val_loss'], label=f'{model_name} (val)', linestyle='--')
    # Validation F1 of the same model on the second panel
    ax2.plot(history['val_f1'], label=model_name)

ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training & Validation Loss')

ax2.set_xlabel('Epoch')
ax2.set_ylabel('F1 Score')
ax2.set_title('Validation F1 Score')

for panel in (ax1, ax2):
    panel.legend()
    panel.grid(alpha=0.3)

plt.tight_layout()
plt.savefig('training_curves.png', dpi=300, bbox_inches='tight')
plt.show()

## 8. Analysis Summary

In [None]:
# Summarise the experiment: best model per metric and F1 deltas of the new
# recurrent models against each other and against the existing baselines
print("\n" + "="*70)
print("Task 2.b Experiment Analysis Summary")
print("="*70)


def _top_model(metric_key):
    # (name, metrics) pair with the highest value for metric_key; a missing
    # metric counts as 0
    return max(all_results.items(), key=lambda entry: entry[1].get(metric_key, 0))


if all_results:
    # Find best models
    best_f1_model = _top_model('f1')
    best_auc_model = _top_model('auc')
    best_acc_model = _top_model('accuracy')

    print(f"\nBest Models:")
    print(f"  - Highest F1 Score: {best_f1_model[0]} ({best_f1_model[1]['f1']*100:.2f}%)")
    print(f"  - Highest AUC: {best_auc_model[0]} ({best_auc_model[1]['auc']*100:.2f}%)")
    print(f"  - Highest Accuracy: {best_acc_model[0]} ({best_acc_model[1]['accuracy']*100:.2f}%)")

    # Compare LSTM/GRU with existing baselines
    print(f"\nNew Models vs Existing Baselines:")

    # Every comparison below involves the LSTM, so check for it once
    has_lstm = 'LSTM' in all_results

    if has_lstm and 'MLP' in all_results:
        lstm_vs_mlp = all_results['LSTM']['f1'] - all_results['MLP']['f1']
        print(f"  - LSTM vs MLP (F1): {'+' if lstm_vs_mlp >= 0 else ''}{lstm_vs_mlp*100:.2f}%")

    if has_lstm and 'Transformer' in all_results:
        lstm_vs_trans = all_results['LSTM']['f1'] - all_results['Transformer']['f1']
        print(f"  - LSTM vs Transformer (F1): {'+' if lstm_vs_trans >= 0 else ''}{lstm_vs_trans*100:.2f}%")

    if has_lstm and 'GRU' in all_results:
        gru_vs_lstm = all_results['GRU']['f1'] - all_results['LSTM']['f1']
        print(f"  - GRU vs LSTM (F1): {'+' if gru_vs_lstm >= 0 else ''}{gru_vs_lstm*100:.2f}%")

print("\n" + "="*70)

In [None]:
# Save results to CSV
results_df.to_csv('task2b_results.csv')
print("Results saved to task2b_results.csv")

# Display final results table
print("\nFinal Results Table (for report):")
try:
    report_table = results_pct.to_markdown()
except (ImportError, AttributeError):
    # DataFrame.to_markdown depends on the optional 'tabulate' package, so the
    # method can exist and still fail with ImportError; hasattr() alone does
    # not detect that. AttributeError covers pandas versions without the
    # method. Fall back to the plain-text table in both cases.
    report_table = results_pct.to_string()
print(report_table)

## 9. Experiment Conclusions

Based on the experimental results above, fill in the following conclusions:

### Model Performance Comparison
| Model | F1 | AUC | Precision | Recall |
|-------|----|----|-----------|--------|
| MLP (V1) | xx% | xx% | xx% | xx% |
| Transformer (V2) | xx% | xx% | xx% | xx% |
| **LSTM (New)** | xx% | xx% | xx% | xx% |
| **GRU (New)** | xx% | xx% | xx% | xx% |

### Key Findings
1. LSTM/GRU improvement over MLP: ...
2. LSTM/GRU improvement over Transformer: ...
3. Effect of sequence modeling on error detection: ...

### Recommendations
- Recommended model for deployment: ...
- Potential improvements: ...