# Single Person Tracking with Occlusion using Kalman Filter

Please run each cell in the notebook in order.

## Install dependency

In [None]:
!pip install ultralytics
!pip install filterpy

## Detector

The weights can be downloaded from https://docs.ultralytics.com/models/yolov8/#key-features

In [None]:
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from ultralytics import YOLO 


# Load the YOLOv8-nano weights and run detection over the whole clip.
# Only class id 0 is kept and boxes below 0.85 confidence are discarded;
# show/save ask the predictor to display and to write the annotated video.
model = YOLO("yolov8n.pt")
results = model.predict(
    source="sample2.mp4",
    show=True,
    save=True,
    classes=0,
    conf=0.85,
)

In [None]:
# One entry per frame result: the first box of that frame as a plain
# [x, y, w, h] list, or the sentinel string "Not detected" when the frame has
# no boxes. (Presumably x, y are the box centre, per the `xywh` format - verify
# against the Ultralytics docs.)
PERSON_MEASUREMENT = []
for frame_result in results:
    frame_boxes = frame_result.boxes.cpu().numpy().xywh
    if len(frame_boxes) != 0:
        PERSON_MEASUREMENT.append(list(frame_boxes[0]))
    else:
        PERSON_MEASUREMENT.append("Not detected")

# Frame indices double as timestamps (one time unit per frame).
TIMESTAMPS = list(range(len(PERSON_MEASUREMENT)))

## Kalman Filter

In [None]:
from filterpy.kalman import KalmanFilter
from scipy.linalg import block_diag
from filterpy.common import Q_discrete_white_noise
import numpy as np
import matplotlib.pyplot as plt

def ConstanVelocityKF(R_std, Q_std, dt):
    """Build a 2-D constant-velocity Kalman filter.

    The state vector is laid out as ``[x, vx, y, vy]`` and only the two
    positions (indices 0 and 2) are observed.

    Args:
        R_std: measurement-noise standard deviation; R is ``I * R_std**2``.
        Q_std: process-noise standard deviation; its square is passed as
            ``var`` to ``Q_discrete_white_noise``.
        dt: time step used in the transition matrix and the process noise.

    Returns:
        The configured ``KalmanFilter``; the initial state ``x`` is left for
        the caller to set.
    """
    # Each axis evolves independently with the same position/velocity model,
    # so the full matrices are block-diagonal copies of the per-axis ones.
    axis_transition = np.array([[1, dt],
                                [0,  1]])
    axis_process_noise = Q_discrete_white_noise(dim=2, dt=dt, var=Q_std**2)

    tracker = KalmanFilter(dim_x=4, dim_z=2)
    tracker.F = block_diag(axis_transition, axis_transition)
    tracker.Q = block_diag(axis_process_noise, axis_process_noise)
    tracker.H = np.array([[1, 0, 0, 0],
                          [0, 0, 1, 0]])
    tracker.R = np.eye(2) * R_std**2
    # Large initial covariance: the starting state is only a rough guess.
    tracker.P = np.eye(4) * 500
    return tracker

# Mean spacing between consecutive timestamps. N timestamps yield N-1
# differences, so the sum must be divided by N-1 (np.mean does this), not by N.
# With fewer than two timestamps there is no spacing to measure; use one time
# unit per frame, matching how TIMESTAMPS is generated.
mean_dt = float(np.mean(np.diff(TIMESTAMPS))) if len(TIMESTAMPS) > 1 else 1.0

kf = ConstanVelocityKF(R_std=5, Q_std=0.01, dt=mean_dt)
# State layout is [x, vx, y, vy]: start at position (640, 360) with zero
# velocity. Floats keep the state dtype stable across predict/update.
kf.x = np.array([640.0, 0.0, 360.0, 0.0])

# Index of the next PERSON_MEASUREMENT entry consumed by main_pipeline.
counter = 0
def main_pipeline(image):
    """Advance the tracker by one frame and mark its prediction on the frame.

    Reads the entry of ``PERSON_MEASUREMENT`` at the global ``counter``. If it
    holds a detection, the filter is first corrected with its first two values;
    in every case the filter is then stepped forward and a filled green circle
    is drawn at the predicted position. ``counter`` is incremented afterwards.

    Args:
        image: frame to annotate; it is drawn on in place.

    Returns:
        The same ``image`` object that was passed in.
    """
    global counter
    detection = PERSON_MEASUREMENT[counter]

    # Correct with the measurement only when the detector produced one.
    if detection != "Not detected":
        kf.update(np.array((detection[0], detection[1])))

    # Always step forward; during occlusion this is pure extrapolation.
    kf.predict()
    predicted_xy = (int(kf.x_prior[0]), int(kf.x_prior[2]))
    cv2.circle(image, predicted_xy, 15, (0, 255, 0), -1)

    counter += 1
    return image

