In [None]:
pip install mediapipe==0.10.14 opencv-python pillow tqdm
pip install torch numpy scikit-learn
pip install optuna

Collecting optuna
  Downloading optuna-4.6.0-py3-none-any.whl.metadata (17 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.10.1-py3-none-any.whl.metadata (11 kB)
Downloading optuna-4.6.0-py3-none-any.whl (404 kB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m404.7/404.7 kB[0m [31m31.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.10.1-py3-none-any.whl (11 kB)
Installing collected packages: colorlog, optuna
Successfully installed colorlog-6.10.1 optuna-4.6.0


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#New Mediapipe
import os
import json
import argparse
from pathlib import Path
from typing import List, Optional, Dict
import numpy as np
from PIL import Image
from tqdm import tqdm

# ✅ Optional: detect if we're running in Google Colab
try:
    from google.colab import drive
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

# Mediapipe is required. In Colab, install with:
# !pip install mediapipe==0.10.14 opencv-python pillow tqdm
import mediapipe as mp
import cv2
mp_hands = mp.solutions.hands


def list_images_by_label(root: Path, patterns: List[str]) -> Dict[str, List[Path]]:
    """
    Collect image files grouped by the name of their label sub-directory.

    Assumes directory structure like:
      root/
        A/*.jpg
        B/*.png
        ...

    Args:
        root: Directory whose immediate sub-directories are the labels.
        patterns: Glob patterns (e.g. "*.jpg") matched inside each label directory.

    Returns:
        A dict: label (sub-directory name) -> list of image paths. Labels are
        inserted in sorted directory order. Within a label, files are ordered by
        pattern and then by sorted path, and each file appears only once even if
        several patterns match it. Sub-directories without any match are omitted.
    """
    label_to_files: Dict[str, List[Path]] = {}
    for label_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        files: List[Path] = []
        seen = set()
        for pat in patterns:
            for path in sorted(label_dir.glob(pat)):
                # Overlapping patterns (or "*.jpg" + "*.JPG" on a case-insensitive
                # filesystem) can match the same file more than once. A duplicate
                # entry could later be split into two different subsets, so keep
                # only the first occurrence.
                if path in seen:
                    continue
                seen.add(path)
                files.append(path)
        if files:
            label_to_files[label_dir.name] = files
    return label_to_files


def extract_hand_landmarks(img_bgr: np.ndarray,
                           hands_detector: mp_hands.Hands) -> Optional[np.ndarray]:
    """
    Run the hands detector on one BGR image and return the landmarks of one hand.

    The image is converted BGR -> RGB before being passed to the detector. When
    several hands are reported, the hand whose first handedness classification has
    the highest score is kept.

    Returns:
        float32 array with one (x, y, z) row per landmark of the selected hand
        (21 rows for MediaPipe Hands, in normalized image coordinates), or None
        when the result carries no landmarks or no handedness entries.
    """
    detection = hands_detector.process(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB))
    hands_found = detection.multi_hand_landmarks
    handedness = detection.multi_handedness
    if not (hands_found and handedness):
        return None

    # Index of the hand with the highest handedness classification score.
    best = int(np.argmax([entry.classification[0].score for entry in handedness]))

    coords = [[lm.x, lm.y, lm.z] for lm in hands_found[best].landmark]
    return np.asarray(coords, dtype=np.float32)


def clean_landmarks(pts_xyz: np.ndarray) -> np.ndarray:
    """
    Turn raw landmarks into a translation- and scale-normalized feature vector.

    Steps:
      1. Keep only the first two columns (x, y); the depth column is dropped.
      2. Subtract the mean (x, y) over all landmarks so the hand is centred at 0.
      3. Flatten row-major to 1D (x0, y0, x1, y1, ...).
      4. Divide by the largest absolute value so the result lies in [-1, 1].
         If that value is below 1e-12 (all points coincide), no scaling is applied.

    Input: (21,3) float32
    Output: (42,) float32
    """
    xy = pts_xyz[:, :2]
    offsets = (xy - xy.mean(axis=0, keepdims=True)).reshape(-1)

    scale = np.max(np.abs(offsets))
    # Degenerate hand (every landmark at the same spot): avoid dividing by ~0.
    scale = 1.0 if scale < 1e-12 else scale

    return (offsets / scale).astype(np.float32)


def read_image_bgr(path: Path, max_side: Optional[int] = None) -> np.ndarray:
    """
    Read an image with Pillow and return it as a BGR array for Mediapipe/OpenCV.

    Args:
        path: Image file to read.
        max_side: If given and positive, images whose longest side exceeds this
            value are downscaled (aspect ratio kept) so the longest side equals
            `max_side`. Smaller images are never upscaled. None or a non-positive
            value disables resizing.

    Returns:
        (H, W, 3) array in BGR channel order, as a fresh contiguous copy.
    """
    # Open inside a context manager so the underlying file handle is released
    # deterministically; convert() returns an independent, fully loaded image.
    with Image.open(path) as src:
        img = src.convert("RGB")

    # max_side <= 0 is treated like None: 0 would otherwise divide by zero below.
    if max_side is not None and max_side > 0:
        w, h = img.size
        scale = max(w, h) / max_side
        if scale > 1.0:
            # Clamp to 1 px: for very elongated images the short side can round
            # down to 0, which resize() rejects.
            new_w = max(1, int(round(w / scale)))
            new_h = max(1, int(round(h / scale)))
            img = img.resize((new_w, new_h), Image.BILINEAR)

    arr = np.array(img)  # RGB
    # Reverse the channel axis (RGB -> BGR); copy() makes the result contiguous.
    return arr[:, :, ::-1].copy()


def stratified_split_3way(paths: List[Path],
                          labels: List[int],
                          train_ratio: float = 0.7,
                          val_ratio: float = 0.2,
                          test_ratio: float = 0.1,
                          seed: int = 42):
    """
    Stratified train/val/test split implemented with numpy only (no sklearn).

    Each class is shuffled independently with a generator seeded by `seed`, then
    cut into three consecutive pieces. Train and val sizes are the rounded share
    of the class; test receives whatever remains, so no sample is dropped.

    The three ratios must sum to 1.0 (within 1e-6), otherwise an AssertionError
    is raised.

    Returns:
        (train_paths, train_labels, val_paths, val_labels, test_paths, test_labels)
        as plain Python lists, classes concatenated in ascending label order.
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, \
        "train_ratio + val_ratio + test_ratio must equal 1.0"

    generator = np.random.default_rng(seed)
    path_arr = np.array(paths, dtype=object)
    label_arr = np.array(labels, dtype=int)

    picked = {"train": [], "val": [], "test": []}

    for cls in np.unique(label_arr):
        members = np.where(label_arr == cls)[0]
        generator.shuffle(members)
        total = len(members)

        take_train = int(round(total * train_ratio))
        take_val = int(round(total * val_ratio))
        # Rounding can make train + val exceed the class size; in that case the
        # validation share shrinks and the test share becomes empty.
        if take_train + take_val > total:
            take_val = total - take_train
        val_end = take_train + take_val

        picked["train"].extend(members[:take_train])
        picked["val"].extend(members[take_train:val_end])
        picked["test"].extend(members[val_end:])

    result = []
    for split in ("train", "val", "test"):
        chosen = picked[split]
        result.append(path_arr[chosen].tolist())
        result.append(label_arr[chosen].tolist())
    return tuple(result)


def build_dataset(data_root: Path,
                  output_dir: Path,
                  patterns: List[str],
                  max_side: Optional[int],
                  min_confidence: float,
                  static_image_mode: bool,
                  train_ratio: float,
                  val_ratio: float,
                  test_ratio: float,
                  seed: int):
    """
    Extract hand-landmark features for a folder-per-label image set and write
    train/val/test datasets to `output_dir`.

    Files written to `output_dir`:
      - labels.json   : {"labels": [...]} sorted label names; list index == class id
      - train.npz / val.npz / test.npz : X (N, 42) float32 features, y (N,) int64 ids
      - summary.json  : paths, labels, per-split counts and split ratios

    Args:
        data_root: Root folder with one sub-folder per label.
        output_dir: Destination folder (created if missing).
        patterns: Glob patterns for image files inside each label folder.
        max_side: Forwarded to read_image_bgr (downscaling limit).
        min_confidence: Mediapipe min_detection_confidence.
        static_image_mode: Forwarded to mp_hands.Hands.
        train_ratio, val_ratio, test_ratio: Split ratios, forwarded to
            stratified_split_3way.
        seed: Seed for the split shuffling.

    Raises:
        SystemExit: if no images are found, or if a split ends up with no
            extracted sample.

    Note: the split is done BEFORE extraction, so images without a detected hand
    are dropped afterwards and the final counts can deviate from the requested
    ratios. Label ids are assigned from the folder names, so a label whose images
    never yield a hand still has an id but no samples.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Map labels to integer ids
    label_to_files = list_images_by_label(data_root, patterns)
    if not label_to_files:
        raise SystemExit(f"No images found under {data_root}. Expected folder-per-label with images.")

    labels_sorted = sorted(label_to_files.keys())
    label_to_id = {lab: i for i, lab in enumerate(labels_sorted)}
    with open(output_dir / "labels.json", "w", encoding="utf-8") as f:
        json.dump({"labels": labels_sorted}, f, indent=2)

    # Flatten file list and labels
    all_paths, all_labels = [], []
    for lab, files in label_to_files.items():
        all_paths.extend(files)
        all_labels.extend([label_to_id[lab]] * len(files))

    # Stratified split by label into train/val/test
    (train_paths, train_labels,
     val_paths, val_labels,
     test_paths, test_labels) = stratified_split_3way(
        all_paths,
        all_labels,
        train_ratio=train_ratio,
        val_ratio=val_ratio,
        test_ratio=test_ratio,
        seed=seed,
    )

    def process_split(paths: List[Path], labels: List[int], split_name: str,
                      hands: mp_hands.Hands):
        """Extract features for one split, save `<split_name>.npz`, return counts."""
        xs, ys = [], []
        n_no_hand = 0      # image read fine, but no hand was detected
        n_failed = 0       # reading / detection / cleaning raised an exception
        first_error = None
        for p, y in tqdm(zip(paths, labels), total=len(paths), desc=f"Extracting {split_name}"):
            try:
                img_bgr = read_image_bgr(p, max_side=max_side)
                pts = extract_hand_landmarks(img_bgr, hands)
                feat = None if pts is None else clean_landmarks(pts)  # (42,)
            except Exception as exc:
                # Best-effort: a bad image must not abort an hours-long run, but
                # count it and remember the first error so that a systematic
                # failure is not mistaken for "no hands found".
                n_failed += 1
                if first_error is None:
                    first_error = f"{p}: {exc!r}"
                continue
            if feat is None:
                n_no_hand += 1
                continue
            xs.append(feat)
            ys.append(y)

        if n_failed:
            print(f"[{split_name}] {n_failed} image(s) failed to process; first error: {first_error}")

        if not xs:
            detail = f" ({n_no_hand} image(s) without a detected hand, {n_failed} image(s) failed to process"
            if first_error is not None:
                detail += f"; first error: {first_error}"
            detail += ")"
            raise SystemExit(
                f"No hands found for split {split_name}. Check data or confidence settings." + detail
            )

        X = np.stack(xs, axis=0).astype(np.float32)  # (N, 42)
        y_arr = np.asarray(ys, dtype=np.int64)       # (N,)
        np.savez_compressed(output_dir / f"{split_name}.npz", X=X, y=y_arr)
        return {"kept": len(xs), "no_hand": n_no_hand, "error": n_failed}

    # Single Mediapipe Hands instance reused for all splits
    with mp_hands.Hands(
        static_image_mode=static_image_mode,
        max_num_hands=2,
        min_detection_confidence=min_confidence
    ) as hands:
        stats = {
            "train": process_split(train_paths, train_labels, "train", hands),
            "val":   process_split(val_paths,  val_labels,  "val",   hands),
            "test":  process_split(test_paths, test_labels, "test",  hands),
        }

    # Write a small summary
    summary = {
        "data_root": str(data_root),
        "output_dir": str(output_dir),
        "labels": labels_sorted,
        "counts_after_extraction": {
            name: int(counts["kept"]) for name, counts in stats.items()
        },
        # How many images of each split were dropped, and why.
        "counts_skipped": {
            name: {"no_hand": int(counts["no_hand"]), "error": int(counts["error"])}
            for name, counts in stats.items()
        },
        "splits": {
            "train_ratio": float(train_ratio),
            "val_ratio":   float(val_ratio),
            "test_ratio":  float(test_ratio),
        },
        "feature_dim": 42,
        "normalization": "center by mean(x,y), divide by max-abs; z removed",
    }
    with open(output_dir / "summary.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    print(json.dumps(summary, indent=2))


def main():
    """
    Command-line / notebook entry point for landmark extraction.

    Parses the options below and calls build_dataset. In Colab the kernel's own
    argv is ignored (an empty argument list is parsed), so every option takes
    its default and --data_dir falls back to a Google Drive path. Outside Colab,
    --data_dir is mandatory and a missing value ends the run with SystemExit.
    """
    ap = argparse.ArgumentParser(description="ASL Approach 2: Landmark extraction & dataset prep (fast + original-like)")
    ap.add_argument(
        "--data_dir",
        type=str,
        required=False,  # allow default in Colab
        help=(
            "Root folder containing subfolders per label (e.g., data/raw/A, data/raw/B, ...). "
            "In Colab with Google Drive mounted, this might look like "
            "'/content/drive/MyDrive/Duke University/CV-Group6/asl_alphabet_train'."
        ),
    )
    ap.add_argument(
        "--output_dir",
        type=str,
        default="/content/drive/MyDrive/Duke University/CV-Group6",
        help="Where to write train/val/test .npz, labels.json, summary.json",
    )
    ap.add_argument(
        "--patterns",
        type=str,
        nargs="+",
        default=["*.jpg", "*.jpeg", "*.png", "*.bmp"],
        help="Glob patterns for image files per label folder",
    )
    ap.add_argument(
        "--max_side",
        type=int,
        default=512,  # same as your original behavior
        help="If set, downscale images so the longest side equals this (speeds up processing)",
    )
    ap.add_argument(
        "--min_confidence",
        type=float,
        default=0.5,
        help="Mediapipe min_detection_confidence",
    )
    # Static image mode is ON by default: the inputs are independent, shuffled
    # photos, and with static_image_mode=False Mediapipe treats consecutive
    # inputs as video frames and tracks the previous hand instead of running
    # detection on every image. The old flag is still accepted (now a no-op);
    # --no_static_image_mode restores the previous default.
    ap.add_argument(
        "--static_image_mode",
        dest="static_image_mode",
        action="store_true",
        default=True,
        help="Use Mediapipe static image mode (recommended for photos; enabled by default)",
    )
    ap.add_argument(
        "--no_static_image_mode",
        dest="static_image_mode",
        action="store_false",
        help="Disable static image mode (treat inputs as a video stream with tracking)",
    )
    ap.add_argument("--train_ratio", type=float, default=0.7)
    ap.add_argument("--val_ratio",   type=float, default=0.2)
    ap.add_argument("--test_ratio",  type=float, default=0.1)
    ap.add_argument("--seed",        type=int,   default=42)

    # Fix for Colab: argparse tries to parse kernel arguments like -f
    if IN_COLAB:
        args = ap.parse_args(args=[])
    else:
        args = ap.parse_args()

    # If data_dir not provided, set a default Google Drive path in Colab
    if args.data_dir is None:
        if IN_COLAB:
            default_drive_path = "/content/drive/MyDrive/Duke University/CV-Group6/asl_alphabet_train"
            print(f"No --data_dir provided, using default Google Drive path: {default_drive_path}")
            data_root = Path(default_drive_path).expanduser()
        else:
            raise SystemExit("Error: --data_dir must be provided when not running in Colab.")
    else:
        data_root = Path(args.data_dir).expanduser()

    output_dir = Path(args.output_dir).expanduser()

    build_dataset(
        data_root=data_root,
        output_dir=output_dir,
        patterns=args.patterns,
        max_side=args.max_side,
        min_confidence=args.min_confidence,
        static_image_mode=args.static_image_mode,
        train_ratio=args.train_ratio,
        val_ratio=args.val_ratio,
        test_ratio=args.test_ratio,
        seed=args.seed,
    )


# Run the extraction pipeline when this cell/script is executed directly
# (and not when the code is imported as a module).
if __name__ == "__main__":
    main()


No --data_dir provided, using default Google Drive path: /content/drive/MyDrive/Duke University/CV-Group6/asl_alphabet_train


Extracting train: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 61001/61001 [8:35:08<00:00,  1.97it/s]
Extracting val: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 17428/17428 [2:29:21<00:00,  1.94it/s]
Extracting test: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 8715/8715 [1:18:43<00:00,  1.85it/s]


{
  "data_root": "/content/drive/MyDrive/Duke University/CV-Group6/asl_alphabet_train",
  "output_dir": "/content/drive/MyDrive/Duke University/CV-Group6",
  "labels": [
    "A",
    "B",
    "C",
    "D",
    "E",
    "F",
    "G",
    "H",
    "I",
    "J",
    "K",
    "L",
    "M",
    "N",
    "O",
    "P",
    "Q",
    "R",
    "S",
    "T",
    "U",
    "V",
    "W",
    "X",
    "Y",
    "Z",
    "del",
    "nothing",
    "space"
  ],
  "counts_after_extraction": {
    "train": 46808,
    "val": 13360,
    "test": 6663
  },
  "splits": {
    "train_ratio": 0.7,
    "val_ratio": 0.2,
    "test_ratio": 0.1
  },
  "feature_dim": 42,
  "normalization": "center by mean(x,y), divide by max-abs; z removed"
}


In [None]:
# Train MLP classifier on ASL landmark vectors (Colab-friendly version) Uses SciKit
import argparse
import json
from pathlib import Path
import numpy as np
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score
import matplotlib.pyplot as plt
import itertools
import joblib

# ✅ Detect if running in Google Colab
try:
    from google.colab import drive
    IN_COLAB = True
except ImportError:
    IN_COLAB = False


def load_npz(npz_path: Path):
    """
    Load X (features) and y (labels) from a .npz file.

    Args:
        npz_path: Path to an archive containing the arrays "X" and "y".

    Returns:
        Tuple (X, y) of numpy arrays.

    Raises:
        KeyError: if the archive lacks "X" or "y".
    """
    # np.load returns an NpzFile that keeps the archive open until it is closed.
    # Indexing reads each array fully into memory, so the handle can be
    # released before returning.
    with np.load(npz_path) as data:
        return data["X"], data["y"]


def plot_confusion_matrix(cm, class_names, out_path: Path, title: str = "Confusion Matrix"):
    """
    Render a confusion matrix as a heat map and write it to an image file.

    Args:
        cm: 2-D integer count matrix (rows = true label, columns = predicted).
        class_names: Tick labels, one per class, used on both axes.
        out_path: Destination image path; parent folders are created if needed.
        title: Figure title.
    """
    fig, ax = plt.subplots(figsize=(8, 8))
    heat = ax.imshow(cm, interpolation="nearest", cmap="Blues")
    ax.set_title(title)
    fig.colorbar(heat)

    positions = range(len(class_names))
    ax.set_xticks(positions)
    ax.set_yticks(positions)
    ax.set_xticklabels(class_names, rotation=90)
    ax.set_yticklabels(class_names)

    # Cells darker than half the maximum get white text so counts stay readable.
    cutoff = cm.max() / 2 if cm.size else 0
    n_rows, n_cols = cm.shape[0], cm.shape[1]
    for row in range(n_rows):
        for col in range(n_cols):
            count = cm[row, col]
            text_color = "white" if count > cutoff else "black"
            ax.text(col, row, format(count, "d"), ha="center", color=text_color)

    ax.set_ylabel("True Label")
    ax.set_xlabel("Predicted Label")
    fig.tight_layout()

    target = Path(out_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(target, dpi=150, bbox_inches="tight")
    plt.close(fig)


def _evaluate_split(clf, X, y, class_names, cm_out, long_name, short_name):
    """
    Evaluate a fitted classifier on one split: print metrics, save the confusion matrix.

    Args:
        clf: Fitted estimator exposing predict().
        X, y: Features and integer class ids of the split.
        class_names: Label names; list index == class id.
        cm_out: Where to save the confusion-matrix image.
        long_name: Split name used in headings, e.g. "Validation".
        short_name: Abbreviated split name used in the F1 rows, e.g. "Val".

    Returns:
        Dict with "accuracy", "f1_macro" and "f1_weighted".
    """
    label_ids = list(range(len(class_names)))

    print(f"\nüîé Evaluating on {long_name.upper()} set...")
    y_pred = clf.predict(X)

    acc = accuracy_score(y, y_pred)
    # NOTE: without `labels=`, f1_score averages over the classes that occur in
    # y or y_pred, whereas the report below lists every class in class_names.
    # The "macro avg" row of the report can therefore differ from this value
    # when a class has no samples.
    f1_macro = f1_score(y, y_pred, average="macro", zero_division=0)
    f1_weighted = f1_score(y, y_pred, average="weighted", zero_division=0)

    cm = confusion_matrix(y, y_pred, labels=label_ids)
    plot_confusion_matrix(cm, class_names, cm_out, title=f"{long_name} Confusion Matrix")
    print(f"üìä Saved {long_name.lower()} confusion matrix to {cm_out}")

    print("\n==============================")
    rows = [
        (f"{long_name} Accuracy:", acc),
        (f"{short_name} macro F1:", f1_macro),
        (f"{short_name} weighted F1:", f1_weighted),
    ]
    for caption, value in rows:
        # Captions are padded to a common 23-character column.
        print(f"‚úÖ {caption:<23}{value:.4f}")
    print("==============================")

    print(f"\nüìã {long_name} Classification Report:")
    # zero_division=0 reports 0.0 for classes without samples/predictions (the
    # same number as the default) without an UndefinedMetricWarning per class.
    print(classification_report(
        y, y_pred,
        labels=label_ids,
        target_names=class_names,
        zero_division=0,
    ))

    return {"accuracy": acc, "f1_macro": f1_macro, "f1_weighted": f1_weighted}


def main():
    """
    Train the scikit-learn MLP on the landmark vectors and evaluate it.

    Loads train/val/test .npz and labels.json from --data_dir, fits a
    StandardScaler -> MLPClassifier pipeline on the train split only, reports
    accuracy / F1 / classification report and saves a confusion matrix for the
    validation and test splits, then stores the fitted pipeline with joblib.
    """
    parser = argparse.ArgumentParser(
        description="Train ASL classifier on Mediapipe landmark vectors"
    )

    # Default paths assume everything is on Google Drive when in Colab
    if IN_COLAB:
        base = "/content/drive/MyDrive/Duke University/CV-Group6"
        default_data_dir = f"{base}"
        default_model_out = f"{base}/asl_mlp_model.joblib"
        default_val_cm_out = f"{base}/asl_confusion_matrix_val.png"
        default_test_cm_out = f"{base}/asl_confusion_matrix_test.png"
    else:
        # Fallback defaults for local runs
        default_data_dir = "/content/drive/MyDrive/Duke University/CV-Group6"
        default_model_out = "model.joblib"
        default_val_cm_out = "confusion_matrix_val.png"
        default_test_cm_out = "confusion_matrix_test.png"

    parser.add_argument(
        "--data_dir",
        type=str,
        default=default_data_dir,
        help="Folder with train.npz, val.npz, test.npz, labels.json",
    )
    parser.add_argument(
        "--model_out",
        type=str,
        default=default_model_out,
        help="Where to save the trained model (.joblib)",
    )
    parser.add_argument(
        "--val_cm_out",
        type=str,
        default=default_val_cm_out,
        help="Where to save the validation confusion matrix image (.png)",
    )
    parser.add_argument(
        "--test_cm_out",
        type=str,
        default=default_test_cm_out,
        help="Where to save the test confusion matrix image (.png)",
    )

    # Fix argparse for Colab (avoid parsing notebook/kernel args)
    if IN_COLAB:
        args = parser.parse_args(args=[])
    else:
        args = parser.parse_args()

    data_dir = Path(args.data_dir)
    model_out = Path(args.model_out)
    val_cm_out = Path(args.val_cm_out)
    test_cm_out = Path(args.test_cm_out)

    print(f"üìÇ Using data directory: {data_dir}")
    print(f"üíæ Model will be saved to: {model_out}")
    print(f"üìä Val confusion matrix ‚Üí {val_cm_out}")
    print(f"üìä Test confusion matrix ‚Üí {test_cm_out}")

    # ---------- Load datasets ----------
    X_train, y_train = load_npz(data_dir / "train.npz")
    X_val, y_val     = load_npz(data_dir / "val.npz")
    X_test, y_test   = load_npz(data_dir / "test.npz")

    # Load label names (e.g., A, B, C, ..., space)
    with open(data_dir / "labels.json", "r", encoding="utf-8") as f:
        labels = json.load(f)["labels"]

    print("‚úÖ Data loaded")
    print(f"  Train samples: {len(X_train)}")
    print(f"  Val samples:   {len(X_val)}")
    print(f"  Test samples:  {len(X_test)}")

    #Optimal HP: 'hidden_size': 343, 'num_layers': 2, 'dropout': 0.28319417648906664, 'lr': 0.0017897493066372295, 'batch_size': 64
    # NOTE(review): the values above are not used by the pipeline below --
    # presumably they come from a separate tuning run; confirm.
    # ---------- Create classifier pipeline (scale -> MLP) ----------
    clf = Pipeline([
        # with_mean=False: features are only divided by their std, not re-centred.
        ("scaler", StandardScaler(with_mean=False)),
        ("mlp", MLPClassifier(
            hidden_layer_sizes=(128, 64),
            activation="relu",
            solver="adam",
            learning_rate_init=0.001,
            batch_size=256,
            max_iter=50,
            early_stopping=True,
            n_iter_no_change=5,
            random_state=42,
            verbose=True,
        )),
    ])

    # ---------- Train on TRAIN only ----------
    print("\nüöÄ Training model on TRAIN set...")
    clf.fit(X_train, y_train)

    # ---------- Evaluate on VALIDATION and TEST sets ----------
    _evaluate_split(clf, X_val, y_val, labels, val_cm_out, "Validation", "Val")
    _evaluate_split(clf, X_test, y_test, labels, test_cm_out, "Test", "Test")

    # ---------- Save model ----------
    model_out.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(clf, model_out)
    print(f"üíæ Saved trained model to {model_out}")


# Run training + evaluation when this cell/script is executed directly
# (and not when the code is imported as a module).
if __name__ == "__main__":
    main()


üìÇ Using data directory: /content/drive/MyDrive/Duke University/CV-Group6
üíæ Model will be saved to: /content/drive/MyDrive/Duke University/CV-Group6/asl_mlp_model.joblib
üìä Val confusion matrix ‚Üí /content/drive/MyDrive/Duke University/CV-Group6/asl_confusion_matrix_val.png
üìä Test confusion matrix ‚Üí /content/drive/MyDrive/Duke University/CV-Group6/asl_confusion_matrix_test.png
‚úÖ Data loaded
  Train samples: 46808
  Val samples:   13360
  Test samples:  6663

üöÄ Training model on TRAIN set...
Iteration 1, loss = 0.97324907
Validation score: 0.926725
Iteration 2, loss = 0.24284196
Validation score: 0.953856
Iteration 3, loss = 0.18433105
Validation score: 0.956633
Iteration 4, loss = 0.15702002
Validation score: 0.960692
Iteration 5, loss = 0.14014193
Validation score: 0.961333
Iteration 6, loss = 0.12818126
Validation score: 0.963683
Iteration 7, loss = 0.11819113
Validation score: 0.964537
Iteration 8, loss = 0.11083791
Validation score: 0.968383
Iteration 9, loss = 0.

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


üìä Saved validation confusion matrix to /content/drive/MyDrive/Duke University/CV-Group6/asl_confusion_matrix_val.png

‚úÖ Validation Accuracy:   0.9754
‚úÖ Val macro F1:          0.9742
‚úÖ Val weighted F1:       0.9754

üìã Validation Classification Report:
              precision    recall  f1-score   support

           A       0.98      0.98      0.98       480
           B       0.98      0.99      0.98       474
           C       0.99      0.99      0.99       430
           D       0.98      0.97      0.98       534
           E       0.97      0.99      0.98       506
           F       0.98      0.98      0.98       554
           G       0.99      0.99      0.99       511
           H       0.99      0.97      0.98       523
           I       0.98      0.96      0.97       530
           J       0.96      1.00      0.98       520
           K       0.99      0.97      0.98       542
           L       0.99      0.99      0.99       532
           M       0.88      0.96 

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


üìä Saved test confusion matrix to /content/drive/MyDrive/Duke University/CV-Group6/asl_confusion_matrix_test.png

‚úÖ Test Accuracy:         0.9742
‚úÖ Test macro F1:         0.9741
‚úÖ Test weighted F1:      0.9742

üìã Test Classification Report:
              precision    recall  f1-score   support

           A       0.97      0.98      0.97       231
           B       1.00      1.00      1.00       253
           C       1.00      1.00      1.00       233
           D       0.98      0.99      0.99       260
           E       0.97      1.00      0.98       249
           F       0.99      0.99      0.99       274
           G       0.99      0.98      0.99       244
           H       0.99      0.97      0.98       256
           I       0.96      0.97      0.97       264
           J       0.96      0.98      0.97       257
           K       0.99      0.98      0.98       274
           L       0.99      0.98      0.98       278
           M       0.91      0.97      0.94  

In [None]:
# Train MLP classifier on ASL landmark vectors (PyTorch version, Colab-friendly)
import argparse
import json
from pathlib import Path
import numpy as np
import itertools
import matplotlib.pyplot as plt
import joblib

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    accuracy_score,
    f1_score,
)

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# ✅ Detect if running in Google Colab
try:
    from google.colab import drive
    IN_COLAB = True
except ImportError:
    IN_COLAB = False


def load_npz(npz_path: Path):
    """
    Load X (features) and y (labels) from a .npz file.

    Args:
        npz_path: Path to an archive containing the arrays "X" and "y".

    Returns:
        Tuple (X, y) of numpy arrays.

    Raises:
        KeyError: if the archive lacks "X" or "y".
    """
    # np.load returns an NpzFile that keeps the archive open until it is closed.
    # Indexing reads each array fully into memory, so the handle can be
    # released before returning.
    with np.load(npz_path) as data:
        return data["X"], data["y"]


def plot_confusion_matrix(cm, class_names, out_path: Path, title: str = "Confusion Matrix"):
    """
    Render a confusion matrix as a heat map and write it to an image file.

    Args:
        cm: 2-D integer count matrix (rows = true label, columns = predicted).
        class_names: Tick labels, one per class, used on both axes.
        out_path: Destination image path; parent folders are created if needed.
        title: Figure title.
    """
    fig, ax = plt.subplots(figsize=(8, 8))
    heat = ax.imshow(cm, interpolation="nearest", cmap="Blues")
    ax.set_title(title)
    fig.colorbar(heat)

    positions = range(len(class_names))
    ax.set_xticks(positions)
    ax.set_yticks(positions)
    ax.set_xticklabels(class_names, rotation=90)
    ax.set_yticklabels(class_names)

    # Cells darker than half the maximum get white text so counts stay readable.
    cutoff = cm.max() / 2 if cm.size else 0
    n_rows, n_cols = cm.shape[0], cm.shape[1]
    for row in range(n_rows):
        for col in range(n_cols):
            count = cm[row, col]
            text_color = "white" if count > cutoff else "black"
            ax.text(col, row, format(count, "d"), ha="center", color=text_color)

    ax.set_ylabel("True Label")
    ax.set_xlabel("Predicted Label")
    fig.tight_layout()

    target = Path(out_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(target, dpi=150, bbox_inches="tight")
    plt.close(fig)


# ---------- PyTorch MLP model ----------
class MLPNet(nn.Module):
    """
    Fully connected classifier: repeated [Linear -> ReLU -> Dropout] blocks
    followed by a final Linear layer that outputs one raw score per class.

    Args:
        input_dim: Size of each input feature vector.
        hidden_sizes: Iterable with the width of every hidden layer, in order.
            An empty iterable yields a single Linear(input_dim, num_classes).
        num_classes: Number of output scores.
        dropout: Dropout probability applied after every hidden activation.
    """

    def __init__(self, input_dim, hidden_sizes, num_classes, dropout=0.3):
        super().__init__()
        widths = [input_dim, *hidden_sizes]

        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout)]
        # Output layer: no activation, the scores are returned as-is.
        stack.append(nn.Linear(widths[-1], num_classes))

        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Map a (batch_size, input_dim) tensor to (batch_size, num_classes) scores."""
        return self.net(x)


def main():
    """Train, evaluate, and save the landmark-based ASL MLP classifier.

    Pipeline:
      1. Parse output/input paths (defaults depend on IN_COLAB).
      2. Load train/val/test ``.npz`` splits and ``labels.json`` from ``data_dir``.
      3. Fit a ``StandardScaler(with_mean=False)`` on train and apply it to all splits.
      4. Train ``MLPNet`` for a fixed number of epochs, keeping a copy of the
         weights from the epoch with the highest validation macro F1.
      5. Restore those weights, then report metrics and save confusion-matrix
         images for the validation and test splits.
      6. Dump weights, architecture settings, scaler, and labels with joblib.
    """
    parser = argparse.ArgumentParser(
        description="Train ASL classifier on Mediapipe landmark vectors (PyTorch MLP)"
    )

    # Default paths assume everything lives on Google Drive when in Colab.
    if IN_COLAB:
        base = "/content/drive/MyDrive/Duke University/CV-Group6"
        default_data_dir = f"{base}"
        default_model_out = f"{base}/asl_mlp_torch_model.joblib"
        default_val_cm_out = f"{base}/asl_confusion_matrix_val.png"
        default_test_cm_out = f"{base}/asl_confusion_matrix_test.png"
    else:
        # Fallback defaults for local runs.
        # NOTE(review): the data directory default is still the Colab Drive
        # path; pass --data_dir explicitly when running outside Colab.
        default_data_dir = "/content/drive/MyDrive/Duke University/CV-Group6"
        default_model_out = "asl_mlp_torch_model.joblib"
        default_val_cm_out = "confusion_matrix_val.png"
        default_test_cm_out = "confusion_matrix_test.png"

    parser.add_argument(
        "--data_dir",
        type=str,
        default=default_data_dir,
        help="Folder with train.npz, val.npz, test.npz, labels.json",
    )
    parser.add_argument(
        "--model_out",
        type=str,
        default=default_model_out,
        help="Where to save the trained model + scaler (joblib)",
    )
    parser.add_argument(
        "--val_cm_out",
        type=str,
        default=default_val_cm_out,
        help="Where to save the validation confusion matrix image (.png)",
    )
    parser.add_argument(
        "--test_cm_out",
        type=str,
        default=default_test_cm_out,
        help="Where to save the test confusion matrix image (.png)",
    )

    # In Colab, sys.argv carries notebook/kernel arguments, so parse an empty list.
    if IN_COLAB:
        args = parser.parse_args(args=[])
    else:
        args = parser.parse_args()

    data_dir = Path(args.data_dir)
    model_out = Path(args.model_out)
    val_cm_out = Path(args.val_cm_out)
    test_cm_out = Path(args.test_cm_out)

    print(f"üìÇ Using data directory: {data_dir}")
    print(f"üíæ Model will be saved to: {model_out}")
    print(f"üìä Val confusion matrix ‚Üí {val_cm_out}")
    print(f"üìä Test confusion matrix ‚Üí {test_cm_out}")

    # ---------- Load datasets ----------
    X_train, y_train = load_npz(data_dir / "train.npz")
    X_val, y_val     = load_npz(data_dir / "val.npz")
    X_test, y_test   = load_npz(data_dir / "test.npz")

    # Class names; their order defines the integer label ids used below.
    with open(data_dir / "labels.json", "r", encoding="utf-8") as f:
        labels = json.load(f)["labels"]

    print("‚úÖ Data loaded")
    print(f"  Train samples: {len(X_train)}")
    print(f"  Val samples:   {len(X_val)}")
    print(f"  Test samples:  {len(X_test)}")

    # ---------- Scale features ----------
    # Fit on the training split only; val/test reuse the same statistics.
    scaler = StandardScaler(with_mean=False)
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled   = scaler.transform(X_val)
    X_test_scaled  = scaler.transform(X_test)

    # ---------- Convert to torch tensors ----------
    X_train_t = torch.from_numpy(X_train_scaled.astype(np.float32))
    y_train_t = torch.from_numpy(y_train.astype(np.int64))

    X_val_t = torch.from_numpy(X_val_scaled.astype(np.float32))
    y_val_t = torch.from_numpy(y_val.astype(np.int64))

    X_test_t = torch.from_numpy(X_test_scaled.astype(np.float32))
    y_test_t = torch.from_numpy(y_test.astype(np.int64))

    # ---------- Datasets & Dataloaders ----------
    # Optimal HP (from Optuna; mapped to PyTorch):
    # 'hidden_size': 343, 'num_layers': 2, 'dropout': 0.283..., 'lr': 0.001789..., 'batch_size': 64
    batch_size = 64

    train_ds = TensorDataset(X_train_t, y_train_t)
    val_ds   = TensorDataset(X_val_t, y_val_t)
    test_ds  = TensorDataset(X_test_t, y_test_t)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False)
    test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False)

    input_dim   = X_train_t.shape[1]
    num_classes = len(labels)

    # ---------- Set up device ----------
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"üñ•Ô∏è Using device: {device}")

    # ---------- Initialize PyTorch MLP ----------
    hidden_sizes = [343, 343]   # from 'hidden_size' and 'num_layers' in Optuna
    dropout = 0.28319417648906664
    lr = 0.0017897493066372295

    model = MLPNet(
        input_dim=input_dim,
        hidden_sizes=hidden_sizes,
        num_classes=num_classes,
        dropout=dropout,
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    num_epochs = 30
    best_val_f1 = -1.0
    best_state_dict = None

    # ---------- Training loop ----------
    print("\nüöÄ Training PyTorch MLP on TRAIN set...")
    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss = 0.0
        total = 0

        for xb, yb in train_loader:
            xb = xb.to(device)
            yb = yb.to(device)

            optimizer.zero_grad()
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            optimizer.step()

            # criterion returns the batch mean, so weight by batch size to get
            # a correct per-sample average over the epoch.
            running_loss += loss.item() * xb.size(0)
            total += xb.size(0)

        train_loss = running_loss / total

        # ---- Validation each epoch (monitoring + model selection) ----
        model.eval()
        all_val_preds = []
        all_val_labels = []
        val_loss = 0.0
        val_total = 0

        with torch.no_grad():
            for xb, yb in val_loader:
                xb = xb.to(device)
                yb = yb.to(device)
                logits = model(xb)
                loss = criterion(logits, yb)

                val_loss += loss.item() * xb.size(0)
                val_total += xb.size(0)

                preds = torch.argmax(logits, dim=1)
                all_val_preds.extend(preds.cpu().numpy())
                all_val_labels.extend(yb.cpu().numpy())

        val_loss /= val_total
        val_acc = accuracy_score(all_val_labels, all_val_preds)
        val_f1_macro = f1_score(all_val_labels, all_val_preds, average="macro")

        # Track best model by macro F1.
        if val_f1_macro > best_val_f1:
            best_val_f1 = val_f1_macro
            # state_dict() hands back references to the live parameter tensors,
            # which the optimizer keeps updating in place. Clone them so the
            # snapshot really preserves this epoch's weights instead of
            # silently tracking the final epoch.
            best_state_dict = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

        print(
            f"Epoch [{epoch}/{num_epochs}] "
            f"Train Loss: {train_loss:.4f} | "
            f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f} Macro F1: {val_f1_macro:.4f}"
        )

    # Load best weights (based on val macro F1); load_state_dict copies the
    # CPU snapshot back onto whichever device the model lives on.
    if best_state_dict is not None:
        model.load_state_dict(best_state_dict)
        print(f"\n‚úÖ Loaded best model weights (Val macro F1 = {best_val_f1:.4f})")

    # ---------- Final evaluation on VALIDATION set ----------
    print("\nüîé Evaluating on VALIDATION set...")
    model.eval()
    with torch.no_grad():
        val_logits = model(X_val_t.to(device))
        val_preds = torch.argmax(val_logits, dim=1).cpu().numpy()

    val_acc = accuracy_score(y_val, val_preds)
    val_f1_macro   = f1_score(y_val, val_preds, average="macro")
    val_f1_weighted = f1_score(y_val, val_preds, average="weighted")

    print("\n==============================")
    print(f"‚úÖ Validation Accuracy:   {val_acc:.4f}")
    print(f"‚úÖ Val macro F1:          {val_f1_macro:.4f}")
    print(f"‚úÖ Val weighted F1:       {val_f1_weighted:.4f}")
    print("==============================")

    print("\nüìã Validation Classification Report:")
    print(classification_report(y_val, val_preds, labels=list(range(len(labels))), target_names=labels))

    cm_val = confusion_matrix(y_val, val_preds, labels=list(range(len(labels))))
    plot_confusion_matrix(cm_val, labels, val_cm_out, title="Validation Confusion Matrix")
    print(f"üìä Saved validation confusion matrix to {val_cm_out}")

    # ---------- Final evaluation on TEST set ----------
    print("\nüîé Evaluating on TEST set...")
    model.eval()
    with torch.no_grad():
        test_logits = model(X_test_t.to(device))
        test_preds = torch.argmax(test_logits, dim=1).cpu().numpy()

    test_acc = accuracy_score(y_test, test_preds)
    test_f1_macro    = f1_score(y_test, test_preds, average="macro")
    test_f1_weighted = f1_score(y_test, test_preds, average="weighted")

    print("\n==============================")
    print(f"‚úÖ Test Accuracy:         {test_acc:.4f}")
    print(f"‚úÖ Test macro F1:         {test_f1_macro:.4f}")
    print(f"‚úÖ Test weighted F1:      {test_f1_weighted:.4f}")
    print("==============================")

    print("\nüìã Test Classification Report:")
    print(classification_report(y_test, test_preds, labels=list(range(len(labels))), target_names=labels))

    cm_test = confusion_matrix(y_test, test_preds, labels=list(range(len(labels))))
    plot_confusion_matrix(cm_test, labels, test_cm_out, title="Test Confusion Matrix")
    print(f"üìä Saved test confusion matrix to {test_cm_out}")

    # ---------- Save model + scaler ----------
    model_out.parent.mkdir(parents=True, exist_ok=True)
    save_obj = {
        # Store CPU tensors so the bundle can be loaded on machines without CUDA.
        "state_dict": {
            name: tensor.detach().cpu()
            for name, tensor in model.state_dict().items()
        },
        "input_dim": input_dim,
        "hidden_sizes": hidden_sizes,
        "num_classes": num_classes,
        "dropout": dropout,
        "scaler": scaler,
        "labels": labels,
    }
    joblib.dump(save_obj, model_out)
    print(f"üíæ Saved trained PyTorch MLP + scaler to {model_out}")


# Entry point: run the whole pipeline when this code is executed directly
# (a notebook cell also runs with __name__ set to "__main__").
if __name__ == "__main__":
    main()


üìÇ Using data directory: /content/drive/MyDrive/Duke University/CV-Group6
üíæ Model will be saved to: /content/drive/MyDrive/Duke University/CV-Group6/asl_mlp_torch_model.joblib
üìä Val confusion matrix ‚Üí /content/drive/MyDrive/Duke University/CV-Group6/asl_confusion_matrix_val.png
üìä Test confusion matrix ‚Üí /content/drive/MyDrive/Duke University/CV-Group6/asl_confusion_matrix_test.png
‚úÖ Data loaded
  Train samples: 46808
  Val samples:   13360
  Test samples:  6663
üñ•Ô∏è Using device: cuda

üöÄ Training PyTorch MLP on TRAIN set...
Epoch [1/30] Train Loss: 0.3372 | Val Loss: 0.1481 Acc: 0.9580 Macro F1: 0.9558
Epoch [2/30] Train Loss: 0.1707 | Val Loss: 0.1278 Acc: 0.9621 Macro F1: 0.9599
Epoch [3/30] Train Loss: 0.1459 | Val Loss: 0.1050 Acc: 0.9686 Macro F1: 0.9664
Epoch [4/30] Train Loss: 0.1325 | Val Loss: 0.1088 Acc: 0.9665 Macro F1: 0.9643
Epoch [5/30] Train Loss: 0.1266 | Val Loss: 0.0968 Acc: 0.9709 Macro F1: 0.9361
Epoch [6/30] Train Loss: 0.1183 | Val Loss: 0.1

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


üìä Saved validation confusion matrix to /content/drive/MyDrive/Duke University/CV-Group6/asl_confusion_matrix_val.png

üîé Evaluating on TEST set...

‚úÖ Test Accuracy:         0.9784
‚úÖ Test macro F1:         0.9777
‚úÖ Test weighted F1:      0.9784

üìã Test Classification Report:
              precision    recall  f1-score   support

           A       0.97      0.97      0.97       231
           B       1.00      1.00      1.00       253
           C       1.00      0.98      0.99       233
           D       0.99      0.98      0.99       260
           E       0.99      0.98      0.98       249
           F       1.00      0.99      0.99       274
           G       0.99      0.99      0.99       244
           H       0.99      0.99      0.99       256
           I       0.97      0.98      0.98       264
           J       0.99      0.99      0.99       257
           K       0.98      0.99      0.99       274
           L       1.00      0.98      0.99       278
        

In [None]:
#LSTM Model from ChatGPT
# ============================================================
# 1. Imports
# ============================================================
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, classification_report

# ============================================================
# 2. GPU setup (works in Google Colab if GPU is enabled)
#    In Colab: Runtime -> Change runtime type -> Hardware accelerator: GPU
# ============================================================
# Pick the compute device once; everything below moves tensors/models onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("Using GPU:", torch.cuda.get_device_name(0))
else:
    print("Using CPU")

# ============================================================
# 3. Dataset class for NPZ files: train.npz / val.npz / test.npz
# ============================================================
class ASLLandmarkDataset(Dataset):
    """Landmark vectors from an ``.npz`` split, exposed as length-1 sequences.

    Attributes:
        X: float32 array of shape (N, 1, 42) - each sample is one time step.
        y: label array of shape (N,), as stored in the archive.
    """

    def __init__(self, npz_path):
        """
        Expects an .npz with:
          - X: (N, 42) where 42 = 21 landmarks * 2 (x,y) coordinates
          - y: (N,)

        Raises:
            AssertionError: If X and y disagree in length, X is not 2-D, or
                X does not have 42 columns.
        """
        # np.load returns an NpzFile holding an open file handle; read both
        # arrays inside a context manager so the handle is released promptly.
        with np.load(npz_path) as data:
            features = data["X"]  # shape: (N, 42)
            labels = data["y"]    # shape: (N,)

        assert len(features) == len(labels), "X and y must have same length"
        assert features.ndim == 2, f"X must be 2-dimensional (N, features), but got {features.ndim} dimensions"
        assert features.shape[1] == 42, f"Expected 42 features per sample, but got {features.shape[1]}"

        # Reshape for LSTM: (N, 42) -> (N, T=1, features=42).
        # Each static image is treated as a sequence of length 1.
        self.X = features[:, np.newaxis, :].astype(np.float32)
        self.y = labels

    def __len__(self):
        """Number of samples in the split."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(x, y)``: a float32 tensor of shape (1, 42) and an int64 scalar label."""
        x = torch.from_numpy(self.X[idx])
        y = torch.tensor(self.y[idx], dtype=torch.long)
        return x, y

# ============================================================
# 4. LSTM model definition
# ============================================================
class ASLLSTMClassifier(nn.Module):
    """Stacked LSTM encoder followed by a two-layer classification head.

    Args:
        input_size: features per time step (21 landmarks * 2 coords = 42)
        hidden_size: LSTM hidden dimension (also the width of the head's hidden layer)
        num_layers: number of stacked LSTM layers
        num_classes: number of ASL classes (output logits)
        dropout: dropout probability used in the head and, when more than one
            LSTM layer is stacked, between the LSTM layers
    """

    def __init__(self, input_size=42, hidden_size=128, num_layers=2, num_classes=26, dropout=0.3):
        super().__init__()

        # Inter-layer dropout only makes sense with more than one LSTM layer.
        recurrent_dropout = 0.0 if num_layers <= 1 else dropout
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=recurrent_dropout,
        )

        head = [
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, num_classes),
        ]
        self.fc = nn.Sequential(*head)

    def forward(self, x):
        """Map x of shape (batch, T, input_size) to logits of shape (batch, num_classes)."""
        # Only the final hidden state is needed; it is stacked per layer as
        # (num_layers, batch, hidden_size), so index -1 selects the top layer.
        _, (final_hidden, _) = self.lstm(x)
        return self.fc(final_hidden[-1])

# ============================================================
# 5. Create datasets and dataloaders for train / val / test
# ============================================================
# Landmark splits on the mounted Drive.
train_path = "/content/drive/MyDrive/Duke University/CV-Group6/train.npz"
val_path   = "/content/drive/MyDrive/Duke University/CV-Group6/val.npz"
test_path  = "/content/drive/MyDrive/Duke University/CV-Group6/test.npz"

train_dataset, val_dataset, test_dataset = (
    ASLLandmarkDataset(split_path) for split_path in (train_path, val_path, test_path)
)

# The class count is taken from the distinct labels seen in the training split.
num_classes = np.unique(train_dataset.y).size
print("Number of classes:", num_classes)

batch_size = 64

# Only the training loader reshuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=False)
    for split in (val_dataset, test_dataset)
)

