In [10]:
import os
import random
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

# --- Sklearn & Torchvision ---
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, accuracy_score, classification_report

import torch
from torchvision import datasets, transforms

# --- MediaPipe for landmarks ---
import mediapipe as mp

# ─── 1) Reproducibility ──────────────────────────────────────
# A single seed value is pushed into Python's `random`, NumPy's global RNG
# and PyTorch, in that order.
seed = 42
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    _seed_rng(seed)


# ─── 2) Helper: make image‐list DataFrame ────────────────────
def make_image_df(root_dir, extensions=('jpg', 'jpeg', 'png')):
    """Index a ``root_dir/<label>/<file>`` folder tree as a DataFrame.

    Parameters
    ----------
    root_dir : str
        Directory whose immediate sub-directories are the class labels.
        Entries of ``root_dir`` that are not directories are skipped.
    extensions : str or iterable of str
        File extensions to keep, with or without a leading dot.  Matching is
        case-insensitive and requires the dot, so ``'jpg'`` accepts
        ``a.JPG`` but not ``fakejpg``.

    Returns
    -------
    pandas.DataFrame
        One row per matching file with columns ``filepath`` and ``label``.
        Rows are ordered by label, then by file name, so the result does not
        depend on the order in which the filesystem lists entries.  The two
        columns are present even when no file matches.
    """
    if isinstance(extensions, str):
        extensions = (extensions,)
    # str.endswith needs a tuple; normalise every entry to ".ext" lower-case.
    suffixes = tuple('.' + ext.lower().lstrip('.') for ext in extensions)

    recs = []
    for label in sorted(os.listdir(root_dir)):
        class_dir = os.path.join(root_dir, label)
        if not os.path.isdir(class_dir):
            continue
        # Sorted so that downstream seeded splits see a stable row order.
        for fn in sorted(os.listdir(class_dir)):
            if fn.lower().endswith(suffixes):
                recs.append({'filepath': os.path.join(class_dir, fn),
                             'label': label})
    return pd.DataFrame(recs, columns=['filepath', 'label'])


# ─── 3) Build DataFrames & splits ────────────────────────────
df_tv = make_image_df('data/asl_alphabet')      # pool that is split into train + dev
df_test = make_image_df('data/synthetic_test')  # separate folder used as the test set

# Stratified 80/20 split of the pool, reproducible through `seed`.
df_train, df_dev = train_test_split(
    df_tv,
    test_size=0.2,
    stratify=df_tv['label'],
    random_state=seed,
)

# ─── 4) Encode labels once globally ─────────────────────────
# The encoder only sees the train+dev label names; the same mapping is then
# applied to all three splits.
le = LabelEncoder()
le.fit(df_tv['label'])
y_train, y_dev, y_test = (
    le.transform(frame['label']) for frame in (df_train, df_dev, df_test)
)


# ─── 5) Pixel‐based features: flatten → PCA → scale ─────────
def load_flatten_norm(paths, size=(64, 64)):
    """Load images as flattened grayscale rows scaled to [0, 1].

    Parameters
    ----------
    paths : iterable of str
        Image files to read.  May be empty.
    size : tuple of int
        ``(width, height)`` passed to ``PIL.Image.resize``.

    Returns
    -------
    numpy.ndarray
        ``float32`` array of shape ``(len(paths), width * height)``; each row
        is one image converted to mode ``'L'``, resized, flattened and
        divided by 255.  An empty ``paths`` yields a ``(0, width * height)``
        array instead of raising.
    """
    paths = list(paths)
    width, height = size
    # Fill a preallocated matrix rather than stacking a list at the end, so
    # the pixel data is never held twice in memory.
    out = np.empty((len(paths), width * height), dtype=np.float32)
    for row, p in enumerate(paths):
        # The context manager releases the file handle as soon as the
        # converted copy exists.
        with Image.open(p) as src:
            gray = src.convert('L').resize(size)
        out[row] = np.asarray(gray, dtype=np.float32).ravel() / 255.0
    return out


# load & split
train_paths, dev_paths, test_paths = (
    frame['filepath'].tolist() for frame in (df_train, df_dev, df_test)
)

X_train_pix, X_dev_pix, X_test_pix = (
    load_flatten_norm(split_paths)
    for split_paths in (train_paths, dev_paths, test_paths)
)

# PCA → 20 dims (fitted on train only, then applied to dev and test)
pca = PCA(n_components=20, random_state=seed)
X_train_px_p = pca.fit_transform(X_train_pix)
X_dev_px_p, X_test_px_p = pca.transform(X_dev_pix), pca.transform(X_test_pix)

# scale the PCA space (statistics come from train only)
scaler_pix = StandardScaler()
scaler_pix.fit(X_train_px_p)
X_train_px_s, X_dev_px_s, X_test_px_s = (
    scaler_pix.transform(projected)
    for projected in (X_train_px_p, X_dev_px_p, X_test_px_p)
)



