<a href="https://colab.research.google.com/github/abhigyanpal1/AthleteRise-AI-Powered-Cricket-Analytics/blob/main/Cover_drive_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Step 1 — Install dependencies

In [1]:
!pip -q install opencv-python mediapipe numpy yt-dlp
!pip -q install matplotlib


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m175.5/175.5 kB[0m [31m832.9 kB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.7/35.7 MB[0m [31m28.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m82.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.9/294.9 kB[0m [31m17.6 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ydf 0.13.0 requires protobuf<7.0.0,>=5.29.1, but you have protobuf 4.25.8 which is incompatible.
grpcio-status 1.71.2 requires protobuf<6.0dev,>=5.26.1, but you have protobuf 4.25.8 which is incompatible.[0m[31m
[0m

# Step 2 — Download the YouTube Short to input.mp4

In [2]:
INPUT_URL = "https://youtube.com/shorts/vSX3IRxGnNY"

# Download to a fixed filename for convenience
!yt-dlp -o "input.%(ext)s" -f "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4]/best" "$INPUT_URL"

# If the file saved as something else, rename it to input.mp4 for consistency
import glob, os
cands = sorted(glob.glob("input.*"))
if cands and cands[0] != "input.mp4":
    os.rename(cands[0], "input.mp4")

# Confirm
!ls -lh input.mp4


[youtube] Extracting URL: https://youtube.com/shorts/vSX3IRxGnNY
[youtube] vSX3IRxGnNY: Downloading webpage
[youtube] vSX3IRxGnNY: Downloading tv client config
[youtube] vSX3IRxGnNY: Downloading player 6b03aad7-main
[youtube] vSX3IRxGnNY: Downloading tv player API JSON
[youtube] vSX3IRxGnNY: Downloading ios player API JSON
[youtube] vSX3IRxGnNY: Downloading m3u8 information
[info] vSX3IRxGnNY: Downloading 1 format(s): 398+140
[download] Destination: input.f398.mp4
[K[download] 100% of  918.39KiB in [1;37m00:00:00[0m at [0;32m1.64MiB/s[0m
[download] Destination: input.f140.m4a
[K[download] 100% of   73.46KiB in [1;37m00:00:00[0m at [0;32m112.48KiB/s[0m
[Merger] Merging formats into "input.mp4"
Deleting original file input.f140.m4a (pass -k to keep)
Deleting original file input.f398.mp4 (pass -k to keep)
-rw-r--r-- 1 root root 994K Aug 14 05:26 input.mp4


# Step 3 — Write the main script

In [3]:
%%writefile cover_drive_analysis_realtime.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
AthleteRise – AI-Powered Cricket Analytics (Colab-ready)
- Processes a full video (local path or YouTube URL)
- MediaPipe Pose per frame
- Live overlays: elbow, spine, head-over-knee (HOK), front foot direction
- Short ✅/❌ cues
- Final evaluation.json with 5 category scores
"""

import os, sys, json, math, time, argparse
import cv2
import numpy as np

# --- Optional YouTube download inside the script ---
def maybe_download_youtube(url, out_path="input.mp4"):
    """Download ``url`` with yt-dlp to ``out_path``.

    Args:
        url: Video URL handed to yt-dlp.
        out_path: Output template / destination file name.

    Returns:
        ``out_path`` when the file exists after the download, otherwise ``None``
        (yt-dlp missing, download failed, or nothing was written).
    """
    try:
        from yt_dlp import YoutubeDL
    except Exception:
        print("[WARN] yt-dlp not installed; pass a local file instead.", file=sys.stderr)
        return None
    ydl_opts = {
        "outtmpl": out_path,
        # Prefer H.264 (avc1) so OpenCV can decode the result; a bare
        # "[ext=mp4]" selector may resolve to AV1. The original selectors
        # remain as fallbacks.
        "format": (
            "bv*[ext=mp4][vcodec^=avc1]+ba[ext=m4a]/b[ext=mp4][vcodec^=avc1]/"
            "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4]/best"
        ),
        "quiet": True,
        "noprogress": True,
    }
    try:
        with YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)
    except Exception as exc:
        # The caller treats a falsy return as "could not download", so report
        # the failure here instead of letting a traceback escape.
        print(f"[WARN] yt-dlp download failed: {exc}", file=sys.stderr)
        return None
    return out_path if os.path.exists(out_path) else None

# --- MediaPipe Pose ---
try:
    import mediapipe as mp
except Exception as e:
    print("[ERROR] Please install mediapipe: pip install mediapipe", file=sys.stderr)
    raise e

mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
mp_styles  = mp.solutions.drawing_styles

# --- Math helpers ---
def angle_3pts(a, b, c):
    """Return the angle at vertex ``b`` (in degrees) formed by points a-b-c.

    Yields NaN when any coordinate is NaN or when ``b`` coincides with
    ``a`` or ``c`` (zero-length arm).
    """
    pts = [np.array(p, float) for p in (a, b, c)]
    if np.isnan(pts).any():
        return np.nan
    start, vertex, end = pts
    arm_a = start - vertex
    arm_c = end - vertex
    scale = np.linalg.norm(arm_a) * np.linalg.norm(arm_c)
    if scale == 0:
        return np.nan
    cosine = np.dot(arm_a, arm_c) / scale
    # Clamp to guard arccos against tiny floating-point overshoot.
    return np.degrees(np.arccos(np.clip(cosine, -1.0, 1.0)))

def vector_angle_deg(u, v):
    """Return the unsigned angle between vectors ``u`` and ``v`` in degrees.

    Yields NaN when either vector has zero length.
    """
    first = np.array(u, float)
    second = np.array(v, float)
    len_first = np.linalg.norm(first)
    len_second = np.linalg.norm(second)
    if len_first == 0 or len_second == 0:
        return np.nan
    cosine = np.dot(first, second) / (len_first * len_second)
    return np.degrees(np.arccos(np.clip(cosine, -1.0, 1.0)))

def safe_get_xy(landmarks, idx, w, h):
    """Return landmark ``idx`` as ``(x * w, y * h)``.

    A landmark whose ``visibility`` is set and below 0.4 is reported as
    ``(nan, nan)`` instead.
    """
    point = landmarks[idx]
    hidden = point.visibility is not None and point.visibility < 0.4
    if hidden:
        return (np.nan, np.nan)
    return (point.x * w, point.y * h)

def ema(prev, new, alpha=0.2):
    """One step of an exponential moving average that tolerates gaps.

    With no usable history (``None`` or NaN) the new sample is returned
    unchanged; a NaN sample leaves the previous value untouched.
    """
    has_history = prev is not None and not np.isnan(prev)
    if not has_history:
        return new
    if np.isnan(new):
        return prev
    return (1 - alpha) * prev + alpha * new

# --- Metric computation ---
def compute_metrics(landmarks, frame_w, frame_h, handedness="right"):
    """Derive the per-frame posture metrics from pose landmarks.

    The "front" side is the left side for ``handedness == "right"`` and the
    right side otherwise. Returned keys:

    - ``elbow_deg``: angle shoulder-elbow-wrist on the front arm.
    - ``spine_lean_deg``: angle between the hip-midpoint -> shoulder-midpoint
      vector and the image "up" direction ``(0, -1)``.
    - ``head_over_knee_px``: nose x minus front-knee x, in pixels.
    - ``foot_angle_deg``: angle between the front knee -> ankle vector and the
      image +x axis.
    - ``missing``: per-metric NaN flags.
    """
    idx = {
        "L_SH": 11, "R_SH": 12, "L_EL": 13, "R_EL": 14, "L_WR": 15, "R_WR": 16,
        "L_HP": 23, "R_HP": 24, "L_KN": 25, "R_KN": 26, "L_AN": 27, "R_AN": 28,
        "NOSE": 0
    }
    if handedness == "right":
        front, back = "L_", "R_"
    else:
        front, back = "R_", "L_"

    def point(key):
        # Pixel coordinates of a named landmark, NaN pair when not visible.
        return safe_get_xy(landmarks, idx[key], frame_w, frame_h)

    sh_f, el_f, wr_f = point(front + "SH"), point(front + "EL"), point(front + "WR")
    sh_b = point(back + "SH")
    hp_f, hp_b = point(front + "HP"), point(back + "HP")
    kn_f, an_f = point(front + "KN"), point(front + "AN")
    head = point("NOSE")

    elbow_deg = angle_3pts(sh_f, el_f, wr_f)

    # Spine: midpoint of the hips up to midpoint of the shoulders.
    hip_mid = ((hp_f[0] + hp_b[0]) / 2.0, (hp_f[1] + hp_b[1]) / 2.0)
    sh_mid = ((sh_f[0] + sh_b[0]) / 2.0, (sh_f[1] + sh_b[1]) / 2.0)
    spine_lean_deg = vector_angle_deg(
        (sh_mid[0] - hip_mid[0], sh_mid[1] - hip_mid[1]),
        (0.0, -1.0),
    )

    if np.isnan(head[0]) or np.isnan(kn_f[0]):
        head_over_knee_px = np.nan
    else:
        head_over_knee_px = head[0] - kn_f[0]

    if np.isnan(kn_f[0]) or np.isnan(an_f[0]):
        foot_angle_deg = np.nan
    else:
        shin = (an_f[0] - kn_f[0], an_f[1] - kn_f[1])
        foot_angle_deg = vector_angle_deg(shin, (1.0, 0.0))

    missing = {
        "elbow": np.isnan(elbow_deg),
        "spine": np.isnan(spine_lean_deg),
        "hok": np.isnan(head_over_knee_px),
        "foot": np.isnan(foot_angle_deg),
    }
    return {
        "elbow_deg": elbow_deg,
        "spine_lean_deg": spine_lean_deg,
        "head_over_knee_px": head_over_knee_px,
        "foot_angle_deg": foot_angle_deg,
        "missing": missing,
    }

def live_feedback(metrics, w, handedness):
    """Build short pass/fail cues for the current frame.

    Metrics that are NaN produce no cue. At most the first three cues (in the
    order elbow, spine, head-over-knee, foot) are returned. ``handedness`` is
    accepted but not used by the rules.
    """
    elbow = metrics["elbow_deg"]
    spine = metrics["spine_lean_deg"]
    dx = metrics["head_over_knee_px"]
    foot = metrics["foot_angle_deg"]

    messages = []
    if not np.isnan(elbow):
        if 95 <= elbow <= 160:
            messages.append("✅ Good elbow elevation")
        else:
            messages.append("❌ Elbow angle off")
    if not np.isnan(spine):
        if 8 <= spine <= 38:
            messages.append("✅ Balanced spine lean")
        else:
            messages.append("❌ Spine lean suboptimal")
    if not np.isnan(dx):
        # Tolerance band is 3% of the frame width on either side of the knee.
        band = 0.03 * w
        if -band <= dx <= band:
            messages.append("✅ Head over front knee")
        else:
            messages.append("❌ Head not over front knee")
    if not np.isnan(foot):
        if foot <= 35:
            messages.append("✅ Front foot opens to pitch")
        else:
            messages.append("❌ Foot pointing across")
    return messages[:3]

def final_scores(all_metrics, frame_w):
    """Aggregate per-frame metrics into five category scores with comments.

    Args:
        all_metrics: List of per-frame dicts with keys ``elbow_deg``,
            ``spine_lean_deg``, ``head_over_knee_px`` and ``foot_angle_deg``
            (NaN entries are ignored per metric).
        frame_w: Frame width in pixels; head-over-knee offsets are normalised
            by 3% of this width.

    Returns:
        Dict mapping each category name to ``{"score": float, "comment": str}``.
        A category with no valid samples scores the neutral 5.0.
    """
    def valid(key, items):
        # Non-NaN samples of one metric, as a float array.
        return np.array([m[key] for m in items if not np.isnan(m[key])])

    def score_range(arr, low, high, tol=0.0):
        # 3..10 scaled by the fraction of samples inside [low - tol, high + tol].
        if arr.size == 0:
            return 5.0
        ok = np.logical_and(arr >= (low - tol), arr <= (high + tol))
        return 3 + 7 * ok.mean()

    def cmt(val, good, ok, pos, neg):
        if val >= good:
            return f"Excellent {pos}."
        if val >= ok:
            return f"Good {pos}, can be more consistent."
        return f"Needs work on {neg}."

    elbows = valid("elbow_deg", all_metrics)
    spines = valid("spine_lean_deg", all_metrics)
    hoks = valid("head_over_knee_px", all_metrics)
    feet = valid("foot_angle_deg", all_metrics)

    # Footwork: in-range fraction, minus a penalty for a jittery angle.
    foot_score = score_range(feet, 0, 35, 5)
    if feet.size > 0:
        foot_score = max(1.0, foot_score - np.clip(np.var(np.clip(feet, 0, 90)) / 400.0, 0, 2.0))

    # Head position: offsets expressed in units of the 3%-of-width band.
    hok_norm = hoks / max(1.0, 0.03 * frame_w) if hoks.size > 0 else np.array([])
    head_score = score_range(hok_norm, -1.0, 1.0, 0.5)
    if hok_norm.size > 0:
        head_score = max(1.0, head_score - np.clip(np.var(hok_norm) / 2.0, 0, 2.0))

    # Swing control: penalise large frame-to-frame elbow changes.
    elbow_score = score_range(elbows, 95, 160, 5)
    if elbows.size > 1:
        elbow_score = max(1.0, elbow_score - np.clip(np.mean(np.abs(np.diff(elbows))) / 20.0, 0, 2.0))

    balance_score = score_range(spines, 8, 38, 4)
    if spines.size > 1:
        balance_score = max(1.0, balance_score - np.clip(np.std(spines) / 20.0, 0, 2.0))

    # Follow-through looks only at the last 20% of the analysed frames.
    tail = all_metrics[int(len(all_metrics) * 0.8):]
    elbows_end = valid("elbow_deg", tail)
    spines_end = valid("spine_lean_deg", tail)
    follow_score = (score_range(elbows_end, 95, 160, 8) + score_range(spines_end, 8, 38, 6)) / 2.0

    return {
        "Footwork":       {"score": round(float(foot_score), 1),   "comment": cmt(foot_score, 8.5, 7.0, "foot alignment", "plant direction and stability")},
        "Head Position":  {"score": round(float(head_score), 1),   "comment": cmt(head_score, 8.5, 7.0, "head over knee", "keeping head over the front knee")},
        "Swing Control":  {"score": round(float(elbow_score), 1),  "comment": cmt(elbow_score, 8.5, 7.0, "elbow path", "maintaining elbow angle")},
        "Balance":        {"score": round(float(balance_score), 1),"comment": cmt(balance_score, 8.5, 7.0, "spine balance", "overall balance through the shot")},
        "Follow-through": {"score": round(float(follow_score), 1), "comment": cmt(follow_score, 8.5, 7.0, "finish position", "finishing position")},
    }

def put_label(img, text, x, y, scale=0.6, color=(255,255,255), bg=(0,0,0)):
    """Draw ``text`` on ``img`` at ``(x, y)`` over a filled background box.

    OpenCV's Hershey fonts cover ASCII only; other glyphs are drawn as "?".
    The symbols used by this script's labels are therefore mapped to ASCII
    stand-ins first, and any remaining non-ASCII character becomes "?".
    """
    for glyph, ascii_equiv in (("✅", "[OK]"), ("❌", "[X]"), ("°", " deg")):
        text = text.replace(glyph, ascii_equiv)
    text = text.encode("ascii", "replace").decode("ascii")

    pad = 4
    (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, scale, 1)
    cv2.rectangle(img, (x, y - th - pad), (x + tw + 2*pad, y + pad), bg, -1)
    cv2.putText(img, text, (x + pad, y), cv2.FONT_HERSHEY_SIMPLEX, scale, color, 1, cv2.LINE_AA)

def _overlay_value(x):
    """Format a metric for an on-frame label: rounded int, or "nan" when missing."""
    return "nan" if x is None or (isinstance(x, float) and np.isnan(x)) else int(round(x))


def main():
    """Command-line entry point: annotate a video and write ``evaluation.json``.

    Reads ``--input`` (local path, or URL downloaded via yt-dlp), runs
    MediaPipe Pose on each kept frame, draws smoothed metric labels and cues,
    writes ``annotated_video.mp4`` (or ``.avi`` as fallback) and the score
    summary into ``--output_dir``. Exits with status 1 when the input cannot
    be downloaded/opened or no video writer can be opened.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True, help="YouTube URL or local video path")
    ap.add_argument("--output_dir", default="output")
    ap.add_argument("--resize", type=int, default=960, help="Resize longer side (0=keep)")
    ap.add_argument("--target_fps", type=int, default=0, help="0=use source")
    ap.add_argument("--handedness", choices=["right", "left"], default="right")
    ap.add_argument("--show_skeleton", action="store_true")
    args = ap.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)

    src = args.input
    if src.startswith("http"):
        print("[INFO] Downloading video…")
        dl = maybe_download_youtube(src)
        if not dl:
            print("[ERROR] Could not download; provide a local file.", file=sys.stderr)
            sys.exit(1)
        src = dl

    cap = cv2.VideoCapture(src)
    if not cap.isOpened():
        print(f"[ERROR] Cannot open video: {src}", file=sys.stderr)
        sys.exit(1)

    src_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # some containers report 0
    src_w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    src_h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Resize so the longer side equals --resize (aspect ratio preserved).
    if args.resize and max(src_w, src_h) != args.resize:
        scale = (args.resize / float(src_w)) if src_w >= src_h else (args.resize / float(src_h))
        out_w, out_h = int(round(src_w * scale)), int(round(src_h * scale))
    else:
        out_w, out_h = src_w, src_h

    target_fps = args.target_fps if args.target_fps > 0 else src_fps
    frame_interval = max(1, int(round(src_fps / target_fps))) if target_fps < src_fps else 1
    # Frames are dropped by an integer interval, so the rate that is really
    # produced is src_fps / frame_interval. Writing at that rate (rather than
    # at the requested target) keeps the output playing at real-time speed.
    write_fps = src_fps / frame_interval

    out_mp4 = os.path.join(args.output_dir, "annotated_video.mp4")
    fourcc  = cv2.VideoWriter_fourcc(*"mp4v")
    writer  = cv2.VideoWriter(out_mp4, fourcc, write_fps, (out_w, out_h))

    # Fallback to AVI if MP4 fails on this runtime
    if not writer.isOpened():
        print("[WARN] mp4v writer failed, falling back to AVI/XVID.")
        writer.release()
        out_mp4 = os.path.join(args.output_dir, "annotated_video.avi")
        fourcc  = cv2.VideoWriter_fourcc(*"XVID")
        writer  = cv2.VideoWriter(out_mp4, fourcc, write_fps, (out_w, out_h))

    if not writer.isOpened():
        # Without this check every write() is silently dropped and the run
        # "succeeds" with an empty video file.
        writer.release()
        cap.release()
        print("[ERROR] Could not open a video writer (tried mp4v and XVID).", file=sys.stderr)
        sys.exit(1)

    pose = mp_pose.Pose(
        static_image_mode=False,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        smooth_landmarks=True,
    )

    ema_elbow = ema_spine = ema_hok = ema_foot = None
    all_metrics = []
    t0 = time.time()
    frames_read = frames_written = 0

    try:
        while True:
            ok, frame = cap.read()
            if not ok: break
            frames_read += 1

            # Temporal decimation: keep every frame_interval-th frame.
            if frame_interval > 1 and (frames_read % frame_interval != 0):
                continue

            if (frame.shape[1], frame.shape[0]) != (out_w, out_h):
                frame = cv2.resize(frame, (out_w, out_h), interpolation=cv2.INTER_AREA)

            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            res = pose.process(rgb)

            if res.pose_landmarks:
                lms = res.pose_landmarks.landmark
                m = compute_metrics(lms, out_w, out_h, args.handedness)

                # Labels show smoothed values; scoring uses the raw metrics.
                ema_elbow = ema(ema_elbow, m["elbow_deg"])
                ema_spine = ema(ema_spine, m["spine_lean_deg"])
                ema_hok   = ema(ema_hok,   m["head_over_knee_px"])
                ema_foot  = ema(ema_foot,  m["foot_angle_deg"])

                all_metrics.append(m)

                if args.show_skeleton:
                    mp_drawing.draw_landmarks(
                        frame, res.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                        landmark_drawing_spec=mp_styles.get_default_pose_landmarks_style()
                    )

                y = 26
                put_label(frame, f"Elbow:  {_overlay_value(ema_elbow)}°", 10, y); y+=24
                put_label(frame, f"Spine:  {_overlay_value(ema_spine)}°", 10, y); y+=24
                put_label(frame, f"HOK dx: {_overlay_value(ema_hok)} px", 10, y); y+=24
                put_label(frame, f"Foot:   {_overlay_value(ema_foot)}°", 10, y); y+=28

                for c in live_feedback(m, out_w, args.handedness):
                    put_label(frame, c, 10, y); y += 24
            else:
                put_label(frame, "Pose not detected", 10, 26)

            writer.write(frame)
            frames_written += 1
    finally:
        cap.release()
        writer.release()
        pose.close()

    elapsed = max(1e-6, time.time() - t0)
    avg_fps = frames_written / elapsed
    print(f"[INFO] Frames read: {frames_read}, written: {frames_written}, avg FPS: {avg_fps:.2f}")
    if frames_read == 0:
        # The capture opened but produced nothing; the scores below are then
        # only neutral defaults.
        print("[WARN] No frames could be decoded from the input; "
              "the video codec may be unsupported by this OpenCV build.", file=sys.stderr)

    eval_dict = {
        "summary": final_scores(all_metrics, out_w),
        "meta": {
            "frames_read": frames_read,
            "frames_written": frames_written,
            "avg_fps": round(avg_fps, 2),
            "handedness": args.handedness,
            "notes": "Heuristic rules; angles approximate due to 2D pose & camera."
        }
    }
    with open(os.path.join(args.output_dir, "evaluation.json"), "w") as f:
        json.dump(eval_dict, f, indent=2)

    print(f"[OK] Wrote: {out_mp4}")
    print(f"[OK] Wrote: {os.path.join(args.output_dir, 'evaluation.json')}")

if __name__ == "__main__":
    main()


Writing cover_drive_analysis_realtime.py


# Step 4 — Run the script and preview the output video inline

In [8]:
# Diagnostic cell: report whether the annotated video and evaluation.json exist
# under output/ and show the run metadata.
import os, glob, json, subprocess, textwrap

# The script writes either annotated_video.mp4 or annotated_video.avi.
paths = glob.glob("output/annotated_video.*")
print("Found:", paths)
if paths:
    p = paths[0]
    # Size is rounded to 0.01 MB, so an empty or near-empty file prints as 0.0.
    print("Size (MB):", round(os.path.getsize(p)/1e6, 2))
else:
    # NOTE(review): the message points at "Step 5", while the notebook's heading
    # for running the script is Step 4 — confirm which step is meant.
    print("⚠️ No video found — re-run the main script in Step 5 of the notebook.")

# Peek evaluation
if os.path.exists("output/evaluation.json"):
    import json, pprint
    with open("output/evaluation.json") as f:
        # Only the "meta" section (frame counts, fps, ...) is shown here.
        meta = json.load(f).get("meta", {})
    print("Eval meta:", meta)


Found: ['output/annotated_video.mp4']
Size (MB): 0.0
Eval meta: {'frames_read': 0, 'frames_written': 0, 'avg_fps': 0.0, 'handedness': 'right', 'notes': 'Heuristic rules; angles approximate due to 2D pose & camera.'}


In [11]:
import cv2, os

def make_writer(path_base, fps, size):
    """Open a ``cv2.VideoWriter`` in ``path_base`` using the first working codec.

    Tries MP4 with ``avc1`` then ``mp4v``, and finally AVI with ``XVID``.

    Args:
        path_base: Output directory (created if missing).
        fps: Frame rate passed to the writer.
        size: ``(width, height)`` of the frames that will be written.

    Returns:
        ``(writer, path)`` for the first writer that opened.

    Raises:
        RuntimeError: if no candidate codec could be opened, so the caller
            never ends up writing into a dead writer.
    """
    os.makedirs(path_base, exist_ok=True)
    candidates = [
        ("annotated_video.mp4", "avc1"),  # H.264 (may fail on some OpenCV builds)
        ("annotated_video.mp4", "mp4v"),  # MPEG-4 Part 2 (usually OK)
        ("annotated_video.avi", "XVID"),  # Fallback: AVI/XVID
    ]
    for filename, code in candidates:
        path = os.path.join(path_base, filename)
        w = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*code), fps, size)
        if w.isOpened():
            return w, path
        w.release()  # drop the failed handle before trying the next codec
    raise RuntimeError("No VideoWriter backend opened. (Codec support missing in this runtime)")


In [13]:
# Diagnostic cell: list what is in output/ and show the run metadata.
import os, glob, json

print("Output dir exists:", os.path.isdir("output"))
print("Files in output/:", glob.glob("output/*"))

# If evaluation.json exists, show quick meta to confirm the run happened
# (frames_read / frames_written of 0 mean no frame was processed).
if os.path.exists("output/evaluation.json"):
    with open("output/evaluation.json") as f:
        meta = json.load(f).get("meta", {})
    print("Eval meta:", meta)


Output dir exists: True
Files in output/: ['output/annotated_video.mp4', 'output/evaluation.json']
Eval meta: {'frames_read': 0, 'frames_written': 0, 'avg_fps': 0.0, 'handedness': 'right', 'notes': 'Heuristic rules; angles approximate due to 2D pose & camera.'}


In [16]:
# Environment check: OpenCV version and whether an ffmpeg binary is on PATH.
import cv2, os

print("OpenCV:", cv2.__version__)

# Run shell command separately (not inside print)
# Prints the ffmpeg path, or "ffmpeg not found" when `which` fails.
!which ffmpeg || echo "ffmpeg not found"


OpenCV: 4.12.0
/usr/bin/ffmpeg


In [17]:
# Upgrade yt-dlp, then try the download again with an explicit MP4 merge.
!pip -q install -U yt-dlp
INPUT_URL = "https://youtube.com/shorts/vSX3IRxGnNY"

# Download and remux to MP4 container explicitly
# NOTE(review): the selector "bv*+ba/b" does not constrain the video codec, and
# the recorded output shows yt-dlp reporting "input.mp4 has already been
# downloaded", i.e. the existing file is kept rather than replaced.
!yt-dlp -f "bv*+ba/b" --merge-output-format mp4 -o "input.mp4" "$INPUT_URL"

# Confirm the file exists and is larger than 1e6 bytes
import os
assert os.path.exists("input.mp4"), "input.mp4 missing"
sz = os.path.getsize("input.mp4")
print("input.mp4 size (MB):", round(sz/1e6,2))
# The 1e6-byte floor is a heuristic for "looks like a real video".
assert sz > 1e6, "Downloaded file is too small; network hiccup—re-run this cell."


[youtube] Extracting URL: https://youtube.com/shorts/vSX3IRxGnNY
[youtube] vSX3IRxGnNY: Downloading webpage
[youtube] vSX3IRxGnNY: Downloading tv client config
[youtube] vSX3IRxGnNY: Downloading tv player API JSON
[youtube] vSX3IRxGnNY: Downloading ios player API JSON
[youtube] vSX3IRxGnNY: Downloading m3u8 information
[info] vSX3IRxGnNY: Downloading 1 format(s): 398+251
[download] input.mp4 has already been downloaded
input.mp4 size (MB): 1.02


In [18]:
# Install ffmpeg if needed (apt output is discarded)
!apt -y -qq install ffmpeg > /dev/null

# Probe streams
# Prints, one value per line, the first video stream's codec name, width,
# height and frame rate for input.mp4.
!ffprobe -v error -select_streams v:0 -show_entries stream=codec_name,width,height,r_frame_rate -of default=nokey=1:noprint_wrappers=1 input.mp4




av1
720
1280
60/1


In [19]:
%%writefile cover_drive_analysis_realtime.py
import os, sys, json, time, argparse
import numpy as np, cv2

# ---------- Pose ----------
try:
    import mediapipe as mp
except Exception as e:
    print("[ERROR] Install mediapipe: pip install mediapipe", file=sys.stderr); raise
mp_pose = mp.solutions.pose
mp_draw = mp.solutions.drawing_utils
mp_styles = mp.solutions.drawing_styles

def angle_3pts(a, b, c):
    """Angle in degrees at vertex ``b`` of the polyline a-b-c.

    NaN is returned when any coordinate is NaN or an arm has zero length.
    """
    first, vertex, last = (np.array(p, float) for p in (a, b, c))
    if np.any(np.isnan([first, vertex, last])):
        return np.nan
    arm_first = first - vertex
    arm_last = last - vertex
    denom = np.linalg.norm(arm_first) * np.linalg.norm(arm_last)
    if denom == 0:
        return np.nan
    # Clamp before arccos to absorb floating-point overshoot.
    cosine = np.clip(np.dot(arm_first, arm_last) / denom, -1, 1)
    return np.degrees(np.arccos(cosine))

def vangle(u, v):
    """Unsigned angle between vectors ``u`` and ``v`` in degrees (NaN for a zero vector)."""
    vec_u = np.array(u, float)
    vec_v = np.array(v, float)
    norm_u = np.linalg.norm(vec_u)
    norm_v = np.linalg.norm(vec_v)
    if norm_u == 0 or norm_v == 0:
        return np.nan
    cosine = np.clip(np.dot(vec_u, vec_v) / (norm_u * norm_v), -1, 1)
    return np.degrees(np.arccos(cosine))

def vis_xy(lms, i, w, h):
    """Pixel coordinates ``(x * w, y * h)`` of landmark ``i``.

    Returns ``(nan, nan)`` when the landmark's visibility is set and below 0.4.
    """
    point = lms[i]
    visible_enough = point.visibility is None or point.visibility >= 0.4
    if visible_enough:
        return (point.x * w, point.y * h)
    return (np.nan, np.nan)

def compute_metrics(lms, W, H, handed="right"):
    """Compute the four per-frame posture metrics from pose landmarks.

    The front side is the left side when ``handed == "right"``, else the right.
    Returns a dict with ``elbow`` (shoulder-elbow-wrist angle of the front arm),
    ``spine`` (angle of hip-mid -> shoulder-mid against image "up"), ``hok``
    (nose x minus front-knee x, pixels) and ``foot`` (angle of the front
    knee -> ankle vector against the +x axis). Unavailable values are NaN.
    """
    idx = {"L_SH":11,"R_SH":12,"L_EL":13,"R_EL":14,"L_WR":15,"R_WR":16,
           "L_HP":23,"R_HP":24,"L_KN":25,"R_KN":26,"L_AN":27,"R_AN":28,"NOSE":0}
    front, back = ("L_", "R_") if handed == "right" else ("R_", "L_")

    def locate(key):
        # Landmark in pixel coordinates, NaN pair when not visible enough.
        return vis_xy(lms, idx[key], W, H)

    shf = locate(f"{front}SH")
    elf = locate(f"{front}EL")
    wrf = locate(f"{front}WR")
    shb = locate(f"{back}SH")
    hpf = locate(f"{front}HP")
    hpb = locate(f"{back}HP")
    knf = locate(f"{front}KN")
    anf = locate(f"{front}AN")
    head = locate("NOSE")

    elbow = angle_3pts(shf, elf, wrf)

    hip_mid = ((hpf[0]+hpb[0])/2.0, (hpf[1]+hpb[1])/2.0)
    sh_mid  = ((shf[0]+shb[0])/2.0, (shf[1]+shb[1])/2.0)
    trunk = (sh_mid[0]-hip_mid[0], sh_mid[1]-hip_mid[1])
    spine = vangle(trunk, (0,-1))

    if np.isnan(head[0]) or np.isnan(knf[0]):
        hok = np.nan
    else:
        hok = head[0]-knf[0]

    if np.isnan(knf[0]) or np.isnan(anf[0]):
        foot = np.nan
    else:
        foot = vangle((anf[0]-knf[0], anf[1]-knf[1]), (1,0))

    return {"elbow":elbow,"spine":spine,"hok":hok,"foot":foot}

def put_label(img, text, x, y, s=0.6, color=(255,255,255), bg=(0,0,0)):
    """Draw ``text`` on ``img`` at ``(x, y)`` over a filled background box.

    OpenCV's Hershey fonts cover ASCII only; other glyphs are drawn as "?".
    Symbols used by this script's labels are mapped to ASCII stand-ins first,
    and any remaining non-ASCII character becomes "?".
    """
    for glyph, ascii_equiv in (("✅", "[OK]"), ("❌", "[X]"), ("°", " deg")):
        text = text.replace(glyph, ascii_equiv)
    text = text.encode("ascii", "replace").decode("ascii")

    (tw,th),_ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, s, 1)
    cv2.rectangle(img,(x,y-th-4),(x+tw+8,y+4),bg,-1)
    cv2.putText(img,text,(x+4,y),cv2.FONT_HERSHEY_SIMPLEX,s,color,1,cv2.LINE_AA)

def make_writer(dir_path, fps, size):
    """Open a video writer in ``dir_path`` with the first codec that works.

    Candidates, in order: MP4/avc1, MP4/mp4v, AVI/XVID, AVI/MJPG. The
    directory is created when missing.

    Returns:
        ``(writer, path)`` of the first writer that reports ``isOpened()``.

    Raises:
        RuntimeError: when none of the candidates could be opened.
    """
    os.makedirs(dir_path, exist_ok=True)
    # Try MP4 (H.264 then mp4v), then AVI (XVID/MJPG)
    candidates = (("mp4", "avc1"), ("mp4", "mp4v"), ("avi", "XVID"), ("avi", "MJPG"))
    for ext, four in candidates:
        path = os.path.join(dir_path, f"annotated_video.{ext}")
        fourcc = cv2.VideoWriter_fourcc(*four)
        w = cv2.VideoWriter(path, fourcc, fps, size)
        if not w.isOpened():
            continue
        print(f"[INFO] Using {ext.upper()} writer ({four}) → {path}")
        return w, path
    raise RuntimeError("No VideoWriter backend opened. (Codec support missing in this runtime)")

def _label_value(v):
    """Format a metric for an on-frame label: rounded int, or "nan" when missing."""
    return "nan" if v is None or (isinstance(v,float) and np.isnan(v)) else int(round(v))


def main():
    """Command-line entry point: annotate ``--input`` and write a small evaluation stub.

    Every kept frame is resized, run through MediaPipe Pose, labelled with the
    four metrics and appended to ``annotated_video.*`` in ``--output_dir``;
    ``evaluation.json`` then records the frame counts and throughput.

    Raises:
        SystemExit: when the input video cannot be opened.
        RuntimeError: when no writer backend opens, or no frame was written.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True)
    ap.add_argument("--output_dir", default="output")
    ap.add_argument("--resize", type=int, default=720)
    ap.add_argument("--target_fps", type=int, default=24)
    ap.add_argument("--handedness", choices=["right","left"], default="right")
    ap.add_argument("--show_skeleton", action="store_true")
    args = ap.parse_args()

    cap = cv2.VideoCapture(args.input)
    if not cap.isOpened(): sys.exit(f"[ERROR] Cannot open: {args.input}")

    src_fps = float(cap.get(cv2.CAP_PROP_FPS) or 30.0)  # some containers report 0
    W0, H0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # Resize so the longer side equals --resize (aspect ratio preserved).
    if args.resize and max(W0,H0)!=args.resize:
        scale = args.resize/float(W0) if W0>=H0 else args.resize/float(H0)
        W, H = int(round(W0*scale)), int(round(H0*scale))
    else:
        W,H = W0,H0

    # The output rate never exceeds the source rate. When it is lower, frames
    # are dropped in proportion (keep_ratio) so playback stays real-time;
    # writing every source frame at a lower rate would slow the video down.
    write_fps = min(float(args.target_fps), src_fps) if args.target_fps>0 else src_fps
    keep_ratio = write_fps / src_fps
    try:
        writer, out_path = make_writer(args.output_dir, write_fps, (W,H))
    except RuntimeError:
        cap.release()
        raise

    pose = mp_pose.Pose(static_image_mode=False, model_complexity=1,
                        min_detection_confidence=0.5, min_tracking_confidence=0.5,
                        smooth_landmarks=True)

    frames_read = frames_written = 0
    t0 = time.time()

    try:
        while True:
            ok, frame = cap.read()
            if not ok: break
            frames_read += 1
            # Keep a frame only when the running output-frame count
            # (frames_read * keep_ratio) crosses the next integer.
            if int(frames_read * keep_ratio) == int((frames_read - 1) * keep_ratio):
                continue
            if (frame.shape[1], frame.shape[0]) != (W,H):
                frame = cv2.resize(frame, (W,H), interpolation=cv2.INTER_AREA)
            res = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if res.pose_landmarks:
                if args.show_skeleton:
                    mp_draw.draw_landmarks(frame, res.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                           landmark_drawing_spec=mp_styles.get_default_pose_landmarks_style())
                m = compute_metrics(res.pose_landmarks.landmark, W, H, args.handedness)
                y=26
                for line in (f"Elbow: {_label_value(m['elbow'])}°", f"Spine: {_label_value(m['spine'])}°",
                             f"HOK: {_label_value(m['hok'])} px", f"Foot: {_label_value(m['foot'])}°"):
                    put_label(frame, line, 10, y); y+=24
            else:
                put_label(frame, "Pose not detected", 10, 26)

            writer.write(frame)
            frames_written += 1
    finally:
        cap.release()
        writer.release()
        pose.close()

    dt = max(1e-6, time.time()-t0)
    fps = frames_written/dt
    print(f"[INFO] frames_read={frames_read} frames_written={frames_written} avg_fps={fps:.2f}")
    if frames_written == 0:
        # Remove empty file to avoid ffmpeg “no stream” confusion
        try:
            os.remove(out_path)
        except OSError:
            pass  # best effort: the file may not exist or may not be removable
        if frames_read == 0:
            # Nothing was decoded, so the writer is not the culprit.
            raise RuntimeError("0 frames read. OpenCV could not decode the input video "
                               "(unsupported codec?). Try re-encoding it to H.264.")
        raise RuntimeError("0 frames written. Likely VideoWriter codec issue. Try a different codec/resize/fps.")

    # Minimal evaluation stub
    eval_dict = {"meta":{"frames_read":frames_read,"frames_written":frames_written,"avg_fps":round(fps,2)}}
    eval_path = os.path.join(args.output_dir,"evaluation.json")
    with open(eval_path,"w") as f:
        json.dump(eval_dict, f, indent=2)
    # Report the real location instead of assuming the default output dir.
    print("[OK] Wrote:", out_path, "and", eval_path)

if __name__ == "__main__":
    main()


Overwriting cover_drive_analysis_realtime.py


In [20]:
!pip -q install mediapipe opencv-python numpy


In [22]:
!apt -y -qq install ffmpeg
!pip -q install -U yt-dlp


ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [23]:
# Download a copy of the clip whose video stream is H.264, saved as input_h264.mp4.
INPUT_URL = "https://youtube.com/shorts/vSX3IRxGnNY"

# Force formats with H.264 (avc1) video + m4a audio, merged to MP4
# (falls back to a single pre-merged avc1 mp4 when no separate streams match).
!yt-dlp \
  -f "bestvideo[ext=mp4][vcodec^=avc1]+bestaudio[ext=m4a]/best[ext=mp4][vcodec^=avc1]" \
  --merge-output-format mp4 \
  -o "input_h264.mp4" \
  "$INPUT_URL"

import os
# A missing file counts as size 0, so the assert below also covers a failed download.
sz = os.path.getsize("input_h264.mp4") if os.path.exists("input_h264.mp4") else 0
print("input_h264.mp4 size (MB):", round(sz/1e6,2))
# The 1e6-byte floor is a heuristic for "looks like a real video".
assert sz > 1e6, "Download too small; re-run this cell (YouTube served a tiny stub)."


[youtube] Extracting URL: https://youtube.com/shorts/vSX3IRxGnNY
[youtube] vSX3IRxGnNY: Downloading webpage
[youtube] vSX3IRxGnNY: Downloading tv client config
[youtube] vSX3IRxGnNY: Downloading player 6b03aad7-player_es5_vflset_en_US_base
[youtube] vSX3IRxGnNY: Downloading tv player API JSON
[youtube] vSX3IRxGnNY: Downloading ios player API JSON
[youtube] vSX3IRxGnNY: Downloading m3u8 information
[info] vSX3IRxGnNY: Downloading 1 format(s): 298+140
[download] Destination: input_h264.f298.mp4
[K[download] 100% of    1.79MiB in [1;37m00:00:00[0m at [0;32m2.83MiB/s[0m
[download] Destination: input_h264.f140.m4a
[K[download] 100% of   73.46KiB in [1;37m00:00:00[0m at [0;32m245.80KiB/s[0m
[Merger] Merging formats into "input_h264.mp4"
Deleting original file input_h264.f298.mp4 (pass -k to keep)
Deleting original file input_h264.f140.m4a (pass -k to keep)
input_h264.mp4 size (MB): 1.95


In [24]:
# Verify codec (this first call only prints the codec name for the reader)
!ffprobe -v error -select_streams v:0 -show_entries stream=codec_name -of csv=p=0 input_h264.mp4

# If this prints anything other than "h264", re-encode:
# (the same probe is run again, this time capturing its output lines in `codec`)
codec = !ffprobe -v error -select_streams v:0 -show_entries stream=codec_name -of csv=p=0 input_h264.mp4
if not codec or codec[0].strip().lower() != "h264":
    print("Re-encoding to H.264 for OpenCV compatibility…")
    # Note: "-r 30" also forces the re-encoded file to 30 fps.
    !ffmpeg -y -i input_h264.mp4 -c:v libx264 -pix_fmt yuv420p -r 30 -c:a aac input_h264_fixed.mp4
    # Replace input file with fixed one
    import os
    os.replace("input_h264_fixed.mp4", "input_h264.mp4")
    # Print the codec once more to confirm the replacement.
    !ffprobe -v error -select_streams v:0 -show_entries stream=codec_name -of csv=p=0 input_h264.mp4


h264


In [25]:
# Run the analysis script on the H.264 input; results are written to output/.
!mkdir -p output
!python cover_drive_analysis_realtime.py \
  --input input_h264.mp4 \
  --resize 720 \
  --target_fps 24 \
  --handedness right \
  --show_skeleton \
  --output_dir output


2025-08-14 05:39:08.013986: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1755149948.038067    4449 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1755149948.045053    4449 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1755149948.062755    4449 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755149948.062814    4449 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755149948.062819    4449 computation_placer.cc:177] computation placer alr

In [26]:
# Re-encode the annotated video to H.264 / yuv420p with "+faststart" so it plays
# in the notebook's HTML5 player (this is a full re-encode, not a re-mux).
# NOTE(review): the input path assumes the script produced annotated_video.mp4;
# its writer can fall back to annotated_video.avi — confirm before running.
!ffmpeg -y -i output/annotated_video.mp4 -vcodec libx264 -pix_fmt yuv420p -movflags +faststart -acodec aac output/annotated_video_fixed.mp4

from IPython.display import Video
# embed=True stores the video data inside the notebook output.
Video("output/annotated_video_fixed.mp4", embed=True, width=720)


ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enab

In [30]:
# Colab only: trigger a browser download of the re-encoded annotated video.
from google.colab import files
files.download("output/annotated_video_fixed.mp4")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Step 5 — View the evaluation.json

In [32]:
%%writefile cover_drive_analysis_realtime.py
import os, sys, json, time, argparse
import numpy as np, cv2

# ---------- Pose ----------
try:
    import mediapipe as mp
except Exception as e:
    print("[ERROR] Install mediapipe: pip install mediapipe", file=sys.stderr); raise
mp_pose = mp.solutions.pose
mp_draw = mp.solutions.drawing_utils
mp_styles = mp.solutions.drawing_styles

def angle_3pts(a,b,c):
    """Return the angle at vertex ``b`` of the path a-b-c, in degrees.

    Returns NaN when any coordinate is NaN or when either arm of the angle
    has zero length.
    """
    pts = [np.array(p, float) for p in (a, b, c)]
    if np.isnan(pts).any():
        return np.nan
    first, vertex, last = pts
    arm_one = first - vertex
    arm_two = last - vertex
    denom = np.linalg.norm(arm_one) * np.linalg.norm(arm_two)
    if denom == 0:
        return np.nan
    # Clip guards arccos against tiny floating-point overshoot beyond [-1, 1].
    cosine = np.clip(np.dot(arm_one, arm_two) / denom, -1, 1)
    return np.degrees(np.arccos(cosine))

def vangle(u,v):
    """Return the angle between vectors ``u`` and ``v`` in degrees.

    Returns NaN when either vector has zero length; NaN components
    propagate to a NaN result.
    """
    vec_a = np.array(u, float)
    vec_b = np.array(v, float)
    len_a = np.linalg.norm(vec_a)
    len_b = np.linalg.norm(vec_b)
    if len_a == 0 or len_b == 0:
        return np.nan
    ratio = np.dot(vec_a, vec_b) / (len_a * len_b)
    # Clip guards arccos against floating-point overshoot beyond [-1, 1].
    return np.degrees(np.arccos(np.clip(ratio, -1, 1)))

def vis_xy(lms, i, w, h):
    """Return landmark ``i`` scaled by ``w`` and ``h`` as an ``(x, y)`` tuple.

    A landmark whose ``visibility`` is set and below 0.4 yields
    ``(nan, nan)``; a ``visibility`` of None is treated as visible.
    """
    landmark = lms[i]
    too_faint = landmark.visibility is not None and landmark.visibility < 0.4
    if too_faint:
        return (np.nan, np.nan)
    px = landmark.x * w
    py = landmark.y * h
    return (px, py)

def compute_metrics(lms, W, H, handed="right"):
    """Compute the per-frame posture metrics from one set of pose landmarks.

    Args:
        lms: indexable landmarks exposing ``x``, ``y`` and ``visibility``.
        W, H: factors used to scale landmark ``x`` / ``y`` (the frame size
            at the call site).
        handed: "right" treats the left-side joints as the front side; any
            other value treats the right-side joints as the front side.

    Returns:
        dict with keys:
            "elbow": angle at the front elbow (shoulder-elbow-wrist), degrees.
            "spine": angle between the hip-midpoint→shoulder-midpoint vector
                and the image "up" direction (0, -1), degrees.
            "hok":   nose x minus front-knee x.
            "foot":  angle between the front knee→ankle vector and the +x
                axis, degrees.
        A metric is NaN when a landmark it needs is not visible.
    """
    # Landmark indices — presumably MediaPipe Pose numbering; verify against its docs.
    landmark_ids = {"L_SH":11,"R_SH":12,"L_EL":13,"R_EL":14,"L_WR":15,"R_WR":16,
                    "L_HP":23,"R_HP":24,"L_KN":25,"R_KN":26,"L_AN":27,"R_AN":28,"NOSE":0}
    if handed == "right":
        front, back = "L_", "R_"
    else:
        front, back = "R_", "L_"

    def point(side, joint):
        return vis_xy(lms, landmark_ids[side + joint], W, H)

    front_sh = point(front, "SH")
    front_el = point(front, "EL")
    front_wr = point(front, "WR")
    back_sh = point(back, "SH")
    front_hp = point(front, "HP")
    back_hp = point(back, "HP")
    front_kn = point(front, "KN")
    front_an = point(front, "AN")
    nose = vis_xy(lms, landmark_ids["NOSE"], W, H)

    elbow_angle = angle_3pts(front_sh, front_el, front_wr)

    hip_cx = (front_hp[0] + back_hp[0]) / 2.0
    hip_cy = (front_hp[1] + back_hp[1]) / 2.0
    sh_cx = (front_sh[0] + back_sh[0]) / 2.0
    sh_cy = (front_sh[1] + back_sh[1]) / 2.0
    spine_angle = vangle((sh_cx - hip_cx, sh_cy - hip_cy), (0, -1))

    if np.isnan(nose[0]) or np.isnan(front_kn[0]):
        head_offset = np.nan
    else:
        head_offset = nose[0] - front_kn[0]

    knee_and_ankle_seen = not (np.isnan(front_kn[0]) or np.isnan(front_an[0]))
    if knee_and_ankle_seen:
        foot_angle = vangle((front_an[0] - front_kn[0], front_an[1] - front_kn[1]), (1, 0))
    else:
        foot_angle = np.nan

    return {"elbow": elbow_angle, "spine": spine_angle, "hok": head_offset, "foot": foot_angle}

def put_label(img, text, x, y, s=0.6, color=(255,255,255), bg=(0,0,0)):
    """Draw ``text`` on ``img`` over a filled background box.

    ``(x, y)`` is the left end of the text baseline before the 4 px inner
    padding; ``s`` is the font scale, ``color`` the text colour and ``bg``
    the box colour. ``img`` is modified in place.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_size, _baseline = cv2.getTextSize(text, font, s, 1)
    text_w, text_h = text_size
    box_top_left = (x, y - text_h - 4)
    box_bottom_right = (x + text_w + 8, y + 4)
    cv2.rectangle(img, box_top_left, box_bottom_right, bg, -1)
    cv2.putText(img, text, (x + 4, y), font, s, color, 1, cv2.LINE_AA)

def make_writer(dir_path, fps, size):
    """Open a video writer for ``annotated_video.<ext>`` inside ``dir_path``.

    Container/codec pairs are tried in order (mp4/mp4v, avi/MJPG, avi/XVID)
    and the first writer that reports itself opened is used. ``dir_path``
    is created when missing.

    Returns:
        ``(writer, path)`` for the first combination that opened.

    Raises:
        RuntimeError: none of the combinations could be opened.
    """
    os.makedirs(dir_path, exist_ok=True)
    candidates = (("mp4", "mp4v"), ("avi", "MJPG"), ("avi", "XVID"))
    for ext, four in candidates:
        path = os.path.join(dir_path, f"annotated_video.{ext}")
        fourcc = cv2.VideoWriter_fourcc(*four)
        w = cv2.VideoWriter(path, fourcc, fps, size)
        if not w.isOpened():
            continue
        print(f"[INFO] Using {ext.upper()} writer ({four}) → {path}")
        return w, path
    raise RuntimeError("No VideoWriter backend opened.")

# ---- scoring helpers ----
def _score_range(arr, low, high, tol=0.0):
    if arr.size == 0: return 5.0
    ok = np.logical_and(arr >= (low - tol), arr <= (high + tol))
    return 3 + 7 * ok.mean()

def final_scores(all_metrics, frame_w):
    """Turn per-frame metrics into five category scores with comments.

    Args:
        all_metrics: list of dicts with keys "elbow", "spine", "hok", "foot"
            (NaN marks a missing measurement and is ignored).
        frame_w: frame width; the head-over-knee offset is divided by
            ``max(1.0, 0.03 * frame_w)`` before scoring.

    Returns:
        dict mapping "Footwork", "Head Position", "Swing Control", "Balance"
        and "Follow-through" to ``{"score": float rounded to 1 decimal,
        "comment": str}``.
    """
    def valid(key, rows):
        # Keep only the frames in which this metric was measured.
        return np.array([row[key] for row in rows if not np.isnan(row[key])])

    def penalised(base, raw_penalty):
        # Deduct at most 2 points for instability; never drop below 1.0.
        return max(1.0, base - np.clip(raw_penalty, 0, 2.0))

    elbows = valid("elbow", all_metrics)
    spines = valid("spine", all_metrics)
    hoks = valid("hok", all_metrics)
    feet = valid("foot", all_metrics)

    # Footwork: in-range share, penalised by the variance of the clipped angle.
    foot_score = _score_range(feet, 0, 35, 5)
    if feet.size:
        foot_score = penalised(foot_score, np.var(np.clip(feet, 0, 90)) / 400.0)

    # Head position: offset normalised by 3% of the frame width.
    if hoks.size:
        hok_norm = hoks / max(1.0, 0.03 * frame_w)
    else:
        hok_norm = np.array([])
    head_score = _score_range(hok_norm, -1.0, 1.0, 0.5)
    if hok_norm.size:
        head_score = penalised(head_score, np.var(hok_norm) / 2.0)

    # Swing control: penalised by the mean frame-to-frame elbow change.
    elbow_score = _score_range(elbows, 95, 160, 5)
    if elbows.size > 1:
        elbow_score = penalised(elbow_score, np.mean(np.abs(np.diff(elbows))) / 20.0)

    # Balance: penalised by the spread of the spine angle.
    balance_score = _score_range(spines, 8, 38, 4)
    if spines.size > 1:
        balance_score = penalised(balance_score, np.std(spines) / 20.0)

    # Follow-through: only the last 20% of the frames that had a pose.
    tail = all_metrics[int(len(all_metrics) * 0.8):]
    tail_elbow = _score_range(valid("elbow", tail), 95, 160, 8)
    tail_spine = _score_range(valid("spine", tail), 8, 38, 6)
    follow_score = (tail_elbow + tail_spine) / 2.0

    def cmt(val, pos, neg):
        if val >= 8.5:
            return f"Excellent {pos}."
        if val >= 7.0:
            return f"Good {pos}, can be more consistent."
        return f"Needs work on {neg}."

    rubric = (
        ("Footwork", foot_score, "foot alignment", "plant direction and stability"),
        ("Head Position", head_score, "head over knee", "keeping head over the front knee"),
        ("Swing Control", elbow_score, "elbow path", "maintaining elbow angle"),
        ("Balance", balance_score, "spine balance", "overall balance through the shot"),
        ("Follow-through", follow_score, "finish position", "finishing position"),
    )
    return {
        title: {"score": round(float(value), 1), "comment": cmt(value, pos, neg)}
        for title, value, pos, neg in rubric
    }

def main():
    """Annotate a video with pose metrics and write ``evaluation.json``.

    Command-line flags:
        --input          path of the video to analyse (required).
        --output_dir     directory for the annotated video and evaluation.json.
        --resize         length in pixels of the longer frame side; 0 keeps
                         the source size.
        --target_fps     frame rate stamped on the output video; a value
                         <= 0 uses the source rate.
        --handedness     "right" or "left"; selects the front body side.
        --show_skeleton  draw the pose skeleton on each frame.

    Raises:
        SystemExit: the input video cannot be opened.
        RuntimeError: no video writer could be opened, or no frame was written.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True)
    ap.add_argument("--output_dir", default="output")
    ap.add_argument("--resize", type=int, default=720)
    ap.add_argument("--target_fps", type=int, default=24)
    ap.add_argument("--handedness", choices=["right","left"], default="right")
    ap.add_argument("--show_skeleton", action="store_true")
    args = ap.parse_args()

    cap = cv2.VideoCapture(args.input)
    if not cap.isOpened(): sys.exit(f"[ERROR] Cannot open: {args.input}")

    def iv(v):
        # Overlay formatting: "nan" for a missing metric, otherwise a rounded int.
        return "nan" if v is None or (isinstance(v,float) and np.isnan(v)) else int(round(v))

    writer = pose = None
    frames_read = frames_written = 0
    all_metrics = []

    # Setup happens inside the try so the capture (and any writer/pose that
    # was created) is released even when a later setup step raises.
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        W0, H0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if args.resize and max(W0,H0)!=args.resize:
            # Scale so that the longer side equals --resize.
            scale = args.resize/float(W0) if W0>=H0 else args.resize/float(H0)
            W, H = int(round(W0*scale)), int(round(H0*scale))
        else:
            W,H = W0,H0

        # NOTE(review): every decoded frame is written, so when --target_fps
        # differs from the source rate the output's playback duration changes
        # — confirm this is intended.
        write_fps = float(args.target_fps) if args.target_fps>0 else float(src_fps)
        writer, out_path = make_writer(args.output_dir, write_fps, (W,H))

        pose = mp_pose.Pose(static_image_mode=False, model_complexity=1,
                            min_detection_confidence=0.5, min_tracking_confidence=0.5,
                            smooth_landmarks=True)

        t0 = time.time()
        while True:
            ok, frame = cap.read()
            if not ok: break
            frames_read += 1
            if (frame.shape[1], frame.shape[0]) != (W,H):
                frame = cv2.resize(frame, (W,H), interpolation=cv2.INTER_AREA)
            res = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if res.pose_landmarks:
                if args.show_skeleton:
                    mp_draw.draw_landmarks(frame, res.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                           landmark_drawing_spec=mp_styles.get_default_pose_landmarks_style())
                m = compute_metrics(res.pose_landmarks.landmark, W, H, args.handedness)
                all_metrics.append(m)
                y=26
                for line in (f"Elbow: {iv(m['elbow'])}°", f"Spine: {iv(m['spine'])}°",
                             f"HOK: {iv(m['hok'])} px", f"Foot: {iv(m['foot'])}°"):
                    put_label(frame, line, 10, y); y+=24
            else:
                put_label(frame, "Pose not detected", 10, 26)

            writer.write(frame)
            frames_written += 1
    finally:
        cap.release()
        if writer is not None: writer.release()
        if pose is not None: pose.close()

    dt = max(1e-6, time.time()-t0)
    fps = frames_written/dt
    if frames_written == 0:
        # Best-effort removal of the empty video; only filesystem errors are ignored.
        try: os.remove(out_path)
        except OSError: pass
        raise RuntimeError("0 frames written.")

    # ---- write full evaluation with scores ----
    eval_dict = {
        "summary": final_scores(all_metrics, W),
        "meta": {"frames_read":frames_read, "frames_written":frames_written, "avg_fps":round(fps,2)}
    }
    os.makedirs(args.output_dir, exist_ok=True)
    eval_path = os.path.join(args.output_dir,"evaluation.json")
    with open(eval_path,"w") as f:
        json.dump(eval_dict, f, indent=2)
    # Report the real path: it follows --output_dir, not a fixed "output/".
    print("[OK] Wrote:", out_path, "and", eval_path)

# Run the analysis only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()


Overwriting cover_drive_analysis_realtime.py


In [36]:
%%writefile cover_drive_analysis_realtime.py
import os, sys, json, time, argparse
import numpy as np, cv2

# ---------- Pose ----------
try:
    import mediapipe as mp
except Exception as e:
    print("[ERROR] Install mediapipe: pip install mediapipe", file=sys.stderr); raise
mp_pose = mp.solutions.pose
mp_draw = mp.solutions.drawing_utils
mp_styles = mp.solutions.drawing_styles

def angle_3pts(a,b,c):
    """Return the angle at vertex ``b`` of the path a-b-c, in degrees.

    Returns NaN when any coordinate is NaN or when either arm of the angle
    has zero length.
    """
    pts = [np.array(p, float) for p in (a, b, c)]
    if np.isnan(pts).any():
        return np.nan
    first, vertex, last = pts
    arm_one = first - vertex
    arm_two = last - vertex
    denom = np.linalg.norm(arm_one) * np.linalg.norm(arm_two)
    if denom == 0:
        return np.nan
    # Clip guards arccos against tiny floating-point overshoot beyond [-1, 1].
    cosine = np.clip(np.dot(arm_one, arm_two) / denom, -1, 1)
    return np.degrees(np.arccos(cosine))

def vangle(u,v):
    """Return the angle between vectors ``u`` and ``v`` in degrees.

    Returns NaN when either vector has zero length; NaN components
    propagate to a NaN result.
    """
    vec_a = np.array(u, float)
    vec_b = np.array(v, float)
    len_a = np.linalg.norm(vec_a)
    len_b = np.linalg.norm(vec_b)
    if len_a == 0 or len_b == 0:
        return np.nan
    ratio = np.dot(vec_a, vec_b) / (len_a * len_b)
    # Clip guards arccos against floating-point overshoot beyond [-1, 1].
    return np.degrees(np.arccos(np.clip(ratio, -1, 1)))

def vis_xy(lms, i, w, h):
    """Return landmark ``i`` scaled by ``w`` and ``h`` as an ``(x, y)`` tuple.

    A landmark whose ``visibility`` is below 0.4 yields ``(nan, nan)``.
    A landmark with no ``visibility`` attribute, or with ``visibility`` set
    to None, is treated as visible (comparing None with ``<`` would
    otherwise raise TypeError).
    """
    lm = lms[i]
    vis = getattr(lm, "visibility", None)
    if vis is not None and vis < 0.4:
        return (np.nan, np.nan)
    return (lm.x*w, lm.y*h)

def compute_metrics(lms, W, H, handed="right"):
    """Compute the per-frame posture metrics from one set of pose landmarks.

    Args:
        lms: indexable landmarks exposing ``x``, ``y`` and ``visibility``.
        W, H: factors used to scale landmark ``x`` / ``y`` (the frame size
            at the call site).
        handed: "right" treats the left-side joints as the front side; any
            other value treats the right-side joints as the front side.

    Returns:
        dict with keys:
            "elbow": angle at the front elbow (shoulder-elbow-wrist), degrees.
            "spine": angle between the hip-midpoint→shoulder-midpoint vector
                and the image "up" direction (0, -1), degrees.
            "hok":   nose x minus front-knee x.
            "foot":  angle between the front knee→ankle vector and the +x
                axis, degrees.
        A metric is NaN when a landmark it needs is not visible.
    """
    # Landmark indices — presumably MediaPipe Pose numbering; verify against its docs.
    landmark_ids = {"L_SH":11,"R_SH":12,"L_EL":13,"R_EL":14,"L_WR":15,"R_WR":16,
                    "L_HP":23,"R_HP":24,"L_KN":25,"R_KN":26,"L_AN":27,"R_AN":28,"NOSE":0}
    if handed == "right":
        front, back = "L_", "R_"
    else:
        front, back = "R_", "L_"

    def point(side, joint):
        return vis_xy(lms, landmark_ids[side + joint], W, H)

    front_sh = point(front, "SH")
    front_el = point(front, "EL")
    front_wr = point(front, "WR")
    back_sh = point(back, "SH")
    front_hp = point(front, "HP")
    back_hp = point(back, "HP")
    front_kn = point(front, "KN")
    front_an = point(front, "AN")
    nose = vis_xy(lms, landmark_ids["NOSE"], W, H)

    elbow_angle = angle_3pts(front_sh, front_el, front_wr)

    hip_cx = (front_hp[0] + back_hp[0]) / 2.0
    hip_cy = (front_hp[1] + back_hp[1]) / 2.0
    sh_cx = (front_sh[0] + back_sh[0]) / 2.0
    sh_cy = (front_sh[1] + back_sh[1]) / 2.0
    spine_angle = vangle((sh_cx - hip_cx, sh_cy - hip_cy), (0, -1))

    if np.isnan(nose[0]) or np.isnan(front_kn[0]):
        head_offset = np.nan
    else:
        head_offset = nose[0] - front_kn[0]

    knee_and_ankle_seen = not (np.isnan(front_kn[0]) or np.isnan(front_an[0]))
    if knee_and_ankle_seen:
        foot_angle = vangle((front_an[0] - front_kn[0], front_an[1] - front_kn[1]), (1, 0))
    else:
        foot_angle = np.nan

    return {"elbow": elbow_angle, "spine": spine_angle, "hok": head_offset, "foot": foot_angle}

def put_label(img, text, x, y, s=0.6, color=(255,255,255), bg=(0,0,0)):
    """Draw ``text`` on ``img`` over a filled background box.

    ``(x, y)`` is the left end of the text baseline before the 4 px inner
    padding; ``s`` is the font scale, ``color`` the text colour and ``bg``
    the box colour. ``img`` is modified in place.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_size, _baseline = cv2.getTextSize(text, font, s, 1)
    text_w, text_h = text_size
    box_top_left = (x, y - text_h - 4)
    box_bottom_right = (x + text_w + 8, y + 4)
    cv2.rectangle(img, box_top_left, box_bottom_right, bg, -1)
    cv2.putText(img, text, (x + 4, y), font, s, color, 1, cv2.LINE_AA)

def make_writer(dir_path, fps, size):
    """Open a video writer for ``annotated_video.<ext>`` inside ``dir_path``.

    Container/codec pairs are tried in order (mp4/mp4v, avi/MJPG, avi/XVID)
    and the first writer that reports itself opened is used. ``dir_path``
    is created when missing.

    Returns:
        ``(writer, path)`` for the first combination that opened.

    Raises:
        RuntimeError: none of the combinations could be opened.
    """
    os.makedirs(dir_path, exist_ok=True)
    candidates = (("mp4", "mp4v"), ("avi", "MJPG"), ("avi", "XVID"))
    for ext, four in candidates:
        path = os.path.join(dir_path, f"annotated_video.{ext}")
        fourcc = cv2.VideoWriter_fourcc(*four)
        w = cv2.VideoWriter(path, fourcc, fps, size)
        if not w.isOpened():
            continue
        print(f"[INFO] Using {ext.upper()} writer ({four}) → {path}")
        return w, path
    raise RuntimeError("No VideoWriter backend opened.")

# ---- scoring helpers ----
def _score_range(arr, low, high, tol=0.0):
    if arr.size == 0: return 5.0
    ok = np.logical_and(arr >= (low - tol), arr <= (high + tol))
    return 3 + 7 * ok.mean()

def final_scores(all_metrics, frame_w):
    """Turn per-frame metrics into five category scores with comments.

    Args:
        all_metrics: list of dicts with keys "elbow", "spine", "hok", "foot"
            (NaN marks a missing measurement and is ignored).
        frame_w: frame width; the head-over-knee offset is divided by
            ``max(1.0, 0.03 * frame_w)`` before scoring.

    Returns:
        dict mapping "Footwork", "Head Position", "Swing Control", "Balance"
        and "Follow-through" to ``{"score": float rounded to 1 decimal,
        "comment": str}``.
    """
    def valid(key, rows):
        # Keep only the frames in which this metric was measured.
        return np.array([row[key] for row in rows if not np.isnan(row[key])])

    def penalised(base, raw_penalty):
        # Deduct at most 2 points for instability; never drop below 1.0.
        return max(1.0, base - np.clip(raw_penalty, 0, 2.0))

    elbows = valid("elbow", all_metrics)
    spines = valid("spine", all_metrics)
    hoks = valid("hok", all_metrics)
    feet = valid("foot", all_metrics)

    # Footwork: in-range share, penalised by the variance of the clipped angle.
    foot_score = _score_range(feet, 0, 35, 5)
    if feet.size:
        foot_score = penalised(foot_score, np.var(np.clip(feet, 0, 90)) / 400.0)

    # Head position: offset normalised by 3% of the frame width.
    if hoks.size:
        hok_norm = hoks / max(1.0, 0.03 * frame_w)
    else:
        hok_norm = np.array([])
    head_score = _score_range(hok_norm, -1.0, 1.0, 0.5)
    if hok_norm.size:
        head_score = penalised(head_score, np.var(hok_norm) / 2.0)

    # Swing control: penalised by the mean frame-to-frame elbow change.
    elbow_score = _score_range(elbows, 95, 160, 5)
    if elbows.size > 1:
        elbow_score = penalised(elbow_score, np.mean(np.abs(np.diff(elbows))) / 20.0)

    # Balance: penalised by the spread of the spine angle.
    balance_score = _score_range(spines, 8, 38, 4)
    if spines.size > 1:
        balance_score = penalised(balance_score, np.std(spines) / 20.0)

    # Follow-through: only the last 20% of the frames that had a pose.
    tail = all_metrics[int(len(all_metrics) * 0.8):]
    tail_elbow = _score_range(valid("elbow", tail), 95, 160, 8)
    tail_spine = _score_range(valid("spine", tail), 8, 38, 6)
    follow_score = (tail_elbow + tail_spine) / 2.0

    def cmt(val, pos, neg):
        if val >= 8.5:
            return f"Excellent {pos}."
        if val >= 7.0:
            return f"Good {pos}, can be more consistent."
        return f"Needs work on {neg}."

    rubric = (
        ("Footwork", foot_score, "foot alignment", "plant direction and stability"),
        ("Head Position", head_score, "head over knee", "keeping head over the front knee"),
        ("Swing Control", elbow_score, "elbow path", "maintaining elbow angle"),
        ("Balance", balance_score, "spine balance", "overall balance through the shot"),
        ("Follow-through", follow_score, "finish position", "finishing position"),
    )
    return {
        title: {"score": round(float(value), 1), "comment": cmt(value, pos, neg)}
        for title, value, pos, neg in rubric
    }

def main():
    """Annotate a video with pose metrics and atomically write ``evaluation.json``.

    Command-line flags:
        --input          path of the video to analyse (required).
        --output_dir     directory for the annotated video and evaluation.json.
        --resize         length in pixels of the longer frame side; 0 keeps
                         the source size.
        --target_fps     frame rate stamped on the output video; a value
                         <= 0 uses the source rate.
        --handedness     "right" or "left"; selects the front body side.
        --show_skeleton  draw the pose skeleton on each frame.

    Raises:
        SystemExit: the input video cannot be opened.
        RuntimeError: no video writer could be opened, or no frame was written.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True)
    ap.add_argument("--output_dir", default="output")
    ap.add_argument("--resize", type=int, default=720)
    ap.add_argument("--target_fps", type=int, default=24)
    ap.add_argument("--handedness", choices=["right","left"], default="right")
    ap.add_argument("--show_skeleton", action="store_true")
    args = ap.parse_args()

    cap = cv2.VideoCapture(args.input)
    if not cap.isOpened(): sys.exit(f"[ERROR] Cannot open: {args.input}")

    def iv(v):
        # Overlay formatting: "nan" for a missing metric, otherwise a rounded int.
        return "nan" if v is None or (isinstance(v,float) and np.isnan(v)) else int(round(v))

    writer = pose = None
    frames_read = frames_written = 0
    all_metrics = []

    # Setup happens inside the try so the capture (and any writer/pose that
    # was created) is released even when a later setup step raises.
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        W0, H0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if args.resize and max(W0,H0)!=args.resize:
            # Scale so that the longer side equals --resize.
            scale = args.resize/float(W0) if W0>=H0 else args.resize/float(H0)
            W, H = int(round(W0*scale)), int(round(H0*scale))
        else:
            W,H = W0,H0

        # NOTE(review): every decoded frame is written, so when --target_fps
        # differs from the source rate the output's playback duration changes
        # — confirm this is intended.
        write_fps = float(args.target_fps) if args.target_fps>0 else float(src_fps)
        writer, out_path = make_writer(args.output_dir, write_fps, (W,H))

        pose = mp_pose.Pose(static_image_mode=False, model_complexity=1,
                            min_detection_confidence=0.5, min_tracking_confidence=0.5,
                            smooth_landmarks=True)

        t0 = time.time()
        while True:
            ok, frame = cap.read()
            if not ok: break
            frames_read += 1
            if (frame.shape[1], frame.shape[0]) != (W,H):
                frame = cv2.resize(frame, (W,H), interpolation=cv2.INTER_AREA)

            res = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if res.pose_landmarks:
                if args.show_skeleton:
                    mp_draw.draw_landmarks(frame, res.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                           landmark_drawing_spec=mp_styles.get_default_pose_landmarks_style())
                m = compute_metrics(res.pose_landmarks.landmark, W, H, args.handedness)
                all_metrics.append(m)

                y=26
                for line in (f"Elbow: {iv(m['elbow'])}°", f"Spine: {iv(m['spine'])}°",
                             f"HOK: {iv(m['hok'])} px", f"Foot: {iv(m['foot'])}°"):
                    put_label(frame, line, 10, y); y+=24
            else:
                put_label(frame, "Pose not detected", 10, 26)

            writer.write(frame)
            frames_written += 1
    finally:
        cap.release()
        if writer is not None: writer.release()
        if pose is not None: pose.close()

    dt = max(1e-6, time.time()-t0)
    avg_fps = frames_written/dt
    if frames_written == 0:
        # Remove the empty video to avoid broken JSON/video confusion; only
        # filesystem errors are ignored (a bare except would also swallow
        # KeyboardInterrupt/SystemExit).
        try: os.remove(out_path)
        except OSError: pass
        raise RuntimeError("0 frames written.")

    # ---- compute & write evaluation (ATOMIC) ----
    eval_obj = {
        "summary": final_scores(all_metrics, W),
        "meta": {"frames_read":frames_read, "frames_written":frames_written, "avg_fps":round(avg_fps,2)}
    }
    os.makedirs(args.output_dir, exist_ok=True)
    tmp_path = os.path.join(args.output_dir, "evaluation.tmp.json")
    final_path = os.path.join(args.output_dir, "evaluation.json")
    # Write to a temp file and flush it to disk first, then swap it into
    # place so readers never see a half-written evaluation.json.
    with open(tmp_path, "w") as f:
        json.dump(eval_obj, f, indent=2)
        f.flush(); os.fsync(f.fileno())
    os.replace(tmp_path, final_path)  # atomic rename
    print("[OK] Wrote:", out_path, "and", final_path)

# Run the analysis only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()


Overwriting cover_drive_analysis_realtime.py


In [37]:
!python cover_drive_analysis_realtime.py \
  --input input_h264.mp4 \
  --resize 720 \
  --target_fps 24 \
  --handedness right \
  --show_skeleton \
  --output_dir output


2025-08-14 06:03:03.184600: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1755151383.247147   10205 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1755151383.270862   10205 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1755151383.318359   10205 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755151383.318436   10205 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755151383.318442   10205 computation_placer.cc:177] computation placer alr

In [38]:
# Load output/evaluation.json and print its "summary" section.
import json
with open("output/evaluation.json") as f:
    data = json.load(f)

# Pretty-print the "summary" entry of the loaded JSON.
print("Scores:")
print(json.dumps(data["summary"], indent=2))


Scores:
{
  "Footwork": {
    "score": 3.0,
    "comment": "Needs work on plant direction and stability."
  },
  "Head Position": {
    "score": 3.1,
    "comment": "Needs work on keeping head over the front knee."
  },
  "Swing Control": {
    "score": 7.0,
    "comment": "Good elbow path, can be more consistent."
  },
  "Balance": {
    "score": 8.8,
    "comment": "Excellent spine balance."
  },
  "Follow-through": {
    "score": 7.8,
    "comment": "Good finish position, can be more consistent."
  }
}
