__Import Libraries__

In [83]:
from ultralytics import YOLO
from pathlib import Path
from collections import defaultdict
from pprint import pprint 
from PIL import Image, ImageDraw
import os
import cv2
import torch
import json
import math
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
%run Radar_Clustering_CustomDBScan.py

__Path Definition__

In [84]:

path_to_images = Path(r'C:\Users\hussa\OneDrive\Desktop\Projects\Project\MOT\INFRA-3DRC_scene-22\INFRA-3DRC_scene-22\camera_01\camera_01__data')
path_to_pcd = Path(r'C:\Users\hussa\OneDrive\Desktop\Projects\Project\MOT\INFRA-3DRC_scene-22\INFRA-3DRC_scene-22\radar_01\radar_01__data')

scene_image = sorted(list(image for image in path_to_images.iterdir()))
scene_pcd = sorted(list(image for image in path_to_pcd.iterdir()))

scene_image = scene_image[92:93]
scene_pcd = scene_pcd[92:93]

yolo_model = YOLO(r"C:\Users\hussa\OneDrive\Desktop\Projects\Project\MOT\best.pt")

calibration_file = Path(r"C:\Users\hussa\OneDrive\Desktop\Projects\Project\MOT\infra_3drc_calibration_with_homography\calibration__dk_front_night.json")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


__Function to Process YOLO prediction results__

In [85]:
def class_box_generator_for_pred(prediction_results):
    for result in prediction_results:
        cls = result.boxes.cls.cpu().numpy()
        conf = result.boxes.conf.cpu().numpy()
        detection = result.boxes.xyxy.cpu().numpy()

        list_of_pred_boxes = np.column_stack((cls, detection, conf))
    
    return list_of_pred_boxes

__Import Calibration Matrix__

In [86]:
def get_sensor_calibration_dict(calibration_file):
    sensor_calibration_dict = {
        "camera_intrinsics": [],
        "camera_distcoeffs": [],
        "radar_to_camera": [],
        "radar_to_lidar": [],
        "lidar_to_ground": [],
        "camera_to_ground": []
    }

    with open(calibration_file, 'r') as f:
        data = json.load(f)

    for item in data['calibration']:
        if item['calibration'] == 'camera_01':
            sensor_calibration_dict['camera_intrinsics'] = item['k']
            sensor_calibration_dict['camera_distcoeffs'] = item['D']
        elif item['calibration'] == 'radar_01_to_camera_01':
            sensor_calibration_dict['radar_to_camera'] = item['T']
        elif item['calibration'] == 'radar_01_to_lidar_01':
            sensor_calibration_dict['radar_to_lidar'] = item['T']
        elif item['calibration'] == 'lidar_01_to_ground':
            sensor_calibration_dict['lidar_to_ground'] = item['T']
        elif item['calibration'] == 'camera_01_to_ground_homography':
            sensor_calibration_dict['camera_to_ground'] = item['T']

    return sensor_calibration_dict

__Calibration: Radar to Ground Plane__

In [87]:
def radar_to_ground_transfomer(points_array, T, K):

    n_p_array = np.array(points_array).reshape(1,-1)
    tranposed_array = np.transpose(n_p_array)
   
    row_of_ones = np.ones((1, 1))           #1x1
    stacked_matrix = np.vstack((tranposed_array, row_of_ones))  
  
    radar_to_lidar_matrix = np.matmul(T, stacked_matrix)             #3x1

    new_stacked_matrix = np.vstack((radar_to_lidar_matrix, row_of_ones))             #4x1
    in_ground_data = np.matmul(K, new_stacked_matrix)


    in_ground = np.transpose(in_ground_data)

    return in_ground[0]


__Radar dict: on Ground__

In [88]:
def radar_to_ground(radar_dict, sensor_calibration_dict):
    def transform_point(point, T, K, is_cluster=False):
        updated_centroid = radar_to_ground_transfomer(point[0], T, K)
        updated_velocity = point[-1]
        
        if is_cluster:
            updated_lowest_point = radar_to_ground_transfomer(point[1], T, K)
            return [updated_centroid, updated_lowest_point, updated_velocity]
        return [updated_centroid, updated_velocity]
    
    T = sensor_calibration_dict['radar_to_lidar']
    K = sensor_calibration_dict['lidar_to_ground']

    in_ground = {'clusters': [], 'noise': []}

    for key, points in radar_dict.items():
        is_cluster = key == 'clusters'
        for point in points:
            if point:
                transformed_point = transform_point(point, T, K, is_cluster)
                in_ground.setdefault(key, []).append(transformed_point)

    return in_ground

