### Initialization

In [12]:
%load_ext autoreload
%autoreload 2
import sys

sys.path.append("../")

from typing import List

import numpy as np
from matplotlib import pyplot as plt
import open3d as o3d
import pandas as pd
import cv2 as cv



%matplotlib inline
# plt.rcParams["figure.figsize"] = [20, 10]

# import the ZOD DevKit
from zod import ZodSequences

# Dataset location and version -- adjust both to match the local ZOD download.
# NOTE(review): dataset_root ends in "/mini" while version is "full" -- confirm this pairing is intended.
folder_path = "/Users/quany/Downloads/GPDF_MPC_ADAS"  # project folder on this machine
dataset_root = f"{folder_path}/mini"  # ZOD root directory inside the project folder
version = "full"  # "mini" or "full"

# Build the sequence accessor for the chosen dataset root and version
zod_sequences = ZodSequences(dataset_root=dataset_root, version=version)

### Fetch a ZodSequence

In [13]:
# Fetch the single ZOD sequence processed by the rest of the notebook.
# NOTE(review): 330 is the key handed to zod_sequences; presumably a sequence
# id/index that exists in the selected dataset version -- confirm.
seq = zod_sequences[330]

### Object Detection Functions

#### Clustering function

In [14]:
def down_segment_cluster(pcd, voxel_size, distance_threshold, ransac_n, num_iterations, eps, min_points):
    """Remove the dominant plane, voxel-downsample the rest and cluster it with DBSCAN.

    Args:
        pcd: Open3D point cloud to process.
        voxel_size: Forwarded to ``voxel_down_sample``.
        distance_threshold: Forwarded to ``segment_plane``.
        ransac_n: Forwarded to ``segment_plane``.
        num_iterations: Forwarded to ``segment_plane``.
        eps: Forwarded to ``cluster_dbscan``.
        min_points: Forwarded to ``cluster_dbscan``.

    Returns:
        Tuple ``(labels, pcd_down, pcd)`` where ``labels`` is an array with one
        cluster label per point of ``pcd_down``, ``pcd_down`` is the
        downsampled off-plane cloud (colored per cluster) and ``pcd`` is the
        off-plane cloud before downsampling (painted blue).
    """
    # Segmentation of the road and objects: fit a plane and keep only the
    # points that are NOT plane inliers.
    _, inliers = pcd.segment_plane(distance_threshold=distance_threshold, ransac_n=ransac_n, num_iterations=num_iterations)
    pcd = pcd.select_by_index(inliers, invert=True)
    pcd.paint_uniform_color([0, 0, 1])

    # Voxel downsample to remove unnecessary points
    pcd_down = pcd.voxel_down_sample(voxel_size=voxel_size)

    # If every point was a plane inlier there is nothing left to cluster;
    # labels.max() on an empty array would raise ValueError, so return early.
    if len(pcd_down.points) == 0:
        print("point cloud has 0 clusters")
        return np.array([], dtype=int), pcd_down, pcd

    # Clustering and Labeling (Debug verbosity makes Open3D log DBSCAN progress)
    with o3d.utility.VerbosityContextManager(o3d.utility.VerbosityLevel.Debug):
        labels = np.array(pcd_down.cluster_dbscan(eps=eps, min_points=min_points, print_progress=False))
    max_label = labels.max()
    print(f"point cloud has {max_label + 1} clusters")

    # Map labels onto the tab20 colormap; negative labels are drawn black
    colors = plt.get_cmap("tab20")(labels / (max_label if max_label > 0 else 1))
    colors[labels < 0] = 0
    pcd_down.colors = o3d.utility.Vector3dVector(colors[:, :3])

    return labels, pcd_down, pcd

In [15]:
def get_bounding_boxes(labels, pcd, min_points=30, max_points=1000, max_length=5, min_width=1):
    """Build axis-aligned bounding boxes for the clusters that pass the size filters.

    Args:
        labels: Array-like with one cluster label per point of ``pcd``.
            Negative labels (DBSCAN noise) are ignored.
        pcd: Point cloud exposing ``select_by_index``; the selected sub-cloud
            must expose ``get_axis_aligned_bounding_box``.
        min_points: A cluster needs strictly more points than this.
        max_points: A cluster needs strictly fewer points than this.
        max_length: Box extent along axis 1 must be strictly below this.
        min_width: Box extent along axis 0 must be strictly above this.

    Returns:
        List of bounding boxes (color set to blue), ordered by the first
        appearance of each cluster label in ``labels``.
    """
    if len(labels) == 0:
        return []

    boxes = []
    # sort=False keeps clusters in order of first appearance in ``labels``
    grouped = pd.Series(range(len(labels))).groupby(labels, sort=False)
    for label, members in grouped:
        # Noise points are not an object; never turn them into a box
        if label < 0:
            continue

        # The group size is the cluster's point count, no sub-cloud needed yet
        if not (min_points < len(members) < max_points):
            continue

        box = pcd.select_by_index(members.tolist()).get_axis_aligned_bounding_box()
        box_length = box.max_bound[1] - box.min_bound[1]
        box_width = box.max_bound[0] - box.min_bound[0]
        if box_length < max_length and box_width > min_width:
            box.color = (0, 0, 1)
            boxes.append(box)

    return boxes