# ============================================================
# 6. Initialize model, loss, optimizer
# ============================================================
model = ASLLSTMClassifier(
    input_size=42,  # 21 landmarks * (x, y)
    hidden_size=128,
    num_layers=2,
    num_classes=num_classes,
    dropout=0.3,
)
model = model.to(device)  # move model to GPU/CPU

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 30

# ============================================================
# 7. Training + validation loop (with F1 on val)
# ============================================================
for epoch in range(1, num_epochs + 1):
    # ---- Training pass: one optimizer step per mini-batch ----
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        logits = model(X_batch)
        loss = criterion(logits, y_batch)
        loss.backward()
        optimizer.step()

        # The loss is a batch mean; weight it by batch size so the epoch
        # average stays exact even when the last batch is smaller.
        running_loss += loss.item() * X_batch.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == y_batch).sum().item()
        total += y_batch.size(0)

    train_loss = running_loss / total
    train_acc = correct / total

    # ---- Validation pass: no gradients, dropout disabled ----
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0
    all_val_preds, all_val_labels = [], []

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            logits = model(X_batch)
            loss = criterion(logits, y_batch)
            preds = logits.argmax(dim=1)

            val_loss += loss.item() * X_batch.size(0)
            val_correct += (preds == y_batch).sum().item()
            val_total += y_batch.size(0)

            # Keep every prediction/label pair for the epoch-level F1 scores.
            all_val_preds.extend(preds.cpu().numpy())
            all_val_labels.extend(y_batch.cpu().numpy())

    val_loss /= val_total
    val_acc = val_correct / val_total

    # F1 scores on validation
    val_f1_macro = f1_score(all_val_labels, all_val_preds, average="macro")
    val_f1_weighted = f1_score(all_val_labels, all_val_preds, average="weighted")

    print(
        f"Epoch [{epoch}/{num_epochs}] "
        f"Train Loss: {train_loss:.4f} Acc: {train_acc:.3f} | "
        f"Val Loss: {val_loss:.4f} Acc: {val_acc:.3f} "
        f"F1(macro): {val_f1_macro:.3f} F1(weighted): {val_f1_weighted:.3f}"
    )