In [2]:
# ─── 6) Landmark‐based features: MediaPipe Hands ────────────
# Detector options are gathered in one mapping and unpacked into the
# constructor: single-image mode, at most one hand, low confidence thresholds.
_hands_options = dict(
    static_image_mode=True,
    max_num_hands=1,
    min_detection_confidence=0.1,
    min_tracking_confidence=0.1,
    model_complexity=0,
)
mp_hands = mp.solutions.hands.Hands(**_hands_options)

# optional resize transform
_resize_256 = transforms.Resize((256, 256))
tf = transforms.Compose([_resize_256])


def extract_landmarks(path, lbl):
    """Turn one image into a hand-landmark feature vector.

    The image is read as RGB, passed through the module-level ``tf``
    transform and handed to the module-level ``mp_hands`` detector.

    Parameters
    ----------
    path : str
        Image file to process.
    lbl :
        Label for the image; returned unchanged so callers can keep features
        and labels paired.

    Returns
    -------
    (features, lbl)
        ``features`` is a 1-D array: the landmark values laid out as
        ``x0, y0, z0, x1, y1, z1, ...`` followed by one handedness score.
        x and y are pixel coordinates rescaled to the hand's own bounding box
        (0 at the minimum, 1 at the maximum); z is kept as reported by the
        detector.  When no hand is detected the landmark part is 63 zeros and
        the score is 0.0.
    """
    # Close the file handle as soon as the RGB copy has been transformed.
    with Image.open(path) as src:
        img = tf(src.convert("RGB"))
    arr = np.array(img, dtype=np.uint8)
    res = mp_hands.process(arr)

    if not res.multi_hand_landmarks:
        vec63 = np.zeros(63, dtype=np.float32)
        conf = 0.0
    else:
        height, width = arr.shape[:2]
        hand = res.multi_hand_landmarks[0]
        # Detector x/y are multiplied by the image size to get pixels.
        pts = np.array([[p.x * width, p.y * height, p.z]
                        for p in hand.landmark], dtype=np.float32)
        xy_min = pts[:, :2].min(axis=0)
        # Clamp the box extent so a degenerate (zero-size) box cannot divide
        # by zero.
        extent = np.maximum(pts[:, :2].max(axis=0) - xy_min, np.float32(1e-3))
        pts[:, :2] = (pts[:, :2] - xy_min) / extent
        # Row-major flattening gives x0, y0, z0, x1, ... per landmark.
        vec63 = pts.ravel()
        conf = float(res.multi_handedness[0].classification[0].score)
    return np.concatenate([vec63, [conf]]), lbl


def build_landmark_features(paths, labels):
    """Run ``extract_landmarks`` over many images and stack the results.

    Parameters
    ----------
    paths : sequence of str
        Image files, processed in order with a tqdm progress bar.
    labels : sequence
        One integer-convertible label per path.

    Returns
    -------
    (X, y)
        ``X`` stacks one feature row per image; ``y`` is an integer array of
        the labels handed back by ``extract_landmarks``.  For empty input,
        ``X`` has shape ``(0, 64)`` and ``y`` shape ``(0,)`` instead of
        raising.
    """
    if len(paths) == 0:
        # 63 landmark values + 1 score: the row width extract_landmarks emits.
        return np.empty((0, 64), dtype=np.float64), np.empty(0, dtype=int)

    rows, ys = [], []
    for p, l in tqdm(zip(paths, labels), total=len(paths), desc="Landmarks"):
        vec, lab = extract_landmarks(p, l)
        rows.append(vec)
        ys.append(lab)
    return np.vstack(rows), np.array(ys, dtype=int)


# extract for train/dev/test
# Only the feature matrices are kept; the label arrays returned alongside
# them duplicate y_train / y_dev / y_test.
X_train_lm, X_dev_lm, X_test_lm = (
    build_landmark_features(split_paths, split_y)[0]
    for split_paths, split_y in (
        (train_paths, y_train),
        (dev_paths, y_dev),
        (test_paths, y_test),
    )
)


I0000 00:00:1746900397.496209 12115417 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 89.4), renderer: Apple M4 Pro
Landmarks:   0%|          | 0/69622 [00:00<?, ?it/s]INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
Landmarks: 100%|██████████| 69622/69622 [14:58<00:00, 77.53it/s]
Landmarks: 100%|██████████| 17406/17406 [03:45<00:00, 77.31it/s]
Landmarks: 100%|██████████| 2700/2700 [00:45<00:00, 59.47it/s]


In [12]:

# ─── 7) Concatenate & scale joint pixel + landmark space ────
# Each row is [scaled PCA pixel components | landmark vector]; with the
# settings in this notebook that is 20 + 64 = 84 columns.
X_train_comb, X_dev_comb, X_test_comb = (
    np.concatenate([pixel_part, landmark_part], axis=1)
    for pixel_part, landmark_part in (
        (X_train_px_s, X_train_lm),
        (X_dev_px_s, X_dev_lm),
        (X_test_px_s, X_test_lm),
    )
)

# Standardise the joint space with statistics from the training rows only.
scaler_all = StandardScaler()
scaler_all.fit(X_train_comb)
X_train_all, X_dev_all, X_test_all = (
    scaler_all.transform(combined)
    for combined in (X_train_comb, X_dev_comb, X_test_comb)
)

