**Name: Faiz Andrea Ganz**

**NetID: fag277**

## Introduction

For the following project we are going to build an object tracker with the help of an image detector (neural network) and the use of a 2D Kalman filter. The tracker is going to be used on two separate videos. In the first video, a ball rolls from the right to the left of the screen and passes behind a white sheet of paper. In the second video there are two balls which start from opposite ends of the screen and cross each other in the middle. Hence at one point, one of the balls is occluded by the other.

The object detector only help us find the instances of the objects when they appear in the image. Hence, to help us track the balls while occluded, we need to use probabilistic reasoning, this being a recursive state estimation problem, where our variables of interest are motion quantities in the object (position, velocity, and acceleration). The Kalman filter proves to be a good solution for this problem. We build a Kalman filter for each object we want to track and update it with the detected position. Whenever the object is occluded or simply not detected, the Kalman filter will instead provide the trajectory of object, by using the next state prediction calculated on the last detected point. The next state prediciton is then used to update the Kalman filter, since there is no detected input.

Because there are two balls in the second video, we need to keep track of the two instances separately, as the object detector does not tell us which instance is which. To do so, we update the centroids of each instance by selecting the one with the shortest euclidian distance to the predicted coordinates for this time step.

Disclaimer: The implementation uses some simplifies functionality. The trackers do initialize objects dynamically and for the second video the kalman filters and position of the two object were declared before hand for simplicity. Links to referenced sources are included at the beginning of cells.

The result frames for the videos are contained in the results folder.

## Setup

#### Mount Drive and Change Directory

In [1]:
import sys
from google.colab import drive

drive.mount('/content/drive')

MessageError: ignored

In [None]:
%cd /content/drive/MyDrive/ai/hw3/

In [None]:
!ls

#### Installing Packages and ImageAI

For the following project, we will use the ImageAI library and import a Retina (ResNet50 backbone) based object detection model to detect the instances of the objects in each frame.

In [None]:
!pip install tensorflow-gpu==2.4.0
!pip3 install scipy
!pip3 install opencv-python
!pip3 install pillow
!pip3 install h5py
!pip3 install keras

In [None]:
!pip install imageai --upgrade

#### Load Detector Model form ImageAI

For the model we chose the pretrained weights obtained from training over the COCO dataset, which contains an object class "sports ball".

In [None]:
from imageai.Detection import ObjectDetection

detector = ObjectDetection()
detector.setModelTypeAsRetinaNet()
detector.setModelPath("resnet50_coco_best_v2.1.0.h5")
detector.loadModel()

#### Reading Frames

In [None]:
import cv2
from matplotlib import pyplot as plt
import numpy as np
import os

Using the open-cv library we read the video frames and save them as images. For sake of simplicity and debugging, we save resulting images at intermediate steps and load them again for the next step, although everything can be done in one single loop.

In [None]:
def FrameCapture(path, folder): # https://www.geeksforgeeks.org/python-program-extract-frames-using-opencv/
    vidObj = cv2.VideoCapture(path)
    count = 0
    success = 1
    while success:
        success, image = vidObj.read()
        if success:
            cv2.imwrite(folder + "frame%d.jpg" % count, image)
            count += 1
    return count

#### Kalman Filter 2D Class

