__Import Libraries__

In [94]:
from ultralytics import YOLO
from pathlib import Path
from collections import defaultdict
from hungarian import * 
from pprint import pprint 
import torch
import json
import math
import numpy as np
import matplotlib.pyplot as plt
%run Radar_Clustering_CustomDBScan.py

__Path Definition__

In [95]:
path_to_images = Path(r'C:\Dk\Projects\Team Project\Dataset\INFRA-3DRC-Dataset\INFRA-3DRC_scene-15\camera_01\camera_01__data')
path_to_pcd = Path(r'C:\Dk\Projects\Team Project\Dataset\INFRA-3DRC-Dataset\INFRA-3DRC_scene-15\radar_01\radar_01__data')

image_list = sorted(list(image for image in path_to_images.iterdir()))
pcd_list = sorted(list(image for image in path_to_pcd.iterdir()))

yolo_model = YOLO(r"C:\Dk\Projects\Team Project\YOLO detection\Models\Harshit_Large\large_300 epoch_batch 4_augmented\train32\weights\best.pt")
radar_cluster = r'C:\Dk\Projects\Team Project\Data Association\radar_camera_fusion.py'

calibration_file = Path(r"C:\Dk\Projects\Team Project\Dataset\INFRA-3DRC-Dataset\INFRA-3DRC_scene-15\calibration.json")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


__Function to Process YOLO prediction results__

In [96]:
def class_box_generator_for_pred(prediction_results):
    for result in prediction_results:
        cls = result.boxes.cls.cpu().numpy()
        conf = result.boxes.conf.cpu().numpy()
        detection = result.boxes.xyxy.cpu().numpy()

        list_of_pred_boxes = np.column_stack((cls, detection, conf))
    
    return list_of_pred_boxes

__Import Calibration Matrix__

In [97]:
sensor_calibration_dict = {
    "camera_intrinsics": [],
    "camera_distcoeffs": [],
    "radar_to_camera": [],
    "radar_to_lidar": [],
    "lidar_to_ground": [],
    "camera_to_ground": []
}

with open(calibration_file, 'r') as f:
    data = json.load(f)

for item in data['calibration']:
    if item['calibration'] == 'camera_01':
        sensor_calibration_dict['camera_intrinsics'] = item['k']
        sensor_calibration_dict['camera_distcoeffs'] = item['D']
    elif item['calibration'] == 'radar_01_to_camera_01':
        sensor_calibration_dict['radar_to_camera'] = item['T']
    elif item['calibration'] == 'radar_01_to_lidar_01':
        sensor_calibration_dict['radar_to_lidar'] = item['T']
    elif item['calibration'] == 'lidar_01_to_ground':
        sensor_calibration_dict['lidar_to_ground'] = item['T']
    elif item['calibration'] == 'camera_01_to_ground_homography':
        sensor_calibration_dict['camera_to_ground'] = item['T']


__Calibration: Radar to Ground Plane__

In [98]:
def radar_to_ground_transfomer(points_array, T, K):

    n_p_array = np.array(points_array).reshape(1,-1)
    tranposed_array = np.transpose(n_p_array)
   
    row_of_ones = np.ones((1, 1))           #1x1
    stacked_matrix = np.vstack((tranposed_array, row_of_ones))  
  
    radar_to_lidar_matrix = np.matmul(T, stacked_matrix)             #3x1

    new_stacked_matrix = np.vstack((radar_to_lidar_matrix, row_of_ones))             #4x1
    in_ground_data = np.matmul(K, new_stacked_matrix)


    in_ground = np.transpose(in_ground_data)

    return in_ground[0]


__Radar dict: on Ground__

In [99]:
def radar_to_ground(radar_dict):
    
    T = sensor_calibration_dict['radar_to_lidar']
    K = sensor_calibration_dict['lidar_to_ground']

    in_radar = radar_dict
    in_ground = {'cluster': [], 'noise': []}
    for key, value in in_radar.items():
        if key == 'cluster':
            for point in value:
                if point:
                    updated_centroid = radar_to_ground_transfomer(point[0], T, K)
                    updated_lowest_point = radar_to_ground_transfomer(point[1], T, K)
                    updated_velocity = point[2]
                    updated_point = [list(updated_centroid), list(updated_lowest_point), list(updated_velocity)]

                    if key in in_ground:
                        in_ground[key].append(updated_point)
                    else:
                        print('no key exist')
        else:
            for point in value:
                if point:
                    updated_centroid = radar_to_ground_transfomer(point[0], T, K)
                    updated_velocity = [point[1]]
                    updated_point = [list(updated_centroid), list(updated_velocity)]

                    if key in in_ground:
                        in_ground[key].append(updated_point)
                    else:
                        print('no key exist')
                    
    return in_ground

