In [9]:
!pip install ultralytics

Defaulting to user installation because normal site-packages is not writeable


In [10]:
import pandas as pd
from numpy.linalg import inv
import numpy as np
import math
import cv2
import torch
from matplotlib import pyplot as plt

import torch
from ultralytics import YOLO

In [3]:
model = YOLO('best.pt')  # Ensure you have the correct model file

# Ensure to use the GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(48, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(48, 96, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(96, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(96, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(96, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(192, 96, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(96, eps=0.001, momentum=0.03, affine=True, track_running_

In [4]:
def draw_prediction(img: np.ndarray,
                    class_name: str,
                    df: pd.core.series.Series,
                    color: tuple = (255, 0, 0)):
    '''
    Function to draw prediction around the bounding box identified by the YOLO
    The Function also displays the confidence score top of the bounding box
    '''

    cv2.rectangle(img, (int(df.xmin), int(df.ymin)),
                  (int(df.xmax), int(df.ymax)), color, 2)
    cv2.putText(img, class_name + " " + str(round(df.confidence, 2)),
                (int(df.xmin) - 10, int(df.ymin) - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return img

In [5]:
class KalmanFilter():
    def __init__(self,
                 xinit: int = 0,
                 yinit: int = 0,
                 fps: int = 30,
                 std_a: float = 0.001,
                 std_x: float = 0.0045,
                 std_y: float = 0.01,
                 cov: float = 100000) -> None:

        # State Matrix
        self.S = np.array([xinit, 0, 0, yinit, 0, 0])
        self.dt = 1 / fps

        # State Transition Model
        # Here, we assume that the model follow Newtonian Kinematics
        self.F = np.array([[1, self.dt, 0.5 * (self.dt * self.dt), 0, 0, 0],
                           [0, 1, self.dt, 0, 0, 0], [0, 0, 1, 0, 0, 0],
                           [0, 0, 0, 1, self.dt, 0.5 * self.dt * self.dt],
                           [0, 0, 0, 0, 1, self.dt], [0, 0, 0, 0, 0, 1]])

        self.std_a = std_a

        # Process Noise
        self.Q = np.array([
            [
                0.25 * self.dt * self.dt * self.dt * self.dt, 0.5 * self.dt *
                self.dt * self.dt, 0.5 * self.dt * self.dt, 0, 0, 0
            ],
            [
                0.5 * self.dt * self.dt * self.dt, self.dt * self.dt, self.dt,
                0, 0, 0
            ], [0.5 * self.dt * self.dt, self.dt, 1, 0, 0, 0],
            [
                0, 0, 0, 0.25 * self.dt * self.dt * self.dt * self.dt,
                0.5 * self.dt * self.dt * self.dt, 0.5 * self.dt * self.dt
            ],
            [
                0, 0, 0, 0.5 * self.dt * self.dt * self.dt, self.dt * self.dt,
                self.dt
            ], [0, 0, 0, 0.5 * self.dt * self.dt, self.dt, 1]
        ]) * self.std_a * self.std_a

        self.std_x = std_x
        self.std_y = std_y

        # Measurement Noise
        self.R = np.array([[self.std_x * self.std_x, 0],
                           [0, self.std_y * self.std_y]])

        self.cov = cov

        # Estimate Uncertainity
        self.P = np.array([[self.cov, 0, 0, 0, 0, 0],
                           [0, self.cov, 0, 0, 0, 0],
                           [0, 0, self.cov, 0, 0, 0],
                           [0, 0, 0, self.cov, 0, 0],
                           [0, 0, 0, 0, self.cov, 0],
                           [0, 0, 0, 0, 0, self.cov]])

        # Observation Matrix
        # Here, we are observing X & Y (0th index and 3rd Index)
        self.H = np.array([[1, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0]])

        self.I = np.array([[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0],
                           [0, 0, 1, 0, 0, 0], [0, 0, 0, 1, 0, 0],
                           [0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 1]])

        # Predicting the next state and estimate uncertainity
        self.S_pred = None
        self.P_pred = None

        # Kalman Gain
        self.K = None

        # Storing all the State, Kalman Gain and Estimate Uncertainity
        self.S_hist = [self.S]
        self.K_hist = []
        self.P_hist = [self.P]

    def pred_new_state(self):
        self.S_pred = self.F.dot(self.S)

    def pred_next_uncertainity(self):
        self.P_pred = self.F.dot(self.P).dot(self.F.T) + self.Q

    def get_Kalman_gain(self):
        self.K = self.P_pred.dot(self.H.T).dot(
            inv(self.H.dot(self.P_pred).dot(self.H.T) + self.R))
        self.K_hist.append(self.K)

    def state_correction(self, z):
        if z == [None, None]:
            self.S = self.S_pred
        else:
            self.S = self.S_pred + +self.K.dot(z - self.H.dot(self.S_pred))

        self.S_hist.append(self.S)

    def uncertainity_correction(self, z):
        if z != [None, None]:
            self.l1 = self.I - self.K.dot(self.H)
            self.P = self.l1.dot(self.P_pred).dot(self.l1.T) + self.K.dot(
                self.R).dot(self.K.T)
        self.P_hist.append(self.P)

def cost_fun(a, b):
    '''
    Cost function for filter Assignment
    Uses euclidean distance for choosing the filter
    '''

    sm = 0
    for i in range(len(a)):
        sm += (a[i] - b[i])**2
    return sm

In [76]:
import json
import numpy as np
import cv2
from collections import Counter

# Global variables to track coordinate frequencies and stationary object coordinates
global_coord_frequency = {}
stationary_coords = []  # List to store coordinates of detected stationary objects

def group_similar_coordinates(coords, threshold=10):
    """
    Groups coordinates that are within a threshold distance from each other.
    Returns a list of unique coordinates (averaged within the group).
    Also updates the global frequency dictionary.
    """
    grouped_coords = []
    used = [False] * len(coords)

    for i in range(len(coords)):
        if used[i]:
            continue

        # Start a new group with the current coordinate
        current_group = [coords[i]]
        used[i] = True

        for j in range(i + 1, len(coords)):
            # Compute the Euclidean distance between coordinates
            dist = np.linalg.norm(np.array(coords[i]) - np.array(coords[j]))
            if dist < threshold:
                current_group.append(coords[j])
                used[j] = True

        # Average the grouped coordinates
        avg_coord = tuple(np.mean(current_group, axis=0))
        grouped_coords.append((avg_coord, len(current_group)))

        # Update global coordinate frequency
        if avg_coord in global_coord_frequency:
            global_coord_frequency[avg_coord] += len(current_group)
        else:
            global_coord_frequency[avg_coord] = len(current_group)

    return grouped_coords

def identify_stationary_objects(threshold=10):
    """
    Identifies stationary objects by grouping similar coordinates based on frequency of occurrence
    and merging those that are close together.
    """
    global global_coord_frequency, stationary_coords

    # Get the list of coordinates and their frequencies from the global frequency dictionary
    coords_with_freq = list(global_coord_frequency.items())

    def group_coordinates_by_proximity(coords_with_freq, threshold):
        """
        Groups coordinates in the global frequency dictionary by proximity and sums their frequencies.
        """
        grouped_freq = []
        used = [False] * len(coords_with_freq)

        for i in range(len(coords_with_freq)):
            if used[i]:
                continue

            current_group = [coords_with_freq[i][0]]  # Start a new group with the current coordinate
            total_freq = coords_with_freq[i][1]  # Initialize total frequency with current frequency
            used[i] = True

            for j in range(i + 1, len(coords_with_freq)):
                # Compute the Euclidean distance between the coordinates
                dist = np.linalg.norm(np.array(coords_with_freq[i][0]) - np.array(coords_with_freq[j][0]))
                if dist < threshold:
                    current_group.append(coords_with_freq[j][0])
                    total_freq += coords_with_freq[j][1]
                    used[j] = True

            # Average the grouped coordinates
            avg_coord = tuple(np.mean(current_group, axis=0))
            grouped_freq.append((avg_coord, total_freq))

        return grouped_freq

    # Group the coordinates by proximity
    grouped_coords_with_freq = group_coordinates_by_proximity(coords_with_freq, threshold)

    # Define a threshold for considering an object stationary based on frequency
    stationary_threshold = 25  # You can adjust this value

    # Find coordinates that occur above the threshold and add to stationary_coords
    stationary_coords = [(coord,freq) for coord, freq in grouped_coords_with_freq if freq > stationary_threshold]
    
    return stationary_coords

def is_close_to_blacklist(coord, black_list, threshold=1):
    for black_coord in black_list:
        distance = np.sqrt((coord[0] - black_coord[0])**2 + (coord[1] - black_coord[1])**2)
        if distance <= threshold:
            return True
    return False

In [90]:
def real_time_detection_and_tracking(video_path, output_json_path):
    global global_coord_frequency, stationary_coords

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    print(f"FPS: {fps}")

    # Initialize Kalman filter (assuming one object for now)
    # filter_multi = [KalmanFilter(fps=fps, xinit=60, yinit=150, std_x=0.000025, std_y=0.0001)]
    
    # Set up the video writer for saving the result
    ret, frame = cap.read()
    frame_height, frame_width = frame.shape[:2]
    out = cv2.VideoWriter('realtime_tracking_kalman.mp4', cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

    # Dictionary to store tracking data
    tracking_data = {}
    coord_counter = Counter()  # Directly maintain a counter for coordinates
    frame_count = 0

    black_list = [(2477.4579819544147, 1486.213537329001),(1529.9303159460558, 326.81500203630566), 
                  (1285.9160690307617, 848.287064022488), (2700.828621018101, 594.6784457354479),
                  (904.2800679524739, 558.4161139594185), (1285.8978812081473, 849.1556815011161), 
                  (2701.013951455393, 595.3080670756679)]
    lastx = None
    lasty = None
    lastframeno = None

    listt = {}

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Run object detection on the current frame (replace with your model inference)
        print(frame_count)
        results = model([frame])

        # Assuming YOLO model results
        boxes = results[0].boxes.xyxy.cpu().numpy()  # Get the bounding boxes
        class_ids = results[0].boxes.cls.cpu().int().numpy()  # Get the class IDs
        scores = results[0].boxes.conf.cpu().numpy()  # Get the confidence scores

        # Convert detection results into a DataFrame
        df_current = pd.DataFrame({
            'xmin': boxes[:, 0],
            'ymin': boxes[:, 1],
            'xmax': boxes[:, 2],
            'ymax': boxes[:, 3],
            'class_id': class_ids,
            'confidence': scores
        })

        # Initialize a list to collect the current frame's coordinates for grouping
        current_coords = []
        if frame_count not in listt:
            listt[frame_count] = []

        for _, row in df_current.iterrows():
            coord = [(row['xmin'] + row['xmax']) / 2, (row['ymin'] + row['ymax']) / 2]  # Object center coordinates

            # Only track specific class (e.g., class_id == 0 for a ball/shuttlecock)
            if row['class_id'] == 0:
                if not is_close_to_blacklist(coord, black_list, threshold=15):
                    current_coords.append(coord)
                  
                    if lastx is None:
                        speed = 0
                    else:
                        speed = np.sqrt((coord[0] - lastx)**2 + (coord[1] - lasty)**2) / (frame_count - lastframeno)
                    print(speed)
                    listt[frame_count].append({
                        'x_center': coord[0],
                        'y_center': coord[1],
                        'speed': speed
                    })
                    lastx = coord[0]
                    lasty = coord[1]
                    lastframeno = frame_count

        # Update tracking_data from listt
        if len(listt[frame_count]) == 1:
            tracking_data[f"{frame_count}"] = listt[frame_count][0]

        grouped_coords = group_similar_coordinates(current_coords, threshold=100)

        # Clear coord_counter and update it with the grouped coordinates
        coord_counter.clear()
        for coord, count in grouped_coords:
            coord_counter[coord] += count

        # Visualize the tracking (optional)
        tmp_img = frame.copy()
        dummy = 15
        for x, y in current_coords:
            cv2.rectangle(tmp_img, (int(x) - dummy, int(y) - dummy),
                          (int(x) + dummy, int(y) + dummy), (0, 255, 0), 2)

        # Draw black list rectangles
        dummy = 20
        for x, y in black_list:
            cv2.rectangle(tmp_img, (int(x) - dummy, int(y) - dummy),
                          (int(x) + dummy, int(y) + dummy), (0, 140, 255), 5)

            font = cv2.FONT_HERSHEY_SIMPLEX
            cv2.putText(tmp_img, 'stationary', (int(x) - dummy, int(y) - dummy - 10), font, 1, (0, 140, 255), 3)

        cv2.imwrite("tmp_img.jpg", tmp_img)
        out.write(tmp_img)

        # Increment frame count
        frame_count += 1

    cap.release()
    out.release()
    cv2.destroyAllWindows()

    # Save tracking data to a JSON file
    with open(output_json_path, 'w') as json_file:
        json.dump(tracking_data, json_file, indent=4)

    # Identify stationary objects based on frequency
    stationary_coords = identify_stationary_objects()
    print("Stationary coordinates detected:")
    print(stationary_coords)

# Example usage
# real_time_detection_and_tracking("video.mp4", "tracking_data.json")


In [91]:
real_time_detection_and_tracking('12sec.mp4', 'tracking_data.json')

FPS: 59.2
0

0: 416x640 3 Shuttlecocks, 665.0ms
Speed: 3.2ms preprocess, 665.0ms inference, 0.0ms postprocess per image at shape (1, 3, 416, 640)
1

0: 416x640 3 Shuttlecocks, 568.4ms
Speed: 0.0ms preprocess, 568.4ms inference, 0.0ms postprocess per image at shape (1, 3, 416, 640)
2

0: 416x640 3 Shuttlecocks, 538.7ms
Speed: 0.0ms preprocess, 538.7ms inference, 0.0ms postprocess per image at shape (1, 3, 416, 640)
3

0: 416x640 3 Shuttlecocks, 518.7ms
Speed: 3.8ms preprocess, 518.7ms inference, 0.0ms postprocess per image at shape (1, 3, 416, 640)
4

0: 416x640 3 Shuttlecocks, 512.1ms
Speed: 0.0ms preprocess, 512.1ms inference, 0.0ms postprocess per image at shape (1, 3, 416, 640)
5

0: 416x640 3 Shuttlecocks, 581.2ms
Speed: 0.0ms preprocess, 581.2ms inference, 0.0ms postprocess per image at shape (1, 3, 416, 640)
6

0: 416x640 3 Shuttlecocks, 537.8ms
Speed: 0.0ms preprocess, 537.8ms inference, 2.7ms postprocess per image at shape (1, 3, 416, 640)
7

0: 416x640 3 Shuttlecocks, 570.8ms


In [85]:
def identify_stationary_objects(threshold=10):
    """
    Identifies stationary objects by grouping similar coordinates based on frequency of occurrence
    and merging those that are close together.
    """
    global global_coord_frequency, stationary_coords

    # Get the list of coordinates and their frequencies from the global frequency dictionary
    coords_with_freq = list(global_coord_frequency.items())

    def group_coordinates_by_proximity(coords_with_freq, threshold):
        """
        Groups coordinates in the global frequency dictionary by proximity and sums their frequencies.
        """
        grouped_freq = []
        used = [False] * len(coords_with_freq)

        for i in range(len(coords_with_freq)):
            if used[i]:
                continue

            current_group = [coords_with_freq[i][0]]  # Start a new group with the current coordinate
            total_freq = coords_with_freq[i][1]  # Initialize total frequency with current frequency
            used[i] = True

            for j in range(i + 1, len(coords_with_freq)):
                # Compute the Euclidean distance between the coordinates
                dist = np.linalg.norm(np.array(coords_with_freq[i][0]) - np.array(coords_with_freq[j][0]))
                if dist < threshold:
                    current_group.append(coords_with_freq[j][0])
                    total_freq += coords_with_freq[j][1]
                    used[j] = True

            # Average the grouped coordinates
            avg_coord = tuple(np.mean(current_group, axis=0))
            grouped_freq.append((avg_coord, total_freq))

        return grouped_freq

    # Group the coordinates by proximity
    grouped_coords_with_freq = group_coordinates_by_proximity(coords_with_freq, threshold)

    # Define a threshold for considering an object stationary based on frequency
    stationary_threshold = 10  # You can adjust this value

    # Find coordinates that occur above the threshold and add to stationary_coords
    stationary_coords = [(coord,freq) for coord, freq in grouped_coords_with_freq if freq > stationary_threshold]
    
    return stationary_coords
identify_stationary_objects()
print(stationary_coords)


[((904.2800679524739, 558.4161139594185), 45), ((1285.8978812081473, 849.1556815011161), 35), ((2701.013951455393, 595.3080670756679), 31)]


In [49]:
global_coord_frequency

{(2477.5355224609375, 1486.3095703125): 1,
 (1529.9351806640625, 326.7829895019531): 1,
 (904.2285766601562, 558.4496459960938): 1,
 (2477.52099609375, 1486.3462524414062): 1,
 (1529.9125366210938, 326.7773132324219): 1,
 (904.2281494140625, 558.4516296386719): 1,
 (2477.5294189453125, 1486.3165893554688): 1,
 (1529.9005126953125, 326.7796936035156): 1,
 (904.223876953125, 558.433837890625): 1,
 (2477.518798828125, 1486.3561401367188): 1,
 (1529.8467407226562, 326.7969055175781): 1,
 (904.2454223632812, 558.4376525878906): 1,
 (2477.53173828125, 1486.3096923828125): 1,
 (1529.8655395507812, 326.83140563964844): 1,
 (904.2171630859375, 558.5182495117188): 1,
 (2477.518798828125, 1486.3486938476562): 1,
 (1529.8375244140625, 326.8296203613281): 1,
 (904.1736145019531, 558.4793701171875): 1,
 (2477.5328369140625, 1486.319091796875): 1,
 (1529.8358764648438, 326.8828125): 1,
 (904.2461242675781, 558.4206848144531): 1,
 (2477.5120849609375, 1486.3770141601562): 1,
 (1529.791748046875, 326.8