In [9]:
import numpy as np
import cv2
import time
import imutils

from scipy.spatial.distance import cosine
from tensorflow.keras.applications.vgg16 import VGG16
from concurrent.futures import ThreadPoolExecutor, as_completed
from libs.yolov4.object_detection import Detector as Yolov4
from libs.yolov5.object_detection import Detector as Yolov5

In [25]:
# --- Input and display ------------------------------------------------------
# Video file opened by the tracking loop.
VIDEO_FILEPATH = "assets/cctv-person.mp4"
# Name of the OpenCV window the frames are shown in.
WINDOW_NAME = "app"
# Every frame is resized to this height before processing.
FRAME_HEIGHT = 600

# --- Feature extraction -----------------------------------------------------
# Size each person crop is resized to; also the spatial input shape of the
# VGG16 feature extractor.
PERSON_SIZE = (200, 200)

# --- Search area ------------------------------------------------------------
# Margin (pixels) added around the last known box; reset to this value
# whenever the person is tracked in a frame.
INCREASE_PX_DEFAULT = 15
# Extra margin (pixels) added for every frame in which tracking fails.
INCREASE_PX_DELTA = 2

# --- History comparison -----------------------------------------------------
# Upper bound on how many history entries a candidate is compared against.
COMPARING_TOTAL = 200
# Stride between the history entries that are sampled (newest first).
COMPARING_DELTA = 20

# A candidate is accepted only if its mean cosine distance is below this.
COSINE_THRESHOLD = 0.6

In [26]:
# Person detector: YOLOv4-tiny restricted to class id 0 (presumably the
# "person" class of the loaded weights -- confirm) with a 0.2 confidence
# threshold.
# Alternative detectors that can be swapped in for the assignment below:
#   Yolov4("libs/yolov4/yolov4.cfg", "libs/yolov4/yolov4.weights", only_class=[0], confThreshold=0.2)
#   Yolov5("yolov5s", only_class=[0], threshold=0.2)
person_detector = Yolov4(
    "libs/yolov4/yolov4-tiny.cfg",
    "libs/yolov4/yolov4-tiny.weights",
    only_class=[0],
    confThreshold=0.2,
)

# Feature extractor: ImageNet-pretrained VGG16 without its top layers, fed
# with PERSON_SIZE crops that have 3 channels.
extract_model = VGG16(
    input_shape=(*PERSON_SIZE, 3),
    weights='imagenet',
    include_top=False,
)

# Thread pool on which the per-candidate scoring is run.
executor = ThreadPoolExecutor(max_workers=32)

In [27]:
def inter(original_height, changed_height):
    """Choose the cv2 interpolation flag for a resize between two heights.

    Returns ``cv2.INTER_AREA`` when the image is being shrunk
    (``original_height > changed_height``) and ``cv2.INTER_LANCZOS4``
    otherwise (enlarging, or same height).
    """
    shrinking = original_height > changed_height
    if shrinking:
        return cv2.INTER_AREA
    return cv2.INTER_LANCZOS4

def crop(frame, coor):
    """Return the part of `frame` bounded by `coor`.

    `coor` is indexed as ``(x0, y0, x1, y1)``: rows ``y0:y1`` and columns
    ``x0:x1`` of `frame` are selected with ordinary slicing.
    """
    rows = slice(coor[1], coor[3])
    cols = slice(coor[0], coor[2])
    return frame[rows, cols]

def draw_rect(frame, coor, color=(0, 0, 255), thickness=5):
    """Draw a rectangle outline on `frame` (modified in place).

    The first two entries of `coor` are passed to ``cv2.rectangle`` as one
    corner and the remaining entries as the opposite corner; `color` and
    `thickness` are forwarded unchanged. Nothing is returned.
    """
    first_corner = coor[:2]
    opposite_corner = coor[2:]
    cv2.rectangle(frame, first_corner, opposite_corner, color=color, thickness=thickness)

def point_in_box(box, point):
    """Tell whether `point` ``(x, y)`` lies inside `box` ``(x0, y0, x1, y1)``.

    All four edges are inclusive, so a point exactly on the border counts
    as inside.
    """
    px, py = point
    left, top, right, bottom = box

    return left <= px <= right and top <= py <= bottom

