In [1]:
import sys
sys.path.insert(0, 'utils.py')

In [2]:
# /!\ Before running the lab make sure every additional libraries is installed 

# Import local libraries
from utils import *
import time

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


INFO - 2024-11-06 15:03:15,613 - utils - NumExpr defaulting to 2 threads.
  from pandas.core.computation.check import NUMEXPR_INSTALLED


# Reading of one LiDAR scan

In [3]:
n_frame = 0
actor = 'ego_vehicle'

points = get_point_cloud(n_frame, actor)

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)

flag_display = True

# Voxel Down sampling

To reduce the number of points and accelerate the computation use the function _voxel_down_sample()_ of open3D.

In [4]:
downpcd = pcd # ToDo (Replace this line) 
downpcd = pcd.voxel_down_sample(0.1)
if flag_display:
    o3d.visualization.draw_geometries([downpcd])

# Normals estimation
We would like to detect opstacles, to do that the first step is to estimate normals to obtain the local orientation of the point cloud.

Use the function _estimate_normals()_ of open3D to estimate this normals.

In [5]:
#ToDo estimate the normals
downpcd.estimate_normals()
if flag_display:
    o3d.visualization.draw_geometries([downpcd], point_show_normal=True)

In [6]:
downpcd.colors = o3d.utility.Vector3dVector(np.abs(np.array(downpcd.normals)))

if flag_display:
    o3d.visualization.draw_geometries([downpcd])

# Ground and objects segmentation

Based on the height of each point, the local orientation of the points cloud arround this point and eventually the variation of this orientation filter the ground to obtain a new points cloud whith only the ground points and another one with only the objects on top of this ground.

For both of these new points clouds you can also filter the points corresponding to the roof of the ego vehicle.

In [7]:
#Notes:
#Deatch ground from objects based on a height threshold of the point coordinates
#Then we can plot the normals to see that they should all point in the same direction
#Or plot the difference of normals, to show that that should be small

'''
    The points are distributed in this form:
    Point 1 - x y z
    Point 2 - x y z
    ...
    To extract the heights, i can get the third column of all the rows (Z coordinates of the points)
'''

#Step 1 - Extract points from the point cloud
#Step 2 - Extract the height of all the points
    #An extra threshold can be used, to keep only points that have a perpendicular normal
#Step 3 - Select a subset of points whose height is smaller than the threshold (for ground) and higher than threshold (for objects)
#Step 4 - Remove the roof of the car with a distance threshold to the points (radius of 2.5m worked well)
#Step 5 - Rebuild a pointcloud for visualization

#Plot the normals with absolute value, so that it points all in the same direction


'\n    The points are distributed in this form:\n    Point 1 - x y z\n    Point 2 - x y z\n    ...\n    To extract the heights, i can get the third column of all the rows (Z coordinates of the points)\n'

In [9]:

def ground_filtering(pcd):
    points = np.array(pcd.points)
    
    height_threshold = 0.2
    normal_threshold = 0.3
    
    normals = pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.4, max_nn=100))
    normals = np.asarray(pcd.normals)
    normals_xy = normals[:,:1]
    normals_xy_norm = np.linalg.norm(normals_xy, axis=1)

    ground_mask = points[:, 2] < height_threshold
    normal_mask = normals_xy_norm < normal_threshold
    combined_mask = ground_mask & normal_mask

    ground_points = points[combined_mask]

    pcd_ground = o3d.geometry.PointCloud()
    pcd_ground.points = o3d.utility.Vector3dVector(ground_points)

    car_threshold = 2.5
    roof_mask = np.linalg.norm(ground_points, axis=1) > car_threshold
    ground_points = ground_points[roof_mask]
    
    return pcd_ground

pcd_ground = ground_filtering(downpcd)

pcd_ground.estimate_normals()
normals = np.asarray(pcd_ground.normals)
normals_abs = np.abs(normals)
pcd_ground.normals = o3d.utility.Vector3dVector(normals_abs)

if flag_display:
    o3d.visualization.draw_geometries([pcd_ground])
    o3d.visualization.draw_geometries([pcd_ground], point_show_normal=True)

In [10]:
def objects_filtering(pcd):
    points = np.array(pcd.points)
    
    height_threshold = 0.2
    normal_threshold = 0.3
    
    normals = pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.4, max_nn=100))
    normals = np.asarray(pcd.normals)
    normals_xy = normals[:,:1]
    normals_xy_norm = np.linalg.norm(normals_xy, axis=1)

    ground_mask = points[:, 2] > height_threshold
    normal_mask = normals_xy_norm > normal_threshold
    combined_mask = ground_mask | normal_mask

    object_points = points[combined_mask]

    car_threshold = 2.5
    roof_mask = (np.linalg.norm(object_points, axis=1) > car_threshold)
    object_points = object_points[roof_mask]

    pcd_objects = o3d.geometry.PointCloud()
    pcd_objects.points = o3d.utility.Vector3dVector(object_points)
    
    return pcd_objects