## Generate output

Make sure you copy the "sample2.avi" file generated by YOLO into the current directory.

In [None]:
import cv2

# Read every frame of the YOLO-annotated video into memory.
cam = cv2.VideoCapture("sample2.avi")
if not cam.isOpened():
    # Fail loudly instead of silently writing an empty output video.
    raise FileNotFoundError(
        'Could not open "sample2.avi"; copy the video generated by YOLO '
        "into the current directory."
    )

# Reuse the source frame rate so playback speed is preserved; fall back to
# 30 fps when the container does not report a usable value.
source_fps = cam.get(cv2.CAP_PROP_FPS)
fps = source_fps if source_fps > 0 else 30

set_of_images = []
while True:
    ret, frame = cam.read()
    if not ret:
        break
    set_of_images.append(frame)

cam.release()
cv2.destroyAllWindows()

# Run the tracker over the frames in order (main_pipeline draws in place).
out_video = [main_pipeline(img) for img in set_of_images]

# The writer expects every frame to match the size it was opened with, so take
# (width, height) from the frames instead of assuming 1280x720.
if out_video:
    frame_height, frame_width = out_video[0].shape[:2]
else:
    frame_width, frame_height = 1280, 720

# Lower-case 'mp4v' is the tag the MP4 container expects; upper-case 'MP4V'
# makes OpenCV warn and fall back to it.
out = cv2.VideoWriter(
    "track.mp4",
    cv2.VideoWriter_fourcc(*"mp4v"),
    fps,
    (frame_width, frame_height),
)
try:
    for annotated in out_video:
        out.write(annotated)
finally:
    # Always finalise the file, even if writing a frame fails.
    out.release()

# Constant Velocity vs Constant Acceleration Model

In [None]:
from filterpy.kalman import KalmanFilter
from scipy.linalg import block_diag
from filterpy.common import Q_discrete_white_noise
import numpy as np
import matplotlib.pyplot as plt

def ConstanVelocityKF(R_std, Q_std, dt):
    """Build a 2-D constant-velocity Kalman filter.

    The state vector is laid out as ``[x, vx, y, vy]`` and only the two
    positions (indices 0 and 2) are observed.

    Args:
        R_std: measurement-noise standard deviation; R is ``I * R_std**2``.
        Q_std: process-noise standard deviation; its square is passed as
            ``var`` to ``Q_discrete_white_noise``.
        dt: time step used in the transition matrix and the process noise.

    Returns:
        The configured ``KalmanFilter``; the initial state ``x`` is left for
        the caller to set.
    """
    # Each axis evolves independently with the same position/velocity model,
    # so the full matrices are block-diagonal copies of the per-axis ones.
    axis_transition = np.array([[1, dt],
                                [0,  1]])
    axis_process_noise = Q_discrete_white_noise(dim=2, dt=dt, var=Q_std**2)

    tracker = KalmanFilter(dim_x=4, dim_z=2)
    tracker.F = block_diag(axis_transition, axis_transition)
    tracker.Q = block_diag(axis_process_noise, axis_process_noise)
    tracker.H = np.array([[1, 0, 0, 0],
                          [0, 0, 1, 0]])
    tracker.R = np.eye(2) * R_std**2
    # Large initial covariance: the starting state is only a rough guess.
    tracker.P = np.eye(4) * 500
    return tracker

def ConstanAccelerationKF(R_std, Q_std, dt):
    """Build a 2-D constant-acceleration Kalman filter.

    The state vector is laid out as ``[x, vx, ax, y, vy, ay]`` and only the two
    positions (indices 0 and 3) are observed.

    Args:
        R_std: measurement-noise standard deviation; R is ``I * R_std**2``.
        Q_std: process-noise standard deviation; its square is passed as
            ``var`` to ``Q_discrete_white_noise``.
        dt: time step used in the transition matrix and the process noise.

    Returns:
        The configured ``KalmanFilter``; the initial state ``x`` is left for
        the caller to set.
    """
    # Per-axis position/velocity/acceleration kinematics; both axes share the
    # same model, so the full matrices are block-diagonal copies.
    axis_transition = np.array([[1, dt, 0.5*dt*dt],
                                [0,  1,        dt],
                                [0,  0,         1]])
    axis_process_noise = Q_discrete_white_noise(dim=3, dt=dt, var=Q_std**2)

    tracker = KalmanFilter(dim_x=6, dim_z=2)
    tracker.F = block_diag(axis_transition, axis_transition)
    tracker.Q = block_diag(axis_process_noise, axis_process_noise)
    tracker.H = np.array([[1, 0, 0, 0, 0, 0],
                          [0, 0, 0, 1, 0, 0]])
    tracker.R = np.eye(2) * R_std**2
    # Large initial covariance: the starting state is only a rough guess.
    tracker.P = np.eye(6) * 500
    return tracker