#### Draw bbx functions

In [16]:
# Edge list for drawing a box as a wireframe: each pair holds two indices
# (0-7) into the corner points returned by a box's get_box_points().
lines = [[0, 1], [1, 7], [7, 2], [0, 2],
         [4, 5], [5, 3], [3, 6], [4, 6],
         [0, 3], [1, 6], [2, 5], [4, 7]]


def hard_code_add_geometry(vis, bbxs):
    """Add one wireframe LineSet per bounding box to the visualizer.

    Args:
        vis: Visualizer exposing ``add_geometry``.
        bbxs: Sequence of boxes exposing ``get_box_points()``.

    Returns:
        The same ``vis`` object, for call chaining.
    """
    for box in bbxs:
        wireframe = o3d.geometry.LineSet()
        wireframe.lines = o3d.utility.Vector2iVector(lines)
        wireframe.points = box.get_box_points()
        vis.add_geometry(wireframe)

    return vis


def hard_code_update_geometry(vis, bbxs):
    """Build a new LineSet per bounding box and pass it to ``vis.update_geometry``.

    Args:
        vis: Visualizer exposing ``update_geometry``.
        bbxs: Sequence of boxes exposing ``get_box_points()``.

    Returns:
        The same ``vis`` object.

    NOTE(review): every LineSet created here is a brand-new object, not one
    previously handed to ``vis.add_geometry``. Presumably ``update_geometry``
    only affects geometries already registered with the visualizer -- confirm
    whether this function has any visible effect.
    """
    
    for c in range(len(bbxs)):
        
        line_set = o3d.geometry.LineSet()
        line_set.lines = o3d.utility.Vector2iVector(lines)
        # NOTE(review): rebinds line_set to whatever clear() returns, right
        # after the edge list was assigned -- confirm this is intended.
        line_set = line_set.clear()
        line_set.points = bbxs[c].get_box_points()
        vis.update_geometry(line_set)
            
    return vis

#### Change bbx form for tracker

In [17]:
def bbx4tracker(bbxs):
    """Convert bounding boxes into the three parallel lists fed to the tracker.

    Each box becomes ``[min x, max y, x extent, y extent]`` computed from its
    ``min_bound`` / ``max_bound``; every box gets confidence 1 and class id 1.

    Args:
        bbxs: Sequence of boxes exposing indexable ``min_bound`` and ``max_bound``.

    Returns:
        Tuple ``(bbx_list, confidences, class_ids)`` of equally long lists.
    """
    bbx_list = []
    for box in bbxs:
        lower, upper = box.min_bound, box.max_bound
        bbx_list.append([lower[0], upper[1], upper[0] - lower[0], upper[1] - lower[1]])

    # One constant confidence and class id per box
    confidences = [1] * len(bbx_list)
    class_ids = [1] * len(bbx_list)

    return bbx_list, confidences, class_ids


#### Draw tracked bbx functions

In [18]:
def add_text_to_visualizer(vis, track_single):
    """Add a red 3D text mesh showing ``track_single[1]`` to the visualizer.

    The text is scaled by 0.1 and translated to
    ``(track_single[2] + 0.5 * track_single[4], track_single[3], 0)``.

    Args:
        vis: Visualizer exposing ``add_geometry``.
        track_single: Indexable track record; indices 1-4 are read.

    Returns:
        The same ``vis`` object.
    """
    label_mesh = o3d.t.geometry.TriangleMesh.create_text(str(track_single[1]), depth=8).to_legacy()
    label_mesh.paint_uniform_color((1, 0, 0))

    # Anchor position of the text
    x = track_single[2] + 0.5 * track_single[4]
    y = track_single[3]
    z = 0

    # Homogeneous 4x4 transform: uniform scale plus translation
    scale = 0.1
    placement = [[scale, 0, 0, x], [0, scale, 0, y], [0, 0, scale, z], [0, 0, 0, 1]]
    label_mesh.transform(placement)

    vis.add_geometry(label_mesh)
    return vis

### Iteration

In [20]:
import os

from motrackers import CentroidTracker

tracker = CentroidTracker(max_lost=1)  # Create a tracker object and set max_lost to an integer value

# Parameters forwarded to down_segment_cluster
params = {'voxel_size': 0.3,
          'distance_threshold': 0.2,
          'ransac_n': 3,
          'num_iterations': 500,
          'eps': 0.9,
          'min_points': 30}

