In [1]:
from pathlib import Path
import re
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import numpy as np
from numpy import array
import cv2
import mediapipe as mp
from numpy.linalg import inv
from mediapipe.tasks.python.components.containers import Detection, DetectionResult, BoundingBox, Category

In [2]:
# Inputs:
# A LiDAR-mapped box counts as a match only when its best IoU with a thermal / webcam
# detection is strictly greater than this value
iou_threshold = 0.5
# Selects which sensors must have a valid match for the mapped result to be kept:
# 'all' requires both thermal and webcam, 'thermal' only thermal, 'webcam' only webcam
decision_making_mode = 'all' # other two options could be 'thermal' and 'webcam'

In [None]:
# Step 1: Count the frames available in the chosen test directory
test_dir = Path('/project_fused/data/Testing_Gunner_and_Ethan_Walking')
thermal_files = list(test_dir.joinpath('thermal').glob('*'))

# Order the frames numerically by the first run of digits in the file stem, so that
# frame 10 follows frame 9 instead of following frame 1
thermal_list = list(thermal_files)
thermal_list.sort(key=lambda frame_path: int(re.search(r'\d+', frame_path.stem).group()))
num_images = len(thermal_list)

In [None]:
# Step 2: Define the output directory
# Step 5h plans to write the annotated LiDAR, thermal and webcam images here
output_dir = Path('/project_fused/output')

In [None]:
# Step 3: Initialize the object detection models
MAX_RESULTS = 2
SCORE_THRESHOLD = 0 # In practical use, set higher, but for measuring AP, we want all detections

# Options shared by the three detectors; only the model file differs between them
detector_settings = dict(
    running_mode=vision.RunningMode.IMAGE,
    max_results=MAX_RESULTS,
    score_threshold=SCORE_THRESHOLD,
)

# Webcam object detection model
base_options_webcam = python.BaseOptions(model_asset_path='/project_fused/models/efficientdet_lite0.tflite')
options_webcam = vision.ObjectDetectorOptions(base_options=base_options_webcam, **detector_settings)
webcam_detector = vision.ObjectDetector.create_from_options(options_webcam)

# Thermal object detection model
base_options_thermal = python.BaseOptions(model_asset_path='/project_fused/models/thermal.tflite')
options_thermal = vision.ObjectDetectorOptions(base_options=base_options_thermal, **detector_settings)
thermal_detector = vision.ObjectDetector.create_from_options(options_thermal)

# LiDAR object detection model
base_options_lidar = python.BaseOptions(model_asset_path='/project_fused/models/lidar.tflite')
options_lidar = vision.ObjectDetectorOptions(base_options=base_options_lidar, **detector_settings)
lidar_detector = vision.ObjectDetector.create_from_options(options_lidar)

I0000 00:00:1738967110.004705   35046 task_runner.cc:85] GPU suport is not available: INTERNAL: ; RET_CHECK failure (mediapipe/gpu/gl_context_egl.cc:77) display != EGL_NO_DISPLAYeglGetDisplay() returned error 0x300c
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
I0000 00:00:1738967110.036905   35046 task_runner.cc:85] GPU suport is not available: INTERNAL: ; RET_CHECK failure (mediapipe/gpu/gl_context_egl.cc:77) display != EGL_NO_DISPLAYeglGetDisplay() returned error 0x300c
I0000 00:00:1738967110.099360   35046 task_runner.cc:85] GPU suport is not available: INTERNAL: ; RET_CHECK failure (mediapipe/gpu/gl_context_egl.cc:77) display != EGL_NO_DISPLAYeglGetDisplay() returned error 0x300c


In [6]:
# Step 4: Define the transformation matrices
# Subscript legend used by the functions that consume these matrices:
#   ₗ = LiDAR, ₜ = thermal, ᵣ = webcam
# Set extrinsic translation matrices based on physical measurements, no z translation assumed
# Only the x / y entries of the last column ([0, 3] and [1, 3]) are read by transform() and
# transform_back(); units are presumably metres, matching the LiDAR depth — TODO confirm
T_l2t = array([[1, 0, 0, 0.028],
                [0, 1, 0, -0.038],
                [0, 0, 1, 0],
                [0, 0, 0, 1]])
