In [7]:
import sys

sys.path.insert(0, 'my_utils.py')

In [8]:
# /!\ Before running the lab make sure every additional libraries is installed 

# Import local libraries
from my_utils import *
import time
import numpy as np 
import open3d as o3d
import einops
import pyquaternion

# Reading of one LiDAR scan

In [9]:
n_frame = 0
actor = 'ego_vehicle'

points = get_point_cloud(n_frame, actor)

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)

flag_display = True

ego_vehicle


# Voxel Down sampling

To reduce the number of points and accelerate the computation use the function _voxel_down_sample()_ of open3D.

In [10]:
downpcd = pcd.voxel_down_sample(voxel_size=0.005)

if flag_display:
    o3d.visualization.draw_geometries([downpcd])

# Normals estimation
We would like to detect opstacles, to do that the first step is to estimate normals to obtain the local orientation of the point cloud.

Use the function _estimate_normals()_ of open3D to estimate this normals.

In [11]:
#ToDo estimate the normals

downpcd.estimate_normals()

if flag_display:
    o3d.visualization.draw_geometries([downpcd], point_show_normal=True)

In [12]:
downpcd.colors = o3d.utility.Vector3dVector(np.abs(np.array(downpcd.normals)))

if flag_display:
    o3d.visualization.draw_geometries([downpcd])

# Ground and objects segmentation

Based on the height of each point, the local orientation of the points cloud around this point and eventually the variation of this orientation filter the ground to obtain a new points cloud whith only the ground points and another one with only the objects on top of this ground.

For both of these new points clouds you can also filter the points corresponding to the roof of the ego vehicle.

In [13]:
def ground_filtering(pcd):
    threshold = 0.5
    
    points = np.asarray(pcd.points)
    normals = np.asarray(pcd.normals)
    
    ground_points = []
    
    for i in range(len(points)):
        if points[i][2] < threshold:
            
            ground_points.append(points[i])
            
    pcd_ground = o3d.geometry.PointCloud()
    pcd_ground.points = o3d.utility.Vector3dVector(np.array(ground_points))
    
    return pcd_ground

pcd_ground = ground_filtering(downpcd)

if flag_display:
    o3d.visualization.draw_geometries([pcd_ground])

In [8]:
def objects_filtering(pcd):
    threshold = 0.5
    
    points = np.asarray(pcd.points)
    normals = np.asarray(pcd.normals)
    
    objects_points = []
    
    for i in range(len(points)):
        if points[i][2] > threshold:
            
            objects_points.append(points[i])
            
    pcd_objects = o3d.geometry.PointCloud()
    pcd_objects.points = o3d.utility.Vector3dVector(np.array(objects_points))
    
    return pcd_objects

pcd_objects = objects_filtering(downpcd)

if flag_display:
    o3d.visualization.draw_geometries([pcd_objects])

# Clustering

Now that the points of the objects are not connected to the ground anymore we can regroup all the points of the same object on one cluster.

To do that, use the function _cluster_dbscan()_ of open3D to obtain the cluster id for each point.

In [9]:
def clustering(pcd_objects):
    
    scan = pcd_objects.cluster_dbscan(1, 10)
    
    cluster_ids = np.array(scan)
    
    cluster_nb = cluster_ids.max()
    colors = plt.get_cmap("tab20")(cluster_ids / (cluster_nb if cluster_nb > 0 else 1))
    colors[cluster_ids < 0] = 0
    pcd_objects.colors = o3d.utility.Vector3dVector(colors[:, :3])
    return pcd_objects, cluster_ids

pcd_objects, cluster_ids = clustering(pcd_objects)

print("The points cloud has", cluster_ids.max() + 1, "clusters")

if flag_display:
    o3d.visualization.draw_geometries([pcd_objects])

The points cloud has 72 clusters


# Bounding Boxes estimation
Estimate for each of these clusters its bounding box. You can use the function _get_axis_aligned_bounding_box()_ or _get_oriented_bounding_box()_ of open3D.

What are the limits of these functions ?

You can keep only the small bounding boxes correponding to the small objects (such as vehicles or pedestrians and not buildings)

In [107]:
def cluster_boxes(pcd_objects, cluster_ids):
    
    boxes = []
    max_box_size = 5
    
    # Get unique cluster IDs
    unique_cluster_ids = np.unique(cluster_ids)
    
    # Iterate over each unique cluster ID
    for cluster_id in unique_cluster_ids:
        # Gather indices for points belonging to the current cluster
        indices = []
        for i, cid in enumerate(cluster_ids):
            if cid == cluster_id:
                indices.append(i)
        
        # Select points for the current cluster
        cluster_points = pcd_objects.select_by_index(indices)
        
        # Get the axis-aligned bounding box for the selected cluster points
        box = cluster_points.get_axis_aligned_bounding_box()
        
        # Get the size (width, height, depth) of the bounding box
        box_size = box.get_extent()
        
        # Check if the bounding box meets the maximum size criteria
        if max(box_size) <= max_box_size:
            boxes.append(box)
    
    return boxes

boxes = cluster_boxes(pcd_objects, cluster_ids)

obj_to_display = boxes
obj_to_display.append(pcd_objects)

if flag_display:
    o3d.visualization.draw_geometries(obj_to_display)
    #o3d.visualization.draw_geometries([obj_to_display, boxes])

# Objects detection and tracking on LiDAR scans

Applied the previous functions on the different scan to estimate the bounding boxes.

### Bounding boxes association

Create a function to associate the bounding boxes of the current time to the one of the previous time based the shortest distances of there centers.

### Bounding boxes orientation estimation
Based on these association estimate the orientation of there motion to correct the orientation of the bounding boxes.

### Points cloud mapping
Build a map by accumulating the different points cloud and removing the moving objects (keep the cluster id and add a new one for the ground points)

### Iterative Closest Point tracking
To better associate the clusters you can also use the function _icp()_ of open3D on each cluster

In [110]:
# Initialize Open3D Visualizer
if flag_display:
    vis = o3d.visualization.Visualizer()
    vis.create_window()

for n_frame in range(nb_frames):
    
    # Read point cloud
    points = get_point_cloud(n_frame, actor)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    
    # Voxel down sampling
    downpcd = pcd.voxel_down_sample(voxel_size=0.005)
    
    # Transform points cloud from the LiDAR to the world frame
    tf = get_tf_lidar2world(actor, n_frame)
    # ToDo applied this transformation to the points cloud
    
    # Estimate normals
    #ToDo estimate the normals
    downpcd.estimate_normals()
    downpcd.colors = o3d.utility.Vector3dVector(np.abs(np.array(downpcd.normals)))
    
    # Filter ground points
    pcd_ground = ground_filtering(downpcd)
    
    # Filter ground points
    pcd_ground = ground_filtering(downpcd)
    
    # Filter objects points
    pcd_objects = objects_filtering(downpcd)
    
    # Clustering
    pcd_objects, cluster_ids = clustering(pcd_objects)
    
    if flag_display:
        # Clear previous geometries and add new point cloud to visualizer
        vis.clear_geometries()
        vis.add_geometry(pcd_objects)
        
        # Update the visualizer to show the new point cloud
        vis.poll_events()
        vis.update_renderer()

        # Wait before updating the points cloud (adjust time as needed)
        time.sleep(0.1)

if flag_display:
    # Close the visualizer
    vis.destroy_window()

ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle
ego_vehicle


KeyboardInterrupt: 