In [1]:
import cv2 as cv
from scipy.spatial import distance
import numpy as np
from collections import OrderedDict

##### Object Tracking Class

In [2]:
class Tracker:
    """Centroid-based multi-object tracker.

    Each tracked object is represented by the centre point of its most recent
    bounding box. On every call to ``update`` the previously known centres are
    greedily matched to the new detections by smallest Euclidean distance.

    Attributes:
        nextObjectID: integer ID that will be given to the next new object.
        objects: ``OrderedDict`` mapping object ID -> last known (x, y) centre.
        lost: ``OrderedDict`` mapping object ID -> number of consecutive
            updates in which the object was not matched to a detection.
        maxLost: an object is dropped once its lost count exceeds this value.
    """

    def __init__(self, maxLost = 30):
        """Create an empty tracker.

        Args:
            maxLost: maximum number of consecutive updates an object may stay
                unmatched; it is removed when its lost count exceeds this.
        """
        self.nextObjectID = 0                   # ID of next object
        self.objects = OrderedDict()            # stores ID:Locations
        self.lost = OrderedDict()               # stores ID:Lost_count

        self.maxLost = maxLost

    def addObject(self, new_object_location):
        """Start tracking a new object at ``new_object_location`` under a fresh ID."""
        self.objects[self.nextObjectID] = new_object_location
        self.lost[self.nextObjectID] = 0        # a new object has not been missed yet

        self.nextObjectID += 1

    def removeObject(self, objectID):
        """Forget everything stored for ``objectID``.

        Raises:
            KeyError: if ``objectID`` is not currently tracked.
        """
        del self.objects[objectID]
        del self.lost[objectID]

    def _markLost(self, objectID):
        """Count one more missed update for ``objectID`` and drop it past ``maxLost``."""
        self.lost[objectID] += 1
        if self.lost[objectID] > self.maxLost:
            self.removeObject(objectID)

    @staticmethod
    def getLocation(bounding_box):
        """Return the integer (x, y) centre of a box.

        Args:
            bounding_box: 4-item sequence unpacked as
                (x_left_top, y_left_top, x_right_bottom, y_right_bottom).
        """
        xlt, ylt, xrb, yrb = bounding_box
        return (int((xlt + xrb) / 2.0), int((ylt + yrb) / 2.0))

    def update(self,  detections):
        """Match the detections of one frame against the tracked objects.

        Args:
            detections: sequence of bounding boxes, each accepted by
                ``getLocation``. May be empty.

        Returns:
            ``self.objects`` (the live mapping, not a copy) after the update.
        """
        if len(detections) == 0:   # nothing detected: every tracked object was missed
            # Iterate over a snapshot of the keys because _markLost may delete
            # entries; deleting while iterating the dict itself raises RuntimeError.
            for objectID in list(self.lost.keys()):
                self._markLost(objectID)

            return self.objects

        new_object_locations = np.zeros((len(detections), 2), dtype="int")     # current object locations

        for (i, detection) in enumerate(detections):
            new_object_locations[i] = self.getLocation(detection)

        if len(self.objects) == 0:
            # Nothing tracked yet: every detection becomes a new object.
            for i in range(0, len(detections)):
                self.addObject(new_object_locations[i])
        else:
            objectIDs = list(self.objects.keys())
            previous_object_locations = np.array(list(self.objects.values()))

            # D[r, c] = distance between previous object r and new detection c
            D = distance.cdist(previous_object_locations, new_object_locations)

            # Visit previous objects in order of how close their nearest detection is ...
            row_idx = D.min(axis=1).argsort()

            # ... and pair each with the index of that nearest detection.
            cols_idx = D.argmin(axis=1)[row_idx]

            assignedRows, assignedCols = set(), set()

            for (row, col) in zip(row_idx, cols_idx):

                # Each object and each detection can be used at most once.
                if row in assignedRows or col in assignedCols:
                    continue

                objectID = objectIDs[row]
                self.objects[objectID] = new_object_locations[col]
                self.lost[objectID] = 0

                assignedRows.add(row)
                assignedCols.add(col)

            unassignedRows = set(range(0, D.shape[0])).difference(assignedRows)
            unassignedCols = set(range(0, D.shape[1])).difference(assignedCols)

            # Both kinds of leftovers can occur in the same frame (e.g. two
            # objects share the same nearest detection), so handle both
            # regardless of which side is larger.
            for row in sorted(unassignedRows):
                self._markLost(objectIDs[row])

            for col in sorted(unassignedCols):
                self.addObject(new_object_locations[col])

        return self.objects


