In [2]:
import numpy as np
import pandas as pd
%pip install ultralytics
import cv2
import os
from ultralytics import YOLO

Collecting ultralytics
  Downloading ultralytics-8.4.13-py3-none-any.whl.metadata (39 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.4.13-py3-none-any.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m18.7 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.4.13 ultralytics-thop-2.0.18
Note: you may need to restart the kernel to use updated packages.
Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
video_path = '/kaggle/input/vollyball/volleyball_match.mp4'
output_name = '/kaggle/working/testimage.jpg'


cap = cv2.VideoCapture(video_path)


fps = cap.get(cv2.CAP_PROP_FPS)
timestamp_sec = 5 
frame_id = int(fps * timestamp_sec)


cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)


success, frame = cap.read()

if success:
    cv2.imwrite(output_name, frame)
    print(f"✅ Success! Frame at {timestamp_sec}s saved as {output_name}")
else:
    print("❌ Error: Could not read the frame.")


cap.release()

In [None]:
model=YOLO("yolo11n.pt")
result=model.predict("/kaggle/working/output_test1.mp4",save=True)
print("boxes:")
for box in result[0].boxes:
    print(box)

In [None]:
!ffmpeg -i /kaggle/input/vollyball/volleyball_match.mp4 -t 10 -c copy /kaggle/working/output_test1.mp4


# GET DATASET

In [None]:
!pip install roboflow
from roboflow import Roboflow
rf = Roboflow(api_key="0yoIOta3LExHmhd2sksW") 
project = rf.workspace("volleyball-ai-detection").project("ai-volley-detection")
version = project.version(1)

# Specify the desired folder name (e.g., "my_volleyball_dataset")
dataset_folder_name = "my_volleyball_dataset_new"

# Download the dataset to the specified location
dataset = version.download(model_format="yolov5", location=dataset_folder_name)

In [None]:
import shutil
import os
shutil.move("/kaggle/working/my_volleyball_dataset_new/train","/kaggle/working/my_volleyball_dataset_new/my_volleyball_dataset_new/train")
shutil.move("/kaggle/working/my_volleyball_dataset_new/test","/kaggle/working/my_volleyball_dataset_new/my_volleyball_dataset_new/test")
shutil.move("/kaggle/working/my_volleyball_dataset_new/valid","/kaggle/working/my_volleyball_dataset_new/my_volleyball_dataset_new/valid")

In [None]:
!yolo task=detect mode=train model=yolov5l6u.pt data={dataset.location}/data.yaml epochs=50 imgsz=640 

In [None]:
!zip -r ballnew1.zip /kaggle/working/runs/detect/train5


In [None]:
from IPython.display import FileLink
FileLink(r'/kaggle/working/ball_tracked_toggles.mp4')

In [None]:
mo=YOLO("/kaggle/working/runs/detect/train5/weights/best.pt")
result=mo.predict("/kaggle/working/output_test1.mp4",save=True)
print("boxes:")
for box in result[0].boxes:
    print(box)

In [None]:
import cv2, numpy as np, math, csv, sys
from ultralytics import YOLO

# ================= CONFIG =================
video_path = "/kaggle/working/output_test1.mp4"
model_path = "/kaggle/working/runs/detect/train5/weights/last.pt"   # update if needed

out_video = "/kaggle/working/fixed2.mp4"
trajectory_csv = "/kaggle/working/ball_trajectory_fixed1.csv"

EXPECTED_DIAMETER = 35
EXPECTED_RADIUS = EXPECTED_DIAMETER / 2

ASSOC_DIST_GATE = 80
MAX_MISSED = 12
MIN_CONF = 0.2
# ========================================

# load model
ball_model = YOLO(model_path)

# -------- Kalman helper (all float32) --------
def create_kf():
    # Ensure type float32 everywhere
    kf = cv2.KalmanFilter(4, 2, 0, cv2.CV_32F)

    kf.transitionMatrix = np.array([
        [1,0,1,0],
        [0,1,0,1],
        [0,0,1,0],
        [0,0,0,1]
    ], dtype=np.float32)

    kf.measurementMatrix = np.array([
        [1,0,0,0],
        [0,1,0,0]
    ], dtype=np.float32)

    kf.processNoiseCov = (np.eye(4, dtype=np.float32) * 0.3).astype(np.float32)
    kf.measurementNoiseCov = (np.eye(2, dtype=np.float32) * 0.1).astype(np.float32)
    kf.errorCovPost = (np.eye(4, dtype=np.float32) * 5.0).astype(np.float32)

    # initialize statePost as zeros float32
    kf.statePost = np.zeros((4,1), dtype=np.float32)
    return kf

kf = create_kf()
have_kf = False
missed = 0

# -------- YOLO detector (robust extraction) --------
def detect_ball(frame):
    # Ultralytics returns a Results object; boxes may be tensors.
    # We convert safely to python floats / ints.
    results = ball_model.predict(frame, conf=MIN_CONF, verbose=False)
    dets = []
    for r in results:
        # r.boxes.xyxy might be a tensor or numpy array
        try:
            # convert to numpy for stable indexing
            boxes = r.boxes.xyxy.cpu().numpy()  # shape (N,4)
            confs = r.boxes.conf.cpu().numpy()  # shape (N,)
        except Exception:
            # fallback: iterate boxes object
            boxes = []
            confs = []
            for b in r.boxes:
                # each b.xyxy might be small array-like
                vals = b.xyxy[0].tolist() if hasattr(b.xyxy[0], 'tolist') else list(map(float, b.xyxy[0]))
                boxes.append(vals)
                confs.append(float(b.conf[0].item() if hasattr(b.conf[0], 'item') else b.conf[0]))
            boxes = np.array(boxes) if len(boxes) else np.zeros((0,4))
            confs = np.array(confs) if len(confs) else np.zeros((0,))

        for i in range(len(boxes)):
            x1,y1,x2,y2 = boxes[i].tolist()
            conf = float(confs[i].item() if hasattr(confs[i], 'item') else confs[i])
            # cast to ints for bbox usage
            dets.append((int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2)), conf))
    return dets

# -------- Helpers --------
def bbox_center(b):
    x1,y1,x2,y2,_ = b
    cx = (x1 + x2)/2.0
    cy = (y1 + y2)/2.0
    # approximate radius from box size; keep float
    r = ((x2-x1) + (y2-y1)) / 4.0
    return cx, cy, r

# -------- Video IO --------
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 25
ret, frame = cap.read()
if not ret:
    print("ERROR: Cannot read video:", video_path, file=sys.stderr)
    raise SystemExit(1)
h,w = frame.shape[:2]

writer = cv2.VideoWriter(out_video,
                         cv2.VideoWriter_fourcc(*"mp4v"),
                         fps,(w,h))

cap.set(cv2.CAP_PROP_POS_FRAMES,0)

trajectory = []
frame_idx = 0