__Calibration: Radar to Image Plane__

In [100]:
def radar_to_camera_transformer(radar_point, T, k):
   
    n_p_array = np.array(radar_point).reshape(1,-1)
    transpose_RPA = np.transpose(n_p_array)

    new_array = np.vstack([transpose_RPA, np.ones((1, 1))])             
    product_1 = np.matmul(np.array(k), np.array(T))

    product_array = np.matmul(product_1, new_array)                      #[su, sv, s] but along column

    final_array = product_array / product_array [2]                      #[u, v, 1], along column

    u_v = np.delete(final_array, 2, axis = 0)                            #[u, v], along column      
    final_u_v = np.transpose(u_v)

    return final_u_v[0]

__Radar Dict: on Image__

In [101]:
def radar_to_camera(radar_output):
    T =  sensor_calibration_dict['radar_to_camera']
    K = sensor_calibration_dict['camera_intrinsics']
    
    in_radar = radar_output
    in_camera = {'cluster': [], 'noise': []}
    for key, value in in_radar.items():
        if key == 'cluster':
            for point in value:
                if point:
                    updated_centroid = radar_to_camera_transformer(point[0], T, K)
                    updated_lowest_point = radar_to_camera_transformer(point[1], T, K)
                    updated_velocity = point[2]
                    updated_point = [list(updated_centroid), list(updated_lowest_point), list(updated_velocity)]

                    if key in in_camera:
                        in_camera[key].append(updated_point)
                    else:
                        print('no key exist')
        else:
            for point in value:
                if point:
                    updated_centroid = updated_centroid = radar_to_camera_transformer(point[0], T, K)
                    updated_velocity = [point[1]]
                    updated_point = [list(updated_centroid), list(updated_velocity)]

                    if key in in_camera:
                        in_camera[key].append(updated_point)
                    else:
                        print('no key exist')
    
    return in_camera

__Homography: Image to Ground__

In [102]:
'''
def homography2(list_of_pred_boxes):
    ground_coordinate_list = []
    for result in list_of_pred_boxes:
        bbox = list(result[1:5])
    
        # x1y1 = np.array(bbox[:2]).reshape(1, -1)
        # x2y2 = np.array(bbox[2:]).reshape(1, -1)
        bottom_center_point = np.array(list(((bbox[2] + bbox[0]) / 2, bbox[3]))).reshape(1, -1) 

        # image_coordinates = np.concatenate((x1y1,x2y2,bottom_center_point), axis=0)
        # image_coordinates = np.concatenate((bottom_center_point), axis=0)
        transpose_matrix = np.vstack((np.transpose(bottom_center_point),np.ones((1,1))))
        
        homogeneous_coordinates = np.matmul(sensor_calibration_dict['camera_to_ground'], transpose_matrix)
        ground_coordinates = homogeneous_coordinates / homogeneous_coordinates[-1].reshape(1, -1)

        transpose_ground_coordinates = ground_coordinates.T
        g_x1y1 = transpose_ground_coordinates[0][:2]
        # g_x2y2 = transpose_ground_coordinates[1][:2]
        # g_xcyc = transpose_ground_coordinates[2][:2]

        # ground_coordinate_list.append([list(g_x1y1), list(g_x2y2), list(g_xcyc)])
        ground_coordinate_list.append([list(g_x1y1)])

    # print(ground_coordinate_list)
    return ground_coordinate_list

'''

