## Person and Vehicle Counter Using OpenCV and Pre-trained Intel Model by OpenVINO

### NOTE: Work in Progress (currently includes Object Detection and Tracking)
- Model reference: https://github.com/openvinotoolkit/open_model_zoo/tree/master/models/intel/person-vehicle-bike-detection-crossroad-0078

### Import Libraries

In [1]:
import collections 
import os
import sys
import time

root_path = "c:/Users/Lenard/Person and Vehicle Counter/" #set to directory of Person and Vehicle Counter repository
sys.path += [root_path]

import cv2
import numpy as np
from IPython import display
from openvino.runtime import Core

#for object tracking
import dlib
from pyimagesearch.centroidtracker import CentroidTracker
from pyimagesearch.trackableobject import TrackableObject

import imutils
from imutils.video import VideoStream, FPS

### Load the Detection Model

In [2]:
# minimum detection score; passed as the threshold wherever detections are filtered
THRESH = 0.8

# location of the model's .xml description on disk
model_path = "c:/Users/Lenard/intel/person-vehicle-bike-detection-crossroad-0078/FP16/person-vehicle-bike-detection-crossroad-0078.xml"

#initialize inference engine
ie = Core()
#read the network and corresponding weights from file
detector = ie.read_model(model=model_path)

#compile the model for the GPU (other device names such as CPU or MYRIAD
#may be used instead, or AUTO to let the engine choose an available device)
compiled_model = ie.compile_model(model=detector, device_name="GPU")

#get input and output nodes
input_layer = compiled_model.input(0)
output_layer = compiled_model.output(0)

#get input size: the last two entries of the input shape are taken as
#(height, width); frames are resized to this size before inference
height, width = list(input_layer.shape)[2:]
print("Height: {}, Width: {}".format(height,width))
print(input_layer.any_name, output_layer.any_name)

Height: 1024, Width: 1024
data detection_out


### Load Centroid Tracker

In [3]:
# Instantiate the centroid tracker, then initialize a list to store
# the dlib correlation trackers, followed by a dictionary that
# maps every unique object ID to a TrackableObject.
# Alternative construction with explicit limits, kept for reference:
#ct = CentroidTracker(maxDisappeared=40, maxDistance=50)
ct = CentroidTracker()
trackers = []
trackableObjects = {}

### Processing Results
- List available classes and create corresponding colors. 
- In post-processing, the normalized box coordinates returned by the model are scaled to pixel coordinates of the frame.
- Non-Maximum Suppression (NMS) is then applied to remove overlapping detections and those whose score does not meet the threshold.
- Finally, boxes and labels will be drawn

In [4]:
# class names for person-vehicle-bike-detection-crossroad-0078
classes = ["person", "vehicle", "bike"]

# one colour per class: take len(classes) evenly spaced intensities in
# [0, 255), run them through the rainbow colormap and drop singleton axes
colors = cv2.applyColorMap(
    np.arange(0, 255, 255 / len(classes), dtype=np.float32).astype(np.uint8),
    cv2.COLORMAP_RAINBOW,
).squeeze()

