In [1]:
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator
import numpy as np
import cv2
import time
import torch
import torchvision

model_track = YOLO("runs/detect/train11/weights/best.pt")
model_pose = YOLO("yolov8n-pose.pt")
model_seg = YOLO("runs/segment/train2/weights/last.pt")

In [2]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"

model_track.to(device)
model_seg.to(device)
model_pose.to(device)

YOLO(
  (model): PoseModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(48, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=

In [3]:
def is_collision_detected(tracked_points):
    """
    Check if a collision is detected based on sudden motion change and height change.
    """

    if len(tracked_points) < 3:
        return False

    # Calculate the velocity vector between the last two tracked points
    last_point = tracked_points[-1][0]
    second_last_point = tracked_points[-2][0]
    velocity_vector = np.array(last_point) - np.array(second_last_point)

    # Calculate the magnitude of the velocity vector
    velocity_magnitude = np.linalg.norm(velocity_vector)

    # Set a velocity threshold
    velocity_threshold = 50  # Adjust as needed based on the speed of the ball

    # Check if the magnitude of the velocity vector exceeds the threshold
    if velocity_magnitude > velocity_threshold:
        # Calculate the height difference between the last two tracked points
        last_height = tracked_points[-1][0][1]
        second_last_height = tracked_points[-2][0][1]
        height_difference = last_height - second_last_height

        # Set a height threshold
        height_threshold = 50  # Adjust as needed based on the height difference indicating a collision

        # Check if the height difference exceeds the threshold
        if height_difference > height_threshold:
            return True

    return False

In [4]:
from collections import deque

def track_video_or_camera(video_source, device='cpu'):
    """
    Tracks points of interest in video file or live camera feed.
    
    Parameters:
    - video_source: Path to the video file or integer for camera index.
    - device: Device to run the segmentation and tracking models (default 'cpu').
    
    Press 'q' to exit the loop and close the window.
    """

    cap = cv2.VideoCapture(video_source)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    
    tracked_points = deque(maxlen=10)
    kernel = np.ones((100, 100), np.uint8)

    draw_circle = False
    circle_duration = 5.0
    circle_start_time = 0
    collision_position = None
    
    while cap.isOpened():
        success, frame = cap.read()
        if success:
            pose_results = model_pose(frame, conf=0.5, device=device, show_labels=False)
            if len(pose_results) > 0:
                keypoints = pose_results[0].keypoints.xy[0]
                
                for point in keypoints:
                    x, y = point[0], point[1]
                    
                    cv2.circle(frame, (int(x), int(y)), 3, (0, 255, 0), thickness=-1, lineType=cv2.FILLED)
        
            segmentation_results = model_seg(frame, conf=0.5, device=device, show_labels=False, vid_stride=60)
            overlay = frame.copy()
            if segmentation_results[0].masks and segmentation_results[0].masks.shape[0] > 0:
                mask = segmentation_results[0].masks.xy[0]
                stencil = np.zeros(overlay.shape[:-1]).astype(np.uint8)
                cv2.fillPoly(stencil, [np.int32([mask])], 255)

                dilated_stencil = cv2.dilate(stencil, kernel, iterations=1)
                padding_mask = cv2.subtract(dilated_stencil, stencil)

                overlay[padding_mask == 255] = (0, 0, 255)

                overlay[stencil == 255] = (0, 255, 0)

                frame = cv2.addWeighted(overlay, 0.15, frame, 0.85, 0)


                tracking_results = model_track(frame, conf=0.60, device=device)

                if tracking_results[0].boxes.shape[0] > 0:
                    first_box = tracking_results[0].boxes.data[0]

                    center_x = int((first_box[0] + first_box[2]) / 2)
                    center_y = int((first_box[1] + first_box[3]) / 2)

                    tracked_points.append(((center_x, center_y), time.time()))

                    if is_collision_detected(tracked_points):
                        print("Collision detected at ({}, {})".format(center_x, center_y))
                        draw_circle = True
                        circle_start_time = time.time()
                        collision_position = (center_x, center_y)

                current_time = time.time()
                tracked_points = [(pt, t) for (pt, t) in tracked_points if current_time - t < 1]

                if draw_circle and collision_position:
                    if time.time() - circle_start_time < circle_duration:
                        # Draw a circle on the collision position
                        cv2.circle(frame, collision_position, radius=5, color=(255, 0, 0), thickness=10)
                    else:
                        draw_circle = False
                

                for i in range(1, len(tracked_points)):
                    cv2.line(frame, tracked_points[i - 1][0], tracked_points[i][0], (0, 255, 0), 2)

                cv2.imshow("YOLOv8 Tracking", frame)
            else:
                cv2.imshow("YOLOv8 Tracking", frame)

            if cv2.waitKey(1) & 0xFF in [ord("q"), ord("Q")]:
                break
        else:
            break

    cap.release()
    cv2.destroyAllWindows()

In [None]:
pose_results[0].keypoints

ultralytics.engine.results.Keypoints object with attributes:

conf: tensor([[0.2242, 0.1020, 0.1760, 0.2866, 0.5499, 0.9413, 0.9649, 0.7761, 0.8914, 0.6563, 0.7804, 0.9790, 0.9845, 0.9584, 0.9682, 0.9310, 0.9438]], device='cuda:0')
data: tensor([[[0.0000e+00, 0.0000e+00, 2.2424e-01],
         [0.0000e+00, 0.0000e+00, 1.0195e-01],
         [0.0000e+00, 0.0000e+00, 1.7602e-01],
         [0.0000e+00, 0.0000e+00, 2.8655e-01],
         [2.4821e+02, 5.4442e+02, 5.4993e-01],
         [2.3185e+02, 5.5360e+02, 9.4133e-01],
         [2.4489e+02, 5.5914e+02, 9.6488e-01],
         [2.1773e+02, 5.6845e+02, 7.7608e-01],
         [2.4446e+02, 5.7920e+02, 8.9142e-01],
         [2.1337e+02, 5.8123e+02, 6.5634e-01],
         [2.4652e+02, 5.9200e+02, 7.8035e-01],
         [2.1893e+02, 5.8713e+02, 9.7902e-01],
         [2.2839e+02, 5.8913e+02, 9.8455e-01],
         [2.1501e+02, 6.1068e+02, 9.5840e-01],
         [2.3327e+02, 6.1304e+02, 9.6817e-01],
         [1.9314e+02, 6.2935e+02, 9.3103e-01],
         [

# TODO: Improve detection speed on cpu detection 

In [5]:
video_path_or_camera_index = "tenis2.mp4"  # or 0 for webcam
track_video_or_camera(video_path_or_camera_index, device=device)


0: 384x640 1 person, 98.1ms
Speed: 3.0ms preprocess, 98.1ms inference, 123.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 36.0ms
Speed: 3.0ms preprocess, 36.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 90.1ms
Speed: 3.0ms preprocess, 90.1ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 9.0ms
Speed: 10.0ms preprocess, 9.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 8.0ms
Speed: 1.0ms preprocess, 8.0ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 9.0ms
Speed: 1.0ms preprocess, 9.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 8.0ms
Speed: 1.0ms preprocess, 8.0ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 16.0ms
Speed: 1.0ms preprocess, 16.0ms inference, 2.0ms postprocess per i