# Football Analysis – Fresh Start (Notebook-Only)

This notebook implements the core requirements from `task.txt` using a single, self-contained pipeline:

- Takes a video input from `../video/`
- Performs pose estimation (MediaPipe) and ball detection (YOLOv11)
- Outputs an annotated video with insights (touch indicators, counts)

Notes:
- Uses the same virtual environment already set up
- No external Python files; all logic is inside this notebook
- Outputs are saved to `./output/`



In [111]:
# Imports and setup
import os
from pathlib import Path
import cv2
import numpy as np
from ultralytics import YOLO
import mediapipe as mp
from IPython.display import display

VIDEO_DIR = Path('../video')
OUTPUT_DIR = Path('./output')
OUTPUT_DIR.mkdir(exist_ok=True)

# Config
RESIZE_WIDTH = 960
POSE_CONFIDENCE = 0.3
BALL_CONFIDENCE = 0.45
BALL_CLASS_ID = 32  # COCO sports ball
TOUCH_THRESHOLD = 30

# Initialize detectors
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, model_complexity=1,
                    enable_segmentation=False,
                    min_detection_confidence=POSE_CONFIDENCE,
                    min_tracking_confidence=POSE_CONFIDENCE)

print('Loading YOLOv11 model...')
yolo = YOLO('yolo11n.pt')
print('YOLOv11 loaded')


Loading YOLOv11 model...
YOLOv11 loaded


I0000 00:00:1754908021.932382 31476168 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M1
W0000 00:00:1754908022.115712 33735779 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1754908022.147803 33735786 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [112]:
# List available videos
videos = sorted([p for p in VIDEO_DIR.glob('*.mp4')])
for i, v in enumerate(videos):
    print(f"{i}: {v.name} ({v.stat().st_size/1024/1024:.1f}MB)")

# Select index here
VIDEO_INDEX = 3  # change if needed
video_path = videos[VIDEO_INDEX]
print('Selected:', video_path.name)




0: Video-1.mp4 (49.8MB)
1: Video-2.mp4 (63.4MB)
2: Video-3.mp4 (55.4MB)
3: Video-4.mp4 (108.2MB)
4: Video-5.mp4 (97.1MB)
Selected: Video-4.mp4


In [113]:
# Utility

def resize(frame, target_width=RESIZE_WIDTH):
    h, w = frame.shape[:2]
    scale = target_width / w
    return cv2.resize(frame, (target_width, int(h * scale)))