# ============================================================
# 8. Final evaluation on test set (with F1 + report)
# ============================================================
# Score the trained model on the held-out test split.
model.eval()
test_loss, test_correct, test_total = 0.0, 0, 0
all_test_preds, all_test_labels = [], []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        logits = model(X_batch)
        loss = criterion(logits, y_batch)
        preds = logits.argmax(dim=1)

        # Batch-size weighting keeps the averaged loss exact for a short last batch.
        test_loss += loss.item() * X_batch.size(0)
        test_correct += (preds == y_batch).sum().item()
        test_total += y_batch.size(0)

        all_test_preds.extend(preds.cpu().numpy())
        all_test_labels.extend(y_batch.cpu().numpy())

test_loss /= test_total
test_acc = test_correct / test_total

test_f1_macro = f1_score(all_test_labels, all_test_preds, average="macro")
test_f1_weighted = f1_score(all_test_labels, all_test_preds, average="weighted")

print(f"\nTest Loss: {test_loss:.4f} | Test Acc: {test_acc:.3f}")
print(f"Test F1(macro): {test_f1_macro:.3f} | Test F1(weighted): {test_f1_weighted:.3f}")

print("\nClassification report:\n")
print(classification_report(all_test_labels, all_test_preds))

# ============================================================
# 9. Optional: helper to predict a single sequence
# ============================================================
def predict_single_sequence(model, single_sample_features_np):
    """Predict the class index for one landmark vector.

    Args:
        model: Trained ``nn.Module`` that accepts input of shape
            (batch, T, input_size) and returns logits of shape (batch, num_classes).
        single_sample_features_np: 1-D array-like of shape (input_size,),
            e.g. (42,) for one example of (x, y) coordinates. Any numeric
            dtype is accepted.

    Returns:
        Predicted class index (int).
    """
    # Follow the model's own device/dtype instead of a notebook-level global:
    # arrays read from disk are often float64, which would otherwise fail
    # with a dtype mismatch against the model weights.
    reference = next(model.parameters(), None)
    target_device = reference.device if reference is not None else torch.device("cpu")
    target_dtype = reference.dtype if reference is not None else torch.float32

    features = np.asarray(single_sample_features_np)

    model.eval()
    with torch.no_grad():
        # Reshape (input_size,) to (1, 1, input_size): batch=1, T=1.
        x = torch.as_tensor(features[np.newaxis, np.newaxis, :]).to(
            device=target_device, dtype=target_dtype
        )
        logits = model(x)
        # Softmax is monotonic, so the argmax of the logits is the argmax of
        # the class probabilities.
        return int(torch.argmax(logits, dim=1).item())

