# 03 - Training Stray Pose Classifier

Training del classificatore MLP per identificare posture "stray-like" dai keypoints.

## IMPORTANTE: 24 Keypoints Dog-Pose

Questo notebook usa i **24 keypoints anatomici del cane** estratti dal modello `yolo11n-dog-pose.pt`.

**PREREQUISITO**: Esegui prima:
1. `00a_yolo_dog_pose_training.ipynb` - per addestrare il modello dog-pose
2. `00_keypoints_extraction.ipynb` - per estrarre i keypoints dai dataset

## Approccio Weak Supervision

Questo è il **contributo originale** del progetto:
- I label NON sono annotati manualmente
- Derivano dall'**origine del dataset**:
  - FYP Dataset (cani randagi) → Label = 1 (Stray)
  - Stanford Dogs / Skin Diseases (cani padronali) → Label = 0 (Owned)

## Architettura
- **Input**: 24 keypoints × 3 valori (x, y, visibility) = **72 features**
- **Model**: MLP (72 → 128 → 64 → 1)
- **Output**: P(stray_pose) ∈ [0, 1]

## Keypoints del Cane (24)
```
 0: nose            8: withers           16: right_back_elbow
 1: left_eye        9: left_front_elbow  17: left_back_knee
 2: right_eye      10: right_front_elbow 18: right_back_knee
 3: left_ear_base  11: left_front_knee   19: left_back_paw
 4: right_ear_base 12: right_front_knee  20: right_back_paw
 5: left_ear_tip   13: left_front_paw    21: tail_start
 6: right_ear_tip  14: right_front_paw   22: tail_end
 7: throat         15: left_back_elbow   23: chin
```

In [None]:
# Install the notebook's dependencies in quiet mode (-q).
# `%pip` is an IPython magic: this line only works inside a Jupyter/IPython
# kernel, not when the file is executed as plain Python.
%pip install torch numpy pandas scikit-learn matplotlib seaborn tqdm -q

In [None]:
import os
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from tqdm.auto import tqdm
import json

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    classification_report, confusion_matrix, 
    roc_auc_score, roc_curve, accuracy_score, f1_score
)

# Log the interpreter and PyTorch versions used for this run.
print(f"Python: {sys.version}")
print(f"PyTorch: {torch.__version__}")

In [None]:
# Device configuration: prefer CUDA, then Apple MPS, and finally the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    # `torch.backends.mps` is absent from older PyTorch releases; look it up
    # defensively so those installations fall through to the CPU instead of
    # raising AttributeError.
    _mps_backend = getattr(torch.backends, 'mps', None)
    if _mps_backend is not None and _mps_backend.is_available():
        DEVICE = torch.device('mps')
    else:
        DEVICE = torch.device('cpu')

print(f"Using device: {DEVICE}")

In [None]:
# Path configuration - resolved RELATIVE to the working directory for portability.
import sys
sys.path.insert(0, str(Path.cwd()))
try:
    # Preferred route: the project helper module resolves every location.
    from notebook_utils import get_paths, get_device, print_paths
    paths = get_paths()
    print_paths(paths)
except ImportError:
    print("notebook_utils.py non trovato, usando fallback...")
    NOTEBOOK_DIR = Path.cwd()
    if NOTEBOOK_DIR.name == "notebooks":
        PROJECT_DIR = NOTEBOOK_DIR.parent.parent
    elif NOTEBOOK_DIR.name == "training":
        PROJECT_DIR = NOTEBOOK_DIR.parent
    else:
        # Look for the closest directory named "ResQPet", starting from the
        # working directory itself and walking up through its ancestors.
        PROJECT_DIR = next(
            (
                candidate
                for candidate in (NOTEBOOK_DIR, *NOTEBOOK_DIR.parents)
                if candidate.name == "ResQPet"
            ),
            None,
        )
        if PROJECT_DIR is None:
            # Previously the walk silently ended at the filesystem root, which
            # made the notebook try to create "/backend/weights". Stay inside
            # the working directory instead and tell the user about it.
            print(
                "Cartella 'ResQPet' non trovata: "
                f"uso la directory corrente come progetto ({NOTEBOOK_DIR})"
            )
            PROJECT_DIR = NOTEBOOK_DIR
    BASE_DIR = PROJECT_DIR.parent
    paths = {
        'project_dir': PROJECT_DIR,
        'base_dir': BASE_DIR,
        'weights_dir': PROJECT_DIR / "backend" / "weights",
        'data_dir': PROJECT_DIR / "data",
    }
    paths['weights_dir'].mkdir(parents=True, exist_ok=True)

# Expose the resolved locations under the names the rest of the notebook uses.
BASE_DIR = paths['base_dir']
DATA_DIR = paths['data_dir'] / "keypoints"
OUTPUT_DIR = paths['weights_dir']

print(f"\nData dir: {DATA_DIR}")
print(f"Output dir: {OUTPUT_DIR}")

## 1. Caricamento Dataset Keypoints

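Il dataset deve essere stato generato dal notebook `00_keypoints_extraction.ipynb`. Se il file CSV non viene trovato, la cella seguente crea un dataset sintetico utilizzabile solo a scopo dimostrativo.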

In [None]:
# Load the keypoint dataset produced by the extraction notebook; when the CSV
# is missing, build a small synthetic stand-in so the notebook can still be
# run end-to-end as a demo.
dataset_path = DATA_DIR / 'pose_keypoints_dataset.csv'

if not dataset_path.exists():
    print(f"Dataset non trovato: {dataset_path}")
    print("\nEsegui prima il notebook 00_keypoints_extraction.ipynb")
    print("\nPer ora, creiamo un dataset sintetico per demo...")

    # Synthetic demo data: 24 dog keypoints (NOT the 17 human-pose ones),
    # three values per keypoint, fixed seed for reproducibility.
    np.random.seed(42)
    n_samples = 2000
    n_keypoints = 24
    samples_per_class = n_samples // 2
    n_features = n_keypoints * 3

    # "Stray" samples are drawn first: every y coordinate is shifted by +0.2.
    stray_kpts = 0.3 * np.random.randn(samples_per_class, n_features)
    stray_kpts[:, 1::3] += 0.2
    stray_labels = np.ones(samples_per_class)

    # "Owned" samples are drawn second: every x coordinate is shifted by +0.1.
    owned_kpts = 0.3 * np.random.randn(samples_per_class, n_features)
    owned_kpts[:, 0::3] += 0.1
    owned_labels = np.zeros(samples_per_class)

    # Stack both classes, stray rows first.
    X_synthetic = np.vstack([stray_kpts, owned_kpts])
    y_synthetic = np.concatenate([stray_labels, owned_labels])

    # Column layout: label, then kpt_<index>_<x|y|v> for every keypoint.
    columns = ['label']
    for kpt_index in range(n_keypoints):
        for component in ['x', 'y', 'v']:
            columns.append(f'kpt_{kpt_index}_{component}')

    df = pd.DataFrame(
        np.hstack([y_synthetic[:, None], X_synthetic]),
        columns=columns,
    )

    print(f"Dataset sintetico creato: {len(df)} samples")
    print(f"Keypoints: {n_keypoints} (24 anatomici del cane)")
    print(f"Features: {n_keypoints * 3} (72 = 24 × 3)")
else:
    df = pd.read_csv(dataset_path)
    print(f"Dataset caricato: {len(df)} samples")
    print(f"Colonne: {len(df.columns)}")
    print(f"\nDistribuzione label:")
    print(df['label'].value_counts())

In [None]:
# Show the first rows and the overall (rows, columns) shape of the dataset.
print(df.head())
print(f"\nShape: {df.shape}")

## 2. Preparazione Features

In [None]:
# Select the feature columns: every column whose name starts with 'kpt_'.
kpt_columns = list(filter(lambda name: name.startswith('kpt_'), df.columns))
print(f"Keypoint columns: {len(kpt_columns)}")

# Feature matrix and label vector, both as fresh float32 NumPy arrays.
X = df.loc[:, kpt_columns].to_numpy(dtype=np.float32, copy=True)
y = df['label'].to_numpy(dtype=np.float32, copy=True)

print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
class_counts = np.bincount(y.astype(int))
print(f"Class distribution: {class_counts}")

In [None]:
# Missing values: report how many NaNs there are, then replace them with 0.
nan_count = np.isnan(X).sum()
print(f"NaN values: {nan_count}")
X = np.nan_to_num(X, nan=0.0)

# Stratified split in two steps: hold out 30% of the data, then halve that
# hold-out into validation and test.
first_split = train_test_split(
    X, y, test_size=0.3, random_state=42, stratify=y
)
X_train, X_temp, y_train, y_temp = first_split

second_split = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)
X_val, X_test, y_val, y_test = second_split

print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

In [None]:
# Standardisation: the scaler statistics come from the training split only and
# are then applied unchanged to the validation and test splits.
scaler = StandardScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

train_mean = X_train_scaled.mean()
train_std = X_train_scaled.std()
print(f"Train mean: {train_mean:.4f}, std: {train_std:.4f}")

In [None]:
# Build the DataLoaders for the three splits.
BATCH_SIZE = 64


def _as_tensor_dataset(features, labels):
    """Wrap NumPy feature/label arrays as a float32 ``TensorDataset``."""
    # torch.tensor(..., dtype=...) replaces the legacy torch.FloatTensor
    # constructor and always copies, so the NumPy arrays stay independent.
    return TensorDataset(
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.float32),
    )


train_dataset = _as_tensor_dataset(X_train_scaled, y_train)
val_dataset = _as_tensor_dataset(X_val_scaled, y_val)
test_dataset = _as_tensor_dataset(X_test_scaled, y_test)

# BatchNorm1d cannot compute batch statistics from a single sample while in
# training mode (it raises ValueError). If the training set size leaves a
# trailing batch of exactly one sample, drop that batch; with shuffling a
# different sample is left out every epoch. Other sizes keep every sample.
_drop_single_sample_batch = len(train_dataset) % BATCH_SIZE == 1

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=_drop_single_sample_batch,
)
# Evaluation runs in eval mode, where single-sample batches are fine.
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")

## 3. Modello MLP

In [None]:
class StrayPoseMLP(nn.Module):
    """MLP that scores a flattened keypoint vector as stray-like vs owned.

    Architecture:
    - Input: flattened keypoints (24 keypoints x 3 = 72 features for dog-pose)
    - Hidden: one Linear -> ReLU -> BatchNorm1d -> Dropout stage per entry of
      ``hidden_dims`` (128 -> 64 by default)
    - Output: a single sigmoid-activated unit, i.e. a probability in [0, 1]

    NOTE: the layer order is Linear -> ReLU -> BatchNorm -> Dropout so that it
    corresponds to the model in the backend. All layers live in one
    ``nn.Sequential`` stored as ``self.model``, so state-dict keys have the
    form ``model.<index>.<param>``.

    Args:
        input_dim: Number of input features per sample.
        hidden_dims: Sizes of the hidden layers, in order. Any iterable of
            ints is accepted; the default is an immutable tuple so it cannot
            be shared and mutated across instances.
        dropout: Dropout probability applied after every hidden stage.
    """

    def __init__(self, input_dim=72, hidden_dims=(128, 64), dropout=0.3):
        super().__init__()

        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers += [
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
                nn.Dropout(dropout),
            ]
            prev_dim = hidden_dim

        # Output head: one unit squashed to a probability.
        layers += [nn.Linear(prev_dim, 1), nn.Sigmoid()]

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return one probability per sample.

        The trailing singleton dimension of the network output is squeezed
        away, so a ``(batch, input_dim)`` input yields a ``(batch,)`` tensor.
        """
        return self.model(x).squeeze(-1)


# Instantiate the model.
# INPUT_DIM is taken from the data itself (72 is expected: 24 keypoints x 3).
INPUT_DIM = X_train_scaled.shape[1]

# Sanity check against the dog-pose layout; a mismatch only prints a warning.
expected_dim = 24 * 3  # 72
if INPUT_DIM == expected_dim:
    print(f"✓ INPUT_DIM = {INPUT_DIM} (24 keypoints × 3) - Corretto!")
else:
    print(f"⚠️ ATTENZIONE: INPUT_DIM = {INPUT_DIM}, ma ci si aspetta {expected_dim} (24 keypoints × 3)")
    print(f"   Probabilmente il dataset è stato estratto con il modello sbagliato.")
    print(f"   Ri-esegui 00_keypoints_extraction.ipynb con yolo11n-dog-pose.pt")

model = StrayPoseMLP(
    input_dim=INPUT_DIM,
    hidden_dims=[128, 64],
    dropout=0.3,
).to(DEVICE)

print(f"\nModel:")
print(model)
n_parameters = sum(param.numel() for param in model.parameters())
print(f"\nParameters: {n_parameters:,}")

## 4. Training

In [None]:
# Training configuration
EPOCHS = 100  # upper bound on the number of epochs (early stopping may end sooner)
LEARNING_RATE = 1e-3  # initial Adam learning rate
WEIGHT_DECAY = 1e-4  # weight decay passed to Adam
PATIENCE = 15  # epochs without validation-AUC improvement tolerated by the training loop

# BCELoss expects probabilities, which matches the sigmoid output of StrayPoseMLP.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
# mode='max' because the training loop steps the scheduler with the validation
# AUC (higher is better); the learning rate is halved after 5 epochs without improvement.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

In [None]:
def train_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader`` and report epoch metrics.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Iterable of ``(features, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy, auc)``: the mean of the per-batch losses, the
        accuracy at a 0.5 threshold and the ROC AUC, the last two computed
        over every prediction made during the epoch.
    """
    model.train()
    running_loss = 0
    pred_chunks, label_chunks = [], []

    for features, targets in loader:
        features, targets = features.to(device), targets.to(device)

        optimizer.zero_grad()
        scores = model(features)
        batch_loss = criterion(scores, targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        # Keep one array per batch and join them once after the loop.
        pred_chunks.append(scores.detach().cpu().numpy())
        label_chunks.append(targets.cpu().numpy())

    # Epoch-level metrics
    epoch_preds = np.concatenate(pred_chunks) if pred_chunks else np.array([])
    epoch_labels = np.concatenate(label_chunks) if label_chunks else np.array([])
    epoch_auc = roc_auc_score(epoch_labels, epoch_preds)
    epoch_acc = accuracy_score(epoch_labels, (epoch_preds > 0.5).astype(int))

    return running_loss / len(loader), epoch_acc, epoch_auc


def evaluate(model, loader, criterion, device):
    """Score ``model`` on ``loader`` without updating any weights.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable of ``(features, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy, auc, predictions, labels)``: the mean of the
        per-batch losses, the accuracy at a 0.5 threshold, the ROC AUC, and
        the NumPy arrays of all predictions and labels in loader order.
    """
    model.eval()
    running_loss = 0
    pred_chunks, label_chunks = [], []

    with torch.no_grad():
        for features, targets in loader:
            features, targets = features.to(device), targets.to(device)

            scores = model(features)
            running_loss += criterion(scores, targets).item()

            # Keep one array per batch and join them once after the loop.
            pred_chunks.append(scores.cpu().numpy())
            label_chunks.append(targets.cpu().numpy())

    all_preds = np.concatenate(pred_chunks) if pred_chunks else np.array([])
    all_labels = np.concatenate(label_chunks) if label_chunks else np.array([])
    auc = roc_auc_score(all_labels, all_preds)
    acc = accuracy_score(all_labels, (all_preds > 0.5).astype(int))

    return running_loss / len(loader), acc, auc, all_preds, all_labels

In [None]:
# Training loop with early stopping on the validation AUC.
banner = "=" * 50
print(banner)
print("INIZIO TRAINING STRAY POSE CLASSIFIER")
print(banner)
print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"\nApproccio: WEAK SUPERVISION")
print(f"  - Stray labels: da FYP Dataset")
print(f"  - Owned labels: da Stanford Dogs / Skin Diseases")
print()

# One list per tracked metric: train_loss, train_acc, train_auc, val_loss, ...
history = {
    f'{split}_{metric}': []
    for split in ('train', 'val')
    for metric in ('loss', 'acc', 'auc')
}

best_auc = 0
patience_counter = 0
best_model_path = OUTPUT_DIR / 'stray_pose_classifier_best.pt'

for epoch in range(EPOCHS):
    # One pass over the training data, then score the validation split.
    train_loss, train_acc, train_auc = train_epoch(
        model, train_loader, criterion, optimizer, DEVICE
    )
    val_loss, val_acc, val_auc, _, _ = evaluate(
        model, val_loader, criterion, DEVICE
    )

    # The plateau scheduler tracks the validation AUC.
    scheduler.step(val_auc)

    # Record this epoch's metrics.
    epoch_metrics = {
        'train_loss': train_loss,
        'train_acc': train_acc,
        'train_auc': train_auc,
        'val_loss': val_loss,
        'val_acc': val_acc,
        'val_auc': val_auc,
    }
    for metric_name, metric_value in epoch_metrics.items():
        history[metric_name].append(metric_value)

    # Progress is printed on the first epoch and then every 10th epoch.
    is_report_epoch = (epoch + 1) % 10 == 0
    if is_report_epoch or epoch == 0:
        print(f"Epoch {epoch+1}/{EPOCHS}")
        print(f"  Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, AUC: {train_auc:.4f}")
        print(f"  Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, AUC: {val_auc:.4f}")

    # No improvement: consume patience and possibly stop.
    if not (val_auc > best_auc):
        patience_counter += 1
        if patience_counter >= PATIENCE:
            print(f"\nEarly stopping at epoch {epoch+1}")
            break
        continue

    # New best validation AUC: reset patience and checkpoint the model
    # together with the scaler statistics.
    best_auc = val_auc
    patience_counter = 0
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'val_auc': val_auc,
        'val_acc': val_acc,
        'input_dim': INPUT_DIM,
        'scaler_mean': scaler.mean_,
        'scaler_scale': scaler.scale_
    }, best_model_path)
    if is_report_epoch:
        print(f"  ✓ Best model saved (AUC: {val_auc:.4f})")

print("\n" + banner)
print("TRAINING COMPLETATO!")
print(banner)

## 5. Valutazione Finale

In [None]:
# Load the best checkpoint written by the training loop.
# NOTE: weights_only=False is required because the checkpoint also stores the
# scaler statistics as NumPy arrays. That means full unpickling, so only load
# checkpoint files you trust.
# map_location=DEVICE remaps the stored tensors onto the device used by this
# session; without it torch.load tries to restore them to the device they
# were saved from, which fails if that device is no longer available.
checkpoint = torch.load(best_model_path, map_location=DEVICE, weights_only=False)
model.load_state_dict(checkpoint['model_state_dict'])
print(f"Best model loaded from epoch {checkpoint['epoch']+1}")
print(f"Val AUC: {checkpoint['val_auc']:.4f}")

In [None]:
# Final evaluation on the held-out test split. evaluate() also returns the
# per-sample predictions and labels, which the following cells reuse.
test_loss, test_acc, test_auc, test_preds, test_labels = evaluate(
    model, test_loader, criterion, DEVICE
)

print(f"\nTest Results:")
print(f"  Loss: {test_loss:.4f}")
print(f"  Accuracy: {test_acc:.4f}")
print(f"  AUC-ROC: {test_auc:.4f}")

In [None]:
# Classification report: probabilities are binarised at a 0.5 threshold
# (class 0 = Owned, class 1 = Stray).
test_preds_binary = (test_preds > 0.5).astype(int)

print("\nClassification Report:")
print(classification_report(test_labels, test_preds_binary,
                           target_names=['Owned', 'Stray']))

In [None]:
# Visualisations: ROC curve, confusion matrix and training curves in a 2x2 grid.
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# 1. ROC curve, with the chance diagonal for reference
fpr, tpr, _ = roc_curve(test_labels, test_preds)
axes[0, 0].plot(fpr, tpr, label=f'AUC = {test_auc:.3f}')
axes[0, 0].plot([0, 1], [0, 1], 'k--')
axes[0, 0].set_xlabel('False Positive Rate')
axes[0, 0].set_ylabel('True Positive Rate')
axes[0, 0].set_title('ROC Curve')
axes[0, 0].legend()

# 2. Confusion matrix
cm = confusion_matrix(test_labels, test_preds_binary)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0, 1],
           xticklabels=['Owned', 'Stray'], yticklabels=['Owned', 'Stray'])
axes[0, 1].set_xlabel('Predicted')
axes[0, 1].set_ylabel('True')
axes[0, 1].set_title('Confusion Matrix')

# 3. Training curves - loss
axes[1, 0].plot(history['train_loss'], label='Train')
axes[1, 0].plot(history['val_loss'], label='Val')
axes[1, 0].set_xlabel('Epoch')
axes[1, 0].set_ylabel('Loss')
axes[1, 0].set_title('Training Loss')
axes[1, 0].legend()

# 4. Training curves - AUC
axes[1, 1].plot(history['train_auc'], label='Train')
axes[1, 1].plot(history['val_auc'], label='Val')
axes[1, 1].set_xlabel('Epoch')
axes[1, 1].set_ylabel('AUC')
axes[1, 1].set_title('AUC-ROC')
axes[1, 1].legend()

plt.tight_layout()
# savefig does not create missing folders, so make sure the target directory
# exists before writing the figure.
plots_dir = OUTPUT_DIR.parent.parent / 'training' / 'notebooks'
plots_dir.mkdir(parents=True, exist_ok=True)
plt.savefig(plots_dir / 'pose_training_results.png', dpi=150)
plt.show()

In [None]:
# Distribution of the predicted probabilities on the test split.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Left panel: one histogram per true class
for label, name in [(0, 'Owned'), (1, 'Stray')]:
    mask = test_labels == label
    axes[0].hist(test_preds[mask], bins=30, alpha=0.6, label=name)
axes[0].axvline(x=0.5, color='r', linestyle='--', label='Threshold')
axes[0].set_xlabel('P(stray_pose)')
axes[0].set_ylabel('Count')
axes[0].set_title('Distribuzione Predizioni per Classe')
axes[0].legend()

# Right panel: all predictions together
axes[1].hist(test_preds, bins=30, edgecolor='black')
axes[1].axvline(x=0.5, color='r', linestyle='--', label='Threshold')
axes[1].set_xlabel('P(stray_pose)')
axes[1].set_ylabel('Count')
axes[1].set_title('Distribuzione Complessiva Predizioni')
axes[1].legend()

plt.tight_layout()
# savefig does not create missing folders, so make sure the target directory
# exists before writing the figure.
plots_dir = OUTPUT_DIR.parent.parent / 'training' / 'notebooks'
plots_dir.mkdir(parents=True, exist_ok=True)
plt.savefig(plots_dir / 'pose_prediction_distribution.png', dpi=150)
plt.show()

## 6. Export Modello

In [None]:
# Save the final model together with everything needed to rebuild it and to
# reproduce the input scaling.
final_model_path = OUTPUT_DIR / 'stray_pose_classifier.pt'

# Export the weights as CPU tensors. A state dict saved straight from a
# CUDA/MPS model stores tensors tagged with that device, and a consumer on a
# CPU-only machine cannot load them unless it passes map_location.
# load_state_dict accepts CPU tensors whatever device the target model is on.
cpu_state_dict = {
    name: tensor.cpu() for name, tensor in model.state_dict().items()
}

torch.save({
    'model_state_dict': cpu_state_dict,
    'input_dim': INPUT_DIM,
    'hidden_dims': [128, 64],
    'scaler_mean': scaler.mean_,
    'scaler_scale': scaler.scale_,
    'test_auc': test_auc,
    'test_acc': test_acc,
    'training_approach': 'weak_supervision'
}, final_model_path)

print(f"Modello salvato in: {final_model_path}")
print(f"Dimensione: {final_model_path.stat().st_size / 1024:.2f} KB")

In [None]:
# Smoke-test the exported model by reloading it into a fresh network.
print("\nTest modello esportato...")

# weights_only=False is needed because the file also contains NumPy arrays
# (scaler statistics); this performs full unpickling, so only load trusted files.
# map_location='cpu' matches the freshly built model below, which lives on the
# CPU, and keeps the load working even when the file holds tensors saved from
# a GPU/MPS device.
loaded = torch.load(final_model_path, map_location='cpu', weights_only=False)
test_model = StrayPoseMLP(
    input_dim=loaded['input_dim'],
    hidden_dims=loaded['hidden_dims']
)
test_model.load_state_dict(loaded['model_state_dict'])
test_model.eval()

# Run a single (already scaled) test sample through the reloaded model.
sample = X_test_scaled[0:1]
sample_tensor = torch.FloatTensor(sample)

with torch.no_grad():
    pred = test_model(sample_tensor).item()

print(f"\nSample prediction:")
print(f"  P(stray_pose): {pred:.4f}")
print(f"  True label: {'Stray' if y_test[0] == 1 else 'Owned'}")
print(f"  Predicted: {'Stray' if pred > 0.5 else 'Owned'}")

In [None]:
# Final summary: recap of the approach, split sizes, architecture, test
# metrics and the location of the exported model.
print("\n" + "="*50)
print("RIEPILOGO TRAINING STRAY POSE CLASSIFIER")
print("="*50)
print(f"\nApproccio: WEAK SUPERVISION (Contributo Originale)")
print(f"  - Labels derivati dall'origine del dataset")
print(f"  - Nessuna annotazione manuale richiesta")
print(f"\nDataset:")
print(f"  - Train: {len(X_train)}")
print(f"  - Val: {len(X_val)}")
print(f"  - Test: {len(X_test)}")
print(f"  - Features: {INPUT_DIM}")
# NOTE: the hidden sizes (128, 64) are hard-coded in this message; keep them
# in sync with the hidden_dims used when the model was created.
print(f"\nArchitettura: MLP ({INPUT_DIM} → 128 → 64 → 1)")
print(f"\nRisultati Test:")
print(f"  - AUC-ROC: {test_auc:.4f}")
print(f"  - Accuracy: {test_acc:.4f}")
print(f"\nModello salvato: {final_model_path}")