T_l2w = array([[1, 0, 0, 0.083],
                [0, 1, 0, -0.035],
                [0, 0, 1, 0],
                [0, 0, 0, 1]])

# Set extrinsic rotation matrices from stereo calibration
# R_t2cₜ and R_l2cₜ are combined for the LiDAR <-> thermal mapping,
# R_w2cᵣ and R_l2cᵣ for the LiDAR <-> webcam mapping
R_t2cₜ = array([[0.804905, 0.593319, 0.010014],
                [-0.588094, 0.795337, 0.146920],
                [0.079206, -0.124146, 0.989098]])
R_l2cₜ = array([[0.813639, 0.571181, 0.108367],
                [-0.580035, 0.784919, 0.217856],
                [0.039376, -0.240112, 0.969946]])
R_w2cᵣ = array([[0.903012, -0.397065, -0.164039],
                [0.397183, 0.917127, -0.033513],
                [0.163751, -0.034891, 0.985884]])
R_l2cᵣ = array([[0.909488, -0.399788, -0.114025],
                [0.399705, 0.916314, -0.024592],
                [0.114314, -0.023211, 0.993173]])

# Set intrinsic matrices for the three sensors
# 3x3 matrices applied to homogeneous pixel coordinates (u, v, 1): Kₗ LiDAR, Kₜ thermal, Kᵣ webcam
Kₗ = array([[205.046875, 0.0, 107.55435943603516],
            [0.0, 205.046875, 82.43924713134766],
            [0.0, 0.0, 1.0]])
Kₜ = array([[161.393925, 0.000000, 78.062273],
            [0.000000, 161.761028, 59.925115], 
            [0.000000, 0.000000, 1.000000]])
Kᵣ = array([[446.423112, 0.000000, 163.485603], 
            [0.000000, 446.765896, 131.217485],
            [0.000000, 0.000000, 1.000000]])

In [7]:
# Step 5: For loop through each set of synchronized images
# Placeholder: the body is still empty; a single iteration is prototyped in the cells that
# follow and is meant to move inside this loop
for i in range(num_images):
    pass

In [8]:
# For testing, we will take one iteration from Step 5 and perform it here
# Step 5a: Pick one synchronized LiDAR / thermal / webcam triplet (frame 50);
#          the LiDAR frame still needs to be equalized and reduced to 8 bit
lidar_path = test_dir / 'lidar/lidar_image_50.tiff'
thermal_path = test_dir / 'thermal/thermal_image_50.png'
webcam_path = test_dir / 'webcam/webcam_image_50.png'

In [9]:
# Step 5b: Read in the images as OpenCV images
# str() keeps the call working on OpenCV builds whose imread only accepts string paths
lidar_image = cv2.imread(str(lidar_path), cv2.IMREAD_UNCHANGED)
thermal_image = cv2.imread(str(thermal_path), cv2.IMREAD_UNCHANGED)
webcam_image = cv2.imread(str(webcam_path), cv2.IMREAD_UNCHANGED)

# cv2.imread reports a missing or unreadable file by returning None rather than raising,
# so fail here with the offending path instead of with an obscure error further down
for checked_path, checked_image in ((lidar_path, lidar_image),
                                    (thermal_path, thermal_image),
                                    (webcam_path, webcam_image)):
    if checked_image is None:
        raise FileNotFoundError(f'Could not read image: {checked_path}')

# Perform LiDAR image processing
max_depth = np.max(lidar_image)
# Upper bound is the image maximum itself, so this only removes negative depths
lidar_image_clipped = np.clip(lidar_image, 0, max_depth)
# Presumably metres -> millimetres; TODO confirm the LiDAR depth units
lidar_image_mm = lidar_image_clipped * 1000
# Stretch to the full 16-bit range, then rescale to 8 bit for histogram equalization
lidar_image_normalized = cv2.normalize(lidar_image_mm, None, 0, 65535, cv2.NORM_MINMAX)
lidar_image_8bit = cv2.convertScaleAbs(lidar_image_normalized, alpha=(255.0 / np.max(lidar_image_normalized)))
lidar_image_equalized = cv2.equalizeHist(lidar_image_8bit)