__Calibration: Radar to Image Plane__

In [89]:
def radar_to_camera_transformer(radar_point, T, k):
   
    n_p_array = np.array(radar_point).reshape(1,-1)
    transpose_RPA = np.transpose(n_p_array)

    new_array = np.vstack([transpose_RPA, np.ones((1, 1))])             
    product_1 = np.matmul(np.array(k), np.array(T))

    product_array = np.matmul(product_1, new_array)                      #[su, sv, s] but along column

    final_array = product_array / product_array [2]                      #[u, v, 1], along column

    u_v = np.delete(final_array, 2, axis = 0)                            #[u, v], along column      
    final_u_v = np.transpose(u_v)

    return final_u_v[0]

__Radar Dict: on Image__

In [90]:
def radar_to_camera(radar_output, sensor_calibration_dict):
    def transform_point(point, T, K, is_cluster=False):
        updated_centroid = radar_to_camera_transformer(point[0], T, K)
        updated_velocity = point[-1]
        
        if is_cluster:
            updated_lowest_point = radar_to_camera_transformer(point[1], T, K)
            return [updated_centroid, updated_lowest_point, updated_velocity]
        return [updated_centroid, updated_velocity]

    T = sensor_calibration_dict['radar_to_camera']
    K = sensor_calibration_dict['camera_intrinsics']

    in_camera = {'clusters': [], 'noise': []}

    for key, points in radar_output.items():
        is_cluster = (key == 'clusters')
        for point in points:
            if point:
                transformed_point = transform_point(point, T, K, is_cluster)
                in_camera[key].append(transformed_point)

    return in_camera

__Homography: Image to Ground__

In [91]:
def homography(points_on_image, sensor_calibration_dict):

    points = np.array(points_on_image).reshape(1, -1)

    transpose_matrix = np.vstack((np.transpose(points),np.ones((1,1))))
    
    homogeneous_coordinates = np.matmul(sensor_calibration_dict['camera_to_ground'], transpose_matrix)
    ground_coordinates = homogeneous_coordinates / homogeneous_coordinates[-1].reshape(1, -1)

    transpose_ground_coordinates = ground_coordinates.T
    g_x1y1 = transpose_ground_coordinates[0][:2]

    return g_x1y1

__Visualization: Camera Points on Ground Plane__

In [92]:
def camera_plotting(image_on_ground, my_plot):
    # Colors with cycling support for large number of points
    colors = ['blue', 'green', 'orange', 'black', 'purple', 'maroon']
    
    # Extract and plot coordinates directly
    for i, xy in enumerate(image_on_ground):
        x_coord = xy[1][0]
        y_coord = xy[1][1]

        # Cycle through colors if more points than colors
        color = colors[i % len(colors)]

        # Plot the point
        my_plot.scatter(y_coord, x_coord, color=color, label='camera', marker='o')

    # Set plot limits and labels
    my_plot.set_xlim(-30, 30)
    my_plot.set_ylim(0, 100)
    my_plot.set_xlabel('Y-axis')
    my_plot.set_ylabel('X-axis')
    my_plot.set_title('Plot of Points')

    # Invert X-axis
    my_plot.invert_xaxis()
    
    return my_plot

__Visualization: Radar points on Ground Plane__

In [93]:
def radar_plotting(radar_dict, my_plot):
    # Initialize lists for plotting
    x_lowest, y_lowest = [], []
    x_noise, y_noise = [], []

    # Process clusters
    for detection in radar_dict['clusters']:
        if detection:
            lowest_point = detection[1]
            x_lowest.append(lowest_point[0])
            y_lowest.append(lowest_point[1])
    
    # Process noise points
    for noise in radar_dict['noise']:
        if noise:
            lowest_point = noise[0]
            x_noise.append(lowest_point[0])
            y_noise.append(lowest_point[1])

    # Plot the points
    my_plot.scatter(y_lowest, x_lowest, color='red', label='Lowest Point', marker="X")
    my_plot.scatter(y_noise, x_noise, color='grey', label='Noise', marker=".")

    # Add legend if not already present
    if not my_plot.get_legend():
        my_plot.legend()

    return my_plot

__Expand Bounding Box Dimension__

