In [1]:
from collections import OrderedDict
import numpy as np
from scipy.spatial import distance
import cv2 as cv
from motrackers.utils import select_caffemodel, select_videofile

In [2]:
# Create file-chooser widgets (rooted at '..') for the input video and for the
# Caffe model files (prototxt + weights). Later cells read each widget's
# `.selected` attribute, so make the selections before running them.
video_file = select_videofile('..')
prototxt, weights = select_caffemodel('..')
# Render the three widgets in the cell output.
display(video_file, prototxt, weights)

FileChooser(path='..', filename='', show_hidden='False')

FileChooser(path='..', filename='', show_hidden='False')

FileChooser(path='..', filename='', show_hidden='False')

In [3]:
# Value chosen in the video file-chooser widget; it is passed to cv.VideoCapture later.
# NOTE(review): presumably this is None when nothing was selected -- confirm.
video = video_file.selected

In [4]:
# Detector configuration.
#   prototxt / weights   : model files picked in the file-chooser widgets
#   object_names         : class index -> human-readable label
#   threshold            : overlap threshold handed to cv.dnn.NMSBoxes
#   confidence_threshold : minimum detection score that is kept
#   pixel_std / pixel_mean : scale factor and mean handed to cv.dnn.blobFromImage
#   input_size           : size the frame is resized to before inference
model = dict(
    prototxt=prototxt.selected,
    weights=weights.selected,
    object_names=dict(enumerate([
        'background',
        'aeroplane', 'bicycle', 'bird', 'boat', 'bottle',
        'bus', 'car', 'cat', 'chair', 'cow',
        'diningtable', 'dog', 'horse', 'motorbike', 'person',
        'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor',
    ])),
    threshold=0.2,
    confidence_threshold=0.2,
    pixel_std=1/127.5,
    pixel_mean=127.5,
    input_size=(300, 300),
)

# Number of consecutive frames an object may go undetected before the tracker drops it.
max_object_lost_count = 5

# Fixed seed so every class gets the same box colour on each run.
np.random.seed(12345)
bbox_colors = {
    class_id: np.random.randint(0, 255, size=3).tolist()
    for class_id in model["object_names"]
}

In [5]:
class Tracker:
    """Centroid-based multi-object tracker.

    Every tracked object is represented by the centroid of the last bounding
    box matched to it. On each :meth:`update`, existing objects are greedily
    matched to their nearest new centroid (Euclidean distance). Objects left
    without a match have their lost counter incremented and are removed once
    the counter exceeds ``maxLost``. Detections left without a match are
    registered as new objects.

    Attributes
    ----------
    nextObjectID : int
        ID that will be given to the next registered object.
    objects : OrderedDict
        Maps object ID -> centroid ``(x, y)`` of the last matched detection.
    lost : OrderedDict
        Maps object ID -> number of consecutive updates without a match.
    maxLost : int
        Maximum value of the lost counter before an object is removed.
    """

    def __init__(self, maxLost = 30):
        self.nextObjectID = 0                   # ID of next object
        self.objects = OrderedDict()            # stores ID:Locations
        self.lost = OrderedDict()               # stores ID:Lost_count

        self.maxLost = maxLost                  # maximum number of images object was not detected.

    def addObject(self, new_object_location):
        """Register ``new_object_location`` as a new object with a fresh ID."""
        self.objects[self.nextObjectID] = new_object_location
        self.lost[self.nextObjectID] = 0
        self.nextObjectID += 1

    def removeObject(self, objectID):
        """Forget the object ``objectID`` (both its location and lost counter)."""
        del self.objects[objectID]
        del self.lost[objectID]

    def _markLost(self, objectID):
        """Increment the lost counter of ``objectID``; remove it when over ``maxLost``."""
        self.lost[objectID] += 1
        if self.lost[objectID] > self.maxLost:
            self.removeObject(objectID)

    @staticmethod
    def getLocation(bounding_box):
        """Return the integer centroid ``(x, y)`` of ``(xlt, ylt, xrb, yrb)``."""
        xlt, ylt, xrb, yrb = bounding_box
        return (int((xlt + xrb) / 2.0), int((ylt + yrb) / 2.0))

    def update(self,  detections):
        """Update the tracker with the detections of one frame.

        Parameters
        ----------
        detections : sequence of (xlt, ylt, xrb, yrb)
            Bounding boxes detected in the current frame (may be empty).

        Returns
        -------
        OrderedDict
            The tracker's own ``objects`` mapping (object ID -> centroid).
        """
        # Nothing detected: every tracked object misses this frame.
        if len(detections) == 0:
            for objectID in list(self.lost.keys()):
                self._markLost(objectID)
            return self.objects

        # Centroids of the current detections, one row per detection.
        new_object_locations = np.zeros((len(detections), 2), dtype="int")
        for (i, detection) in enumerate(detections):
            new_object_locations[i] = self.getLocation(detection)

        # Nothing tracked yet: every detection becomes a new object.
        if len(self.objects) == 0:
            for location in new_object_locations:
                self.addObject(location)
            return self.objects

        objectIDs = list(self.objects.keys())
        previous_object_locations = np.array(list(self.objects.values()))

        # Pairwise distances: rows are tracked objects, columns are detections.
        D = distance.cdist(previous_object_locations, new_object_locations)

        # Visit objects in order of how close their nearest detection is,
        # pairing each one with that nearest detection.
        row_idx = D.min(axis=1).argsort()
        cols_idx = D.argmin(axis=1)[row_idx]

        assignedRows, assignedCols = set(), set()
        for (row, col) in zip(row_idx, cols_idx):
            # Each object and each detection can be used at most once.
            if row in assignedRows or col in assignedCols:
                continue

            objectID = objectIDs[row]
            self.objects[objectID] = new_object_locations[col]
            self.lost[objectID] = 0

            assignedRows.add(row)
            assignedCols.add(col)

        unassignedRows = set(range(D.shape[0])).difference(assignedRows)
        unassignedCols = set(range(D.shape[1])).difference(assignedCols)

        # Both leftovers must be handled regardless of which side is larger:
        # an object whose nearest detection was claimed by another object is
        # lost for this frame, and a detection nobody claimed is a new object.
        for row in sorted(unassignedRows):
            self._markLost(objectIDs[row])
        for col in sorted(unassignedCols):
            self.addObject(new_object_locations[col])

        return self.objects


