In [1]:
!pip install -r requirements.txt

!pip install ultralytics
!pip install shapely

!pip uninstall opencv-python
!pip install opencv-python==4.10.0.82

In [1]:
import copy
import cv2 as cv
from shapely.geometry import Point, Polygon
from ultralytics import YOLO

## Project utils elements

In [2]:
def load_image_rgb(img_path):
    img = cv.imread(img_path, cv.IMREAD_COLOR)
    return img

In [3]:
def get_parking_spaces_polygons():
    """
    This function obtains the delimitations of the first 10 parking spaces on the right-hand side
    of the image, as polygons
    """
    
    parking_spaces_polygons = [
        [[1587, 877], [1760, 982], [1763, 1050], [1558, 1036]],
        [[1451, 792], [1585, 879], [1556, 1038], [1400, 918]],
        [[1335, 714], [1450, 790], [1398, 916],  [1269, 823]],
        [[1238, 657], [1335, 716], [1269, 824],  [1164, 745]],
        [[1158, 602], [1240, 654], [1164, 746],  [1077, 679]],
        [[1091, 556], [1158, 602], [1077, 679],  [1002, 626]],
        [[1026, 520], [1086, 557], [999, 625],   [938, 576]],
        [[968, 485],  [1026, 518], [940, 576],   [884, 534]],
        [[919, 454],  [972, 485],  [886, 534],   [835, 499]],
        [[875, 429],  [918, 456],  [834, 499],   [794, 466]],
    ]

    image_height = 1050
    for i in range(len(parking_spaces_polygons)):
        for j in range(len(parking_spaces_polygons[i])):
            parking_spaces_polygons[i][j][1] = image_height - parking_spaces_polygons[i][j][1] # We "invert" the y axis in order to work with the "shapely" library
    
    parking_spaces_polygons = [Polygon(poly_coords) for poly_coords in parking_spaces_polygons]

    return parking_spaces_polygons

In [4]:
parking_spaces_polygons = get_parking_spaces_polygons()

In [5]:
def get_detections_bounding_boxes(img, model):
    """This function is only for tasks 1 and 2"""
    
    bounding_boxes = []
    
    detections_whole_image = yolov9_predict(model, img)

    # We crop the last 5 parking spaces from the initial image so that the model
    # will be able to more easily detect farther away cars
    h_top, h_bottom, w_left, w_right = 329, 679, 710, 1312
    last_5_parking_spaces = img[h_top:h_bottom, w_left:w_right]
    detections_last_5_parking_spaces = yolov9_predict(model, last_5_parking_spaces)
    
    for d in detections_whole_image:
        bounding_box_coords = [int(coord) for coord in d.boxes.xyxy.tolist()[0]]
        bounding_boxes.append(bounding_box_coords)
    for d in detections_last_5_parking_spaces:
        bounding_box_coords = [int(coord) for coord in d.boxes.xyxy.tolist()[0]]

        # We want to translate the coordinates of the bounding boxes from the cropped image to coordinates in the initial image 
        bounding_box_coords[0] = bounding_box_coords[0] + w_left
        bounding_box_coords[1] = bounding_box_coords[1] + h_top
        bounding_box_coords[2] = bounding_box_coords[2] + w_left
        bounding_box_coords[3] = bounding_box_coords[3] + h_top
        
        bounding_boxes.append(bounding_box_coords)

    return bounding_boxes

In [6]:
def get_parking_spaces_statuses(bounding_boxes, parking_spaces_polygons):
    no_parking_spaces = 10
    statuses = [0 for i in range(no_parking_spaces)]

    image_height = 1050
    pixels_to_verify = []
    for box_coords in bounding_boxes:
        box_coords[1], box_coords[3] = image_height - box_coords[1], image_height - box_coords[3] # We "invert" the y axis in order to work with the "shapely" library
        box_center = (int((box_coords[0] + box_coords[2]) / 2), int((box_coords[1] + box_coords[3]) / 2))
        
        pixel = [box_center[0], int((box_center[1] + box_coords[3]) / 2)]
        pixel = Point(pixel)
        pixels_to_verify.append(pixel)

    # We check if a parking space is occupied by verifying if a certain pixel from every detected object's bounding box
    # falls within the parking space's delimitation
    for pixel in pixels_to_verify:
        for i in range(no_parking_spaces):
            if parking_spaces_polygons[i].contains(pixel) or parking_spaces_polygons[i].touches(pixel):
                statuses[i] = 1
    
    return statuses