In [94]:
def expand_bbox(box, scale=1.1):
    # Calculate the width and height of the original box
    width = box[2] - box[0]     # x2 - x1
    height = box[3] - box[1]    # y2 - y1

    # Calculate the center of the original box
    center_x = box[0] + (width/2)
    center_y = box[1] + (height/2)

    # Calculate the increase in width and height
    new_width = width * scale
    new_height = height * scale

    # Calculate the new coordinates
    new_x1 = 0 if (center_x - new_width / 2) < 0 else (center_x - new_width / 2)
    new_y1 = 0 if (center_y - new_height / 2) < 0 else (center_y - new_height / 2)
    new_x2 = center_x + new_width / 2
    new_y2 = center_y + new_height / 2
    
    return list([new_x1, new_y1, new_x2, new_y2])


__Euclidean Distance__

In [95]:
def get_euclidean_distance(clusters, images):
    d = np.sqrt(((clusters[0] - images[0])**2) + ((clusters[1] - images[1])**2))
    return d


__Case Filtering__

In [96]:
def get_association_matrix(list_of_pred_boxes, cluster_on_image, datatype='clusters', association_list=None):

    # Select the clusters or noise points based on datatype
    clusters = cluster_on_image['clusters'] if datatype == 'clusters' else cluster_on_image['noise']
    pred_boxes = list_of_pred_boxes

    # Return empty matrix if no clusters or pred_boxes exist
    if len(clusters) == 0 or len(pred_boxes) == 0:
        return np.zeros((len(clusters), len(pred_boxes)))

    # Initialize the association matrix
    matrix = np.zeros((len(clusters), len(pred_boxes)))

    # Helper function to check if a cluster centroid falls inside a bounding box
    def is_centroid_in_bbox(centroid, bbox):
        return bbox[0] < centroid[0] < bbox[2] and bbox[1] < centroid[1] < bbox[3]

    for pred_idx, prediction in enumerate(pred_boxes):
        old_bbox = prediction[1:5]
        bbox = expand_bbox(old_bbox, scale=1.2)

        for cluster_idx, cluster in enumerate(clusters):
            cluster_centroid = cluster[0]
            # Determine if we are dealing with clusters or noise
            if datatype == 'clusters' or (datatype == 'noise' and pred_idx in association_list['non_associated_bbox']):
                matrix[cluster_idx, pred_idx] = int(is_centroid_in_bbox(cluster_centroid, bbox))

    return matrix
    