In [5]:
def process_results(frame, results, thresh=0.6):
    """Turn raw detector output into NMS-filtered, pixel-space detections.

    Args:
        frame: Image the detections belong to. Only ``frame.shape[:2]``
            (height, width) is read.
        results: Array whose last axis holds seven values per detection,
            unpacked as ``(image_id, label, score, xmin, ymin, xmax, ymax)``.
            The coordinates are multiplied by the frame width/height, i.e.
            they are treated as fractions of the frame size.
        thresh: Minimum score a detection must exceed to be kept. It is
            also forwarded to ``cv2.dnn.NMSBoxes`` as ``score_threshold``.

    Returns:
        A list of ``(label, score, (x, y, box_width, box_height))`` tuples
        for the detections that survive non-maximum suppression, or an
        empty list when nothing passes the threshold.
    """
    # size of the original frame
    h, w = frame.shape[:2]

    # reshape instead of squeeze: squeeze() would collapse a tensor holding
    # a single detection to 1-D and break the 7-way unpacking below
    detections = np.asarray(results).reshape(-1, 7)

    boxes = []
    labels = []
    scores = []
    for _image_id, label, score, xmin, ymin, xmax, ymax in detections:
        score = float(score)
        # rows that cannot pass the score threshold are dropped up front;
        # NMSBoxes would discard them anyway
        if not score > thresh:
            continue
        # create an (x, y, width, height) box using pixel coordinates
        boxes.append(
            tuple(map(int, (xmin * w, ymin * h, (xmax - xmin) * w, (ymax - ymin) * h)))
        )
        labels.append(int(label))
        scores.append(score)

    # nothing scored high enough
    if not boxes:
        return []

    # apply NMS (Non-Maximum Suppression) to eliminate overlapping entities;
    # this algorithm returns indices of objects to keep
    indices = cv2.dnn.NMSBoxes(
        bboxes=boxes, scores=scores, score_threshold=thresh, nms_threshold=0.6
    )

    # if there are no boxes left
    if len(indices) == 0:
        return []

    # filter detected objects
    return [(labels[i], scores[i], boxes[i]) for i in np.asarray(indices).flatten()]


def draw_boxes(frame, boxes):
    """Draw a rectangle and a "<class> <score>" caption for every detection.

    ``boxes`` is an iterable of ``(label, score, (x, y, width, height))``
    entries. ``label - 1`` is used as the index into ``classes`` and
    ``colors``. The frame is drawn on in place and also returned.
    """
    for label, score, box in boxes:
        class_idx = label - 1
        box_color = tuple(int(channel) for channel in colors[class_idx])

        # rectangle from the top-left corner to the opposite corner
        left, top, box_w, box_h = box[0], box[1], box[2], box[3]
        bottom_right = (left + box_w, top + box_h)
        cv2.rectangle(frame, box[:2], bottom_right, color=box_color, thickness=3)

        # caption placed just inside the top-left corner; the font scale
        # is derived from the frame width
        caption = f"{classes[class_idx]} {score:.2f}"
        font_scale = frame.shape[1] / 1000
        cv2.putText(
            frame,
            caption,
            (left + 10, top + 30),
            cv2.FONT_HERSHEY_COMPLEX,
            font_scale,
            box_color,
            1,
            cv2.LINE_AA,
        )
    return frame