def increase_box(frame, box, px):
    """Grow `box` by `px` pixels on every side, clamped to the frame bounds.

    Args:
        frame: image array; only ``frame.shape[:2]`` (height, width) is read,
            so both multi-channel and single-channel (2-D) frames work.
        box: ``(x0, y0, x1, y1)`` coordinates.
        px: number of pixels to add on each side.

    Returns:
        ``np.array([x0, y0, x1, y1])`` where ``x0``/``y0`` are never below 0
        and ``x1``/``y1`` never exceed the frame width/height.
    """
    # Only height and width matter; slicing avoids failing on 2-D frames.
    frame_h, frame_w = frame.shape[:2]
    x0, y0, x1, y1 = box

    return np.array([
        max(0, x0 - px),
        max(0, y0 - px),
        min(frame_w, x1 + px),
        min(frame_h, y1 + px),
    ])

def resize_object(img, size):
    """Resize `img` to `size`, picking the interpolation from the height change.

    Args:
        img: image array; ``len(img)`` (its first axis) is used as the
            source height.
        size: target size forwarded to ``cv2.resize`` as ``dsize``, which
            OpenCV interprets as ``(width, height)``.

    Returns:
        The resized image returned by ``cv2.resize``.
    """
    # dsize is (width, height): the target height is size[1], not size[0].
    target_height = size[1]
    return cv2.resize(img, size, interpolation=inter(len(img), target_height))

def extract(img):
    """Turn `img` into a flat feature vector using the module-level `extract_model`.

    A leading axis is added to `img` before it is handed to
    ``extract_model.predict``; the prediction is flattened to 1-D and divided
    by 255.
    """
    batch = img.reshape(-1, *img.shape)
    prediction = extract_model.predict(batch)
    flat = prediction.flatten()
    return flat / 255

def realize_coor(curr_coor, real_coor):
    """Shift `curr_coor` by the origin of `real_coor`.

    The first two entries of `real_coor` (which must support ``.tolist()``,
    e.g. a NumPy array) are repeated twice to form ``[x, y, x, y]`` and added
    to `curr_coor`. For an array `curr_coor` this is an element-wise
    addition that moves both corners by the same offset.
    """
    origin = real_coor[:2].tolist()
    return curr_coor + origin * 2

def get_candidate(coor):
    """Score the detection at `coor` against the tracked person's history.

    Reads the module-level `frame` and `person_detected`. The region `coor`
    of `frame` is cropped, resized to PERSON_SIZE and passed through
    `extract`; the resulting features are compared (cosine distance) with a
    sample of the stored history, and the mean distance is written onto
    `frame` just above the box.

    Args:
        coor: box ``(x0, y0, x1, y1)`` in `frame` coordinates; must support
            array arithmetic (``coor[:2] - [0, 10]``).

    Returns:
        ``[features, coor, distance]``.

    NOTE(review): this runs on executor threads and draws onto the shared
    `frame`; text drawn here can overlap a region another worker is cropping.
    """
    features = extract(resize_object(crop(frame, coor), PERSON_SIZE))

    # Sample the history newest-first, taking every COMPARING_DELTA-th entry
    # from at most the last COMPARING_DELTA * COMPARING_TOTAL entries.
    # Slicing the list directly avoids converting the whole (ever-growing)
    # history into an object array on every call.
    window   = min(len(person_detected), COMPARING_DELTA * COMPARING_TOTAL)
    compared = person_detected[-1:-window - 1:-COMPARING_DELTA]
    distance = np.mean([cosine(extracted, features) for extracted, _ in compared])

    cv2.putText(frame, f'cosine: {distance:.2f} ', coor[:2]-[0,10], cv2.FONT_HERSHEY_PLAIN, 1, (255,0,255), 2, cv2.LINE_AA)

    return [features, coor, distance]
    

