# [Multiple Object Tracking](https://pantelis.github.io/artificial-intelligence/aiml-common/assignments/object-tracking-kalman/_index.html)
Multi-Object Tracking (MOT) is a core visual ability that humans possess to perform kinetic tasks and coordinate other tasks. The AI community has recognized the importance of MOT via a series of competitions.

The ability to reason even in the absence of perceptual input was highlighted in Lecture 1 using a document camera and a canopy-type occlusion, where an object moves below the canopy. In this assignment, the object class is ball, and the ability to reason over time will be demonstrated using Kalman filters. There will be two cases of occlusion: occlusion by a different object and occlusion by the same object (a typical case of the latter is tracking people in crowds).

*Note: You can use OpenCV (`import cv2`) only for the satellite parts of this assignment. Use numpy or, better, jax to code the Kalman filter. You need to submit the assignment either as a notebook URL or a GitHub URL.*

## Task 1: Understand the problem and setup environment (20 points)
The problem is best described by the explanatory video and the raw source files for this assignment, linked below:

[Single object tracking](https://github.com/sseshadr/auvsi-cv-all/blob/master/objectTracking/examples/ball.mp4)

[Multi-object tracking](https://github.com/sseshadr/auvsi-cv-all/blob/master/objectTracking/examples/multiObject.avi)

[Video](https://www.youtube.com/watch?v=0jAC9sMQQuM)

[The GitHub repository associated with the video is here](https://github.com/sseshadr/auvsi-cv-all).

### Object detection Steps
1) Parse video into frames [1]

2) Detect object in each frame [2]

3) Add bounding box[3], [4] (or centroid) to each frame

4) Add the frame to list of frames

5) Convert the list of frames back to video

### Prediction
1) Use the model to get the bounding box from the first frame of the video

2) Get the centroid from the bounding box
 - this is the initial position of the object

3) Add the centroid to the first frame and add the frame to the output video

4) Run the Kalman filter for each remaining frame and add the centroid to the frame

5) Add the frame to the list of frames

6) Convert the list of frames back to video

### Kalman Filter
[single object](https://machinelearningspace.com/object-tracking-python/)

[multi object](https://machinelearningspace.com/2d-object-tracking-using-kalman-filter/)
#### Starting State
We can use a detection model on the first frame to find the starting position of the object (sports ball).

The model returns two coordinates $(x_1, y_1)$ and $(x_2, y_2)$ that describe a rectangular bounding box around the ball. For the Kalman filter, we need the centroid of the ball, which is also the midpoint of the bounding box around the ball.

Since the bounding box is a rectangle, we can use the following midpoint formula to find the centroid of the object within the box:

$\left(\frac{x_1 + x_2}{2}, \frac{y_1 + y_2}{2}\right)$
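
As a minimal sketch of the midpoint formula above, the helper below computes the centroid of a box given as `(x_1, y_1, x_2, y_2)`. The name `bbox_centroid` is hypothetical and used only for illustration.

```python
def bbox_centroid(box):
    """Return the (x, y) midpoint of a box given as (x1, y1, x2, y2)."""
    x1, y1, x2, y2 = box
    return ((x1 + x2) / 2, (y1 + y2) / 2)


# Example: a box spanning x in [100, 140] and y in [50, 90]
print(bbox_centroid((100, 50, 140, 90)))  # (120.0, 70.0)
```

The result is kept as floats here; it can be rounded to integers when a pixel position is needed for drawing.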

#### 1-dimensional Kalman Filter with fixed velocity
Input
 - Gaussian prior: $N(μ_{t-1}, σ_{t-1}^2)$
 - velocity: $N(μ_{velocity}, σ_{velocity}^2)$
 - model (for getting measurements)

Output
 - Gaussian prediction: $N(μ_t, σ_t^2)$

Step 1: Prediction
 - $x_t = x_{t-1} + velocity * time$ (Newton's equation of motion)
 - for this use case, $x$ and velocity are both Gaussian distributions, and the time step is one frame ($time = 1$)
  - $N(μ_{t-1}, σ_{t-1}^2) + N(μ_{velocity}, σ_{velocity}^2)$
  - $\hat{μ}_t = μ_{t-1} + μ_{velocity} * time$
  - $\hat{σ}^2_t = σ_{t-1}^2 + σ_{velocity}^2$
  - $\hat{x}_t = N(\hat{μ}_t, \hat{σ}^2_t)$
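
The prediction step can be sketched in a few lines of plain Python. This is an illustrative sketch, not the notebook's implementation: the `Gaussian` container, the `predict` function and the `dt` parameter are hypothetical names, the position and velocity are assumed to be independent, and the numbers are made up. With `dt = 1` the variance line reduces to the sum of the two variances.

```python
from collections import namedtuple

# Hypothetical container: a 1-D Gaussian described by its mean and variance
Gaussian = namedtuple("Gaussian", ["mean", "var"])


def predict(prior, velocity, dt=1.0):
    """Propagate the prior one step forward under a fixed-velocity model."""
    mean = prior.mean + velocity.mean * dt
    # Scaling a Gaussian by dt scales its variance by dt**2
    var = prior.var + (dt ** 2) * velocity.var
    return Gaussian(mean, var)


prior = Gaussian(mean=100.0, var=4.0)
velocity = Gaussian(mean=-17.0, var=1.0)
print(predict(prior, velocity))  # Gaussian(mean=83.0, var=5.0)
```

Note that the variance only grows during prediction: without a measurement, the filter becomes less certain with every frame.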

Step 2: Update (if the object is detected)
 - $x_z = N(μ_z, σ_z^2)$
  - this is the centroid derived from the object detection output
 - $x_t = \hat{x}_t * x_z$ (product of the predicted and measured Gaussians)
  - $μ_t = \frac{\hat{σ}_t^2 μ_z + σ_z^2 \hat{μ}_t}{\hat{σ}_t^2 + σ_z^2}$
  - $σ_t^2 = \frac{\hat{σ}_t^2 σ_z^2}{\hat{σ}_t^2 + σ_z^2}$
  - $x_t = N(μ_t, σ_t^2)$
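
The update step can be sketched the same way. Again this is only an illustration under stated assumptions: `Gaussian` and `update` are hypothetical names and the numbers are made up. The function fuses the predicted Gaussian with the measurement Gaussian by taking their normalized product.

```python
from collections import namedtuple

# Hypothetical container: a 1-D Gaussian described by its mean and variance
Gaussian = namedtuple("Gaussian", ["mean", "var"])


def update(predicted, measurement):
    """Fuse the predicted Gaussian with a measurement Gaussian."""
    total = predicted.var + measurement.var
    # Each mean is weighted by the variance of the other Gaussian
    mean = (predicted.var * measurement.mean + measurement.var * predicted.mean) / total
    var = (predicted.var * measurement.var) / total
    return Gaussian(mean, var)


predicted = Gaussian(mean=83.0, var=5.0)
measurement = Gaussian(mean=80.0, var=1.0)
posterior = update(predicted, measurement)
print(posterior.mean, posterior.var)
```

The updated mean lies between the prediction and the measurement, closer to whichever has the smaller variance, and the updated variance is smaller than both input variances.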

### Multi Object Tracking
To track multiple objects you need to:

1) Correctly assign each measurement to a tracker
 - use the Euclidean distance to find the measurement closest to each tracker's pre-update prediction
  - use that measurement to update the prediction
 - if the distance to the closest prediction is greater than some threshold, don't assign the measurement to any tracker
 - note: any tracker that doesn't receive a measurement for a given frame skips the update step of the Kalman filter

2) For any unassigned measurements, initialize a new tracker object
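
The assignment rule above can be sketched as a greedy nearest-neighbour match. This is a hedged illustration: the function name `assign_measurements`, its return format and the sample coordinates are assumptions, and the default threshold of 100 pixels simply mirrors the value used in the notebook code.

```python
import math


def assign_measurements(predictions, measurements, max_distance=100.0):
    """Greedy nearest-neighbour assignment.

    predictions:  list of (x, y) tracker predictions (pre-update)
    measurements: list of (x, y) detected centroids
    Returns (assignments, unassigned): assignments maps a tracker index to a
    measurement index; unassigned lists the leftover measurement indices.
    """
    assignments = {}
    free = set(range(len(measurements)))
    for t_idx, pred in enumerate(predictions):
        best, best_d = None, max_distance
        for m_idx in free:
            d = math.dist(pred, measurements[m_idx])
            if d < best_d:
                best, best_d = m_idx, d
        if best is not None:
            assignments[t_idx] = best
            free.remove(best)  # each measurement feeds at most one tracker
    return assignments, sorted(free)


predictions = [(100, 50), (300, 60)]
measurements = [(305, 58), (98, 52), (600, 400)]
assignments, unassigned = assign_measurements(predictions, measurements)
print(assignments)  # {0: 1, 1: 0}
print(unassigned)   # [2]
```

Trackers missing from `assignments` skip the update step, and each index in `unassigned` would start a new tracker. Because the match is greedy, the result can depend on the order in which trackers are visited.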

[1]: https://www.google.com/search?q=play+mp4+in+colab&rlz=1C5CHFA_enUS904US904&source=lnms&tbm=vid&sa=X&ved=2ahUKEwjd4fiIlKD7AhUoLFkFHa_aAWEQ_AUoAXoECAIQAw&biw=1332&bih=592&dpr=1#fpstate=ive&vld=cid:5e2ea0c6,vid:o3h6ptvCBYk
[2]: https://pytorch.org/vision/main/models/generated/torchvision.models.detection.fasterrcnn_resnet50_fpn.html
[3]: https://pytorch.org/vision/stable/generated/torchvision.utils.draw_bounding_boxes.html
[4]: https://pytorch.org/vision/main/auto_examples/plot_repurposing_annotations.html#sphx-glr-auto-examples-plot-repurposing-annotations-py

### Mount drive to access files
### Pip install imageio-ffmpeg for output video

In [1]:
from google.colab import drive
import os

drive.mount('/content/drive')
os.chdir("drive/My Drive/NYU/Tandon_MS_in_CS/2022_Fall_Artificial_Intelligence/AI_Assignments/snb331_object_tracking_assignment")

!pip install imageio-ffmpeg



### display_video
For showing the output video with the bounding boxes / centroids

In [2]:
import IPython
import imageio
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
import cv2

def display_video(video):
  fig = plt.figure(figsize=(8,4))
  
  mov = []
  for frame in video:
    img = plt.imshow(frame, animated=True)
    plt.axis('off')
    mov.append([img])
  
  anime = animation.ArtistAnimation(fig, mov, interval=50, repeat_delay=1000)
  plt.close()
  return anime


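def output_video(video, filename, vidtype='mp4'):
  # cv2.VideoWriter expects the resolution as (width, height)
  if vidtype == 'mp4':
    resolution = (960,540)            # hard-coded; must match the frame size of the input video
  elif vidtype == 'avi':
    v_size = video[0].shape           # frame shape is (height, width, channels)
    resolution = (v_size[1], v_size[0])
  else:
    raise ValueError(f"unsupported vidtype: {vidtype}")
  
  codec = cv2.VideoWriter_fourcc("F", "M", "P", "4")
  framerate = 20   
  video_out = cv2.VideoWriter(filename, codec, framerate, resolution)
  for frame in video:
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)   # OpenCV writes frames in BGR order
    video_out.write(frame)
  video_out.release()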


In [3]:
# Play single ball video
video = imageio.mimread('ball.mp4')
test_out_1 = "ball_test.mp4"
output_video(video, test_out_1)
# HTML(display_video(video).to_html5_video())
test1 = imageio.mimread("ball_test.mp4")
HTML(display_video(test1).to_html5_video())

In [4]:
# play multi ball video
video2 = imageio.mimread('multiObject.avi')

test_out_2 = "multi_test.mp4"
output_video(video2, test_out_2, vidtype='avi')
test2 = imageio.mimread(test_out_2)
HTML(display_video(test2).to_html5_video())


## Task 2: Object Detector (40 points)
In this task you will use a CNN-based object detector to draw bounding boxes around all ball instances in each frame. Because the educational value is not in object detection, you are allowed to use an object detector of your choice trained to distinguish the ball class. You are free to use a pre-trained model (e.g. one trained on MS COCO, which contains the class sports ball) or train a model yourself. Ensure that you thoroughly explain the code.
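
A detector of this kind typically returns boxes, class labels and confidence scores for each frame. The sketch below shows one way to keep only confident sports-ball detections using plain Python lists. The function name `filter_detections`, the `min_score` threshold of 0.5 and the sample values are assumptions for illustration; the notebook code filters by label only.

```python
SPORTS_BALL = 37  # label id the notebook code uses for the sports ball class


def filter_detections(boxes, labels, scores, target_label=SPORTS_BALL, min_score=0.5):
    """Keep boxes whose label matches target_label and whose score is at least min_score."""
    return [
        box
        for box, label, score in zip(boxes, labels, scores)
        if label == target_label and score >= min_score
    ]


# Made-up detections: one confident ball, one low-score ball, one other class
boxes = [(10, 10, 50, 50), (200, 80, 240, 120), (5, 5, 300, 300)]
labels = [37, 37, 1]
scores = [0.92, 0.30, 0.99]
print(filter_detections(boxes, labels, scores))  # [(10, 10, 50, 50)]
```

A score threshold trades missed detections against false positives, so the value would need tuning on the actual videos.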

### Object Detector

In [5]:
import cv2      #opencv
import torch
import torchvision
import numpy as np
from torchvision.utils import draw_bounding_boxes

def object_detector(capture, model):
  # STEP 0: Initialization          
  output_frames_list = []           # list of frames for output
  while True:
    # STEP 1: parse the video into frames
    ret, frame = capture.read()     # frame is numpy.ndarray (360, 480, 3)
    if ret is False:
      break

    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # color conversion
    orig_image = frame
    image = frame.astype(np.float32)/255.0
    image = np.transpose(image, [2,0,1])
    image = torch.tensor(image, dtype=torch.float)
    image = torch.unsqueeze(image, 0)
    
    # STEP 2: object detection model on each frame
    with torch.no_grad():                  # disable gradient tracking for inference
      model_dict = model(image)[0]         # tensor
      boxes = model_dict['boxes']
      scores = model_dict['scores']
      labels = model_dict['labels']
      

    # STEP 3: Add the bounding box to each frame
    boxes = boxes[labels==37]              # sports ball is label 37
    
    for box in boxes:                      # draw all the sports ball detections
        cv2.rectangle(orig_image ,  (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), (0,0,230), 2  )

  
    # STEP 4: add the images with the bounding box to the output list
    output_frames_list.append(orig_image)   # series of images for video
  return output_frames_list


In [6]:
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()                 # get model out of training mode

# Single object detection
cap1 = cv2.VideoCapture('ball.mp4')
output1 = object_detector(cap1, model)

# STEP 5: Convert this list to video / play video
detect_single_out = 'single_detect.mp4'
output_video(output1, detect_single_out)

HTML(display_video(output1).to_html5_video())

  f"The parameter '{pretrained_param}' is deprecated since 0.13 and will be removed in 0.15, "
Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth



In [7]:
# Multi object detection
cap2 = cv2.VideoCapture('multiObject.avi')
output2 = object_detector(cap2, model)

detect_multi_out = 'multi_detect.mp4'
output_video(output2, detect_multi_out,  vidtype='avi')

HTML(display_video(output2).to_html5_video())

## Task 3: Tracker (40 points)
The detector outputs can be used to obtain the centroid(s) of the ball instances across time. You can assign a suitable starting state in the 1st frame of the video and obtain the predicted trajectory of the object during both visible and occluded frames. You need to superimpose your predicted position of the object on the raw frame for each frame and store the sequence of all frames (generate a video). Ensure that you thoroughly explain the code.
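
To illustrate how a tracker can keep predicting through occluded frames, here is a minimal 1-D sketch in plain Python. It is not the notebook's implementation: the function name `track`, the variance defaults and the measurement values are assumptions chosen for illustration, and `None` marks a frame in which the detector found no ball.

```python
def track(measurements, start, velocity, start_var=0.1, velocity_var=0.1, meas_var=0.1):
    """Run a 1-D Kalman filter over per-frame measurements.

    measurements: one entry per frame; None marks an occluded frame.
    Returns a list of (mean, variance) estimates, one per frame.
    """
    mean, var = start, start_var
    estimates = []
    for z in measurements:
        # Predict: move by the velocity and grow the uncertainty
        mean, var = mean + velocity, var + velocity_var
        if z is not None:
            # Update: fuse the prediction with the measurement
            total = var + meas_var
            mean = (var * z + meas_var * mean) / total
            var = (var * meas_var) / total
        estimates.append((mean, var))
    return estimates


# Made-up x positions of a ball moving left at 17 px/frame; frames 3 and 4 are occluded
zs = [483, 466, None, None, 415]
for frame, (mean, var) in enumerate(track(zs, start=500, velocity=-17), start=1):
    print(frame, round(mean, 1), round(var, 3))
```

During the occluded frames the estimate keeps moving at the assumed velocity while its variance grows; the variance shrinks again once a measurement is available.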

### Implementation

In [8]:
# First Draft -- see next cell for final draft
# from collections import namedtuple
# import torch
# import torchvision
# model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
# model.eval() 


# # Centroid params
# mtype = 4
# p_size = 10
# z_size = 20
# p_color = (0,0,230)         # blue for prediction
# z_color = (250, 103, 35)    # orange for measurement
# thickness = 4

# def get_bounding_boxes(frame):
#   with torch.no_grad():
#     model_dict = model(frame)[0]       # tensor
#     bounding_boxes = model_dict['boxes']
#     labels = model_dict['labels']
#     boxes = bounding_boxes[labels==37]
#   return boxes  

# def get_centroid(od_box):
#   # note: Torch bounding box returns points as [ xmin, ymin, xmax, ymax]
#   x_1 = od_box[0]
#   x_2 = od_box[2]
#   y_1 = od_box[1]
#   y_2 = od_box[3]
#   x_c = int((x_1 + x_2)/2)
#   y_c = int((y_1 + y_2)/2)
#   centroid = (x_c, y_c)
#   return centroid

# def frame_converter(frame):
#   image = frame.astype(np.float32)/255.0
#   image = np.transpose(image, [2,0,1])
#   image = torch.tensor(image, dtype=torch.float)
#   image = torch.unsqueeze(image, 0)
#   return image

# gaussian = namedtuple("Gaussian", ["mu", "sigma"])
# def predict(prior, movement):
#   """
#   prior: Gaussian named tuple; previous position of the ball
#   movement: Gaussian named tuple; velocity of the ball along x axis
#   returns prediction: Gaussian named tuple; prediction of the new position of the ball
#   """
#   prediction = gaussian(prior.mu + movement.mu, prior.sigma + movement.sigma)
#   return prediction

# def update(pred, z):
#   """
#   pred: Gaussian named tuple; prediction of location of the ball
#   z: Gaussian named tuple; measurement of the centroid based on obj detector
#   returns pred: Gaussian named tuple; updated prediction of the position
#   """
#   mu = ((pred.sigma*z.mu)+(z.sigma*pred.mu))/(pred.sigma + z.sigma)
#   sigma = (pred.sigma*z.sigma)/(pred.mu + z.mu)
#   prediction = gaussian(mu, sigma)
#   return prediction

# def tracker(video):
#   velocity = gaussian(-17, 1)     # N(mean velocity, velocity variance)only x is changing
#   # STEP 1: use model to get bounding box(es) from first frame in image
#   first_frame = video[0]
#   image = frame_converter(first_frame)

#   measurement = get_bounding_boxes(image)[0]

#   # STEP 2: get centroid from bounding box
#   init_centroid = get_centroid(measurement)
#   init_state_x = init_centroid[0]
#   y = init_centroid[1]
#   prior = gaussian(init_state_x, 0.1)

#   # STEP 3: add centroid to first frame and add frame to output video
#   output_frames_list = []   # list for frames that will compose the video
#   # output_frame = cv2.circle(first_frame, centroid, 5, (255, 255, 255), -1)
#   output_frame = cv2.drawMarker(first_frame, init_centroid, z_color, mtype, z_size, thickness) 
#   output_frames_list.append(output_frame)

#   # STEP 4: kalman filter for each frame + add current and previous centroids to frame
#   # for i, frame in enumerate(video):
#   for frame in video[1:]:
#     # Prediction
#     prediction = predict(prior, velocity)
    
#     # Correction
#     tframe = frame_converter(frame) 
#     boxes = get_bounding_boxes(tframe)
#     if boxes.size != 0:
#       for box in boxes:
#         measurement = get_centroid(box)[0]          # only want x value
#         z = gaussian(measurement, 1)
#         prediction = update(prediction, z)
#         draw_updated = ( int(prediction.mu), y)       # (x, y)
#         frame = cv2.drawMarker(frame, draw_updated, z_color, mtype, z_size, thickness)
#     prior = prediction
#     draw_pred = (int(prediction.mu), y)
#     frame = cv2.drawMarker(frame, draw_pred, p_color, mtype, p_size, thickness)
#     output_frames_list.append(frame) 
    
#   return output_frames_list


# video1 = imageio.mimread('ball.mp4')
# output_vid1 = tracker(video1)

# track_single_out = 'single_track.mp4'
# output_video(video1, track_single_out)

# HTML(display_video(output_vid1).to_html5_video())

In [9]:
# UPDATED VERSION SINGLE OBJECT TRACKER

from collections import namedtuple
import cv2
import numpy as np
import torch
import torchvision
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.eval() 

# Centroid params
mtype = 4
p_size = 10
z_size = 20
p_color = (0,0,230)         # blue for prediction
z_color = (250, 103, 35)    # orange for measurement
thickness = 4

def get_bounding_boxes(frame):
  with torch.no_grad():
    model_dict = model(frame)[0]       # tensor
    bounding_boxes = model_dict['boxes']
    labels = model_dict['labels']
    boxes = bounding_boxes[labels==37]
  return boxes  

def get_centroid(od_box):
  # note: Torch bounding box returns points as [ xmin, ymin, xmax, ymax]
  x_1 = od_box[0]
  x_2 = od_box[2]
  y_1 = od_box[1]
  y_2 = od_box[3]
  x_c = int((x_1 + x_2)/2)
  y_c = int((y_1 + y_2)/2)
  centroid = (x_c, y_c)
  return centroid

def frame_converter(frame):
  image = frame.astype(np.float32)/255.0
  image = np.transpose(image, [2,0,1])
  image = torch.tensor(image, dtype=torch.float)
  image = torch.unsqueeze(image, 0)
  return image

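# Note: the `sigma` field stores the variance (sigma^2) of the Gaussian
gaussian = namedtuple("Gaussian", ["mu", "sigma"])
class Tracker:
  """1-D Kalman filter that tracks the x coordinate; y is held fixed."""
  def __init__(self, start, velocity):
    self.prior = gaussian(int(start[0]), 0.1)   # initial x position and its variance
    self.y = int(start[1])                      # y is not filtered
    self.v = gaussian(velocity, 0.1)            # velocity in pixels per frame and its variance
  
  def get_location(self):
    return (self.prior.mu, self.y)
  
  def predict(self):
    # Prediction: add the velocity Gaussian to the prior (means and variances add)
    new_mu = self.prior.mu + self.v.mu
    new_sigma = self.prior.sigma + self.v.sigma
    self.prior = gaussian(new_mu, new_sigma)

  def update(self, z):
    # Update: multiply the predicted Gaussian by the measurement Gaussian z
    pred = self.prior
    new_mu = int( ((pred.sigma*z.mu)+(z.sigma*pred.mu))/(pred.sigma + z.sigma))
    new_sigma = (pred.sigma*z.sigma)/(pred.sigma + z.sigma)
    self.prior = gaussian(new_mu, new_sigma)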

def single_tracker(capture, model):
  # Only x is changing; the Tracker models the velocity as N(mean velocity, velocity variance)
  output_frames_list = []
  # STEP 1: use model to get bounding box(es) from the first frame of the video
  ret, frame = capture.read()
  first_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # color conversion
  first_out = first_frame
  image = frame_converter(first_frame)

  measurement = get_bounding_boxes(image)[0]
  # STEP 2: get centroid from bounding box & initialize tracker
  init_centroid = get_centroid(measurement)
  velocity = -17
  single_track = Tracker(init_centroid, velocity)
  
  # STEP 3: add centroid to first frame and add frame to output video
  output_frame = cv2.drawMarker(first_out, init_centroid, z_color, mtype, z_size, thickness) 
  output_frames_list.append(output_frame)

  # STEP 4: kalman filter for each frame
  while True:
    ret, frame = capture.read()
    if ret is False:
      break
    
    # Prediction
    single_track.predict()
    pred = single_track.get_location()

    # Correction if there is a measurement
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # color conversion
    out_frame = frame                               # same array; markers are drawn in place
    tframe = frame_converter(frame) 
    boxes = get_bounding_boxes(tframe)
    if boxes.numel() != 0:                          # boxes is a torch tensor; no boxes means no update
      for box in boxes:
        measurement = get_centroid(box)[0]          # only want x value
        z = gaussian(measurement, 0.01)
        single_track.update(z)
        pred = single_track.get_location()               # (x,y)
        out_frame = cv2.drawMarker(out_frame, pred, z_color, mtype, z_size, thickness)

    out_frame = cv2.drawMarker(out_frame, pred, p_color, mtype, p_size, thickness)
    output_frames_list.append(out_frame)            # append each frame once
  
  return output_frames_list



cap3 = cv2.VideoCapture('ball.mp4')
output3 = single_tracker(cap3, model)

track_single_out = 'single_track.mp4'
output_video(output3, track_single_out)

HTML(display_video(output3).to_html5_video())



In [10]:
# Multi Object Tracking
from collections import namedtuple
import torch
import torchvision
import math
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.eval() 
gaussian = namedtuple("Gaussian", ["mu", "sigma"])

blue = (0,0,230)           # blue for ball 1
orange = (250, 103, 35)    # orange for ball 2
size = 10


class Tracker:
  def __init__(self, start, velocity, color):
    self.prior = gaussian(int(start[0]), 0.1)
    self.y = int(start[1])
    self.v = gaussian(velocity, 0.1)
    self.color = color
  
  def get_location(self):
    return (self.prior.mu, self.y)
  
  def get_color(self):
    return self.color
  
  def predict(self):
    new_mu = self.prior.mu + self.v.mu
    new_sigma = self.prior.sigma + self.v.sigma
    self.prior = gaussian(new_mu, new_sigma)

  def update(self, z):
    pred = self.prior
    new_mu = int( ((pred.sigma*z.mu)+(z.sigma*pred.mu))/(pred.sigma + z.sigma))
    new_sigma = (pred.sigma*z.sigma)/(pred.sigma + z.sigma)
    self.prior = gaussian(new_mu, new_sigma)

def assign_measurement(prediction, measurements):
  """
  prediction: tuple (x,y) coordinates
  measurements: list of tuples [(x_1,y_1), (x_2, y_2)]
  return index: index of the measurement to assign, or None if no measurement is close enough
  return z: the assigned measurement (x,y), or None
  """
  min_distance = 100    # distance threshold in pixels; farther measurements are never assigned
  z = None
  index = None
  for i, m in enumerate(measurements):
    d = math.sqrt( (prediction[0] - m[0])**2 + (prediction[1] - m[1])**2)
    if d < min_distance:
      min_distance = d
      index = i
      z = m
  return index, z
  

def multi_tracker(video):
  output_frames_list = []   # list for frames that will compose the video
  trackers = []
  v = [-19, 28]
  c = [blue, orange]

  while True:
    ret, frame = video.read()
    if ret is False:
      break
    
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # color conversion
    output_frame = frame
    tframe = frame_converter(frame)
    measurements = get_bounding_boxes(tframe)
    centroid_list = [get_centroid(z) for z in measurements]

    for t in trackers:
      t.predict()
      i, m = assign_measurement(t.get_location(), centroid_list)
      if i is not None:
        centroid_list.pop(i)
        z = gaussian(m[0], 0.1)
        t.update(z)
      loc = t.get_location()
      output_frame = cv2.drawMarker(output_frame, loc, t.get_color(), mtype, size, thickness)
    output_frames_list.append(output_frame)   # append each frame once, after all trackers are drawn
    
    # any measurement left unassigned starts a new tracker (at most two, given the hard-coded v and c)
    for m in centroid_list:
      tracker = Tracker(m, v.pop(0), c.pop(0))
      trackers.append(tracker)

  return output_frames_list


cap4 = cv2.VideoCapture('multiObject.avi')
output4 = multi_tracker(cap4)

track_multi_out = 'multi_track.mp4'
output_video(output4, track_multi_out, vidtype='avi')

HTML(display_video(output4).to_html5_video())