In [7]:
def get_video_last_frame(video_path):
    vc = cv.VideoCapture(video_path)
    
    last_frame_num = vc.get(cv.CAP_PROP_FRAME_COUNT) - 1
    vc.set(cv.CAP_PROP_POS_FRAMES, last_frame_num)
    
    _, last_frame = vc.read()
    
    return last_frame

## YOLOv9 object detection model

In [8]:
def yolov9_predict(model, img, conf=0.3, verbose=False):
    detections = model.predict(img, classes=[2, 7], conf=conf, device='cuda:0', verbose=verbose)
    detections = detections[0]
    return detections

In [9]:
model = YOLO("yolov9e.pt")

## Task 1

In [10]:
def task1(path_to_task, path_to_task_res, model, parking_spaces_polygons):
    no_contexts, no_scenarios_1, no_scenarios_2 = 15, 3, 4
    
    for i in range(1, no_contexts + 1):
        scenario_zero = 0 if i <= 9 else ''
        if i <= 10:
            no_scenarios = no_scenarios_1
        else:
            no_scenarios = no_scenarios_2
        
        for j in range(1, no_scenarios + 1):
            img_name = f'{scenario_zero}{i}_{j}.jpg'
            img_path = f'{path_to_task}/{img_name}'
            img = load_image_rgb(img_path)
            
            query_file_name = f'{scenario_zero}{i}_{j}_query.txt'
            query_file_path = f'{path_to_task}/{query_file_name}'
            with open(query_file_path) as file:
                lines = file.readlines()
                
                no_parking_spaces, parking_spaces = int(lines[0].rstrip()), []
                for parking_space in lines[1:]:
                    parking_space = int(parking_space.rstrip())
                    parking_spaces.append(parking_space)
            
            detections_bounding_boxes = get_detections_bounding_boxes(img, model)
            all_parking_spaces_statuses = get_parking_spaces_statuses(detections_bounding_boxes, parking_spaces_polygons)
            
            res_txt_name = f'{scenario_zero}{i}_{j}_predicted.txt'
            res_txt_path = f'{path_to_task_res}/{res_txt_name}'
            with open(res_txt_path, 'w') as file:
                file.write(str(no_parking_spaces) + '\n')
                for space in parking_spaces[:-1]:
                    file.write(str(space) + ' ' + str(all_parking_spaces_statuses[space - 1]) + '\n')
                file.write(str(parking_spaces[-1]) + ' ' + str(all_parking_spaces_statuses[parking_spaces[-1] - 1]))

In [11]:
# Leave the paths as they are or modify them accordingly
path_to_task_1 = '../../../train/Task1/'
path_to_task_1_res = 'Task1/'

In [None]:
task1(path_to_task_1, path_to_task_1_res, model, parking_spaces_polygons)

## Task 2

In [13]:
def task2(path_to_task, path_to_task_res, model, parking_spaces_polygons):
    no_contexts = 15
    
    for i in range(1, no_contexts + 1):
        context_zero = 0 if i <= 9 else ''
        
        video_name = f'{context_zero}{i}.mp4'
        video_path = f'{path_to_task}/{video_name}'
        video_last_frame = get_video_last_frame(video_path)
        
        detections_bounding_boxes = get_detections_bounding_boxes(video_last_frame, model)
        all_parking_spaces_statuses = get_parking_spaces_statuses(detections_bounding_boxes, parking_spaces_polygons)
        
        res_txt_name = f'{context_zero}{i}_predicted.txt'
        res_txt_path = f'{path_to_task_res}/{res_txt_name}'
        with open(res_txt_path, 'w') as file:
            for i in range(len(all_parking_spaces_statuses) - 1):
                file.write(str(all_parking_spaces_statuses[i]) + '\n')
            file.write(str(all_parking_spaces_statuses[-1]))

In [14]:
# Leave the paths as they are or modify them accordingly
path_to_task_2 = '../../../train/Task2/'
path_to_task_2_res = 'Task2/'

In [15]:
task2(path_to_task_2, path_to_task_2_res, model, parking_spaces_polygons)

## Task 3