In [6]:
def detect(source=0, flip=False, use_popup=False, skip_first_frames=0):
    """Run detection and tracking on a video source and show the result.

    Every fifth frame is passed through ``compiled_model``; a dlib
    correlation tracker is started for each detection whose score exceeds
    ``THRESH``. On the frames in between, those trackers are updated and
    their boxes are handed to the centroid tracker ``ct``, whose object IDs
    and centroids are drawn on the frame. The boxes of the most recent
    detection pass are drawn on every frame via ``process_results`` and
    ``draw_boxes``. The annotated frame is shown in a window titled
    ``'Video'``; pressing ``q`` stops the loop.

    Args:
        source: Passed unchanged to ``cv2.VideoCapture``.
        flip: Accepted for interface compatibility; currently not used.
        use_popup: Accepted for interface compatibility; currently not used.
        skip_first_frames: Accepted for interface compatibility; currently
            not used.
    """
    vid = cv2.VideoCapture(source)

    # give camera time to warm up
    time.sleep(0.1)

    # durations (seconds) of the most recent 200 inference calls
    processing_times = collections.deque(maxlen=200)

    frame_count = 0

    # dlib trackers created by the latest detection pass; they must survive
    # across loop iterations so the in-between frames can update them
    trackers = []

    try:
        # begin video capturing
        while vid.isOpened():
            # capture frame
            ret, frame = vid.read()

            # stop at end of stream or on a failed read
            if not ret or frame is None:
                print('Completed!')
                break

            # if the longest side exceeds 1280 px, shrink to improve performance
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(
                    src=frame,
                    dsize=None,
                    fx=scale,
                    fy=scale,
                    interpolation=cv2.INTER_AREA
                )

            # current phase of the pipeline (tracked but not yet displayed)
            status = "waiting"
            rects = []

            # perform detection on every 5th frame
            if frame_count % 5 == 0:
                status = "detecting"
                trackers = []

                # resize image to fit neural network input and create a
                # batch of one image in channels-first layout
                input_frame = cv2.resize(
                    frame, dsize=(width, height), interpolation=cv2.INTER_AREA
                )
                input_frame = np.expand_dims(input_frame.transpose(2, 0, 1), axis=0)

                # measure processing time of the inference call only
                t1 = time.perf_counter()
                results = compiled_model([input_frame])[output_layer]
                t2 = time.perf_counter()
                processing_times.append(t2 - t1)

                # size of the (possibly downscaled) frame
                h, w = frame.shape[:2]

                for _image_id, _label, score, xmin, ymin, xmax, ymax in results.reshape(-1, 7):
                    if score > THRESH:
                        # get (x, y) pixel coordinates of the bounding box
                        (startX, startY, endX, endY) = tuple(
                            map(int, (xmin * w, ymin * h, xmax * w, ymax * h))
                        )

                        # start dlib correlation tracker
                        tracker = dlib.correlation_tracker()
                        rect = dlib.rectangle(startX, startY, endX, endY)
                        tracker.start_track(frame, rect)

                        # add the tracker to tracker list
                        trackers.append(tracker)
            else:
                for tracker in trackers:
                    status = "tracking"

                    # update tracker and read back its current box
                    tracker.update(frame)
                    pos = tracker.get_position()

                    rects.append(
                        (
                            int(pos.left()),
                            int(pos.top()),
                            int(pos.right()),
                            int(pos.bottom()),
                        )
                    )

            # boxes of the most recent detection pass (results is reused on
            # the frames where no inference ran)
            boxes = process_results(frame=frame, results=results, thresh=THRESH)
            frame = draw_boxes(frame=frame, boxes=boxes)

            # use centroid tracker to associate old object centroids with new ones
            objects = ct.update(rects)

            # loop over tracked objects
            for (objectID, centroid) in objects.items():
                # check for an existing trackable object for this ID
                to = trackableObjects.get(objectID, None)

                # if there is no existing trackable object, create one
                if to is None:
                    to = TrackableObject(objectID, centroid)
                else:
                    to.centroids.append(centroid)

                # store trackable object in dictionary
                trackableObjects[objectID] = to

                # display text and circle relating to the tracked object
                cv2.putText(
                    frame, "ID {}".format(objectID),
                    (centroid[0]-10, centroid[1]-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2
                )
                cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)

            f_width = frame.shape[1]
            # mean inference time [ms] over the recorded window
            processing_time = np.mean(processing_times) * 1000
            # guard against a zero reading from a coarse clock
            fps = 1000 / processing_time if processing_time > 0 else 0.0
            # display fps
            cv2.putText(
                frame,
                f"Inference Time: {processing_time:.1f}ms ({fps:.1f} FPS)",
                (20, 40),
                cv2.FONT_HERSHEY_COMPLEX,
                f_width / 1000,
                (0, 0, 255),
                1,
                cv2.LINE_AA
            )

            frame_count += 1

            # display video stream
            cv2.imshow('Video', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):  # press Q to quit
                break
    finally:
        # always release the capture and close windows, even on an error
        vid.release()
        cv2.destroyAllWindows()

In [7]:
# build the path with os.path.join so it is still valid when root_path is
# set without a trailing separator
video_path = os.path.join(root_path, 'data/video/test.mp4')
# run detection and tracking on the test video
detect(source=video_path, flip=True, use_popup=False)