In [1]:
import numpy as np
import cv2
from shapely.geometry import Polygon
import os
import shutil

In [2]:
# Constants shared by the cells below.

# Size of the blob handed to the network (see get_preds); also the divisor
# used by unwrap_detections when scaling boxes back to image pixels.
INPUT_WIDTH = INPUT_HEIGHT = 640

# Thresholds applied in unwrap_detections.
MIN_CONFIDENCE = 0.4                  # minimum value of a prediction row's confidence entry
SCORE_THRESH = 0.25                   # minimum best-class score; also the score threshold given to NMSBoxes
NON_MAXIMUM_SUPPRESION_THRESH = 0.45  # NMS threshold given to NMSBoxes

# Class names whose detections are kept (see classnames_to_ids).
CLASSES = [
    'bicycle',
    'car',
    'motorbike',
    'bus',
    'truck',
]

# Container extension -> FOURCC code for cv2.VideoWriter.
VIDEO_EXTENSION_AND_FOURCC_DICT = dict(
    avi=cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'),
    mp4=0x7634706d,  # the bytes 'm', 'p', '4', 'v' packed lowest byte first
)

In [3]:
# Load the detection network from the ONNX file through OpenCV's dnn module.
net = cv2.dnn.readNet('assets/yolov5s.onnx')

In [4]:
def format_yolov5(image):
    """
    Pad an image to a square by placing it on a black canvas.

    The image is copied into the top-left corner of a zero-filled uint8 array
    whose side is the larger of the image's two spatial dimensions, so pixel
    coordinates of the original content do not move. No resizing happens
    here; get_preds scales the result when it builds the network blob.
    :param image: 3-dimensional image array (rows, columns, channels)
    :return: square uint8 array with 3 channels
    """
    height, width, _ = image.shape
    side = max(height, width)

    canvas = np.zeros((side, side, 3), dtype=np.uint8)
    canvas[:height, :width] = image

    return canvas

In [5]:
def get_preds(formatted_image, net):
    """
    Run one forward pass of the network on an image.

    The image is turned into a blob of size (INPUT_WIDTH, INPUT_HEIGHT) with
    pixel values scaled by 1/255 and the first and last channels swapped.
    :param formatted_image: image array to feed to the network
    :param net: network object exposing setInput() and forward()
    :return: whatever net.forward() returns for this blob
    """
    scale = 1 / 255.0
    target_size = (INPUT_WIDTH, INPUT_HEIGHT)

    blob = cv2.dnn.blobFromImage(formatted_image, scale, target_size, swapRB=True)
    net.setInput(blob)
    predictions = net.forward()

    return predictions

In [6]:
def classnames_to_ids(classnames):
    """
    Find the positions of the wanted classes inside a list of class names.

    :param classnames: list of class names; a name's position is its id
    :return: list of positions whose name appears in CLASSES, in ascending order
    """
    wanted_ids = []
    for index, name in enumerate(classnames):
        if name in CLASSES:
            wanted_ids.append(index)
    return wanted_ids

In [7]:
def get_classes(path="assets/classes.txt"):
    """
    Read class names from a text file, one name per line.

    :param path: path of the text file to read
    :return: list with one entry per line, each stripped of surrounding whitespace
    """
    with open(path, "r") as handle:
        raw_lines = handle.readlines()

    names = []
    for raw in raw_lines:
        names.append(raw.strip())
    return names

In [8]:
def unwrap_detections(
    predictions,
    formatted_image,
    classes_path="assets/classes.txt"
):
    """
    Turn raw network output into pixel-space boxes for the wanted classes.

    Each row of predictions[0] is read as: entries 0-3 are box centre x,
    centre y, width and height in network-input units, entry 4 is the
    confidence and entries 5+ are per-class scores.
    (presumably the standard YOLOv5 output layout - confirm against the model)

    Rows pass if their confidence is at least MIN_CONFIDENCE and their best
    class score is above SCORE_THRESH. The survivors go through non-maximum
    suppression, and only boxes whose class is listed in CLASSES are returned.
    :param predictions: network output; only predictions[0] is used
    :param formatted_image: the image that was fed to the network, used to
        scale boxes from network-input units back to image pixels
    :param classes_path: text file with one class name per line; the line
        position is the class id
    :return: list of numpy arrays [left, top, width, height] in
        formatted_image pixel coordinates
    """
    class_ids = []
    confidences = []
    boxes = []

    output_data = predictions[0]

    # ndarray.shape is (rows, columns, channels): height comes first.
    image_height, image_width, _ = formatted_image.shape
    x_factor = image_width / INPUT_WIDTH
    y_factor = image_height / INPUT_HEIGHT

    for row in output_data:
        confidence = row[4]
        if confidence < MIN_CONFIDENCE:
            continue

        classes_scores = row[5:]
        class_id = int(np.argmax(classes_scores))
        if classes_scores[class_id] <= SCORE_THRESH:
            continue

        confidences.append(float(confidence))
        class_ids.append(class_id)

        # Convert centre/size in network units to top-left/size in pixels.
        x, y, w, h = (float(value) for value in row[:4])
        left = int((x - 0.5 * w) * x_factor)
        top = int((y - 0.5 * h) * y_factor)
        width = int(w * x_factor)
        height = int(h * y_factor)
        boxes.append(np.array([left, top, width, height]))

    class_list = get_classes(classes_path)
    desired_classes_ids = set(classnames_to_ids(class_list))

    indexes = cv2.dnn.NMSBoxes(boxes, confidences, SCORE_THRESH, NON_MAXIMUM_SUPPRESION_THRESH)
    # Depending on the OpenCV version the indices come back as an empty
    # tuple, a flat array or an Nx1 array; flatten() makes all three iterable
    # as plain scalars.
    kept_indexes = np.array(indexes).flatten()

    result_boxes = []
    for kept in kept_indexes:
        position = int(kept)
        if class_ids[position] in desired_classes_ids:
            result_boxes.append(boxes[position])

    return result_boxes