In [16]:
def compute_iou_score(bbox1, bbox2):
    # Intersection area
    intersection_width = min(bbox1[2], bbox2[2]) - max(bbox1[0], bbox2[0]) + 1
    intersection_width = max(intersection_width, 0)
    intersection_height = min(bbox1[3], bbox2[3]) - max(bbox1[1], bbox2[1]) + 1
    intersection_height = max(intersection_height, 0)
    intersection_area = intersection_width * intersection_height

    # Union area
    bbox1_area = (bbox1[2] - bbox1[0] + 1) * (bbox1[3] - bbox1[1] + 1)
    bbox2_area = (bbox1[2] - bbox1[0] + 1) * (bbox1[3] - bbox1[1] + 1)
    union_area = bbox1_area + bbox2_area - intersection_area

    # Computing the IoU
    iou = intersection_area / union_area
    
    return iou

In [17]:
def process_video(video_path, input_txt_path, res_txt_path):
    vc = cv.VideoCapture(video_path)
    
    res_file = open(res_txt_path, 'w')

    # Processing the input file
    input_file = open(input_txt_path)
    lines = input_file.readlines()
    initial_frame_info = lines[1].rstrip().split()
    prev_bbox = [int(initial_frame_info[1]), int(initial_frame_info[2]), int(initial_frame_info[3]), int(initial_frame_info[4])]
    prev_bbox_center = (int((prev_bbox[0] + prev_bbox[2]) / 2), int((prev_bbox[1] + prev_bbox[3]) / 2))
    res_file.write(lines[0])
    res_file.write(lines[1].rstrip() + '\n')
    input_file.close()

    _, _ = vc.read() # We "throw away" the first frame, as we already have the info about it that we need, in the input file
    current_frame = 0
    
    while vc.isOpened():
        ret, frame = vc.read()
        
        if ret:
            current_frame += 1
    
            detections = yolov9_predict(model, frame, conf=0.1)
            bboxes = [[int(coord) for coord in bbox] for bbox in detections.boxes.xyxy.tolist()]

            # Our sought bounding box and the previous bounding box should have the biggest IoU score out of all of the detected bounding boxes
            max_iou_score, selected_bbox, bbox_updated = 0, None, False
            for bbox in bboxes:
                iou_score = compute_iou_score(prev_bbox, bbox)
                if iou_score > max_iou_score:
                    max_iou_score = iou_score
                    selected_bbox = bbox.copy()
            if max_iou_score > 0.45: # For when the algorithm detects the real bounding box. Otherwise we consider that it selected the bounding box of a different object
                current_bbox = selected_bbox.copy()
                bbox_updated = True
            
            if bbox_updated:
                res_file.write(f'{current_frame} {current_bbox[0]} {current_bbox[1]} {current_bbox[2]} {current_bbox[3]}\n')
                prev_bbox = current_bbox.copy()
        else:
            break
    
    res_file.close()
    
    vc.release()

In [31]:
def task3(path_to_task, path_to_task_res, model):
    no_contexts = 15

    for i in range(1, no_contexts + 1):
        context_zero = 0 if i <= 9 else ''
        
        video_name = f'{context_zero}{i}.mp4'
        video_path = f'{path_to_task}/{video_name}'
        input_txt_name = f'{context_zero}{i}.txt'
        input_txt_path = f'{path_to_task}/{input_txt_name}'
        
        res_txt_name = f'{context_zero}{i}_predicted.txt'
        res_txt_path = f'{path_to_task_res}/{res_txt_name}'

        process_video(video_path, input_txt_path, res_txt_path)

In [33]:
# Leave the paths as they are or modify them accordingly
path_to_task_3 = '../../../train/Task3/'
path_to_task_3_res = 'Task3/'

In [34]:
task3(path_to_task_3, path_to_task_3_res, model)

## Task 4

In [35]:
def get_right_lane_polygons():
    """This function obtains the entire right lane and the beginning of the queuing area as polygons"""
    
    entire_lane_polygon = [[405, 218], [481, 221], [1548, 1049], [1293, 1049]]
    starting_queue_area_polygon = [[405, 218], [481, 221], [559, 284], [459, 286]]

    image_height = 1050
    for i in range(len(entire_lane_polygon)):
        # We "invert" the y axis in order to work with the "shapely" library
        entire_lane_polygon[i][1] = image_height - entire_lane_polygon[i][1]
        starting_queue_area_polygon[i][1] = image_height - starting_queue_area_polygon[i][1]
    
    entire_lane_polygon = Polygon(entire_lane_polygon)
    starting_queue_area_polygon = Polygon(starting_queue_area_polygon)

    return (entire_lane_polygon, starting_queue_area_polygon)