In [10]:
# Step 5c: Convert OpenCV images to RGB format
# LiDAR and thermal frames are expanded from grayscale; the webcam frame is reordered from BGR
lidar_image_rgb, thermal_image_rgb, webcam_image_rgb = (
    cv2.cvtColor(frame, conversion)
    for frame, conversion in (
        (lidar_image_equalized, cv2.COLOR_GRAY2RGB),
        (thermal_image, cv2.COLOR_GRAY2RGB),
        (webcam_image, cv2.COLOR_BGR2RGB),
    )
)

In [11]:
# Step 5d: Wrap each RGB array in a MediaPipe image (SRGB format) for the detectors
lidar_image_mp, thermal_image_mp, webcam_image_mp = (
    mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
    for rgb_frame in (lidar_image_rgb, thermal_image_rgb, webcam_image_rgb)
)

In [12]:
# Step 5e: Run each sensor's detector on its own MediaPipe image
lidar_detection_result, thermal_detection_result, webcam_detection_result = (
    detector.detect(sensor_image)
    for detector, sensor_image in (
        (lidar_detector, lidar_image_mp),
        (thermal_detector, thermal_image_mp),
        (webcam_detector, webcam_image_mp),
    )
)

# Indices of thermal / webcam detections that have already been matched to a LiDAR detection
thermal_exclude_idx, webcam_exclude_idx = [], []

In [17]:
# Debugging aid: show the score of the first category of the first LiDAR detection
lidar_detection_result.detections[0].categories[0].score

0.929308295249939

In [19]:
# Step 5f: For loop through each LiDAR detection in the detection result
# Placeholder: the body is still empty; one iteration is prototyped in the cells that follow
for detection in lidar_detection_result.detections:
    pass

In [20]:
# For testing, we will take one iteration from Step 5f and perform it here
# Step 5f1: Corner coordinates of the first LiDAR detection's bounding box
lidar_detection = lidar_detection_result.detections[0]
bbox = lidar_detection.bounding_box

# Top left corner
x1 = bbox.origin_x
y1 = bbox.origin_y

# Bottom right corner
x2 = bbox.origin_x + bbox.width
y2 = bbox.origin_y + bbox.height

In [18]:
# Step 5f2: Sample the LiDAR depth at the pixel nearest to the box center
center_x, center_y = (x1 + x2) / 2, (y1 + y2) / 2
uₗ, vₗ = round(center_x), round(center_y)
# Image arrays are indexed [row, column], i.e. [v, u]
zₗ = lidar_image[vₗ, uₗ]

In [19]:
def transform(zₗ, uₗ, vₗ):
    """Map a LiDAR pixel with known depth onto the thermal and webcam images.

    The pixel is back-projected through the LiDAR intrinsics, scaled by the depth,
    moved into each camera frame with the calibration rotations and x / y offsets,
    and finally projected through that camera's intrinsics.

    Args:
        zₗ: Depth read from the LiDAR image for this pixel.
        uₗ: Horizontal pixel coordinate on the LiDAR image.
        vₗ: Vertical pixel coordinate on the LiDAR image.

    Returns:
        Tuple (uₜ, vₜ, uᵣ, vᵣ): the pixel coordinates on the thermal image followed by
        the pixel coordinates on the webcam image.

    Raises:
        ValueError: If zₗ is not greater than 1E-8 (this includes NaN). Without a usable
            depth there is no 3D point to project; previously this case fell through to
            the return statement and failed with an UnboundLocalError.
    """
    # Written as "not >" so that NaN depths are rejected as well
    if not zₗ > 1E-8:
        raise ValueError(
            f"Cannot map LiDAR pixel ({uₗ}, {vₗ}): depth {zₗ} is not a valid positive value"
        )

    # Calculate the 3D physical coordinate of the LiDAR pixel
    ray_lidar = inv(Kₗ) @ array([uₗ, vₗ, 1])
    point_lidar = zₗ * ray_lidar

    # Perform extrinsic transformations to the thermal sensor and webcam
    # (only the x / y offsets of the translation matrices are applied)
    point_thermal = (inv(R_t2cₜ) @ (R_l2cₜ @ point_lidar)) + array([T_l2t[0, 3], T_l2t[1, 3], 0])
    point_webcam = (inv(R_w2cᵣ) @ (R_l2cᵣ @ point_lidar)) + array([T_l2w[0, 3], T_l2w[1, 3], 0])

    # Normalize by depth and project to thermal and webcam pixel coordinates
    pixel_thermal = Kₜ @ (point_thermal / point_thermal[2])
    pixel_webcam = Kᵣ @ (point_webcam / point_webcam[2])

    return pixel_thermal[0], pixel_thermal[1], pixel_webcam[0], pixel_webcam[1]