"\ndef homography2(list_of_pred_boxes):\n    ground_coordinate_list = []\n    for result in list_of_pred_boxes:\n        bbox = list(result[1:5])\n    \n        # x1y1 = np.array(bbox[:2]).reshape(1, -1)\n        # x2y2 = np.array(bbox[2:]).reshape(1, -1)\n        bottom_center_point = np.array(list(((bbox[2] + bbox[0]) / 2, bbox[3]))).reshape(1, -1) \n\n        # image_coordinates = np.concatenate((x1y1,x2y2,bottom_center_point), axis=0)\n        # image_coordinates = np.concatenate((bottom_center_point), axis=0)\n        transpose_matrix = np.vstack((np.transpose(bottom_center_point),np.ones((1,1))))\n        \n        homogeneous_coordinates = np.matmul(sensor_calibration_dict['camera_to_ground'], transpose_matrix)\n        ground_coordinates = homogeneous_coordinates / homogeneous_coordinates[-1].reshape(1, -1)\n\n        transpose_ground_coordinates = ground_coordinates.T\n        g_x1y1 = transpose_ground_coordinates[0][:2]\n        # g_x2y2 = transpose_ground_coordinates[1][:2]\

In [103]:
def homography(points_on_image):

    points = np.array(points_on_image).reshape(1, -1)

    transpose_matrix = np.vstack((np.transpose(points),np.ones((1,1))))
    
    homogeneous_coordinates = np.matmul(sensor_calibration_dict['camera_to_ground'], transpose_matrix)
    ground_coordinates = homogeneous_coordinates / homogeneous_coordinates[-1].reshape(1, -1)

    transpose_ground_coordinates = ground_coordinates.T
    g_x1y1 = transpose_ground_coordinates[0][:2]

    return g_x1y1

__Visualization: Camera Points on Ground Plane__

In [104]:
def camera_plotting(image_on_ground, my_plot):
        x_plotting_list = []
        y_plotting_list = []

        for points in image_on_ground:
                x_coords = [point[0] for point in points]
                y_coords = [point[1] for point in points]

                x_plotting_list.append(x_coords)
                y_plotting_list.append(y_coords)
        
        colors = ['blue', 'green', 'orange', 'black', 'purple', 'maroon']

        for i, (x_co, y_co) in enumerate(zip(x_plotting_list, y_plotting_list)):
                my_plot.scatter(y_co, x_co, color=colors[i], label= 'camera')

        my_plot.set_xlim(-30,30)
        my_plot.set_ylim(0,100)
        my_plot.set_xlabel('Y-axis')
        my_plot.set_ylabel('X-axis')
        my_plot.set_title('Plot of Points')
        
        return my_plot

__Visualization: Radar points on Ground Plane__

In [105]:
def radar_plotting(dict, my_plot):
    clusters = dict['cluster']
    noise_points = dict['noise']

    x_lowest = []
    y_lowest = []

    x_noise = []
    y_noise = []

    for detection in clusters:
        if len(detection) != 0:
            lowest_point = detection[1]
            x_lp = lowest_point[0]
            y_lp = lowest_point[1]
            x_lowest.append(x_lp)
            y_lowest.append(y_lp)
    
    for noise in noise_points:
        if len(noise) != 0:
            lowest_point = noise[0]
            x_n = lowest_point[0]
            y_n = lowest_point[1]
            x_noise.append(x_n)
            y_noise.append(y_n)


    my_plot.scatter(y_lowest, x_lowest, color='red', label='lowest point')
    my_plot.scatter(y_noise, x_noise, color='grey', label='Noise')

    if not my_plot.get_legend():
        my_plot.legend()

    return my_plot

__Association: One to One__

In [106]:
def get_one_one_association(list_of_pred_boxes, cluster_on_image):
    
    clusters = list(cluster_on_image['cluster'])
    noise_points = list(cluster_on_image['noise'])
    pred_boxes = list(list_of_pred_boxes)
    return_var = 0

    association = {'associated': [], 'non_associated':{'YOLO':[], 'Radar':[]}}

    if len(clusters) > 0 and len(pred_boxes)>0:
        matrix = np.zeros((len(clusters), len(pred_boxes))) 
        for pred_idx, prediction in enumerate(pred_boxes):
            bbox = prediction[1:5]  
            for cluster_idx, cluster in enumerate(clusters):
                cluster_centroid = cluster[0]
                
                if bbox[0] < cluster_centroid[0] < bbox[2] and bbox[1] < cluster_centroid[1] < bbox[3]:
                    matrix[cluster_idx, pred_idx] = 1

                else: 
                    matrix[cluster_idx, pred_idx] = 0
        
        # pprint(matrix)

        special_points = []
        for i in range(len(matrix)):
            for j in range(len(matrix[0])):
                if matrix[i,j] == 1:
                    row_sum = sum(matrix[i,:])
                    col_sum = sum(matrix[:,j]) 

                    if row_sum == 1 and col_sum == 1:
                        special_points.append((i, j))
            
        # print(special_points)

        for item in special_points:
            association['associated'].append([pred_boxes[item[1]],clusters[item[0]]])

        for i in range(matrix.shape[0]):
            if not any(i == item[0] for item in special_points):
                association["non_associated"]["Radar"].append(clusters[i])
            
        for j in range(matrix.shape[1]):
            if not any(j == item[1] for item in special_points):
                association["non_associated"]["YOLO"].append(pred_boxes[j])
            
                    
    return association
    