In [36]:
right_lane_polygons = get_right_lane_polygons()

In [37]:
def get_all_detections(model, frame):
    # We will crop our image in order for the YOLO model to have a better chance at detecting farther away cars
    h_top_1, h_bottom_1, w_left_1, w_right_1 = 191, 320, 390, 587
    lane_cropping_1 = frame[h_top_1:h_bottom_1, w_left_1:w_right_1]
    detections_lane_cropping_1 = yolov9_predict(model, lane_cropping_1, conf=0.15)
    h_top_2, h_bottom_2, w_left_2, w_right_2 = 270, 450, 478, 753
    lane_cropping_2 = frame[h_top_2:h_bottom_2, w_left_2:w_right_2]
    detections_lane_cropping_2 = yolov9_predict(model, lane_cropping_2, conf=0.15)
    h_top_3, h_bottom_3, w_left_3, w_right_3 = 360, 640, 600, 1045
    lane_cropping_3 = frame[h_top_3:h_bottom_3, w_left_3:w_right_3]
    detections_lane_cropping_3 = yolov9_predict(model, lane_cropping_3, conf=0.15)
    
    detections_whole_image = yolov9_predict(model, frame)

    bounding_boxes = []

    for d in detections_lane_cropping_1:
        bounding_box_coords = [int(coord) for coord in d.boxes.xyxy.tolist()[0]]

        # We want to translate the coordinates of the bounding boxes from the cropped image to coordinates in the initial image 
        bounding_box_coords[0] = bounding_box_coords[0] + w_left_1
        bounding_box_coords[1] = bounding_box_coords[1] + h_top_1
        bounding_box_coords[2] = bounding_box_coords[2] + w_left_1
        bounding_box_coords[3] = bounding_box_coords[3] + h_top_1
        
        bounding_boxes.append(bounding_box_coords)
    for d in detections_lane_cropping_2:
        bounding_box_coords = [int(coord) for coord in d.boxes.xyxy.tolist()[0]]

        # We want to translate the coordinates of the bounding boxes from the cropped image to coordinates in the initial image 
        bounding_box_coords[0] = bounding_box_coords[0] + w_left_2
        bounding_box_coords[1] = bounding_box_coords[1] + h_top_2
        bounding_box_coords[2] = bounding_box_coords[2] + w_left_2
        bounding_box_coords[3] = bounding_box_coords[3] + h_top_2
        
        bounding_boxes.append(bounding_box_coords)
    for d in detections_lane_cropping_3:
        bounding_box_coords = [int(coord) for coord in d.boxes.xyxy.tolist()[0]]

        # We want to translate the coordinates of the bounding boxes from the cropped image to coordinates in the initial image 
        bounding_box_coords[0] = bounding_box_coords[0] + w_left_3
        bounding_box_coords[1] = bounding_box_coords[1] + h_top_3
        bounding_box_coords[2] = bounding_box_coords[2] + w_left_3
        bounding_box_coords[3] = bounding_box_coords[3] + h_top_3
        
        bounding_boxes.append(bounding_box_coords)
    for d in detections_whole_image:
        bounding_box_coords = [int(coord) for coord in d.boxes.xyxy.tolist()[0]]
        bounding_boxes.append(bounding_box_coords)

    return bounding_boxes

In [38]:
def get_intersection_area(bbox1, bbox2):
    intersection_width = min(bbox1[2], bbox2[2]) - max(bbox1[0], bbox2[0])
    intersection_width = max(intersection_width, 0)
    intersection_height = min(bbox1[3], bbox2[3]) - max(bbox1[1], bbox2[1])
    intersection_height = max(intersection_height, 0)
    
    intersection_area = intersection_width * intersection_height
    return intersection_area

