# 1. Import library 

In [1]:
import pandas as pd
import open3d
import numpy as np
from utilities.classification import classify_from_obb_and_plane
from utilities.obb import get_horizontal_obb
from utilities.draw_id import *
from os import path
from scipy.spatial.distance import cdist
from scipy.optimize import linear_sum_assignment
from tqdm import tqdm
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Error)

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


# 2. Model implement

## 2.1. RANSAC and DBSCAN

In [9]:
def load_clustred_point_cloud(path_to_pcd):
    # 1. Load Data
    pcd = open3d.io.read_point_cloud(path_to_pcd)
    # open3d.visualization.draw_geometries([pcd])

    # 2. Voxel downsampling
    #print(f"Points before downsampling: {len(pcd.points)} ")
    pcd = pcd.voxel_down_sample(voxel_size=0.1)
    #print(f"Points after downsampling: {len(pcd.points)}")
    #open3d.visualization.draw_geometries([pcd])

    # 3. RANSAC Segmentation to identify the largest plane (in this case the ground/road) from objects above it
    plane_model, inliers = pcd.segment_plane(distance_threshold=0.2, ransac_n=3, num_iterations=150)
    ## Identify inlier points -> road
    inlier_cloud = pcd.select_by_index(inliers)
    inlier_cloud.paint_uniform_color([0, 1, 1])

    ## Identify outlier points -> objects on the road
    outlier_cloud = pcd.select_by_index(inliers, invert=True)
    outlier_cloud.paint_uniform_color([1, 0, 0])
    #open3d.visualization.draw_geometries([inlier_cloud, outlier_cloud])

    # 4. Clustering using DBSCAN -> To further segment objects on the road
    with open3d.utility.VerbosityContextManager(open3d.utility.VerbosityLevel.Error) as cm:
        
        labels = np.array(outlier_cloud.cluster_dbscan(eps=0.4, min_points=7, print_progress=False))

    max_label = labels.max()
    #print(f"point cloud has {max_label + 1} clusters")
    ## Get label colors
    colors = plt.get_cmap("tab20")(labels/(max_label if max_label>0 else 1))
    colors[labels<0] = 0
    ## Colorized objects on the road
    outlier_cloud.colors = open3d.utility.Vector3dVector(colors[:, :3])
    #open3d.visualization.draw_geometries([outlier_cloud])
    return plane_model, inlier_cloud, outlier_cloud, labels


Setting for choosing area to detect bounding box

In [None]:
# On road plane limits
# X_positive = 1.2
# X_negative = -8
# Y_positive = 14.5

# Full scene limits
X_positive = 250
X_negative = -250
Y_positive = 250

## 2.2 Bounding box detection

In [5]:
# 5. Generate 3D Bounding Boxes
from math import dist
from os import name
from tabnanny import check

from matplotlib.pyplot import flag

