In [None]:
import pandas as pd
from numpy.linalg import inv
import numpy as np
import math
import cv2
import torch

from matplotlib import pyplot as plt
%matplotlib inline

In [None]:
model = torch.hub.load('ultralytics/yolov5', 'yolov5m', verbose=False)
device = torch.device('cuda')
model = model.to(torch.device('cpu'))

In [None]:
def draw_prediction(img: np.ndarray,
                    class_name: str,
                    df: pd.core.series.Series,
                    color: tuple = (255, 0, 0)):
    '''
    Function to draw prediction around the bounding box identified by the YOLO
    The Function also displays the confidence score top of the bounding box
    '''

    cv2.rectangle(img, (int(df.xmin), int(df.ymin)),
                  (int(df.xmax), int(df.ymax)), color, 2)
    cv2.putText(img, class_name + " " + str(round(df.confidence, 2)),
                (int(df.xmin) - 10, int(df.ymin) - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    return img

In [None]:
def convert_video_to_frame(path: str):
    '''
    The function take input as video file and returns a list of images for every video
    '''

    cap = cv2.VideoCapture(path)
    fps = cap.get(cv2.CAP_PROP_FPS)

    img = []
    while cap.isOpened():
        ret, frame = cap.read()
        if ret == True:
            img.append(frame)
        else:
            break

    cap.release()
    return img, fps

In [None]:
img_multi, fps_multi = convert_video_to_frame('/content/multiple_balls.avi')
img_sin, fps_sin = convert_video_to_frame('/content/WhatsApp Video 2024-04-16 at 11.36.19 AM.mp4')

In [None]:
results_sin = model(img_sin)
results_multi = model(img_multi)

df_sin = results_sin.pandas().xyxy
df_multi = results_multi.pandas().xyxy



In [None]:
class KalmanFilter():
    def __init__(self,
                 xinit: int = 0,
                 yinit: int = 0,
                 fps: int = 30,
                 std_a: float = 0.001,
                 std_x: float = 0.0045,
                 std_y: float = 0.01,
                 cov: float = 100000) -> None:

        # State Matrix
        self.S = np.array([xinit, 0, 0, yinit, 0, 0])
        self.dt = 1 / fps

        # State Transition Model
        # Here, we assume that the model follow Newtonian Kinematics
        self.F = np.array([[1, self.dt, 0.5 * (self.dt * self.dt), 0, 0, 0],
                           [0, 1, self.dt, 0, 0, 0], [0, 0, 1, 0, 0, 0],
                           [0, 0, 0, 1, self.dt, 0.5 * self.dt * self.dt],
                           [0, 0, 0, 0, 1, self.dt], [0, 0, 0, 0, 0, 1]])

        self.std_a = std_a

        # Process Noise
        self.Q = np.array([
            [
                0.25 * self.dt * self.dt * self.dt * self.dt, 0.5 * self.dt *
                self.dt * self.dt, 0.5 * self.dt * self.dt, 0, 0, 0
            ],
            [
                0.5 * self.dt * self.dt * self.dt, self.dt * self.dt, self.dt,
                0, 0, 0
            ], [0.5 * self.dt * self.dt, self.dt, 1, 0, 0, 0],
            [
                0, 0, 0, 0.25 * self.dt * self.dt * self.dt * self.dt,
                0.5 * self.dt * self.dt * self.dt, 0.5 * self.dt * self.dt
            ],
            [
                0, 0, 0, 0.5 * self.dt * self.dt * self.dt, self.dt * self.dt,
                self.dt
            ], [0, 0, 0, 0.5 * self.dt * self.dt, self.dt, 1]
        ]) * self.std_a * self.std_a

        self.std_x = std_x
        self.std_y = std_y

        # Measurement Noise
        self.R = np.array([[self.std_x * self.std_x, 0],
                           [0, self.std_y * self.std_y]])

        self.cov = cov

        # Estimate Uncertainity
        self.P = np.array([[self.cov, 0, 0, 0, 0, 0],
                           [0, self.cov, 0, 0, 0, 0],
                           [0, 0, self.cov, 0, 0, 0],
                           [0, 0, 0, self.cov, 0, 0],
                           [0, 0, 0, 0, self.cov, 0],
                           [0, 0, 0, 0, 0, self.cov]])

        # Observation Matrix
        # Here, we are observing X & Y (0th index and 3rd Index)
        self.H = np.array([[1, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0]])

        self.I = np.array([[1, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0],
                           [0, 0, 1, 0, 0, 0], [0, 0, 0, 1, 0, 0],
                           [0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 1]])

        # Predicting the next state and estimate uncertainity
        self.S_pred = None
        self.P_pred = None

        # Kalman Gain
        self.K = None

        # Storing all the State, Kalman Gain and Estimate Uncertainity
        self.S_hist = [self.S]
        self.K_hist = []
        self.P_hist = [self.P]

    def pred_new_state(self):
        self.S_pred = self.F.dot(self.S)

    def pred_next_uncertainity(self):
        self.P_pred = self.F.dot(self.P).dot(self.F.T) + self.Q

    def get_Kalman_gain(self):
        self.K = self.P_pred.dot(self.H.T).dot(
            inv(self.H.dot(self.P_pred).dot(self.H.T) + self.R))
        self.K_hist.append(self.K)

    def state_correction(self, z):
        if z == [None, None]:
            self.S = self.S_pred
        else:
            self.S = self.S_pred + +self.K.dot(z - self.H.dot(self.S_pred))

        self.S_hist.append(self.S)

    def uncertainity_correction(self, z):
        if z != [None, None]:
            self.l1 = self.I - self.K.dot(self.H)
            self.P = self.l1.dot(self.P_pred).dot(self.l1.T) + self.K.dot(
                self.R).dot(self.K.T)
        self.P_hist.append(self.P)

In [None]:
filter_sin = KalmanFilter(fps=fps_sin, xinit=60,
                          yinit=150, std_x=0.000025, std_y=0.0001)

filter_multi = [
    KalmanFilter(fps=fps_multi, xinit=60, yinit=150,
                 std_x=0.000025, std_y=0.0001),
    KalmanFilter(fps=fps_multi, xinit=620, yinit=150,
                 std_x=0.000025, std_y=0.0001)
]

In [None]:
for df in df_sin:
    df = df.loc[df['name'] == 'sports ball']
    x_cen, y_cen = None, None

    if len(df) > 0:
        x_cen = (df.xmin.values[0] + df.xmax.values[0]) / 2
        y_cen = (df.ymin.values[0] + df.ymax.values[0]) / 2

    filter_sin.pred_new_state()
    filter_sin.pred_next_uncertainity()
    filter_sin.get_Kalman_gain()
    filter_sin.state_correction([x_cen, y_cen])
    filter_sin.uncertainity_correction([x_cen, y_cen])

In [None]:
out = cv2.VideoWriter('single_ball_kalman.mp4', cv2.VideoWriter_fourcc(*'mp4v'), 10,
                      (img_sin[0].shape[1], img_sin[0].shape[0]))

for i in range(len(img_sin)):
    x, y = filter_sin.S_hist[i][0], filter_sin.S_hist[i][3]
    df = df_sin[i].loc[df_sin[i]['name'] == 'sports ball']
    tmp_img = img_sin[i]

    for j in df.index.values:
        tmp_img = draw_prediction(tmp_img, 'Ball', df.loc[j])

    tmp_img = cv2.circle(tmp_img, (math.floor(
        filter_sin.S_hist[i][0]), math.floor(filter_sin.S_hist[i][3])),
        radius=1,
        color=(255, 0, 0),
        thickness=3)

    out.write(tmp_img)

out.release()