In [None]:
# ---------- Install necessary packages ----------
%pip install numpy pandas opencv-python torch torchvision scikit-learn matplotlib tqdm pillow timm pyyaml joblib


In [4]:
# ---------- Import necessary libraries ----------
import os
from pathlib import Path
import numpy as np
import pandas as pd
import cv2
from tqdm import tqdm
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms
from torchvision.models import efficientnet_b7, EfficientNet_B7_Weights, inception_v3, Inception_V3_Weights
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

In [5]:
# ---------- Device setup and check GPU availability ----------
# Pick the compute device once: the first CUDA GPU when PyTorch can see one, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Enable cuDNN autotuning regardless of the selected device (same as setting it unconditionally).
torch.backends.cudnn.benchmark = True
if device.type == 'cpu':
    print("CUDA is not available. Training on CPU...")
else:
    print(torch.cuda.get_device_name(0))
    print("CUDA is available! Training on GPU...")

CUDA is not available. Training on CPU...


In [None]:
# ---------- config ----------
ROOT = Path(".")  # Project-Code root

# Label tables read by main(); both need video_path and label columns,
# and the gesture table additionally needs user_id.
CSV_FALL = ROOT.joinpath("FallDetectionLabels.csv")
CSV_GEST = ROOT.joinpath("HandGestureLabels.csv")

# Source videos (presumably under Dataset/ -- confirm against the CSV entries)
# and the roots the generated motion images are written beneath.
DATASET_DIR = ROOT.joinpath("Dataset")
OUT_FALL = ROOT.joinpath("Processed Dataset", "ProcessedFallDetection")
OUT_GEST = ROOT.joinpath("Processed Dataset", "ProcessedHandGesture")

# Index CSVs written by main(), one row per successfully processed video.
PROCESSED_CSV_FALL = ROOT.joinpath("ProcessedFallDetection.csv")
PROCESSED_CSV_GEST = ROOT.joinpath("ProcessedHandGesture.csv")

SAMPLE_EVERY = 5  # keep every 5th frame when building a motion image
OUT_SIZE = 600  # EfficientNet-B7 scale
OVERWRITE = True  # regenerate an image even when its output file already exists
# Recognised video extensions: all lowercase spellings first, then the uppercase ones.
VIDEO_EXTS = [
    recase(ext)
    for recase in (str.lower, str.upper)
    for ext in (".mp4", ".avi", ".mov", ".mkv")
]


# ---------- utils ----------
def ensure_dir(p: Path):
    """Create directory ``p`` and any missing parents; an already existing directory is left alone."""
    os.makedirs(p, exist_ok=True)


def resolve_video_path(rel_path: str) -> Path:
    """
    Locate the video file that a label-table entry refers to.

    ``rel_path`` is joined onto ROOT and may name:
      - a file, extension included
      - a file with the extension left off
      - a directory holding the video

    Lookup order: the exact file; the same name with ".mp4"; for a directory,
    the alphabetically first file matching the earliest extension in
    VIDEO_EXTS that has any match; finally the bare name with each extension
    in VIDEO_EXTS.

    Returns the ROOT-joined path of the match (relative whenever ROOT is).
    Raises FileNotFoundError when nothing matches.
    """
    base = ROOT / rel_path
    if base.is_file():
        return base

    extensionless = base.suffix == ""

    # Bare name: the ".mp4" sibling is tried ahead of the directory lookup.
    if extensionless:
        mp4_variant = base.with_suffix(".mp4")
        if mp4_variant.is_file():
            return mp4_variant

    if base.is_dir():
        # Extensions are tried in VIDEO_EXTS order; within one extension the
        # smallest path (i.e. first in sorted order) wins.
        for ext in VIDEO_EXTS:
            first_match = min(base.glob(f"*{ext}"), default=None)
            if first_match is not None:
                return first_match

    # Bare name again, this time against every known extension.
    if extensionless:
        siblings = (base.with_suffix(ext) for ext in VIDEO_EXTS)
        hit = next((cand for cand in siblings if cand.is_file()), None)
        if hit is not None:
            return hit

    raise FileNotFoundError(f"Video not found for entry: {rel_path}")