def filter_box(plane_model, inlier_cloud, outlier_cloud, labels):

    dataframe_info = pd.DataFrame(columns=['object_id', 'x', 'y', 'z'])

    filtered_obbs = []
    indexes = pd.Series(range(len(labels))).groupby(labels, sort=False).apply(list).tolist()
    MAX_POINTS = 1500
    MIN_POINTS = 20
    a, b, c, d = plane_model
    
    normal = np.array([a, b, c])
    MAX_HEIGHT = 2   # meters
    MIN_HEIGHT = 0.5  # meters
    HEIGHT_BOX_MAX = 2  # meters
    HEIGHT_BOX_MIN = 1  # meters
    #MAX_ELEVATION = 1  # meters above road
    object_number = 0
    ## For each individual object on the road
    for i in range(0, len(indexes)):
        nb_points = len(outlier_cloud.select_by_index(indexes[i]).points)
        # If object size within the criteria, draw bounding box
        if (nb_points>MIN_POINTS and nb_points<MAX_POINTS):
            sub_cloud = outlier_cloud.select_by_index(indexes[i])
            obb = get_horizontal_obb(sub_cloud)
                #=== Filter boxes based on position
            center = obb.get_center()

            if ((center[0] < X_positive) and (center[0] > X_negative)) or (center[1] < Y_positive):
                flag_position = True
            else:
                flag_position = False

            min_bound = obb.get_min_bound()   # lowest corner [x,y,z]
            max_bound = obb.get_max_bound() 

            checking_height = abs(max_bound[2] - min_bound[2])

            dist_max_bound = abs(np.dot(normal, max_bound) + d) / np.linalg.norm(normal)
            dist_min_bound = abs(np.dot(normal, min_bound) + d) / np.linalg.norm(normal)
            height_box = abs(dist_max_bound - dist_min_bound)
        # Keep only boxes close to the plane and not too tall
            #if  height_box < HEIGHT_BOX_MAX and checking_height < 2.5 and (min_bound[2] < 1) and  height_box > HEIGHT_BOX_MIN: #and height_box > HEIGHT_BOX_MIN: #and dist_max_bound > MIN_HEIGHT, dist_max_bound < MAX_HEIGHT  and
            if  (dist_max_bound > 0.4) and (dist_max_bound <MAX_HEIGHT) and (checking_height < 2.5) and (checking_height > 0.1) and flag_position: #and (dist_min_bound < MIN_HEIGHT):  #and (checking_height > 0.1): #and height_box > HEIGHT_BOX_MIN: #and dist_max_bound > MIN_HEIGHT, dist_max_bound < MAX_HEIGHT  and
            
                filtered_obbs.append(obb)
                #=== calculate the mean value of the object points
                pts = np.asarray(sub_cloud.points)
                mean_xyz = pts.mean(axis=0)
                object_number += 1
                dataframe_info.loc[len(dataframe_info)] = {'object_id': object_number,
                                                        'x': mean_xyz[0],
                                                        'y': mean_xyz[1],
                                                        'z': mean_xyz[2],}


    #print(f"Number of Bounding Boxes calculated {len(filtered_obbs)}")

    ## Combined all visuals: outlier_cloud (objects), obbs (oriented bounding boxes), inlier_cloud (road)
    list_of_visuals = []
    list_of_visuals.append(outlier_cloud)
    #list_of_visuals.extend(obbs)
    list_of_visuals.extend(filtered_obbs)
    #list_of_visuals.extend(labels_3d)

    #== road
    list_of_visuals.append(inlier_cloud)


    #print(type(list_of_visuals))

    return list_of_visuals, dataframe_info,filtered_obbs


In [None]:
# Load CSV and save temporary PCD
def save_temp_pcd(id_frame):

    df = pd.read_csv(f"./data_IU_final/raw_data_consecutive/192.168.26.26_2020-11-25_20-01-45_frame-{id_frame}.csv", sep=';')

    # Extract XYZ
    points = df[['X', 'Y', 'Z']].to_numpy()

    # Create Open3D point cloud
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)

    # Save as PCD
    o3d.io.write_point_cloud(f"./data_IU_final/temporal_pcd_process/{id_frame}.pcd", pcd)
    return f"./data_IU_final/temporal_pcd_process/{id_frame}.pcd"

## 2.3. Hungarian Algorithm

In [7]:
def assignIds(prevDf, currDf, next_id, maxDistance=4.0):
    nextId = next_id
    prevCentroids = prevDf[['x', 'y', 'z']].to_numpy()
    currCentroids = currDf[['x', 'y', 'z']].to_numpy()

    prevLabels = prevDf["object_id"].tolist()
    currLabels = currDf["object_id"].tolist()

    costMatrix = cdist(prevCentroids, currCentroids)

    rowInd, colInd = linear_sum_assignment(costMatrix)

    #nextId = max(prevLabels) + 1 if  len(prevLabels)!=0 else 0

    #print("check  max _ pre ID:, ", nextId)

    temp_out_label = [-1 for _ in range(len(currLabels))]
    
    for r, c in zip(rowInd, colInd):
        if costMatrix[r, c] < maxDistance:  
            temp_out_label[c] = prevLabels[r]
        

    for i in range(len(temp_out_label)):
        if temp_out_label[i] == -1:
            temp_out_label[i] = nextId
            nextId += 1
    currLabels = temp_out_label
    
    return currLabels, nextId