In [9]:
def read_frames(video_path):
    """
    This function takes the video path and returns a list of its frames.

    Every frame is decoded and kept in memory at once.
    :param video_path: Path to the video
    :return: list of frames in the order they were read
    :raises IOError: if the video cannot be opened
    """
    frames = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise IOError("Error opening video stream or file")

        while cap.isOpened():
            ret, frame = cap.read() # Read the frame
            if not ret:
                # read() reports failure once there are no more frames.
                break
            frames.append(frame)
    finally:
        # Release the capture on every path, including errors while reading.
        cap.release()
    return frames

In [10]:
def get_IOU(poly1, poly2):
    """
    Compute the intersection-over-union of two polygons.

    :param poly1: object exposing intersects(), intersection() and union(),
        whose results have an ``area`` attribute (e.g. a shapely Polygon)
    :param poly2: the polygon to compare against
    :return: intersection area divided by union area; 0.0 when the polygons
        do not intersect or when their union has no area
    """
    if not poly1.intersects(poly2):
        return 0.0

    union = poly1.union(poly2).area
    if union == 0:
        # Zero-area shapes can still "intersect"; avoid dividing by zero.
        return 0.0

    return poly1.intersection(poly2).area / union

In [11]:
def parse_query(query_path):
    """
    Load a query file as a list of lines.

    :param query_path: path of the text file to read
    :return: the file's lines, without their line terminators
    """
    with open(query_path, 'r') as handle:
        contents = handle.read()

    lines = contents.splitlines()
    return lines