__Euclidean Distance__

In [107]:
'''
def get_euclidean_distance(clusters, images):
    d = np.sqrt(((clusters[0] - images[0])**2) + ((clusters[1] - images[1])**2))
    return d
'''    

'\ndef get_euclidean_distance(clusters, images):\n    d = np.sqrt(((clusters[0] - images[0])**2) + ((clusters[1] - images[1])**2))\n    return d\n'

__Make Association Matrix__

In [108]:
'''
def get_association_matrix(image_points_on_ground, radar_points):
    clusters = radar_points['cluster']
    noise_points = radar_points['noise']
    
    cluster_nearest_points = []
    noise_nearest_points = []

    for detection in clusters: 
        if len(detection) != 0:
            lowest_point = detection[1]
            x_lp = lowest_point[0]
            y_lp = lowest_point[1]
            cluster_nearest_points.append([list(np.array([x_lp, y_lp]))]) 
    
    for noise in noise_points: 
        if len(noise) != 0:
            lowest_point = noise[0]
            x_n = lowest_point[0]
            y_n = lowest_point[1]
            noise_nearest_points.append([list(np.array([x_n, y_n]))])     
    

    total_list = cluster_nearest_points + noise_nearest_points

    association_matrix = np.zeros((len(total_list),len(image_points_on_ground)))

    for cluster_idx, cluster_point in enumerate(total_list):
        for img_idx, img_point in enumerate(image_points_on_ground):
            association_matrix[cluster_idx, img_idx] = get_euclidean_distance(cluster_point[0], img_point[0])

    
    return association_matrix   

'''

"\ndef get_association_matrix(image_points_on_ground, radar_points):\n    clusters = radar_points['cluster']\n    noise_points = radar_points['noise']\n    \n    cluster_nearest_points = []\n    noise_nearest_points = []\n\n    for detection in clusters: \n        if len(detection) != 0:\n            lowest_point = detection[1]\n            x_lp = lowest_point[0]\n            y_lp = lowest_point[1]\n            cluster_nearest_points.append([list(np.array([x_lp, y_lp]))]) \n    \n    for noise in noise_points: \n        if len(noise) != 0:\n            lowest_point = noise[0]\n            x_n = lowest_point[0]\n            y_n = lowest_point[1]\n            noise_nearest_points.append([list(np.array([x_n, y_n]))])     \n    \n\n    total_list = cluster_nearest_points + noise_nearest_points\n\n    association_matrix = np.zeros((len(total_list),len(image_points_on_ground)))\n\n    for cluster_idx, cluster_point in enumerate(total_list):\n        for img_idx, img_point in enumerate(image_

__Greedy Object Association__

In [109]:
'''
def assign_objects(_matrix):
    
    for i_dx in range(_matrix.shape[0]):
        _matrix[i_dx,][_matrix[i_dx,] != np.max(_matrix[i_dx,])] = 0

    for i_gx in range(_matrix.shape[1]):
        _matrix[:, i_gx][_matrix[:,i_gx] != np.max(_matrix[:,i_gx])] = 0 

    return _matrix 
'''


'\ndef assign_objects(_matrix):\n    \n    for i_dx in range(_matrix.shape[0]):\n        _matrix[i_dx,][_matrix[i_dx,] != np.max(_matrix[i_dx,])] = 0\n\n    for i_gx in range(_matrix.shape[1]):\n        _matrix[:, i_gx][_matrix[:,i_gx] != np.max(_matrix[:,i_gx])] = 0 \n\n    return _matrix \n'

__Update Dict: Image Plane to Ground Plane__