## 2.4. Main run, full pipeline training

In [14]:
#list_frame = list(range(2045, 2055))
#list_frame = list(range(2200, 2225))
list_frame = list(range(1849, 2567))
#list_frame = list(range(2424, 2435))
#list_frame = list(range(2424, 2425))
previous_dataframe_info = pd.DataFrame(columns=['object_id', 'x', 'y', 'z'])
global_id_object = 0

for i in tqdm(list_frame):
    current_path_pcd = save_temp_pcd(i)
    current_plane_model, current_inlier_cloud, current_outlier_cloud, current_labels = load_clustred_point_cloud(current_path_pcd)
    # list_of_visuals contains class, bounding box, and point cloud
    # dataframe_info contains object_id and its mean x,y,z
    current_list_of_visuals, current_dataframe_info, current_filtered_obbs = filter_box(current_plane_model, current_inlier_cloud, current_outlier_cloud, current_labels)

    if i == list_frame[0]:
        current_list_id = current_dataframe_info["object_id"].tolist()
        global_id_object = max(current_list_id) + 1 if len(current_list_id)>0 else 0
    else:
        current_list_id, next_id_check = assignIds(previous_dataframe_info, current_dataframe_info, global_id_object)
        if next_id_check > global_id_object:
            global_id_object = next_id_check 
        
    previous_dataframe_info = current_dataframe_info.copy()
    previous_dataframe_info["object_id"] = current_list_id
#===========================================
    labels_str = []
    for index, obb in enumerate(current_filtered_obbs):
        
        name = classify_from_obb_and_plane(obb, current_plane_model) + str(current_list_id[index])
        labels_str.append(name)
    #     if i == 2207:
    #         print("Debug at frame 2207: ", np.asarray(obb.get_box_points()))
    # if i == 2207:
    #         temp_data = current_filtered_obbs
    #         plane_2207 = current_plane_model
    visualize_matplotlib(current_list_of_visuals, labels_str, "./data_IU_final/final_result/", frameId=i)
#===========================================
    
    # print(f"Frame {i} processed with {len(current_list_id)} objects.")
    # print(previous_dataframe_info)

#open3d.visualization.draw_geometries(current_list_of_visuals, mesh_show_back_face=False)

  0%|          | 0/718 [00:00<?, ?it/s]

100%|██████████| 718/718 [20:34<00:00,  1.72s/it]


# 3. Render video
The funtion below is used for rendering video, at 7fps

In [None]:
import cv2
import os
import re

# -----------------------------
# CONFIG
# -----------------------------
input_folder = "./data_IU_final/final_result"
output_video = "output_video_experiment_1.mp4"
fps = 7

# -----------------------------
# Numeric sort helper
# -----------------------------
def numeric_key(filename):
    # Extract number from filename (e.g., "12.png" → 12)
    num = re.findall(r'\d+', filename)
    return int(num[0]) if num else -1

# -----------------------------
# Collect and sort PNG files
# -----------------------------
files = [f for f in os.listdir(input_folder) if f.lower().endswith(".png")]
files = sorted(files, key=numeric_key)

if not files:
    raise ValueError("No PNG files found in folder.")

# -----------------------------
# Read first frame to get size
# -----------------------------
first_frame_path = os.path.join(input_folder, files[0])
first_frame = cv2.imread(first_frame_path)

if first_frame is None:
    raise ValueError("Could not read first PNG file.")

height, width, _ = first_frame.shape

# -----------------------------
# Create video writer
# -----------------------------
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
video = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

# -----------------------------
# Add frames to video
# -----------------------------
for f in files:
    path = os.path.join(input_folder, f)
    frame = cv2.imread(path)
    if frame is None:
        print("Skipping unreadable file:", f)
        continue
    video.write(frame)
    #print("Added frame:", f)

video.release()
print("Video saved to:", output_video)

Video saved to: output_video_experiment_1.mp4


: 