# Mean spacing between consecutive timestamps, computed once for both filters.
# N timestamps yield N-1 differences, so the sum must be divided by N-1
# (np.mean does this), not by N. With fewer than two timestamps there is no
# spacing to measure; use one time unit per frame, matching how TIMESTAMPS is
# generated.
mean_dt = float(np.mean(np.diff(TIMESTAMPS))) if len(TIMESTAMPS) > 1 else 1.0

# Constant-velocity filter, state [x, vx, y, vy].
kf = ConstanVelocityKF(R_std=5, Q_std=0.01, dt=mean_dt)
kf.x = np.array([640.0, 0.0, 360.0, 0.0])

# Constant-acceleration filter, state [x, vx, ax, y, vy, ay]; same start point.
kf_acc = ConstanAccelerationKF(R_std=5, Q_std=0.01, dt=mean_dt)
kf_acc.x = np.array([640.0, 0.0, 0.0, 360.0, 0.0, 0.0])

# Index of the next PERSON_MEASUREMENT entry consumed by main_pipeline.
counter = 0

def main_pipeline(image):
    """Advance both trackers by one frame and mark their predictions.

    Reads the entry of ``PERSON_MEASUREMENT`` at the global ``counter``. For
    the constant-velocity filter and then the constant-acceleration filter:
    correct with the detection when there is one, step the filter forward, and
    draw a filled circle at its predicted position (green for constant
    velocity, red in OpenCV's BGR order for constant acceleration).
    ``counter`` is incremented afterwards.

    Args:
        image: frame to annotate; it is drawn on in place.

    Returns:
        The same ``image`` object that was passed in.
    """
    global counter
    detection = PERSON_MEASUREMENT[counter]
    has_detection = detection != "Not detected"

    # (filter, index of y in its state vector, circle colour)
    trackers = (
        (kf, 2, (0, 255, 0)),
        (kf_acc, 3, (0, 0, 255)),
    )
    for tracker, y_index, colour in trackers:
        if has_detection:
            tracker.update(np.array((detection[0], detection[1])))
        # Always step forward; during occlusion this is pure extrapolation.
        tracker.predict()
        predicted_xy = (int(tracker.x_prior[0]), int(tracker.x_prior[y_index]))
        cv2.circle(image, predicted_xy, 15, colour, -1)

    counter += 1
    return image

## Generate output

In [None]:
# Read every frame of the YOLO-annotated video into memory.
cam = cv2.VideoCapture("sample2.avi")
if not cam.isOpened():
    # Fail loudly instead of silently writing an empty output video.
    raise FileNotFoundError(
        'Could not open "sample2.avi"; copy the video generated by YOLO '
        "into the current directory."
    )

# Reuse the source frame rate so playback speed is preserved; fall back to
# 30 fps when the container does not report a usable value.
source_fps = cam.get(cv2.CAP_PROP_FPS)
fps = source_fps if source_fps > 0 else 30

set_of_images = []
while True:
    ret, frame = cam.read()
    if not ret:
        break
    set_of_images.append(frame)

cam.release()
cv2.destroyAllWindows()

# Run the trackers over the frames in order (main_pipeline draws in place).
out_video = [main_pipeline(img) for img in set_of_images]

# The writer expects every frame to match the size it was opened with, so take
# (width, height) from the frames instead of assuming 1280x720.
if out_video:
    frame_height, frame_width = out_video[0].shape[:2]
else:
    frame_width, frame_height = 1280, 720

# Lower-case 'mp4v' is the tag the MP4 container expects; upper-case 'MP4V'
# makes OpenCV warn and fall back to it.
out = cv2.VideoWriter(
    "track_compare.mp4",
    cv2.VideoWriter_fourcc(*"mp4v"),
    fps,
    (frame_width, frame_height),
)
try:
    for annotated in out_video:
        out.write(annotated)
finally:
    # Always finalise the file, even if writing a frame fails.
    out.release()