In [97]:
def get_filtered_cases(matrix, datatype='clusters', association_list=None):
    """
    Checks and assigns different cases of radar-image data for spatial association 

    Examples: 
    >>> import pprint
    >>> matrix = np.array([
    ...     [1, 0, 1, 0, 0, 0],
    ...     [0, 1, 1, 0, 0, 0],
    ...     [1, 1, 0, 0, 0, 0],
    ...     [0, 0, 0, 1, 0, 0],
    ...     [0, 0, 0, 0, 1, 0],
    ...     [0, 0, 0, 0, 1, 0],
    ...     [0, 0, 0, 0, 0, 0]
    ... ])
    >>> pprint.pprint(get_associations(matrix))
    {'many_radar_to_many_image': {'cols': [0, 1, 2], 'rows': [0, 1, 2]},
     'many_radar_to_one_image': {'cols': [4], 'rows': [(array([4, 5]),)]},
     'one_radar_to_many_image': {'cols': [], 'rows': []},
     'one_radar_to_one_image': {'cols': [3], 'rows': [3]}}
    """ 
    associations = {
        "many_cluster_to_many_bbox" : {"clusters": [], "bbox": []}, 
        "many_cluster_to_one_bbox"  : {"clusters": [], "bbox": []},
        "one_cluster_to_many_bbox"  : {"clusters": [], "bbox": []}, 
        "one_cluster_to_one_bbox"   : {"assigned": []},
        "unassigned_bbox" : {"bbox": []}
    }

    # MANY TO MANY CHECKS
    # -------------------
    rows_with_multiple_truths = np.where(np.sum(matrix, axis=1) > 1)[0] 
    columns_with_multiple_truths = np.where(np.sum(matrix, axis=0) > 1)[0] 
    # many_too_many = list(set(rows_with_multiple_truths) & set(columns_with_multiple_truths))
    # many_radar_to_many_image = [many_too_many, many_too_many]
    many_too_many_rows = set() 
    many_too_many_cols = set() 
    for r_id in range(matrix.shape[0]):
        for c_id in range(matrix.shape[1]):
            if r_id in rows_with_multiple_truths and c_id in columns_with_multiple_truths: 
                if matrix[r_id, c_id] == 1:
                     many_too_many_rows.add(r_id)
                     many_too_many_cols.add(c_id)

    associations['many_cluster_to_many_bbox']["clusters"] = list(many_too_many_rows)
    associations['many_cluster_to_many_bbox']["bbox"] = list(many_too_many_cols)

    # MANY TO ONE CHECKS 
    # -------------------
    many_to_one = [] 
    for c in range(matrix.shape[1]): 
        if c in columns_with_multiple_truths and c not in many_too_many_cols:
            associated_rows = np.where(matrix[:, c] > 0)[0]
            associations['many_cluster_to_one_bbox']["clusters"].append(associated_rows.tolist())
            associations['many_cluster_to_one_bbox']["bbox"].append([c])            

    # ONE TO MANY CHECKS
    # ------------------
    one_to_many = [] 
    for r in range(matrix.shape[0]): 
        if r in rows_with_multiple_truths and r not in many_too_many_rows:
            associated_cols = np.where(matrix[r] > 0)[0]
            associations['one_cluster_to_many_bbox']["clusters"].append([r])
            associations['one_cluster_to_many_bbox']["bbox"].append(associated_cols.tolist())

    # ONE TO ONE CHECKS
    # ------------------
    for i in range(len(matrix)):
        for j in range(len(matrix[0])):
            if matrix[i,j] == 1:
                row_sum = sum(matrix[i,:])
                col_sum = sum(matrix[:,j]) 

                if row_sum == 1 and col_sum == 1:
                    associations['one_cluster_to_one_bbox']['assigned'].append([i, j]) 

    if datatype == 'clusters':
        associations["unassigned_bbox"]["bbox"].extend(list(np.where(np.sum(matrix, axis=0) == 0)[0]))
    elif datatype == 'noise':
        associations["unassigned_bbox"]["bbox"].extend(box for box in list(np.where(np.sum(matrix, axis=0) == 0)[0]) if box in association_list['non_associated_bbox']) 

    return associations

__Association: One to One @Image Plane__

In [98]:
def get_one_to_one_association(filtered_cases, association_list):
    association_list["associated"].extend(filtered_cases['one_cluster_to_one_bbox']['assigned']) 
    association_list["non_associated_bbox"].extend(filtered_cases["unassigned_bbox"]["bbox"]) 
    return association_list

__Association: One to Many @Ground Plane__

In [99]:
def get_one_to_many_association(filtered_cases, clusters_on_ground, image_on_ground, datatype='clusters', association_list=None):

    if datatype == 'clusters':
        lower_idx = 1
    elif datatype == 'noise':
        lower_idx = 0
    else:
        raise KeyError(f"Unexpected datatype '{datatype}'. Expected 'clusters' or 'noise'.")

    list_of_centroid_indices = filtered_cases['one_cluster_to_many_bbox']['clusters']
    list_of_box_indices = filtered_cases['one_cluster_to_many_bbox']['bbox']

    for centroid, bboxes in zip(list_of_centroid_indices, list_of_box_indices):
        centroid_idx = centroid[0]  # Get the centroid index

        # Compute Euclidean distances between the centroid and each bounding box
        distances = [get_euclidean_distance(clusters_on_ground[datatype][centroid_idx][lower_idx], image_on_ground[box][1]) for box in bboxes]

        # Get the index of the bounding box with the minimum distance
        min_bbox_idx = distances.index(min(distances))

        # Associate the centroid with the closest bounding box
        association_list['associated'].append([centroid_idx, bboxes[min_bbox_idx]])

        # Add non-associated bounding boxes to the list
        association_list['non_associated_bbox'].extend([bbox for i, bbox in enumerate(bboxes) if i != min_bbox_idx])

    return association_list

__Association: Many to One__

In [100]:
def nearest_point_finder(points_list, processed_radar_points_to_ground):
    x_list = []
    for point in points_list:
        x_p = processed_radar_points_to_ground['clusters'][point][1][0]
        x_list.append(x_p)
    
    return x_list.index(min(x_list))

__Association__Many to One/Many__