In [20]:
# Step 5f3 & 5f4: Map both LiDAR box corners onto the thermal and webcam frames.
#                 The depth sampled at the box center is used for both corners.
top_left_mapped = transform(zₗ, x1, y1)
bottom_right_mapped = transform(zₗ, x2, y2)
x1ₗₜ, y1ₗₜ, x1ₗᵣ, y1ₗᵣ = top_left_mapped
x2ₗₜ, y2ₗₜ, x2ₗᵣ, y2ₗᵣ = bottom_right_mapped

In [21]:
def calc_iou(box_1, box_2):
    """Compute the Intersection over Union (IoU) of two axis-aligned boxes.

    Args:
        box_1: Box as (x1, y1, x2, y2), top-left corner followed by bottom-right corner.
        box_2: Second box in the same format.

    Returns:
        Intersection area divided by union area. Returns 0 when the boxes do not overlap
        and 0.0 when the union has no area (e.g. two degenerate boxes).
    """
    # Get corner values from both boxes
    x1, y1, x2, y2 = box_1
    x3, y3, x4, y4 = box_2

    # Extents of the intersection box, clamped at zero: for disjoint boxes the raw
    # differences are negative, and two negative extents would otherwise multiply into a
    # positive "intersection" area
    width_inter = max(0, min(x2, x4) - max(x1, x3))
    height_inter = max(0, min(y2, y4) - max(y1, y3))
    area_inter = width_inter * height_inter

    # Calculate the areas of the two boxes and of their union
    area_box1 = (x2 - x1) * (y2 - y1)
    area_box2 = (x4 - x3) * (y4 - y3)
    area_union = area_box1 + area_box2 - area_inter

    # Guard against dividing by zero when neither box has any area
    if area_union <= 0:
        return 0.0

    return area_inter / area_union

In [22]:
# Step 5f5: Calculate IoU between the mapped bounding box and all detection results from the webcam and thermal images
def mapped_box_ious(detections, mapped_box, exclude_idx):
    """Return one IoU per detection, in detection order, against a LiDAR-mapped box.

    Args:
        detections: Detections of one sensor, each exposing a bounding_box.
        mapped_box: LiDAR box mapped onto that sensor's frame as (x1, y1, x2, y2).
        exclude_idx: Indices of detections that must not be matched again.

    Returns:
        List with the same length as detections. Excluded detections get -inf instead of
        being skipped, so a position in this list is always the index of the detection it
        belongs to and an excluded detection can never be the best match.
    """
    ious = []
    for idx, detection in enumerate(detections):
        if idx in exclude_idx:
            ious.append(float('-inf'))
            continue
        box = detection.bounding_box
        corners = (box.origin_x, box.origin_y, box.origin_x + box.width, box.origin_y + box.height)
        ious.append(calc_iou(corners, mapped_box))
    return ious


thermal_mapped_box = (x1ₗₜ, y1ₗₜ, x2ₗₜ, y2ₗₜ)
thermal_ious = mapped_box_ious(thermal_detection_result.detections, thermal_mapped_box, thermal_exclude_idx)

webcam_mapped_box = (x1ₗᵣ, y1ₗᵣ, x2ₗᵣ, y2ₗᵣ)
webcam_ious = mapped_box_ious(webcam_detection_result.detections, webcam_mapped_box, webcam_exclude_idx)