#====================================================
last_det = None        # (x,y)
last_det_frame = None
# ================= MAIN LOOP =================
while True:
    ret, frame = cap.read()
    if not ret:
        break

    detections = detect_ball(frame)  # list of (x1,y1,x2,y2,conf)

    candidates = []
    for d in detections:
        cx,cy,r = bbox_center(d)
        candidates.append((cx,cy,r))

    # Predict first (for gating)
    pred = None
    if have_kf:
        p = kf.predict()  # returns float32 Mat (4x1)
        pred = (float(p[0,0]), float(p[1,0]))

    chosen = None

    if pred is not None and len(candidates) > 0:
        best = 1e9
        for c in candidates:
            dscore = math.hypot(c[0]-pred[0], c[1]-pred[1])
            rdiff = abs(c[2]-EXPECTED_RADIUS)
            score = dscore + 0.8 * rdiff
            if score < best:
                best = score
                chosen = c
        if best > ASSOC_DIST_GATE:
            chosen = None

    elif pred is None and len(candidates) > 0:
        # no prior state -> pick detection closest in radius to expected
        chosen = min(candidates, key=lambda x: abs(x[2]-EXPECTED_RADIUS))
    # =============================================================
    if chosen is not None:
        # chosen = (cx, cy, r)
        det_x, det_y = float(chosen[0]), float(chosen[1])
    
        # Use detection for output (no smoothing lag)
        cur_x, cur_y = det_x, det_y
        detected = 1
        missed = 0
    
        # Update/initialize Kalman state so future predictions are consistent:
        if not have_kf:
            # initialize statePost: x,y,vx,vy
            kf.statePost = np.array([[np.float32(det_x)],
                                     [np.float32(det_y)],
                                     [np.float32(0.0)],
                                     [np.float32(0.0)]], dtype=np.float32)
            have_kf = True
        else:
            # Optional: estimate velocity from last detection
            if last_det is not None and last_det_frame is not None:
                dt = max(1e-6, (frame_idx - last_det_frame) / float(fps))
                vx = (det_x - last_det[0]) / dt
                vy = (det_y - last_det[1]) / dt
                # Overwrite entire statePost (trust detector + estimated velocity)
                kf.statePost = np.array([[np.float32(det_x)],
                                         [np.float32(det_y)],
                                         [np.float32(vx)],
                                         [np.float32(vy)]], dtype=np.float32)
            else:
                # if no previous detection, just correct normally
                meas = np.array([[np.float32(det_x)], [np.float32(det_y)]], dtype=np.float32)
                kf.correct(meas)
    
        # save last detection
        last_det = (det_x, det_y)
        last_det_frame = frame_idx
    
    else:
        # no detection: use Kalman prediction (predict already called earlier)
        if have_kf:
            p = kf.predict()
            cur_x, cur_y = float(p[0,0]), float(p[1,0])
        else:
            cur_x, cur_y = -1, -1
        detected = 0
        missed += 1
    
    # reset if many misses (same as before)
    if missed > MAX_MISSED:
        have_kf = False
        kf = create_kf()
        missed = 0
        last_det = None
        last_det_frame = None
    # ======================================================
    detected = 0
    cur_x, cur_y = -1.0, -1.0

    if chosen is not None:
        det_x, det_y, det_r = float(chosen[0]), float(chosen[1]), float(chosen[2])
        meas = np.array([[np.float32(det_x)], [np.float32(det_y)]], dtype=np.float32)

        if not have_kf:
            # initialize statePost and keep dtype float32
            kf.statePost = np.array([[np.float32(det_x)], [np.float32(det_y)], [np.float32(0.0)], [np.float32(0.0)]], dtype=np.float32)
            # keep some uncertainty
            kf.errorCovPost = np.eye(4, dtype=np.float32) * 5.0
            have_kf = True
        else:
            # optional: snap to measurement when jump is huge
            if pred is not None:
                jump = math.hypot(det_x - pred[0], det_y - pred[1])
                if jump > 40.0:  # snap threshold (tune)
                    kf.statePost = np.array([[np.float32(det_x)], [np.float32(det_y)], [np.float32(0.0)], [np.float32(0.0)]], dtype=np.float32)
                else:
                    kf.correct(meas)
            else:
                kf.correct(meas)

        # read corrected state (statePost is float32 4x1)
        cur_x = float(kf.statePost[0,0])
        cur_y = float(kf.statePost[1,0])
        detected = 1
        missed = 0

    else:
        # no detection chosen -> rely on prediction if present
        if pred is not None:
            cur_x, cur_y = pred
            missed += 1
        else:
            cur_x, cur_y = -1.0, -1.0
            missed += 1

    # reset if too many misses
    if missed > MAX_MISSED:
        kf = create_kf()
        have_kf = False
        missed = 0

    # ---------- Visualization ----------
    vis = frame.copy()

    # green detections
    for c in candidates:
        cv2.circle(vis, (int(round(c[0])), int(round(c[1]))), int(round(EXPECTED_RADIUS)), (0,255,0), 2)

    # red kalman
    if cur_x >= 0:
        cv2.circle(vis, (int(round(cur_x)), int(round(cur_y))), int(round(EXPECTED_RADIUS)), (0,0,255), 2)

    writer.write(vis)
    trajectory.append((frame_idx, cur_x, cur_y, detected))
    frame_idx += 1

cap.release()
writer.release()

with open(trajectory_csv,"w",newline="") as f:
    w = csv.writer(f)
    w.writerow(["frame","x","y","detected"])
    for r in trajectory:
        w.writerow(r)

print("DONE")
print("Video:", out_video)
print("CSV:", trajectory_csv)


In [None]:
import cv2
import numpy as np
import math
import csv
import sys
from ultralytics import YOLO
from pathlib import Path

# ---------------- USER CONFIG (edit) ----------------
video_path = "/kaggle/working/output_test1.mp4"
model_path = "/kaggle/working/runs/detect/train5/weights/last.pt"

out_video = "/kaggle/working/ball_tracked_toggles.mp4"
trajectory_csv = "/kaggle/working/ball_trajectory_toggles.csv"

TRACKER_TYPE = "CSRT"        # "CSRT", "KCF", "MOSSE"
MIN_CONF = 0.2
EXPECTED_DIAMETER = 35.0
EXPECTED_RADIUS = EXPECTED_DIAMETER / 2.0

DETECT_EVERY_N_FRAMES = 1    # run YOLO every N frames (1 = every frame)

# Toggles you requested
PREFER_DETECTION = True      # if True, if YOLO detects, use its center as final
APPLY_SMOOTHING = False       # exponential smoothing (EMA) on final reported point
APPLY_JUMP_REJECTION = True  # reject impossible jumps

# smoothing / jump params
SMOOTH_ALPHA = 0.75          # EMA alpha: higher => follow new value more
MAX_JUMP_PX = 120.0          # if proposed jump > this, reject (pixels)

# reinit thresholds for tracker
REINIT_DIST_THRESH = 60.0
REINIT_IOU_THRESH = 0.3

# ---------------------------------------------------

# load YOLO
ball_model = YOLO(model_path)

def detect_ball(frame, min_conf=MIN_CONF):
    results = ball_model.predict(frame, conf=min_conf, verbose=False)
    dets = []
    for r in results:
        try:
            boxes = r.boxes.xyxy.cpu().numpy()
            confs = r.boxes.conf.cpu().numpy()
        except Exception:
            boxes = []
            confs = []
            for b in r.boxes:
                try:
                    vals = b.xyxy[0].tolist()
                except Exception:
                    vals = [float(x) for x in b.xyxy[0]]
                boxes.append(vals)
                try:
                    confs.append(float(b.conf[0].item()))
                except Exception:
                    confs.append(float(b.conf[0]))
            boxes = np.array(boxes) if len(boxes) else np.zeros((0,4))
            confs = np.array(confs) if len(confs) else np.zeros((0,))
        for i in range(len(boxes)):
            x1,y1,x2,y2 = boxes[i].tolist()
            conf = float(confs[i].item() if hasattr(confs[i], "item") else confs[i])
            dets.append((int(round(x1)), int(round(y1)), int(round(x2)), int(round(y2)), conf))
    return dets

def bbox_center_radius(bbox):
    x1,y1,x2,y2 = bbox
    cx = (x1 + x2) / 2.0
    cy = (y1 + y2) / 2.0
    r = ((x2 - x1) + (y2 - y1)) / 4.0
    return cx, cy, r

