In [None]:
from __future__ import absolute_import, division

from copy import deepcopy
from math import log, exp, sqrt
import sys
import numpy as np
from numpy import dot, zeros, eye, isscalar, shape
import numpy.linalg as linalg
from filterpy.stats import logpdf
from filterpy.common import pretty_str, reshape_z


class KalmanFilter(object):

    def __init__(self, dim_x, dim_z, dim_u=0):
        if dim_x < 1:
            raise ValueError('dim_x must be 1 or greater')
        if dim_z < 1:
            raise ValueError('dim_z must be 1 or greater')
        if dim_u < 0:
            raise ValueError('dim_u must be 0 or greater')

        self.dim_x = dim_x
        self.dim_z = dim_z
        self.dim_u = dim_u

        self.x = zeros((dim_x, 1))        # state
        self.P = eye(dim_x)               # uncertainty covariance
        self.Q = eye(dim_x)               # process uncertainty
        self.B = None                     # control transition matrix
        self.F = eye(dim_x)               # state transition matrix
        self.H = zeros((dim_z, dim_x))    # measurement function
        self.R = eye(dim_z)               # measurement uncertainty
        self._alpha_sq = 1.               # fading memory control
        self.M = np.zeros((dim_x, dim_z)) # process-measurement cross correlation
        self.z = np.array([[None]*self.dim_z]).T

        # gain and residual are computed during the innovation step. We
        # save them so that in case you want to inspect them for various
        # purposes
        self.K = np.zeros((dim_x, dim_z)) # kalman gain
        self.y = zeros((dim_z, 1))
        self.S = np.zeros((dim_z, dim_z)) # system uncertainty
        self.SI = np.zeros((dim_z, dim_z)) # inverse system uncertainty

        # identity matrix. Do not alter this.
        self._I = np.eye(dim_x)

        # these will always be a copy of x,P after predict() is called
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()

        # these will always be a copy of x,P after update() is called
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()

        # Only computed only if requested via property
        self._log_likelihood = log(sys.float_info.min)
        self._likelihood = sys.float_info.min
        self._mahalanobis = None

        self.inv = np.linalg.inv


    def predict(self, u=None, B=None, F=None, Q=None):

        if B is None:
            B = self.B
        if F is None:
            F = self.F
        if Q is None:
            Q = self.Q
        elif isscalar(Q):
            Q = eye(self.dim_x) * Q


        # x = Fx + Bu
        if B is not None and u is not None:
            self.x = dot(F, self.x) + dot(B, u)
        else:
            self.x = dot(F, self.x)

        # P = FPF' + Q
        self.P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q

        # save prior
        self.x_prior = self.x.copy()
        self.P_prior = self.P.copy()


    def update(self, z, R=None, H=None):
        # set to None to force recompute
        self._log_likelihood = None
        self._likelihood = None
        self._mahalanobis = None

        if z is None:
            self.z = np.array([[None]*self.dim_z]).T
            self.x_post = self.x.copy()
            self.P_post = self.P.copy()
            self.y = zeros((self.dim_z, 1))
            return

        if R is None:
            R = self.R
        elif isscalar(R):
            R = eye(self.dim_z) * R

        if H is None:
            z = reshape_z(z, self.dim_z, self.x.ndim)
            H = self.H

        # y = z - Hx
        # error (residual) between measurement and prediction
        self.y = z - dot(H, self.x)

        # common subexpression for speed
        PHT = dot(self.P, H.T)

        # S = HPH' + R
        # project system uncertainty into measurement space
        self.S = dot(H, PHT) + R
        self.SI = self.inv(self.S)
        # K = PH'inv(S)
        # map system uncertainty into kalman gain
        self.K = dot(PHT, self.SI)

        # x = x + Ky
        # predict new x with residual scaled by the kalman gain
        self.x = self.x + dot(self.K, self.y)

        # P = (I-KH)P(I-KH)' + KRK'
        # This is more numerically stable
        # and works for non-optimal K vs the equation
        # P = (I-KH)P usually seen in the literature.

        I_KH = self._I - dot(self.K, H)
        self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(self.K, R), self.K.T)

        # save measurement and posterior state
        self.z = deepcopy(z)
        self.x_post = self.x.copy()
        self.P_post = self.P.copy()

In [None]:
import cv2
import numpy as np
import math
from kalmanfilter import KalmanFilter
#

#FOR　ONE BALL

#
# Load Yolo
net = cv2.dnn.readNet("yolov3.weights", "yolov3.cfg")
classes = []
with open("coco.names", "r") as f:
    classes = [line.strip() for line in f.readlines()]
layer_names = net.getLayerNames()
output_layers = []
for i in net.getUnconnectedOutLayers():
    output_layers.append(layer_names[[i][0] - 1])
colors = np.random.uniform(0, 255, size=(len(classes), 3))

kf = KalmanFilter()
# Loading video
cap = cv2.VideoCapture("ball.mp4")