def detect_pose_keypoints(frame_bgr):
    results = pose.process(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    key_points = {}
    if results.pose_landmarks:
        lm = results.pose_landmarks.landmark
        # Feet indices (MediaPipe): left_foot_index=31, right_foot_index=32
        key_points['left_foot'] = (lm[31].x, lm[31].y)
        key_points['right_foot'] = (lm[32].x, lm[32].y)
    return results, key_points


def detect_ball(frame_bgr):
    res = yolo(frame_bgr, conf=BALL_CONFIDENCE, classes=[BALL_CLASS_ID], verbose=False)
    detections = []
    for r in res:
        if r.boxes is None:
            continue
        for b in r.boxes:
            x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
            conf = float(b.conf[0].cpu().numpy())
            cx, cy = int((x1+x2)/2), int((y1+y2)/2)
            detections.append({'center': (cx, cy), 'bbox': (int(x1), int(y1), int(x2), int(y2)), 'confidence': conf})
    return detections


def annotate(frame, pose_results, ball_dets, key_points, touches):
    # Pose
    if pose_results.pose_landmarks:
        mp.solutions.drawing_utils.draw_landmarks(
            frame,
            pose_results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=mp.solutions.drawing_styles.get_default_pose_landmarks_style()
        )
    # Ball
    for d in ball_dets:
        cx, cy = d['center']
        cv2.circle(frame, (cx, cy), 10, (0, 255, 255), 2)
    # Touches
    for tp in touches:
        cv2.circle(frame, tp['foot_pixel'], 8, (255, 0, 255), -1)
    return frame



In [99]:
# # DONT RUN 
# # # Process video and save annotated output
# cap = cv2.VideoCapture(str(video_path))
# fps = cap.get(cv2.CAP_PROP_FPS)
# frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
# height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# scale = RESIZE_WIDTH / width
# out_h = int(height * scale)

# fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# writer = cv2.VideoWriter(str(OUTPUT_DIR / f"annotated_{video_path.stem}.mp4"), fourcc, fps, (RESIZE_WIDTH, out_h))

# print(f"Video: {video_path.name} | {width}x{height} | {fps:.1f}fps | {frame_count/fps:.1f}s")

# touches = []
# touch_times = []
# frame_idx = 0

# while True:
#     ret, frame = cap.read()
#     if not ret:
#         break
#     frame_idx += 1

#     frame_r = resize(frame, RESIZE_WIDTH)
#     h_r, w_r = frame_r.shape[:2]

#     pose_results, key_points = detect_pose_keypoints(frame_r)
#     balls = detect_ball(frame_r)

#     # Touch detection (simple proximity)
#     if balls and key_points:
#         cx, cy = balls[0]['center']
#         for foot in ['left_foot', 'right_foot']:
#             if foot in key_points:
#                 fx = int(key_points[foot][0] * w_r)
#                 fy = int(key_points[foot][1] * h_r)
#                 dist = ((cx - fx)**2 + (cy - fy)**2)**0.5
#                 if dist < TOUCH_THRESHOLD:
#                     # Debounce: count if last touch was >0.5s ago
#                     t = frame_idx / max(1.0, fps)
#                     if not touch_times or (t - touch_times[-1]) > 0.5:
#                         touches.append({'foot': foot, 'foot_pixel': (fx, fy), 'time': t})
#                         touch_times.append(t)

#     annotated = annotate(frame_r.copy(), pose_results, balls, key_points, touches)
#     writer.write(annotated)

# cap.release()
# writer.release()

# print(f"Saved: {OUTPUT_DIR / f'annotated_{video_path.stem}.mp4'}")
# print(f"Touches detected: {len(touches)}")


In [114]:
# Advanced ball tracker (Kalman + association)
import math
from collections import deque

class AdvancedKalmanBallTracker:
    def __init__(self, history_len: int = 30):
        self.state = None
        self.P = None
        self.Q = np.diag([1, 1, 10, 10]).astype(np.float32)
        self.R = np.diag([25, 25]).astype(np.float32)
        self.F = np.array([[1, 0, 1, 0],
                           [0, 1, 0, 1],
                           [0, 0, 1, 0],
                           [0, 0, 0, 1]], dtype=np.float32)
        self.H = np.array([[1, 0, 0, 0],
                           [0, 1, 0, 0]], dtype=np.float32)
        self.last_center = None
        self.misses = 0
        self.max_misses = 15
        self.history = deque(maxlen=history_len)
        self.last_diag = None
        # fallbacks
        self.ocv_tracker = None
        self.prev_gray = None
        self.prev_pt = None

    def _init_ocv_tracker(self, frame_bgr, bbox_xyxy):
        x1, y1, x2, y2 = bbox_xyxy
        w = max(1, x2 - x1); h = max(1, y2 - y1)
        box = (int(x1), int(y1), int(w), int(h))
        try:
            self.ocv_tracker = cv2.legacy.TrackerCSRT_create()
        except Exception:
            try:
                self.ocv_tracker = cv2.TrackerCSRT_create()
            except Exception:
                self.ocv_tracker = None
        if self.ocv_tracker is not None:
            self.ocv_tracker.init(frame_bgr, box)

    def _update_ocv_tracker(self, frame_bgr):
        if self.ocv_tracker is None:
            return None
        ok, box = self.ocv_tracker.update(frame_bgr)
        if not ok:
            return None
        x, y, w, h = [int(v) for v in box]
        cx, cy = x + w // 2, y + h // 2
        return (cx, cy)

    def _update_klt(self, frame_bgr):
        if self.prev_gray is None or self.prev_pt is None:
            return None
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        next_pt, st, err = cv2.calcOpticalFlowPyrLK(self.prev_gray, gray, self.prev_pt, None, winSize=(21,21))
        self.prev_gray = gray
        if next_pt is None or st is None:
            return None
        valid = (st.reshape(-1) == 1)
        if not valid.any():
            return None
        pts = next_pt.reshape(-1, 2)[valid]
        mx, my = np.median(pts, axis=0)
        self.prev_pt = next_pt
        return (int(mx), int(my))

    def predict(self):
        if self.state is None:
            return None
        self.state = (self.F @ self.state)
        self.P = self.F @ self.P @ self.F.T + self.Q
        return (int(self.state[0]), int(self.state[1]))

    def update(self, measurement):
        z = np.array(measurement, dtype=np.float32)
        y = z - (self.H @ self.state)
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.state = self.state + (K @ y)
        I = np.eye(4, dtype=np.float32)
        self.P = (I - K @ self.H) @ self.P

    def step(self, detections, frame_bgr):
        chosen = None
        if detections:
            if self.state is None:
                chosen = max(detections, key=lambda d: d.get('confidence', 0))
                cx, cy = chosen['center']
                self.state = np.array([cx, cy, 0, 0], dtype=np.float32)
                self.P = np.eye(4, dtype=np.float32) * 100
                if 'diag' in chosen:
                    self.last_diag = chosen['diag']
                # bootstrap fallbacks
                self._init_ocv_tracker(frame_bgr, chosen['bbox'])
                self.prev_gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
                # seed KLT with local features around the ball
                x, y = int(cx), int(cy)
                x1, y1 = max(0, x-12), max(0, y-12)
                x2, y2 = min(frame_bgr.shape[1]-1, x+12), min(frame_bgr.shape[0]-1, y+12)
                patch = self.prev_gray[y1:y2, x1:x2]
                pts = cv2.goodFeaturesToTrack(patch, maxCorners=10, qualityLevel=0.01, minDistance=4)
                if pts is not None:
                    pts[:,0,0] += x1; pts[:,0,1] += y1
                    self.prev_pt = pts.astype(np.float32)
                else:
                    self.prev_pt = np.array([[[x, y]]], dtype=np.float32)
            else:
                pred = self.predict()
                gate_px = max(60, int(RESIZE_WIDTH * 0.12))
                def dist_px(d):
                    c = d['center']; return ((c[0]-pred[0])**2 + (c[1]-pred[1])**2)**0.5
                near = [d for d in detections if dist_px(d) <= gate_px]
                if near:
                    lambda_size = 2.0
                    lambda_conf = 50.0
                    def cost(d):
                        c = dist_px(d)
                        s = 0.0
                        if self.last_diag is not None and 'diag' in d:
                            s = lambda_size * abs(d['diag'] - self.last_diag)
                        q = lambda_conf * (1.0 - d.get('confidence', 0.0))
                        return c + s + q
                    chosen = min(near, key=cost)
                    self.update(chosen['center'])
                    if 'diag' in chosen:
                        self.last_diag = chosen['diag']
                else:
                    # fallbacks when no gated detection
                    kcxcy = self._update_klt(frame_bgr)
                    if kcxcy is None:
                        kcxcy = self._update_ocv_tracker(frame_bgr)
                    if kcxcy is not None:
                        self.update(kcxcy)

            if self.state is not None:
                # clamp velocity
                vx, vy = float(self.state[2]), float(self.state[3])
                vmax = max(30.0, RESIZE_WIDTH * 0.05)
                vmag = (vx*vx + vy*vy)**0.5
                if vmag > vmax:
                    s = vmax / max(1e-6, vmag)
                    self.state[2] *= s; self.state[3] *= s

                self.last_center = (int(self.state[0]), int(self.state[1]))
                self.history.append(self.last_center)
                # refresh KLT seed with local features
                self.prev_gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
                x, y = self.last_center
                x1, y1 = max(0, x-12), max(0, y-12)
                x2, y2 = min(frame_bgr.shape[1]-1, x+12), min(frame_bgr.shape[0]-1, y+12)
                patch = self.prev_gray[y1:y2, x1:x2]
                pts = cv2.goodFeaturesToTrack(patch, maxCorners=10, qualityLevel=0.01, minDistance=4)
                if pts is not None:
                    pts[:,0,0] += x1; pts[:,0,1] += y1
                    self.prev_pt = pts.astype(np.float32)
                else:
                    self.prev_pt = np.array([[[x, y]]], dtype=np.float32)
                self.misses = 0
        else:
            # no detections at all
            pred = self.predict()
            if pred is not None:
                self.last_center = pred
                self.history.append(pred)
            # try fallbacks
            kcxcy = self._update_klt(frame_bgr)
            if kcxcy is None:
                kcxcy = self._update_ocv_tracker(frame_bgr)
            if kcxcy is not None and self.state is not None:
                self.update(kcxcy)
                self.last_center = (int(self.state[0]), int(self.state[1]))
            self.misses += 1
            if self.misses > self.max_misses:
                self.reset()
        return self.last_center

    def reset(self):
        self.state = None
        self.P = None
        self.last_center = None
        self.misses = 0
        self.history.clear()
        self.ocv_tracker = None
        self.prev_gray = None
        self.prev_pt = None


In [115]:
# Team color assignment within bounding boxes

def extract_jersey_color(frame_bgr, bbox):
    x1, y1, x2, y2 = bbox
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(frame_bgr.shape[1]-1, x2), min(frame_bgr.shape[0]-1, y2)
    if x2 <= x1 or y2 <= y1:
        return None
    # focus on upper torso region
    h = y2 - y1
    w = x2 - x1
    jy1 = y1 + int(0.2*h)
    jy2 = y1 + int(0.5*h)
    jx1 = x1 + int(0.2*w)
    jx2 = x2 - int(0.2*w)
    if jy2 <= jy1 or jx2 <= jx1:
        return None
    roi = frame_bgr[jy1:jy2, jx1:jx2]
    if roi.size == 0:
        return None
    # remove very dark/bright
    pixels = roi.reshape(-1, 3)
    s = pixels.sum(axis=1)
    mask = (s > 50) & (s < 650)
    if mask.sum() == 0:
        return tuple(np.mean(pixels, axis=0).astype(int))
    dom = pixels[mask].mean(axis=0)
    return tuple(dom.astype(int))


def classify_team_by_color(bgr):
    b, g, r = bgr
    brightness = (r+g+b)/3
    if brightness > 200:
        return 'ref'
    if b >= r and b >= g:
        return 'team_blue'
    if r >= g and r >= b:
        return 'team_red'
    return 'team_red' if (r+g) > b else 'team_blue'



In [102]:
# # Enhanced processing with tracker and team colors

# def process_with_tracker(video_path):
#     cap = cv2.VideoCapture(str(video_path))
#     fps = cap.get(cv2.CAP_PROP_FPS)
#     width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#     height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
#     out_h = int(height * (RESIZE_WIDTH/width))
#     writer = cv2.VideoWriter(str(OUTPUT_DIR / f"annotated_{video_path.stem}_adv.mp4"),
#                              cv2.VideoWriter_fourcc(*'mp4v'), fps, (RESIZE_WIDTH, out_h))

#     tracker = AdvancedKalmanBallTracker()
#     team_colors = {'team_red': (0,0,255), 'team_blue': (255,0,0), 'ref': (255,255,255)}

#     frame_idx = 0
#     touches, touch_times = [], []

#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break
#         frame_idx += 1
#         fr = resize(frame)
#         h, w = fr.shape[:2]

#         # Pose for keypoints (touch logic)
#         pose_results, key_points = detect_pose_keypoints(fr)

#         # Ball detections + tracking (ROI-first + gated Kalman)
#         balls = enhanced_detect_ball(fr, prior_center=tracker.last_center, miss_count=tracker.misses)
#         center = tracker.step(balls, fr)
#         if center is not None:
#             cv2.circle(fr, center, 10, (0,255,0), 2)
#             pts = list(tracker.history)
#             for i in range(1, len(pts)):
#                 cv2.line(fr, pts[i-1], pts[i], (0,255,0), 2)

#         # Player detection via YOLO (class 0) and team assignment by jersey color
#         person_results = yolo(fr, conf=0.3, classes=[0], verbose=False)
#         for r in person_results:
#             if r.boxes is None:
#                 continue
#             for b in r.boxes:
#                 x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
#                 conf = float(b.conf[0].cpu().numpy())
#                 if conf < 0.3:
#                     continue
#                 bbox = (int(x1), int(y1), int(x2), int(y2))
#                 col = extract_jersey_color(fr, bbox)
#                 team = classify_team_by_color(col) if col is not None else 'team_red'
#                 color = team_colors.get(team, (100,100,100))
#                 cv2.rectangle(fr, (bbox[0],bbox[1]), (bbox[2],bbox[3]), color, 2)
#                 tag = 'REF' if team=='ref' else ('T1' if team=='team_red' else 'T2')
#                 cv2.putText(fr, tag, (bbox[0]+3, bbox[1]-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

#         # Touch check against tracked center if available
#         if center is not None and key_points:
#             cx, cy = center
#             for foot in ['left_foot','right_foot']:
#                 if foot in key_points:
#                     fx, fy = int(key_points[foot][0]*w), int(key_points[foot][1]*h)
#                     dist = ((cx - fx)**2 + (cy - fy)**2)**0.5
#                     if dist < TOUCH_THRESHOLD:
#                         t = frame_idx / max(1.0, fps)
#                         if not touch_times or (t - touch_times[-1]) > 0.5:
#                             touches.append({'foot': foot, 'foot_pixel': (fx, fy), 'time': t})
#                             touch_times.append(t)
#                             cv2.circle(fr, (fx, fy), 8, (255,0,255), -1)

#         writer.write(fr)

#     cap.release()
#     writer.release()
#     print(f"Saved advanced: {OUTPUT_DIR / f'annotated_{video_path.stem}_adv.mp4'} | Touches: {len(touches)}")

# # Run the enhanced process
# process_with_tracker(video_path)



In [116]:
# Enhanced ball detection: multi-scale + NMS with green-ring and dynamic sizing

def _iou(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    inter = max(0, xB - xA) * max(0, yB - yA)
    areaA = max(0, boxA[2]-boxA[0]) * max(0, boxA[3]-boxA[1])
    areaB = max(0, boxB[2]-boxB[0]) * max(0, boxB[3]-boxB[1])
    union = areaA + areaB - inter
    return inter/union if union > 0 else 0.0


def _nms(dets, iou_thr=0.3):
    if len(dets) <= 1:
        return dets
    dets = sorted(dets, key=lambda d: d.get('confidence', 0), reverse=True)
    keep = []
    while dets:
        best = dets.pop(0)
        keep.append(best)
        dets = [d for d in dets if _iou(best['bbox'], d['bbox']) < iou_thr]
    return keep


def _is_green_bgr(b,g,r):
    return (g > r + 15 and g > b + 15 and g > 60)


def _green_ring_ok(frame_bgr, cx, cy, r=12, samples=24):
    h, w = frame_bgr.shape[:2]
    ok, tot = 0, 0
    for i in range(samples):
        ang = 2*np.pi*i/samples
        x = int(cx + r*np.cos(ang)); y = int(cy + r*np.sin(ang))
        if 0 <= x < w and 0 <= y < h:
            b,g,r_ = frame_bgr[y, x].tolist()
            ok += 1 if _is_green_bgr(b,g,r_) else 0
            tot += 1
    return (tot >= 8) and (ok / max(1, tot) >= 0.5)


def enhanced_detect_ball(frame_bgr, prior_center=None, miss_count=0):
    h, w = frame_bgr.shape[:2]
    candidates = []

    def accept_and_add(x1, y1, x2, y2, conf):
        bw, bh = int(x2 - x1), int(y2 - y1)
        if bw <= 0 or bh <= 0:
            return
        # dynamic ball size via bbox diagonal
        diag = (bw*bw + bh*bh) ** 0.5
        min_d = 6
        max_d = max(18, int(RESIZE_WIDTH * 0.04))
        if diag < min_d or diag > max_d:
            return
        # rough circularity
        ar = max(bw, bh) / max(1, min(bw, bh))
        if ar > 1.6:
            return
        cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)
        if not (0 <= cx < w and 0 <= cy < h):
            return
        # require green ring around center
        if not _green_ring_ok(frame_bgr, cx, cy, r=max(8, int(min_d*0.8))):
            return
        candidates.append({
            'center': (cx, cy),
            'bbox': (int(x1), int(y1), int(x2), int(y2)),
            'confidence': float(conf),
            'diag': diag
        })

    def run_yolo(img, offx=0, offy=0, conf=BALL_CONFIDENCE):
        res = yolo(img, conf=conf, classes=[BALL_CLASS_ID], verbose=False)
        for r in res:
            if r.boxes is None:
                continue
            for b in r.boxes:
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
                c = float(b.conf[0].cpu().numpy())
                accept_and_add(x1 + offx, y1 + offy, x2 + offx, y2 + offy, c)

    # 1) ROI around last known center (tight → wider with misses)
    if prior_center is not None and miss_count < 8:
        radius = int(80 + miss_count * 40)
        cx, cy = prior_center
        x1 = max(0, cx - radius); y1 = max(0, cy - radius)
        x2 = min(w, cx + radius); y2 = min(h, cy + radius)
        roi = frame_bgr[y1:y2, x1:x2]
        if roi.size > 0:
            run_yolo(roi, x1, y1, conf=BALL_CONFIDENCE)

    # 2) Fall back center crop
    if not candidates:
        cy1, cy2 = h // 4, 3 * h // 4
        cx1, cx2 = w // 4, 3 * w // 4
        crop = frame_bgr[cy1:cy2, cx1:cx2]
        run_yolo(crop, cx1, cy1, conf=max(0.30, BALL_CONFIDENCE * 0.9))

    # 3) Full frame as last resort
    if not candidates:
        run_yolo(frame_bgr, 0, 0, conf=max(0.30, BALL_CONFIDENCE * 0.9))

    return _nms(candidates, iou_thr=0.4)

In [117]:
# Field homography + top-down rendering utilities

FIELD_LENGTH_M = 105.0
FIELD_WIDTH_M = 68.0

# Match broadcast width; keep real field aspect
TOPDOWN_W = RESIZE_WIDTH
TOPDOWN_H = int(TOPDOWN_W * (FIELD_WIDTH_M / FIELD_LENGTH_M))
TOPDOWN_SIZE = (TOPDOWN_W, TOPDOWN_H)  # (W, H)

def _order_quad_points(pts):
    s = pts.sum(axis=1)
    diff = np.diff(pts, axis=1)[:, 0]
    tl = pts[np.argmin(s)]
    br = pts[np.argmax(s)]
    tr = pts[np.argmin(diff)]
    bl = pts[np.argmax(diff)]
    return np.array([tl, tr, br, bl], dtype=np.float32)

def estimate_pitch_homography(frame_bgr):
    hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)
    lower = np.array([25, 40, 40], dtype=np.uint8)
    upper = np.array([85, 255, 255], dtype=np.uint8)
    mask = cv2.inRange(hsv, lower, upper)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((9, 9), np.uint8))
    cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not cnts:
        return None
    c = max(cnts, key=cv2.contourArea)
    if cv2.contourArea(c) < 0.05 * (frame_bgr.shape[0] * frame_bgr.shape[1]):
        return None
    rect = cv2.minAreaRect(c)
    box = cv2.boxPoints(rect).astype(np.float32)
    src = _order_quad_points(box)
    W, H = TOPDOWN_SIZE
    dst = np.array([[0, 0], [W - 1, 0], [W - 1, H - 1], [0, H - 1]], dtype=np.float32)
    return cv2.getPerspectiveTransform(src, dst)