log = list()
log_bbx = list()

# Make sure the screenshot directory exists before any frame is captured
img_dir = folder_path + "/Imgs"
os.makedirs(img_dir, exist_ok=True)

vis = o3d.visualization.Visualizer()
vis.create_window()

# Initialize geometry, which is the point cloud used in the animation
geometry = o3d.geometry.PointCloud()
# vis.add_geometry(geometry)

# Iteration over frames
draw_every_nth = 1
frames = seq.info.get_lidar_frames()

# try/finally guarantees the window is destroyed even if a frame raises
try:
    for i, frame in enumerate(frames):
        if i % draw_every_nth != 0:
            continue

        # Read pcd of current frame
        lidar_frame = frame
        pcd_frame = seq.get_compensated_lidar(lidar_frame.time)

        # Get the pcd in FOV: keep points with -6 < x < 6 and 0 < y < 30.
        # Single pass; the comprehension variable no longer reuses the name `i`.
        pcd_FOV = [p for p in pcd_frame.points if -6 < p[0] < 6 and 0 < p[1] < 30]
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(pcd_FOV)

        # Create labeled clusters
        labels, cloud_clusters, original_cloud = down_segment_cluster(pcd, **params)
        # Get bounding boxes
        bbxs = get_bounding_boxes(labels=labels, pcd=cloud_clusters)
        bbx_list, confidences, class_ids = bbx4tracker(bbxs)
        # Update tracker
        tracks = tracker.update(bbx_list, confidences, class_ids)

        # Append results to list
        log.append(tracks)

        # Draw pcd
        geometry.points = cloud_clusters.points
        vis.add_geometry(geometry)
        vis.update_geometry(geometry)
        # Draw bounding boxes
        vis = hard_code_add_geometry(vis, bbxs)
        vis = hard_code_update_geometry(vis, bbxs)
        # Draw tracks
        for track in tracks:
            vis = add_text_to_visualizer(vis, track)

        vis.poll_events()
        vis.update_renderer()
        vis.capture_screen_image(img_dir + "/temp_%05d.jpg" % i)

        # Everything is re-added next frame, so start from an empty scene
        vis.clear_geometries()
finally:
    vis.destroy_window()




[Open3D DEBUG] Precompute neighbors.
[Open3D DEBUG] Done Precompute neighbors.
[Open3D DEBUG] Compute Clusters
[Open3D DEBUG] Done Compute Clusters: 3
point cloud has 3 clusters
[Open3D DEBUG] Precompute neighbors.
[Open3D DEBUG] Done Precompute neighbors.
[Open3D DEBUG] Compute Clusters
[Open3D DEBUG] Done Compute Clusters: 3
point cloud has 3 clusters
[Open3D DEBUG] Precompute neighbors.
[Open3D DEBUG] Done Precompute neighbors.
[Open3D DEBUG] Compute Clusters
[Open3D DEBUG] Done Compute Clusters: 3
point cloud has 3 clusters
[Open3D DEBUG] Precompute neighbors.
[Open3D DEBUG] Done Precompute neighbors.
[Open3D DEBUG] Compute Clusters
[Open3D DEBUG] Done Compute Clusters: 3
point cloud has 3 clusters
[Open3D DEBUG] Precompute neighbors.
[Open3D DEBUG] Done Precompute neighbors.
[Open3D DEBUG] Compute Clusters
[Open3D DEBUG] Done Compute Clusters: 3
point cloud has 3 clusters
[Open3D DEBUG] Precompute neighbors.
[Open3D DEBUG] Done Precompute neighbors.
[Open3D DEBUG] Compute Clusters

### Create GIF

In [21]:
import imageio

#### Log data to files

In [22]:
import pickle

# Destination of the pickled tracking log
file_path = f"{folder_path}/log_file.pkl"

# Serialize the log to disk
with open(file_path, "wb") as fh:
    pickle.dump(log, fh)

print(f"Log has been saved to {file_path}")

Log has been saved to /Users/quany/Downloads/GPDF_MPC_ADAS/log_file.pkl


In [None]:
# # Load the log from the file
# with open(file_path, 'rb') as file:
#     log = pickle.load(file)

# print("Log has been loaded from the file")

In [23]:
# For each element of log, keep only the entries whose second element is 3
log_obs3 = [[entry for entry in log_single if entry[1] == 3] for log_single in log]

# Destination of the pickled, filtered log
file_path = f"{folder_path}/log_obs3_file.pkl"

# Serialize the filtered log to disk
with open(file_path, "wb") as fh:
    pickle.dump(log_obs3, fh)

print(f"Log has been saved to {file_path}")


Log has been saved to /Users/quany/Downloads/GPDF_MPC_ADAS/log_obs3_file.pkl