def pad_to_square(img: np.ndarray) -> np.ndarray:
    """
    Pad ``img`` with zero-valued borders until its height and width are equal.

    The shorter dimension is padded on both sides; when the missing amount is
    odd, the extra row/column goes to the bottom/right.
    """
    height, width = img.shape[:2]
    side = max(height, width)
    pad_y = side - height
    pad_x = side - width
    top = pad_y // 2
    left = pad_x // 2
    return cv2.copyMakeBorder(
        img, top, pad_y - top, left, pad_x - left, cv2.BORDER_CONSTANT, value=0
    )


def process_video_to_motion_image(video_path: Path, sample_every: int = 5, out_size: int = 600) -> np.ndarray:
    """
    Collapse a video into a single image that highlights where motion occurred.

    Every ``sample_every``-th frame is converted to grayscale. For each pair of
    consecutive sampled frames the absolute difference is computed, and pixels
    whose difference is at least that pair's median difference are added to an
    accumulator. The accumulator is scaled to 0..255, padded to a square,
    resized to ``out_size`` x ``out_size`` and replicated into 3 channels.

    Args:
        video_path: Video file to read.
        sample_every: Frame stride; must be >= 1.
        out_size: Side length in pixels of the square output image.

    Returns:
        uint8 array of shape (out_size, out_size, 3) with three identical channels.

    Raises:
        ValueError: if ``sample_every`` is less than 1.
        RuntimeError: if the video cannot be opened or yields fewer than two
            sampled frames.
    """
    if sample_every < 1:
        raise ValueError(f"sample_every must be >= 1, got {sample_every}")

    cap = cv2.VideoCapture(str(video_path))
    acc = None
    prev = None
    n_sampled = 0
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Failed to open {video_path}")

        idx = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            if idx % sample_every == 0:
                cur = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.float32)
                if prev is None:
                    # First sampled frame only fixes the accumulator's shape.
                    acc = np.zeros_like(cur)
                else:
                    # Accumulate on the fly so only two frames are held in
                    # memory, however long the video is.
                    diff = np.abs(cur - prev)
                    # keep values at or above the per-pair median, drop the rest
                    mask = diff >= np.median(diff)
                    acc[mask] += diff[mask]
                prev = cur
                n_sampled += 1
            idx += 1
    finally:
        # Release the capture handle even if decoding or conversion fails
        # (or the cell is interrupted) part-way through.
        cap.release()

    if n_sampled < 2:
        raise RuntimeError(f"Not enough sampled frames ({n_sampled}) in {video_path}")

    # normalize 0..1 by max (skipped for an all-zero accumulator), then scale to 0..255 uint8
    peak = acc.max()
    if peak > 0:
        acc = acc / peak
    acc = (acc * 255.0).clip(0, 255).astype(np.uint8)

    # pad to square and resize
    acc = pad_to_square(acc)
    acc = cv2.resize(acc, (out_size, out_size), interpolation=cv2.INTER_AREA)

    # stack to 3 channels
    return np.stack([acc, acc, acc], axis=2)


def target_path_for_output(out_root: Path, rel_video_path: str, ext: str = ".png") -> Path:
    """
    Mirror a video's path under Dataset into the processed folder to avoid name collisions.

    Example:
      rel_video_path = 'Dataset/HandGesture/CurtainGesture/Hamad/CGH1.mp4'
      -> out_root / 'HandGesture/CurtainGesture/Hamad/CGH1.png'

    Args:
        out_root: Folder the mirrored path is placed under.
        rel_video_path: Video path as listed in the label table. A leading
            drive/root and a leading 'Dataset' component (any letter case)
            are dropped.
        ext: Suffix given to the output file, replacing the video's own.

    Returns:
        ``out_root`` joined with the mirrored path carrying suffix ``ext``.

    Raises:
        ValueError: if no file name is left after stripping (raised by
            ``Path.with_suffix``).
    """
    rel = Path(rel_video_path)
    parts = list(rel.parts)
    # An absolute entry carries its drive/root as parts[0]. Joining an
    # absolute path onto out_root would discard out_root altogether and the
    # image would land outside the processed folder, so strip it first.
    if rel.anchor:
        parts = parts[1:]
    if parts and parts[0].lower() == "dataset":
        parts = parts[1:]
    rel_no_ext = Path(*parts).with_suffix(ext)
    return out_root / rel_no_ext