In [None]:
class KalmanFilter(object): # https://machinelearningspace.com/2d-object-tracking-using-kalman-filter/
    def __init__(self, dt, u_x,u_y, std_acc, x_std_meas, y_std_meas):
        """
        :param dt: sampling time (time for 1 cycle)
        :param u_x: acceleration in x-direction
        :param u_y: acceleration in y-direction
        :param std_acc: process noise magnitude
        :param x_std_meas: standard deviation of the measurement in x-direction
        :param y_std_meas: standard deviation of the measurement in y-direction
        """
        # Define sampling time
        self.dt = dt
        # Define the  control input variables
        self.u = np.matrix([[u_x],[u_y]])
        # Intial State
        self.x = np.matrix([[0], [0], [0], [0]])
        # Define the State Transition Matrix A
        self.A = np.matrix([[1, 0, self.dt, 0],
                            [0, 1, 0, self.dt],
                            [0, 0, 1, 0],
                            [0, 0, 0, 1]])
        # Define the Control Input Matrix B
        self.B = np.matrix([[(self.dt**2)/2, 0],
                            [0, (self.dt**2)/2],
                            [self.dt,0],
                            [0,self.dt]])
        # Define Measurement Mapping Matrix
        self.H = np.matrix([[1, 0, 0, 0],
                            [0, 1, 0, 0]])
        #Initial Process Noise Covariance
        self.Q = np.matrix([[(self.dt**4)/4, 0, (self.dt**3)/2, 0],
                            [0, (self.dt**4)/4, 0, (self.dt**3)/2],
                            [(self.dt**3)/2, 0, self.dt**2, 0],
                            [0, (self.dt**3)/2, 0, self.dt**2]]) * std_acc**2
        #Initial Measurement Noise Covariance
        self.R = np.matrix([[x_std_meas**2,0],
                           [0, y_std_meas**2]])
        #Initial Covariance Matrix
        self.P = np.eye(self.A.shape[1])

    def predict(self):
        # Refer to :Eq.(9) and Eq.(10)  in https://machinelearningspace.com/object-tracking-simple-implementation-of-kalman-filter-in-python/?preview_id=1364&preview_nonce=52f6f1262e&preview=true&_thumbnail_id=1795
        # Update time state
        #x_k =Ax_(k-1) + Bu_(k-1)     Eq.(9)
        self.x = np.dot(self.A, self.x) + np.dot(self.B, self.u)
        # Calculate error covariance
        # P= A*P*A' + Q               Eq.(10)
        self.P = np.dot(np.dot(self.A, self.P), self.A.T) + self.Q
        return self.x[0:2]

    def update(self, z):
        # Refer to :Eq.(11), Eq.(12) and Eq.(13)  in https://machinelearningspace.com/object-tracking-simple-implementation-of-kalman-filter-in-python/?preview_id=1364&preview_nonce=52f6f1262e&preview=true&_thumbnail_id=1795
        # S = H*P*H'+R
        S = np.dot(self.H, np.dot(self.P, self.H.T)) + self.R
        # Calculate the Kalman Gain
        # K = P * H'* inv(H*P*H'+R)
        K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))  #Eq.(11)
        self.x = np.round(self.x + np.dot(K, (z - np.dot(self.H, self.x))))   #Eq.(12)
        I = np.eye(self.H.shape[1])
        # Update error covariance matrix
        self.P = (I - (K * self.H)) * self.P   #Eq.(13)
        return self.x[0:2]

## I. Single Object with Occlusion by another Object

### I.a Detect Object in Frames

In [None]:
num_frames = FrameCapture("ball.mp4", "./data_single/")

In [None]:
directory = './data_single/'
detections = []

for i in range(num_frames):
    f = os.path.join(directory, 'frame%d.jpg' % i)
    if os.path.isfile(f):
        detection = detector.detectObjectsFromImage(
            input_image=f, 
            output_image_path="./detections_single/out_frame%d.jpg" % i, 
            minimum_percentage_probability=10
            )
        detections += [detection]

### I.b Track through Occlusion Using Probabilistic Reasoning

#### Tracking

In [None]:
probabilities = []
x_trajectory = []
y_trajectory = []
x_predictions = []
y_predictions = []

KF = KalmanFilter(1/30, 0, 0, 0.1, 0.001, 0.001)

color = (255, 0, 0)
thickness = 2
radius = 3

