<a target="_blank" href="https://colab.research.google.com/github/Jasonchen0317/CSGY-6613-Assignment/blob/main/assignment-3/detect_with_Kalman.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# Task 2: Kalman Filter 

This task aim to implement a Kalman filter that will track the drone in the video.

### Approach

Most part of task 2 is based on detect.py from task 1, since the output of the detection model will be used as the measurement for the kalman filter. 

#### Kalman filter instance variable

1. Filter state estimate(x) is composed by coordinates of the center of the bounding box (cx,cy), size of the box (width, height) and the change of each of these parameters, velocities.

2. Covariance matrix(P) is set to I*10 (np.eye(8) * 10.).

3. Process uncertainty/noise(Q) is set to I*0.01 (np.eye(8) * 0.01).

4. Measurement uncertainty/noise(R) is set 1 for center point, 10 for width and height.

#### Steps

First, the bounding box is found by the Faster-RCNN model, then is converted into the format of the measurement for the filter. The filter first predicts and then uses the measurement to update the resulting bounding box. 

For small amount(<20) of consecutive frames that has no detection, I've used the kalman filter to predict the bounding boxes, since it maybe false negative(miss detection). If the amount of consecutive frames that has no detection is over the threshold, the saved detected frames will be output as videos. 

#### Results

The resulting videos are saved in the 'kalman_filter_vid' folder.

In [1]:
import numpy as np
import cv2
import torch
import glob as glob
import os
import time
from model import create_model
from config import (
    NUM_CLASSES, DEVICE, CLASSES
)
from filterpy.kalman import KalmanFilter
from filterpy.common import Q_discrete_white_noise
from scipy.linalg import block_diag

In [2]:
#Function for creating the kalman filter
def tracker():
    #Dimension of state is 8, and dim for measurement is 4.
    tracker = KalmanFilter(dim_x=8, dim_z=4)
    dt = 1.0   # time step
    #x is composed by coordinates of the center of the bounding box (cx,cy), size of the box (width, height) and the change of each of these parameters, velocities.
    #State Transition: x1=x0+dt, y1=y0+dt
    tracker.F = np.array([[1, 0, 0, 0, dt, 0, 0, 0],
                          [0, 1, 0, 0, 0, dt, 0, 0],
                          [0, 0, 1, 0, 0, 0, dt, 0],
                          [0, 0, 0, 1, 0, 0, 0, dt],
                          [0, 0, 0, 0, 1, 0, 0, 0],
                          [0, 0, 0, 0, 0, 1, 0, 0],
                          [0, 0, 0, 0, 0, 0, 1, 0],
                          [0, 0, 0, 0, 0, 0, 0, 1]])
    tracker.u = 0.
    tracker.H = np.array([[1, 0, 0, 0, 0, 0, 0, 0],
                          [0, 1, 0, 0, 0, 0, 0, 0],
                          [0, 0, 1, 0, 0, 0, 0, 0],
                          [0, 0, 0, 1, 0, 0, 0, 0]])

    tracker.R = np.array([[1, 0, 0, 0],
                          [0, 1, 0, 0],
                          [0, 0, 10, 0],
                          [0, 0, 0, 10]])
    q = Q_discrete_white_noise(dim=2, dt=dt, var=0.05)
    tracker.Q = np.eye(8) * 0.01
    tracker.x = np.array([[0, 0, 0, 0, 0, 0, 0, 0]]).T
    tracker.P = np.eye(8) * 10.
    return tracker

In [3]:
#Function for converting bounding boxes to measurement format for kalman filter
#bbox:[x1, y1, x2, y2] ----> measurement(z): [center x, center y, width, height]
def bboxes_to_z(bbox):
    w = bbox[2]-bbox[0]
    h = bbox[3] - bbox[1]
    cx = (bbox[2]+bbox[0])/2
    cy = (bbox[3]+bbox[1])/2
    return np.array([cx, cy, w, h]).reshape((4, 1))

In [4]:
#Function for converting result(x) to bounding box format and add center point
#result(x): [center x, center y, width, height] ----> bbox: [x1, y1, x2, y2, center x, center y]
def x_to_bbox(x):
    x1 = x[0]-x[2]/2
    x2 = x[0]+x[2]/2
    y1 = x[1]-x[3]/2
    y2 = x[1]+x[3]/2
    #return [topleft x, topleft y, bottomright x, bottomright y, center x, center y]
    return np.array([x1, y1, x2, y2, x[0], x[1]]).reshape((6, 1))

In [8]:
#Function for converting saved frames to videos
def outputvideo(img_arr, num):
    img = img_arr[0]
    height, width, layers = img.shape
    size = (width,height)
            
    out = cv2.VideoWriter('C:/Users/jason/CSGY-6613-Assignment/assignment-3/kalman_filter_vid/detect_video_%d.mp4' % num,cv2.VideoWriter_fourcc(*'mp4v'), 15, size)
    
            
    for i in range(len(img_arr)):
         out.write(img_arr[i])
    out.release()

