## 1) Extract Video Stills

In [1]:
# Section 1) Extract Video Stills

# === Import Libraries ===

from pathlib import Path
import cv2
from google.colab import drive

# === Configuration ===
INPUT_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Source_Videos/Alloy Personal Training')
OUTPUT_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Extracted_Stills')
CAPTURE_INTERVAL_S = 3      # seconds between captures
START_TIME_S = 1            # skip first N seconds of each video
VIDEO_EXTS = {'.mp4', '.mov', '.avi'}

# === Functions ===
def mount_drive():
    """Mount Google Drive to /content/drive"""
    print("Mounting Google Drive...")
    drive.mount('/content/drive', force_remount=True)
    print("Drive mounted.")

def extract_stills(input_dir: Path, output_dir: Path, interval_s: float, start_s: float):
    """
    Extract still frames from all videos in input_dir at every interval_s seconds,
    starting after start_s seconds, saving to output_dir.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    video_files = [f for f in input_dir.iterdir() if f.suffix.lower() in VIDEO_EXTS]

    if not video_files:
        print(f"No videos found in {input_dir}")
        return

    print(f"Found {len(video_files)} videos in {input_dir}\n")
    for idx, video_file in enumerate(video_files, start=1):
        print(f"[{idx}/{len(video_files)}] Processing '{video_file.name}'")
        cap = cv2.VideoCapture(str(video_file))
        if not cap.isOpened():
            print(f"  ✗ Could not open {video_file.name}, skipping.")
            continue

        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        start_frame = int(start_s * fps)
        if start_frame > 0:
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            print(f"  → Skipped first {start_s}s ({start_frame} frames)")

        next_capture = start_s
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            current_s = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
            if current_s >= next_capture:
                mm = int(current_s // 60)
                ss = int(current_s % 60)
                timestamp = f"{mm:02d}{ss:02d}"
                out_name = f"{video_file.stem}_{timestamp}.jpg"
                out_path = output_dir / out_name
                cv2.imwrite(str(out_path), frame)
                print(f"  ✔ Saved frame '{out_name}'")
                saved_count += 1
                next_capture += interval_s

        cap.release()
        print(f"  Completed '{video_file.name}', saved {saved_count} frames.\n")

    print("All videos processed.")

# === Main Execution ===
mount_drive()
extract_stills(INPUT_DIR, OUTPUT_DIR, CAPTURE_INTERVAL_S, START_TIME_S)

Mounting Google Drive...
Mounted at /content/drive
Drive mounted.
Found 18 videos in /content/drive/MyDrive/FreeFuse_Project/Source_Videos/Alloy Personal Training

[1/18] Processing 'Path 3 Focus on Weight Loss and Wellness.mp4'
  → Skipped first 1s (30 frames)
  ✔ Saved frame 'Path 3 Focus on Weight Loss and Wellness_0001.jpg'
  ✔ Saved frame 'Path 3 Focus on Weight Loss and Wellness_0004.jpg'
  ✔ Saved frame 'Path 3 Focus on Weight Loss and Wellness_0007.jpg'
  ✔ Saved frame 'Path 3 Focus on Weight Loss and Wellness_0010.jpg'
  Completed 'Path 3 Focus on Weight Loss and Wellness.mp4', saved 4 frames.

[2/18] Processing 'Path 2 Choice 2 Specialized Group Training Session.mp4'
  → Skipped first 1s (30 frames)
  ✔ Saved frame 'Path 2 Choice 2 Specialized Group Training Session_0001.jpg'
  ✔ Saved frame 'Path 2 Choice 2 Specialized Group Training Session_0004.jpg'
  ✔ Saved frame 'Path 2 Choice 2 Specialized Group Training Session_0007.jpg'
  ✔ Saved frame 'Path 2 Choice 2 Specialized Gr

## 2) Object Detection

### 2a) DETR ResNet v50

In [None]:
from pathlib import Path
import torch
import pandas as pd
from PIL import Image
import os
from transformers import AutoImageProcessor, AutoModelForObjectDetection
from google.colab import drive

# === Configuration ===
STILLS_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Extracted_Stills')
OUTPUT_CSV = STILLS_DIR / 'draft_annotations.csv'
MODEL_NAME = "facebook/detr-resnet-50"
MAX_OBJS = 8
CONF_THRESH = 0.9

# === Helper Functions ===
def mount_drive():
    """Mount Google Drive."""
    print("Mounting Google Drive...")
    drive.mount('/content/drive', force_remount=True)
    print("Drive mounted.")

def parse_filename(fn: str):
    """
    From image filename of form "<video_id>_<MMSS>.jpg", extract:
      - video_id (str)
      - timestamp_sec (int)
    """
    stem = Path(fn).stem
    parts = stem.rsplit('_', 1)
    if len(parts) != 2 or not parts[1].isdigit():
        return stem, None
    vid, t = parts
    mm = int(t[:2]); ss = int(t[2:])
    return vid, mm*60 + ss

# === Main Processing ===
mount_drive()

# Load model and processor
print(f"Loading model and processor ({MODEL_NAME})...")
processor = AutoImageProcessor.from_pretrained(MODEL_NAME)
model = AutoModelForObjectDetection.from_pretrained(MODEL_NAME).to(
    torch.device("cuda" if torch.cuda.is_available() else "cpu")
)
print("Model ready.\n")

# Gather images
stills = sorted(f for f in STILLS_DIR.iterdir() if f.suffix.lower() in {'.jpg','.jpeg','.png'})
if not stills:
    print(f"No stills found in {STILLS_DIR}")
else:
    print(f"Found {len(stills)} images to annotate.\n")

    annotations = []
    for idx, img_path in enumerate(stills, start=1):
        print(f"[{idx}/{len(stills)}] Processing '{img_path.name}'")
        try:
            img = Image.open(img_path).convert("RGB")
        except Exception as e:
            print(f"  ✗ Failed to open image: {e}")
            continue

        width, height = img.size
        video_id, ts = parse_filename(img_path.name)

        # Prepare tensor
        inputs = processor(images=img, return_tensors="pt").to(model.device)
        with torch.no_grad():
            outputs = model(**inputs)

        # Postprocess
        target_sizes = torch.tensor([[height, width]], device=model.device)
        results = processor.post_process_object_detection(
            outputs, threshold=CONF_THRESH, target_sizes=target_sizes
        )[0]

        # Extract detections
        for j, (score, label_id, box) in enumerate(
            zip(results["scores"], results["labels"], results["boxes"])
        ):
            if j >= MAX_OBJS:
                break
            score_v = score.item()
            # Box coords [xmin, ymin, xmax, ymax] in pixels
            x_min, y_min, x_max, y_max = box.tolist()
            # Normalized
            x_min_norm = x_min / width
            y_min_norm = y_min / height
            x_max_norm = x_max / width
            y_max_norm = y_max / height
            # Derived
            bb_area = int((x_max - x_min) * (y_max - y_min))

            annotations.append({
                'video_id': video_id,
                'image_file_name': img_path.name,
                'timestamp_sec': ts,
                'image_width_px': width,
                'image_height_px': height,
                'frame_id': img_path.stem,
                'object_id': f"{img_path.stem}_obj{j+1}",
                'class_id': label_id.item(),
                'object_name': model.config.id2label[label_id.item()],
                'object_category': 'N/A',
                'x_min_norm': round(x_min_norm, 4),
                'y_min_norm': round(y_min_norm, 4),
                'x_max_norm': round(x_max_norm, 4),
                'y_max_norm': round(y_max_norm, 4),
                'x_min': int(x_min),
                'y_min': int(y_min),
                'x_max': int(x_max),
                'y_max': int(y_max),
                'bb_area_px': bb_area,
                'confidence': round(score_v, 4),
                'frame_type': None,
                'interaction_score': None,
                'review_status': None,
                'reviewer_notes': None
            })

    # Save to CSV
    if annotations:
        print("\nSaving annotations to CSV...")
        df = pd.DataFrame(annotations)
        cols = [
            'video_id','image_file_name','timestamp_sec','image_width_px','image_height_px',
            'frame_id','object_id','class_id','object_name','object_category',
            'x_min_norm','y_min_norm','x_max_norm','y_max_norm',
            'x_min','y_min','x_max','y_max',
            'bb_area_px','confidence',
            'frame_type','interaction_score',
            'review_status','reviewer_notes'
        ]
        df.to_csv(OUTPUT_CSV, index=False, columns=cols)
        print(f"Annotations saved to {OUTPUT_CSV}")
    else:
        print("No detections above threshold.")

### 3) Label Stills

In [None]:
from pathlib import Path
import pandas as pd
import cv2
from google.colab import drive

# === Configuration ===
STILLS_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Extracted_Stills')
ANNOTATIONS_CSV = STILLS_DIR / 'draft_annotations.csv'
OUTPUT_DIR = Path('/content/drive/MyDrive/FreeFuse_Project/Labeled_Stills')

BOX_COLOR = (0, 255, 0)       # BGR
TEXT_COLOR = (255, 255, 255)  # BGR
BOX_THICKNESS = 2
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 0.6
LINE_TYPE = cv2.LINE_AA

# === Helper Functions ===
def mount_drive():
    """Mount Google Drive."""
    print("Mounting Google Drive...")
    drive.mount('/content/drive', force_remount=True)
    print("Drive mounted.")

def load_annotations(csv_path: Path) -> pd.DataFrame:
    """Load and validate annotation CSV."""
    if not csv_path.exists():
        raise FileNotFoundError(f"Annotation file not found: {csv_path}")
    df = pd.read_csv(csv_path)
    required_cols = {
        'video_id','image_file_name','timestamp_sec','image_width_px','image_height_px',
        'frame_id','object_id','object_name','object_category',
        'x_min_norm','y_min_norm','x_max_norm','y_max_norm',
        'x_min','y_min','x_max','y_max','bb_area_px','bb_aspect_ratio',
        'confidence'
    }
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns in CSV: {missing}")
    return df

def annotate_and_save(df: pd.DataFrame, stills_dir: Path, output_dir: Path):
    """Draw bounding boxes on images and save annotated copies."""
    output_dir.mkdir(parents=True, exist_ok=True)
    grouped = df.groupby('image_file_name')
    print(f"Found annotations for {len(grouped)} images.")
    for idx, (img_name, group) in enumerate(grouped, start=1):
        print(f"[{idx}/{len(grouped)}] Annotating {img_name}")
        img_path = stills_dir / img_name
        if not img_path.exists():
            print(f"  ✗ Image not found: {img_path}")
            continue
        image = cv2.imread(str(img_path))
        if image is None:
            print(f"  ✗ Failed to load image: {img_path}")
            continue

        # Draw each annotation
        for _, row in group.iterrows():
            x_min = int(row['x_min'])
            y_min = int(row['y_min'])
            x_max = int(row['x_max'])
            y_max = int(row['y_max'])
            label = row['object_name']
            conf = row['confidence']
            # Draw bounding box
            cv2.rectangle(image, (x_min, y_min), (x_max, y_max), BOX_COLOR, BOX_THICKNESS)
            # Text background
            text = f"{label}: {conf:.2f}"
            (w, h), _ = cv2.getTextSize(text, FONT, FONT_SCALE, 1)
            cv2.rectangle(image, (x_min, y_min - h - 4), (x_min + w, y_min), BOX_COLOR, -1)
            # Text overlay
            cv2.putText(image, text, (x_min, y_min - 2), FONT, FONT_SCALE, TEXT_COLOR, 1, LINE_TYPE)

        # Save annotated image
        out_name = img_path.stem + '_annotated.jpg'
        out_path = output_dir / out_name
        cv2.imwrite(str(out_path), image)
    print("Annotation of images complete.")

# === Main Execution ===
mount_drive()
try:
    annotations_df = load_annotations(ANNOTATIONS_CSV)
    annotate_and_save(annotations_df, STILLS_DIR, OUTPUT_DIR)
except Exception as e:
    print(f"ERROR: {e}")
