# AIN313 - Assignment 4 (Fall 2025) - Human Action Classification from Pose Time-Series

> **Goal:** Build **4** human-action classifiers from **OpenPose BODY-25** pose time-series and compare them with **ablation studies** and **clear plots/tables**.

**Due:** Dec 26, 2025 23:59:59

**Dataset (summary):**
- 6 actions: boxing, handclapping, handwaving, jogging, running, walking
- ~160x120 resolution, 25 fps, 25 actors
- Download link: use the Google Drive link provided in the PDF

**Deliverables (single zip, do not include dataset):**
- `project.ipynb` (report + code, self-contained)
- `project.py` (exported from notebook)
- Name: `project_studentIDs.zip`

**Team members (fill in):**
- **Person A:** _name, student ID_
- **Person B:** _name, student ID_

**How to use this notebook**
- This is a clean assignment skeleton with assigned owners and TODO checklists.
- Keep results reproducible: fix seeds, log configs, save metrics tables/figures.
- Each method needs an ablation study (multiple configs), not a single run.

---

## Global TODO (shared)
- [ ] Confirm OpenPose extraction method (CLI or Python bindings)
- [ ] Confirm dataset path(s) and label mapping (6 classes)
- [ ] Agree on pose representation: `(x,y)` or `(x,y,conf)` and joint subset policy
- [ ] Agree on evaluation protocol: stratified split (and/or CV), metrics, ablation grid size
- [ ] Decide the extra method (must be course-related and time-series suitable)


## 0. Assumptions & constraints (edit these first)

**Owners:** Person A + Person B

- [ ] Dataset downloaded locally (not committed to GitHub)
- [ ] OpenPose BODY-25 available locally (or pose `.npz` already generated)
- [ ] `.npz` pose files will be created per video (not committed), containing:
  - `pose` (raw keypoints), `pose_norm` (normalized), `frames`, `label`, `video_path`
  - optional: `label_name`
- [ ] Notebook will run end-to-end assuming `.npz` already exists
  (pose extraction cells can be marked as optional if OpenPose is not available on the runner)

**Classes (must match dataset folders):**
- boxing
- handclapping
- handwaving
- jogging
- running
- walking


In [None]:
# 1) Install / import dependencies
# Owners: Person A + Person B
# - [ ] Ensure all required packages are installed in your environment
# - [ ] Update versions if your course environment requires it

import os, json, glob, random, math, time
from pathlib import Path

import numpy as np
import pandas as pd

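from tqdm import tqdm
from IPython.display import display  # explicit import so the exported project.py also runs outside Jupyter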

import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, f1_score, confusion_matrix, classification_report, ConfusionMatrixDisplay
)

# Time-series + shapelets
# NOTE: install tslearn if not present: pip install tslearn
try:
    import tslearn
    from tslearn.utils import to_time_series_dataset
    from tslearn.preprocessing import TimeSeriesScalerMeanVariance
    from tslearn.metrics import cdist_gak
except Exception as e:
    print("tslearn import issue:", e)

# SVM
from sklearn.svm import SVC

# PyTorch (MLP/LSTM/extra)
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence


## 1. Configuration & reproducibility

**Owners:** Person B (primary), Person A (review)

### TODO
- [ ] Set your local paths (dataset, openpose, npz output)
- [ ] Confirm label mapping is correct for your dataset
- [ ] Choose pose format:
  - [ ] `USE_CONFIDENCE = True/False`
  - [ ] `JOINTS = all or subset`
- [ ] Decide padding / truncation policy for neural models
- [ ] Decide evaluation split protocol (default: stratified 80/20)
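
`set_seed` (defined in the next cell) seeds Python, NumPy and torch. DataLoader workers and cuDNN kernel selection can still make runs differ. The sketch below follows the worker-seeding pattern from the PyTorch reproducibility notes; `seed_worker` and `make_loader` are hypothetical helper names, and the cuDNN flags trade some GPU speed for determinism.

```python
import random

import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset


def seed_worker(worker_id):
    # Each DataLoader worker derives its seed from the torch base seed,
    # then seeds NumPy and Python's RNG with it.
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)


def make_loader(dataset, batch_size, shuffle, seed=42, num_workers=0, collate_fn=None):
    # A dedicated generator makes the shuffle order depend only on `seed`.
    g = torch.Generator()
    g.manual_seed(seed)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle,
                      num_workers=num_workers, collate_fn=collate_fn,
                      worker_init_fn=seed_worker, generator=g)


# Optional: prefer deterministic cuDNN kernels (can be slower on GPU).
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

ds = TensorDataset(torch.arange(10).float())
order_a = [b[0].tolist() for b in make_loader(ds, 4, shuffle=True, seed=0)]
order_b = [b[0].tolist() for b in make_loader(ds, 4, shuffle=True, seed=0)]
print(order_a == order_b)  # prints True: same seed, same shuffle order
```

The same `make_loader` call can replace the `DataLoader(...)` constructions in the training functions, passing their `collate_fn`.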


In [None]:
# Configuration (edit paths!)
SEED = 42

# Dataset link (from PDF)
DATASET_URL = "PASTE_LINK_FROM_PDF"

# Paths (local, not committed)
DATASET_ROOT = Path("PATH/TO/DATASET")         # raw videos
OPENPOSE_BIN  = Path("PATH/TO/OPENPOSE_BIN")   # optional
NPZ_ROOT      = Path("PATH/TO/POSE_NPZ")       # output .npz per video

# Output artifacts (OK to commit if small)
OUT_DIR = Path("outputs")
FIG_DIR = OUT_DIR / "figures"
RES_DIR = OUT_DIR / "results"
OUT_DIR.mkdir(exist_ok=True)
FIG_DIR.mkdir(exist_ok=True, parents=True)
RES_DIR.mkdir(exist_ok=True, parents=True)

# Dataset labels
CLASS_NAMES = ["boxing", "handclapping", "handwaving", "jogging", "running", "walking"]
LABEL2ID = {c:i for i,c in enumerate(CLASS_NAMES)}
ID2LABEL = {i:c for c,i in LABEL2ID.items()}

# Pose representation
USE_CONFIDENCE = False          # True -> include conf channel as feature dim
USE_JOINT_SUBSET = False        # True -> only some joints
JOINT_IDS = list(range(25))     # BODY-25; replace if subset

# Sequence handling
PAD_TO_MAXLEN = True            # for neural models
T_MAX = 150                     # truncate/pad length if PAD_TO_MAXLEN

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("DEVICE:", DEVICE)

def set_seed(seed=SEED):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(SEED)


## 2. Data discovery (videos) → metadata table

**Owners:** Person A (primary), Person B (review)

### TODO
- [ ] Implement video listing (glob patterns depend on dataset)
- [ ] Confirm label parsing from paths
- [ ] Build a dataframe: `video_path`, `label_name`, `label_id`, `video_id`
- [ ] Print dataset summary (counts per class)
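
The dataset summary (6 actions, 25 actors, 160x120, 25 fps) matches the KTH action dataset, whose files are typically named like `person01_boxing_d1_uncomp.avi`. Assuming your copy uses that naming, the file name also encodes the actor ID and the recording scenario, both worth keeping in `df_videos` (the actor ID enables actor-disjoint splits). `parse_video_name` is a hypothetical helper; adjust the regex if your files differ.

```python
import re
from pathlib import Path

# Assumed KTH-style name: person<ID>_<action>_d<scenario>_uncomp.<ext>
VIDEO_NAME_RE = re.compile(r"^person(\d+)_([a-z]+)_d(\d+)", re.IGNORECASE)


def parse_video_name(video_path):
    """Return actor/action/scenario parsed from the file name, or None if it does not match."""
    m = VIDEO_NAME_RE.match(Path(video_path).stem)
    if m is None:
        return None
    return {
        "actor_id": int(m.group(1)),
        "label_name": m.group(2).lower(),
        "scenario": int(m.group(3)),
    }


print(parse_video_name("data/boxing/person01_boxing_d1_uncomp.avi"))
# {'actor_id': 1, 'label_name': 'boxing', 'scenario': 1}
print(parse_video_name("notes.txt"))  # None
```

For example, `df_videos["actor_id"] = df_videos["video_path"].map(lambda p: (parse_video_name(p) or {}).get("actor_id"))` adds the actor column.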


In [None]:
def find_videos(dataset_root: Path, exts=(".avi", ".mp4", ".mov", ".mkv")):
    # TODO (Person A): adjust patterns for dataset structure
    paths = []
    for ext in exts:
        paths.extend(dataset_root.rglob(f"*{ext}"))
    return sorted(paths)

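def infer_label_from_path(video_path: Path):
    # TODO (Person A): ensure this matches dataset folder naming
    # 1) Prefer an exact folder-name match, e.g. .../boxing/clip.avi
    parts = [p.lower() for p in video_path.parts]
    for cname in CLASS_NAMES:
        if cname in parts:
            return cname
    # 2) Fall back to underscore-separated tokens of the file name,
    #    e.g. person01_boxing_d1_uncomp.avi (KTH-style naming; an assumption)
    tokens = video_path.stem.lower().split("_")
    for cname in CLASS_NAMES:
        if cname in tokens:
            return cname
    return None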

videos = find_videos(DATASET_ROOT)
rows = []
for vp in videos:
    lbl = infer_label_from_path(vp)
    if lbl is None:
        continue
    rows.append({
        "video_path": str(vp),
        "label_name": lbl,
        "label_id": LABEL2ID[lbl],
        "video_id": vp.stem
    })

df_videos = pd.DataFrame(rows)
display(df_videos.head())
print("N videos:", len(df_videos))
print(df_videos["label_name"].value_counts())


## 3. OpenPose extraction → `.npz` (optional in runtime)

**Owners:** Person A (primary), Person B (review)

> If OpenPose is not available in the grading environment, keep these cells for documentation and run them locally.  
> The notebook should still run starting from the `.npz` loading section.

### TODO
- [ ] Decide extraction approach:
  - [ ] OpenPose CLI (recommended)
  - [ ] Python API bindings
- [ ] For each video:
  - [ ] Run OpenPose with BODY-25
  - [ ] Parse per-frame JSON to `[T, 25, 3]` (x,y,conf)
  - [ ] Normalize pose (`pose_norm`)
  - [ ] Save `.npz` with required fields

### Notes
- Handle missing detections (conf=0) robustly.
- Keep frame indices to support debugging and plotting.
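
One way to handle missing detections robustly, besides leaving them at zero, is to fill short gaps by linear interpolation along time, separately for each joint. Below is a minimal sketch, assuming `[T, 25, 3]` input where conf == 0 marks a miss. `interpolate_missing` is a hypothetical helper. Joints never detected in the clip stay at zero. The confidence channel is left untouched, so a model that uses it can still tell which values were filled.

```python
import numpy as np


def interpolate_missing(seq):
    """Linearly interpolate x,y of missing joints (conf == 0) over time, per joint."""
    out = seq.copy()
    T, J, _ = out.shape
    t = np.arange(T)
    for j in range(J):
        valid = out[:, j, 2] > 0
        if valid.all() or not valid.any():
            continue  # nothing to fill, or no anchor frames at all
        for c in range(2):  # x and y
            # np.interp holds the edge values constant outside the observed range
            out[~valid, j, c] = np.interp(t[~valid], t[valid], out[valid, j, c])
    return out


demo = np.zeros((3, 25, 3), dtype=np.float32)
demo[0, 4] = [10.0, 20.0, 0.9]
demo[2, 4] = [30.0, 40.0, 0.8]  # frame 1 is a miss for joint 4
print(interpolate_missing(demo)[1, 4])  # x=20, y=30, conf still 0
```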


In [None]:
# OPTIONAL: OpenPose runner (CLI-based template)

import subprocess

def run_openpose_cli(video_path: Path, out_json_dir: Path, openpose_bin: Path, model="BODY_25"):
    """Run OpenPose on a single video and save per-frame JSON outputs."""
    out_json_dir.mkdir(parents=True, exist_ok=True)
    # TODO (Person A): adjust flags to your OpenPose install
    cmd = [
        str(openpose_bin),
        "--video", str(video_path),
        "--write_json", str(out_json_dir),
        "--display", "0",
        "--render_pose", "0",
        "--model_pose", model,
    ]
    # print(" ".join(cmd))
    subprocess.run(cmd, check=True)

def parse_openpose_json_sequence(json_dir: Path):
    """Parse OpenPose JSON files into array [T, 25, 3] => x,y,conf."""
    json_files = sorted(json_dir.glob("*.json"))
    seq = []
    frames = []
    for jf in json_files:
        with open(jf, "r", encoding="utf-8") as f:
            data = json.load(f)
        people = data.get("people", [])
        if not people:
            keypoints = np.zeros((25, 3), dtype=np.float32)
        else:
            # Pick the person with the highest total confidence
            best_kp = None
            best_score = -1.0
            for person in people:
                kp = person.get("pose_keypoints_2d", [])
                if not kp:
                    continue
                kp = np.array(kp, dtype=np.float32).reshape(-1, 3)
                score = float(kp[:, 2].sum())
                if score > best_score:
                    best_score = score
                    best_kp = kp
            if best_kp is None:
                keypoints = np.zeros((25, 3), dtype=np.float32)
            else:
                keypoints = best_kp[:25]
        seq.append(keypoints)
        frames.append(len(frames))
    seq = np.stack(seq, axis=0) if len(seq) else np.zeros((0,25,3), dtype=np.float32)
    frames = np.array(frames, dtype=np.int32)
    return seq, frames

def normalize_pose(seq: np.ndarray):
    """Normalize pose in a simple, defensible way. Replace with your method."""
    # seq: [T,25,3] = (x, y, conf); conf == 0 marks a missing detection
    # TODO (Person A): implement normalization (center + scale)
    seq_norm = seq.copy()
    xy = seq_norm[..., :2]
    conf = seq_norm[..., 2:3]
    valid = (conf > 0).astype(np.float32)                      # [T,25,1]
    # Center x,y by the per-frame mean of valid joints only
    denom = np.maximum(valid.sum(axis=1, keepdims=True), 1.0)  # [T,1,1]
    center = (xy * valid).sum(axis=1, keepdims=True) / denom   # [T,1,2]
    # Re-zero missing joints; otherwise (0,0) would become -center after the shift
    seq_norm[..., :2] = (xy - center) * valid
    return seq_norm

def build_npz_for_video(video_row, npz_root: Path):
    vp = Path(video_row["video_path"])
    label_id = int(video_row["label_id"])
    label_name = video_row["label_name"]
    out_path = npz_root / f"{vp.stem}.npz"
    if out_path.exists():
        return str(out_path)

    tmp_json_dir = npz_root / "_openpose_json" / vp.stem

    # Option 1: run OpenPose now (uncomment when ready)
    # run_openpose_cli(vp, tmp_json_dir, OPENPOSE_BIN)

    if not tmp_json_dir.exists():
        raise RuntimeError(
            "OpenPose JSON not found. Run OpenPose or point to existing output in tmp_json_dir."
        )

    seq, frames = parse_openpose_json_sequence(tmp_json_dir)
    if seq.shape[0] == 0:
        raise RuntimeError(f"No frames parsed for {vp}")

    seq_norm = normalize_pose(seq)

    npz_root.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(
        out_path,
        pose=seq.astype(np.float32),
        pose_norm=seq_norm.astype(np.float32),
        frames=frames,
        label=label_id,
        label_name=label_name,
        video_path=str(vp),
    )
    return str(out_path)

# Batch template (run locally)
# for _, row in tqdm(df_videos.iterrows(), total=len(df_videos)):
#     build_npz_for_video(row, NPZ_ROOT)


## 4. Load `.npz` pose dataset → unified in-memory samples

**Owners:** Person A (primary), Person B (review)

### TODO
- [ ] Implement loader that reads:
  - `pose_norm` (preferred) or `pose`
  - `label`
  - `frames` (optional)
- [ ] Convert each sample to a standard representation:
  - `X_i` as `[T, D]` where `D = 25*2` or `25*3`
- [ ] Create `samples` list with fields:
  - `X`, `y`, `length`, `video_id`
- [ ] Summarize lengths and class counts
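
`T_MAX = 150` in the configuration is a placeholder (at 25 fps it covers 6 seconds). Once `samples` is loaded, a data-driven choice is a high percentile of the sequence lengths, so that only the longest clips get truncated. A minimal sketch; `suggest_t_max` and the 90th-percentile default are assumptions worth ablating.

```python
import numpy as np


def suggest_t_max(lengths, q=90, multiple=10):
    """Return (q-th percentile of lengths rounded up to `multiple`, fraction of clips that would be truncated)."""
    lengths = np.asarray(lengths)
    t_max = int(np.ceil(np.percentile(lengths, q) / multiple) * multiple)
    frac_truncated = float((lengths > t_max).mean())
    return t_max, frac_truncated


t_max, frac = suggest_t_max([90, 120, 130, 400, 95, 110, 140, 150, 100, 105])
print(t_max, frac)
```

In the notebook, `suggest_t_max(lengths)` can be called directly on the `lengths` array computed after loading.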


In [None]:
def load_npz_samples(npz_root: Path):
    npz_files = sorted(npz_root.glob("*.npz"))
    samples = []
    for f in npz_files:
        # The context manager closes each archive (avoids "too many open files" on large datasets)
        with np.load(f, allow_pickle=True) as data:
            pose = data["pose_norm"] if "pose_norm" in data.files else data["pose"]  # [T,25,3]
            y = int(data["label"])
            video_path = str(data["video_path"]) if "video_path" in data.files else f.stem
        video_id = Path(video_path).stem

        # Joint subset
        pose = pose[:, JOINT_IDS, :] if USE_JOINT_SUBSET else pose[:, :25, :]

        # Feature dims
        if USE_CONFIDENCE:
            feat = pose.reshape(pose.shape[0], -1)            # [T, 25*3]
        else:
            feat = pose[..., :2].reshape(pose.shape[0], -1)   # [T, 25*2]

        samples.append({
            "X": feat.astype(np.float32),
            "y": y,
            "length": int(feat.shape[0]),
            "video_id": video_id,
            "npz_path": str(f),
        })
    return samples

samples = load_npz_samples(NPZ_ROOT)
print("Loaded samples:", len(samples))
print("Class counts:", pd.Series([s['y'] for s in samples]).value_counts().sort_index().to_dict())
lengths = np.array([s["length"] for s in samples])
print("Length stats:", dict(min=int(lengths.min()), max=int(lengths.max()), mean=float(lengths.mean()), median=float(np.median(lengths))))


## 5. Train/test split + shared preprocessing utilities

**Owners:** Person B (primary), Person A (review)

### TODO
- [ ] Implement stratified split
- [ ] Decide scaling:
  - [ ] per-sequence scaler (safe, no leakage)
  - [ ] global scaler fitted on the training split only (never on test data, to avoid leakage)
- [ ] Decide fixed-length policy for neural models:
  - [ ] `pad/truncate to T_MAX`
  - [ ] or keep variable-length with packing (recommended for LSTM)
- [ ] Implement helper functions reused by all methods


In [None]:
def stratified_split(samples, test_size=0.2, seed=SEED):
    y = np.array([s["y"] for s in samples])
    idx = np.arange(len(samples))
    tr_idx, te_idx = train_test_split(idx, test_size=test_size, random_state=seed, stratify=y)
    train_samples = [samples[i] for i in tr_idx]
    test_samples  = [samples[i] for i in te_idx]
    return train_samples, test_samples

train_samples, test_samples = stratified_split(samples, test_size=0.2)
print("Train:", len(train_samples), "Test:", len(test_samples))

def pad_or_truncate(X: np.ndarray, T_max=T_MAX):
    T, D = X.shape
    if T == T_max:
        return X
    if T > T_max:
        return X[:T_max]
    pad = np.zeros((T_max - T, D), dtype=X.dtype)
    return np.vstack([X, pad])

def to_tslearn_dataset(sample_list):
    # tslearn expects [N, T, D]; to_time_series_dataset NaN-pads ragged sequences to the longest T
    X = [s["X"] for s in sample_list]
    return to_time_series_dataset(X)

def get_xy(sample_list):
    X = [s["X"] for s in sample_list]
    y = np.array([s["y"] for s in sample_list], dtype=np.int64)
    return X, y


## 6. Evaluation helpers (metrics, confusion matrices, result logging)

**Owners:** Person B (primary), Person A (review)

### TODO
- [ ] Implement a standard evaluation dictionary (accuracy, macro-F1, per-class report)
- [ ] Implement confusion matrix plotting + saving
- [ ] Create a `results` list of dicts for experiment tracking


In [None]:
def evaluate_and_report(y_true, y_pred, title="", save_cm_path=None):
    acc = accuracy_score(y_true, y_pred)
    f1  = f1_score(y_true, y_pred, average="macro")
    print(f"{title}  |  acc={acc:.4f}  macroF1={f1:.4f}")
    print(classification_report(y_true, y_pred, target_names=CLASS_NAMES, digits=4))

    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(CLASS_NAMES))))
    disp = ConfusionMatrixDisplay(cm, display_labels=CLASS_NAMES)
    fig, ax = plt.subplots(figsize=(7,6))
    disp.plot(ax=ax, cmap="Blues", colorbar=False, xticks_rotation=45)
    ax.set_title(title)
    plt.tight_layout()
    if save_cm_path is not None:
        fig.savefig(save_cm_path, dpi=200)
    plt.show()

    return {"title": title, "accuracy": acc, "macro_f1": f1, "cm": cm}

RESULTS = []


# METHOD 1 — GAK + SVM (required)

**Owners:** Person B (primary), Person A (review)

### Checklist
- [ ] Prepare sequences for tslearn (`[N, T, D]`, variable length allowed)
- [ ] Compute GAK Gram matrix for train (`K_train`)
- [ ] Train SVM on precomputed kernel
- [ ] Compute `K_test` and predict
- [ ] Log metrics + save confusion matrix
- [ ] Ablation study:
  - [ ] `sigma` (kernel bandwidth) sweep
  - [ ] SVM `C` sweep
  - [ ] optional: joint subset / confidence feature toggle
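
For the `sigma` sweep, centering the grid on a data-driven value avoids wasting runs on bandwidths where the kernel matrix becomes nearly all ones or nearly diagonal. A commonly used heuristic scales the median distance between frames by the square root of the typical series length; tslearn provides `tslearn.metrics.sigma_gak` for a similar estimate. The numpy sketch below, `estimate_gak_sigma`, is a hypothetical helper that applies this heuristic to a random subsample of frames. Compute it on the same scaled data that is passed to `cdist_gak`, and treat the result as the center of the sweep (e.g. x0.5, x1, x2), not as a tuned value.

```python
import numpy as np


def estimate_gak_sigma(sequences, n_frames=1000, seed=42):
    """Median pairwise distance between sampled frames times sqrt(median sequence length).

    `sequences`: iterable of [T_i, D] arrays; NaN padding rows (as produced by tslearn) are ignored.
    """
    rng = np.random.default_rng(seed)
    seqs = [np.asarray(s, dtype=np.float64) for s in sequences]
    seqs = [s[~np.isnan(s).any(axis=1)] for s in seqs]  # drop NaN padding rows
    frames = np.concatenate(seqs, axis=0)  # [sum T_i, D]
    if len(frames) > n_frames:
        frames = frames[rng.choice(len(frames), size=n_frames, replace=False)]
    # Pairwise Euclidean distances via ||a||^2 + ||b||^2 - 2ab (upper triangle only)
    sq = (frames ** 2).sum(axis=1)
    d2 = np.maximum(sq[:, None] + sq[None, :] - 2.0 * frames @ frames.T, 0.0)
    dists = np.sqrt(d2[np.triu_indices(len(frames), k=1)])
    median_len = np.median([len(s) for s in seqs])
    return float(np.median(dists) * np.sqrt(median_len))
```

For example, `sigma0 = estimate_gak_sigma(TimeSeriesScalerMeanVariance().fit_transform(to_time_series_dataset([s["X"] for s in train_samples])))`, then `sigmas = [0.5 * sigma0, sigma0, 2 * sigma0]`.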


In [None]:
def gak_svm_train_predict(train_samples, test_samples, sigma=1.0, C=1.0):
    # Prepare
    X_train, y_train = get_xy(train_samples)
    X_test,  y_test  = get_xy(test_samples)

    Xtr = to_time_series_dataset(X_train)  # [N, T, D]; shorter series are NaN-padded by tslearn
    Xte = to_time_series_dataset(X_test)

    # Per-series z-normalization: each series (and dimension) is scaled independently,
    # so nothing is fitted across train and test
    scaler = TimeSeriesScalerMeanVariance()
    Xtr_s = scaler.fit_transform(Xtr)
    Xte_s = scaler.transform(Xte)

    # GAK Gram matrices
    K_train = cdist_gak(Xtr_s, Xtr_s, sigma=sigma)
    K_test  = cdist_gak(Xte_s, Xtr_s, sigma=sigma)

    clf = SVC(kernel="precomputed", C=C)
    clf.fit(K_train, y_train)
    y_pred = clf.predict(K_test)
    return y_test, y_pred

# Quick single run (edit params)
# y_true, y_pred = gak_svm_train_predict(train_samples, test_samples, sigma=1.0, C=1.0)
# metrics = evaluate_and_report(y_true, y_pred, title="GAK+SVM", save_cm_path=FIG_DIR/"cm_gak_svm.png")
# RESULTS.append({"method":"GAK+SVM", "sigma":1.0, "C":1.0, **metrics})


## METHOD 1 — Ablation grid (GAK+SVM)

**Owners:** Person B

### TODO
- [ ] Define a reasonable grid (e.g., 6–12 runs total)
- [ ] Save results to CSV
- [ ] Plot performance vs sigma and vs C
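
To show "performance vs sigma and vs C", a single heatmap over the grid is often clearer than two line plots, because it also shows how the two parameters interact. Below is a minimal sketch, assuming the `RESULTS` rows carry `method`, `sigma`, `C` and `macro_f1` as logged by `run_gak_svm_ablation`. `plot_gak_grid` is a hypothetical helper.

```python
import matplotlib.pyplot as plt
import pandas as pd


def plot_gak_grid(results, metric="macro_f1"):
    """Pivot GAK+SVM runs into a sigma x C table and draw it as an annotated heatmap."""
    df = pd.DataFrame(results)
    df = df[df["method"] == "GAK+SVM"]
    table = df.pivot_table(index="sigma", columns="C", values=metric, aggfunc="mean")
    fig, ax = plt.subplots(figsize=(5, 4))
    im = ax.imshow(table.values, cmap="viridis", aspect="auto")
    ax.set_xticks(range(len(table.columns)))
    ax.set_xticklabels(table.columns)
    ax.set_yticks(range(len(table.index)))
    ax.set_yticklabels(table.index)
    ax.set_xlabel("C")
    ax.set_ylabel("sigma")
    for i in range(table.shape[0]):
        for j in range(table.shape[1]):
            ax.text(j, i, f"{table.values[i, j]:.3f}", ha="center", va="center", color="w")
    fig.colorbar(im, ax=ax, label=metric)
    ax.set_title(f"GAK+SVM {metric}")
    fig.tight_layout()
    return table, fig
```

For example, `table, fig = plot_gak_grid(RESULTS)` followed by `fig.savefig(FIG_DIR / "gak_svm_grid.png", dpi=200)`.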


In [None]:
def run_gak_svm_ablation(train_samples, test_samples, sigmas, Cs):
    for sigma in sigmas:
        for C in Cs:
            y_true, y_pred = gak_svm_train_predict(train_samples, test_samples, sigma=sigma, C=C)
            title = f"GAK+SVM sigma={sigma} C={C}"
            cm_path = FIG_DIR / f"cm_gak_svm_sigma{sigma}_C{C}.png"
            metrics = evaluate_and_report(y_true, y_pred, title=title, save_cm_path=cm_path)
            RESULTS.append({"method":"GAK+SVM", "sigma":sigma, "C":C, **metrics})

# Example grid (edit)
# sigmas = [0.5, 1.0, 2.0]
# Cs = [0.1, 1.0, 10.0]
# run_gak_svm_ablation(train_samples, test_samples, sigmas, Cs)

# Save table
# pd.DataFrame(RESULTS).to_csv(RES_DIR/"results_all.csv", index=False)


# METHOD 2 — Shapelets + MLP (required)

**Owners:** Person A (primary for shapelets), Person B (primary for PyTorch MLP)

### Checklist
- [ ] Fit shapelet transform/model on training set
- [ ] Transform train/test to fixed-length feature vectors
- [ ] Train MLP classifier (PyTorch)
- [ ] Evaluate + confusion matrix
- [ ] Ablations:
  - [ ] shapelet sizes / counts
  - [ ] MLP hidden size / depth / dropout
  - [ ] learning rate / epochs
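
To make the features fed to the MLP concrete: a shapelet transform maps each sequence to one number per shapelet, namely the distance between that shapelet and its best-matching window in the sequence. Below is a numpy sketch of this computation for multivariate pose sequences. It illustrates the idea only. `shapelet_distance` and `shapelet_transform` are hypothetical helpers, and tslearn's learned-shapelet model uses its own differentiable variant, whose normalization may differ.

```python
import numpy as np


def shapelet_distance(series, shapelet):
    """Min over windows of the mean squared distance between `shapelet` [L, D] and series[t:t+L] [L, D]."""
    T, L = len(series), len(shapelet)
    if T < L:
        return np.inf  # series shorter than the shapelet: no window fits
    # All length-L windows as a [T-L+1, L, D] view (numpy >= 1.20)
    windows = np.lib.stride_tricks.sliding_window_view(series, (L, series.shape[1]))[:, 0]
    d = ((windows - shapelet[None]) ** 2).mean(axis=(1, 2))
    return float(d.min())


def shapelet_transform(sequences, shapelets):
    """Feature matrix [N, K]: distance of each sequence to each of the K shapelets."""
    return np.array([[shapelet_distance(s, sh) for sh in shapelets] for s in sequences])
```

These distances can vary widely in scale, so standardizing the feature matrix before the MLP (fit on train only, e.g. with `sklearn.preprocessing.StandardScaler`) is usually worth including in the ablation.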


In [None]:
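# Shapelet imports: tslearn >= 0.5 renamed ShapeletModel to LearningShapelets, so try the new name first
try:
    from tslearn.shapelets import LearningShapelets as ShapeletModel
except ImportError:
    try:
        from tslearn.shapelets import ShapeletModel  # older tslearn releases
    except Exception as e:
        print("Shapelet model import issue:", e)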

def build_shapelet_datasets(train_samples, test_samples):
    # For shapelets, use [N,T,D] with possible scaling
    X_train, y_train = get_xy(train_samples)
    X_test,  y_test  = get_xy(test_samples)
    Xtr = to_time_series_dataset(X_train)
    Xte = to_time_series_dataset(X_test)
    scaler = TimeSeriesScalerMeanVariance()
    Xtr_s = scaler.fit_transform(Xtr)
    Xte_s = scaler.transform(Xte)
    return Xtr_s, y_train, Xte_s, y_test

def fit_shapelets(Xtr, ytr, n_shapelets_per_size, max_iter=50):
    """Fit shapelet model. You must tune sizes/counts."""
    # TODO (Person A): choose params based on assignment + ablations
    shp = ShapeletModel(
        n_shapelets_per_size=n_shapelets_per_size,
        optimizer="adam",
        weight_regularizer=0.01,
        max_iter=max_iter,
        verbose=1,
        random_state=SEED
    )
    shp.fit(Xtr, ytr)
    return shp

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dims, num_classes, dropout=0.2):
        super().__init__()
        layers = []
        d = input_dim
        for h in hidden_dims:
            layers += [nn.Linear(d, h), nn.ReLU(), nn.Dropout(dropout)]
            d = h
        layers += [nn.Linear(d, num_classes)]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

def train_mlp_on_features(Xtr_feat, ytr, Xte_feat, yte, hidden_dims=(256,128), lr=1e-3, epochs=30, batch_size=64, dropout=0.2):
    Xtr_t = torch.tensor(Xtr_feat, dtype=torch.float32)
    ytr_t = torch.tensor(ytr, dtype=torch.long)
    Xte_t = torch.tensor(Xte_feat, dtype=torch.float32)
    yte_t = torch.tensor(yte, dtype=torch.long)

    train_ds = torch.utils.data.TensorDataset(Xtr_t, ytr_t)
    test_ds  = torch.utils.data.TensorDataset(Xte_t, yte_t)
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    test_loader  = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

    model = MLP(Xtr_feat.shape[1], list(hidden_dims), len(CLASS_NAMES), dropout=dropout).to(DEVICE)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    history = {"train_loss":[], "train_acc":[], "test_acc":[]}

    for ep in range(1, epochs+1):
        model.train()
        total_loss=0.0
        correct=0
        n=0
        for xb, yb in train_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            opt.zero_grad()
            logits = model(xb)
            loss = loss_fn(logits, yb)
            loss.backward()
            opt.step()
            total_loss += loss.item()*len(xb)
            pred = logits.argmax(dim=1)
            correct += (pred==yb).sum().item()
            n += len(xb)

        train_loss = total_loss/n
        train_acc = correct/n

        model.eval()
        all_pred=[]
        all_true=[]
        with torch.no_grad():
            for xb, yb in test_loader:
                xb = xb.to(DEVICE)
                logits = model(xb)
                pred = logits.argmax(dim=1).cpu().numpy()
                all_pred.append(pred)
                all_true.append(yb.numpy())
        all_pred = np.concatenate(all_pred)
        all_true = np.concatenate(all_true)
        test_acc = accuracy_score(all_true, all_pred)

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["test_acc"].append(test_acc)

        if ep % max(1, epochs//10) == 0 or ep == 1:
            print(f"epoch {ep:03d} | loss {train_loss:.4f} | train_acc {train_acc:.4f} | test_acc {test_acc:.4f}")

    return model, history, all_true, all_pred

# Pipeline template (uncomment after implementing shapelet transform properly)
# Xtr_s, ytr, Xte_s, yte = build_shapelet_datasets(train_samples, test_samples)
# shapelet_model = fit_shapelets(Xtr_s, ytr, n_shapelets_per_size={10:5, 20:5}, max_iter=50)
# Xtr_feat = shapelet_model.transform(Xtr_s)
# Xte_feat = shapelet_model.transform(Xte_s)
# mlp_model, hist, y_true, y_pred = train_mlp_on_features(Xtr_feat, ytr, Xte_feat, yte)
# metrics = evaluate_and_report(y_true, y_pred, title="Shapelets+MLP", save_cm_path=FIG_DIR/"cm_shapelets_mlp.png")
# RESULTS.append({"method":"Shapelets+MLP", **metrics})


## METHOD 2 — Ablation grid (Shapelets+MLP)

**Owners:** Person A (shapelet config sweep), Person B (MLP sweep)

### TODO
- [ ] Define shapelet configs:
  - [ ] sizes (e.g., 10/20/30)
  - [ ] counts per size (small/med/large)
  - [ ] max_iter
- [ ] Define MLP configs:
  - [ ] hidden_dims, dropout
  - [ ] lr, epochs
- [ ] Run a controlled grid (aim ~8–16 runs total) and log results
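
The ablation runners for this method (and for the LSTM and CNN) can share one small helper. It expands a parameter grid, calls a training function for each config, and logs one row per config. Below is a minimal sketch. `run_grid` and the `train_eval_fn(**config) -> dict` contract are assumptions, not part of the skeleton. Passing `fixed` implements the suggested two-stage plan: sweep shapelet settings with the MLP fixed, then sweep the MLP with the best shapelet settings fixed.

```python
import itertools
import time


def run_grid(train_eval_fn, grid, fixed=None, method="", results=None):
    """Call train_eval_fn(**config) for every combination in `grid` and collect result rows.

    grid:  {param_name: [values, ...]}, expanded as a Cartesian product
    fixed: params held constant for every run
    train_eval_fn must return a dict of metrics (e.g. the dict from evaluate_and_report).
    """
    results = [] if results is None else results
    fixed = fixed or {}
    names = list(grid)
    for values in itertools.product(*(grid[n] for n in names)):
        config = {**fixed, **dict(zip(names, values))}
        t0 = time.time()
        metrics = train_eval_fn(**config)
        results.append({"method": method, **config, **metrics, "seconds": time.time() - t0})
    return results


# Toy check with a fake training function
rows = run_grid(lambda lr, hidden: {"macro_f1": lr * hidden},
                {"lr": [1e-3, 1e-2], "hidden": [64, 128]}, method="demo")
print(len(rows))  # 4 runs
```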


In [None]:
# TODO: Implement ablation runner similar to GAK+SVM.
# Suggestions:
# - Fix MLP config and sweep shapelets
# - Then fix best shapelets and sweep MLP hyperparams
#
# Save:
# pd.DataFrame(RESULTS).to_csv(RES_DIR/"results_all.csv", index=False)


# METHOD 3 — LSTM classifier (required)

**Owners:** Person B (primary), Person A (review)

### Checklist
- [ ] Build PyTorch Dataset that returns variable-length sequences
- [ ] Implement padded collate + lengths
- [ ] Implement LSTM with packing (`pack_padded_sequence`)
- [ ] Train, evaluate, log
- [ ] Ablations:
  - [ ] hidden size, layers
  - [ ] bidirectional on/off
  - [ ] dropout
  - [ ] confidence on/off, joint subset
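
`LSTMClassifier.forward` sorts the batch by length before packing and unsorts the result afterwards. `pack_padded_sequence` also accepts `enforce_sorted=False`. In that case PyTorch sorts internally and returns `h_n` in the original batch order. The small check below, with arbitrary sizes, confirms that both routes give the same final hidden states, which is useful if you simplify the model.

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence

torch.manual_seed(0)
B, T, D, H = 4, 7, 3, 5
lstm = nn.LSTM(D, H, batch_first=True)
x = torch.randn(B, T, D)
lengths = torch.tensor([3, 7, 5, 2])

# Route 1: manual sort -> pack -> unsort (as in LSTMClassifier.forward)
lengths_sorted, idx_sort = torch.sort(lengths, descending=True)
_, (hn_sorted, _) = lstm(pack_padded_sequence(x[idx_sort], lengths_sorted, batch_first=True))
_, idx_unsort = torch.sort(idx_sort)
h_manual = hn_sorted[-1][idx_unsort]

# Route 2: let PyTorch handle the ordering
_, (hn_auto, _) = lstm(pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False))
h_auto = hn_auto[-1]

print(torch.allclose(h_manual, h_auto, atol=1e-6))  # expected: True
```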


In [None]:
class PoseSeqDataset(Dataset):
    def __init__(self, sample_list, pad_to_maxlen=False, T_max=T_MAX):
        self.samples = sample_list
        self.pad_to_maxlen = pad_to_maxlen
        self.T_max = T_max

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        s = self.samples[idx]
        X = s["X"]
        y = s["y"]
        if self.pad_to_maxlen:
            X = pad_or_truncate(X, self.T_max)
            length = min(s["length"], self.T_max)
        else:
            length = s["length"]
        return torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.long), length

def collate_pad(batch):
    xs, ys, lens = zip(*batch)
    lens = torch.tensor(lens, dtype=torch.long)
    xs_padded = pad_sequence(xs, batch_first=True)  # [B, T_max, D]
    ys = torch.stack(ys)
    return xs_padded, lens, ys

class LSTMClassifier(nn.Module):
    def __init__(self, input_dim, hidden_size, num_layers, num_classes, bidirectional=False, dropout=0.2):
        super().__init__()
        self.bidirectional = bidirectional
        self.lstm = nn.LSTM(
            input_dim, hidden_size, num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=dropout if num_layers > 1 else 0.0
        )
        out_dim = hidden_size * (2 if bidirectional else 1)
        self.fc = nn.Linear(out_dim, num_classes)

    def forward(self, x, lengths):
        # x: [B,T,D], lengths: [B]
        lengths_sorted, idx_sort = torch.sort(lengths, descending=True)
        x_sorted = x[idx_sort]

        packed = pack_padded_sequence(x_sorted, lengths_sorted.cpu(), batch_first=True, enforce_sorted=True)
        packed_out, (hn, cn) = self.lstm(packed)

        # last layer hidden
        if self.bidirectional:
            # hn: [num_layers*2, B, H] -> take last layer forward/back
            forward_last = hn[-2]
            backward_last = hn[-1]
            h_last = torch.cat([forward_last, backward_last], dim=1)
        else:
            h_last = hn[-1]  # [B,H]

        # unsort
        _, idx_unsort = torch.sort(idx_sort)
        h_last = h_last[idx_unsort]

        logits = self.fc(h_last)
        return logits

def train_lstm(train_samples, test_samples, hidden_size=128, num_layers=2, bidirectional=True, dropout=0.2, lr=1e-3, epochs=25, batch_size=32):
    train_ds = PoseSeqDataset(train_samples, pad_to_maxlen=False)
    test_ds  = PoseSeqDataset(test_samples,  pad_to_maxlen=False)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_pad)
    test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False, collate_fn=collate_pad)

    input_dim = train_samples[0]["X"].shape[1]
    model = LSTMClassifier(input_dim, hidden_size, num_layers, len(CLASS_NAMES), bidirectional=bidirectional, dropout=dropout).to(DEVICE)

    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    history = {"train_loss":[], "train_acc":[], "test_acc":[]}

    for ep in range(1, epochs+1):
        model.train()
        total_loss=0.0
        correct=0
        n=0
        for xb, lens, yb in train_loader:
            xb, lens, yb = xb.to(DEVICE), lens.to(DEVICE), yb.to(DEVICE)
            opt.zero_grad()
            logits = model(xb, lens)
            loss = loss_fn(logits, yb)
            loss.backward()
            opt.step()
            total_loss += loss.item()*len(xb)
            pred = logits.argmax(dim=1)
            correct += (pred==yb).sum().item()
            n += len(xb)
        train_loss = total_loss/n
        train_acc = correct/n

        model.eval()
        all_pred=[]
        all_true=[]
        with torch.no_grad():
            for xb, lens, yb in test_loader:
                xb, lens = xb.to(DEVICE), lens.to(DEVICE)
                logits = model(xb, lens)
                pred = logits.argmax(dim=1).cpu().numpy()
                all_pred.append(pred)
                all_true.append(yb.numpy())
        all_pred = np.concatenate(all_pred)
        all_true = np.concatenate(all_true)
        test_acc = accuracy_score(all_true, all_pred)

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["test_acc"].append(test_acc)

        if ep % max(1, epochs//10) == 0 or ep == 1:
            print(f"epoch {ep:03d} | loss {train_loss:.4f} | train_acc {train_acc:.4f} | test_acc {test_acc:.4f}")

    return model, history, all_true, all_pred

# Run template
# lstm_model, lstm_hist, y_true, y_pred = train_lstm(train_samples, test_samples)
# metrics = evaluate_and_report(y_true, y_pred, title="LSTM", save_cm_path=FIG_DIR/"cm_lstm.png")
# RESULTS.append({"method":"LSTM", **metrics})


## METHOD 3 — Ablation grid (LSTM)

**Owners:** Person B

### TODO
- [ ] Sweep:
  - [ ] hidden_size: 64/128/256
  - [ ] num_layers: 1/2/3
  - [ ] bidirectional: False/True
  - [ ] dropout: 0.0/0.2/0.5
- [ ] Log results + pick best
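
The full LSTM grid above has 3 x 3 x 2 x 3 = 54 runs. A one-factor-at-a-time design starting from a baseline reduces this to 8 runs while still showing the effect of each hyperparameter. The trade-off is that interactions (e.g. depth x dropout) go unexplored. Below is a minimal sketch. `one_factor_at_a_time` is a hypothetical helper, and the baseline mirrors the defaults of `train_lstm`.

```python
def one_factor_at_a_time(baseline, options):
    """Baseline config plus every config that changes exactly one parameter."""
    configs = [dict(baseline)]
    for name, values in options.items():
        for v in values:
            if v != baseline[name]:
                configs.append({**baseline, name: v})
    return configs


baseline = {"hidden_size": 128, "num_layers": 2, "bidirectional": True, "dropout": 0.2}
options = {
    "hidden_size": [64, 128, 256],
    "num_layers": [1, 2, 3],
    "bidirectional": [False, True],
    "dropout": [0.0, 0.2, 0.5],
}
configs = one_factor_at_a_time(baseline, options)
print(len(configs))  # 8
```

Each config can be passed as `train_lstm(train_samples, test_samples, **cfg)`. Note that the training loops report test accuracy every epoch. Picking the best config by that number therefore tunes on the test set. Holding out a validation split from the training data for selection keeps the final test score honest.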


In [None]:
# TODO: implement ablation runner


# METHOD 4 — Extra time-series classifier (required: choose one)

**Owners:** Person A (primary), Person B (review)

Recommended: **Temporal 1D CNN** (strong baseline, easy ablations)

### Checklist
- [ ] Implement TemporalCNN classifier on pose sequences
- [ ] Decide how to handle variable length:
  - [ ] fixed-length pad/truncate to T_MAX, or
  - [ ] global pooling over time with masking
- [ ] Train + evaluate + log
- [ ] Ablations:
  - [ ] kernel sizes (3/5/7)
  - [ ] channels/blocks
  - [ ] pooling type


In [None]:
class TemporalCNN(nn.Module):
    def __init__(self, input_dim, num_classes, channels=(128,128), kernel_size=5, dropout=0.2):
        super().__init__()
        layers = []
        in_ch = input_dim
        for ch in channels:
            layers += [
                nn.Conv1d(in_ch, ch, kernel_size=kernel_size, padding=kernel_size//2),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.MaxPool1d(kernel_size=2),
            ]
            in_ch = ch
        self.conv = nn.Sequential(*layers)
        self.head = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Linear(in_ch, num_classes)
        )

    def forward(self, x):
        # x: [B,T,D] -> [B,D,T]
        x = x.transpose(1,2)
        z = self.conv(x)
        return self.head(z)

def train_temporal_cnn(train_samples, test_samples, channels=(128,128), kernel_size=5, dropout=0.2, lr=1e-3, epochs=25, batch_size=32):
    # Fixed-length for CNN
    train_ds = PoseSeqDataset(train_samples, pad_to_maxlen=True, T_max=T_MAX)
    test_ds  = PoseSeqDataset(test_samples,  pad_to_maxlen=True, T_max=T_MAX)

    def collate_fixed(batch):
        xs, ys, lens = zip(*batch)
        xs = torch.stack(xs)  # already fixed [T_MAX,D]
        ys = torch.stack(ys)
        return xs, ys

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_fixed)
    test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False, collate_fn=collate_fixed)

    input_dim = train_samples[0]["X"].shape[1]
    model = TemporalCNN(input_dim, len(CLASS_NAMES), channels=channels, kernel_size=kernel_size, dropout=dropout).to(DEVICE)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()

    history = {"train_loss":[], "train_acc":[], "test_acc":[]}

    for ep in range(1, epochs+1):
        model.train()
        total_loss=0.0
        correct=0
        n=0
        for xb, yb in train_loader:
            xb, yb = xb.to(DEVICE), yb.to(DEVICE)
            opt.zero_grad()
            logits = model(xb)
            loss = loss_fn(logits, yb)
            loss.backward()
            opt.step()
            total_loss += loss.item()*len(xb)
            pred = logits.argmax(dim=1)
            correct += (pred==yb).sum().item()
            n += len(xb)
        train_loss = total_loss/n
        train_acc = correct/n

        model.eval()
        all_pred=[]
        all_true=[]
        with torch.no_grad():
            for xb, yb in test_loader:
                xb = xb.to(DEVICE)
                logits = model(xb)
                pred = logits.argmax(dim=1).cpu().numpy()
                all_pred.append(pred)
                all_true.append(yb.numpy())
        all_pred = np.concatenate(all_pred)
        all_true = np.concatenate(all_true)
        test_acc = accuracy_score(all_true, all_pred)

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["test_acc"].append(test_acc)

        if ep % max(1, epochs//10) == 0 or ep == 1:
            print(f"epoch {ep:03d} | loss {train_loss:.4f} | train_acc {train_acc:.4f} | test_acc {test_acc:.4f}")

    return model, history, all_true, all_pred

# Run template
# cnn_model, cnn_hist, y_true, y_pred = train_temporal_cnn(train_samples, test_samples)
# metrics = evaluate_and_report(y_true, y_pred, title="TemporalCNN", save_cm_path=FIG_DIR/"cm_temporalcnn.png")
# RESULTS.append({"method":"TemporalCNN", **metrics})


## METHOD 4 — Ablation grid (Extra method)

**Owners:** Person A

### TODO
- [ ] Sweep:
  - [ ] kernel_size: 3/5/7
  - [ ] channels: (64,64) vs (128,128) vs (256,256)
  - [ ] dropout: 0.0/0.2/0.5
- [ ] Log results, pick best
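
All three PyTorch training functions return a `history` dict containing `train_loss`, `train_acc` and `test_acc` per epoch. Plotting this history for the chosen configuration shows whether the epoch budget is adequate and whether the model overfits. A minimal sketch; `plot_history` is a hypothetical helper.

```python
import matplotlib.pyplot as plt


def plot_history(history, title=""):
    """Two panels: training loss, and train vs test accuracy, per epoch."""
    epochs = range(1, len(history["train_loss"]) + 1)
    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(10, 3.5))
    ax_loss.plot(epochs, history["train_loss"], marker="o")
    ax_loss.set_xlabel("epoch")
    ax_loss.set_ylabel("train loss")
    ax_acc.plot(epochs, history["train_acc"], marker="o", label="train")
    ax_acc.plot(epochs, history["test_acc"], marker="s", label="test")
    ax_acc.set_xlabel("epoch")
    ax_acc.set_ylabel("accuracy")
    ax_acc.legend()
    fig.suptitle(title)
    fig.tight_layout()
    return fig
```

For example, `plot_history(cnn_hist, "TemporalCNN").savefig(FIG_DIR / "history_temporalcnn.png", dpi=200)`.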


In [None]:
# TODO: implement ablation runner


# 7. Results aggregation & comparison (ALL methods)

**Owners:** Person A + Person B

### Checklist
- [ ] Convert `RESULTS` to DataFrame
- [ ] Save to CSV in `outputs/results/`
- [ ] Create a summary table (best per method)
- [ ] Create at least:
  - [ ] overall comparison bar plot (accuracy or macro-F1)
  - [ ] ablation plots (e.g., performance vs hyperparameter)
- [ ] Write short analysis paragraphs:
  - [ ] Which method won and why?
  - [ ] What hyperparameters mattered most?
  - [ ] Which classes were confused most and why (based on CM)?
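
To answer "which classes were confused most", rank the off-diagonal cells of the row-normalized confusion matrix. This gives a concrete list to discuss. Row normalization makes each cell the fraction of a true class that was predicted as another class. Below is a minimal sketch that works on the `cm` stored by `evaluate_and_report`. `top_confusions` is a hypothetical helper.

```python
import numpy as np


def top_confusions(cm, class_names, k=3):
    """Return up to k (true, predicted, rate) triples with the largest off-diagonal row-normalized rates."""
    cm = np.asarray(cm, dtype=float)
    rates = cm / np.maximum(cm.sum(axis=1, keepdims=True), 1.0)
    np.fill_diagonal(rates, 0.0)  # ignore correct predictions
    pairs = []
    for idx in np.argsort(rates, axis=None)[::-1][:k]:
        i, j = np.unravel_index(idx, rates.shape)
        if rates[i, j] > 0:
            pairs.append((class_names[i], class_names[j], float(rates[i, j])))
    return pairs


cm = [[8, 2, 0], [3, 7, 0], [0, 1, 9]]
print(top_confusions(cm, ["jogging", "running", "walking"]))
```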


In [None]:
def results_to_df(results_list):
    # Confusion matrices are numpy arrays; convert them to nested lists so the CSV stays
    # readable (or drop the "cm" key here if the table gets too wide)
    rows=[]
    for r in results_list:
        rr = dict(r)
        if "cm" in rr:
            rr["cm"] = np.asarray(rr["cm"]).tolist()
        rows.append(rr)
    return pd.DataFrame(rows)

# df_results = results_to_df(RESULTS)
# display(df_results.head())
# df_results.to_csv(RES_DIR/"results_all.csv", index=False)

# Best-per-method summary
# if len(df_results):
#     best = df_results.sort_values(["method","macro_f1"], ascending=[True, False]).groupby("method").head(1)
#     display(best[["method","accuracy","macro_f1","title"]])


# 8. Write-up (final report sections inside notebook)

**Owners:** Person A + Person B

### Required narrative (keep it concise but concrete, self-contained)
- [ ] Dataset overview & pose extraction
- [ ] Preprocessing choices (normalization, missing joints, padding)
- [ ] Include pseudocode or figures where they clarify key steps
- [ ] Summarize ablation design (what you swept, why, and what changed)
- [ ] For each method:
  - [ ] brief description
  - [ ] hyperparameter search / ablation design
  - [ ] best settings and results
  - [ ] confusion matrix interpretation
- [ ] Cross-method comparison and conclusion
- [ ] Limitations and future work

> Tip: Write like a scientist, not like a poet—save poetry for your Git commit messages.


## 9. Appendix (optional)

**Owners:** whoever has energy left

### Ideas
- [ ] Visualize sample skeletons over time (animated or key frames)
- [ ] Plot joint trajectories for each class
- [ ] Class imbalance handling tests
- [ ] Sensitivity to T_MAX or resampling rate
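
For the skeleton visualization idea: a row of key frames drawn with BODY-25 limb connections. Below is a minimal sketch, assuming a raw `pose` array `[T, 25, 3]` from the `.npz`. Because these are image coordinates, the y axis is inverted, and limbs with an undetected endpoint are skipped. `BODY25_LIMBS` lists the limb pairs OpenPose uses when rendering BODY-25. `plot_keyframes` is a hypothetical helper.

```python
import matplotlib.pyplot as plt
import numpy as np

# OpenPose BODY-25 limb connections (pairs of keypoint indices)
BODY25_LIMBS = [
    (1, 8), (1, 2), (1, 5), (2, 3), (3, 4), (5, 6), (6, 7),
    (8, 9), (9, 10), (10, 11), (8, 12), (12, 13), (13, 14),
    (1, 0), (0, 15), (15, 17), (0, 16), (16, 18),
    (14, 19), (19, 20), (14, 21), (11, 22), (22, 23), (11, 24),
]


def plot_keyframes(pose, n_frames=6, title=""):
    """Draw n_frames evenly spaced skeletons from pose [T, 25, 3] (x, y, conf)."""
    idx = np.linspace(0, len(pose) - 1, n_frames).astype(int)
    fig, axes = plt.subplots(1, n_frames, figsize=(2 * n_frames, 3), squeeze=False)
    for ax, t in zip(axes[0], idx):
        kp = pose[t]
        for a, b in BODY25_LIMBS:
            if kp[a, 2] > 0 and kp[b, 2] > 0:  # both endpoints detected
                ax.plot([kp[a, 0], kp[b, 0]], [kp[a, 1], kp[b, 1]], "-", lw=2)
        ok = kp[:, 2] > 0
        ax.scatter(kp[ok, 0], kp[ok, 1], s=8, c="k")
        ax.invert_yaxis()  # image coordinates: y grows downwards
        ax.set_aspect("equal")
        ax.set_title(f"t={t}")
        ax.axis("off")
    fig.suptitle(title)
    fig.tight_layout()
    return fig
```

For example: `with np.load(samples[0]["npz_path"]) as d: plot_keyframes(d["pose"], title=str(d["label_name"]))`.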
