### Label timestamp on video for easier labelling

In [7]:
!ffmpeg \
  -i /home/yogee/Desktop/boxing_move_predictor/data_raw/sparring_pov/slow_motion_sparring.mp4 \
  -vf "drawtext=fontfile=/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf:\
text='%{pts\:hms}':x=10:y=10:fontsize=24:fontcolor=white:box=1:boxcolor=0x00000099"\
  -c:v libx264 -crf 18 -c:a copy \
  /home/yogee/Desktop/boxing_move_predictor/data_raw/sparring_pov/slow_motion_sparring_with_pts.mp4


ffmpeg version 4.4.2-0ubuntu0.22.04.1+esm7 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.4.0-1ubuntu1~22.04)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1+esm7 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enabl

### Split Video using timestamps

In [1]:
import os
import subprocess
import pandas as pd
from pathlib import Path
from shutil import which

def find_video(video_name, search_dir="."):
    """
    Recursively search for "video_name.mp4" (case‐insensitive) under search_dir.
    Returns the first full path found, or None if not found.
    """
    target = f"{video_name.lower()}.mp4"
    for root, _, files in os.walk(search_dir):
        for f in files:
            if f.lower() == target:
                return os.path.join(root, f)
    return None

def ensure_ffmpeg_exists():
    """Raise an error if ffmpeg/ffprobe are not on PATH."""
    if which("ffmpeg") is None:
        raise RuntimeError("ffmpeg not found. Please install ffmpeg.")
    if which("ffprobe") is None:
        raise RuntimeError("ffprobe not found. Please install ffmpeg (includes ffprobe).")

def timestamp_to_seconds(ts_str):
    """Convert HH:MM:SS to total seconds."""
    h, m, s = map(int, ts_str.split(":"))
    return h*3600 + m*60 + s

def validate_timestamp(ts_str):
    """
    Normalize timestamp:
      - "MM:SS"        -> "00:MM:SS"
      - "HH:MM:00"     -> "00:HH:MM"
      - "HH:MM:SS"     -> unchanged
    """
    ts_str = ts_str.strip()
    parts = ts_str.split(":")
    # MM:SS (2 parts)
    if len(parts) == 2:
        mm, ss = map(int, parts)
        if 0 <= mm < 60 and 0 <= ss < 60:
            return f"00:{mm:02d}:{ss:02d}"
    # HH:MM:SS (3 parts)
    if len(parts) == 3:
        hh, mm, ss = map(int, parts)
        # if ends in :00, assume it was really MM:SS stored as HH:MM:00
        if ss == 0:
            return f"00:{hh:02d}:{mm:02d}"
        # otherwise valid HH:MM:SS
        if hh >= 0 and 0 <= mm < 60 and 0 <= ss < 60:
            return f"{hh:02d}:{mm:02d}:{ss:02d}"
    raise ValueError(f"Invalid timestamp format: '{ts_str}'")

def ffmpeg_cut_segment(input_path, start_ts, end_ts, output_path):
    """
    Reliable cut: seek before input, fixed duration, always re-encode
    with a keyframe at start and proper header.
    """
    start_sec = timestamp_to_seconds(start_ts)
    end_sec   = timestamp_to_seconds(end_ts)
    duration  = end_sec - start_sec
    if duration <= 0:
        raise RuntimeError(f"Invalid segment length: {start_ts} → {end_ts}")

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel", "info",
        "-ss",            start_ts,        # seek before input
        "-i",             input_path,
        "-t",             str(duration),   # duration in seconds
        "-c:v",           "libx264",       # re-encode video
        "-preset",        "fast",
        "-crf",           "23",
        "-c:a",           "aac",           # re-encode audio
        "-movflags",      "+faststart",    # header at front
        "-pix_fmt",       "yuv420p",
        "-y",
        output_path
    ]

    print(f"→ Cutting '{input_path}' from {start_ts} for {duration}s → '{output_path}'")
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        print(result.stderr)
        raise RuntimeError(f"ffmpeg failed with code {result.returncode}")

    size = os.path.getsize(output_path)
    if size < 2000:
        raise RuntimeError(f"Output file too small ({size} bytes)")
    print(f"✔ Created: {output_path} ({size:,} bytes)")