def iou(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interW = max(0, xB - xA)
    interH = max(0, yB - yA)
    interArea = interW * interH
    areaA = (boxA[2]-boxA[0])*(boxA[3]-boxA[1])
    areaB = (boxB[2]-boxB[0])*(boxB[3]-boxB[1])
    if areaA + areaB - interArea <= 0:
        return 0.0
    return interArea / float(areaA + areaB - interArea)

def make_tracker(tracker_type="CSRT"):
    # try common APIs and legacy fallbacks
    try:
        if tracker_type == "CSRT":
            return cv2.TrackerCSRT_create()
    except Exception:
        try:
            return cv2.legacy.TrackerCSRT_create()
        except Exception:
            pass
    try:
        if tracker_type == "KCF":
            return cv2.TrackerKCF_create()
        if tracker_type == "MOSSE":
            return cv2.TrackerMOSSE_create()
    except Exception:
        pass
    try:
        if tracker_type == "KCF":
            return cv2.legacy.TrackerKCF_create()
        if tracker_type == "MOSSE":
            return cv2.legacy.TrackerMOSSE_create()
    except Exception:
        pass
    raise RuntimeError("No OpenCV tracker available on this build.")

# Video setup
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError("Cannot open video: " + str(video_path))
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
ret, frame = cap.read()
if not ret:
    raise RuntimeError("Cannot read first frame")
H, W = frame.shape[:2]
writer = cv2.VideoWriter(out_video, cv2.VideoWriter_fourcc(*"mp4v"), fps, (W, H))
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

# tracking state
tracker = None
tracker_bbox = None   # (x1,y1,x2,y2)
track_started_frame = None

trajectory = []  # (frame, x, y, used_detection_flag)
frame_idx = 0

# statistics
stats = {"frames":0, "detections_used":0, "tracker_only":0, "reinit_count":0, "jumps_rejected":0, "smoothed_frames":0}

# last final reported point (for smoothing/jump checks)
prev_final = None

while True:
    ret, frame = cap.read()
    if not ret:
        break
    stats["frames"] += 1

    detections = []
    if (frame_idx % max(1, DETECT_EVERY_N_FRAMES)) == 0:
        detections = detect_ball(frame, min_conf=MIN_CONF)

    # choose a detection candidate if any
    chosen_det = None
    if len(detections) > 0:
        if tracker_bbox is not None:
            tx_cx, tx_cy, tx_r = bbox_center_radius(tracker_bbox)
            best_score = 1e9; best = None
            for d in detections:
                d_cx, d_cy, d_r = bbox_center_radius(d[:4])
                score = math.hypot(d_cx - tx_cx, d_cy - tx_cy) - (d[4] * 10.0)
                if score < best_score:
                    best_score = score; best = d
            chosen_det = best
        else:
            chosen_det = max(detections, key=lambda x: x[4])  # highest confidence

    detected_flag = 0
    proposed_x = -1.0
    proposed_y = -1.0

    # update tracker if present
    if tracker is not None:
        ok, tb = tracker.update(frame)
        if ok:
            x,y,w_box,h_box = tb
            x1 = int(round(x)); y1 = int(round(y)); x2 = int(round(x + w_box)); y2 = int(round(y + h_box))
            tracker_bbox = (x1,y1,x2,y2)
        else:
            tracker = None
            tracker_bbox = None

    # If detection present, decide whether to reinit or use detection
    if chosen_det is not None:
        dx1,dy1,dx2,dy2,dconf = chosen_det
        d_cx, d_cy, d_r = bbox_center_radius((dx1,dy1,dx2,dy2))
        detected_flag = 1

        need_reinit = False
        if tracker_bbox is None:
            need_reinit = True
        else:
            t_cx, t_cy, t_r = bbox_center_radius(tracker_bbox)
            dist = math.hypot(t_cx - d_cx, t_cy - d_cy)
            box_iou = iou(tracker_bbox, (dx1,dy1,dx2,dy2))
            if (dconf > 0.4 and (dist > REINIT_DIST_THRESH or box_iou < REINIT_IOU_THRESH)) or (dconf > 0.85 and box_iou < 0.6):
                need_reinit = True

        if need_reinit:
            # reinit tracker on detection
            try:
                tracker = make_tracker(TRACKER_TYPE)
                bb = (dx1, dy1, dx2 - dx1, dy2 - dy1)
                tracker.init(frame, bb)
                tracker_bbox = (dx1,dy1,dx2,dy2)
                stats["reinit_count"] += 1
            except Exception:
                tracker = None
                tracker_bbox = (dx1,dy1,dx2,dy2)
            final_x, final_y = d_cx, d_cy
            stats["detections_used"] += 1
            proposed_x, proposed_y = final_x, final_y
        else:
            # no reinit required
            if PREFER_DETECTION:
                # prefer detection as authoritative for reporting (no smoothing here yet)
                final_x, final_y = d_cx, d_cy
                stats["detections_used"] += 1
                proposed_x, proposed_y = final_x, final_y
            else:
                # rely on tracker output if available; detection used only to correct tracker occasionally
                if tracker_bbox is not None:
                    t_cx, t_cy, t_r = bbox_center_radius(tracker_bbox)
                    proposed_x, proposed_y = t_cx, t_cy
                    stats["tracker_only"] += 1
                else:
                    proposed_x, proposed_y = d_cx, d_cy
                    stats["detections_used"] += 1
    else:
        # no detection candidate this frame
        if tracker_bbox is not None:
            t_cx, t_cy, t_r = bbox_center_radius(tracker_bbox)
            proposed_x, proposed_y = t_cx, t_cy
            stats["tracker_only"] += 1
        else:
            proposed_x, proposed_y = -1.0, -1.0

    # Apply jump rejection and smoothing on proposed_x,proposed_y
    final_x, final_y = proposed_x, proposed_y

    if final_x >= 0:
        # if previous exists, check jump
        if prev_final is not None and prev_final[0] >= 0:
            prev_x, prev_y = prev_final
            # jump rejection
            if APPLY_JUMP_REJECTION:
                jump = math.hypot(final_x - prev_x, final_y - prev_y)
                if jump > MAX_JUMP_PX:
                    stats["jumps_rejected"] += 1
                    # reject jump: keep previous final
                    final_x, final_y = prev_x, prev_y
            # smoothing (EMA)
            if APPLY_SMOOTHING:
                sm_x = SMOOTH_ALPHA * final_x + (1.0 - SMOOTH_ALPHA) * prev_x
                sm_y = SMOOTH_ALPHA * final_y + (1.0 - SMOOTH_ALPHA) * prev_y
                final_x, final_y = sm_x, sm_y
                stats["smoothed_frames"] += 1
        # else no previous -> no smoothing/jump check

    # record final
    trajectory.append((frame_idx, float(final_x), float(final_y), int(detected_flag)))
    prev_final = (final_x, final_y)

    # visualization
    vis = frame.copy()
    # show detector boxes (green)
    for d in detections:
        x1,y1,x2,y2,conf = d
        cv2.rectangle(vis, (x1,y1), (x2,y2), (0,200,0), 1)
    # show tracker bbox (blue) for debugging
    if tracker_bbox is not None:
        tx1,ty1,tx2,ty2 = tracker_bbox
        cv2.rectangle(vis, (tx1,ty1), (tx2,ty2), (255,160,0), 2)
    # draw final reported point (red)
    if final_x >= 0:
        cv2.circle(vis, (int(round(final_x)), int(round(final_y))), int(round(EXPECTED_RADIUS)), (0,0,255), 2)
    cv2.putText(vis, f"Frame {frame_idx}", (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)

    writer.write(vis)
    frame_idx += 1

# cleanup
cap.release()
writer.release()

# write CSV
with open(trajectory_csv, "w", newline="") as fh:
    w = csv.writer(fh)
    w.writerow(["frame","x","y","detected"])
    for r in trajectory:
        w.writerow(r)

# print summary stats
print("Done. output:", out_video)
print("trajectory:", trajectory_csv)
print("Frames:", stats['frames'],
      "Detections used:", stats['detections_used'],
      "Tracker-only frames:", stats['tracker_only'],
      "Reinits:", stats['reinit_count'],
      "Jumps rejected:", stats['jumps_rejected'],
      "Smoothed frames:", stats['smoothed_frames'])


In [None]:
import cv2
import numpy as np

video_path = "/kaggle/working/output_test1.mp4"   # change if needed
warp_out = "/kaggle/working/warped_example.png"

# ---------- Load first frame ----------
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
cap.release()

if not ret:
    raise RuntimeError("Could not read video")

display = frame.copy()
points = []

# ---------- Mouse callback ----------
def click_event(event, x, y, flags, param):
    global points, display
    if event == cv2.EVENT_LBUTTONDOWN:
        points.append([x,y])
        cv2.circle(display, (x,y), 8, (0,0,255), -1)
        cv2.putText(display, str(len(points)), (x+5,y-5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0),2)
        cv2.imshow("Select 4 court corners", display)

# ---------- Show frame & collect clicks ----------
cv2.imshow("Select 4 court corners", display)
cv2.setMouseCallback("Select 4 court corners", click_event)

print("Click 4 court corners in order:")
print("1 = top-left")
print("2 = top-right")
print("3 = bottom-right")
print("4 = bottom-left")
print("Press ESC when done.")

while True:
    cv2.imshow("Select 4 court corners", display)
    if cv2.waitKey(20) & 0xFF == 27:
        break

cv2.destroyAllWindows()

if len(points) != 4:
    raise RuntimeError("You must click exactly 4 points")

src_pts = np.array(points, dtype=np.float32)

# ---------- Define top-down destination ----------
# You can change these dimensions
COURT_W = 800
COURT_H = 400

dst_pts = np.array([
    [0,0],
    [COURT_W,0],
    [COURT_W,COURT_H],
    [0,COURT_H]
], dtype=np.float32)

# ---------- Compute homography ----------
H, status = cv2.findHomography(src_pts, dst_pts)

print("Homography matrix:\n", H)

# ---------- Warp image ----------
warped = cv2.warpPerspective(frame, H, (COURT_W, COURT_H))

cv2.imwrite(warp_out, warped)

print("Saved warped image to:", warp_out)

cv2.imshow("Warped court (top-down)", warped)
cv2.waitKey(0)
cv2.destroyAllWindows()


In [None]:
print("hi")

In [1]:
# ByteTrack (Ultralytics.track) with persistent per-track team assignment by cumulative time
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict, deque, Counter
import csv, time, math, sys

# ============== USER CONFIG ==============
video_path = "/kaggle/input/tensecondstest/output_video_10s.mp4"
model_path = "yolo11n.pt"    # or your trained person model
out_video = "/kaggle/working/output/last.avi"
out_csv   = "/kaggle/working/player_counts_persistent.csv"


# Court polygon and net (paste your values)
COURT_POLY = [
    (313, 292),
    (996, 289),
    (1198, 708),
    (125, 707),
]
NET_Y = 453
last_positions = {}

# detection/tracking params
CONF_THRES = 0.25

# persistent-assignment tuning:
MIN_FRAMES_FOR_ASSIGNMENT = 5    # need at least this many frames for an initial stable assignment

TRACK_TTL_FRAMES = 150           # keep dead track info for this many frames before forgetting
# =========================================

model = YOLO(model_path)
court_poly_np = np.array(COURT_POLY, dtype=np.int32)

# per-track structures
track_counts = defaultdict(lambda: {1:0, 2:0})   # cumulative counts of frames on each side
track_last_seen = dict()                         # last frame index this track was observed
track_assigned = dict()                          # current assigned team for each track
track_total_seen = defaultdict(int)              # total frames seen (on-court)
track_bbox_last = dict()                         # last bbox for debugging/ttl
track_ttl = dict()                               # frame index when track was last seen

# output structures
csv_rows = []
frame_idx = 0
start_t = time.time()

# streaming track (Ultralytics uses ByteTrack internally)
stream = model.track(source=video_path, conf=CONF_THRES, classes=[0], stream=True, persist=False)

def find_nearest_existing(px, py, last_positions, max_dist=80):
    best = None
    best_d = 1e9
    for tid,(x,y) in last_positions.items():
        d = math.hypot(px-x, py-y)
        if d < best_d and d < max_dist:
            best_d = d
            best = tid
    return best


for res in stream:
    # robustly extract frame
    frame = None
    try:
        frame = res.orig_img
    except Exception:
        try:
            imgs = getattr(res, "imgs", None)
            if imgs is not None and len(imgs) > 0:
                frame = imgs[0]
        except Exception:
            frame = None
    if frame is None:
        try:
            frame = res.plot()
        except Exception:
            print("Cannot extract frame from result object; aborting", file=sys.stderr)
            break

    vis = frame.copy()
    H, W = frame.shape[:2]
    cv2.polylines(vis, [court_poly_np], True, (255,255,0), 2)
    cv2.line(vis, (0, NET_Y), (W, NET_Y), (0,0,255), 2)

    team1_count = 0
    team2_count = 0
    observed_ids = set()

    boxes = getattr(res, "boxes", None)
    if boxes is None:
        # write frame with zeros and continue
        csv_rows.append((frame_idx, 0, 0))
        frame_idx += 1
        continue

    # robust extraction of xyxy, conf, id
    try:
        xyxy = boxes.xyxy.cpu().numpy()
        confs = boxes.conf.cpu().numpy()
        # try id arrays
        try:
            ids_arr = boxes.id.cpu().numpy()
        except Exception:
            ids_arr = None
    except Exception:
        # fallback to iterating boxes
        xyxy, confs, ids_arr = [], [], []
        for b in boxes:
            try:
                coords = b.xyxy[0].cpu().numpy()
            except Exception:
                coords = np.array(b.xyxy[0].tolist())
            xyxy.append(coords)
            try:
                confs.append(float(b.conf[0].item()))
            except Exception:
                confs.append(float(b.conf[0]))
            # id attempt
            iid = None
            if hasattr(b, "id"):
                try:
                    iid = int(b.id[0].item())
                except Exception:
                    try:
                        iid = int(b.id[0])
                    except Exception:
                        iid = None
            ids_arr.append(iid)
        xyxy = np.array(xyxy) if len(xyxy) else np.zeros((0,4))
        confs = np.array(confs) if len(confs) else np.zeros((0,))

    N = xyxy.shape[0]
    for i in range(N):
        x1,y1,x2,y2 = xyxy[i].astype(int).tolist()
        conf = float(confs[i])
        # extract id robustly
        track_id = None
        try:
            if ids_arr is not None:
                val = ids_arr[i]
                if hasattr(val, "item"):
                    track_id = int(val.item())
                else:
                    track_id = int(val)
        except Exception:
            track_id = None
        if track_id is None:
            # try per-box attribute
            try:
                b = boxes[i]
                if hasattr(b, "id"):
                    v = b.id
                    track_id = int(v.item()) if hasattr(v, "item") else int(v)
            except Exception:
                track_id = None
        # fallback temporary negative id
        if track_id is None:
            track_id = - (i + 1)

        observed_ids.add(track_id)
        track_last_seen[track_id] = frame_idx
        track_bbox_last[track_id] = (x1,y1,x2,y2)
        track_ttl[track_id] = frame_idx

        # foot point
        foot_x = int((x1 + x2) / 2)
        foot_y = int(y2)
        foot_pt = (foot_x, foot_y)

        last_positions[track_id] = foot_pt


        # skip if outside court polygon
        inside = cv2.pointPolygonTest(court_poly_np, foot_pt, False)
        if inside < 0:
            # optional: draw faint box
            cv2.rectangle(vis, (x1,y1), (x2,y2), (120,120,120), 1)
            continue

        # determine side
        side = 1 if foot_y < NET_Y else 2
        # ---- HARD LOCK TEAM PER TRACK ----

        if track_id not in track_assigned:
        
            nearest = find_nearest_existing(foot_x, foot_y, last_positions)
        
            if nearest is not None and nearest in track_assigned:
                # inherit team from nearby player
                track_assigned[track_id] = track_assigned[nearest]
            else:
                # fallback: net-based
                track_assigned[track_id] = 1 if foot_y < NET_Y else 2

        
        assigned_team = track_assigned.get(track_id, side)

        # update cumulative counts
        track_counts[track_id][side] += 1
        track_total_seen[track_id] += 1

        # if no assignment yet, possibly create initial assignment if enough frames seen
        if track_id not in track_assigned:
            # if enough frames observed, assign to majority
            total = track_total_seen[track_id]
            if total >= MIN_FRAMES_FOR_ASSIGNMENT:
                # assign to side with greater cumulative frames
                if track_counts[track_id][1] >= track_counts[track_id][2]:
                    track_assigned[track_id] = 1
                else:
                    track_assigned[track_id] = 2
            else:
                # provisional assignment to current observed side (helps early drawing)
                track_assigned[track_id] = side

        assigned_team = track_assigned.get(track_id, side)
        color = (0,200,0) if assigned_team == 1 else (200,0,0)

        cv2.rectangle(vis, (x1,y1), (x2,y2), color, 2)
        cv2.circle(vis, foot_pt, 3, color, -1)
        cv2.putText(vis, f"ID{track_id}", (x1, max(y1-6,0)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        if assigned_team == 1:
            team1_count += 1
        else:
            team2_count += 1

    # remove/forget very old tracks to free memory (if not seen for long)
    to_remove = []
    for tid, lastf in list(track_ttl.items()):
        if frame_idx - lastf > TRACK_TTL_FRAMES:
            to_remove.append(tid)
    for tid in to_remove:
        track_counts.pop(tid, None)
        track_assigned.pop(tid, None)
        track_total_seen.pop(tid, None)
        track_last_seen.pop(tid, None)
        track_bbox_last.pop(tid, None)
        track_ttl.pop(tid, None)

    # draw counts
    cv2.putText(vis, f"Team1: {team1_count}", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,200,0), 2)
    cv2.putText(vis, f"Team2: {team2_count}", (10,60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (200,0,0), 2)

    # write video frame and CSV row
    if 'writer' not in locals():
        # create writer lazily using frame size
        Ht, Wt = vis.shape[:2]
        writer = cv2.VideoWriter(out_video, cv2.VideoWriter_fourcc(*"mp4v"), 25, (Wt,Ht))
    writer.write(vis)
    csv_rows.append((frame_idx, team1_count, team2_count, len(observed_ids)))

    frame_idx += 1

# cleanup
if 'writer' in locals():
    writer.release()

with open(out_csv, "w", newline="") as fh:
    w = csv.writer(fh)
    w.writerow(["frame","team1_count","team2_count","active_tracks"])
    for row in csv_rows:
        w.writerow(row)

elapsed = time.time() - start_t
print("Done. Output video:", out_video)
print("CSV:", out_csv)
print(f"Frames processed: {frame_idx}, elapsed {elapsed:.1f}s, approx fps {frame_idx/elapsed:.1f}")



video 1/1 (frame 1/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 49.4ms
video 1/1 (frame 2/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 8.1ms
video 1/1 (frame 3/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 7.9ms
video 1/1 (frame 4/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 7.6ms
video 1/1 (frame 5/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 17 persons, 7.8ms
video 1/1 (frame 6/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 17 persons, 8.1ms
video 1/1 (frame 7/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 17 persons, 8.1ms
video 1/1 (frame 8/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 17 persons, 8.8ms
video 1/1 (frame 9/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 16 persons, 10.7ms
video 1/1 (frame 10/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 16 p

In [12]:
import cv2, numpy as np, csv
from ultralytics import YOLO

# ================= USER CONFIG =================
video_path = "/kaggle/input/tensecondstest/output_video_10s.mp4"
model_path = "yolov12x.pt"
out_video = "/kaggle/working/output/hysteresis_tracker.avi"

COURT_POLY = [(313, 292), (996, 289), (1198, 708), (125, 707)]
NET_Y = 453

# The "No-Man's Land" size. 
# Players must cross this entire zone to switch teams.
BUFFER = 60 
UPPER_THRESHOLD = NET_Y - BUFFER
LOWER_THRESHOLD = NET_Y + BUFFER
# ==============================================

court_poly_np = np.array(COURT_POLY, dtype=np.int32).reshape((-1, 1, 2))
model = YOLO('yolov8x.pt')

# Dictionary to store the "Locked" team state for each ID
# { track_id: team_number }
track_team_state = {}

stream = model.track(source=video_path, conf=0.3, classes=[0], stream=True, persist=True)
writer = None

for frame_idx, res in enumerate(stream):
    frame = res.orig_img
    vis = frame.copy()
    
    if writer is None:
        H, W = frame.shape[:2]
        writer = cv2.VideoWriter(out_video, cv2.VideoWriter_fourcc(*"mp4v"), 25, (W, H))

    # Visualizing the Hysteresis Zones
    cv2.line(vis, (0, UPPER_THRESHOLD), (W, UPPER_THRESHOLD), (0, 255, 255), 1) # Yellow line
    cv2.line(vis, (0, LOWER_THRESHOLD), (W, LOWER_THRESHOLD), (0, 255, 255), 1) # Yellow line
    cv2.line(vis, (0, NET_Y), (W, NET_Y), (0, 0, 255), 2) # Actual Net

    if res.boxes is not None and res.boxes.id is not None:
        boxes = res.boxes.xyxy.cpu().numpy()
        ids = res.boxes.id.cpu().numpy().astype(int)
        
        t1_count, t2_count = 0, 0

        for i, tid in enumerate(ids):
            x1, y1, x2, y2 = boxes[i].astype(int)
            
            # Use a point slightly above the feet for more stability during jumps
            stable_y = y1 + (y2 - y1) * 0.9 
            foot_center = (float((x1 + x2) / 2), float(y2))

            # 1. Spectator Filter
            if cv2.pointPolygonTest(court_poly_np, foot_center, False) < 0:
                continue

            # 2. Hysteresis Team Assignment
            if tid not in track_team_state:
                # Initial assignment
                track_team_state[tid] = 1 if stable_y < NET_Y else 2
            else:
                # Only switch if they cross the OPPOSITE threshold
                current_team = track_team_state[tid]
                if current_team == 1 and stable_y > LOWER_THRESHOLD:
                    track_team_state[tid] = 2
                elif current_team == 2 and stable_y < UPPER_THRESHOLD:
                    track_team_state[tid] = 1

            # 3. Final Count
            assigned_team = track_team_state[tid]
            color = (0, 255, 0) if assigned_team == 1 else (255, 0, 0)
            
            if assigned_team == 1: t1_count += 1
            else: t2_count += 1

            cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2)
            cv2.putText(vis, f"P{tid}", (x1, y1-5), 0, 0.5, color, 2)

        cv2.putText(vis, f"Team 1: {t1_count} | Team 2: {t2_count}", (20, 50), 0, 1, (255,255,255), 3)

    writer.write(vis)

writer.release()

[KDownloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8x.pt to 'yolov8x.pt': 100% ━━━━━━━━━━━━ 130.5MB 365.8MB/s 0.4s 0.3s<0.1s

video 1/1 (frame 1/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 19 persons, 64.5ms
video 1/1 (frame 2/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 19 persons, 37.1ms
video 1/1 (frame 3/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 35.7ms
video 1/1 (frame 4/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 36.3ms
video 1/1 (frame 5/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 32.8ms
video 1/1 (frame 6/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 33.2ms
video 1/1 (frame 7/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 19 persons, 32.6ms
video 1/1 (frame 8/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 19 persons, 33.2ms
video 1/1 (frame 9/292) /k

In [9]:
import cv2, torch, numpy as np, csv, time
from ultralytics import YOLO
from torchvision import models, transforms
from scipy.spatial.distance import cosine
from collections import defaultdict

# ================= USER CONFIG =================
video_path = "/kaggle/input/tensecondstest/output_video_10s.mp4"
model_path = "yolo11l.pt"
out_video = "/kaggle/working/output/reid.avi"
out_csv   = "/kaggle/working/player_counts_final.csv"

COURT_POLY = [(313, 292), (996, 289), (1198, 708), (125, 707)]
NET_Y = 453
SIMILARITY_THRESHOLD = 0.75  # How closely they must match the team gallery
# ==============================================

# 1. SETUP HARDWARE & MODELS
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Feature Extractor (ResNet50)
base_model = models.resnet50(weights='IMAGENET1K_V1').to(device)
reid_encoder = torch.nn.Sequential(*(list(base_model.children())[:-1]))
reid_encoder.eval()

# Detection & Tracking
detector = YOLO(model_path)

# Image Preprocessing for Re-ID
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 2. DATA STRUCTURES
court_poly_np = np.array(COURT_POLY, dtype=np.int32).reshape((-1, 1, 2))
team_gallery = {1: [], 2: []} # List of embeddings for each team
track_to_team = {}            # Persistent mapping of ID -> Team
csv_rows = []

def get_fingerprint(img, box):
    x1, y1, x2, y2 = box.astype(int)
    crop = img[max(0, y1):y2, max(0, x1):x2]
    if crop.size == 0 or crop.shape[0] < 10 or crop.shape[1] < 10:
        return None
    
    input_tensor = preprocess(crop).unsqueeze(0).to(device)
    with torch.no_grad():
        features = reid_encoder(input_tensor)
    return features.cpu().numpy().flatten()

# 3. PROCESSING LOOP
stream = detector.track(source=video_path, conf=0.3, stream=True, persist=True)
writer = None

for frame_idx, res in enumerate(stream):
    frame = res.orig_img
    vis = frame.copy()
    
    if writer is None:
        H, W = frame.shape[:2]
        writer = cv2.VideoWriter(out_video, cv2.VideoWriter_fourcc(*"mp4v"), 25, (W, H))

    # Visual Aids
    cv2.polylines(vis, [court_poly_np], True, (255, 255, 0), 2)
    cv2.line(vis, (0, NET_Y), (W, NET_Y), (0, 0, 255), 2)

    if res.boxes is not None and res.boxes.id is not None:
        boxes = res.boxes.xyxy.cpu().numpy()
        ids = res.boxes.id.cpu().numpy().astype(int)
        
        t1, t2 = 0, 0
        for i, tid in enumerate(ids):
            box = boxes[i]
            foot = (float((box[0] + box[2]) / 2), float(box[3]))

            # A. Filter Spectators
            if cv2.pointPolygonTest(court_poly_np, foot, False) < 0:
                continue

            # B. Team Identity Logic
            if tid not in track_to_team:
                current_feat = get_fingerprint(frame, box)
                if current_feat is None: continue

                # Seed galleries if they are small, otherwise compare
                if len(team_gallery[1]) < 6 or len(team_gallery[2]) < 6:
                    # Initial seeding by position
                    assigned = 1 if (box[1] + box[3])/2 < NET_Y else 2
                    team_gallery[assigned].append(current_feat)
                    track_to_team[tid] = assigned
                else:
                    # Compare to all known fingerprints
                    best_sim = -1
                    best_team = 1
                    for team_id in [1, 2]:
                        for sig in team_gallery[team_id]:
                            sim = 1 - cosine(current_feat, sig)
                            if sim > best_sim:
                                best_sim = sim
                                best_team = team_id
                    
                    if best_sim > SIMILARITY_THRESHOLD:
                        track_to_team[tid] = best_team
                    else:
                        # Fallback to current side if no match
                        track_to_team[tid] = 1 if (box[1] + box[3])/2 < NET_Y else 2
            
            # C. Count and Visuals
            final_team = track_to_team.get(tid, 1)
            color = (0, 255, 0) if final_team == 1 else (255, 0, 0)
            
            if final_team == 1: t1 += 1
            else: t2 += 1

            cv2.rectangle(vis, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), color, 2)
            cv2.putText(vis, f"P{tid}-T{final_team}", (int(box[0]), int(box[1]-5)), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        csv_rows.append([frame_idx, t1, t2])
        cv2.putText(vis, f"T1: {t1} T2: {t2}", (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)

    writer.write(vis)

writer.release()

# 4. SAVE OUTPUT
with open(out_csv, "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["frame", "team1", "team2"])
    w.writerows(csv_rows)

print(f"Success! Video saved to {out_video}")

Using device: cuda

video 1/1 (frame 1/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 32.0ms
video 1/1 (frame 2/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 25.1ms
video 1/1 (frame 3/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 25.2ms
video 1/1 (frame 4/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 1 tennis racket, 20.9ms
video 1/1 (frame 5/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 1 tennis racket, 20.3ms
video 1/1 (frame 6/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 1 sports ball, 1 tennis racket, 19.8ms
video 1/1 (frame 7/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 1 sports ball, 1 tennis racket, 19.7ms
video 1/1 (frame 8/292) /kaggle/input/tensecondstest/output_video_10s.mp4: 384x640 18 persons, 1 sports ball, 1 tennis racket, 20.0ms
video 1/1 (frame 9/292) /kaggle/in

In [8]:
%ls

[0m[01;34moutput[0m/  player_counts_final.csv  yolo11l.pt


In [8]:
import cv2, numpy as np, torch
from ultralytics import YOLO
from collections import deque
from tqdm import tqdm

# ==================== CONFIG ====================
PLAYER_MODEL_PATH = "yolo11x.pt" 
BALL_MODEL_PATH   = "/kaggle/input/ballweight/last.pt"
COURT_POLY = [(313, 292), (996, 289), (1198, 708), (125, 707)]
NET_Y = 453

TRAIL_LENGTH = 20
CUT_THRESHOLD = 35.0 
BALL_SPEED_MAX = 200 # Pixels per frame
SPIKE_THRESHOLD = 70 # km/h to trigger trail color change
video_path = '/kaggle/input/vollyball/volleyball_match.mp4'
# ================================================

class VolleyballPipeline:
    def __init__(self):
        self.p_model = YOLO(PLAYER_MODEL_PATH)
        self.b_model = YOLO(BALL_MODEL_PATH)
        self.court_np = np.array(COURT_POLY, dtype=np.int32)
        self.fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16, detectShadows=False)
        
        # State
        self.player_registry = {} 
        self.ball_trail = deque(maxlen=TRAIL_LENGTH)
        self.prev_frame_gray = None
        self.peak_speed = 0.0
        self.current_speed = 0.0
        
    def calculate_speed(self, p1, p2, fps=30):
        """Converts pixel displacement to km/h."""
        pixel_dist = np.linalg.norm(np.array(p1) - np.array(p2))
        # Approximation: 18m court length / approx 700px wide = 0.025m/px
        meters_per_pixel = 0.025 
        speed_kmh = (pixel_dist * meters_per_pixel * fps) * 3.6
        return speed_kmh

    def detect_cut(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if self.prev_frame_gray is None:
            self.prev_frame_gray = gray
            return False
        diff = cv2.absdiff(gray, self.prev_frame_gray)
        score = np.mean(diff)
        self.prev_frame_gray = gray
        if score > CUT_THRESHOLD:
            self.player_registry = {}
            self.ball_trail.clear()
            self.peak_speed = 0
            return True
        return False

    def validate_circularity(self, contour):
        area = cv2.contourArea(contour)
        if area < 40 or area > 1200: return False
        perimeter = cv2.arcLength(contour, True)
        if perimeter == 0: return False
        circularity = 4 * np.pi * (area / (perimeter * perimeter))
        return 0.65 < circularity < 1.2

    def get_ball_pos(self, frame, motion_mask):
        # 1. YOLO Detection
        results = self.b_model.predict(frame, conf=0.35, verbose=False)
        if results[0].boxes:
            box = results[0].boxes.xyxy[0].cpu().numpy()
            center = (int((box[0]+box[2])/2), int((box[1]+box[3])/2))
            return self.physics_check(center)

        # 2. Motion Fallback
        mask = np.zeros(frame.shape[:2], dtype=np.uint8)
        cv2.fillPoly(mask, [self.court_np], 255)
        masked_motion = cv2.bitwise_and(motion_mask, motion_mask, mask=mask)
        contours, _ = cv2.findContours(masked_motion, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        for cnt in contours:
            if self.validate_circularity(cnt):
                M = cv2.moments(cnt)
                if M["m00"] != 0:
                    center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
                    return self.physics_check(center)
        return None

    def physics_check(self, center):
        if len(self.ball_trail) > 0:
            dist = np.linalg.norm(np.array(center) - np.array(self.ball_trail[-1]))
            if dist > BALL_SPEED_MAX: return None
        return center

    def draw_aura(self, img, foot_pos, team):
        color = (0, 0, 255) if team == 1 else (0, 255, 255) # Team 1: Red, Team 2: Yellow
        overlay = img.copy()
        cv2.ellipse(overlay, foot_pos, (40, 15), 0, 0, 360, color, -1)
        cv2.addWeighted(overlay, 0.4, img, 0.6, 0, img)

    def process(self, video_in, video_out):
        cap = cv2.VideoCapture(video_in)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w, h = int(cap.get(3)), int(cap.get(4))
        writer = cv2.VideoWriter(video_out, cv2.VideoWriter_fourcc(*"mp4v"), 30, (w, h))
        pbar = tqdm(total=total_frames, desc="Analyzing Match")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret: break
            
            if self.detect_cut(frame):
                cv2.putText(frame, "SCENE RESET", (w//2-100, h//2), 0, 1.2, (255,255,255), 3)
            
            motion_mask = self.fgbg.apply(frame)

            # Player Logic
            p_res = self.p_model.track(frame, persist=True, verbose=False, tracker="bytetrack.yaml", classes=[0])
            t1, t2 = 0, 0
            if p_res[0].boxes.id is not None:
                boxes = p_res[0].boxes.xyxy.cpu().numpy()
                ids = p_res[0].boxes.id.cpu().numpy().astype(int)
                for i, tid in enumerate(ids):
                    foot = (int((boxes[i][0] + boxes[i][2])/2), int(boxes[i][3]))
                    if cv2.pointPolygonTest(self.court_np, foot, False) >= 0:
                        if tid not in self.player_registry:
                            self.player_registry[tid] = 2 if foot[1] < NET_Y else 1
                        team = self.player_registry[tid]
                        if team == 1: t1 += 1
                        else: t2 += 1
                        self.draw_aura(frame, foot, team)

            # Ball Logic
            ball_pos = self.get_ball_pos(frame, motion_mask)
            if ball_pos:
                if len(self.ball_trail) > 0:
                    self.current_speed = self.calculate_speed(ball_pos, self.ball_trail[-1])
                    if self.current_speed > self.peak_speed: self.peak_speed = self.current_speed
                self.ball_trail.append(ball_pos)
            
            # Trail Drawing with FIX
            for j in range(1, len(self.ball_trail)):
                # Ensure thickness is ALWAYS at least 1
                thickness = max(1, int((j / len(self.ball_trail)) * 7))
                # Color changes to Orange/Red if ball is fast
                trail_color = (0, 165, 255) if self.current_speed > SPIKE_THRESHOLD else (255, 255, 255)
                cv2.line(frame, self.ball_trail[j-1], self.ball_trail[j], trail_color, thickness)
            
            if ball_pos: cv2.circle(frame, ball_pos, 6, (0, 255, 0), -1)

            # HUD
            cv2.rectangle(frame, (20, 20), (320, 150), (0,0,0), -1)
            cv2.putText(frame, f"T2 (TOP): {min(t2, 6)}", (40, 50), 0, 0.7, (0,255,255), 2)
            cv2.putText(frame, f"T1 (BOT): {min(t1, 6)}", (40, 80), 0, 0.7, (0,0,255), 2)
            cv2.putText(frame, f"SPEED: {int(self.current_speed)} km/h", (40, 110), 0, 0.7, (0,255,0), 2)
            cv2.putText(frame, f"PEAK: {int(self.peak_speed)} km/h", (40, 140), 0, 0.7, (255,255,255), 2)
            
            writer.write(frame)
            pbar.update(1)
            
        pbar.close()
        cap.release()
        writer.release()

pipeline = VolleyballPipeline()
pipeline.process(video_path, "/kaggle/working/pro_ball_physics.mp4")


Analyzing Match:   0%|          | 0/3437 [00:00<?, ?it/s][A
Analyzing Match:   0%|          | 1/3437 [00:01<1:11:14,  1.24s/it][A
Analyzing Match:   0%|          | 2/3437 [00:01<33:19,  1.72it/s]  [A
Analyzing Match:   0%|          | 3/3437 [00:01<20:55,  2.74it/s][A
Analyzing Match:   0%|          | 4/3437 [00:01<15:08,  3.78it/s][A
Analyzing Match:   0%|          | 5/3437 [00:01<11:58,  4.77it/s][A
Analyzing Match:   0%|          | 6/3437 [00:01<09:59,  5.72it/s][A
Analyzing Match:   0%|          | 7/3437 [00:01<08:43,  6.56it/s][A
Analyzing Match:   0%|          | 8/3437 [00:02<07:56,  7.19it/s][A
Analyzing Match:   0%|          | 9/3437 [00:02<07:29,  7.63it/s][A
Analyzing Match:   0%|          | 10/3437 [00:02<07:06,  8.03it/s][A
Analyzing Match:   0%|          | 11/3437 [00:02<07:00,  8.15it/s][A
Analyzing Match:   0%|          | 12/3437 [00:02<06:56,  8.22it/s][A
Analyzing Match:   0%|          | 13/3437 [00:02<06:52,  8.30it/s][A
Analyzing Match:   0%|          |

In [18]:
import cv2
import numpy as np
import torch
from ultralytics import YOLO
from collections import deque, Counter
from tqdm import tqdm
from filterpy.kalman import KalmanFilter
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Tuple, Dict, List

# ==================== CONFIG ====================
PLAYER_MODEL_PATH = "yolo11x.pt" 
BALL_MODEL_PATH = "/kaggle/input/ballweight/last.pt"
COURT_POLY = [(313, 292), (996, 289), (1198, 708), (125, 707)]
NET_Y = 453

# Detection Parameters
BALL_CONF_THRESHOLD = 0.35  # Lowered for better detection
PLAYER_CONF_THRESHOLD = 0.4

# Trail & Tracking
TRAIL_LENGTH = 20
MAX_BALL_PREDICTION_FRAMES = 10  # Predict ball position during occlusion
TEAM_CONSENSUS_FRAMES = 30
TEAM_CONSENSUS_THRESHOLD = 0.7

# Cut Detection
HIST_DIFF_THRESHOLD = 0.65  # Histogram difference threshold
EDGE_DIFF_THRESHOLD = 0.35  # Edge change threshold

# Memory Management
STALE_TRACK_TIMEOUT = 100  # Frames before removing inactive track
GRACE_PERIOD_FRAMES = 30   # Keep recent tracks after cut

# Performance
USE_ASYNC_INFERENCE = True
ASYNC_WORKERS = 2

# Paths
VIDEO_INPUT = '/kaggle/input/tensecondstest/output_video_10s.mp4'
#VIDEO_INPUT = '/kaggle/input/vollyball/volleyball_match.mp4'
VIDEO_OUTPUT = '/kaggle/working/volleyball_final_enhanced.mp4'

# Logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ================================================


class BallTracker:
    """Kalman filter-based ball tracking with occlusion handling"""
    
    def __init__(self):
        self.kf = KalmanFilter(dim_x=4, dim_z=2)
        
        # State: [x, y, vx, vy]
        self.kf.F = np.array([[1, 0, 1, 0],  # State transition matrix
                              [0, 1, 0, 1],
                              [0, 0, 1, 0],
                              [0, 0, 0, 1]], dtype=np.float32)
        
        # Measurement function (observe position only)
        self.kf.H = np.array([[1, 0, 0, 0],
                              [0, 1, 0, 0]], dtype=np.float32)
        
        # Noise covariance
        self.kf.R *= 10   # Measurement noise
        self.kf.Q *= 0.1  # Process noise
        self.kf.P *= 1000 # Initial uncertainty
        
        self.last_detection_frame = -1
        self.initialized = False
        
    def update(self, detection: Optional[np.ndarray], frame_num: int) -> Optional[Tuple[int, int]]:
        """
        Update tracker with detection or predict during occlusion
        
        Args:
            detection: [x, y] numpy array or None
            frame_num: Current frame number
            
        Returns:
            Estimated position (x, y) or None if lost
        """
        gap = frame_num - self.last_detection_frame
        
        if detection is not None:
            # We have a detection
            if not self.initialized or gap > 15:
                # Reinitialize if first detection or large gap
                self.kf.x[:2] = detection.reshape(-1, 1)
                self.kf.x[2:] = 0  # Reset velocity
                self.initialized = True
            else:
                # Normal update
                self.kf.predict()
                self.kf.update(detection.reshape(-1, 1))
            
            self.last_detection_frame = frame_num
            return tuple(self.kf.x[:2].astype(int))
            
        else:
            # No detection - predict if possible
            if self.initialized and gap < MAX_BALL_PREDICTION_FRAMES:
                self.kf.predict()
                return tuple(self.kf.x[:2].astype(int))
            else:
                # Lost track completely
                return None
    
    def reset(self):
        """Reset tracker state"""
        self.initialized = False
        self.last_detection_frame = -1
        self.kf.P *= 1000


class CutDetector:
    """Multi-metric scene cut detection"""
    
    def __init__(self):
        self.prev_frame_gray = None
        self.prev_hist = None
        self.prev_edges = None
        
    def detect(self, frame: np.ndarray) -> bool:
        """
        Detect if current frame is a scene cut
        
        Args:
            frame: BGR frame
            
        Returns:
            True if cut detected
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        if self.prev_frame_gray is None:
            self._update_state(gray)
            return False
        
        # Method 1: Histogram comparison
        hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
        cv2.normalize(hist, hist)
        hist_diff = cv2.compareHist(self.prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
        
        # Method 2: Edge-based comparison
        edges = cv2.Canny(gray, 100, 200)
        edge_diff = np.sum(self.prev_edges != edges) / edges.size
        
        # Combined decision
        is_cut = (hist_diff > HIST_DIFF_THRESHOLD) or (edge_diff > EDGE_DIFF_THRESHOLD)
        
        if is_cut:
            logger.info(f"Cut detected - Hist diff: {hist_diff:.3f}, Edge diff: {edge_diff:.3f}")
        
        self._update_state(gray)
        return is_cut
    
    def _update_state(self, gray: np.ndarray):
        """Update internal state"""
        self.prev_frame_gray = gray
        self.prev_hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
        cv2.normalize(self.prev_hist, self.prev_hist)
        self.prev_edges = cv2.Canny(gray, 100, 200)


class TeamAssigner:
    """Multi-frame consensus-based team assignment"""
    
    def __init__(self):
        self.player_history: Dict[int, deque] = {}  # {track_id: deque of side assignments}
        self.player_registry: Dict[int, int] = {}   # {track_id: team_num}
        
    def assign_team(self, track_id: int, foot_pos: Tuple[int, int]) -> int:
        """
        Assign team based on temporal consensus
        
        Args:
            track_id: Player tracking ID
            foot_pos: (x, y) foot position
            
        Returns:
            Team number (1 or 2)
        """
        # Initialize history if new player
        if track_id not in self.player_history:
            self.player_history[track_id] = deque(maxlen=TEAM_CONSENSUS_FRAMES)
        
        # Determine current side based on Y-coordinate
        current_side = 2 if foot_pos[1] < NET_Y else 1
        self.player_history[track_id].append(current_side)
        
        # Require consensus over multiple frames
        if len(self.player_history[track_id]) >= 10:
            counter = Counter(self.player_history[track_id])
            dominant_team, count = counter.most_common(1)[0]
            
            # Require strong consensus
            if count / len(self.player_history[track_id]) > TEAM_CONSENSUS_THRESHOLD:
                self.player_registry[track_id] = dominant_team
                return dominant_team
        
        # Return existing assignment or default to current side
        return self.player_registry.get(track_id, current_side)
    
    def cleanup_stale_tracks(self, active_ids: set, frame_num: int, last_seen: Dict[int, int]):
        """Remove tracks not seen recently"""
        stale_ids = [
            tid for tid in self.player_registry
            if tid not in active_ids and 
            frame_num - last_seen.get(tid, 0) > STALE_TRACK_TIMEOUT
        ]
        
        for tid in stale_ids:
            if tid in self.player_registry:
                del self.player_registry[tid]
            if tid in self.player_history:
                del self.player_history[tid]
        
        if stale_ids:
            logger.debug(f"Cleaned up {len(stale_ids)} stale tracks")
    
    def reset_with_grace_period(self, frame_num: int, last_seen: Dict[int, int]):
        """Soft reset - keep recently seen players"""
        recent_ids = {
            tid for tid, frame in last_seen.items()
            if frame_num - frame < GRACE_PERIOD_FRAMES
        }
        
        # Keep only recent tracks
        self.player_registry = {
            k: v for k, v in self.player_registry.items()
            if k in recent_ids
        }
        self.player_history = {
            k: v for k, v in self.player_history.items()
            if k in recent_ids
        }
        
        logger.info(f"Soft reset - retained {len(recent_ids)} recent tracks")


class VolleyballPipelineEnhanced:
    """Enhanced volleyball tracking pipeline with robust features"""
    
    def __init__(self):
        # Load models
        logger.info("Loading YOLO models...")
        self.p_model = YOLO(PLAYER_MODEL_PATH)
        self.b_model = YOLO(BALL_MODEL_PATH)
        
        # Court geometry
        self.court_np = np.array(COURT_POLY, dtype=np.int32)
        
        # Component systems
        self.ball_tracker = BallTracker()
        self.cut_detector = CutDetector()
        self.team_assigner = TeamAssigner()
        
        # State tracking
        self.ball_trail: deque = deque(maxlen=TRAIL_LENGTH)  # [(pos, is_detected), ...]
        self.last_seen: Dict[int, int] = {}  # {track_id: last_frame_num}
        self.frame_count = 0
        
        # Performance
        self.executor = ThreadPoolExecutor(max_workers=ASYNC_WORKERS) if USE_ASYNC_INFERENCE else None
        
        logger.info("Pipeline initialized successfully")
    
    def detect_ball(self, frame: np.ndarray) -> Optional[np.ndarray]:
        """
        Detect ball in frame
        
        Returns:
            [x, y] numpy array or None
        """
        try:
            results = self.b_model.predict(
                frame, 
                conf=BALL_CONF_THRESHOLD, 
                verbose=False,
                device='cuda' if torch.cuda.is_available() else 'cpu'
            )
            
            if not results[0].boxes:
                return None
            
            box = results[0].boxes.xyxy[0].cpu().numpy()
            center = np.array([(box[0] + box[2]) / 2, (box[1] + box[3]) / 2])
            return center
            
        except Exception as e:
            logger.warning(f"Ball detection failed: {e}")
            return None
    
    def track_players(self, frame: np.ndarray) -> List[Tuple[int, Tuple[int, int], int]]:
        """
        Track players and assign teams
        
        Returns:
            List of (track_id, foot_pos, team_num)
        """
        try:
            results = self.p_model.track(
                frame,
                persist=True,
                verbose=False,
                tracker="bytetrack.yaml",
                classes=[0],  # person class
                conf=PLAYER_CONF_THRESHOLD,
                device='cuda' if torch.cuda.is_available() else 'cpu'
            )
            
            players = []
            
            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xyxy.cpu().numpy()
                ids = results[0].boxes.id.cpu().numpy().astype(int)
                
                for i, tid in enumerate(ids):
                    # Calculate foot position (bottom center of bbox)
                    foot_x = int((boxes[i][0] + boxes[i][2]) / 2)
                    foot_y = int(boxes[i][3])
                    foot_pos = (foot_x, foot_y)
                    
                    # Only process players inside court
                    if cv2.pointPolygonTest(self.court_np, foot_pos, False) >= 0:
                        # Update last seen
                        self.last_seen[tid] = self.frame_count
                        
                        # Assign team
                        team = self.team_assigner.assign_team(tid, foot_pos)
                        players.append((tid, foot_pos, team))
            
            return players
            
        except Exception as e:
            logger.warning(f"Player tracking failed: {e}")
            return []
    
    def draw_aura(self, img: np.ndarray, foot_pos: Tuple[int, int], team: int):
        """Draw subtle team indicator at player feet"""
        color = (0, 0, 255) if team == 1 else (0, 255, 255)  # Red: Bot, Yellow: Top
        overlay = img.copy()
        cv2.ellipse(overlay, foot_pos, (40, 15), 0, 0, 360, color, -1)
        cv2.addWeighted(overlay, 0.4, img, 0.6, 0, img)
    
    def draw_ball_trail(self, frame: np.ndarray):
        """Draw ball trail without gaps"""
        if len(self.ball_trail) < 2:
            return
        
        for j in range(1, len(self.ball_trail)):
            prev_pos, prev_detected = self.ball_trail[j - 1]
            curr_pos, curr_detected = self.ball_trail[j]
            
            # Only draw line if both points are from actual detections or predictions
            # Skip if there's a large gap (both are non-detected)
            if prev_pos is None or curr_pos is None:
                continue
            
            # Progressive thickness
            thickness = max(1, int((j / len(self.ball_trail)) * 6))
            
            # Different color for predicted vs detected
            if prev_detected and curr_detected:
                color = (255, 255, 255)  # White for detected
            else:
                color = (200, 200, 200)  # Gray for predicted
            
            pt1 = (int(prev_pos[0]), int(prev_pos[1]))
            pt2 = (int(curr_pos[0]), int(curr_pos[1]))
            cv2.line(frame, pt1, pt2, color, thickness)
        
        # Draw current ball position
        if self.ball_trail:
            last_pos, last_detected = self.ball_trail[-1]
            if last_pos is not None:
                color = (0, 255, 0) if last_detected else (0, 200, 0)
                radius = 5
                cv2.circle(frame, (int(last_pos[0]), int(last_pos[1])), radius, color, -1)
    
    def draw_hud(self, frame: np.ndarray, t1_count: int, t2_count: int):
        """Draw team counter HUD"""
        # Semi-transparent background
        overlay = frame.copy()
        cv2.rectangle(overlay, (20, 20), (280, 100), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame)
        
        # Team counts (capped at 6)
        cv2.putText(
            frame, 
            f"TEAM 2 (TOP): {min(t2_count, 6)}", 
            (35, 50), 
            cv2.FONT_HERSHEY_SIMPLEX, 
            0.7, 
            (0, 255, 255), 
            2
        )
        cv2.putText(
            frame, 
            f"TEAM 1 (BOT): {min(t1_count, 6)}", 
            (35, 85), 
            cv2.FONT_HERSHEY_SIMPLEX, 
            0.7, 
            (0, 0, 255), 
            2
        )
    
    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Process single frame"""
        self.frame_count += 1
        
        # 1. Detect cuts and reset if needed
        if self.cut_detector.detect(frame):
            self.team_assigner.reset_with_grace_period(self.frame_count, self.last_seen)
            self.ball_tracker.reset()
            self.ball_trail.clear()
        
        # 2. Track players
        players = self.track_players(frame)
        
        # Count teams
        t1_count = sum(1 for _, _, team in players if team == 1)
        t2_count = sum(1 for _, _, team in players if team == 2)
        
        # Draw player auras
        for tid, foot_pos, team in players:
            self.draw_aura(frame, foot_pos, team)
        
        # 3. Track ball with Kalman filter
        ball_detection = self.detect_ball(frame)
        ball_pos = self.ball_tracker.update(ball_detection, self.frame_count)
        
        # Add to trail with detection flag
        is_detected = ball_detection is not None
        self.ball_trail.append((ball_pos, is_detected))
        
        # Draw ball trail
        self.draw_ball_trail(frame)
        
        # 4. Draw HUD
        self.draw_hud(frame, t1_count, t2_count)
        
        # 5. Periodic cleanup
        if self.frame_count % 50 == 0:
            active_ids = {tid for tid, _, _ in players}
            self.team_assigner.cleanup_stale_tracks(
                active_ids, 
                self.frame_count, 
                self.last_seen
            )
        
        return frame
    
    def process_video(self, input_path: str, output_path: str):
        """Process entire video"""
        logger.info(f"Processing video: {input_path}")
        
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            logger.error(f"Failed to open video: {input_path}")
            return
        
        # Video properties
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        logger.info(f"Video info - Frames: {total_frames}, FPS: {fps}, Resolution: {width}x{height}")
        
        # Video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        # Progress bar
        pbar = tqdm(total=total_frames, desc="Processing Video", unit="frame")
        
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                
                # Process frame
                processed_frame = self.process_frame(frame)
                
                # Write output
                writer.write(processed_frame)
                pbar.update(1)
                
        except KeyboardInterrupt:
            logger.warning("Processing interrupted by user")
        except Exception as e:
            logger.error(f"Error during processing: {e}")
        finally:
            pbar.close()
            cap.release()
            writer.release()
            if self.executor:
                self.executor.shutdown(wait=True)
        
        logger.info(f"Processing complete - Output saved to: {output_path}")
        logger.info(f"Total frames processed: {self.frame_count}")


def main():
    """Main execution"""
    try:
        pipeline = VolleyballPipelineEnhanced()
        pipeline.process_video(VIDEO_INPUT, VIDEO_OUTPUT)
        logger.info("Pipeline execution completed successfully")
    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)


if __name__ == "__main__":
    main()

2026-02-09 13:51:21,857 - INFO - Loading YOLO models...
2026-02-09 13:51:22,349 - INFO - Pipeline initialized successfully
2026-02-09 13:51:22,350 - INFO - Processing video: /kaggle/input/tensecondstest/output_video_10s.mp4
2026-02-09 13:51:22,365 - INFO - Video info - Frames: 292, FPS: 28, Resolution: 1280x720
  cv2.circle(frame, (int(last_pos[0]), int(last_pos[1])), radius, color, -1)
  pt1 = (int(prev_pos[0]), int(prev_pos[1]))
  pt2 = (int(curr_pos[0]), int(curr_pos[1]))
Processing Video: 100%|█████████▉| 291/292 [00:31<00:00,  9.38frame/s]
2026-02-09 13:51:53,398 - INFO - Processing complete - Output saved to: /kaggle/working/volleyball_final_enhanced.mp4
2026-02-09 13:51:53,399 - INFO - Total frames processed: 291
2026-02-09 13:51:53,399 - INFO - Pipeline execution completed successfully


# MAIN IMPLEMENT

In [None]:
import cv2, numpy as np, torch
from ultralytics import YOLO
from collections import deque
from tqdm import tqdm

PLAYER_MODEL_PATH = "yolo11x.pt" 
BALL_MODEL_PATH   = "/kaggle/input/ballweight/last.pt"
COURT_POLY = [(313, 292), (996, 289), (1198, 708), (125, 707)]
NET_Y = 453

TRAIL_LENGTH = 15
CUT_THRESHOLD = 35.0 
video_path = '/kaggle/input/tensecondstest/output_video_10s.mp4'

In [None]:
class VolleyballPipeline:
    def __init__(self):
        self.p_model = YOLO(PLAYER_MODEL_PATH)
        self.b_model = YOLO(BALL_MODEL_PATH)
        self.court_np = np.array(COURT_POLY, dtype=np.int32)
        
        self.player_registry = {} 
        self.ball_trail = deque(maxlen=TRAIL_LENGTH)
        self.prev_frame_gray = None
        
    def detect_cut(self, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if self.prev_frame_gray is None:
            self.prev_frame_gray = gray
            return False
        
        diff = cv2.absdiff(gray, self.prev_frame_gray)
        score = np.mean(diff)
        self.prev_frame_gray = gray
        
        if score > CUT_THRESHOLD:
            self.player_registry = {} 
            self.ball_trail.clear()
            return True
        return False

    def get_ball_pos(self, frame):
        results = self.b_model.predict(frame, conf=0.45, verbose=False)
        if not results[0].boxes: 
            self.ball_trail.clear() 
            return None
        
        box = results[0].boxes.xyxy[0].cpu().numpy()
        return (int((box[0]+box[2])/2), int((box[1]+box[3])/2))

    def draw_aura(self, img, foot_pos, team):
        color = (0, 0, 255) if team == 1 else (0, 255, 255) 
        overlay = img.copy()
        cv2.ellipse(overlay, foot_pos, (40, 15), 0, 0, 360, color, -1)
        cv2.addWeighted(overlay, 0.4, img, 0.6, 0, img)

    def process(self, video_in, video_out):
        cap = cv2.VideoCapture(video_in)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w, h = int(cap.get(3)), int(cap.get(4))
        writer = cv2.VideoWriter(video_out, cv2.VideoWriter_fourcc(*"mp4v"), 30, (w, h))

        pbar = tqdm(total=total_frames, desc="Processing")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret: break
            
            if self.detect_cut(frame):
                pass 

            p_res = self.p_model.track(frame, persist=True, verbose=False, tracker="bytetrack.yaml", classes=[0])
            t1_count, t2_count = 0, 0
            
            if p_res[0].boxes.id is not None:
                boxes = p_res[0].boxes.xyxy.cpu().numpy()
                ids = p_res[0].boxes.id.cpu().numpy().astype(int)
                
                for i, tid in enumerate(ids):
                    foot = (int((boxes[i][0] + boxes[i][2])/2), int(boxes[i][3]))
                    
                    if cv2.pointPolygonTest(self.court_np, foot, False) >= 0:
                        if tid not in self.player_registry:
                            self.player_registry[tid] = 2 if foot[1] < NET_Y else 1
                        
                        team = self.player_registry[tid]
                        if team == 1: t1_count += 1
                        else: t2_count += 1
                        self.draw_aura(frame, foot, team)
            
            ball_pos = self.get_ball_pos(frame)
            if ball_pos: 
                self.ball_trail.append(ball_pos)
            
            for j in range(1, len(self.ball_trail)):
                thickness = max(1, int((j / len(self.ball_trail)) * 6))
                cv2.line(frame, self.ball_trail[j-1], self.ball_trail[j], (255, 255, 255), thickness)
            
            if ball_pos:
                cv2.circle(frame, ball_pos, 5, (0, 255, 0), -1)

            cv2.rectangle(frame, (20, 20), (260, 100), (0,0,0), -1)
            cv2.putText(frame, f"TEAM 2 (TOP): {min(t2_count, 6)}", (35, 50), 0, 0.7, (0,255,255), 2)
            cv2.putText(frame, f"TEAM 1 (BOT): {min(t1_count, 6)}", (35, 85), 0, 0.7, (0,0,255), 2)
            
            writer.write(frame)
            pbar.update(1)
            
        pbar.close()
        cap.release()
        writer.release()

In [19]:
pipeline = VolleyballPipeline()
pipeline.process(video_path, "/kaggle/working/clean_match_v1.mp4")

Processing Clean Pipeline: 100%|█████████▉| 291/292 [00:28<00:00, 10.17it/s]