In [23]:
# Step 5f6: Choose the thermal or webcam detection result corresponding to the LiDAR mapped result whose IoU is the 
#           largest and also above the defined Combination IoU threshold. In the next iterations of the for loop,
#           the thermal or webcam detection result that was chosen should not be chosen again to match with another
#           LiDAR mapped result
# Every result variable is reset first, so "no match" is always explicit (IoU 0, index None)
# and an index left over from an earlier run can never be picked up by later cells.
# An empty IoU list (no candidate detections) is treated as "no match" instead of crashing max().
# NOTE(review): the stored index is a position in the IoU list; it equals the detection index
# only when that list holds one entry per detection, in detection order.
max_thermal_iou, max_thermal_iou_index = 0, None
valid_thermal_iou, valid_thermal_idx = 0, None
if thermal_ious:
    max_thermal_iou = max(thermal_ious)
    max_thermal_iou_index = thermal_ious.index(max_thermal_iou)
    if max_thermal_iou > iou_threshold:
        valid_thermal_iou, valid_thermal_idx = max_thermal_iou, max_thermal_iou_index
        thermal_exclude_idx.append(valid_thermal_idx)

max_webcam_iou, max_webcam_iou_index = 0, None
valid_webcam_iou, valid_webcam_idx = 0, None
if webcam_ious:
    max_webcam_iou = max(webcam_ious)
    max_webcam_iou_index = webcam_ious.index(max_webcam_iou)
    if max_webcam_iou > iou_threshold:
        valid_webcam_iou, valid_webcam_idx = max_webcam_iou, max_webcam_iou_index
        webcam_exclude_idx.append(valid_webcam_idx)

In [24]:
def create_detection(data):
    """Build a MediaPipe Detection from a plain dictionary.

    Args:
        data: Mapping with the keys "bounding_box" (sequence read as origin_x, origin_y,
            width, height), "score" and "category_name".

    Returns:
        Detection holding one BoundingBox, a single Category (index and display name left
        as None) and an empty keypoint list.
    """
    # NOTE(review): box values are passed through unchanged; averaged boxes arrive as
    # floats — confirm that consumers of BoundingBox accept non-integer coordinates
    box = data["bounding_box"]
    return Detection(
        bounding_box=BoundingBox(origin_x=box[0], origin_y=box[1], width=box[2], height=box[3]),
        categories=[
            Category(
                index=None,
                score=data["score"],
                display_name=None,
                category_name=data["category_name"],
            )
        ],
        keypoints=[],
    )

In [25]:
def transform_back(zₗ, uₜ, vₜ, uᵣ, vᵣ):
    """Map one thermal pixel and one webcam pixel back onto the LiDAR image.

    Args:
        zₗ: Depth used to scale both line-of-sight vectors (the LiDAR depth is reused
            for the thermal and the webcam ray).
        uₜ, vₜ: Pixel coordinates on the thermal image.
        uᵣ, vᵣ: Pixel coordinates on the webcam image.

    Returns:
        Tuple (uₗₜ, vₗₜ, uₗᵣ, vₗᵣ): LiDAR pixel coordinates obtained from the thermal
        pixel, followed by LiDAR pixel coordinates obtained from the webcam pixel.
    """
    # Line-of-sight vectors through each camera's intrinsics, scaled to position vectors
    point_thermal = zₗ * (inv(Kₜ) @ array([uₜ, vₜ, 1]))
    point_webcam = zₗ * (inv(Kᵣ) @ array([uᵣ, vᵣ, 1]))

    # Rotate into the LiDAR frame and remove the x / y sensor offsets.
    # NOTE(review): transform() adds the offset after rotating into the camera frame, while
    # here it is subtracted after rotating back into the LiDAR frame, so the two functions
    # are exact inverses only for an identity rotation — confirm this is intended.
    offset_thermal = array([T_l2t[0, 3], T_l2t[1, 3], 0])
    offset_webcam = array([T_l2w[0, 3], T_l2w[1, 3], 0])
    lidar_from_thermal = (inv(R_l2cₜ) @ (R_t2cₜ @ point_thermal)) - offset_thermal
    lidar_from_webcam = (inv(R_l2cᵣ) @ (R_w2cᵣ @ point_webcam)) - offset_webcam

    # Normalize by depth and project through the LiDAR intrinsics
    pixel_from_thermal = Kₗ @ (lidar_from_thermal / lidar_from_thermal[2])
    pixel_from_webcam = Kₗ @ (lidar_from_webcam / lidar_from_webcam[2])

    return pixel_from_thermal[0], pixel_from_thermal[1], pixel_from_webcam[0], pixel_from_webcam[1]

