In [None]:
!pip -q install mediapipe opencv-python-headless tqdm scikit-learn


In [None]:
# Mount Google Drive at /content/drive; the dataset location configured below lives under it.
from google.colab import drive
drive.mount('/content/drive')

import os, glob, shutil, json, random, math
from pathlib import Path

# === CHANGE THIS to where you uploaded CAER ===
# Location on the mounted Drive that is searched for the CAER data.
DRIVE_CAER_ZIP_OR_FOLDER = "/content/drive/MyDrive/datasets/CAER"

# Scratch area on the Colab VM; the dataset is placed in a CAER/ subfolder of it.
WORKDIR = "/content/caer_work"
DATA_ROOT = os.path.join(WORKDIR, "CAER")

# DATA_ROOT sits inside WORKDIR, so creating it (with parents) creates both.
os.makedirs(DATA_ROOT, exist_ok=True)

print("DRIVE_CAER_ZIP_OR_FOLDER =", DRIVE_CAER_ZIP_OR_FOLDER)
print("DATA_ROOT               =", DATA_ROOT)


In [None]:
import os, glob

def unzip_if_needed(src_dir, dst_dir):
    """Populate ``dst_dir`` with the CAER dataset found at ``src_dir``.

    Sources are tried in this order:

    A. ``src_dir/CAER`` or ``src_dir/caer`` is an already-extracted folder:
       its contents are copied into ``dst_dir``.
    B. ``src_dir`` is itself a ``.zip`` file, or contains exactly one ``*.zip``
       (and no ``.z01``-style split parts): the archive is extracted into
       ``dst_dir``.
    C. Split-zip parts or several zips are present: nothing is extracted;
       instructions for recombining the parts are printed instead.

    Copying/extracting is skipped when ``dst_dir`` already has contents, so
    the cell can be re-run safely.

    Args:
        src_dir: Directory (or single ``.zip`` file) on Drive holding CAER.
        dst_dir: Local directory that receives the dataset.

    Raises:
        FileNotFoundError: if neither an extracted folder nor any zip-like
            file is found.
    """
    # Function-scope imports: pure-Python copy/extract avoids interpolating
    # paths into shell command lines.
    import re
    import shutil
    import zipfile

    def _dst_has_contents():
        return os.path.isdir(dst_dir) and len(os.listdir(dst_dir)) > 0

    # Case A: already-extracted CAER folder exists in Drive
    for folder_name in ("CAER", "caer"):
        p = os.path.join(src_dir, folder_name)
        if not os.path.isdir(p):
            continue
        if _dst_has_contents():
            print("DATA_ROOT already has contents; skipping copy.")
            return
        print(f"Copying extracted folder from {p} → {dst_dir}")
        shutil.copytree(p, dst_dir, symlinks=True, dirs_exist_ok=True)
        return

    zips = glob.glob(os.path.join(src_dir, "*.zip"))
    if os.path.isfile(src_dir) and src_dir.lower().endswith(".zip"):
        # The configured location points straight at the archive.
        zips = [src_dir]

    # "*.z*" also matches "*.zip"; real split parts look like ".z01", ".z02", ...
    parts = sorted(glob.glob(os.path.join(src_dir, "*.z*")))
    has_split_parts = any(re.search(r"\.z\d+$", f) for f in parts)

    # Case B: single, self-contained zip
    if len(zips) == 1 and not has_split_parts:
        z = zips[0]
        if _dst_has_contents():
            print("DATA_ROOT already has contents; skipping unzip.")
            return
        print("Unzipping:", z)
        with zipfile.ZipFile(z) as archive:
            archive.extractall(dst_dir)
        return

    # Case C: multiple zip parts (example patterns)
    # If your download is split: caer_split.zip, caer_split.z01, ...
    if parts:
        print("Found possible split-zip parts:", parts[:5], "...")
        print("If this is a split zip, you can recombine. Example:")
        print('  !zip -s 0 "caer_split.zip" --out caer.zip')
        print(f'  !unzip caer.zip -d "{dst_dir}"')
        return

    raise FileNotFoundError(
        "Could not find an extracted CAER folder or a zip in DRIVE_CAER_ZIP_OR_FOLDER. "
        "Upload CAER into that directory first."
    )

# Bring the dataset from Drive into DATA_ROOT.
unzip_if_needed(DRIVE_CAER_ZIP_OR_FOLDER, DATA_ROOT)