In [14]:
def generate_predictions(query_filename, video_path, net, out_path, pad=50, save_video=False):
    """
    Track one object through a video and append its per-frame boxes to a copy
    of the query file.

    Query layout as read here: the first token of line 0 is the (exclusive)
    end frame; line 1 is "<start_frame> <xmin> <ymin> <xmax> <ymax>".

    For every frame from start_frame up to the end frame, the frame is
    cropped around the current box (enlarged by ``pad`` pixels on each side),
    the detector is run on the crop, and the detection with the highest IOU
    against the current box becomes the new box. When nothing is detected the
    current box is carried forward unchanged.

    The query file is copied into ``out_path`` under the same file name and
    one "<frame> <xmin> <ymin> <xmax> <ymax>" line is appended for every
    processed frame after start_frame.
    :param query_filename: path of the query file
    :param video_path: path of the video to track in
    :param net: network passed through to get_preds
    :param out_path: directory receiving the annotated copy of the query file;
        created if missing
    :param pad: margin in pixels added around the current box before cropping
    :param save_video: when True, draw the tracked box on each processed frame
        and write the frames to a video named like the input, in the current
        working directory
    :raises ValueError: if the query file has fewer than two lines or line 1
        does not hold a frame index followed by four coordinates
    """
    query = parse_query(query_filename)
    if len(query) < 2:
        raise ValueError(f"Query file {query_filename!r} must contain at least two lines")

    end_frame = int(query[0].split()[0])
    start_fields = query[1].split()
    if len(start_fields) != 5:
        raise ValueError(f"Expected '<frame> <xmin> <ymin> <xmax> <ymax>' but got {query[1]!r}")
    start_frame = int(start_fields[0])
    tracked_bb = [int(number) for number in start_fields[1:]]

    # makedirs handles nested and absolute paths; '' means "current directory".
    if out_path:
        os.makedirs(out_path, exist_ok=True)
    out_file = os.path.join(out_path, os.path.basename(query_filename))

    frames = read_frames(video_path)
    # Never run past the frames that were actually decoded.
    last_frame = max(min(end_frame, len(frames)), 0)
    box_colour = (255, 0, 255)

    output_lines = []
    for i in range(start_frame, last_frame):
        frame = frames[i]
        xmin, ymin, xmax, ymax = tracked_bb

        # Clamp at 0: a negative slice bound would wrap around to the far
        # side of the frame instead of stopping at its edge.
        crop_x0 = max(xmin - pad, 0)
        crop_y0 = max(ymin - pad, 0)
        crop = frame[crop_y0:max(ymax + pad, 0), crop_x0:max(xmax + pad, 0)]

        boxes = []
        if crop.size > 0:
            formatted_frame = format_yolov5(crop)
            preds = get_preds(formatted_frame, net)
            boxes = unwrap_detections(preds, formatted_frame)

        if len(boxes) > 0:
            tracked_bb = _best_overlapping_box(boxes, tracked_bb, crop_x0, crop_y0)
        # else: no detection, keep the current box for this frame.

        if save_video:
            cv2.rectangle(
                frame,
                (tracked_bb[0], tracked_bb[1]),
                (tracked_bb[2], tracked_bb[3]),
                box_colour,
                2
            )

        # The start frame's box is already present in the copied query file.
        if i != start_frame:
            output_lines.append(
                f'{i} {tracked_bb[0]} {tracked_bb[1]} {tracked_bb[2]} {tracked_bb[3]}'
            )

    shutil.copy(query_filename, out_file)
    with open(out_file, "a") as f:
        f.write('\n')
        f.write('\n'.join(output_lines))

    if save_video and frames:
        # NOTE(review): the video is written to the current working directory
        # under the input's file name; running from the input's own directory
        # would overwrite the source video.
        video_output_name = os.path.basename(video_path)
        extension = os.path.splitext(video_output_name)[1].lstrip('.').lower()
        fourcc = VIDEO_EXTENSION_AND_FOURCC_DICT.get(
            extension, VIDEO_EXTENSION_AND_FOURCC_DICT["mp4"]
        )
        frame_height, frame_width = frames[0].shape[:2]
        output_video = cv2.VideoWriter(
            video_output_name, fourcc,
            60,
            (frame_width, frame_height)
        )
        try:
            for frame in frames[:last_frame]:
                output_video.write(frame)
        finally:
            output_video.release()


def _best_overlapping_box(boxes, tracked_bb, offset_x, offset_y):
    """Return, as [xmin, ymin, xmax, ymax] in frame coordinates, the detection with the highest IOU against tracked_bb."""
    xmin, ymin, xmax, ymax = tracked_bb
    tracked_poly = Polygon([(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin)])

    best_box, best_iou = tracked_bb, -1.0
    for box in boxes:
        # Detections are relative to the crop; shift them back into the frame.
        x = int(box[0]) + offset_x
        y = int(box[1]) + offset_y
        w, h = int(box[2]), int(box[3])
        if w <= 0 or h <= 0:
            # A zero-area box cannot form a valid polygon.
            continue

        candidate_poly = Polygon([(x, y), (x + w, y), (x + w, y + h), (x, y + h)])
        iou = get_IOU(candidate_poly, tracked_poly)
        # ">=" keeps the later detection on ties.
        if iou >= best_iou:
            best_box, best_iou = [x, y, x + w, y + h], iou

    return best_box

In [17]:
def get_all_predictions(videos_path, out_path, net):
    """
    Run generate_predictions for every video in a directory.

    A video "<name>.mp4" is paired with the query file "<name>.txt" from the
    same directory. Videos are processed in sorted file-name order; a video
    without a matching query file is reported and skipped.
    :param videos_path: directory holding the .mp4 videos and .txt query files
    :param out_path: output directory passed on to generate_predictions
    :param net: network passed on to generate_predictions
    """
    # os.listdir returns names in arbitrary order, so sort and pair by name
    # instead of relying on two filtered listings lining up.
    names = sorted(os.listdir(videos_path))
    query_by_stem = {
        os.path.splitext(name)[0]: name for name in names if name.endswith('.txt')
    }

    for video_name in names:
        if not video_name.endswith('.mp4'):
            continue

        stem = os.path.splitext(video_name)[0]
        query_name = query_by_stem.get(stem)
        if query_name is None:
            print(f"Skipping {video_name}: no matching query file {stem}.txt")
            continue

        generate_predictions(
            os.path.join(videos_path, query_name),
            os.path.join(videos_path, video_name),
            net,
            out_path,
            pad=50,
            save_video=False
        )

In [18]:
# Load the network again so this cell does not depend on an earlier cell's `net`.
net = cv2.dnn.readNet('assets/yolov5s.onnx')
# Process every video/query pair under ../train/Task2/ and write results to ../submission/Task2/.
get_all_predictions('../train/Task2/', '../submission/Task2/', net)