In [28]:
def _corner_box(bounding_box):
    """Return a bounding box as (x1, y1, x2, y2) corner coordinates."""
    x1, y1 = bounding_box.origin_x, bounding_box.origin_y
    return x1, y1, x1 + bounding_box.width, y1 + bounding_box.height


def _midpoint_box(box_a, box_b):
    """Average two (x1, y1, x2, y2) boxes coordinate by coordinate."""
    return tuple((a + b) / 2 for a, b in zip(box_a, box_b))


def _person_detection(corner_box, score):
    """Wrap an (x1, y1, x2, y2) box and a score in a "Person" detection."""
    x1, y1, x2, y2 = corner_box
    return create_detection({
        "bounding_box": (x1, y1, x2 - x1, y2 - y1),
        "score": score,
        "category_name": "Person"
    })


def average_detections(lidar_detection, thermal_detection, webcam_detection, thermal_mapped_box, webcam_mapped_box, depth=None):
    """Fuse a LiDAR detection with its matched thermal and webcam detections.

    On the thermal and webcam frames the sensor's own box is averaged with the LiDAR box
    mapped onto that frame. Both averaged boxes are then transformed back to the LiDAR
    frame and averaged with each other to give the LiDAR-frame result.

    Args:
        lidar_detection: LiDAR detection; only its first category score is read.
        thermal_detection: Thermal detection matched to the LiDAR detection.
        webcam_detection: Webcam detection matched to the LiDAR detection.
        thermal_mapped_box: LiDAR box mapped onto the thermal frame as (x1, y1, x2, y2).
        webcam_mapped_box: LiDAR box mapped onto the webcam frame as (x1, y1, x2, y2).
        depth: Depth passed to transform_back. Defaults to the notebook-level zₗ so existing
            calls keep working; pass it explicitly inside a loop so each LiDAR detection
            uses its own depth instead of whatever the global currently holds.

    Returns:
        Tuple (avg_lidar_detection, avg_thermal_detection, avg_webcam_detection), each
        created with category name "Person". The thermal and webcam scores are the mean of
        the LiDAR score and that sensor's score; the LiDAR score is the mean of all three.
    """
    if depth is None:
        # Backward-compatible fallback to the notebook-level depth
        depth = zₗ

    # Average coordinates between mapped and original thermal / webcam results
    thermal_avg_box = _midpoint_box(_corner_box(thermal_detection.bounding_box), thermal_mapped_box)
    webcam_avg_box = _midpoint_box(_corner_box(webcam_detection.bounding_box), webcam_mapped_box)
    thermal_x1, thermal_y1, thermal_x2, thermal_y2 = thermal_avg_box
    webcam_x1, webcam_y1, webcam_x2, webcam_y2 = webcam_avg_box

    # Average scores between mapped and original thermal / webcam / LiDAR results
    lidar_score = lidar_detection.categories[0].score
    thermal_score = thermal_detection.categories[0].score
    webcam_score = webcam_detection.categories[0].score
    thermal_avg_score = (lidar_score + thermal_score) / 2
    webcam_avg_score = (lidar_score + webcam_score) / 2
    lidar_avg_score = (lidar_score + thermal_score + webcam_score) / 3

    # Transform average results from thermal / webcam frames to the LiDAR frame,
    # one corner at a time (top left, then bottom right)
    tl_u_thermal, tl_v_thermal, tl_u_webcam, tl_v_webcam = \
        transform_back(depth, thermal_x1, thermal_y1, webcam_x1, webcam_y1)
    br_u_thermal, br_v_thermal, br_u_webcam, br_v_webcam = \
        transform_back(depth, thermal_x2, thermal_y2, webcam_x2, webcam_y2)

    # Average the two transformed average results to get the workflow result on the LiDAR frame
    lidar_avg_box = _midpoint_box(
        (tl_u_thermal, tl_v_thermal, br_u_thermal, br_v_thermal),
        (tl_u_webcam, tl_v_webcam, br_u_webcam, br_v_webcam),
    )

    # Create new detections for the averaged detections
    avg_lidar_detection = _person_detection(lidar_avg_box, lidar_avg_score)
    avg_thermal_detection = _person_detection(thermal_avg_box, thermal_avg_score)
    avg_webcam_detection = _person_detection(webcam_avg_box, webcam_avg_score)

    return avg_lidar_detection, avg_thermal_detection, avg_webcam_detection