# Print a quick tree preview: only the top level, at most five entries of each kind.
_top_level = next(os.walk(DATA_ROOT), None)
if _top_level is not None:
    root, dirs, files = _top_level
    print(root, "dirs:", dirs[:5], "files:", files[:5])


In [None]:
# File extensions that count as video files when indexing the dataset.
VIDEO_EXTS = {".mp4", ".avi", ".mov", ".mkv"}

# The seven emotion categories, in the order that defines the integer class ids.
# Folder names that differ (e.g., in letter case) are normalized before lookup.
CLASSES = ["angry", "disgust", "fear", "happy", "neutral", "sad", "surprise"]
class_to_idx = dict(zip(CLASSES, range(len(CLASSES))))

def normalize_label(s: str) -> str:
    """Return the canonical form of an emotion label.

    The input is stripped of surrounding whitespace and lower-cased; the alias
    "anger" becomes "angry". Any other string is returned in its stripped,
    lower-cased form, whether or not it names a known class.
    """
    cleaned = s.strip().lower()
    # "anger" is the only alias; every other known name is already canonical.
    if cleaned == "anger":
        return "angry"
    return cleaned

def infer_split_from_path(p: str) -> str:
    """Derive the dataset split from the components of a path.

    Components are compared case-insensitively. Returns "train", "val"
    (for a "validation" or "val" component) or "test", checked in that order;
    "unknown" when no component matches.
    """
    components = {part.lower() for part in Path(p).parts}
    # Checked in priority order: train first, then validation, then test.
    split_aliases = (
        ("train", ("train",)),
        ("val", ("validation", "val")),
        ("test", ("test",)),
    )
    for split, aliases in split_aliases:
        if any(alias in components for alias in aliases):
            return split
    return "unknown"

def infer_label_from_path(p: str) -> str:
    """Derive the emotion label from the components of a path.

    Components are normalized with ``normalize_label`` and scanned from the
    end of the path towards the root; the first one that is a known class
    name is returned. Returns "unknown" when none matches.
    """
    normalized = (normalize_label(part) for part in reversed(Path(p).parts))
    return next((name for name in normalized if name in class_to_idx), "unknown")

def collect_videos(data_root: str):
    """Index every labelled video below ``data_root``.

    A file counts as a video when its extension, compared case-insensitively,
    is in ``VIDEO_EXTS``. Split and label are inferred from the path; files
    for which either cannot be inferred are skipped.

    Args:
        data_root: Directory that is searched recursively.

    Returns:
        A list of ``(path, split, label, class_index)`` tuples, sorted by path
        so that the order is the same on every run.
    """
    # One recursive listing, filtered by suffix. Matching on the lower-cased
    # extension also picks up files such as "clip.AVI".
    candidates = glob.glob(os.path.join(data_root, "**", "*"), recursive=True)
    video_files = sorted(
        f for f in candidates
        if os.path.splitext(f)[1].lower() in VIDEO_EXTS
    )

    items = []
    for f in video_files:
        split = infer_split_from_path(f)
        label = infer_label_from_path(f)
        if split == "unknown" or label == "unknown":
            continue
        items.append((f, split, label, class_to_idx[label]))
    return items

# Build the video index and report how many usable clips were found.
items = collect_videos(DATA_ROOT)
print("Found labeled videos:", len(items))
print("Example:", next(iter(items), None))

# Each item is (path, split, label, class_index); tally by split and by label.
from collections import Counter
print("Split counts:", Counter(item[1] for item in items))
print("Label counts:", Counter(item[2] for item in items))


In [None]:
import cv2
import numpy as np
from tqdm import tqdm

import mediapipe as mp
# Shorthand for the MediaPipe Pose solution module.
mp_pose = mp.solutions.pose

# Sequence length and sampling
SEQ_LEN = 32     # frames per extracted sequence
TARGET_FPS = 10  # sample frames at ~10fps (approx)
MIN_VIS = 0.3    # landmark visibility threshold

# Extracted pose sequences are stored as .npy files in this folder.
CACHE_DIR = os.path.join(WORKDIR, "pose_cache")
os.makedirs(CACHE_DIR, exist_ok=True)

def make_cache_path(video_path: str) -> str:
    """Return the cache file path for a video.

    The file name is the MD5 hex digest of the UTF-8 encoded ``video_path``
    followed by ``_T<SEQ_LEN>.npy``, located in ``CACHE_DIR``. The same path
    string therefore always maps to the same cache file.
    """
    from hashlib import md5

    digest = md5(video_path.encode("utf-8")).hexdigest()
    file_name = f"{digest}_T{SEQ_LEN}.npy"
    return os.path.join(CACHE_DIR, file_name)