#### Loading Object Detector Model

##### Face Detection and Tracking

Here, the Face Detection Caffe Model is used.

The files are taken from the following link:
https://github.com/opencv/opencv_3rdparty/tree/dnn_samples_face_detector_20170830

In [3]:
# Face detector settings: network definition file, trained weights file, and
# the minimum detection probability; detections below it are discarded.
caffemodel = dict(
    prototxt="./caffemodel_dir/deploy.prototxt",
    model="./caffemodel_dir/res10_300x300_ssd_iter_140000.caffemodel",
    acc_threshold=0.50,
)

# Load the Caffe network from the two files configured above.
net = cv.dnn.readNetFromCaffe(caffemodel["prototxt"], caffemodel["model"])

##### Instantiate the Tracker Class

In [4]:
# An object is dropped once it has gone unmatched for more than this many
# consecutive tracker updates.
maxLost = 60
tracker = Tracker(maxLost=maxLost)

##### Initiate opencv video capture object

`video_src` can be set in one of two ways:
1. `video_src=0`: OpenCV opens the camera at device index 0 (typically the default webcam or a USB-connected camera).
2. `video_src='video_file_path'`: OpenCV reads the video file at the given path (e.g. MP4, AVI, or another supported format).

In [10]:
# 0 selects a camera; a string is treated as the path of a video file.
video_src = 0
cap = cv.VideoCapture(video_src)

##### Start object detection and tracking

In [11]:
(H, W) = (None, None)  # input image height and width for the network

# Everything runs inside try/finally so the capture device and the display
# window are released even when the loop is interrupted or an error is raised.
try:
    while True:

        ok, image = cap.read()

        if not ok:
            print("Cannot read the video feed.")
            break

        image = cv.resize(image, (400, 400), interpolation = cv.INTER_AREA)

        # Frame size is fixed by the resize above, so it is read only once.
        if W is None or H is None:
            (H, W) = image.shape[:2]

        blob = cv.dnn.blobFromImage(image, 1.0, (W, H), (104.0, 177.0, 123.0))

        net.setInput(blob)
        detections = net.forward()                   # detect objects using object detection model

        detections_bbox = []                         # bounding box for detections

        for i in range(0, detections.shape[2]):
            # Index 2 holds the detection probability; skip weak detections.
            if detections[0, 0, i, 2] > caffemodel["acc_threshold"]:
                # Entries 3:7 are scaled by the frame width/height to get pixel coordinates.
                box = detections[0, 0, i, 3:7] * np.array([W, H, W, H])
                box_int = box.astype("int")
                detections_bbox.append(box_int)

                # draw a bounding box surrounding the object so we can visualize it;
                # plain Python ints are passed to the drawing call instead of NumPy scalars
                (startX, startY, endX, endY) = (int(v) for v in box_int)
                cv.rectangle(image, (startX, startY), (endX, endY), (0, 255, 0), 2)

        objects = tracker.update(detections_bbox)    # update tracker based on the newly detected objects

        for (objectID, centroid) in objects.items():
            cx, cy = int(centroid[0]), int(centroid[1])
            text = "ID {}".format(objectID)
            cv.putText(image, text, (cx - 10, cy - 10), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            cv.circle(image, (cx, cy), 4, (0, 255, 0), -1)

        cv.imshow("image", image)

        # Press 'q' in the display window to stop.
        if cv.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    # The "image" window does not exist if the very first read failed, so
    # close whatever windows are open instead of naming one explicitly.
    cv.destroyAllWindows()

Cannot read the video feed.