# Example usage (uncomment once test_dataset and a trained model exist).
# test_dataset.X is a NumPy array of shape (N, 1, 42), so index the single
# time step of one sample to get the (42,) vector this helper expects:
# example_features = test_dataset.X[0, 0]  # shape (42,)
# print("Predicted class:", predict_single_sequence(model, example_features))

# Equivalent, squeezing away the length-1 time axis (already a NumPy array,
# so no .numpy() call is needed):
# example_features_processed = test_dataset.X[0].squeeze()  # shape (42,)
# print("Predicted class:", predict_single_sequence(model, example_features_processed))

Using GPU: NVIDIA A100-SXM4-80GB
Number of classes: 29
Epoch [1/30] Train Loss: 0.9381 Acc: 0.712 | Val Loss: 0.2808 Acc: 0.915 F1(macro): 0.905 F1(weighted): 0.913
Epoch [2/30] Train Loss: 0.2803 Acc: 0.924 | Val Loss: 0.1855 Acc: 0.946 F1(macro): 0.942 F1(weighted): 0.946
Epoch [3/30] Train Loss: 0.2209 Acc: 0.937 | Val Loss: 0.1585 Acc: 0.953 F1(macro): 0.950 F1(weighted): 0.953
Epoch [4/30] Train Loss: 0.1869 Acc: 0.948 | Val Loss: 0.1478 Acc: 0.954 F1(macro): 0.950 F1(weighted): 0.954
Epoch [5/30] Train Loss: 0.1704 Acc: 0.951 | Val Loss: 0.1265 Acc: 0.961 F1(macro): 0.958 F1(weighted): 0.961
Epoch [6/30] Train Loss: 0.1565 Acc: 0.956 | Val Loss: 0.1172 Acc: 0.965 F1(macro): 0.963 F1(weighted): 0.965
Epoch [7/30] Train Loss: 0.1435 Acc: 0.959 | Val Loss: 0.1124 Acc: 0.967 F1(macro): 0.966 F1(weighted): 0.968
Epoch [8/30] Train Loss: 0.1350 Acc: 0.961 | Val Loss: 0.1083 Acc: 0.968 F1(macro): 0.965 F1(weighted): 0.968
Epoch [9/30] Train Loss: 0.1292 Acc: 0.963 | Val Loss: 0.1088 Acc