for i, detection in enumerate(detections):
    f = os.path.join(directory, 'frame%d.jpg' % i)
    out_img = cv2.imread(f)
    found = False

    for obj in detection:
        if obj['name'] == 'sports ball' and not found:
            x_min, y_min, x_max, y_max = obj['box_points']
            x_center = (x_min + x_max) / 2
            y_center = (y_min + y_max) / 2
            x_trajectory += [x_center]
            y_trajectory += [y_center]
            probabilities += [obj['percentage_probability']]
            center = np.array([[x_center], [y_center]])
            (x, y) = KF.predict()
            (x1, y1) = KF.update(center)
            x_predictions.append(x.item())
            y_predictions.append(y.item())
            out_img = cv2.rectangle(out_img, (x_min, y_min), (x_max, y_max), color, thickness)
            found = True

    if not found:
        x_trajectory += [-1]
        y_trajectory += [-1]
        probabilities += [0]
        (x, y) = KF.predict()
        center = np.array([[int(x)], [int(y)]])
        (x1, y1) = KF.update(center)
        x_predictions.append(x.item())
        y_predictions.append(y.item())
        out_img = cv2.rectangle(out_img,
                                    (int(x_predictions[-1] - 48), int(y_predictions[-1] - 48)),
                                    (int(x_predictions[-1] + 48), int(y_predictions[-1] + 48)),
                                    (255, 51, 255),
                                    thickness)

    for j in range(len(x_trajectory)):
        if x_trajectory[j] == -1:
            out_img = cv2.circle(out_img, (int(x_predictions[j]), int(y_predictions[j])), radius, (255, 51, 255), thickness)
        else:
            out_img = cv2.circle(out_img, (int(x_predictions[j]), int(y_predictions[j])), radius, (255, 51, 255), thickness)
            out_img = cv2.circle(out_img, (int(x_trajectory[j]), int(y_trajectory[j])), radius, (255, 0, 0), thickness)
    
    cv2.imwrite("./results_single/res_frame%d.jpg" % i, out_img)

In [None]:
fig, axs = plt.subplots(1,3, figsize=(25,5))

axs[0].plot(range(len(probabilities)), probabilities)
axs[0].set_xlabel('frame')
axs[0].set_ylabel('confidence of detection')
axs[0].set_title('Detection Confidence')

axs[1].plot(range(len(x_trajectory)), x_trajectory, label='detected')
axs[1].plot(range(len(x_predictions)), x_predictions, label='Kalman')
axs[1].set_xlabel('frame')
axs[1].set_ylabel('x-position')
axs[1].set_title('Displacement (x)')
axs[1].legend()

axs[2].plot(range(len(y_trajectory)), y_trajectory, label='detected')
axs[2].plot(range(len(y_predictions)), y_predictions, label='Kalman')
axs[2].set_xlabel('frame')
axs[2].set_ylabel('y-position')
axs[2].set_title('Displacement (y)')
axs[2].legend()

plt.show()

#### Save Video

In [None]:
# https://stackoverflow.com/questions/57377185/how-play-mp4-video-in-google-colab
frame = cv2.imread("./results_single/res_frame0.jpg")
height, width, layers = frame.shape
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
video = cv2.VideoWriter('ball_tracking.mp4', fourcc, 15, (width,height))

for i in range(num_frames):
    img = cv2.imread("./results_single/res_frame%d.jpg" % i)
    video.write(img)
    
cv2.destroyAllWindows()
video.release()

## II. Occlusion by the Same Object

### II.a Detect Object in Frames

In [None]:
num_frames = FrameCapture("objectTracking_examples_multiObject.avi", "./data_double/")

In [None]:
directory = './data_double/'
detections = []

for i in range(num_frames):
    f = os.path.join(directory, 'frame%d.jpg' % i)
    if os.path.isfile(f):
        detection = detector.detectObjectsFromImage(
            input_image=f, 
            output_image_path="./detections_double/out_frame%d.jpg" % i, 
            minimum_percentage_probability=12
            )
        detections += [detection]

### II.b Track through Occlusion Using Probabilistic Reasoning

#### Object Tracker Class