def sample_frame_indices(total_frames: int, fps: float, seq_len: int, target_fps: int, train: bool):
    """Choose ``seq_len`` frame indices to read from a video.

    - No frames (``total_frames <= 0``): all indices are 0.
    - ``train`` is true and the video holds at least ``seq_len * stride``
      frames: a window with a random start, stepping by ``stride`` frames,
      where ``stride`` is ``fps / target_fps`` rounded (at least 1).
    - Otherwise: indices spread evenly from the first to the last frame.

    Returns:
        Integer array of length ``seq_len`` with values in
        ``[0, total_frames - 1]``.
    """
    if total_frames <= 0:
        return np.zeros(seq_len, dtype=int)

    last_frame = total_frames - 1
    step = max(int(round(fps / max(target_fps, 1))), 1)
    window = seq_len * step

    use_random_window = train and total_frames >= window
    if not use_random_window:
        # uniform coverage
        picks = np.linspace(0, last_frame, seq_len).astype(int)
    else:
        first = random.randint(0, max(total_frames - window, 0))
        picks = first + step * np.arange(seq_len)

    return np.clip(picks, 0, max(last_frame, 0)).astype(int)

def extract_pose_sequence(video_path: str, seq_len=SEQ_LEN, target_fps=TARGET_FPS, train=False):
    """Run MediaPipe Pose on sampled frames of a video.

    Args:
        video_path: Path of the video file to read.
        seq_len: Number of frames to sample.
        target_fps: Approximate sampling rate, forwarded to
            ``sample_frame_indices``.
        train: Forwarded to ``sample_frame_indices`` (enables the random
            temporal window).

    Returns:
        ``(seq, ok_count)`` where ``seq`` is a float32 array of shape
        ``(seq_len, 33, 3)`` holding ``[x, y, visibility]`` per landmark, and
        ``ok_count`` is the number of sampled frames in which a pose was
        detected. Rows for frames that could not be read, or in which no pose
        was found, stay all-zero.

    Raises:
        RuntimeError: if the video cannot be opened.
    """
    num_landmarks = 33
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        # Fall back to 25 fps when the container reports nothing usable
        # (missing, zero, negative, NaN or infinite).
        if fps is None or not np.isfinite(fps) or fps <= 1e-3:
            fps = 25.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_idxs = sample_frame_indices(total_frames, fps, seq_len, target_fps, train=train)

        seq = np.zeros((seq_len, num_landmarks, 3), dtype=np.float32)  # (T, K, [x,y,vis])
        ok_count = 0

        # MediaPipe Pose (single-person). For crowded scenes, you may want a person-detector + crop.
        pose = mp_pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        try:
            for t, fi in enumerate(frame_idxs):
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(fi))
                ok, frame = cap.read()
                if not ok or frame is None:
                    continue

                # OpenCV decodes to BGR; the frame is converted to RGB before pose estimation.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                res = pose.process(rgb)

                if res.pose_landmarks is None:
                    continue

                lms = res.pose_landmarks.landmark
                seq[t] = [
                    (lms[k].x, lms[k].y, lms[k].visibility)
                    for k in range(num_landmarks)
                ]
                ok_count += 1
        finally:
            # Release the pose graph even when a frame fails mid-loop.
            pose.close()

        return seq, ok_count
    finally:
        # Always release the capture handle, including on the error paths above.
        cap.release()