In [28]:
def object_selection(event, x, y, flags, params):
    """Mouse callback: pick the person to track with a left click while paused.

    Any left click while `paused` is true resumes playback. If the click
    position falls inside one of the boxes in the module-level `persons`,
    that region of `frame` is cropped, resized and run through `extract`,
    and `person_detected` is reset to a one-entry history for it.
    `flags` and `params` are unused.
    """
    global paused, person_detected

    # Ignore everything except a left click on a paused frame.
    if event != cv2.EVENT_LBUTTONDOWN or not paused:
        return

    paused = False
    click = (x, y)

    for person_coor in persons:
        if not point_in_box(person_coor, click):
            continue

        # First box containing the click becomes the tracked person.
        features        = extract(resize_object(crop(frame, person_coor), PERSON_SIZE))
        person_detected = [[features, person_coor]]
        return

In [30]:
# Tracking loop.
#
# Module-level state shared with the `object_selection` mouse callback and
# with `get_candidate` (both access these names as globals):
#   person_detected - history of [features, coor] for the tracked person;
#                     empty until a person has been clicked.
#   paused          - True while playback is frozen.
#   frame, persons  - current frame and the detections found in it
#                     (always in full-frame coordinates).
person_detected = []
paused          = False
increase_px     = INCREASE_PX_DEFAULT  # current margin around the last known box

# Register the mouse handler on the display window.
cv2.namedWindow(WINDOW_NAME)
cv2.setMouseCallback(WINDOW_NAME, object_selection)

# Bind `cap` before `try` so the `finally` clause never hits an unbound name
# if opening the capture raises.
cap = None

try:
    cap = cv2.VideoCapture(VIDEO_FILEPATH)

    while cap.isOpened():
        if not paused:
            ret, frame = cap.read()

            # No frame could be read (e.g. end of the video): stop.
            if not ret:
                break

            frame   = imutils.resize(frame, height=FRAME_HEIGHT, inter=inter(len(frame), FRAME_HEIGHT))
            tracked = False

            if not person_detected:
                # Nobody selected yet: detect on the whole frame and show
                # every person so one can be clicked.
                _, persons = person_detector.detect(frame)

                for person_coor in persons:
                    draw_rect(frame, person_coor, color=(0, 255, 255), thickness=4)

            else:
                # Search only around the last known box, enlarged by the
                # current margin.
                box = increase_box(frame, person_detected[-1][1], increase_px)

                # Detections are relative to the cropped search area; convert
                # them to frame coordinates once so that scoring, drawing and
                # the mouse callback all use the same coordinate system.
                _, relative_persons = person_detector.detect(crop(frame, box))
                persons = [realize_coor(coor, box) for coor in relative_persons]

                # Score every detection concurrently, and wait for all of
                # them before drawing the boxes so the rectangle lines cannot
                # end up inside the crops the workers are reading.
                futures    = [executor.submit(get_candidate, coor) for coor in persons]
                candidates = []
                for future in as_completed(futures):
                    result = future.result()
                    if result:
                        candidates.append(result)

                draw_rect(frame, box, color=(255, 0, 0), thickness=4)
                for coor in persons:
                    draw_rect(frame, coor, color=(0, 255, 0), thickness=6)

                if candidates:
                    # Closest candidate = smallest mean cosine distance.
                    candidate = min(candidates, key=lambda item: item[2])

                    if candidate[2] < COSINE_THRESHOLD:
                        tracked     = True
                        increase_px = INCREASE_PX_DEFAULT

                        # Mark the tracked person and extend the history.
                        draw_rect(frame, candidate[1], color=(0, 0, 255), thickness=2)
                        person_detected.append(candidate[:2])

                # Person lost in this frame: widen the search area.
                if not tracked:
                    increase_px += INCREASE_PX_DELTA

            cv2.putText(frame, f'Frame: {int(cap.get(cv2.CAP_PROP_POS_FRAMES))}', (10, 40), cv2.FONT_HERSHEY_PLAIN, 2, (0,0,255), 3, cv2.LINE_AA)
            cv2.imshow(WINDOW_NAME, frame)

            # Freeze on this frame until a person has been selected.
            if not person_detected:
                paused = True

        # "q" stops the capture, "s" toggles pause. Mask to the low byte so
        # platform-specific high bits in the key code do not break matching.
        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break
        elif key == ord("s"):
            paused = not paused

finally:
    if cap is not None:
        cap.release()
    cv2.destroyAllWindows()
    # One more event-loop tick after destroying the windows; presumably
    # needed for the window to actually close on some platforms.
    cv2.waitKey(1)