In [None]:
if not os.path.exists('kalman_filter_vid'):
   os.makedirs('kalman_filter_vid')

In [None]:
color = (0, 255, 0)
# load the best model and trained weights
model = create_model(num_classes=NUM_CLASSES)
checkpoint = torch.load('outputs/best_model.pth', map_location=DEVICE)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(DEVICE).eval()

# directory where all the images are present
DIR_TEST = 'dataset/test'
test_images = glob.glob(f"{DIR_TEST}/*.jpg")
print(f"Test instances: {len(test_images)}")

#Threshold for bounding boxes, if lower then threshold, the bbox isn't valid
detection_threshold=0.5
#The Kalman filter for tracking
trk = tracker()
#List for saving the centers of bounding boxes, for plotting trajectories
centers=[]
#List for saving frames that detected drones
img_arr=[]
#Counter for consecutive frames without detection
miss_count=0
#Counter for currently saved videos
video_num=0

for i in range(len(test_images)):
    # get the image file name for saving output later on
    image_name = test_images[i].split(os.path.sep)[-1].split('.')[0]
    image = cv2.imread(test_images[i])
    orig_image = image.copy()
    # BGR to RGB
    image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB).astype(np.float32)
    # make the pixel range between 0 and 1
    image /= 255.0
    # bring color channels to front
    image = np.transpose(image, (2, 0, 1)).astype(np.float32)
    # convert to tensor
    image = torch.tensor(image, dtype=torch.float).cuda()
    # add batch dimension
    image = torch.unsqueeze(image, 0)
    start_time = time.time()
    with torch.no_grad():
        outputs = model(image.to(DEVICE))
    end_time = time.time()
    
    # load all detection to CPU for further operations
    outputs = [{k: v.to('cpu') for k, v in t.items()} for t in outputs]
    
    # carry further only if there are detected boxes
    if len(outputs[0]['boxes']) != 0:
        boxes = outputs[0]['boxes'].data.numpy()
        scores = outputs[0]['scores'].data.numpy()
        # get the bbox with the highest confidence
        max_score = np.argmax(scores)
        
        #Check if the confidence is higher then the threshold
        #If higher, proceed to apply Kalman filter
        if(scores[max_score]>detection_threshold):
            miss_count=0
            trk.predict()
            trk.update(bboxes_to_z(boxes[max_score]))
            box = x_to_bbox(trk.x).astype(np.int32)
            centers.append((int(box[4]), int(box[5])))
        #Lower and less then 20 consecutive frames without detection, predict the bounding box by previous detections
        elif miss_count<20:
            miss_count = miss_count+1
            trk.predict()
            box = x_to_bbox(trk.x).astype(np.int32)
            centers.append((int(box[4]), int(box[5])))
        
        #If more 20 consecutive frames without detection, output the saved frames to video
        else:
            #If detected more than 50 detected frames, convert and output as video
            if len(img_arr)>50:
                outputvideo(img_arr, video_num)
                video_num=video_num+1
            #Clear frame list and center list    
            img_arr=[]
            centers=[]
            continue
            
        # draw the trajectory, bounding boxes, and class name
        class_name = 'drone'
        if(len(centers)>=2):
            for i in range(len(centers)-1):
                cv2.line(orig_image,centers[i],centers[i+1],(255,0,0),3)
                
        cv2.rectangle(orig_image,
                        (int(box[0]), int(box[1])),
                        (int(box[2]), int(box[3])),
                        color, 2)
        cv2.putText(orig_image, class_name, 
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 
                        2, lineType=cv2.LINE_AA)
        cv2.imshow('Prediction', orig_image)
        cv2.waitKey(1)
        #Keep frame in list
        img_arr.append(orig_image)
    
    #If less then 20 consecutive frames without detection, predict the bounding box by previous detections 
    elif miss_count<20:
        miss_count = miss_count+1
        trk.predict()
        box = x_to_bbox(trk.x).astype(np.int32)
        
        # draw the bounding boxes and write the class name on top of it
        class_name = 'drone'
        centers.append((int(box[4]), int(box[5])))
        
        if(len(centers)>=2):
            for i in range(len(centers)-1):
                cv2.line(orig_image,centers[i],centers[i+1],(255,0,0),3)
                
        cv2.rectangle(orig_image,
                        (int(box[0]), int(box[1])),
                        (int(box[2]), int(box[3])),
                        color, 2)
        cv2.putText(orig_image, class_name, 
                        (int(box[0]), int(box[1]-5)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 
                        2, lineType=cv2.LINE_AA)
        cv2.imshow('Prediction', orig_image)
        cv2.waitKey(1)
        img_arr.append(orig_image)
    
    #If more 20 consecutive frames without detection, output the saved frames to video
    else:
        if len(img_arr)>50:
            outputvideo(img_arr, video_num)
            video_num=video_num+1
                
        img_arr=[]
        centers=[]
        
print('TEST PREDICTIONS COMPLETE')
cv2.destroyAllWindows()