In [None]:
import cv2
import torch
import numpy as np
import pandas as pd
import concurrent.futures

In [None]:
# Model 1: custom-trained YOLOv5 weights used for helmet / head detection.
# force_reload = True asks torch.hub for a fresh copy of the repository
# instead of reusing the locally cached one.
path = ".../custom_weights.pt"
model1 = torch.hub.load(
    'ultralytics/yolov5',
    'custom',
    path = path,
    force_reload = True,
)

In [None]:
# Model 2: pretrained YOLOv5s, used for motorcycle detection.
model2 = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained = True)

# Class names for Model 2, one per line; run_model2 looks a name up by using
# the detected class id as the index into class_list.
# The with-block guarantees the file handle is closed once it has been read.
with open(".../coco.txt") as coco_file:
    data = coco_file.read()
class_list = data.split("\n")

In [None]:
# Model 1
def run_model1(frame, target_class=2):
    """Run Model 1 on a frame and return the boxes of one detected class.

    Parameters
    ----------
    frame
        Image handed straight to ``model1``.
    target_class : int, default 2
        Class id to keep. For the custom weights the ids are
        0 - helmet, 1 - person, 2 - head, so the default keeps heads.

    Returns
    -------
    list of [x1, y1, x2, y2]
        Integer corner coordinates (truncated towards zero) of every
        detection whose class id equals ``target_class``. Empty when there
        is no such detection.
    """
    results1 = model1(frame)

    # results1.xyxy[0] has one row per detection. Columns 0-3 are read as the
    # box corners and column 5 as the class id; column 4 is not used here.
    # .tolist() yields plain Python floats, so no per-frame DataFrame is needed.
    list1 = []
    for row in results1.xyxy[0].tolist():
        if int(row[5]) == target_class:
            list1.append([int(value) for value in row[:4]])

    return list1

In [None]:
# Model 2
def run_model2(frame, top_margin=0.2):
    """Run Model 2 on a frame and return upward-extended motorcycle boxes.

    Each motorcycle box has its top edge moved up by ``top_margin`` times the
    frame height, so the box also covers the area above the motorcycle.

    Parameters
    ----------
    frame
        Image handed to ``model2``. It must expose ``.shape`` with the height
        first (e.g. a NumPy image array), because the margin is computed from
        the frame itself rather than from a notebook-level variable.
    top_margin : float, default 0.2
        Fraction of the frame height by which the top edge is raised.

    Returns
    -------
    list of [x1, y1, x2, y2]
        Integer boxes of every detection whose class name contains
        'motorcycle'. ``y1`` is never below 0.
    """
    results2 = model2(frame)

    # Use the height of the frame being processed. A global set by the main
    # loop would not exist on a fresh kernel and could belong to another frame.
    extra_height = int(top_margin * frame.shape[0])

    # results2.xyxy[0] has one row per detection. Columns 0-3 are read as the
    # box corners and column 5 as the class id, which indexes class_list.
    list2 = []
    for row in results2.xyxy[0].tolist():
        if 'motorcycle' in class_list[int(row[5])]:
            x1, y1, x2, y2 = (int(value) for value in row[:4])
            # Clamp at 0: a negative top edge lies outside the image and
            # would act as a from-the-end index if the box is used to slice.
            list2.append([x1, max(0, y1 - extra_height), x2, y2])

    return list2

In [None]:
# Integration of results from Model 1 and 2
def find_point_box_relationship(frame, list1, list2):
    """Draw every head box that lies fully inside a motorcycle box.

    Every (head, motorcycle) pair is tested. When both corners of the head box
    fall within the motorcycle box (edges inclusive), the head box, the
    motorcycle box and a "Head" label are drawn onto ``frame`` in place.

    Parameters
    ----------
    frame : image that cv2 draws on.
    list1 : list of [x1, y1, x2, y2] head boxes.
    list2 : list of [x1, y1, x2, y2] motorcycle boxes.

    Returns
    -------
    None
    """
    head_colour = (0, 255, 0)
    bike_colour = (0, 0, 255)
    label_colour = (0, 255, 255)

    for head in list1:
        for bike in list2:
            left, top, right, bottom = head
            fits_horizontally = bike[0] <= left <= bike[2] and bike[0] <= right <= bike[2]
            fits_vertically = bike[1] <= top <= bike[3] and bike[1] <= bottom <= bike[3]
            if not (fits_horizontally and fits_vertically):
                continue

            head_top_left = (head[0], head[1])
            cv2.rectangle(frame, head_top_left, (head[2], head[3]), head_colour, 2)
            cv2.rectangle(frame, (bike[0], bike[1]), (bike[2], bike[3]), bike_colour, 2)
            cv2.putText(frame, "Head", head_top_left, cv2.FONT_HERSHEY_COMPLEX, 0.5, label_colour, 1)

            # Disabled: save an enlarged crop of the motorcycle region to disk.
            # roi = frame[box2[1]:box2[3], box2[0]:box2[2]]
            # if roi.size != 0:
            #     roi = cv2.cvtColor(roi, cv2.COLOR_RGB2BGR)
            #     roi_height, roi_width, _ = roi.shape
            #     roi = cv2.resize(roi, (roi_width*3, roi_height*3))
            #     cv2.imwrite(".../Bike Frames/image_%d.jpg" %box2[2], roi)

In [None]:
# Main
FRAME_STRIDE = 3  # only every 3rd frame is analysed and displayed
ESC_KEY = 27      # key code that stops playback

cap = cv2.VideoCapture(".../sample video.mp4")
count = 0

try:
    # Fail loudly instead of silently showing nothing when the video
    # cannot be opened (wrong path, missing codec, ...).
    if not cap.isOpened():
        raise RuntimeError("Could not open the input video")

    with concurrent.futures.ThreadPoolExecutor(max_workers = 2) as executor:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            count += 1
            if count % FRAME_STRIDE != 0:
                continue

            # Frames are read as BGR; they are converted to RGB before being
            # handed to the two models.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Kept as notebook-level names; other cells may read them.
            image_height, image_width, _ = frame.shape

            # Run both detectors on the same frame in parallel worker threads.
            future1 = executor.submit(run_model1, frame)
            future2 = executor.submit(run_model2, frame)

            # result() blocks until the worker is done, so no separate wait
            # on the futures is needed.
            list1 = future1.result()
            list2 = future2.result()

            # Annotate only when both a head and a motorcycle were found.
            if list1 and list2:
                find_point_box_relationship(frame, list1, list2)

            # Convert back to BGR for display.
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            cv2.imshow("Frame", frame)
            if cv2.waitKey(1) & 0xFF == ESC_KEY:
                break
finally:
    # Always release the capture and close the window, including when a
    # model call or drawing step raised inside the loop.
    cap.release()
    cv2.destroyAllWindows()
    # Presumably gives the GUI event loop a chance to process the window
    # close (often needed when running from a notebook).
    cv2.waitKey(1)