bb= []
frames = []
aa = 0

center_points_prev_frame = []

tracking_objects = {}
#tracking_objects_kalman = {}
track_id = 1
output_video = cv2.VideoWriter('One_Ball_Kalman.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 10, (960,540))

while(cap.isOpened()):
    aa +=1
    ret, frame = cap.read()
    if ret is False:
        break
    #img = cv2.imread("ball.jpg")
    frame = cv2.resize(frame, None, fx=1, fy=1)
    frames.append(frame)
    height, width, channels = frame.shape
    # Detecting objects
    blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)

    net.setInput(blob)
    outs = net.forward(output_layers)

    # Showing informations on the screen
    class_ids = []
    confidences = []
    boxes = []
    center_points_cur_frame = []
    for out in outs:
        for detection in out:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            if confidence > 0.5:
                # Object detected
                center_x = int(detection[0] * width)
                center_y = int(detection[1] * height)
                w = int(detection[2] * width)
                h = int(detection[3] * height)

                # Rectangle coordinates
                x = int(center_x - w / 2)
                y = int(center_y - h / 2)
                #if only sports ball. then add to class_ids for pointing out
                if class_id == 32:
                    class_ids.append(class_id)
                    boxes.append([x, y, w, h])
                    bb.append([x, y, w, h])
                    confidences.append(float(confidence))
    indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4) # leave only 1 detection box here
    font = cv2.FONT_HERSHEY_PLAIN
    xx = 0
    for i in range(len(boxes)):
        
        if i in indexes:
            xx += 1
            x, y, w, h = boxes[i]
            cx = int(x + w / 2)
            cy = int(y + h / 2)
            px, py = kf.predict(cx, cy)
            center_points_cur_frame.append((cx, cy))
            label = 'Ball'
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0,255,0), 2)
            cv2.circle(frame, (cx, cy), 3, (0, 0, 255), 4) #center of the detection box
            cv2.putText(frame, label, (x, y), font, 3, (0,0,255), 3)

    if aa <= 2:
#load 2 frames first for the below else statements can start to compare frames 
        for pt in center_points_cur_frame:
            #print(pt)
            for pt2 in center_points_prev_frame:
                distance = math.hypot(pt2[0] - pt[0], pt2[1] - pt[1])
                if distance < 20:
                    kalman_est = kf.predict(pt[0], pt[1])
                    arr1 = np.array([pt[0], pt[1]])
                    arr2 = np.array([kalman_est[0], kalman_est[1]])
                    gfg = np.concatenate((arr1, arr2), axis = 0)
                    #print('gfg is ',gfg)
                    tracking_objects[track_id] = gfg
                    track_id += 1
                    #print(pt)
        
 
    else:
#by comparing the distance of centers between current frame and previous frame to determine where it's the same object or not

        tracking_objects_copy = tracking_objects.copy()
        center_points_cur_frame_copy = center_points_cur_frame.copy()

        for object_id, pt2 in tracking_objects_copy.items():
            object_exists = False
            for pt in center_points_cur_frame_copy:
                distance = math.hypot(pt2[0] - pt[0], pt2[1] - pt[1])
                # Update IDs position
                if distance < 50:
                    kalman_est = kf.predict(pt[0], pt[1])
                    arr1 = np.array([pt[0], pt[1]])
                    arr2 = np.array([kalman_est[0], kalman_est[1]]) #kalman filter here and added the predicted location into dict
                    gfg = np.concatenate((arr1, arr2), axis = 0)
                    tracking_objects[object_id] = gfg
                    #tracking_objects[object_id] : centerx, centery, predict_centerx, predict_centery
                    object_exists = True
                    if pt in center_points_cur_frame:
                        center_points_cur_frame.remove(pt)
                    continue

            # Remove IDs lost
            if not object_exists:
                tracking_objects.pop(object_id)

        # Add new IDs found
        for pt in center_points_cur_frame:
            #reset new detect object
            arr1 = np.array([pt[0], pt[1]])
            arr2 = np.array([pt[0], pt[1]])
            gfg = np.concatenate((arr1, arr2), axis = 0)
            tracking_objects[track_id] = gfg
            track_id += 1
    for object_id, pt in tracking_objects.items():
        cv2.putText(frame, str(object_id), (pt[0], pt[1]), 0, 1, (0, 0, 255), 2)
        cv2.circle(frame, (int(pt[2]), int(pt[3])), 2, (255, 0, 0), 4)
        

    
    
    legend = "RED dot is Center BLUE dot is Kalman Predict"
    cv2.putText(frame, legend, (10, 20), 0, 1,(0,255,255), 2)
    cv2.imshow("Image", frame)
    output_video.write(frame)
    center_points_prev_frame = center_points_cur_frame.copy()
    key = cv2.waitKey(20)
    if key == 27:
            break
cv2.destroyAllWindows()