In [110]:
def update_dict_image_ground(dictionary_after_1t1):
    _to_ground_dict = {'associated': [], 'non_associated': {'YOLO': [], 'Radar': []}}
    
    for result in dictionary_after_1t1['non_associated']['YOLO']:
        cls = result[0]
        bbox = list(result[1:5])
        bottom_center_point = list(((bbox[2] + bbox[0]) / 2, bbox[3]))
        image_point_on_ground = list(homography(bottom_center_point))
        _to_ground_dict['non_associated']['YOLO'].append([[cls], image_point_on_ground])


    for cluster in dictionary_after_1t1['non_associated']['Radar']:
        centroid = cluster[0]
        lower = cluster[1]
        velocity = cluster[2]
        _centroid = list(homography(centroid))
        _lower = list(homography(lower))
        _to_ground_dict['non_associated']['Radar'].append([_centroid, _lower, velocity])


    for item in dictionary_after_1t1['associated']:
        cls = item[0][0]
        radar_data = item[1]
        centroid = radar_data[0]
        velocity = radar_data[2]
        _centroid = list(homography(centroid))
        _to_ground_dict['associated'].append([[cls], _centroid, velocity])

    return _to_ground_dict


__Inference__

In [111]:
fig, my_plot = plt.subplots()
plt.ion()

for img, pcd in zip(image_list, pcd_list):

    # YOLO prediction
    results = yolo_model.predict(img)
    list_of_pred_boxes = class_box_generator_for_pred(results)
    
    # Cluster Formation
    db_scan = my_custom_dbscan(eps1=0.1, eps2=0.5, min_samples=2)
    clusters_on_radar = db_scan.process_pcd_files(pcd)

    # Bbox point on the Ground Plane
    image_on_ground = []
    for result in list_of_pred_boxes:
        bbox = list(result[1:5])
        bottom_center_point = list(((bbox[2] + bbox[0]) / 2, bbox[3]))
        image_point_on_ground = homography(bottom_center_point)
        image_on_ground.append([list(image_point_on_ground)])

    # Radar Dictionaries on different Planes
    clusters_on_ground = radar_to_ground(clusters_on_radar)
    clusters_on_image = radar_to_camera(clusters_on_radar)
    
    # Visualize Poitns on Ground Plane
    my_plot.clear()
    plot = camera_plotting(image_on_ground, my_plot)
    plot2 = radar_plotting(clusters_on_ground, plot)
    fig.canvas.draw()
    plt.pause(0.1)
    
    # One to One Association on Image Plane
    data_association = get_one_one_association(list_of_pred_boxes, clusters_on_image) 
    pprint(data_association)

    # Update Dictionary from Image Plane to Ground Plane
    data_association = update_dict_image_ground(data_association)
    pprint(data_association)
    

plt.ioff()



    


image 1/1 C:\Dk\Projects\Team Project\Dataset\INFRA-3DRC-Dataset\INFRA-3DRC_scene-15\camera_01\camera_01__data\camera_01__2023-06-02-21-28-09-314.png: 416x640 1 car, 64.0ms
Speed: 3.0ms preprocess, 64.0ms inference, 2.0ms postprocess per image at shape (1, 3, 416, 640)
{'associated': [], 'non_associated': {'Radar': [], 'YOLO': []}}
{'associated': [], 'non_associated': {'Radar': [], 'YOLO': []}}

image 1/1 C:\Dk\Projects\Team Project\Dataset\INFRA-3DRC-Dataset\INFRA-3DRC_scene-15\camera_01\camera_01__data\camera_01__2023-06-02-21-28-09-381.png: 416x640 1 car, 63.0ms
Speed: 2.0ms preprocess, 63.0ms inference, 2.0ms postprocess per image at shape (1, 3, 416, 640)
{'associated': [], 'non_associated': {'Radar': [], 'YOLO': []}}
{'associated': [], 'non_associated': {'Radar': [], 'YOLO': []}}

image 1/1 C:\Dk\Projects\Team Project\Dataset\INFRA-3DRC-Dataset\INFRA-3DRC_scene-15\camera_01\camera_01__data\camera_01__2023-06-02-21-28-09-481.png: 416x640 1 car, 63.0ms
Speed: 2.0ms preprocess, 63.

KeyboardInterrupt: 