In [None]:
#Hyperparameter tuning for MLP
# ==============================
# Hyperparameter tuning: MLP
# ==============================
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import optuna

# ---------- Dataset ----------
class ASLMLPDataset(Dataset):
    """Flat landmark vectors from an ``.npz`` split for the MLP classifier.

    Attributes:
        X: float32 array of shape (N, 42).
        y: label array of shape (N,), as stored in the archive.
    """

    def __init__(self, npz_path):
        """Read ``X`` and ``y`` from ``npz_path`` and validate their shapes.

        Raises:
            AssertionError: If X is not 2-D, does not have 42 columns, or
                disagrees with y in length.
        """
        # Close the archive as soon as both arrays are in memory.
        with np.load(npz_path) as data:
            features = data["X"]   # (N, 42)
            labels = data["y"]     # (N,)

        # Assertions to ensure data is in the expected (N, 42) format
        assert features.ndim == 2, f"X must be 2-dimensional (N, features), but got {features.ndim} dimensions"
        assert features.shape[1] == 42, f"Expected 42 features per sample, but got {features.shape[1]}"
        assert len(features) == len(labels), "X and y must have same length"

        # Already flat (N, 42); only the dtype is normalised so a float64
        # archive does not clash with the model's float32 weights. copy=False
        # avoids duplicating data that is float32 already.
        self.X = features.astype(np.float32, copy=False)
        self.y = labels

    def __len__(self):
        """Number of samples in the split."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(x, y)``: a float32 tensor of shape (42,) and an int64 scalar label."""
        x = torch.from_numpy(self.X[idx])          # (42,)
        y = torch.tensor(self.y[idx], dtype=torch.long)
        return x, y