In [39]:
def remove_duplicate_detections(lane_detections):
    """
    Due to the function get_all_detections (more specifically, due to the cropping done by it), the YOLO model might
    have detected certain cars more than one time. As such, we seek to remove duplicate detections
    """
    
    removed_detection = True
    while removed_detection:
        removed_detection = False
        for i in range(len(lane_detections)):
            width, height = lane_detections[i][2] - lane_detections[i][0], lane_detections[i][3] - lane_detections[i][1]
            detection_area = width * height

            # We check how much of the surface from bounding box i is overlapping with the surface 
            # of any other bounding box j and retain the maximum percentage of this value with report
            # to the surface of bounding box i, after which if the percentage is greater than 0.75 (we
            # truly have a duplicate bounding box, not the detections of two different cars intersecting),
            # we treat bounding box i as a duplicate and remove it. If it is not greater than 0.75, we
            # keep the bounding box i
            max_overlapping_percentage = 0
            for j in range(len(lane_detections)):
                if j != i:
                    intersection_area = get_intersection_area(lane_detections[i], lane_detections[j])
                    overlapping_percentage = intersection_area / detection_area
                    
                    if overlapping_percentage > max_overlapping_percentage:
                        max_overlapping_percentage = overlapping_percentage

            if max_overlapping_percentage > 0.75:
                lane_detections.pop(i)
                removed_detection = True
                break

    return lane_detections

In [40]:
def get_lane_detections(model, frame, entire_lane_polygon):
    """This function obtains the detections of all cars situated in the right lane of the image"""
    
    detections = get_all_detections(model, frame)
    
    image_height = 1050
    # We "invert" the y axis of the detections in order to work with the "shapely" library
    inverted_detections = [[d[0], image_height - d[1], d[2], image_height - d[3]] for d in detections]
    
    lane_detections = []
    for i in range(len(inverted_detections)):
        detection_center = [int((inverted_detections[i][0] + inverted_detections[i][2]) / 2), int((inverted_detections[i][1] + inverted_detections[i][3]) / 2)]
        detection_center = Point(detection_center)
        if entire_lane_polygon.contains(detection_center) or entire_lane_polygon.touches(detection_center):
            lane_detections.append(detections[i])

    lane_detections = remove_duplicate_detections(lane_detections)
    # We want to obtain the detected cars from top to bottom order in their lane
    lane_detections.sort(key=lambda index : index[1])

    return lane_detections

In [41]:
def get_car_count_queuing(video_path, model, right_lane_polygons):
    entire_lane_polygon, starting_queue_area_polygon = right_lane_polygons[0], right_lane_polygons[1]
    
    frame = get_video_last_frame(video_path)
    lane_detections = get_lane_detections(model, frame, entire_lane_polygon)

    cars_queuing = 0

    if len(lane_detections) > 0:
        image_height = 1050
        center_first_detection = [int((lane_detections[0][0] + lane_detections[0][2]) / 2), int((lane_detections[0][1] + lane_detections[0][3]) / 2)]
        center_first_detection[1] = image_height - center_first_detection[1] # We "invert" the y axis in order to work with the "shapely" library
        center_first_detection = Point(center_first_detection)
        
        # If there is no car at the beginning area of the queue, then we treat as there being no cars queuing
        if starting_queue_area_polygon.contains(center_first_detection) or starting_queue_area_polygon.touches(center_first_detection):
            cars_queuing += 1

            # We check if cars are queuing by comparing their coordinates
            for i in range(1, len(lane_detections)):
                if lane_detections[i][1] <= lane_detections[i - 1][3]:
                    cars_queuing += 1
                else:
                    break
    
    return cars_queuing

In [42]:
def task4(path_to_task, path_to_task_res, model, right_lane_polygons):
    no_contexts = 15

    for i in range(1, no_contexts + 1):
        context_zero = 0 if i <= 9 else ''
        
        video_name = f'{context_zero}{i}.mp4'
        video_path = f'{path_to_task}/{video_name}'

        car_count_queuing = get_car_count_queuing(video_path, model, right_lane_polygons)
        
        res_txt_name = f'{context_zero}{i}_predicted.txt'
        res_txt_path = f'{path_to_task_res}/{res_txt_name}'

        with open(res_txt_path, 'w') as file:
            file.write(str(car_count_queuing))

In [43]:
# Leave the paths as they are or modify them accordingly
path_to_task_4 = '../../../train/Task4/'
path_to_task_4_res = 'Task4/'

In [44]:
task4(path_to_task_4, path_to_task_4_res, model, right_lane_polygons)