def split_videos_from_sheet(sheet_path, search_dir, output_dir):
    ensure_ffmpeg_exists()

    # read spreadsheet, forcing Start/End as strings
    converters = {
        "Start": lambda x: str(x).strip(),
        "End":   lambda x: str(x).strip()
    }
    if sheet_path.lower().endswith(".csv"):
        df = pd.read_csv(sheet_path, converters=converters)
    else:
        df = pd.read_excel(sheet_path, converters=converters)
    df.columns = df.columns.str.strip()

    required = {"Video Name", "Start", "End", "Label"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing columns: found {df.columns.tolist()}")

    os.makedirs(output_dir, exist_ok=True)
    success = 0

    for idx, row in df.iterrows():
        try:
            name = str(row["Video Name"]).strip()
            if not name or name.lower() in ("nan", "none"):
                continue
            raw_start = row["Start"]
            raw_end   = row["End"]
            label     = str(row["Label"]).strip().lower().replace(" ", "_")

            print(f"\n[DEBUG] Row {idx+1}: Raw Start='{raw_start}', End='{raw_end}'")
            start_ts = validate_timestamp(raw_start)
            end_ts   = validate_timestamp(raw_end)
            print(f"[DEBUG] Row {idx+1}: Normalized {start_ts} → {end_ts}")

            video_path = find_video(name, search_dir)
            if video_path is None:
                print(f"[!] Video not found for '{name}' (skipping)")
                continue

            label_dir = os.path.join(output_dir, label)
            os.makedirs(label_dir, exist_ok=True)
            out_file = os.path.join(label_dir, f"{name}_{idx+1:03}.mp4")

            ffmpeg_cut_segment(video_path, start_ts, end_ts, out_file)
            success += 1

        except Exception as e:
            print(f"[X] Row {idx+1} failed: {e}")

    total = len(df)
    print(f"\nDone: {success}/{total} segments created.")

if __name__ == "__main__":
    split_videos_from_sheet(
        sheet_path="time_stamps/Boxing_Videos_Timestamp.xlsx",
        search_dir=".",
        output_dir="dataset"
    )



[DEBUG] Row 1: Raw Start='00:29', End='00:31'
[DEBUG] Row 1: Normalized 00:00:29 → 00:00:31
→ Cutting './data_raw/sparring_pov/slow_motion_sparring.mp4' from 00:00:29 for 2s → 'dataset/jab/slow_motion_sparring_001.mp4'
✔ Created: dataset/jab/slow_motion_sparring_001.mp4 (338,861 bytes)

[DEBUG] Row 2: Raw Start='00:32', End='00:35'
[DEBUG] Row 2: Normalized 00:00:32 → 00:00:35
→ Cutting './data_raw/sparring_pov/slow_motion_sparring.mp4' from 00:00:32 for 3s → 'dataset/right_hook/slow_motion_sparring_002.mp4'
✔ Created: dataset/right_hook/slow_motion_sparring_002.mp4 (724,735 bytes)

[DEBUG] Row 3: Raw Start='01:58', End='02:00'
[DEBUG] Row 3: Normalized 00:01:58 → 00:02:00
→ Cutting './data_raw/sparring_pov/slow_motion_sparring.mp4' from 00:01:58 for 2s → 'dataset/jab/slow_motion_sparring_003.mp4'
✔ Created: dataset/jab/slow_motion_sparring_003.mp4 (484,031 bytes)

[DEBUG] Row 4: Raw Start='02:00', End='02:03'
[DEBUG] Row 4: Normalized 00:02:00 → 00:02:03
→ Cutting './data_raw/sparrin

### Dataset with poses

In [None]:
import os
import cv2
import imageio
import numpy as np
import pandas as pd
from ultralytics import YOLO

# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION (edit as needed)
# ─────────────────────────────────────────────────────────────────────────────
POSE_MODEL_PATH = "models/yolo11n-pose.pt"   # path to your YOLO-pose .pt file
VIDEO_ROOT      = "dataset"                 # root folder containing subfolders of .mp4 videos
OUTPUT_ROOT     = "dataset_with_poses"      # root where .npy and labels.csv will be saved

# ─────────────────────────────────────────────────────────────────────────────
# Utility Functions
# ─────────────────────────────────────────────────────────────────────────────

def read_frames(path):
    """Read all RGB frames from a video via imageio or OpenCV."""
    try:
        reader = imageio.get_reader(path, "ffmpeg")
        frames = [f for f in reader]
        reader.close()
        if frames:
            return frames
    except Exception:
        pass
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {path}")
    frames = []
    while True:
        ret, frame_bgr = cap.read()
        if not ret:
            break
        frames.append(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    cap.release()
    if not frames:
        raise RuntimeError(f"No frames read from: {path}")
    return frames


def extract_pose_sequence(model, video_path):
    """
    Run pose model on each frame. Returns numpy array shape (num_frames, 2*num_keypoints).
    """
    frames = read_frames(video_path)
    if not frames:
        return None

    # Infer number of keypoints by running on first frame with a detection
    num_kpts = None
    for frame in frames:
        results = model(frame)
        if results and results[0].keypoints is not None:
            kps = results[0].keypoints.xy.cpu().numpy()
            if kps.size:
                num_kpts = kps.shape[1]
                break
    if num_kpts is None:
        raise RuntimeError("No keypoints found; cannot infer count.")

    seq = []
    for frame in frames:
        results = model(frame)
        if results and results[0].keypoints is not None:
            kps = results[0].keypoints.xy.cpu().numpy()
            if kps.size > 0:
                coords = kps[0, :, :2].flatten()
            else:
                coords = np.zeros(num_kpts * 2, dtype=float)
        else:
            coords = np.zeros(num_kpts * 2, dtype=float)
        seq.append(coords)

    return np.stack(seq, axis=0)

# ─────────────────────────────────────────────────────────────────────────────
# Main Extraction: Walk nested folders
# ─────────────────────────────────────────────────────────────────────────────

model = YOLO(POSE_MODEL_PATH)
records = []
os.makedirs(OUTPUT_ROOT, exist_ok=True)

for dirpath, _, files in os.walk(VIDEO_ROOT):
    # skip root itself if videos not directly under
    for fname in files:
        if not fname.lower().endswith('.mp4'):
            continue
        video_path = os.path.join(dirpath, fname)
        # derive label from immediate subfolder under VIDEO_ROOT
        rel_path = os.path.relpath(dirpath, VIDEO_ROOT)
        label = os.path.normpath(rel_path).replace(os.sep, '_')
        clip = os.path.splitext(fname)[0]
        print(f"Processing [{label}] {clip}.mp4")
        try:
            seq = extract_pose_sequence(model, video_path)
        except Exception as e:
            print(f"  ↳ error extracting poses: {e}")
            continue
        if seq is None or seq.size == 0:
            print("  ↳ no pose data: skipping")
            continue

        # save .npy preserving label folder
        out_dir = os.path.join(OUTPUT_ROOT, label)
        os.makedirs(out_dir, exist_ok=True)
        npy_path = os.path.join(out_dir, f"{clip}.npy")
        np.save(npy_path, seq)
        records.append({'npy_path': npy_path, 'label': label})
        print(f"  ↳ saved {npy_path} (shape={seq.shape})")

# Write labels.csv at root
if records:
    df = pd.DataFrame(records)
    csv_file = os.path.join(OUTPUT_ROOT, 'labels.csv')
    df.to_csv(csv_file, index=False)
    print(f"labels.csv written to {csv_file}")
else:
    print("No pose data extracted; check your folder structure and model.")


Processing [block] slow_motion_sparring_009.mp4

0: 384x640 1 person, 9.7ms
Speed: 1.2ms preprocess, 9.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 8.9ms
Speed: 1.2ms preprocess, 8.9ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 10.0ms
Speed: 1.6ms preprocess, 10.0ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 9.0ms
Speed: 1.9ms preprocess, 9.0ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 8.9ms
Speed: 1.0ms preprocess, 8.9ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 9.0ms
Speed: 1.4ms preprocess, 9.0ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 8.9ms
Speed: 1.0ms preprocess, 8.9ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 9.2ms
Speed: 1.5ms preprocess, 9.2ms inference, 1.3ms postpr

### Visualize pose estimation on Video

In [17]:
# Batch Overlay from Numpy Pose Data (Notebook-Friendly)

import os
import cv2
import numpy as np
from IPython.display import Video, display

# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────
VIDEO_ROOT   = "dataset"                        # root folder with label subfolders of .mp4
NPY_ROOT     = "dataset_with_poses"            # root folder with label subfolders of .npy
OUTPUT_ROOT  = "dataset_with_poses_visualization"  # where to save annotated videos
POINT_RADIUS = 4
LINE_THICK   = 2

# COCO skeleton connections (0-indexed keypoints)
SKELETON = [
    (0, 1), (0, 2), (1, 3), (2, 4),  # head
    (0, 5), (0, 6),                    # shoulders
    (5, 7), (7, 9),                    # left arm
    (6, 8), (8, 10),                   # right arm
    (5, 6),                            # across shoulders
    (5, 11), (6, 12),                  # torso sides
    (11, 12),                          # across hips
    (11, 13), (13, 15),                # left leg
    (12, 14), (14, 16)                 # right leg
]

# ─────────────────────────────────────────────────────────────────────────────
# Helper: Annotate one video given its pose array
# ─────────────────────────────────────────────────────────────────────────────
def annotate_video(video_path, poses, output_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
    num_frames = poses.shape[0]
    num_kpts = poses.shape[1] // 2

    frame_idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_idx < num_frames:
            coords = poses[frame_idx].reshape(num_kpts, 2)
            # draw skeleton
            for i, j in SKELETON:
                x1, y1 = coords[i]
                x2, y2 = coords[j]
                if x1>0 and y1>0 and x2>0 and y2>0 and x1<w and y1<h and x2<w and y2<h:
                    cv2.line(frame, (int(x1),int(y1)), (int(x2),int(y2)), (0,255,0), LINE_THICK)
            # draw points
            for x, y in coords:
                if x>0 and y>0 and x<w and y<h:
                    cv2.circle(frame, (int(x),int(y)), POINT_RADIUS, (0,0,255), -1)
        writer.write(frame)
        frame_idx += 1
    cap.release()
    writer.release()
    return output_path

# ─────────────────────────────────────────────────────────────────────────────
# Batch Processing: Iterate over labels and videos
# ─────────────────────────────────────────────────────────────────────────────
os.makedirs(OUTPUT_ROOT, exist_ok=True)

for label in os.listdir(VIDEO_ROOT):
    vdir = os.path.join(VIDEO_ROOT, label)
    ndir = os.path.join(NPY_ROOT, label)
    odir = os.path.join(OUTPUT_ROOT, label)
    if not os.path.isdir(vdir) or not os.path.isdir(ndir):
        continue
    os.makedirs(odir, exist_ok=True)
    for fname in os.listdir(vdir):
        if not fname.lower().endswith('.mp4'):
            continue
        clip = os.path.splitext(fname)[0]
        video_path = os.path.join(vdir, fname)
        npy_path = os.path.join(ndir, f"{clip}.npy")
        if not os.path.exists(npy_path):
            print(f"No pose data for {clip}, skipping")
            continue
        poses = np.load(npy_path)
        out_video = os.path.join(odir, f"{clip}_annotated.mp4")
        print(f"Annotating {label}/{clip}.mp4 → {out_video}")
        annotate_video(video_path, poses, out_video)

# ─────────────────────────────────────────────────────────────────────────────
# Display one sample annotated video
# ─────────────────────────────────────────────────────────────────────────────
# Find first annotated file
for root, _, files in os.walk(OUTPUT_ROOT):
    for f in files:
        if f.lower().endswith('_annotated.mp4'):
            display(Video(os.path.join(root, f), embed=True, width=600))
            raise SystemExit

print("Batch annotation complete. Check", OUTPUT_ROOT)


Annotating block/slow_motion_sparring_009.mp4 → dataset_with_poses_visualization/block/slow_motion_sparring_009_annotated.mp4
Annotating block/slow_motion_sparring_010.mp4 → dataset_with_poses_visualization/block/slow_motion_sparring_010_annotated.mp4


SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