In [None]:
class SportsBall:
    def __init__(self, x_hist, y_hist, p_hist, x_pred_hist, y_pred_hist):
        self.initialized = True
        self.x_trajectory = x_hist
        self.y_trajectory = y_hist
        self.probabilities = p_hist
        self.x_predictions = x_pred_hist
        self.y_predictions = y_pred_hist
    
    def update(self, x_center, y_center, prob, x_pred, y_pred):
        self.x_trajectory += [x_center]
        self.y_trajectory += [y_center]
        self.probabilities += [prob]
        self.x_predictions += [x_pred]
        self.y_predictions += [y_pred]
    
    def initialize(self):
        self.initialized = True

    def get_expected_coordinates(self):
        return self.x_predictions[-1], self.y_predictions[-1]

#### Tracking

In [None]:
# It would be very hard to build a tracker that initializes the objects and 
# kalman filters on the spot so I opted to start with assumed, initial values 
# for the two sports balls

filters = [KalmanFilter(1/30, 0, 0, 0.1, 0.001, 0.001),
           KalmanFilter(1/30, 0, 0, 0.1, 0.001, 0.001)]

filters[0].update(np.array([[622.5], [153.0]]))
filters[1].update(np.array([[6.5], [162.5]]))

(x1, y1) = filters[0].predict()
(x2, y2) = filters[1].predict()

objects = [SportsBall(x_hist=[622.5], y_hist=[153.0], p_hist=[],x_pred_hist=[x1], y_pred_hist=[y1]),
           SportsBall(x_hist=[6.5], y_hist=[162.5], p_hist=[], x_pred_hist=[x2], y_pred_hist=[y2])]

colors = [(255, 0, 0), (0, 0, 255)]
pred_colors = [(255, 128, 0), (0, 128, 255)]
thickness = 2
radius = 5
    
for i, detection in enumerate(detections):
    f = os.path.join(directory, 'frame%d.jpg' % i)
    out_img = cv2.imread(f)

    boxes = []
    probs = []
    for obj in detection:
        if obj['name'] == 'sports ball':
            x_min, y_min, x_max, y_max = obj['box_points']
            boxes += [obj['box_points']]
            probs += [obj['percentage_probability']]

    if len(boxes) == 2:
        for j, box in enumerate(boxes):
            distances = []
            x_min, y_min, x_max, y_max = box
            x_center = (x_min + x_max) / 2
            y_center = (y_min + y_max) / 2

            for ball in objects:
                (cx, cy) = ball.get_expected_coordinates()
                d = np.sqrt( (x_center - cx)**2 + (y_center - cy)**2)
                distances += [d]

            obj_idx = np.argmin(distances)
            (x1, y1) = filters[obj_idx].update([[x_center], [y_center]])
            (x, y) = filters[obj_idx].predict()
            objects[obj_idx].update(x_center, y_center, probs[j], x.item(), y.item())
            out_img = cv2.rectangle(out_img, (x_min, y_min), (x_max, y_max), colors[obj_idx], thickness)

    elif len(boxes) == 1:
        for j, box in enumerate(boxes):
            distances = []
            x_min, y_min, x_max, y_max = box
            x_center = (x_min + x_max) / 2
            y_center = (y_min + y_max) / 2

            for ball in objects:
                (cx, cy) = ball.get_expected_coordinates()
                d = np.sqrt( (x_center - cx)**2 + (y_center - cy)**2)
                distances += [d]
                
            obj_idx = np.argmin(distances)
            (x1, y1) = filters[obj_idx].update([[x_center], [y_center]])
            (x, y) = filters[obj_idx].predict()
            objects[obj_idx].update(x_center, y_center, probs[j], x.item(), y.item())

            missed_idx = 0 if obj_idx == 1 else 1
            (oldx, oldy) = objects[missed_idx].get_expected_coordinates()
            (x1, y1) = filters[missed_idx].update(np.array([[int(oldx)], [int(oldy)]]))
            (x, y) = filters[missed_idx].predict()
            objects[missed_idx].update(oldx, oldy, 0, x.item(), y.item())
            out_img = cv2.rectangle(out_img, (x_min, y_min), (x_max, y_max), colors[obj_idx], thickness)
            out_img = cv2.rectangle(out_img, (int(oldx) - 45, int(oldy) - 45), (int(oldx) + 45, int(oldy) + 45), pred_colors[missed_idx], thickness)

    elif len(boxes) == 0:
        for j, ball in enumerate(objects):
            (oldx, oldy) = objects[j].get_expected_coordinates()
            (x1, y1) = filters[j].update(np.array([[int(oldx)], [int(oldy)]]))
            (x, y) = filters[j].predict()
            ball.update(oldx, oldy, 0, x.item(), y.item())
            out_img = cv2.rectangle(out_img, (int(oldx) - 45, int(oldy) - 45), (int(oldx) + 45, int(oldy) + 45), colors[j], thickness)
    
    # # Plot Entire Trajectory
    # for j, ball in enumerate(objects):
    #     for k in range(len(ball.x_trajectory[1:])):
    #         if ball.probabilities[k] == 0:
    #             out_img = cv2.circle(out_img, (int(ball.x_predictions[k]), int(ball.y_predictions[k])), radius, pred_colors[j], thickness)
    #         else:
    #             out_img = cv2.circle(out_img, (int(ball.x_trajectory[k+1]), int(ball.y_trajectory[k+1])), radius, colors[j], thickness)
    #             out_img = cv2.circle(out_img, (int(ball.x_predictions[k]), int(ball.y_predictions[k])), radius, pred_colors[j], thickness)

    for j, ball in enumerate(objects):
        if ball.probabilities[-1] == 0:
            out_img = cv2.circle(out_img, (int(ball.x_predictions[-2]), int(ball.y_predictions[-2])), radius, pred_colors[j], thickness)
        else:
            out_img = cv2.circle(out_img, (int(ball.x_predictions[-2]), int(ball.y_predictions[-2])), radius, pred_colors[j], thickness)
            out_img = cv2.circle(out_img, (int(ball.x_trajectory[-1]), int(ball.y_trajectory[-1])), radius, colors[j], thickness)
            
    cv2.imwrite("./results_double/res_frame%d.jpg" % i, out_img)