def warp_points(H, points_xy):
    if H is None or not points_xy:
        return []
    pts = np.array(points_xy, dtype=np.float32).reshape(-1, 1, 2)
    warped = cv2.perspectiveTransform(pts, H).reshape(-1, 2)
    return [(int(x), int(y)) for x, y in warped]

def render_topdown(ball_trail_xy, players_xy, carrier_idx=None):
    W, H = TOPDOWN_SIZE
    img = np.zeros((H, W, 3), dtype=np.uint8)
    # field base + lines
    img[:] = (30, 120, 30)
    white = (240, 240, 240)
    cv2.rectangle(img, (2, 2), (W - 3, H - 3), white, 2)
    cv2.line(img, (W // 2, 0), (W // 2, H), white, 2)
    r = int(9.15 * (TOPDOWN_W / FIELD_LENGTH_M))  # 9.15 m
    cv2.circle(img, (W // 2, H // 2), r, white, 2)
    # ball trail
    if len(ball_trail_xy) > 1:
        for i in range(1, len(ball_trail_xy)):
            cv2.line(img, ball_trail_xy[i - 1], ball_trail_xy[i], (0, 255, 255), 2)
    if ball_trail_xy:
        cv2.circle(img, ball_trail_xy[-1], 6, (0, 255, 255), -1)
    # players
    for idx, p in enumerate(players_xy):
        if p is None:
            continue
        color = (0, 0, 255) if idx == (carrier_idx or -1) else (0, 120, 255)
        cv2.circle(img, (int(p[0]), int(p[1])), 6, color, -1)
    return img

def vstack_frames(bgr_top, bgr_bottom):
    # ensure same width
    h1, w1 = bgr_top.shape[:2]
    btm = cv2.resize(bgr_bottom, (w1, int(bgr_bottom.shape[0] * (w1 / bgr_bottom.shape[1]))))
    return np.vstack([bgr_top, btm])

In [118]:
# KMeans team clustering for consistent team IDs
try:
    from sklearn.cluster import KMeans
except Exception:
    KMeans = None

def bootstrap_team_clusters(frames=120):
    if KMeans is None:
        return None
    samples = []
    cap = cv2.VideoCapture(str(video_path))
    step = max(1, int(frames/10))
    idx = 0
    while idx < frames:
        ret, frame = cap.read()
        if not ret:
            break
        fr = resize(frame)
        res = yolo(fr, conf=0.35, classes=[0], verbose=False)
        for r in res:
            if r.boxes is None:
                continue
            for b in r.boxes:
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
                col = extract_jersey_color(fr, (int(x1),int(y1),int(x2),int(y2)))
                if col is not None:
                    samples.append(col)
        # skip forward roughly evenly
        for _ in range(step):
            cap.read()
        idx += step
    cap.release()
    if len(samples) < 2:
        return None
    X = np.array(samples)
    kmeans = KMeans(n_clusters=2, n_init=5, random_state=0)
    kmeans.fit(X)
    return kmeans



In [119]:
# Top-down (3D-like) rendering pipeline stacked under broadcast

def process_with_tracker_topdown(video_path):
    cap = cv2.VideoCapture(str(video_path))
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out_h = int(height * (RESIZE_WIDTH / width))

    # stacked output: broadcast + topdown
    stacked_h = out_h + TOPDOWN_H
    writer = cv2.VideoWriter(
        str(OUTPUT_DIR / f"annotated_{video_path.stem}_topdown.mp4"),
        cv2.VideoWriter_fourcc(*'mp4v'),
        fps,
        (RESIZE_WIDTH, stacked_h)
    )

    tracker = AdvancedKalmanBallTracker()
    H = None
    ball_trail_img_xy = []  # in broadcast-resized coords

    frame_idx = 0
    touches, touch_times = [], []

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1
        fr = resize(frame)
        h, w = fr.shape[:2]

        # Estimate homography once (or retry occasionally if None)
        if H is None and frame_idx % 5 == 1:
            H = estimate_pitch_homography(fr)

        # Pose (touch keypoints)
        pose_results, key_points = detect_pose_keypoints(fr)

        # Ball detections + tracking (ROI-first + gated Kalman)
        balls = enhanced_detect_ball(fr, prior_center=tracker.last_center, miss_count=tracker.misses)
        center = tracker.step(balls, fr)
        if center is not None:
            cv2.circle(fr, center, 10, (0, 255, 0), 2)
            pts = list(tracker.history)
            for i in range(1, len(pts)):
                cv2.line(fr, pts[i - 1], pts[i], (0, 255, 0), 2)
            ball_trail_img_xy.append(center)
            if len(ball_trail_img_xy) > 150:
                ball_trail_img_xy.pop(0)

        # Player detection → bottom-center points
        players_img_xy = []
        results = yolo(fr, conf=0.3, classes=[0], verbose=False)
        for r in results:
            if r.boxes is None:
                continue
            for b in r.boxes:
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
                conf = float(b.conf[0].cpu().numpy())
                if conf < 0.3:
                    continue
                xbc = int((x1 + x2) / 2)
                ybc = int(y2)
                players_img_xy.append((xbc, ybc))

        # Touch check (against tracked center)
        if center is not None and key_points:
            cx, cy = center
            for foot in ['left_foot', 'right_foot']:
                if foot in key_points:
                    fx, fy = int(key_points[foot][0] * w), int(key_points[foot][1] * h)
                    dist = ((cx - fx) ** 2 + (cy - fy) ** 2) ** 0.5
                    if dist < TOUCH_THRESHOLD:
                        t = frame_idx / max(1.0, fps)
                        if not touch_times or (t - touch_times[-1]) > 0.5:
                            touches.append({'foot': foot, 'foot_pixel': (fx, fy), 'time': t})
                            touch_times.append(t)
                            cv2.circle(fr, (fx, fy), 8, (255, 0, 255), -1)

        # Top-down projection + render
        ball_td = warp_points(H, ball_trail_img_xy) if H is not None else []
        players_td = warp_points(H, players_img_xy) if H is not None else []
        td_img = render_topdown(ball_td, players_td)

        # Stack and write
        stacked = vstack_frames(fr, td_img)
        writer.write(stacked)

    cap.release()
    writer.release()
    print(f"Saved top-down: {OUTPUT_DIR / f'annotated_{video_path.stem}_topdown.mp4'} | Touches: {len(touches)}")

# Run
process_with_tracker_topdown(video_path)

Saved top-down: output/annotated_Video-4_topdown.mp4 | Touches: 0


In [None]:
# Re-run KMeans pipeline with metrics and summary
import json

def process_with_tracker_kmeans_metrics(video_path):
    cap = cv2.VideoCapture(str(video_path))
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out_h = int(height * (RESIZE_WIDTH/width))
    writer = cv2.VideoWriter(str(OUTPUT_DIR / f"annotated_{video_path.stem}_adv_kmeans_metrics.mp4"),
                             cv2.VideoWriter_fourcc(*'mp4v'), fps, (RESIZE_WIDTH, out_h))

    tracker = AdvancedKalmanBallTracker()
    team_colors = {'team_red': (0,0,255), 'team_blue': (255,0,0), 'ref': (255,255,255)}

    kmeans_team = bootstrap_team_clusters(frames=120)

    # Metrics state
    total_frames = 0
    frames_with_ball = 0
    touches = 0
    touch_times = []
    foot_counts = {'left': 0, 'right': 0}
    possession_frames = {'team_red': 0, 'team_blue': 0, 'ref': 0}
    prev_center = None
    ball_path_pixels = 0.0

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        total_frames += 1
        fr = resize(frame)
        h, w = fr.shape[:2]

        pose_results, key_points = detect_pose_keypoints(fr)
        balls = enhanced_detect_ball(fr, prior_center=tracker.last_center, miss_count=tracker.misses)
        center = tracker.step(balls, fr)
        if center is not None:
            frames_with_ball += 1
            cv2.circle(fr, center, 10, (0,255,0), 2)
            if prev_center is not None:
                dx = center[0] - prev_center[0]
                dy = center[1] - prev_center[1]
                ball_path_pixels += (dx*dx + dy*dy) ** 0.5
            prev_center = center
            pts = list(tracker.history)
            for i in range(1, len(pts)):
                cv2.line(fr, pts[i-1], pts[i], (0,255,0), 2)

        # Player detection + team assignment, and build list for possession
        players_with_team = []  # (xc, yb, team)
        players_with_team_bboxh = []  # (xc, yb, team, bbox_h)
        person_results = yolo(fr, conf=0.3, classes=[0], verbose=False)
        for r in person_results:
            if r.boxes is None:
                continue
            for b in r.boxes:
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
                conf = float(b.conf[0].cpu().numpy())
                if conf < 0.3:
                    continue
                bbox = (int(x1), int(y1), int(x2), int(y2))
                col = extract_jersey_color(fr, bbox)
                team = None
                if col is not None and kmeans_team is not None:
                    pred = int(kmeans_team.predict(np.array(col).reshape(1,-1))[0])
                    team = 'team_red' if pred == 0 else 'team_blue'
                if team is None:
                    team = classify_team_by_color(col) if col is not None else 'team_red'
                color = team_colors.get(team, (100,100,100))
                cv2.rectangle(fr, (bbox[0],bbox[1]), (bbox[2],bbox[3]), color, 2)
                tag = 'REF' if team=='ref' else ('T1' if team=='team_red' else 'T2')
                cv2.putText(fr, tag, (bbox[0]+3, bbox[1]-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                xbc = int((bbox[0]+bbox[2])/2)
                ybc = int(bbox[3])
                players_with_team.append((xbc, ybc, team))
                players_with_team_bboxh.append((xbc, ybc, team, int(y2 - y1)))

        # Robust possession using hysteresis
        if 'poss_tracker' not in globals():
            poss_tracker = PossessionTracker(hold_frames=4, min_switch_gap_frames=3, base_px=12)
        if center is not None:
            team_poss = poss_tracker.step(total_frames, center, players_with_team_bboxh)
            if team_poss is not None:
                possession_frames[team_poss] = possession_frames.get(team_poss, 0) + 1

        # Touch check against tracked center
        if center is not None and key_points:
            cx, cy = center
            for foot in ['left_foot','right_foot']:
                if foot in key_points:
                    fx, fy = int(key_points[foot][0]*w), int(key_points[foot][1]*h)
                    dist = ((cx - fx)**2 + (cy - fy)**2)**0.5
                    if dist < TOUCH_THRESHOLD:
                        t = total_frames / max(1.0, fps)
                        if not touch_times or (t - touch_times[-1]) > 0.5:
                            touches += 1
                            touch_times.append(t)
                            side = 'left' if foot.startswith('left') else 'right'
                            foot_counts[side] += 1
                            cv2.circle(fr, (fx, fy), 8, (255,0,255), -1)

        writer.write(fr)

    cap.release()
    writer.release()

    # Summaries
    duration_s = total_frames / max(1.0, fps)
    ball_det_rate = frames_with_ball / max(1, total_frames)
    avg_touch_interval = (float(np.mean(np.diff(touch_times))) if len(touch_times) >= 2 else None)
    possession_pct = {k: (v / max(1, frames_with_ball)) for k, v in possession_frames.items()}

    summary = {
        'video': video_path.name,
        'fps': float(fps),
        'duration_s': float(duration_s),
        'total_frames': int(total_frames),
        'frames_with_ball': int(frames_with_ball),
        'ball_detection_rate': float(ball_det_rate),
        'touches_total': int(touches),
        'touches_by_foot': foot_counts,
        'avg_touch_interval_s': (float(avg_touch_interval) if avg_touch_interval is not None else None),
        'possession_pct': possession_pct,
        'ball_path_pixels': float(ball_path_pixels)
    }

    print("\n=== Inference Summary ===")
    print(f"Video: {summary['video']}")
    print(f"Frames: {summary['total_frames']}  |  Ball frames: {summary['frames_with_ball']}  |  Ball det. rate: {summary['ball_detection_rate']*100:.1f}%")
    print(f"Touches: {summary['touches_total']}  |  L/R: {foot_counts['left']}/{foot_counts['right']}")
    if avg_touch_interval is not None:
        print(f"Avg touch interval: {avg_touch_interval:.2f}s")
    print("Possession (of ball frames): " + ", ".join([f"{k}:{v*100:.1f}%" for k,v in possession_pct.items()]))

    with open(OUTPUT_DIR / f"metrics_{video_path.stem}_adv_kmeans.json", 'w') as f:
        json.dump(summary, f, indent=2)
    print(f"Saved metrics: {OUTPUT_DIR / f'metrics_{video_path.stem}_adv_kmeans.json'}")

# Run and summarize
process_with_tracker_kmeans_metrics(video_path)



=== Inference Summary ===
Video: Video-4.mp4
Frames: 1108  |  Ball frames: 687  |  Ball det. rate: 62.0%
Touches: 0  |  L/R: 0/0
Possession (of ball frames): team_red:16.7%, team_blue:53.3%, ref:0.0%
Saved metrics: output/metrics_Video-4_adv_kmeans.json


In [121]:
# Professional HUD overlay + metrics pipeline with coach tips

import math

def _draw_bar(img, origin, size, pct, fg=(0,0,255), bg=(60,60,60), border=(220,220,220)):
    x, y = origin; w, h = size
    cv2.rectangle(img, (x, y), (x+w, y+h), bg, -1)
    fill_w = int(max(0, min(1.0, pct)) * w)
    cv2.rectangle(img, (x, y), (x+fill_w, y+h), fg, -1)
    cv2.rectangle(img, (x, y), (x+w, y+h), border, 1)


def draw_hud(frame, state, team_colors):
    h, w = frame.shape[:2]
    panel_w = int(w * 0.35)
    panel_h = int(h * 0.20)
    x0, y0 = 10, 10
    overlay = frame.copy()
    cv2.rectangle(overlay, (x0, y0), (x0+panel_w, y0+panel_h), (20,20,20), -1)
    cv2.addWeighted(overlay, 0.4, frame, 0.6, 0, frame)

    # Titles
    cv2.putText(frame, 'Analysis HUD', (x0+10, y0+20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (240,240,240), 1)

    # Possession bars
    y = y0 + 40
    total_ball = max(1, state['frames_with_ball'])
    poss_red = state['possession_frames'].get('team_red', 0) / total_ball
    poss_blue = state['possession_frames'].get('team_blue', 0) / total_ball
    _draw_bar(frame, (x0+10, y), (panel_w-20, 16), poss_red, fg=team_colors['team_red'])
    cv2.putText(frame, f"Team Red {poss_red*100:.0f}%", (x0+14, y+13), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (230,230,230), 1)
    y += 24
    _draw_bar(frame, (x0+10, y), (panel_w-20, 16), poss_blue, fg=team_colors['team_blue'])
    cv2.putText(frame, f"Team Blue {poss_blue*100:.0f}%", (x0+14, y+13), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (230,230,230), 1)
    y += 28

    # Touches and L/R
    cv2.putText(frame, f"Touches: {state['touches']}  |  L/R: {state['foot_counts']['left']}/{state['foot_counts']['right']}",
                (x0+10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (240,240,240), 1)
    y += 20

    # Ball detection rate
    det_rate = state['frames_with_ball'] / max(1, state['total_frames'])
    cv2.putText(frame, f"Ball detection: {det_rate*100:.0f}%", (x0+10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (240,240,240), 1)

    # Event timeline (touch ticks)
    if state['touch_times']:
        y2 = y0 + panel_h - 10
        cv2.line(frame, (x0+10, y2), (x0+panel_w-10, y2), (200,200,200), 1)
        dur = max(1e-6, state['total_frames'] / max(1.0, state['fps']))
        for t in state['touch_times']:
            tx = x0+10 + int((t / dur) * (panel_w-20))
            cv2.line(frame, (tx, y2-6), (tx, y2+6), (255,0,255), 2)

    return frame


def process_with_tracker_kmeans_pro(video_path):
    cap = cv2.VideoCapture(str(video_path))
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out_h = int(height * (RESIZE_WIDTH/width))
    writer = cv2.VideoWriter(str(OUTPUT_DIR / f"annotated_{video_path.stem}_pro.mp4"),
                             cv2.VideoWriter_fourcc(*'mp4v'), fps, (RESIZE_WIDTH, out_h))

    tracker = AdvancedKalmanBallTracker()
    team_colors = {'team_red': (0,0,255), 'team_blue': (255,0,0), 'ref': (255,255,255)}

    kmeans_team = bootstrap_team_clusters(frames=120)

    # State for metrics
    state = {
        'total_frames': 0,
        'frames_with_ball': 0,
        'touches': 0,
        'touch_times': [],
        'foot_counts': {'left': 0, 'right': 0},
        'possession_frames': {'team_red': 0, 'team_blue': 0, 'ref': 0},
        'fps': float(fps)
    }

    prev_center = None

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        state['total_frames'] += 1
        fr = resize(frame)
        h, w = fr.shape[:2]

        pose_results, key_points = detect_pose_keypoints(fr)
        balls = enhanced_detect_ball(fr, prior_center=tracker.last_center, miss_count=tracker.misses)
        center = tracker.step(balls, fr)
        if center is not None:
            state['frames_with_ball'] += 1
            cv2.circle(fr, center, 10, (0,255,0), 2)
            pts = list(tracker.history)
            for i in range(1, len(pts)):
                cv2.line(fr, pts[i-1], pts[i], (0,255,0), 2)

        # Players + team
        players_with_team = []
        person_results = yolo(fr, conf=0.3, classes=[0], verbose=False)
        for r in person_results:
            if r.boxes is None:
                continue
            for b in r.boxes:
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
                conf = float(b.conf[0].cpu().numpy())
                if conf < 0.3:
                    continue
                bbox = (int(x1), int(y1), int(x2), int(y2))
                col = extract_jersey_color(fr, bbox)
                team = None
                if col is not None and kmeans_team is not None:
                    pred = int(kmeans_team.predict(np.array(col).reshape(1,-1))[0])
                    team = 'team_red' if pred == 0 else 'team_blue'
                if team is None:
                    team = classify_team_by_color(col) if col is not None else 'team_red'
                color = team_colors.get(team, (100,100,100))
                cv2.rectangle(fr, (bbox[0],bbox[1]), (bbox[2],bbox[3]), color, 2)
                tag = 'REF' if team=='ref' else ('T1' if team=='team_red' else 'T2')
                cv2.putText(fr, tag, (bbox[0]+3, bbox[1]-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                xbc = int((bbox[0]+bbox[2])/2); ybc = int(bbox[3])
                players_with_team.append((xbc, ybc, team))

        # Possession nearest to ball
        if center is not None and players_with_team:
            cx, cy = center
            nearest = min(players_with_team, key=lambda p: (p[0]-cx)**2 + (p[1]-cy)**2)
            dist = ((nearest[0]-cx)**2 + (nearest[1]-cy)**2) ** 0.5
            if dist < 80:
                state['possession_frames'][nearest[2]] = state['possession_frames'].get(nearest[2], 0) + 1

        # Touches
        if center is not None and key_points:
            cx, cy = center
            for foot in ['left_foot','right_foot']:
                if foot in key_points:
                    fx, fy = int(key_points[foot][0]*w), int(key_points[foot][1]*h)
                    dist = ((cx - fx)**2 + (cy - fy)**2)**0.5
                    if dist < TOUCH_THRESHOLD:
                        t = state['total_frames'] / max(1.0, fps)
                        if not state['touch_times'] or (t - state['touch_times'][-1]) > 0.5:
                            state['touches'] += 1
                            state['touch_times'].append(t)
                            side = 'left' if foot.startswith('left') else 'right'
                            state['foot_counts'][side] += 1
                            cv2.circle(fr, (fx, fy), 8, (255,0,255), -1)

        # HUD
        fr = draw_hud(fr, state, team_colors)
        writer.write(fr)

    cap.release()
    writer.release()

    # Final summary + tips
    total = max(1, state['total_frames'])
    ball_frames = max(1, state['frames_with_ball'])
    poss_red = state['possession_frames'].get('team_red',0)/ball_frames
    poss_blue = state['possession_frames'].get('team_blue',0)/ball_frames
    avg_touch_interval = (float(np.mean(np.diff(state['touch_times']))) if len(state['touch_times']) >= 2 else None)

    tips = []
    if state['touches'] >= 1 and avg_touch_interval is not None and avg_touch_interval > 1.2:
        tips.append('Touches are too far apart; shorten interval for tighter control.')
    if state['foot_counts']['left'] + state['foot_counts']['right'] >= 4:
        total_t = state['foot_counts']['left'] + state['foot_counts']['right']
        left_pct = state['foot_counts']['left']/total_t
        if left_pct > 0.8:
            tips.append('Strong left-foot bias; incorporate more right-foot touches.')
        elif left_pct < 0.2:
            tips.append('Strong right-foot bias; incorporate more left-foot touches.')
    if abs(poss_red - poss_blue) > 0.5:
        tips.append('Severe possession imbalance; adjust team spacing/support.')
    if (state['frames_with_ball'] / total) < 0.35:
        tips.append('Low ball visibility; adjust detector/conf or camera scaling.')

    summary = {
        'video': video_path.name,
        'ball_detection_rate': state['frames_with_ball']/total,
        'touches': state['touches'],
        'touches_by_foot': state['foot_counts'],
        'avg_touch_interval_s': avg_touch_interval,
        'possession_pct': {'team_red': poss_red, 'team_blue': poss_blue},
        'coach_tips': tips
    }

    # Save JSON + summary image panel
    with open(OUTPUT_DIR / f"summary_{video_path.stem}_pro.json", 'w') as f:
        import json; json.dump(summary, f, indent=2)

    panel = np.zeros((220, RESIZE_WIDTH, 3), dtype=np.uint8)
    cv2.putText(panel, f"Summary: {video_path.name}", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (240,240,240), 2)
    cv2.putText(panel, f"Ball detection: {summary['ball_detection_rate']*100:.1f}%", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (240,240,240), 1)
    cv2.putText(panel, f"Touches: {summary['touches']}  L/R: {state['foot_counts']['left']}/{state['foot_counts']['right']}", (10, 88), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (240,240,240), 1)
    cv2.putText(panel, f"Possession - Red: {poss_red*100:.0f}%  Blue: {poss_blue*100:.0f}%", (10, 116), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (240,240,240), 1)
    if tips:
        y = 148
        cv2.putText(panel, "Coach Tips:", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 1)
        for t in tips[:3]:
            y += 22
            cv2.putText(panel, f"- {t}", (20, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (220,220,220), 1)
    cv2.imwrite(str(OUTPUT_DIR / f"summary_{video_path.stem}_pro.png"), panel)
    print(f"Saved: {OUTPUT_DIR / f'annotated_{video_path.stem}_pro.mp4'}")
    print(f"Saved: {OUTPUT_DIR / f'summary_{video_path.stem}_pro.json'}")
    print(f"Saved: {OUTPUT_DIR / f'summary_{video_path.stem}_pro.png'}")

# Run professional HUD pipeline
process_with_tracker_kmeans_pro(video_path)


Saved: output/annotated_Video-4_pro.mp4
Saved: output/summary_Video-4_pro.json
Saved: output/summary_Video-4_pro.png


In [126]:
# Robust possession tracker with adaptive thresholds and hysteresis

class PossessionTracker:
    def __init__(self, hold_frames: int = 4, min_switch_gap_frames: int = 3, base_px: int = 20):
        self.hold_frames = hold_frames
        self.min_switch_gap_frames = min_switch_gap_frames
        self.base_px = base_px
        self.current_team = None
        self.current_player = None
        self.hold_counter = 0
        self.last_switch_frame = -9999

    @staticmethod
    def _dist(a, b):
        if a is None or b is None:
            return 1e9
        dx, dy = a[0]-b[0], a[1]-b[1]
        return (dx*dx + dy*dy) ** 0.5

    def step(self, frame_idx: int, ball_xy, players_with_team_bboxh):
        # players_with_team_bboxh: list of (xbc, ybc, team, bbox_h)
        if ball_xy is None or not players_with_team_bboxh:
            self.hold_counter = max(0, self.hold_counter-1)
            return self.current_team
        # adaptive distance: base + fraction of bbox height
        candidates = []
        for (xc, yc, team, bbox_h) in players_with_team_bboxh:
            thr = self.base_px + 0.6 * max(1, bbox_h)
            d = self._dist(ball_xy, (xc, yc))
            if d <= thr:
                candidates.append((d, team, (xc, yc), bbox_h))
        if not candidates:
            self.hold_counter = max(0, self.hold_counter-1)
            return self.current_team
        # pick nearest
        d, team, _, _ = min(candidates, key=lambda t: t[0])
        # hysteresis: require consecutive frames to switch
        if team == self.current_team:
            self.hold_counter = min(self.hold_frames, self.hold_counter+1)
            return self.current_team
        # different team
        if frame_idx - self.last_switch_frame < self.min_switch_gap_frames:
            return self.current_team
        self.hold_counter += 1
        if self.hold_counter >= self.hold_frames:
            self.current_team = team
            self.last_switch_frame = frame_idx
            self.hold_counter = 0
        return self.current_team



In [None]:
# # Hook KMeans into main processing (override snippet)

# def process_with_tracker_kmeans(video_path):
#     cap = cv2.VideoCapture(str(video_path))
#     fps = cap.get(cv2.CAP_PROP_FPS)
#     width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#     height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
#     out_h = int(height * (RESIZE_WIDTH/width))
#     writer = cv2.VideoWriter(str(OUTPUT_DIR / f"annotated_{video_path.stem}_adv_kmeans.mp4"),
#                              cv2.VideoWriter_fourcc(*'mp4v'), fps, (RESIZE_WIDTH, out_h))

#     tracker = AdvancedKalmanBallTracker()
#     team_colors = {'team_red': (0,0,255), 'team_blue': (255,0,0), 'ref': (255,255,255)}

#     # Bootstrap KMeans once at the start
#     kmeans_team = bootstrap_team_clusters(frames=120)

#     frame_idx = 0
#     touches, touch_times = [], []

#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break
#         frame_idx += 1
#         fr = resize(frame)
#         h, w = fr.shape[:2]

#         pose_results, key_points = detect_pose_keypoints(fr)
#         balls = enhanced_detect_ball(fr, prior_center=tracker.last_center, miss_count=tracker.misses)
#         center = tracker.step(balls, fr)
#         if center is not None:
#             cv2.circle(fr, center, 10, (0,255,0), 2)
#             pts = list(tracker.history)
#             for i in range(1, len(pts)):
#                 cv2.line(fr, pts[i-1], pts[i], (0,255,0), 2)

#         person_results = yolo(fr, conf=0.3, classes=[0], verbose=False)
#         for r in person_results:
#             if r.boxes is None:
#                 continue
#             for b in r.boxes:
#                 x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
#                 conf = float(b.conf[0].cpu().numpy())
#                 if conf < 0.3:
#                     continue
#                 bbox = (int(x1), int(y1), int(x2), int(y2))
#                 col = extract_jersey_color(fr, bbox)
#                 team = None
#                 if col is not None and kmeans_team is not None:
#                     pred = int(kmeans_team.predict(np.array(col).reshape(1,-1))[0])
#                     team = 'team_red' if pred == 0 else 'team_blue'
#                 if team is None:
#                     team = classify_team_by_color(col) if col is not None else 'team_red'
#                 color = team_colors.get(team, (100,100,100))
#                 cv2.rectangle(fr, (bbox[0],bbox[1]), (bbox[2],bbox[3]), color, 2)
#                 tag = 'REF' if team=='ref' else ('T1' if team=='team_red' else 'T2')
#                 cv2.putText(fr, tag, (bbox[0]+3, bbox[1]-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

#         if center is not None and key_points:
#             cx, cy = center
#             for foot in ['left_foot','right_foot']:
#                 if foot in key_points:
#                     fx, fy = int(key_points[foot][0]*w), int(key_points[foot][1]*h)
#                     dist = ((cx - fx)**2 + (cy - fy)**2)**0.5
#                     if dist < TOUCH_THRESHOLD:
#                         t = frame_idx / max(1.0, fps)
#                         if not touch_times or (t - touch_times[-1]) > 0.5:
#                             touches.append({'foot': foot, 'foot_pixel': (fx, fy), 'time': t})
#                             touch_times.append(t)
#                             cv2.circle(fr, (fx, fy), 8, (255,0,255), -1)

#         writer.write(fr)

#     cap.release()
#     writer.release()
#     print(f"Saved advanced (KMeans): {OUTPUT_DIR / f'annotated_{video_path.stem}_adv_kmeans.mp4'} | Touches: {len(touches)}")

# # Run KMeans-enhanced process
# process_with_tracker_kmeans(video_path)



Saved advanced (KMeans): output/annotated_Video-1_adv_kmeans.mp4 | Touches: 0


In [127]:
# Passing & Vision Analyzer + Pro pipeline with passing metrics

from collections import deque

class PassingAnalyzer:
    def __init__(
        self,
        fps: float,
        own_dist_px: int = 55,
        release_min_speed_pxpf: float = 5,
        cone_deg: float = 55,
        receive_dist_px: int = 95,
        window_frames: int = 45,
        min_pass_distance_px: int = 80,
        min_air_frames: int = 2,
    ):
        self.fps = max(1.0, float(fps))
        self.own_dist_px = own_dist_px
        self.release_min_speed_pxpf = release_min_speed_pxpf
        self.cone_deg = cone_deg
        self.receive_dist_px = receive_dist_px
        self.window_frames = window_frames
        self.min_pass_distance_px = min_pass_distance_px
        self.min_air_frames = min_air_frames
        # state
        self.current_owner_team = None
        self.current_owner_pos = None
        self.last_owner_team = None
        self.last_ball_xy = None
        self.last_ball_frame = None
        self.last_ball_vel = None
        # release state
        self.release_team = None
        self.release_frame = None
        self.release_pos = None
        self.release_vel = None
        # hysteresis and smoothing
        self.history = deque(maxlen=7)
        self.speed_ema = 0.0
        self.ema_alpha = 0.5
        self.owner_loss_frames = 0
        self.reception_close_frames = 0
        self.receive_speed_max_pxpf = 3.0
        # events
        self.events = []

    @staticmethod
    def _dist(a, b):
        if a is None or b is None:
            return 1e9
        dx, dy = a[0]-b[0], a[1]-b[1]
        return (dx*dx + dy*dy) ** 0.5

    @staticmethod
    def _angle_deg(u, v):
        ux, uy = u; vx, vy = v
        nu = max(1e-6, (ux*ux + uy*uy) ** 0.5)
        nv = max(1e-6, (vx*vx + vy*vy) ** 0.5)
        cos = max(-1.0, min(1.0, (ux*vx + uy*vy) / (nu*nv)))
        import math
        return abs(math.degrees(math.acos(cos)))

    def _nearest_player(self, ball_xy, players_with_team):
        if ball_xy is None or not players_with_team:
            return None
        bx, by = ball_xy
        nearest = min(players_with_team, key=lambda p: (p[0]-bx)**2 + (p[1]-by)**2)
        dist = self._dist(ball_xy, (nearest[0], nearest[1]))
        return nearest, dist

    def _compute_ball_velocity(self, frame_idx, ball_xy):
        if self.last_ball_xy is None or self.last_ball_frame is None or ball_xy is None:
            return None, 0.0
        dtf = max(1, frame_idx - self.last_ball_frame)
        vx = (ball_xy[0] - self.last_ball_xy[0]) / dtf
        vy = (ball_xy[1] - self.last_ball_xy[1]) / dtf
        speed = (vx*vx + vy*vy) ** 0.5
        return (vx, vy), speed

    def step(self, frame_idx: int, ball_xy, players_with_team):
        # ownership
        owner_team, owner_pos = None, None
        nearest = self._nearest_player(ball_xy, players_with_team)
        if nearest is not None:
            (px, py, team), d = nearest
            if d < self.own_dist_px:
                owner_team, owner_pos = team, (px, py)

        # velocity and EMA
        vel, speed = self._compute_ball_velocity(frame_idx, ball_xy)
        if vel is not None:
            self.speed_ema = self.ema_alpha*speed + (1.0-self.ema_alpha)*self.speed_ema

        if ball_xy is not None:
            self.history.append((frame_idx, ball_xy))

        # owner-loss hysteresis
        if owner_team is None and self.current_owner_team is not None:
            self.owner_loss_frames += 1
        else:
            self.owner_loss_frames = 0

        # detect release
        if (self.release_team is None and self.owner_loss_frames >= 2 and ball_xy is not None):
            last_owner = self.current_owner_team
            moving = (self.speed_ema >= self.release_min_speed_pxpf)
            traveled = (len(self.history) >= 2 and self._dist(self.history[0][1], self.history[-1][1]) >= max(60, 0.06*RESIZE_WIDTH))
            if moving or traveled:
                self.release_team = self.last_owner_team or last_owner
                self.release_frame = frame_idx
                self.release_pos = ball_xy
                self.release_vel = vel

        # reception search
        if self.release_team is not None and self.release_frame is not None and ball_xy is not None:
            frames_since_release = frame_idx - self.release_frame
            if frames_since_release > self.window_frames:
                self.release_team = None; self.release_frame = None; self.release_pos = None; self.release_vel = None
                self.reception_close_frames = 0
            else:
                if frames_since_release >= self.min_air_frames:
                    rel_vec = (ball_xy[0] - self.release_pos[0], ball_xy[1] - self.release_pos[1])
                    cand = []
                    for (px, py, team) in players_with_team:
                        to_player = (px - self.release_pos[0], py - self.release_pos[1])
                        ang = self._angle_deg(rel_vec if rel_vec != (0,0) else (1,0), to_player)
                        if ang <= self.cone_deg:
                            dist_now = self._dist(ball_xy, (px, py))
                            if dist_now <= self.receive_dist_px and self._dist(self.release_pos, ball_xy) >= self.min_pass_distance_px:
                                cand.append((dist_now, team))
                    self.reception_close_frames = (self.reception_close_frames + 1) if cand else 0
                    if self.reception_close_frames >= 2 and self.speed_ema <= self.receive_speed_max_pxpf:
                        recv_team = min(cand, key=lambda x: x[0])[1] if cand else None
                        result = 'success' if recv_team == self.release_team else 'intercepted'
                        self.events.append({
                            'type': 'pass',
                            'start_frame': self.release_frame,
                            'end_frame': frame_idx,
                            'team': self.release_team,
                            'result': result,
                            'distance_px': self._dist(self.release_pos, ball_xy),
                            'duration_s': frames_since_release / self.fps
                        })
                        self.release_team = None; self.release_frame = None; self.release_pos = None; self.release_vel = None
                        self.reception_close_frames = 0

        # update state
        self.last_owner_team = self.current_owner_team
        self.current_owner_team = owner_team
        self.current_owner_pos = owner_pos
        self.last_ball_xy = ball_xy
        self.last_ball_frame = frame_idx
        self.last_ball_vel = vel

    def summarize(self):
        total = len([e for e in self.events if e['type'] == 'pass'])
        success = len([e for e in self.events if e['type'] == 'pass' and e['result'] == 'success'])
        intercepted = len([e for e in self.events if e['type'] == 'pass' and e['result'] == 'intercepted'])
        success_rate = (success / total) if total > 0 else 0.0
        import numpy as _np
        avg_dist = float(_np.mean([e['distance_px'] for e in self.events if e['type']=='pass'])) if total>0 else 0.0
        avg_dur = float(_np.mean([e['duration_s'] for e in self.events if e['type']=='pass'])) if total>0 else 0.0
        return {
            'passes_total': total,
            'passes_success': success,
            'passes_intercepted': intercepted,
            'pass_success_rate': success_rate,
            'avg_pass_distance_px': avg_dist,
            'avg_pass_duration_s': avg_dur
        }


def process_with_tracker_kmeans_pro_passing(video_path):
    cap = cv2.VideoCapture(str(video_path))
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out_h = int(height * (RESIZE_WIDTH/width))
    writer = cv2.VideoWriter(str(OUTPUT_DIR / f"annotated_{video_path.stem}_pro_passing.mp4"),
                             cv2.VideoWriter_fourcc(*'mp4v'), fps, (RESIZE_WIDTH, out_h))

    tracker = AdvancedKalmanBallTracker()
    team_colors = {'team_red': (0,0,255), 'team_blue': (255,0,0), 'ref': (255,255,255)}

    kmeans_team = bootstrap_team_clusters(frames=120)

    # State for metrics
    state = {
        'total_frames': 0,
        'frames_with_ball': 0,
        'touches': 0,
        'touch_times': [],
        'foot_counts': {'left': 0, 'right': 0},
        'possession_frames': {'team_red': 0, 'team_blue': 0, 'ref': 0},
        'fps': float(fps)
    }

    passing = PassingAnalyzer(
        fps=fps,
        own_dist_px=55,
        release_min_speed_pxpf=5,
        cone_deg=55,
        receive_dist_px=95,
        window_frames=45,
        min_pass_distance_px=80,
        min_air_frames=2,
    )

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        state['total_frames'] += 1
        fr = resize(frame)
        h, w = fr.shape[:2]

        pose_results, key_points = detect_pose_keypoints(fr)
        balls = enhanced_detect_ball(fr, prior_center=tracker.last_center, miss_count=tracker.misses)
        center = tracker.step(balls, fr)
        if center is not None:
            state['frames_with_ball'] += 1
            cv2.circle(fr, center, 10, (0,255,0), 2)
            pts = list(tracker.history)
            for i in range(1, len(pts)):
                cv2.line(fr, pts[i-1], pts[i], (0,255,0), 2)

        # Players + team
        players_with_team = []
        person_results = yolo(fr, conf=0.3, classes=[0], verbose=False)
        for r in person_results:
            if r.boxes is None:
                continue
            for b in r.boxes:
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
                conf = float(b.conf[0].cpu().numpy())
                if conf < 0.3:
                    continue
                bbox = (int(x1), int(y1), int(x2), int(y2))
                col = extract_jersey_color(fr, bbox)
                team = None
                if col is not None and kmeans_team is not None:
                    pred = int(kmeans_team.predict(np.array(col).reshape(1,-1))[0])
                    team = 'team_red' if pred == 0 else 'team_blue'
                if team is None:
                    team = classify_team_by_color(col) if col is not None else 'team_red'
                color = team_colors.get(team, (100,100,100))
                cv2.rectangle(fr, (bbox[0],bbox[1]), (bbox[2],bbox[3]), color, 2)
                tag = 'REF' if team=='ref' else ('T1' if team=='team_red' else 'T2')
                cv2.putText(fr, tag, (bbox[0]+3, bbox[1]-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                xbc = int((bbox[0]+bbox[2])/2); ybc = int(bbox[3])
                players_with_team.append((xbc, ybc, team))

        # Possession nearest to ball
        if center is not None and players_with_team:
            cx, cy = center
            nearest = min(players_with_team, key=lambda p: (p[0]-cx)**2 + (p[1]-cy)**2)
            dist = ((nearest[0]-cx)**2 + (nearest[1]-cy)**2) ** 0.5
            if dist < 80:
                state['possession_frames'][nearest[2]] = state['possession_frames'].get(nearest[2], 0) + 1

        # Touches
        if center is not None and key_points:
            cx, cy = center
            for foot in ['left_foot','right_foot']:
                if foot in key_points:
                    fx, fy = int(key_points[foot][0]*w), int(key_points[foot][1]*h)
                    dist = ((cx - fx)**2 + (cy - fy)**2)**0.5
                    if dist < TOUCH_THRESHOLD:
                        t = state['total_frames'] / max(1.0, fps)
                        if not state['touch_times'] or (t - state['touch_times'][-1]) > 0.5:
                            state['touches'] += 1
                            state['touch_times'].append(t)
                            side = 'left' if foot.startswith('left') else 'right'
                            state['foot_counts'][side] += 1
                            cv2.circle(fr, (fx, fy), 8, (255,0,255), -1)

        # Passing step
        passing.step(state['total_frames'], center, players_with_team)

        # HUD
        fr = draw_hud(fr, state, team_colors)
        # render quick pass stats on HUD area bottom line
        summary = passing.summarize()
        cv2.putText(fr, f"Passes: {summary['passes_total']}  SR: {summary['pass_success_rate']*100:.0f}%  (Succ {summary['passes_success']}/Int {summary['passes_intercepted']})",
                    (12, 10 + int(fr.shape[0]*0.20) - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,255), 1)

        writer.write(fr)

    cap.release()
    writer.release()

    # Final summaries
    pass_summary = passing.summarize()
    with open(OUTPUT_DIR / f"summary_{video_path.stem}_pro_passing.json", 'w') as f:
        import json; json.dump(pass_summary, f, indent=2)
    print(f"Saved: {OUTPUT_DIR / f'annotated_{video_path.stem}_pro_passing.mp4'}")
    print(f"Saved: {OUTPUT_DIR / f'summary_{video_path.stem}_pro_passing.json'}")

# Run
process_with_tracker_kmeans_pro_passing(video_path)


Saved: output/annotated_Video-4_pro_passing.mp4
Saved: output/summary_Video-4_pro_passing.json


In [128]:
# Alternative: Trajectory-based Pass Analyzer (kick spike + straight segment + reception)

class TrajectoryPassAnalyzer:
    def __init__(self,
                 fps: float,
                 kick_speed_min: float = 7.0,   # px/frame to start a pass
                 start_straightness_deg: float = 35.0,  # avg angle change threshold
                 end_speed_max: float = 3.0,    # px/frame to consider ball slowed
                 receive_dist_px: int = 95,
                 min_travel_px: int = 80,
                 min_frames: int = 5,
                 history_len: int = 20):
        self.fps = max(1.0, float(fps))
        self.kick_speed_min = kick_speed_min
        self.start_straightness_deg = start_straightness_deg
        self.end_speed_max = end_speed_max
        self.receive_dist_px = receive_dist_px
        self.min_travel_px = min_travel_px
        self.min_frames = min_frames
        self.hist = deque(maxlen=history_len)   # (frame_idx, (x,y))
        self.speeds = deque(maxlen=history_len)
        self.in_pass = False
        self.segment = None  # dict with keys: start_frame, start_pos, points, max_speed, kick_team
        self.events = []

    @staticmethod
    def _dist(a, b):
        dx, dy = a[0]-b[0], a[1]-b[1]
        return (dx*dx + dy*dy) ** 0.5

    @staticmethod
    def _angle_deg(u, v):
        ux, uy = u; vx, vy = v
        nu = max(1e-6, (ux*ux + uy*uy) ** 0.5)
        nv = max(1e-6, (vx*vx + vy*vy) ** 0.5)
        cos = max(-1.0, min(1.0, (ux*vx + uy*vy) / (nu*nv)))
        return abs(np.degrees(np.arccos(cos)))

    def _avg_turn_deg(self, pts, last_k=6):
        if len(pts) < 3:
            return 0.0
        vecs = []
        for i in range(1, len(pts)):
            v = (pts[i][0]-pts[i-1][0], pts[i][1]-pts[i-1][1])
            vecs.append(v)
        vecs = vecs[-last_k:]
        if len(vecs) < 2:
            return 0.0
        angs = []
        for i in range(1, len(vecs)):
            angs.append(self._angle_deg(vecs[i-1], vecs[i]))
        return float(np.mean(angs)) if angs else 0.0

    def _nearest_player(self, ball_xy, players_with_team):
        if ball_xy is None or not players_with_team:
            return None
        bx, by = ball_xy
        nearest = min(players_with_team, key=lambda p: (p[0]-bx)**2 + (p[1]-by)**2)
        dist = self._dist(ball_xy, (nearest[0], nearest[1]))
        return (nearest, dist)

    def _speed_from_hist(self):
        if len(self.hist) < 2:
            return 0.0
        (f1, p1), (f2, p2) = self.hist[-2], self.hist[-1]
        dtf = max(1, f2 - f1)
        return self._dist(p1, p2) / dtf

    def step(self, frame_idx: int, ball_xy, players_with_team, key_points=None):
        # update histories
        if ball_xy is not None:
            self.hist.append((frame_idx, ball_xy))
            self.speeds.append(self._speed_from_hist())
        else:
            # break segment if we lose the ball for too long
            if self.in_pass and len(self.hist) >= 1:
                # allow brief loss; do nothing immediate
                pass

        # compute foot kick candidate
        kick_team = None
        if key_points and ball_xy is not None:
            bx, by = ball_xy
            for foot in ['left_foot','right_foot']:
                if foot in key_points:
                    fx, fy = int(key_points[foot][0]*1), int(key_points[foot][1]*1)  # normalized; will be rescaled by caller
                    # we use caller’s scaling, so skip here; kick_team decided via nearest player team
                    pass
        # We infer kick_team via nearest player at start

        # speed and straightness
        speed = self.speeds[-1] if len(self.speeds) >= 1 else 0.0
        pts = [p for _, p in self.hist]
        turn = self._avg_turn_deg(pts, last_k=6)

        # start condition
        if not self.in_pass and ball_xy is not None:
            # candidate if speed high enough AND trajectory straight enough over last few steps
            if speed >= self.kick_speed_min and turn <= self.start_straightness_deg:
                # assign team from nearest player to current ball
                nearest = self._nearest_player(ball_xy, players_with_team)
                if nearest is not None:
                    (px, py, team), d = nearest
                    # require not under close possession to avoid dribbles
                    if d >= 0.8 * self.receive_dist_px:
                        self.in_pass = True
                        self.segment = {
                            'start_frame': frame_idx,
                            'start_pos': ball_xy,
                            'points': [ball_xy],
                            'max_speed': speed,
                            'kick_team': team
                        }
        elif self.in_pass:
            # continue segment
            if ball_xy is not None:
                self.segment['points'].append(ball_xy)
                self.segment['max_speed'] = max(self.segment['max_speed'], speed)
            # check reception end condition
            end = False
            recv_team = None
            if ball_xy is not None:
                nearest = self._nearest_player(ball_xy, players_with_team)
                if nearest is not None:
                    (px, py, team), d = nearest
                    recv_team = team if d <= self.receive_dist_px else None
            traveled = self._dist(self.segment['start_pos'], ball_xy) if (ball_xy is not None) else 0.0
            duration_frames = frame_idx - self.segment['start_frame']
            straight_enough = (self._avg_turn_deg(self.segment['points'], last_k=6) <= max(20.0, self.start_straightness_deg))
            slowed = speed <= self.end_speed_max
            if duration_frames >= self.min_frames and traveled >= self.min_travel_px and straight_enough and (recv_team is not None) and slowed:
                end = True
            # timeout or invalid path
            if duration_frames > int(self.fps*3):
                # too long, cancel
                self.in_pass = False
                self.segment = None
                return
            if end:
                # record event
                self.events.append({
                    'type': 'pass',
                    'start_frame': self.segment['start_frame'],
                    'end_frame': frame_idx,
                    'team': self.segment['kick_team'],
                    'result': 'success' if recv_team == self.segment['kick_team'] else 'intercepted',
                    'distance_px': traveled,
                    'duration_s': duration_frames / self.fps
                })
                # reset
                self.in_pass = False
                self.segment = None

    def summarize(self):
        total = len([e for e in self.events if e['type'] == 'pass'])
        success = len([e for e in self.events if e['type'] == 'pass' and e['result'] == 'success'])
        intercepted = len([e for e in self.events if e['type'] == 'pass' and e['result'] == 'intercepted'])
        sr = (success / total) if total > 0 else 0.0
        avg_dist = float(np.mean([e['distance_px'] for e in self.events if e['type']=='pass'])) if total>0 else 0.0
        avg_dur = float(np.mean([e['duration_s'] for e in self.events if e['type']=='pass'])) if total>0 else 0.0
        return {
            'passes_total': total,
            'passes_success': success,
            'passes_intercepted': intercepted,
            'pass_success_rate': sr,
            'avg_pass_distance_px': avg_dist,
            'avg_pass_duration_s': avg_dur
        }


def process_with_tracker_passing_traj(video_path):
    cap = cv2.VideoCapture(str(video_path))
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out_h = int(height * (RESIZE_WIDTH/width))
    writer = cv2.VideoWriter(str(OUTPUT_DIR / f"annotated_{video_path.stem}_pro_passing_traj.mp4"),
                             cv2.VideoWriter_fourcc(*'mp4v'), fps, (RESIZE_WIDTH, out_h))

    tracker = AdvancedKalmanBallTracker()
    team_colors = {'team_red': (0,0,255), 'team_blue': (255,0,0), 'ref': (255,255,255)}
    kmeans_team = bootstrap_team_clusters(frames=120)

    # HUD state
    state = {
        'total_frames': 0,
        'frames_with_ball': 0,
        'touches': 0,
        'touch_times': [],
        'foot_counts': {'left': 0, 'right': 0},
        'possession_frames': {'team_red': 0, 'team_blue': 0, 'ref': 0},
        'fps': float(fps)
    }

    analyzer = TrajectoryPassAnalyzer(fps=fps)

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        state['total_frames'] += 1
        fr = resize(frame)
        h, w = fr.shape[:2]

        pose_results, key_points = detect_pose_keypoints(fr)
        balls = enhanced_detect_ball(fr, prior_center=tracker.last_center, miss_count=tracker.misses)
        center = tracker.step(balls, fr)
        if center is not None:
            state['frames_with_ball'] += 1
            cv2.circle(fr, center, 10, (0,255,0), 2)
            pts = list(tracker.history)
            for i in range(1, len(pts)):
                cv2.line(fr, pts[i-1], pts[i], (0,255,0), 2)

        # Players + team
        players_with_team = []
        person_results = yolo(fr, conf=0.3, classes=[0], verbose=False)
        for r in person_results:
            if r.boxes is None:
                continue
            for b in r.boxes:
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy()
                conf = float(b.conf[0].cpu().numpy())
                if conf < 0.3:
                    continue
                bbox = (int(x1), int(y1), int(x2), int(y2))
                col = extract_jersey_color(fr, bbox)
                team = None
                if col is not None and kmeans_team is not None:
                    pred = int(kmeans_team.predict(np.array(col).reshape(1,-1))[0])
                    team = 'team_red' if pred == 0 else 'team_blue'
                if team is None:
                    team = classify_team_by_color(col) if col is not None else 'team_red'
                color = team_colors.get(team, (100,100,100))
                cv2.rectangle(fr, (bbox[0],bbox[1]), (bbox[2],bbox[3]), color, 2)
                tag = 'REF' if team=='ref' else ('T1' if team=='team_red' else 'T2')
                cv2.putText(fr, tag, (bbox[0]+3, bbox[1]-6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                xbc = int((bbox[0]+bbox[2])/2); ybc = int(bbox[3])
                players_with_team.append((xbc, ybc, team))

        # Touches (for HUD only)
        if center is not None and key_points:
            cx, cy = center
            for foot in ['left_foot','right_foot']:
                if foot in key_points:
                    fx, fy = int(key_points[foot][0]*w), int(key_points[foot][1]*h)
                    dist = ((cx - fx)**2 + (cy - fy)**2)**0.5
                    if dist < TOUCH_THRESHOLD:
                        t = state['total_frames'] / max(1.0, fps)
                        if not state['touch_times'] or (t - state['touch_times'][-1]) > 0.5:
                            state['touches'] += 1
                            state['touch_times'].append(t)
                            side = 'left' if foot.startswith('left') else 'right'
                            state['foot_counts'][side] += 1
                            cv2.circle(fr, (fx, fy), 8, (255,0,255), -1)

        # Trajectory-based pass step
        analyzer.step(state['total_frames'], center, players_with_team, key_points)
        pass_summary = analyzer.summarize()
        cv2.putText(fr, f"Passes(traj): {pass_summary['passes_total']}  SR: {pass_summary['pass_success_rate']*100:.0f}%",
                    (12, 10 + int(fr.shape[0]*0.20) - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,255), 1)

        # HUD
        fr = draw_hud(fr, state, team_colors)
        writer.write(fr)

    cap.release()
    writer.release()

    with open(OUTPUT_DIR / f"summary_{video_path.stem}_pro_passing_traj.json", 'w') as f:
        import json; json.dump(analyzer.summarize(), f, indent=2)
    print(f"Saved: {OUTPUT_DIR / f'annotated_{video_path.stem}_pro_passing_traj.mp4'}")
    print(f"Saved: {OUTPUT_DIR / f'summary_{video_path.stem}_pro_passing_traj.json'}")

# Run alternative pass detector
process_with_tracker_passing_traj(video_path)


Saved: output/annotated_Video-4_pro_passing_traj.mp4
Saved: output/summary_Video-4_pro_passing_traj.json