In [101]:
def data_processer_for_multiple_clusters_case(filtered_cases, clusters_on_ground, mode, datatype='clusters'):
    if mode == 'many_to_many':
        clusters = filtered_cases['many_cluster_to_many_bbox']['clusters'][0]
        
    elif mode == 'many_to_one':
        clusters = filtered_cases['many_cluster_to_one_bbox']['clusters'][0]

    # Initialize proximity matrix
    first_matrix = np.zeros((len(clusters), len(clusters)))

    # Populate proximity matrix based on velocity proximity
    for i, p1 in enumerate(clusters):
        p1_data = clusters_on_ground['clusters'][p1] if datatype == 'clusters' else clusters_on_ground['noise'][p1]
        for j, p2 in enumerate(clusters):
            p2_data = clusters_on_ground['clusters'][p2] if datatype == 'clusters' else clusters_on_ground['noise'][p2]
            if abs(p1_data[2][0] - p2_data[2][0]) < 0.75:
                first_matrix[i][j] = 1

    # Merge candidates based on proximity matrix
    candidates = [[clusters[j] for j, val in enumerate(row) if val == 1] for row in first_matrix]
    candidates = [list(set(candidate)) for candidate in candidates]  # Remove duplicates
    
    # Merge clusters based on common elements
    def merge_candidates(candidates):
        while True:
            merged_any = False
            for i in range(len(candidates)):
                for j in range(i + 1, len(candidates)):
                    if set(candidates[i]) & set(candidates[j]):  # Merge if overlap exists
                        candidates[i] = list(set(candidates[i]) | set(candidates[j]))
                        del candidates[j]
                        merged_any = True
                        break
                if merged_any: break
            if not merged_any: break
        return candidates

    new_candidates = merge_candidates(candidates)

    return clusters, new_candidates

In [119]:
def many_to_one_association(filtered_cases, association_list, clusters_on_ground, image_on_ground, datatype='clusters'):
    
    clusters, new_candidates = data_processer_for_multiple_clusters_case(filtered_cases, clusters_on_ground, mode= 'many_to_one', datatype='clusters' )
    pre_boxes_list = filtered_cases['many_cluster_to_one_bbox']['bbox'][0]

    global_merged_any = any(len(c) > 1 for c in new_candidates)
    box_bottom_center = image_on_ground[pre_boxes_list[0]][1]

    if not global_merged_any:
        # Handle non-merged clusters with Euclidean distance
        distance_data = []
        for cluster in clusters:
            nearest_data = (clusters_on_ground['clusters'][cluster][1] 
                            if datatype == 'clusters' 
                            else clusters_on_ground['noise'][cluster][0])
            ec_distance = get_euclidean_distance(nearest_data[0:2], box_bottom_center)
            distance_data.append(ec_distance)

        # Find the cluster with the minimum distance
        index_of_chosen_centroid = clusters[distance_data.index(min(distance_data))]
        if [index_of_chosen_centroid, pre_boxes_list[0]] not in association_list['associated']:
            association_list['associated'].append([index_of_chosen_centroid, pre_boxes_list[0]])

    elif len(new_candidates) == 1:
            association_list['associated'].append([new_candidates[0], pre_boxes_list[0]])
    else:
        # handling for partially merged

        #This section removes the items which are not clustered
        for pair in new_candidates[:]:
            if len(pair) < 2:
                new_candidates.remove(pair)   

        distance_comparison = []
        for item in new_candidates:
            nearest_point = nearest_point_finder(item, clusters_on_ground)
            nearest_point_data = (clusters_on_ground['clusters'][item[nearest_point]][1] 
                                if datatype == 'clusters' 
                                else clusters_on_ground['noise'][item[nearest_point]][0])
            e_c_distance = get_euclidean_distance(nearest_point_data[0:2], box_bottom_center)
            distance_comparison.append(e_c_distance)
        
        # Handle multiple candidates and select the closest one
        final_cluster_to_associate = distance_comparison.index(min(distance_comparison))
        index_of_chosen_centroid = new_candidates[final_cluster_to_associate]
        if [index_of_chosen_centroid, pre_boxes_list[0]] not in association_list['associated']:
            association_list['associated'].append([index_of_chosen_centroid, pre_boxes_list[0]])
    
    # Handle non-associated boxes
    associated_boxes = set(item[1] for item in association_list['associated'])
    non_associated_boxes = [box for box in pre_boxes_list if box not in associated_boxes]
    association_list['non_associated_bbox'].extend(non_associated_boxes)

    return association_list