def process_table(df: pd.DataFrame, out_root: Path, has_user: bool) -> pd.DataFrame:
    """
    Convert every video listed in ``df`` to a motion image and index the results.

    Rows are processed best-effort: a row that raises is recorded in
    ``out_root / "processing_failures.csv"`` (columns video_path, label,
    error) and skipped, so one bad video does not abort the run.

    Args:
        df: Label table with columns ``video_path`` and ``label``, plus
            ``user_id`` when ``has_user`` is True.
        out_root: Folder the PNG images are written beneath.
        has_user: Whether to carry ``user_id`` through to the result.

    Returns:
        DataFrame with columns image_path (POSIX-style, relative to ROOT),
        label, video_path and, when ``has_user``, user_id: one row per video
        processed successfully. The columns are present even when no row
        succeeded.
    """
    # Fix the schema up front so an all-failed (or empty) table still yields a
    # CSV with a header row instead of an empty, unreadable file.
    columns = ["image_path", "label", "video_path"]
    if has_user:
        columns.append("user_id")

    records = []
    failures = []

    for row in tqdm(df.itertuples(index=False), total=len(df)):
        rel_path = getattr(row, "video_path")
        label = getattr(row, "label")
        user_id = getattr(row, "user_id") if has_user else None

        try:
            abs_video = resolve_video_path(rel_path)
            out_img = target_path_for_output(out_root, rel_path, ext=".png")
            ensure_dir(out_img.parent)

            if OVERWRITE or not out_img.exists():
                img = process_video_to_motion_image(abs_video, sample_every=SAMPLE_EVERY, out_size=OUT_SIZE)
                # write PNG (cv2.imwrite expects BGR channel order)
                ok = cv2.imwrite(str(out_img), cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
                if not ok:
                    raise RuntimeError("cv2.imwrite failed")

            rel_img = out_img.relative_to(ROOT).as_posix()
            rec = {"image_path": rel_img, "label": label, "video_path": rel_path}
            if has_user:
                rec["user_id"] = user_id
            records.append(rec)

        except Exception as e:
            # Deliberate best-effort: log the row and carry on with the rest.
            failures.append({"video_path": rel_path, "label": label, "error": str(e)})

    if failures:
        fail_csv = out_root / "processing_failures.csv"
        # out_root may never have been created if every row failed before its
        # output folder was made.
        ensure_dir(out_root)
        pd.DataFrame(failures).to_csv(fail_csv, index=False)
        print(f"{len(failures)} of {len(df)} videos failed; details written to {fail_csv}")

    return pd.DataFrame(records, columns=columns)


# ---------- main ----------
def main():
    """
    Build the processed fall-detection and hand-gesture datasets.

    For each label CSV that exists, every listed video is turned into a motion
    image via process_table and the resulting index is saved as the matching
    processed CSV. A missing label CSV is reported and skipped.
    """
    for out_dir in (OUT_FALL, OUT_GEST):
        ensure_dir(out_dir)

    # (label CSV, image output root, processed CSV, table has a user_id column)
    jobs = (
        (CSV_FALL, OUT_FALL, PROCESSED_CSV_FALL, False),  # expected columns: video_path, label
        (CSV_GEST, OUT_GEST, PROCESSED_CSV_GEST, True),  # expected columns: video_path, label, user_id
    )
    for labels_csv, out_root, processed_csv, with_user in jobs:
        if not labels_csv.exists():
            print(f"Missing CSV: {labels_csv}")
            continue
        labels = pd.read_csv(labels_csv)
        processed = process_table(labels, out_root, has_user=with_user)
        processed.to_csv(processed_csv, index=False)

    print("Done.")


# Run the preprocessing pipeline only when this code executes as the
# top-level program, not when it is imported as a module.
if __name__ == "__main__":
    main()