# ---------- Model ----------
class ASLMLPClassifier(nn.Module):
    """MLP with ``num_layers`` equal-width hidden stages and a linear output head.

    Each hidden stage is Linear -> ReLU -> Dropout(dropout).
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout):
        super().__init__()
        # Layer widths from input through every hidden stage.
        widths = [input_size] + [hidden_size] * num_layers

        blocks = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            blocks.extend((nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout)))

        blocks.append(nn.Linear(widths[-1], num_classes))
        self.net = nn.Sequential(*blocks)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for x of shape (batch, input_size)."""
        return self.net(x)

# ---------- Setup ----------
# Shared state read by objective_mlp: compute device, data splits, and model I/O sizes.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using:", device)

train_dataset, val_dataset = (
    ASLMLPDataset(split_path)
    for split_path in (
        "/content/drive/MyDrive/Duke University/CV-Group6/train.npz",
        "/content/drive/MyDrive/Duke University/CV-Group6/val.npz",
    )
)

# Feature width comes from the data; class count from the distinct training labels.
_, input_size = train_dataset.X.shape
num_classes = np.unique(train_dataset.y).size

# ---------- Optuna objective ----------
def objective_mlp(trial):
    """Optuna objective: train an MLP briefly and return its validation loss.

    Samples the architecture (hidden_size, num_layers, dropout), the Adam
    learning rate, and the batch size from ``trial``, trains on the
    module-level ``train_dataset`` for 5 epochs, and returns the
    sample-averaged cross-entropy on ``val_dataset`` (lower is better).
    """
    # Sampling order is kept fixed: architecture first, then optimisation settings.
    arch = {
        "hidden_size": trial.suggest_int("hidden_size", 64, 512),
        "num_layers": trial.suggest_int("num_layers", 1, 3),
        "dropout": trial.suggest_float("dropout", 0.1, 0.5),
    }
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    model = ASLMLPClassifier(input_size=input_size, num_classes=num_classes, **arch).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    num_epochs = 5  # keep small for tuning speed

    # ---- Short training run ----
    for _ in range(num_epochs):
        model.train()
        for features, targets in train_loader:
            features, targets = features.to(device), targets.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(features), targets)
            batch_loss.backward()
            optimizer.step()

    # ---- Validation loss (the value Optuna minimises) ----
    model.eval()
    weighted_loss_sum = 0.0
    n_seen = 0

    with torch.no_grad():
        for features, targets in val_loader:
            features, targets = features.to(device), targets.to(device)
            n_batch = features.size(0)
            # criterion gives a batch mean; re-weight so the final figure is a
            # true per-sample average.
            weighted_loss_sum += criterion(model(features), targets).item() * n_batch
            n_seen += n_batch

    return weighted_loss_sum / n_seen