In [103]:
def many_to_many_association(filtered_cases, association_list, clusters_on_ground, image_on_ground, datatype='clusters'):
    clusters, new_candidates = data_processer_for_multiple_clusters_case(filtered_cases, clusters_on_ground, mode= 'many_to_many', datatype='clusters' )
    pre_boxes_list = filtered_cases['many_cluster_to_many_bbox']['bbox'][0]
    box_bottom_centers = np.array([image_on_ground[box][1][:2] for box in pre_boxes_list])
    
    global_merged_any = any(len(c) > 1 for c in new_candidates)
    if not global_merged_any:
        
        nearest_points = np.array([clusters_on_ground['clusters'][cluster][1][:2] for cluster in clusters])

        # Compute distances in a vectorized manner
        distance_matrix = np.linalg.norm(nearest_points[:, np.newaxis] - box_bottom_centers[np.newaxis, :], axis=2)

        # Find associations where the distance is below the threshold
        combined_indices = [(i, np.argmin(distance_matrix[i])) for i in range(distance_matrix.shape[0])
                            if np.min(distance_matrix[i]) < 2]
        association_list['associated'].extend([(clusters[i], pre_boxes_list[j]) for i, j in combined_indices])

    # Merging case: calculate distance using merged candidates
    else:
        nearest_points = []
        for merged in new_candidates:
            if len(merged) > 1:
                nearest_point_index = nearest_point_finder(merged, clusters_on_ground)
                nearest_points.append(clusters_on_ground['clusters'][merged[nearest_point_index]][1][:2])
            else:
                nearest_points.append(clusters_on_ground['clusters'][merged[0]][1][:2])

        nearest_points = np.array(nearest_points)

        # Compute distances in a vectorized manner for merged candidates
        distance_matrix = np.linalg.norm(nearest_points[:, np.newaxis] - box_bottom_centers[np.newaxis, :], axis=2)

        # Find associations where the distance is below the threshold
        combined_indices = [(i, np.argmin(distance_matrix[i])) for i in range(distance_matrix.shape[0])
                            if np.min(distance_matrix[i]) < 2]
        association_list['associated'].extend([(new_candidates[i], pre_boxes_list[j]) for i, j in combined_indices])

       # Handle non-associated boxes
    associated_boxes = set(item[1] for item in association_list['associated'])
    non_associated_boxes = [box for box in pre_boxes_list if box not in associated_boxes]
    association_list['non_associated_bbox'].extend(non_associated_boxes)

    return association_list

__Association Visualization @Image Plane__

In [104]:
def draw_drawings(draw, corner_1, corner_2, color, thickness=3, dash_length=10):
    # Draw a dashed rectangle
    for i in range(corner_1[0], corner_2[0], dash_length * 2):
        for t in range(thickness):
            draw.line([(i, corner_1[1] + t), (i + dash_length, corner_1[1] + t)], fill=color)

            draw.line([(i, corner_2[1] + t), (i + dash_length, corner_2[1] + t)], fill=color)

    for i in range(corner_1[1], corner_2[1], dash_length * 2):
        for t in range(thickness):
            draw.line([(corner_1[0] + t, i), (corner_1[0] + t, i + dash_length)], fill=color)

            draw.line([(corner_2[0] + t, i), (corner_2[0] + t, i + dash_length)], fill=color)


def get_drawings(colour_idx, image, box_data, centroid_data, datatype='clusters'):
    colors = [
        (255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
        (255, 0, 255), (0, 255, 255), (128, 0, 0), (0, 128, 0),
        (0, 0, 128), (128, 128, 0)
    ]

    # Draw original bounding box
    original_box = list(map(int, box_data[1:5]))
    corner_1, corner_2 = (original_box[0], original_box[1]), (original_box[2], original_box[3])
    color = colors[colour_idx]
    cv2.rectangle(image, corner_1, corner_2, color, 3)

    # Expand the bounding box
    expanded_box = list(map(int, expand_bbox(box_data[1:5], scale=1.2)))
    exp_corner_1, exp_corner_2 = (expanded_box[0], expanded_box[1]), (expanded_box[2], expanded_box[3])

    # Convert to PIL to draw dashed rectangle
    img_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img_pil)
    draw_dashed_rectangle(draw, exp_corner_1, exp_corner_2, color[::-1])  # RGB to BGR

    # Convert back to OpenCV
    image = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)

    # Draw centroid based on datatype
    if datatype == 'clusters':
        centroid_point = tuple(map(int, centroid_data[:2]))
        cv2.circle(image, centroid_point, 15, color, -1)  # Filled circle for clusters

    elif datatype == 'noise':
        centroid_point_1 = tuple(map(int, [centroid_data[0] - 12, centroid_data[1] - 12]))
        centroid_point_2 = tuple(map(int, [centroid_data[0] + 12, centroid_data[1] + 12]))
        cv2.rectangle(image, centroid_point_1, centroid_point_2, color, 3)

    return image