def normalize_skeleton(seq):
    """
    Return a copy of ``seq`` with per-frame normalized x/y coordinates:
    - the origin is moved to the midpoint of the two hips
    - coordinates are divided by the distance between the shoulder midpoint
      and the hip midpoint (floored at 1e-3 to avoid dividing by ~0)
    The third channel (visibility) is left untouched; the input is not modified.
    """
    # MediaPipe Pose landmark indices
    L_SHOULDER, R_SHOULDER, L_HIP, R_HIP = 11, 12, 23, 24

    out = seq.copy()
    xy = out[:, :, :2]                                         # (T, K, 2) view into `out`
    hip_mid = (xy[:, L_HIP] + xy[:, R_HIP]) / 2.0              # (T, 2)
    shoulder_mid = (xy[:, L_SHOULDER] + xy[:, R_SHOULDER]) / 2.0
    body_scale = np.maximum(
        np.linalg.norm(shoulder_mid - hip_mid, axis=1, keepdims=True),  # (T, 1)
        1e-3,
    )

    out[:, :, :2] = (xy - hip_mid[:, None, :]) / body_scale[:, None, :]
    return out


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class CAERSkeletonDataset(Dataset):
    """Dataset of normalized pose sequences for one split of the video index.

    Each sample is ``(x, y)``: ``x`` is a float32 tensor of shape
    ``(T, 33*3)`` (flattened ``[x, y, visibility]`` per landmark and frame) and
    ``y`` is the class index as a long tensor.

    Sequences are extracted on first access and cached as ``.npy`` files in
    ``cache_dir``. Note that for the "train" split the first extraction may
    use a random temporal window; because the result is cached, that same
    window is reused on every later access.

    Args:
        items: Iterable of ``(video_path, split, label, class_index)`` tuples.
        split: Split name to keep.
        cache_dir: Directory for the cached sequences.
    """

    def __init__(self, items, split, cache_dir=CACHE_DIR):
        self.rows = [(vp, y) for (vp, sp, _, y) in items if sp == split]
        self.split = split
        self.cache_dir = cache_dir
        os.makedirs(self.cache_dir, exist_ok=True)

    def __len__(self):
        return len(self.rows)

    def _cache_path(self, video_path):
        """Cache file for ``video_path`` inside this dataset's ``cache_dir``."""
        # Reuse the shared file-name scheme but honour the configured directory.
        return os.path.join(self.cache_dir, os.path.basename(make_cache_path(video_path)))

    def __getitem__(self, idx):
        video_path, y = self.rows[idx]
        cache_path = self._cache_path(video_path)

        if os.path.exists(cache_path):
            seq = np.load(cache_path)  # (T,33,3)
        else:
            seq_raw, _ = extract_pose_sequence(video_path, train=(self.split == "train"))
            seq = normalize_skeleton(seq_raw)
            # Write to a per-process temp file and rename it into place, so an
            # interrupted run never leaves a truncated cache file behind.
            tmp_path = f"{cache_path}.{os.getpid()}.tmp"
            with open(tmp_path, "wb") as fh:
                np.save(fh, seq)
            os.replace(tmp_path, cache_path)

        # flatten keypoints
        x = seq.reshape(seq.shape[0], -1)  # (T, 33*3)

        # Optional: mask out low-visibility points
        # (keeps x stable; simple baseline)
        # vis = seq[:,:,2]  # (T,33)
        # You could incorporate vis into features if desired.

        x = torch.tensor(x, dtype=torch.float32)
        y = torch.tensor(y, dtype=torch.long)
        return x, y

BATCH_SIZE = 32

# One dataset per split of the video index.
train_ds = CAERSkeletonDataset(items, "train")
val_ds = CAERSkeletonDataset(items, "val")
test_ds = CAERSkeletonDataset(items, "test")

# Settings shared by all loaders; only the training loader shuffles.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=2, pin_memory=True)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_kwargs)

print(len(train_ds), len(val_ds), len(test_ds))


In [None]:
import torch.nn as nn
import torch.nn.functional as F

class BiLSTMSkeletonEmotion(nn.Module):
    """Bidirectional LSTM classifier over per-frame skeleton features.

    Pipeline: linear projection of each frame -> stacked bidirectional LSTM ->
    MLP head applied to the concatenated final hidden states of both
    directions.

    Args:
        in_dim: Number of features per frame.
        hidden: Width of the projection and of each LSTM direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Number of output classes.
        dropout: Dropout probability in the head and, when ``num_layers > 1``,
            between LSTM layers.
    """

    def __init__(self, in_dim, hidden=256, num_layers=2, num_classes=7, dropout=0.3):
        super().__init__()
        self.proj = nn.Linear(in_dim, hidden)
        self.lstm = nn.LSTM(
            input_size=hidden,
            hidden_size=hidden,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0.0
        )
        self.head = nn.Sequential(
            nn.Linear(hidden * 2, hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, num_classes)
        )

    def forward(self, x):
        """Map a batch of sequences ``(B, T, in_dim)`` to logits ``(B, num_classes)``."""
        x = self.proj(x)
        _, (h_n, _) = self.lstm(x)  # h_n: (num_layers * 2, B, H)
        # Summarize the sequence with the final state of each direction of the
        # top layer. The last output timestep alone would not do: there the
        # backward direction has processed only a single frame.
        feat = torch.cat([h_n[-2], h_n[-1]], dim=1)  # (B, 2H)
        return self.head(feat)