pcd_objects = objects_filtering(downpcd)

if flag_display:
    o3d.visualization.draw_geometries([pcd_objects])

# Clustering

Now that the points of the objects are not connected to the ground anymore we can regroup all the points of the same object on one cluster.

To do that, use the function _cluster_dbscan()_ of open3D to obtain the cluster id for each point.

In [11]:
def clustering(pcd_objects):
    
    cluster_ids = np.array(
        pcd_objects.cluster_dbscan(eps=2, min_points=10, print_progress=True)) # ToDo (Replace this line)
    
    cluster_nb = cluster_ids.max()
    colors = plt.get_cmap("tab20")(cluster_ids / (cluster_nb if cluster_nb > 0 else 1))
    colors[cluster_ids < 0] = 0
    pcd_objects.colors = o3d.utility.Vector3dVector(colors[:, :3])
    return pcd_objects, cluster_ids

pcd_objects, cluster_ids = clustering(pcd_objects)

print("The points cloud has", cluster_ids.max() + 1, "clusters")

if flag_display:
    o3d.visualization.draw_geometries([pcd_objects])



# Bounding Boxes estimation
Estimate for each of these clusters its bounding box. You can use the function _get_axis_aligned_bounding_box()_ or _get_oriented_bounding_box()_ of open3D.

What are the limits of these functions ?

You can keep only the small bounding boxes correponding to the small objects (such as vehicles or pedestrians and not buildings)

In [12]:
def cluster_boxes(pcd_objects, cluster_ids):
    
    # Bounding boxes
    boxes = []
    points = np.asarray(pcd_objects.points) #Extract all the points from the objects
    
    for i in range(cluster_ids.max() + 1):          #loop through clusters
        cluster_points = points[cluster_ids == i]

        cluster_pcd = o3d.geometry.PointCloud()
        cluster_pcd.points = o3d.utility.Vector3dVector(cluster_points)

        # Create the OrientedBoundingBox for this cluster
        bbox = cluster_pcd.get_oriented_bounding_box()

        bbox_extent = bbox.extent
        max_size = 5.5  #By trial and error, this was a value that allowed to select all the important obstacles (cars, signs, pedestrians) and not the buildings
        if all(bbox_extent <= np.array(max_size)):
            # Add the bounding box to the list
            boxes.append(bbox)
    return boxes

boxes = cluster_boxes(pcd_objects, cluster_ids)

obj_to_display = boxes
obj_to_display.append(pcd_objects)

if flag_display:
    o3d.visualization.draw_geometries(obj_to_display)

# Objects detection and tracking on LiDAR scans

Applied the previous functions on the different scan to estimate the bounding boxes.

### Bounding boxes association

Create a function to associate the bounding boxes of the current time to the one of the previous time based the shortest distances of there centers.

### Bounding boxes orientation estimation
Based on these association estimate the orientation of there motion to correct the orientation of the bounding boxes.

### Points cloud mapping
Build a map by accumulating the different points cloud and removing the moving objects (keep the cluster id and add a new one for the ground points)

### Iterative Closest Point tracking
To better associate the clusters you can also use the function _icp()_ of open3D on each cluster

In [13]:
# Initialize Open3D Visualizer
if flag_display:
    vis = o3d.visualization.Visualizer()
    vis.create_window()

for n_frame in range(nb_frames):
    
    # Read point cloud
    points = get_point_cloud(n_frame, actor)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    
    # Voxel down sampling
    downpcd = pcd.voxel_down_sample(0.1) # ToDo (Replace this line)
    
    # Transform points cloud from the LiDAR to the world frame
    tf = get_tf_lidar2world(actor, n_frame)
    # ToDo applied this transformation to the points cloud
    downpcd.transform(tf)
    
    # Estimate normals
    #ToDo estimate the normals
    downpcd.estimate_normals()
    downpcd.colors = o3d.utility.Vector3dVector(np.abs(np.array(downpcd.normals)))
    
    # Filter ground points
    pcd_ground = ground_filtering(downpcd)
    
    # Filter ground points
    pcd_ground = ground_filtering(downpcd)
    
    # Filter objects points
    pcd_objects = objects_filtering(downpcd)
    
    # Clustering
    pcd_objects, cluster_ids = clustering(pcd_objects)
    
    if flag_display:
        # Clear previous geometries and add new point cloud to visualizer
        vis.clear_geometries()
        vis.add_geometry(pcd_objects)
        
        # Update the visualizer to show the new point cloud
        vis.poll_events()
        vis.update_renderer()

        # Wait before updating the points cloud (adjust time as needed)
        time.sleep(0.1)

if flag_display:
    # Close the visualizer
    vis.destroy_window()