In [105]:
def handle_association(associated_points, box_data, clusters_on_image, image, datatype, colour_idx):
    for associated_point in associated_points:
        if isinstance(associated_point[0], int):
            centroid_data = clusters_on_image[datatype][associated_point[0]][0]
            image = get_drawings(colour_idx, image, box_data, centroid_data, datatype)
        elif isinstance(associated_point[0], list):
            for point in associated_point[0]:
                centroid_data = clusters_on_image[datatype][point][0]
                image = get_drawings(colour_idx, image, box_data, centroid_data, datatype)
    return image, colour_idx + 1


def get_image_visualization(final_association_dict, list_of_pred_boxes, clusters_on_image, img_path):

    # Load the image
    image = cv2.imread(img_path, cv2.IMREAD_COLOR)
    colour_idx = 0

    # Handle cluster associations
    if final_association_dict.get('with_cluster'):
        for associated_point in final_association_dict['with_cluster']:
            box_data = list_of_pred_boxes[associated_point[1]]
            image, colour_idx = handle_association([associated_point], box_data, clusters_on_image, image, datatype='clusters', colour_idx=colour_idx)

    # Handle noise associations
    if final_association_dict.get('with_noise'):
        for associated_point in final_association_dict['with_noise']:
            box_data = list_of_pred_boxes[associated_point[1]]
            image, colour_idx = handle_association([associated_point], box_data, clusters_on_image, image, datatype='noise', colour_idx=colour_idx)

    # Handle unassigned bounding boxes
    if final_association_dict.get('unassigned_bbox'):
        for non_associated_point in final_association_dict['unassigned_bbox']:
            box_data = list_of_pred_boxes[non_associated_point]
            image = get_drawings(colour_idx, image, box_data, None, datatype='unassigned')
            colour_idx += 1

    return image


__Inference__

In [106]:
def process_scene(scene_image, scene_pcd, calibration_file):
    sensor_calibration_dict = get_sensor_calibration_dict(calibration_file)

    for idx, (img, pcd) in enumerate(zip(scene_image, scene_pcd)):
        print(f"Processing Scene {idx}...")

        # YOLO prediction
        results = yolo_model.predict(img)
        list_of_pred_boxes = class_box_generator_for_pred(results)
        pprint(list_of_pred_boxes)

        # Cluster Formation
        clusters_on_radar = my_custom_dbscan(eps1=0.1, eps2=0.250, min_samples=2).process_pcd_files(pcd)
        pprint(clusters_on_radar)

        # Project bounding boxes to the ground plane
        image_on_ground = []
        for result in list_of_pred_boxes:
            cls, bbox = result[0], list(result[1:5])
            bottom_center_point = [(bbox[2] + bbox[0]) / 2, bbox[3]]
            image_point_on_ground = homography(bottom_center_point, sensor_calibration_dict)
            image_on_ground.append([[cls], list(image_point_on_ground)])

        # Radar dictionaries on different planes
        clusters_on_ground = radar_to_ground(clusters_on_radar, sensor_calibration_dict)
        clusters_on_image = radar_to_camera(clusters_on_radar, sensor_calibration_dict)

        # Association matrices
        association_matrix = get_association_matrix(list_of_pred_boxes, clusters_on_image, datatype='clusters')
        association_list = {"associated": [], "non_associated_bbox": []}
        print(association_matrix)

        if association_matrix is not None:
            filtered_cases = get_filtered_cases(association_matrix, datatype='clusters')
            print(filtered_cases)

            # Association steps
            if filtered_cases:  # Check if there are filtered cases

                if filtered_cases:
                    association_list = get_one_to_one_association(filtered_cases, association_list)

                if len(filtered_cases['one_cluster_to_many_bbox']['clusters']):
                    association_list = get_one_to_many_association(
                        filtered_cases, clusters_on_ground, image_on_ground, association_list=association_list, datatype='clusters')
                
                if len(filtered_cases['many_cluster_to_one_bbox']['clusters']) > 0:
                    association_list = many_to_one_association(filtered_cases, association_list, clusters_on_ground, image_on_ground, datatype='clusters')
     
                if len(filtered_cases['many_cluster_to_many_bbox']['clusters']):
                    association_list = many_to_many_association(filtered_cases, association_list, clusters_on_ground, image_on_ground, datatype='clusters')


        # Final associations
        print(f'association_list: {association_list}')
        final_association_dict = {
            'with_cluster': association_list['associated'],
            # 'with_noise': noise_association_list['associated'],
            # 'unassigned_bbox': noise_association_list['non_associated_bbox']
        }

        print(final_association_dict)
        # Uncomment the following lines to visualize the results
        # final_image = get_image_visualization(final_association_dict, list_of_pred_boxes, clusters_on_image, img)
        # cv2.imshow('Image Window', final_image)
        # cv2.waitKey(0)
    
    # cv2.destroyAllWindows()