In [7]:
# Open the selected video, load the Caffe detection network and create the tracker.
cap = cv.VideoCapture(video)
if not cap.isOpened():
    # Fail here with a clear message instead of later inside the frame loop.
    raise IOError("Cannot open video source: {!r}".format(video))
net = cv.dnn.readNetFromCaffe(model["prototxt"], model["weights"])
tracker = Tracker(maxLost=max_object_lost_count)

In [9]:
# Detection + tracking loop: read frames, run the detector, suppress overlapping
# boxes, update the tracker, draw the results, show them and write them to
# "output.avi". Press 'q' in the display window to stop early.
(H, W) = (None, None)
writer = None

try:
    while True:
        ok, image = cap.read()

        if not ok:
            print("Cannot read the video feed.")
            break

        # Frame size of the source, captured once for the video writer.
        if W is None or H is None:
            (H, W) = image.shape[:2]

        image_resized = cv.resize(image, model["input_size"])

        blob = cv.dnn.blobFromImage(image_resized,
                                     model["pixel_std"],
                                     model["input_size"],
                                     (model["pixel_mean"], model["pixel_mean"], model["pixel_mean"]),
                                     False)

        net.setInput(blob)
        detections = net.forward()

        frame_height, frame_width = image.shape[:2]

        boxes, confidences, classIDs, detections_bbox = [], [], [], []

        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > model['confidence_threshold']:
                class_id = int(detections[0, 0, i, 1])

                # The detector outputs box corners as fractions of the image
                # size, so scale them straight to the original frame. This
                # avoids rounding twice and does not depend on the (width,
                # height) ordering of model["input_size"].
                left = int(detections[0, 0, i, 3] * frame_width)
                top = int(detections[0, 0, i, 4] * frame_height)
                right = int(detections[0, 0, i, 5] * frame_width)
                bottom = int(detections[0, 0, i, 6] * frame_height)

                width, height = right - left, bottom - top

                boxes.append([left, top, width, height])
                confidences.append(float(confidence))
                classIDs.append(int(class_id))

        # Non-maximum suppression over all boxes (x, y, w, h).
        indices = cv.dnn.NMSBoxes(boxes, confidences, model["confidence_threshold"], model["threshold"])

        if len(indices) > 0:
            # np.asarray: the return type differs between OpenCV versions.
            for i in np.asarray(indices).flatten():
                x, y, w, h = boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]

                # The tracker expects (left, top, right, bottom).
                detections_bbox.append((x, y, x+w, y+h))

                clr = [int(c) for c in bbox_colors[classIDs[i]]]
                cv.rectangle(image, (x, y), (x+w, y+h), clr, 2)

                # Class label with confidence on a white background,
                # kept inside the top edge of the frame.
                label = "{}:{:.4f}".format(model["object_names"][classIDs[i]], confidences[i])
                (label_width, label_height), baseLine = cv.getTextSize(label, cv.FONT_HERSHEY_SIMPLEX, 0.5, 2)
                y_label = max(y, label_height)
                cv.rectangle(image, (x, y_label-label_height),
                                     (x+label_width, y_label+baseLine), (255, 255, 255), cv.FILLED)
                cv.putText(image, label, (x, y_label), cv.FONT_HERSHEY_SIMPLEX, 0.5, clr, 2)

        objects = tracker.update(detections_bbox)

        # Draw each tracked object's ID and centroid.
        for (objectID, centroid) in objects.items():
            text = "ID {}".format(objectID)
            cv.putText(image, text, (centroid[0] - 10, centroid[1] - 10), cv.FONT_HERSHEY_SIMPLEX,
                        0.5, (0, 255, 0), 2)
            cv.circle(image, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)

        cv.imshow("image", image)

        if cv.waitKey(1) & 0xFF == ord('q'):
            break

        # Create the writer lazily, once the frame size is known.
        if writer is None:
            fourcc = cv.VideoWriter_fourcc(*"MJPG")
            writer = cv.VideoWriter("output.avi", fourcc, 30, (W, H), True)
        writer.write(image)
finally:
    # Always release resources, also on errors. The writer may never have been
    # created (no frame read, or 'q' pressed on the first frame).
    if writer is not None:
        writer.release()
    cap.release()
    # Safe even when the "image" window was never created.
    cv.destroyAllWindows()

Cannot read the video feed.