In [29]:
# Step 5f7: Depending on the Decision making mode, choose to either keep the mapped result or not based on whether there 
#           is agreement between all 3 or only two sensors
# Step 5f8: If the mapped result is being kept, average the coordinates between the mapped result and corresponding
#           individual thermal and webcam results. Then, transform the average result from the thermal and webcam frames
#           back to the LiDAR frame. Next, average those two transformed average results to get the workflow result on the 
#           LiDAR frame. Now, there should be three averaged detection results corresponding to the three sensors. If the
#           mapped result is not being kept, skip
#NOTE: Add else: continue lines when inside the actual loop
# Look a detection up only when this LiDAR detection actually matched one. Without a match the
# index may be unset or left over from an earlier run, so the sensor is represented by None.
thermal_detection = thermal_detection_result.detections[valid_thermal_idx] if valid_thermal_iou else None
webcam_detection = webcam_detection_result.detections[valid_webcam_idx] if valid_webcam_iou else None

# Agreement each decision making mode requires before the mapped result is kept
mode_agreement = {
    'all': valid_thermal_iou and valid_webcam_iou,
    'thermal': valid_thermal_iou,
    'webcam': valid_webcam_iou,
}

# An unrecognized mode keeps nothing, as before.
# NOTE(review): average_detections is handed both detections in every mode; confirm how the
# 'thermal' and 'webcam' modes should treat the other sensor when it has no match (None here).
if mode_agreement.get(decision_making_mode):
    lidar_avg_detection, thermal_avg_detection, webcam_avg_detection = \
        average_detections(lidar_detection, thermal_detection, webcam_detection, thermal_mapped_box, webcam_mapped_box)

In [None]:
# Step 5f9: Store the three averaged detection results at each iteration
# WHEN THE ACTUAL LOOP IS FORMED, ADD CODE HERE

In [None]:
# Drawing settings read by visualize()
# Used for both the box outline and the label text; which color this is depends on the
# channel order of the image being drawn on
TEXT_COLOR = (0, 0, 255)
# Line thickness passed to cv2.rectangle
BOX_THICKNESS = 3
# Offset added to the box origin in x and y when placing the label
MARGIN = 5
# Extra vertical label offset; negative, so it lowers the label's y coordinate
ROW_SIZE = -15
# Font scale and stroke thickness passed to cv2.putText
FONT_SIZE = 0.5
FONT_THICKNESS = 1

In [None]:
def visualize(image, detection_result):
    """Draw the bounding box and label of every detection onto an image.

    Args:
        image: Image array to draw on; it is modified in place.
        detection_result: Object exposing a `detections` iterable whose items have a
            bounding_box and at least one category with category_name and score.

    Returns:
        The same image object, now annotated.
    """
    for detection in detection_result.detections:
        # OpenCV drawing calls only accept integer pixel coordinates, while averaged
        # detections carry floats, so round every coordinate before drawing.
        # Integer inputs pass through unchanged.
        bbox = detection.bounding_box
        left = int(round(bbox.origin_x))
        top = int(round(bbox.origin_y))
        right = int(round(bbox.origin_x + bbox.width))
        bottom = int(round(bbox.origin_y + bbox.height))

        # Draw the bounding box.
        cv2.rectangle(image, (left, top), (right, bottom), TEXT_COLOR, BOX_THICKNESS)

        # Write the label.
        category = detection.categories[0]
        category_name = category.category_name
        probability = round(category.score, 2)
        result_text = category_name + ' (' + str(probability) + ')'
        text_location = (MARGIN + left, MARGIN + ROW_SIZE + top)
        cv2.putText(image, result_text, text_location, cv2.FONT_HERSHEY_DUPLEX,
                    FONT_SIZE, TEXT_COLOR, FONT_THICKNESS, cv2.LINE_AA)

    return image

In [None]:
# Step 5g: With all of the averaged, mapped detection results, and with all of the individual sensor detection results, draw
#          the bounding boxes on the images corresponding to the results

In [None]:
# Step 5h: Output the 3 new images (LiDAR, thermal, webcam) with bounding boxes drawn on them to the output directory for viewing