# Use the GPU when one is available, otherwise fall back to the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)

# Per-frame input: 33 landmarks x 3 values each.
model = BiLSTMSkeletonEmotion(in_dim=33 * 3, num_classes=len(CLASSES))
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)

print("Device:", device)


In [None]:
from sklearn.metrics import accuracy_score

def run_epoch(model, loader, train: bool):
    """Run one pass over ``loader`` and return ``(avg_loss, accuracy)``.

    Uses the notebook-level ``optimizer``, ``criterion`` and ``device``.

    Args:
        model: Network to train or evaluate.
        loader: DataLoader yielding ``(x, y)`` batches.
        train: When true, the model is put in training mode and its parameters
            are updated (gradient norm clipped to 1.0). When false, the model
            is put in eval mode and gradient tracking is disabled.

    Returns:
        Sample-weighted mean loss and accuracy over all samples seen. For an
        empty loader the loss is NaN and the accuracy 0.0.
    """
    model.train(train)
    all_preds, all_y = [], []
    total_loss = 0.0
    n_seen = 0

    # No autograd graph is needed for evaluation passes.
    with torch.set_grad_enabled(train):
        for x, y in tqdm(loader, leave=False):
            x = x.to(device)
            y = y.to(device)

            if train:
                optimizer.zero_grad(set_to_none=True)

            logits = model(x)
            loss = criterion(logits, y)

            if train:
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()

            batch_size = x.size(0)
            # The criterion returns a batch mean; weight it by the batch size
            # so a smaller final batch is not over-represented.
            total_loss += loss.item() * batch_size
            n_seen += batch_size
            preds = torch.argmax(logits, dim=1)
            all_preds.extend(preds.detach().cpu().tolist())
            all_y.extend(y.detach().cpu().tolist())

    if n_seen == 0:
        # Empty split: nothing to average over.
        return float("nan"), 0.0

    avg_loss = total_loss / n_seen
    acc = accuracy_score(all_y, all_preds)
    return avg_loss, acc

EPOCHS = 10

# Best validation accuracy so far; starts below any reachable value so the
# first epoch always produces a checkpoint.
best_val = -1.0
best_path = os.path.join(WORKDIR, "best_skeleton_model.pt")

for epoch in range(1, EPOCHS + 1):
    tr_loss, tr_acc = run_epoch(model, train_loader, train=True)
    va_loss, va_acc = run_epoch(model, val_loader, train=False)

    print(f"Epoch {epoch:02d} | train loss {tr_loss:.4f} acc {tr_acc:.4f} | val loss {va_loss:.4f} acc {va_acc:.4f}")

    # Only a strict improvement in validation accuracy replaces the checkpoint.
    if not (va_acc > best_val):
        continue

    best_val = va_acc
    torch.save({"model": model.state_dict(), "classes": CLASSES}, best_path)
    print("Saved best →", best_path)


In [None]:
# Restore the weights stored in the checkpoint file before scoring the test split.
ckpt = torch.load(best_path, map_location=device)
best_weights = ckpt["model"]
model.load_state_dict(best_weights)

te_loss, te_acc = run_epoch(model=model, loader=test_loader, train=False)
print(f"TEST | loss {te_loss:.4f} acc {te_acc:.4f}")


In [None]:
def predict_video_emotion(video_path: str):
    """Predict the emotion class name for a single video.

    Uses the notebook-level ``model`` (switched to eval mode) and ``device``.
    A cached pose sequence is loaded when one exists for ``video_path``;
    otherwise the sequence is extracted with ``train=False`` and normalized
    (it is not written to the cache).

    Returns:
        The entry of ``CLASSES`` with the highest logit.
    """
    model.eval()

    cached_file = make_cache_path(video_path)
    if not os.path.exists(cached_file):
        raw_seq, _ = extract_pose_sequence(video_path, train=False)
        seq = normalize_skeleton(raw_seq)
    else:
        seq = np.load(cached_file)

    # (T, 33, 3) -> (1, T, 99): flatten landmarks per frame, add a batch axis.
    frames = seq.reshape(seq.shape[0], -1)
    batch = torch.tensor(frames, dtype=torch.float32).unsqueeze(0).to(device)

    with torch.no_grad():
        scores = model(batch)
    best = int(torch.argmax(scores, dim=1).item())
    return CLASSES[best]

# Example usage:
# some_video = train_ds.rows[0][0]
# print(some_video, "→", predict_video_emotion(some_video))