# ---------- Run study ----------
# Search for the configuration with the lowest validation loss returned by
# objective_mlp, using 20 trials. No sampler or seed is passed, so Optuna's
# defaults apply and results may differ between runs.
study_mlp = optuna.create_study(direction="minimize")
study_mlp.optimize(objective_mlp, n_trials=20)

# Report the winning hyperparameters and the objective value they achieved.
print("Best MLP hyperparameters:", study_mlp.best_params)
print("Best MLP validation loss:", study_mlp.best_value)


Using: cuda


[I 2025-12-05 05:14:40,395] A new study created in memory with name: no-name-a05d0446-62fb-4e9e-95fe-50d27eb2f4f4
[I 2025-12-05 05:14:50,745] Trial 0 finished with value: 0.1425960822722423 and parameters: {'hidden_size': 140, 'num_layers': 2, 'dropout': 0.36329921034641965, 'lr': 0.0017858593593998095, 'batch_size': 128}. Best is trial 0 with value: 0.1425960822722423.
[I 2025-12-05 05:14:59,480] Trial 1 finished with value: 0.25707441852344365 and parameters: {'hidden_size': 149, 'num_layers': 3, 'dropout': 0.4125901349768505, 'lr': 0.00019647213314477865, 'batch_size': 64}. Best is trial 0 with value: 0.1425960822722423.
[I 2025-12-05 05:15:11,239] Trial 2 finished with value: 0.134058354906933 and parameters: {'hidden_size': 477, 'num_layers': 1, 'dropout': 0.11528568496839108, 'lr': 0.0005299314203444907, 'batch_size': 32}. Best is trial 2 with value: 0.134058354906933.
[I 2025-12-05 05:15:19,197] Trial 3 finished with value: 0.13388605746189633 and parameters: {'hidden_size': 195

Best MLP hyperparameters: {'hidden_size': 343, 'num_layers': 2, 'dropout': 0.28319417648906664, 'lr': 0.0017897493066372295, 'batch_size': 64}
Best MLP validation loss: 0.09801971552123924


In [None]:
#Hyperparameter Tuning for LSTM
# ==============================
# Hyperparameter tuning: LSTM
# ==============================
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import optuna

# ---------- Dataset ----------
class ASLLSTMDataset(Dataset):
    """Landmark vectors from an ``.npz`` split, shaped as length-1 sequences for an LSTM.

    Attributes:
        X: float32 array of shape (N, 1, 42).
        y: label array of shape (N,), as stored in the archive.
    """

    def __init__(self, npz_path):
        """Read ``X`` (N, 42) and ``y`` (N,) from ``npz_path`` and validate them.

        Raises:
            AssertionError: If X is not 2-D, does not have 42 columns, or
                disagrees with y in length.
        """
        # np.load keeps the archive's file handle open until closed; the
        # context manager releases it once both arrays are in memory.
        with np.load(npz_path) as data:
            features = data["X"]   # (N, 42)
            labels = data["y"]     # (N,)

        assert features.ndim == 2, f"X must be 2-dimensional (N, features), but got {features.ndim} dimensions"
        assert features.shape[1] == 42, f"Expected 42 features per sample, but got {features.shape[1]}"
        assert len(features) == len(labels), "X and y must have same length"

        # Reshape for LSTM: (N, 42) -> (N, T=1, features=42).
        # Each static image is treated as a sequence of length 1.
        self.X = features[:, np.newaxis, :].astype(np.float32)
        self.y = labels

    def __len__(self):
        """Number of samples in the split."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(x, y)``: a float32 tensor of shape (1, 42) and an int64 scalar label."""
        x = torch.from_numpy(self.X[idx])          # (T=1, 42)
        y = torch.tensor(self.y[idx], dtype=torch.long)
        return x, y

# ---------- LSTM Model ----------
class ASLLSTMClassifier(nn.Module):
    """LSTM encoder plus a two-layer head; every size is supplied by the caller.

    ``dropout`` is used inside the head and, when ``num_layers`` exceeds one,
    between the stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout):
        super().__init__()
        # A single-layer LSTM has no inter-layer connection to drop out.
        recurrent_dropout = 0.0 if num_layers <= 1 else dropout
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=recurrent_dropout,
        )
        head = [
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, num_classes),
        ]
        self.fc = nn.Sequential(*head)

    def forward(self, x):
        """Map x of shape (batch, T, input_size) to logits of shape (batch, num_classes)."""
        # Keep only the final hidden state; index -1 picks the top LSTM layer.
        _, (final_hidden, _) = self.lstm(x)
        return self.fc(final_hidden[-1])

# ---------- Setup ----------
# Shared state read by objective_lstm: compute device, data splits, and model I/O sizes.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using:", device)

train_dataset, val_dataset = (
    ASLLSTMDataset(split_path)
    for split_path in (
        "/content/drive/MyDrive/Duke University/CV-Group6/train.npz",
        "/content/drive/MyDrive/Duke University/CV-Group6/val.npz",
    )
)

# The dataset stores X as (N, 1, 42); the trailing axis is the per-step feature width.
input_size = train_dataset.X.shape[2]
num_classes = np.unique(train_dataset.y).size

# ---------- Optuna objective ----------
def objective_lstm(trial):
    """Optuna objective: train an LSTM classifier briefly and return its validation loss.

    Samples the architecture (hidden_size, num_layers, dropout), the Adam
    learning rate, and the batch size from ``trial``, trains on the
    module-level ``train_dataset`` for 5 epochs, and returns the
    sample-averaged cross-entropy on ``val_dataset`` (lower is better).
    """
    # Sampling order is kept fixed: architecture first, then optimisation settings.
    arch = {
        "hidden_size": trial.suggest_int("hidden_size", 64, 256),
        "num_layers": trial.suggest_int("num_layers", 1, 3),
        "dropout": trial.suggest_float("dropout", 0.1, 0.5),
    }
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical("batch_size", [32, 64, 128])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    model = ASLLSTMClassifier(input_size=input_size, num_classes=num_classes, **arch).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    num_epochs = 5  # small for tuning

    # ---- Short training run ----
    for _ in range(num_epochs):
        model.train()
        for sequences, targets in train_loader:
            sequences, targets = sequences.to(device), targets.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(sequences), targets)
            batch_loss.backward()
            optimizer.step()

    # ---- Validation loss (the value Optuna minimises) ----
    model.eval()
    weighted_loss_sum = 0.0
    n_seen = 0

    with torch.no_grad():
        for sequences, targets in val_loader:
            sequences, targets = sequences.to(device), targets.to(device)
            n_batch = sequences.size(0)
            # criterion gives a batch mean; re-weight so the final figure is a
            # true per-sample average.
            weighted_loss_sum += criterion(model(sequences), targets).item() * n_batch
            n_seen += n_batch

    return weighted_loss_sum / n_seen

# ---------- Run study ----------
# Search for the configuration with the lowest validation loss returned by
# objective_lstm, using 20 trials. No sampler or seed is passed, so Optuna's
# defaults apply and results may differ between runs.
study_lstm = optuna.create_study(direction="minimize")
study_lstm.optimize(objective_lstm, n_trials=20)

# Report the winning hyperparameters and the objective value they achieved.
print("Best LSTM hyperparameters:", study_lstm.best_params)
print("Best LSTM validation loss:", study_lstm.best_value)


[I 2025-12-05 05:17:45,203] A new study created in memory with name: no-name-39c5b829-d775-40b2-a578-df6aa29e4c06


Using: cuda


[I 2025-12-05 05:17:57,188] Trial 0 finished with value: 0.14070784345179974 and parameters: {'hidden_size': 65, 'num_layers': 2, 'dropout': 0.3605597572560686, 'lr': 0.009069185165078967, 'batch_size': 64}. Best is trial 0 with value: 0.14070784345179974.
[I 2025-12-05 05:18:07,300] Trial 1 finished with value: 0.12937650104822915 and parameters: {'hidden_size': 68, 'num_layers': 1, 'dropout': 0.27877644347486863, 'lr': 0.001427299955001774, 'batch_size': 64}. Best is trial 1 with value: 0.12937650104822915.
[I 2025-12-05 05:18:14,922] Trial 2 finished with value: 0.19702394620744054 and parameters: {'hidden_size': 126, 'num_layers': 3, 'dropout': 0.2719916277679831, 'lr': 0.0007737335282986118, 'batch_size': 128}. Best is trial 1 with value: 0.12937650104822915.
[I 2025-12-05 05:18:28,434] Trial 3 finished with value: 0.12278247959232153 and parameters: {'hidden_size': 192, 'num_layers': 3, 'dropout': 0.13891388166791696, 'lr': 0.002334004793405831, 'batch_size': 64}. Best is trial 3

Best LSTM hyperparameters: {'hidden_size': 227, 'num_layers': 1, 'dropout': 0.3096817900972546, 'lr': 0.004331507540025564, 'batch_size': 128}
Best LSTM validation loss: 0.10028495022171026


# Predictive Models (CNN/LSTM) without mediapipe processing

In [None]:
!pip install tensorflow_addons

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
import tensorflow_addons as tfa

# =========================
# 0. Dataset locations on Google Drive
# =========================
# Nothing is mounted here: these paths live under /content/drive, so Drive
# must already be mounted (drive.mount('/content/drive') is run in an earlier
# cell of this notebook, once per Colab session).

# Set your dataset folder paths in Drive
train_folder = "/content/drive/MyDrive/Duke University/CV-Group6/train_dataset"   # <-- edit to your path
val_folder   = "/content/drive/MyDrive/Duke University/CV-Group6/val_dataset"     # <-- edit to your path

# =========================
# 1. Load tf.data Datasets saved as tensors
# =========================
# Each folder is read back with tf.data.Dataset.load; the resulting datasets
# are iterated below as (images, labels) pairs.

train_ds = tf.data.Dataset.load(train_folder)
val_ds   = tf.data.Dataset.load(val_folder)

# train_ds and val_ds arrive already batched as (images, labels) pairs,
# because they were saved from image_dataset_from_directory with batch_size=32.

# =========================
# 2. Inspect one batch to infer shapes
# =========================

# take(1) yields at most one batch, so the loop body runs once without a break.
for images, labels in train_ds.take(1):
    print("Image batch shape:", images.shape)
    print("Label batch shape:", labels.shape)
    # Axes 1..3 of the image batch: height, width, channels (1 = grayscale, 3 = RGB).
    img_height, img_width, num_channels = images.shape[1], images.shape[2], images.shape[3]
    # Labels are one-hot (categorical), so the last axis length is the class count.
    num_classes = labels.shape[-1]

for dim_name, dim_value in (
    ("img_height", img_height),
    ("img_width", img_width),
    ("num_channels", num_channels),
    ("num_classes", num_classes),
):
    print(f"{dim_name}:", dim_value)

# =========================
# 3. Prepare datasets (cache, shuffle, prefetch)
# =========================

AUTOTUNE = tf.data.AUTOTUNE

# Order matters: cache() must come BEFORE shuffle(). A cache placed after
# shuffle() stores the order produced during the first epoch and replays that
# same order on every later epoch, so the data is effectively never reshuffled.
# Caching first lets shuffle() draw a fresh order each epoch.
#
# This shuffles at the batch level; if you want true example-level shuffle:
# train_ds = train_ds.unbatch().shuffle(10000).batch(32)
train_ds = train_ds.cache().shuffle(1000).prefetch(AUTOTUNE)
# Validation data is only evaluated, so it is cached and prefetched but not shuffled.
val_ds   = val_ds.cache().prefetch(AUTOTUNE)

# =========================
# 4. Data augmentation (automatic rotation)
# =========================

# RandomRotation's `factor` is a fraction of a full turn (2*pi), applied in
# both directions: factor=0.1 -> random rotation within about +/-36 degrees.
# factor=0.25 would allow up to +/-90 degrees; only raise it if your ASL
# images can tolerate that much rotation.
data_augmentation = keras.Sequential([
    layers.RandomRotation(factor=0.1),
    # You can add more if you want:
    # layers.RandomZoom(0.1),
    # layers.RandomTranslation(0.1, 0.1),
])

# =========================
# 5. Define the CNN model (with rotation)
# =========================

# Four Conv -> MaxPool stages that differ only in their filter count.
conv_stages = []
for n_filters in (32, 64, 128, 256):
    conv_stages.append(
        layers.Conv2D(n_filters, (3, 3), activation="relu", padding="same")
    )
    conv_stages.append(layers.MaxPooling2D((2, 2)))

model = models.Sequential([
    layers.Input(shape=(img_height, img_width, num_channels)),
    # Apply random rotations during training only
    data_augmentation,
    # Normalize pixels 0-255 -> 0-1
    layers.Rescaling(1./255),
    # Convolutional feature extractor built above
    *conv_stages,
    # Classifier head: flatten, one hidden layer with dropout, softmax over classes
    layers.Flatten(),
    layers.Dense(256, activation="relu"),
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation="softmax"),
])

# Labels are one-hot (label_mode="categorical" was used when the datasets were
# built), so the matching loss is categorical_crossentropy.

# One-vs-rest AUC computed per class column of the one-hot labels.
auc_ovr_metric = tf.keras.metrics.AUC(
    name="auc_ovr",
    multi_label=True,      # y_true is one-hot with shape (batch, num_classes)
    from_logits=False,     # the model already ends in a softmax
)
# Macro-averaged F1 across all classes (from TensorFlow Addons).
f1_macro_metric = tfa.metrics.F1Score(
    num_classes=num_classes,
    average="macro",
    name="f1_macro",
)

model.compile(
    loss="categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    metrics=["accuracy", auc_ovr_metric, f1_macro_metric],
)

model.summary()

# =========================
# 6. Train the model
# =========================

# Fit on train_ds for a fixed number of epochs, validating on val_ds.
epochs = 15
history = model.fit(train_ds, epochs=epochs, validation_data=val_ds)

# =========================
# 7. Save the model
# =========================

# Write the trained model to Drive; the evaluation cells reload it from this same path.
model_path = "/content/drive/MyDrive/Duke University/CV-Group6/cnn_from_saved_tensors_with_rotation.h5"
model.save(filepath=model_path)
print(f"‚úÖ Model saved to {model_path}")

# =========================
# 8. Predict on a single tensor image (from a batch)
# =========================

import numpy as np

def predict_from_tensor(model, img_tensor, class_names=None):
    """
    Classify a single image with a trained model.

    Args:
        model: object exposing ``predict(batch)`` that returns one row of
            class scores per input image.
        img_tensor: one image of shape (H, W, C), or an already-batched
            array of shape (N, H, W, C). Any array-like that NumPy can
            convert is accepted (NumPy array, nested list, ...).
            Pixel values are forwarded unchanged: the CNN defined in this
            notebook applies Rescaling(1/255) itself, so pass it raw
            0-255 values, not pre-normalized 0-1 values.
        class_names: optional sequence mapping class index -> label.

    Returns:
        (pred_class, confidence): the predicted label (the integer class
        index when class_names is None) and its score as a Python float.
        For a batched input only the first image is reported.

    Raises:
        ValueError: if img_tensor is neither 3-D nor 4-D.
    """
    # np.asarray handles inputs that lack .ndim / .astype (e.g. nested lists)
    # and casts to float32 in the same step.
    batch = np.asarray(img_tensor, dtype="float32")

    if batch.ndim == 3:
        batch = np.expand_dims(batch, axis=0)  # (1, H, W, C)
    elif batch.ndim != 4:
        raise ValueError(
            "img_tensor must have shape (H, W, C) or (N, H, W, C); "
            f"got array with shape {batch.shape}"
        )

    # Only the first row of predictions is used, as in the single-image case.
    scores = np.asarray(model.predict(batch))[0]
    pred_idx = int(np.argmax(scores))
    confidence = float(np.max(scores))

    pred_class = pred_idx if class_names is None else class_names[pred_idx]
    return pred_class, confidence

# Example usage: use one batch from val_ds
# for batch_images, batch_labels in val_ds.take(1):
#     img = batch_images[0].numpy()
#     pred, conf = predict_from_tensor(model, img)
#     print("Predicted class index:", pred, "confidence:", conf)
#     break


# Loading Model and Evaluating Performance on Test Dataset

In [None]:
# Reload the CNN from the same Drive path the training cell saved it to,
# then print a confirmation once loading returns.
# Relies on `keras` (from tensorflow) having been imported by an earlier cell.
model_path = "/content/drive/MyDrive/Duke University/CV-Group6/cnn_from_saved_tensors_with_rotation.h5"
model = keras.models.load_model(model_path)
print("model succesfully loaded")

In [None]:
test_ds = tf.data.Dataset.load("/content/drive/MyDrive/Duke University/CV-Group6/test_dataset")

# Read the image dimensions and class count off the first batch.
# take(1) yields at most one batch, so no explicit break is required.
for images, labels in test_ds.take(1):
    img_height, img_width, num_channels = images.shape[1], images.shape[2], images.shape[3]
    num_classes = labels.shape[-1]   # one-hot labels

# Cache the test batches and overlap loading with evaluation.
AUTOTUNE = tf.data.AUTOTUNE
test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)


In [None]:
# Compile the loaded model with the loss and metrics used for training,
# then score it on the test dataset.
test_metrics = [
    "accuracy",
    tf.keras.metrics.AUC(name="auc_ovr", multi_label=True, from_logits=False),
    tfa.metrics.F1Score(num_classes=num_classes, average="macro", name="f1_macro"),
]

model.compile(
    loss="categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    metrics=test_metrics,
)

model.summary()
results = model.evaluate(test_ds)
# Pair each metric name with its score for a readable printout.
print({name: score for name, score in zip(model.metrics_names, results)})


In [None]:
#Make prediction on a random Image
