In [2]:
# Uninstall and reinstall dependencies, especially mediapipe, to ensure a clean installation.
!pip install ultralytics opencv-python numpy pytube


Collecting ultralytics
  Downloading ultralytics-8.3.228-py3-none-any.whl.metadata (37 kB)
Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl.metadata (5.0 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collect

In [5]:
from ultralytics import YOLO
import cv2
import numpy as np
import json
import math
from collections import deque
from datetime import datetime
import os

# -----------------------------
# CONFIG
# -----------------------------
VIDEO_PATH = "sports.mp4"       # <- put your mp4 here
OUTPUT_VIDEO = "analysis_output.mp4"
OUTPUT_JSON = "analysis_output.json"

TRACE_LEN = 10
VEL_THRESH = 2.0
SEQ_GAP = 0.6
SWING_MULT = 3.0
MIN_SWING = 40.0

STEP_MOVING_WINDOW = 5       # frames to smooth step direction
BODY_ORIENT_WINDOW = 5       # frames to smooth orientation

# -----------------------------
# Helper functions
# -----------------------------

def angle(a, b, c):
    a, b, c = map(np.array, [a, b, c])
    ba = a - b
    bc = c - b
    denom = (np.linalg.norm(ba) * np.linalg.norm(bc))
    if denom == 0:
        return None
    cosang = np.dot(ba, bc) / denom
    return math.degrees(math.acos(np.clip(cosang, -1, 1)))


def dist(a, b):
    return math.hypot(a[0] - b[0], a[1] - b[1])


def mean_sign(values):
    """Return -1, 0, or +1 based on mean sign of a list."""
    if not values:
        return 0
    m = np.mean(values)
    if abs(m) < 0.001:
        return 0
    return 1 if m > 0 else -1


# -----------------------------
# Main analysis
# -----------------------------

def analyze(video_path):

    model = YOLO("yolov8n-pose.pt")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("ERROR: Cannot open:", video_path)
        return

    W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    dt = 1.0 / fps

    out = cv2.VideoWriter(
        OUTPUT_VIDEO,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps,
        (W, H)
    )

    wrist_trace = deque(maxlen=TRACE_LEN)
    prev_wrist = None
    prev_t = None
    velocities = []

    all_frames = []
    sequences = []
    seq = None
    last_active = None
    frame_i = 0

    # For step direction + orientation smoothing
    step_window = deque(maxlen=STEP_MOVING_WINDOW)
    orient_window = deque(maxlen=BODY_ORIENT_WINDOW)

    print("Processing video with YOLO Pose...")

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_i += 1
        t = frame_i / fps

        # YOLO inference
        results = model(frame, verbose=False)
        kpts = results[0].keypoints

        data = {"frame": frame_i, "time_s": t}

        if kpts is not None and len(kpts.xy) > 0:

            pts = kpts.xy[0].cpu().numpy()  # (17,2)

            # Wrist coordinates (keypoint 10 = right wrist)
            wx, wy = pts[10]
            wrist = (int(wx), int(wy))
            data["wrist_px"] = list(wrist)
            wrist_trace.append(wrist)

            # Elbow & knee angles
            data["r_elbow_angle"] = angle(pts[6], pts[8], pts[10])   # shoulder–elbow–wrist
            data["r_knee_angle"] = angle(pts[12], pts[14], pts[16]) # hip–knee–ankle

            # Wrist velocity
            if prev_wrist is not None:
                vel = dist(wrist, prev_wrist) / (t - prev_t)
            else:
                vel = 0.0
            data["wrist_vel"] = vel
            velocities.append(vel)

            prev_wrist = wrist
            prev_t = t

            # -----------------------------
            # Step direction estimation
            # (use pelvis/mid-hip motion)
            # -----------------------------
            # YOLO Pose keypoints: 11=left_hip, 12=right_hip
            mid_hip_x = (pts[11][0] + pts[12][0]) / 2
            data["mid_hip_x"] = float(mid_hip_x)

            # Compute forward/back movement
            if len(all_frames) > 0 and "mid_hip_x" in all_frames[-1]:
                displacement = mid_hip_x - all_frames[-1]["mid_hip_x"]
            else:
                displacement = 0.0

            step_window.append(displacement)
            step_sign = mean_sign(step_window)
            if step_sign > 0:
                data["step_direction"] = "RIGHT"
            elif step_sign < 0:
                data["step_direction"] = "LEFT"
            else:
                data["step_direction"] = "NEUTRAL"

            # -----------------------------
            # Body orientation estimation
            # (compare shoulder-to-shoulder vector)
            # -----------------------------
            L_sh, R_sh = pts[5], pts[6]  # left shoulder, right shoulder
            orient_window.append(R_sh[0] - L_sh[0])  # positive = facing right
            orient_sign = mean_sign(orient_window)

            if orient_sign > 0:
                data["body_orientation"] = "RIGHT"
            elif orient_sign < 0:
                data["body_orientation"] = "LEFT"
            else:
                data["body_orientation"] = "FRONT"

            # -----------------------------
            # Sequence tracking
            # -----------------------------
            active = vel > VEL_THRESH
            if active:
                last_active = t
                if seq is None:
                    seq = {"start": t, "frames": []}
            if seq:
                seq["frames"].append(data)
            if seq and last_active and (t - last_active) > SEQ_GAP:
                seq["end"] = t
                sequences.append(seq)
                seq = None

            # -----------------------------
            # Draw visualization
            # -----------------------------
            # Skeleton points
            for x, y in pts:
                cv2.circle(frame, (int(x), int(y)), 3, (255, 255, 0), -1)

            # Wrist trace
            for i in range(1, len(wrist_trace)):
                cv2.line(frame, wrist_trace[i-1], wrist_trace[i], (0,0,255), 3)

            # Overlays
            cv2.putText(frame, f"vel:{vel:.1f}  step:{data['step_direction']}  orient:{data['body_orientation']}",
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                        (255,255,255), 2)

        # Save frame data
        all_frames.append(data)
        out.write(frame)

    cap.release()
    out.release()

    # Close sequence if needed
    if seq:
        seq["end"] = all_frames[-1]["time_s"]
        sequences.append(seq)

    # Swing detection
    swings = []
    if velocities:
        med = np.median(velocities)
        thr = max(MIN_SWING, med * SWING_MULT)
        for i in range(1, len(velocities)-1):
            if velocities[i] > thr and velocities[i] > velocities[i-1] and velocities[i] >= velocities[i+1]:
                swings.append({"frame": i+1, "vel": float(velocities[i])})

    # Save JSON
    output = {
        "video": video_path,
        "fps": fps,
        "frames": all_frames,
        "sequences": sequences,
        "swings": swings,
        "processed_on": datetime.utcnow().isoformat() + "Z"
    }

    with open(OUTPUT_JSON, "w") as f:
        json.dump(output, f, indent=2)

    print("Saved video:", OUTPUT_VIDEO)
    print("Saved data :", OUTPUT_JSON)



if __name__ == "__main__":
    analyze(VIDEO_PATH)


Processing video with YOLO Pose...
Saved video: analysis_output.mp4
Saved data : analysis_output.json
