In [1]:
!pip install opencv-python numpy ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.168-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading n

In [8]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import matplotlib.pyplot as plt
from tqdm import tqdm
import math
import pandas as pd

In [3]:
model = YOLO('yolov8s.pt')

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt'...


100%|██████████| 21.5M/21.5M [00:00<00:00, 95.8MB/s]


In [4]:
class Utils:
    def __init__(self, video_path, model, confidence_threshold = 0):
        self.video_path = video_path #'test.mp4'
        self.model = model
        self.confidence_threshold = confidence_threshold

    def get_frame(self, frame_id):
        cap = cv2.VideoCapture(self.video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
        # Read the frame
        ret, frame = cap.read()
        return frame

    def detect_balls(self, frame):
        # Run YOLO detection
        results = self.model(frame, classes=[32], verbose=False)  # class 32 is 'sports ball' in COCO dataset

        # Filter for billiard balls (you may need to fine-tune this)
        balls = []
        for result in results:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = float(box.conf[0])
                if conf > self.confidence_threshold:  # confidence threshold
                    balls.append({
                        'bbox': (x1, y1, x2, y2),
                        'center': ((x1 + x2) // 2, (y1 + y2) // 2),
                        'confidence': conf
                    })
        return balls

    def distance(self, p1, p2):
        x1, y1 = p1
        x2, y2, _ = p2
        return int(np.sqrt((x1 - x2)**2 + (y1 - y2)**2))

    def interpolation(self, p1, p2, n_frames):
        x1, y1 = p1
        x2, y2 = p2
        x_steps = (x2 - x1) / n_frames
        y_steps = (y2 - y1) / n_frames
        return [(int(x1 + i*x_steps), int(y1 + i*y_steps)) for i in range(n_frames)]

    def predict_next_approx_linear(self, x, y):
        assert len(x) >= 1 and len(y) >= 1, 'Should have atleast 1 element'
        assert len(x) == len(y), 'Both should have 1 length'
        if len(y) == 1:
            return y[0]
        elif len(y) >= 2:
            x = np.array(x)
            y = np.array(y)
            m, c = np.polyfit(x, y, 1)
            return m * (x[-1] + 1) + c
        else:
            return None

    def show_image(self, frame):
        plt.imshow(frame)


utils = Utils('test.mp4', model)

# ========TESTING========
# frame = utils.get_frame(4)
# print(type(frame), frame.shape)

# balls = utils.detect_balls(frame)
# print(len(balls))

# utils.show_image(frame)

In [5]:
def process_video_get_pos(input_path):
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    data = {'frames': [], 'pos': []}

    pbar = tqdm()
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Detect and classify balls
        pos = utils.detect_balls(frame)
        data['frames'].append(frame)
        data['pos'].append(pos)
        pbar.update(1)

    cap.release()
    pbar.close()
    return data

# Process test1 video
data = process_video_get_pos('test.mp4')

306it [02:13,  2.29it/s]


In [9]:
def ball_tracking(frame_positions):
    past_n_frames = 5
    new_ball_min_dist = 45
    max_balls = 0

    for p_ind, pos in tqdm(enumerate(frame_positions)):
        if p_ind == 0:
            for ind, p in enumerate(pos):
                pos_df[max_balls] = [None] * n_frames
                pos_df[max_balls][p_ind] = (p['center'][0], p['center'][1], 0)
                max_balls += 1
            continue
        else:
            for ball_id in range(max_balls):
                p = pos_df[ball_id][max(p_ind - past_n_frames, 0): p_ind]
                ind = [_ind for _ind, val in enumerate(p) if isinstance(val, tuple)]
                x = [i[0] for i in p if isinstance(i, tuple)]
                y = [i[1] for i in p if isinstance(i, tuple)]
                if len(ind) > 0:
                    pos_df[ball_id][p_ind] = (int(utils.predict_next_approx_linear(ind, x)),
                                              int(utils.predict_next_approx_linear(ind, y)), 1)
            for ind, p in enumerate(pos):
                tracking = []
                for ball_id in range(max_balls):
                    if isinstance(pos_df[ball_id][p_ind], tuple):
                        d = utils.distance(p['center'], pos_df[ball_id][p_ind])
                        tracking.append((ball_id, d, p['center'], p['bbox']))
                d_lst = [d for bi, d, p, bb in tracking]
                min_ind = d_lst.index(min(d_lst))
                b_id, d, p1, _ = tracking[min_ind]
                if d < new_ball_min_dist:
                    pos_df[b_id][p_ind] = (p1[0], p1[1], 0)
                else:
                    pos_df[max_balls] = [None] * n_frames
                    pos_df[max_balls][p_ind] = (p1[0], p1[1], 0)
                    max_balls += 1


n_frames = len(data['frames'])
pos_df = {}
pos = data['pos']
ball_tracking(pos)

pos_df = pd.DataFrame(pos_df)
pos_df

306it [00:00, 1592.53it/s]


Unnamed: 0,0,1,2,3,4
0,"(1115, 404, 0)","(533, 369, 0)","(1078, 393, 0)","(43, 351, 0)",
1,"(1115, 404, 0)","(533, 369, 0)","(1077, 392, 0)","(43, 351, 0)",
2,"(1115, 404, 0)","(533, 369, 0)","(1077, 392, 0)","(43, 351, 0)",
3,"(1115, 404, 0)","(533, 369, 0)","(1077, 392, 0)","(42, 352, 0)",
4,"(1115, 404, 0)","(533, 369, 0)","(1076, 392, 0)","(42, 352, 0)",
...,...,...,...,...,...
301,"(1121, 342, 1)","(572, 332, 0)","(-100, 310, 1)","(92, 326, 0)","(-963, -133, 1)"
302,"(1120, 341, 1)","(572, 332, 0)","(-106, 307, 1)","(92, 326, 0)","(-964, -122, 1)"
303,"(1118, 339, 1)","(571, 332, 0)","(-112, 304, 1)","(92, 325, 0)","(-965, -112, 1)"
304,"(1117, 338, 1)","(571, 332, 0)","(-118, 301, 1)","(92, 325, 0)","(-966, -101, 1)"


In [12]:
new_pos_df = pos_df.copy()

def transform(x):
    if x == None:
        return 0
    else:
        return x[2]

for col in pos_df.columns:
    vals = pos_df[col].apply(transform)
    group = (vals != vals.shift()).cumsum()
    mask = vals.eq(1) & vals.groupby(group).transform('size').gt(10)
    pos_df[col][mask] = None

You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

  pos_df[col][mask] = None
You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, becaus

In [24]:
def transform(x):
    if x == None:
        return None
    else:
        return (x[0], x[1])

for col in pos_df.columns:
    pos_df[col] = pos_df[col].apply(transform)

In [26]:
total_x, total_y = [], []
def get_std():
    for col in pos_df.columns:
        vals = [i for i in pos_df[col] if isinstance(i, tuple)]
        x = [i[0] for i in vals]
        y = [i[1] for i in vals]
        total_x.extend(x)
        total_y.extend(y)
        print(np.std(x), np.std(y))

get_std()

9.09852531464896 21.24790008509451
17.838040642510602 15.768732075643934
351.7706023316014 24.8448248855133
23.03510497435963 13.66953178944275
0.0 0.0


In [27]:
mean_std_x, mean_std_y = np.std(total_x), np.std(total_y)
mean_std_x, mean_std_y

(np.float64(402.73412271700363), np.float64(22.915747547204735))

In [28]:
def transform(col):
    vals = [i for i in pos_df[col] if isinstance(i, tuple)]
    x = [i[0] for i in vals]
    y = [i[1] for i in vals]
    return True if np.std(x) > 0.5 * mean_std_x else False

ball_mapping = {ball_id: transform(ball_id) for ball_id in range(pos_df.shape[1])}
ball_mapping

{0: False, 1: False, 2: True, 3: False, 4: False}

In [20]:
# =======================VERSION-1===========================

# def ball_tracking(poss):
#     balls = {}
#     for pind, pos in tqdm(enumerate(poss)):
#         neartes_match = False
#         if pind == 0:
#             for ind, p in enumerate(pos):
#                 balls.setdefault(ind, [])
#                 balls[ind].append(p['center'])
#             continue
#         else:
#             for p in pos:
#                 tracking = []
#                 for ball_ind, ball_pos_lst in balls.items():
#                     d = utils.distance(p['center'], ball_pos_lst[-1])
#                     tracking.append((ball_ind, d, p['center']))
#                 d_lst = [d for bi, d, p in tracking]
#                 min_ind = d_lst.index(min(d_lst))
#                 bi = tracking[min_ind][0]
#                 p1 = tracking[min_ind][2]
#                 if len(balls[bi]) < pind:
#                     # balls[bi].extend([p1] * (pind - len(balls[bi])))
#                     balls[bi].extend(utils.interpolation(balls[min_ind][-1], p1, pind - len(balls[bi])))
#                 balls[bi].append(p1)


#     return balls

# balls = ball_tracking(poss)

In [22]:
# =======================VERSION-2===========================

def process_video_with_yolo(input_path, output_path):
    cap = cv2.VideoCapture(input_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    frame_no = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Draw stationary balls (red bounding boxes)
        for ball_no in range(pos_df.shape[1]):
            print(frame_no, ball_no, pos_df.loc[frame_no, ball_no])
            try:
                x, y, _ = pos_df.loc[frame_no, ball_no]
                cv2.rectangle(frame, (x-5, y-5), (x+5, y+5), (0, 0, 255), 2)
                if ball_mapping[ball_no]:
                    cv2.putText(frame, f"ID {ball_no} | ACTION", (x, y-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                else:
                    cv2.putText(frame, f"ID {ball_no} | STATIONARY", (x, y-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            except:
                pass
        frame_no += 1

        out.write(frame)

    cap.release()
    out.release()
    return data

# Process test1 video
data = process_video_with_yolo('test.mp4', 'output_test6.mp4')

0 0 (1115, 404, 0)
0 1 (533, 369, 0)
0 2 (1078, 393, 0)
0 3 (43, 351, 0)
0 4 None
1 0 (1115, 404, 0)
1 1 (533, 369, 0)
1 2 (1077, 392, 0)
1 3 (43, 351, 0)
1 4 None
2 0 (1115, 404, 0)
2 1 (533, 369, 0)
2 2 (1077, 392, 0)
2 3 (43, 351, 0)
2 4 None
3 0 (1115, 404, 0)
3 1 (533, 369, 0)
3 2 (1077, 392, 0)
3 3 (42, 352, 0)
3 4 None
4 0 (1115, 404, 0)
4 1 (533, 369, 0)
4 2 (1076, 392, 0)
4 3 (42, 352, 0)
4 4 None
5 0 (1115, 404, 0)
5 1 (533, 369, 0)
5 2 (1076, 391, 0)
5 3 (42, 352, 0)
5 4 None
6 0 (1115, 403, 0)
6 1 (533, 369, 0)
6 2 (1076, 391, 0)
6 3 (42, 352, 0)
6 4 None
7 0 (1115, 403, 0)
7 1 (533, 369, 0)
7 2 (1076, 391, 0)
7 3 (42, 352, 0)
7 4 None
8 0 (1115, 403, 0)
8 1 (532, 369, 0)
8 2 (1076, 391, 0)
8 3 (42, 351, 0)
8 4 None
9 0 (1115, 403, 0)
9 1 (532, 369, 0)
9 2 (1076, 390, 0)
9 3 (41, 352, 0)
9 4 None
10 0 (1115, 402, 0)
10 1 (532, 369, 0)
10 2 (1076, 390, 0)
10 3 (41, 352, 0)
10 4 None
11 0 (1115, 402, 0)
11 1 (532, 369, 0)
11 2 (1076, 390, 0)
11 3 (41, 352, 0)
11 4 None
12 0 (