In [None]:
fig, axs = plt.subplots(1,3, figsize=(25,5))

axs[0].plot(range(len(objects[0].probabilities)), objects[0].probabilities, label='ball 1')
axs[0].plot(range(len(objects[1].probabilities)), objects[1].probabilities, label='ball 2')
axs[0].set_xlabel('frame')
axs[0].set_ylabel('confidence of detection')
axs[0].set_title('Detection Confidence')
axs[0].legend()

axs[1].plot(range(len(objects[0].x_trajectory)), objects[0].x_trajectory, label='detected + Kalman')
axs[1].plot(range(len(objects[0].x_predictions)), objects[0].x_predictions, label='Kalman prediction')
axs[1].set_xlabel('frame')
axs[1].set_ylabel('x-position')
axs[1].set_title('Ball 1')
axs[1].legend()

axs[2].plot(range(len(objects[1].x_trajectory)), objects[1].x_trajectory, label='detected + Kalman')
axs[2].plot(range(len(objects[1].x_predictions)), objects[1].x_predictions, label='Kalman prediction')
axs[2].set_xlabel('frame')
axs[2].set_ylabel('x-position')
axs[2].set_title('Ball 2')
axs[2].legend()

plt.show()

#### Save Video

In [None]:
# https://stackoverflow.com/questions/57377185/how-play-mp4-video-in-google-colab
frame = cv2.imread("./results_double/res_frame0.jpg")
height, width, layers = frame.shape
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
video = cv2.VideoWriter('ball2_tracking.mp4', fourcc, 30, (width,height))

for i in range(num_frames):
    img = cv2.imread("./results_double/res_frame%d.jpg" % i)
    video.write(img)
    
cv2.destroyAllWindows()
video.release()