# 02 - Sample frames from face-cropped videos
This notebook samples N frames per video (default 8) from your face-cropped video files (or directories of frames).
No face detection / MTCNN is performed — we assume preprocessing already produced face-only videos or folder-of-frames.

Run cell-by-cell. Start with smoke-test (small_test=True), then switch to small_test=False for full extraction.


In [1]:
from pathlib import Path
import cv2
from PIL import Image
import shutil

# Project root is taken to be the parent of the current working directory
# (adjust if the notebook is started from somewhere else).
ROOT = Path.cwd().parent

# Split name -> text file listing one source path per line.
SPLITS = {
    split: ROOT / "data" / list_file
    for split, list_file in (
        ("train", "train.txt"),
        ("val", "val.txt"),
        ("test", "test_internal.txt"),
    )
}

# Sampled frames are written below this directory, one sub-folder per split.
OUT_ROOT = ROOT / "preprocessed" / "frames"

FRAMES_PER_VIDEO = 8  # frames kept per listed entry
JPEG_QUALITY = 100    # passed to OpenCV's JPEG encoder, range 0..100

# Create the output root up front so the later cells can write into it.
OUT_ROOT.mkdir(parents=True, exist_ok=True)

In [2]:
def sample_frames_from_video(video_path, out_dir, n=FRAMES_PER_VIDEO):
    """Save up to ``n`` evenly spaced frames of a video as JPEG files.

    Frames are written to ``out_dir`` as ``frame_00.jpg``, ``frame_01.jpg``, ...
    The number in the file name is the position of the sample (0..n-1), so a
    sample whose frame could not be read or written leaves a gap in the
    numbering.

    Args:
        video_path: Path of the video file to open with OpenCV.
        out_dir: ``pathlib.Path`` of the output directory; created if missing.
        n: Number of frames to sample.

    Returns:
        ``"skipped"`` if ``out_dir / "frame_00.jpg"`` already exists,
        ``"no_frames"`` if OpenCV reports a non-positive frame count,
        otherwise the number of frames actually written (an ``int``,
        possibly 0).
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    # if already done, skip
    if (out_dir / "frame_00.jpg").exists():
        return "skipped"
    cap = cv2.VideoCapture(str(video_path))
    try:
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total <= 0:
            return "no_frames"
        # Evenly spaced frame indices: 0, total/n, 2*total/n, ...
        indices = [int(i * total / n) for i in range(n)]
        saved = 0
        for i, idx in enumerate(indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret or frame is None:
                continue
            out_p = out_dir / f"frame_{i:02d}.jpg"
            # imwrite reports failure through its return value; only count
            # frames that really reached the disk.
            written = cv2.imwrite(
                str(out_p), frame, [int(cv2.IMWRITE_JPEG_QUALITY), JPEG_QUALITY]
            )
            if written:
                saved += 1
        return saved
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()

In [3]:
def copy_first_images_from_folder(folder_path, out_dir, n=FRAMES_PER_VIDEO):
    """Copy the first ``n`` images of a folder into ``out_dir``.

    The ``*.jpg`` and ``*.png`` files directly inside ``folder_path`` are
    sorted by path and the first ``n`` are copied to ``frame_00.jpg``,
    ``frame_01.jpg``, ... Files are copied byte-for-byte, so a PNG source
    keeps its PNG content even though the destination name ends in ``.jpg``.

    Args:
        folder_path: Folder holding the source images.
        out_dir: ``pathlib.Path`` of the output directory; created if missing.
        n: Maximum number of images to copy.

    Returns:
        ``"skipped"`` if ``out_dir / "frame_00.jpg"`` already exists,
        otherwise the number of images copied (0 when the folder has none).
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # A first frame on disk marks this entry as already handled.
    already_done = (out_dir / "frame_00.jpg").exists()
    if already_done:
        return "skipped"

    folder = Path(folder_path)
    candidates = sorted(list(folder.glob("*.jpg")) + list(folder.glob("*.png")))
    selected = candidates[:n]

    # Plain copy (no re-encoding) keeps the original image data untouched.
    for position, image_path in enumerate(selected):
        target = out_dir / f"frame_{position:02d}.jpg"
        shutil.copy(str(image_path), str(target))

    return len(selected)

In [5]:
# main loop - very simple prints for progress


def _classify_result(res):
    """Map an extraction helper's return value to an outcome label.

    ``"skipped"`` stays skipped, ``"no_frames"`` or a count of 0 is a failure,
    and any other value (a positive frame count) counts as processed.
    """
    if res == "skipped":
        return "skipped"
    if res in ("no_frames", 0):
        return "failed"
    return "processed"


def _extract_entry(src, out_dir):
    """Extract frames for one listed path; return "processed", "skipped" or "failed"."""
    # if already processed, quick skip
    if (out_dir / "frame_00.jpg").exists():
        return "skipped"
    if src.is_dir():
        # prefer treating it as a folder-of-frames (the preprocessed case)
        if any(src.glob("*.jpg")) or any(src.glob("*.png")):
            res = copy_first_images_from_folder(src, out_dir, n=FRAMES_PER_VIDEO)
            return _classify_result(res)
        # otherwise, maybe the folder contains a video file: use the first one
        vids = sorted(list(src.glob("*.mp4")) + list(src.glob("*.mov")) + list(src.glob("*.mkv")))
        if vids:
            res = sample_frames_from_video(vids[0], out_dir, n=FRAMES_PER_VIDEO)
            return _classify_result(res)
        # nothing usable in that folder
        return "failed"
    if src.is_file():
        # a plain file is treated as a video
        res = sample_frames_from_video(src, out_dir, n=FRAMES_PER_VIDEO)
        return _classify_result(res)
    # path doesn't exist (or is neither a file nor a directory)
    return "failed"


for split_name, list_path in SPLITS.items():
    if not list_path.exists():
        print(f"[{split_name}] split file not found: {list_path}  (skipping)")
        continue
    print(f"\nProcessing split: {split_name}")
    with open(list_path, "r") as f:
        lines = [l.strip() for l in f if l.strip()]
    #lines = lines[:20]  # limit to first 20 entries for quick testing; remove or adjust as needed
    total = len(lines)
    processed = 0
    skipped = 0
    failed = 0
    failed_entries = []
    for pstr in lines:
        src = Path(pstr)
        # NOTE: the output folder is keyed by the file stem only, so two listed
        # paths with the same stem share a folder and the later one is skipped.
        out_dir = OUT_ROOT / split_name / src.stem
        outcome = _extract_entry(src, out_dir)
        if outcome == "processed":
            processed += 1
        elif outcome == "skipped":
            skipped += 1
        else:
            failed += 1
            failed_entries.append(pstr)

    print(f"[{split_name}] done. processed: {processed}, skipped: {skipped}, failed: {failed}, total listed: {total}")
    # Name the failures so they can be investigated instead of only counted.
    for pstr in failed_entries:
        print(f"[{split_name}]   failed entry: {pstr}")

print("\nAll splits finished. Check 'preprocessed/frames/<split>/' folders for sampled frames.")


Processing split: train
[train] done. processed: 4046, skipped: 20, failed: 0, total listed: 4066

Processing split: val
[val] done. processed: 741, skipped: 20, failed: 1, total listed: 762

Processing split: test
[test] done. processed: 235, skipped: 20, failed: 0, total listed: 255

All splits finished. Check 'preprocessed/frames/<split>/' folders for sampled frames.