In [13]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import product

# ─── 8) Multi-threaded grid search on combined space ─────────
grid_params = dict(
    n_estimators=[100, 200, 300],
    max_depth=[10, 20],
    max_features=[0.05, 0.1, 0.2],
    min_samples_split=[2, 4],
)
# Parameter names and their candidate lists, in declaration order.
keys = tuple(grid_params)
vals = tuple(grid_params.values())
# One dict per point of the Cartesian product (3 * 2 * 3 * 2 = 36 settings).
tasks = [dict(zip(keys, combo)) for combo in product(*vals)]


def worker(params):
    """Fit one random forest for ``params`` and score it on dev and test.

    The forest is seeded with the module-level ``seed`` and trained on
    ``X_train_all`` / ``y_train``.

    Returns
    -------
    (params, f1_dev, f1_test)
        The input ``params`` plus the macro-averaged F1 on the dev and test
        splits.
    """
    forest = RandomForestClassifier(random_state=seed).set_params(**params)
    forest.fit(X_train_all, y_train)
    # Dev first, then test: the same order as the returned tuple.
    f1_dev, f1_test = (
        f1_score(y_true, forest.predict(X_split), average='macro')
        for X_split, y_true in ((X_dev_all, y_dev), (X_test_all, y_test))
    )
    return params, f1_dev, f1_test


results = []
# os.cpu_count() can return None; fall back to a single worker in that case
# and never request fewer than one thread.
n_workers = max((os.cpu_count() or 2) - 1, 1)
with ThreadPoolExecutor(max_workers=n_workers) as exe:
    futures = [exe.submit(worker, p) for p in tasks]
    # Count from 1 so the printed fraction is the share of finished settings
    # and reaches 1.00 on the last one.
    for done, fut in enumerate(as_completed(futures), start=1):
        params, f1d, f1t = fut.result()
        print(f"({done / len(futures): 2.2f})  Tested {params} -> Dev F1 = {f1d:.4f}, Test F1 = {f1t:.4f}")
        results.append((f1d, f1t, params))

# Completion order varies between runs; restore grid order so that ties on
# dev F1 are always resolved in favour of the same setting.
results.sort(key=lambda r: tasks.index(r[2]))

# ─── 9) Select best on DEV, keep model trained on TRAIN only ───
# Test F1 is carried along for reporting only; the choice uses dev F1.
best_f1, best_f1_test, best_params = max(results, key=lambda x: x[0])
print(f"Best DEV F1 = {best_f1:.4f} with {best_params} Test F1 = {best_f1_test:.4f}")
best_rf = RandomForestClassifier(random_state=seed)
best_rf.set_params(**best_params)
best_rf.fit(X_train_all, y_train)


( 0.00)  Tested {'n_estimators': 100, 'max_depth': 10, 'max_features': 0.05, 'min_samples_split': 2} -> Dev F1 = 0.8626, Test F1 = 0.3737
( 0.03)  Tested {'n_estimators': 100, 'max_depth': 10, 'max_features': 0.05, 'min_samples_split': 4} -> Dev F1 = 0.8614, Test F1 = 0.3691
( 0.06)  Tested {'n_estimators': 100, 'max_depth': 20, 'max_features': 0.05, 'min_samples_split': 4} -> Dev F1 = 0.9743, Test F1 = 0.3522
( 0.08)  Tested {'n_estimators': 100, 'max_depth': 20, 'max_features': 0.05, 'min_samples_split': 2} -> Dev F1 = 0.9748, Test F1 = 0.3517
( 0.11)  Tested {'n_estimators': 100, 'max_depth': 10, 'max_features': 0.1, 'min_samples_split': 2} -> Dev F1 = 0.8649, Test F1 = 0.3745
( 0.14)  Tested {'n_estimators': 100, 'max_depth': 10, 'max_features': 0.1, 'min_samples_split': 4} -> Dev F1 = 0.8648, Test F1 = 0.3828
( 0.17)  Tested {'n_estimators': 200, 'max_depth': 10, 'max_features': 0.05, 'min_samples_split': 2} -> Dev F1 = 0.8638, Test F1 = 0.3673
( 0.19)  Tested {'n_estimators': 100

KeyboardInterrupt: 

In [None]:
# Passing every encoded class id explicitly keeps `target_names` aligned even
# when a split and its predictions do not cover all classes; without it,
# classification_report raises on the length mismatch.
all_label_ids = np.arange(len(le.classes_))

for split, X, y in [("Train", X_train_all, y_train),
                    ("Dev", X_dev_all, y_dev),
                    ("Test", X_test_all, y_test)]:
    preds = best_rf.predict(X)
    print(f"\n=== {split} ===")
    print(f"Acc: {accuracy_score(y, preds) * 100:.2f}% | "
          f"F1-macro: {f1_score(y, preds, average='macro') * 100:.2f}%")
    print(classification_report(y, preds,
                                labels=all_label_ids,
                                target_names=le.classes_,
                                digits=4))