In [120]:
process_scene(scene_image, scene_pcd, calibration_file)

Processing Scene 0...

image 1/1 C:\Users\hussa\OneDrive\Desktop\Projects\Project\MOT\INFRA-3DRC_scene-22\INFRA-3DRC_scene-22\camera_01\camera_01__data\camera_01__2023-06-02-22-32-29-989.png: 416x640 1 bus, 2880.6ms
Speed: 4.5ms preprocess, 2880.6ms inference, 2.1ms postprocess per image at shape (1, 3, 416, 640)
array([[          4,           0,      193.74,       666.5,      670.41,     0.94762]], dtype=float32)
{'clusters': [[[13.745777130126953, 5.611412048339844, -0.5118474960327148],
               [6.67625093460083, 5.611412048339844, -0.6593458652496338],
               [-9.468095779418945]],
              [[14.996161460876465, 14.07302188873291, -0.8995653986930847],
               [13.802179336547852, 14.07302188873291, -0.02230207808315754],
               [-0.23880572617053986]],
              [[18.8121395111084, 12.777000427246094, 0.8344192504882812],
               [18.167457580566406, 12.777000427246094, 0.7909148335456848],
               [0.1470252424478531]],
       

In [108]:
"""# Final Association List
    final_association_dict = {'with_cluster': [], 'with_noise': [], 'unassigned_bbox': []}
    final_association_dict['with_cluster'].extend(association_list['associated'])
    final_association_dict['with_noise'].extend(noise_association_list['associated'])
    final_association_dict['unassigned_bbox'].extend(noise_association_list['non_associated_bbox'])
    
    if my_image and my_plot:
        my_image.clear()
        my_plot.clear()

    # Plot Ground Plane Points
    camera_plotting(image_on_ground, my_plot)
    radar_plotting(clusters_on_ground, my_plot)

    # Visualize Association on Image Plane
    image_visualize = get_image_visualization(final_association_dict, list_of_pred_boxes, clusters_on_image, img)

    # Convert BGR to RGB for displaying with Matplotlib
    image_rgb = cv2.cvtColor(image_visualize, cv2.COLOR_BGR2RGB)

    # Update the image display
    my_image.imshow(image_rgb)
    my_image.set_title('Image')

    # Refresh the canvas
    fig.canvas.draw()
    fig.canvas.flush_events()

    # Small pause to allow for real-time interaction without freezing
    plt.pause(0.001)

    # Disable interactive mode after the visualization is complete
    plt.ioff()

"""

"# Final Association List\n    final_association_dict = {'with_cluster': [], 'with_noise': [], 'unassigned_bbox': []}\n    final_association_dict['with_cluster'].extend(association_list['associated'])\n    final_association_dict['with_noise'].extend(noise_association_list['associated'])\n    final_association_dict['unassigned_bbox'].extend(noise_association_list['non_associated_bbox'])\n    \n    if my_image and my_plot:\n        my_image.clear()\n        my_plot.clear()\n\n    # Plot Ground Plane Points\n    camera_plotting(image_on_ground, my_plot)\n    radar_plotting(clusters_on_ground, my_plot)\n\n    # Visualize Association on Image Plane\n    image_visualize = get_image_visualization(final_association_dict, list_of_pred_boxes, clusters_on_image, img)\n\n    # Convert BGR to RGB for displaying with Matplotlib\n    image_rgb = cv2.cvtColor(image_visualize, cv2.COLOR_BGR2RGB)\n\n    # Update the image display\n    my_image.imshow(image_rgb)\n    my_image.set_title('Image')\n\n    # 