In [None]:
import os
import numpy as np
import pandas as pd

In [None]:
!pip install -q ultralytics

In [None]:
%%writefile cricket.yaml
path: /kaggle/input/datasets/kshitijkhonde/cricket-dataset-3/cricket_ball_data - 2
train: train/images
val: valid/images

names:
  0: cricket_ball

## Training the Model

In [None]:
from ultralytics import YOLO

model = YOLO("yolov8l.pt")

model.train(
    data="cricket.yaml",
    epochs=80,
    imgsz=1280,
    batch=4,
    lr0=0.005,
    patience=20,
    mosaic=1.0,
    mixup=0.2,
    copy_paste=0.2,
    device='cuda'
)

In [None]:
!zip -r runs.zip /kaggle/working/runs

## Model Inference

In [None]:
from collections import deque
from ultralytics import YOLO
import math
import time
import cv2
import os
import csv

# -------------------------
# Helper Functions
# -------------------------

def angle_between_lines(m1, m2=1):
    if m1 != -1/m2:
        return math.degrees(math.atan(abs((m2 - m1) / (1 + m1 * m2))))
    return 90.0

class FixedSizeQueue:
    def __init__(self, max_size):
        self.queue = deque(maxlen=max_size)

    def add(self, item):
        self.queue.append(item)

    def clear(self):
        self.queue.clear()

    def get(self):
        return self.queue

    def __len__(self):
        return len(self.queue)

# -------------------------
# Paths
# -------------------------
video_num = 6
MODEL_PATH = "/kaggle/input/datasets/kshitijkhonde/yolov8l-best-model/kaggle/working/runs/detect/train/weights/last.pt"
INPUT_VIDEO = f"/kaggle/input/datasets/kshitijkhonde/yolov8-cricket-ball-detection-test-dataset/{video_num}.mov"
OUTPUT_DIR = "/kaggle/working/results"
CSV_PATH = os.path.join(OUTPUT_DIR, f"centroids_{video_num}.csv")

os.makedirs(OUTPUT_DIR, exist_ok=True)
OUTPUT_VIDEO = os.path.join(OUTPUT_DIR, f"output_{video_num}.mp4")

# -------------------------
# Load Model
# -------------------------

model = YOLO(MODEL_PATH)

# -------------------------
# Video Setup
# -------------------------

cap = cv2.VideoCapture(INPUT_VIDEO)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

out = cv2.VideoWriter(
    OUTPUT_VIDEO,
    cv2.VideoWriter_fourcc(*"mp4v"),
    fps,
    (w, h)
)

centroid_history = FixedSizeQueue(15)

prev_frame_time = 0
csv_file = open(CSV_PATH, mode='w', newline='')
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["frame_index", "x_centroid", "y_centroid", "visibility_flag"])
frame_id = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break

    h, w, _ = frame.shape

    # ---------------------------------
    # MASK SCOREBOARD / WATERMARK AREA
    # ---------------------------------
    mask = frame.copy()
    
    mask[:, int(w*0.80):w] = 0   # rigth 20% mked
    mask[:, 0:int(w*0.20)] = 0   # left 20% masked
    mask[int(h*0.80):h, :] = 0   # bottom 20% masked
    mask[0:int(h*0.20), :] = 0   # top 20% masked
    
    frame_for_yolo = mask

    # FPS
    new_frame_time = time.time()
    fps_disp = 1/(new_frame_time-prev_frame_time+1e-6)
    prev_frame_time = new_frame_time

    # ---------------------------------
    # Detection + Tracking
    # ---------------------------------
    results = model.track(
        frame_for_yolo,
        persist=True,
        conf=0.35,
        verbose=False
    )

    best_conf = 0
    best_center = None
    best_box = None

    if results[0].boxes.conf is not None:
        boxes = results[0].boxes.xyxy.cpu()
        confs = results[0].boxes.conf.cpu()

        # choose highest confidence detection
        for box, conf in zip(boxes, confs):

            x1,y1,x2,y2 = map(int, box)
            cx = (x1+x2)//2
            cy = (y1+y2)//2

            # ---------------------------------
            # Reject detections in scoreboard area
            # ---------------------------------
            if cx > 0.7*w and cy > 0.8*h:
                continue

            if conf > best_conf:
                best_conf = conf
                best_center = (cx,cy)
                best_box = (x1,y1,x2,y2)

    # ---------------------------------
    # Store centroid
    # ---------------------------------
    if best_center is not None:
        centroid_history.add(best_center)

        x1,y1,x2,y2 = best_box
        cv2.rectangle(frame,(x1,y1),(x2,y2),(0,0,255),2)
        cv2.circle(frame,best_center,4,(0,0,255),-1)
    
        # ---- Save centroid to CSV ----
        csv_writer.writerow([frame_id, best_center[0], best_center[1], 1])
    
    else:
        # No detection in this frame
        csv_writer.writerow([frame_id, "", "", 0])

    frame_id += 1

    # ---------------------------------
    # Draw trajectory
    # ---------------------------------
    pts = list(centroid_history.get())
    for i in range(1,len(pts)):
        cv2.line(frame,pts[i-1],pts[i],(255,0,0),3)

    # ---------------------------------
    # Angle + Future path
    # ---------------------------------
    angle = 0
    if len(pts) > 1:
        dx = pts[-1][0] - pts[-2][0]
        dy = pts[-1][1] - pts[-2][1]

        if dx != 0:
            m1 = dy/dx
            angle = 90 - angle_between_lines(m1)

        future = [pts[-1]]
        for i in range(1,5):
            future.append(
                (pts[-1][0]+dx*i, pts[-1][1]+dy*i)
            )

        for i in range(1,len(future)):
            cv2.line(frame,future[i-1],future[i],(0,255,0),3)
            cv2.circle(frame,future[i],3,(0,0,255),-1)

    # ---------------------------------
    # Overlay Text
    # ---------------------------------
    cv2.putText(frame,f"Angle: {angle:.1f}",
                (20,30),
                cv2.FONT_HERSHEY_SIMPLEX,1,(255,0,0),2)

    cv2.putText(frame,f"FPS: {int(fps_disp)}",
                (20,65),
                cv2.FONT_HERSHEY_SIMPLEX,1,(255,0,0),2)

    out.write(frame)

# -------------------------
# Cleanup
# -------------------------

cap.release()
out.release()
csv_file.close()

print("Saved output video at:", OUTPUT_VIDEO)

In [None]:
!zip -r results.zip /kaggle/working/results_final