#### Imports

In [None]:
import cv2
import numpy as np
import open3d as o3d
import os
import matplotlib.pyplot as plt
from scipy.spatial import Delaunay
from scipy.ndimage import distance_transform_edt
import json
import shutil
import argparse
import sys

from classes.CARLASemantics import SemanticColors, SemanticTags

#### Command-line arguments

In [None]:
# Parse the arguments if running from command line or 
# use default values if running from Jupyter Notebook
if 'ipykernel_launcher.py' in sys.argv[0]: 
    args = argparse.Namespace(
        ego_vehicle_extrinsics='/home/leppsalu/Desktop/Github/CARLA-vehicle-simulation/src/config/carla_extrinsics.urdf',
        ego_vehicle_intrinsics='/home/leppsalu/Desktop/Github/CARLA-vehicle-simulation/src/config/carla_intrinsics.json',
        input_dir='/media/leppsalu/SSD_Storage/generated_data_town02',
        output_dir='/media/leppsalu/SSD_Storage/processed_data_town02',
        n_frames_per_bag=1800
    )
else:
    # Create the parser
    parser = argparse.ArgumentParser(description='Run simulation postprocessing.')
    # Add arguments
    parser.add_argument('--ego_vehicle_extrinsics', type=str, required=False, default='/home/leppsalu/Desktop/Github/CARLA-vehicle-simulation/src/config/carla_extrinsics.urdf',
                        help='Path to the ego vehicle extrinsics file')
    parser.add_argument('--ego_vehicle_intrinsics', type=str, required=False, default='/home/leppsalu/Desktop/Github/CARLA-vehicle-simulation/src/config/carla_intrinsics.json',
                        help='Path to the ego vehicle intrinsics file')
    parser.add_argument('--input_dir', type=str, required=False, default='"/home/leppsalu/Desktop/Github/CARLA-vehicle-simulation/src/generated_data"',
                        help='Path to the input directory')
    parser.add_argument('--output_dir', type=str, required=False, default='"/home/leppsalu/Desktop/Github/CARLA-vehicle-simulation/src/processed_data"',
                        help='Path to the output directory')
    parser.add_argument('--n_frames_per_bag', type=int, required=False, default=1800)
    # Parse the arguments
    args = parser.parse_args()

SOURCE_DIR = args.input_dir
TARGET_DIR = args.output_dir 
EXTRINSICS_FILEPATH = args.ego_vehicle_extrinsics
INTRINSICS_FILEPATH = args.ego_vehicle_intrinsics
N_FRAMES_PER_BAG = args.n_frames_per_bag

#### Directories

In [None]:
LIDAR_DIR = "LIDAR_TOP"
CAM_DIRS = ["CAM_FRONT", "CAM_FRONT_LEFT", "CAM_FRONT_RIGHT", "CAM_BACK", "CAM_BACK_LEFT", "CAM_BACK_RIGHT"]
SEMANTIC_CAM_DIRS =  ["SEMANTIC_CAM_FRONT", "SEMANTIC_CAM_FRONT_LEFT", "SEMANTIC_CAM_FRONT_RIGHT", "SEMANTIC_CAM_BACK", "SEMANTIC_CAM_BACK_LEFT", "SEMANTIC_CAM_BACK_RIGHT"]
DEPTH_CAM_DIRS = ["DEPTH_CAM_FRONT", "DEPTH_CAM_FRONT_LEFT", "DEPTH_CAM_FRONT_RIGHT", "DEPTH_CAM_BACK", "DEPTH_CAM_BACK_LEFT", "DEPTH_CAM_BACK_RIGHT"]
DEPTH_BEV_DIR = "DEPTH_BEV"
DEPTH_VISIBILITY_DIR = "DEPTH_VISIBILITY"

#### Intrinsics file (list of sensor configurations)

In [None]:
with open(INTRINSICS_FILEPATH, "r") as INTRINSICS_FILE:
    INTRINSICS = json.load(INTRINSICS_FILE)

INTRINSICS_MATRICES = dict()
for SENSOR_NAME in INTRINSICS.keys():
    if SENSOR_NAME in CAM_DIRS:
        CAMERA_INTRINSICS = INTRINSICS[SENSOR_NAME]
        fx = CAMERA_INTRINSICS.get('fx', CAMERA_INTRINSICS.get('fl'))
        fy = CAMERA_INTRINSICS.get('fy', CAMERA_INTRINSICS.get('fl'))
        w, h = CAMERA_INTRINSICS.get('w'), CAMERA_INTRINSICS.get('h')
        ppx = w / 2
        ppy = h / 2
        d_type = CAMERA_INTRINSICS['disto_type']
        D = np.array(CAMERA_INTRINSICS['disto'])

        INTRINSICS_MATRICES[SENSOR_NAME] = np.array([[fx, 0, ppx, 0],
                                                     [0, fy, ppy, 0],
                                                     [0, 0, 1, 0]], dtype=int)

#### Post processing constants

In [None]:
GRID_SIZE = 52
GRID_RESOLUTION = 0.5
MAX_SENSOR_SCAN_DISTANCE = 52 # meters

#### Filesystem methods

In [None]:
def remove_files_in_dir(data_dir, string_to_find):
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            if string_to_find in file:
                file_path = os.path.join(root, file)
                try:
                    os.remove(file_path)
                    # print(f"Removed: {file_path}")
                except Exception as e:
                    print(f"Error removing {file_path}: {e}")

def get_all_filenames(dir, no_extension=False):
    if no_extension:
        return [filename.split(".")[0] for filename in os.listdir(dir)]
    return [filename for filename in os.listdir(dir)]

def clean_up_lidar_dir():
    LIDAR_DIR_PATH = os.path.join(SOURCE_DIR, LIDAR_DIR)
    remove_files_in_dir(LIDAR_DIR_PATH, ".bev.")

def clean_up_camera_dirs():
    for CAM_DIR in CAM_DIRS:
        CAM_DIR_PATH = os.path.join(SOURCE_DIR, CAM_DIR)
        remove_files_in_dir(CAM_DIR_PATH, ".pointcloud.")
        remove_files_in_dir(CAM_DIR_PATH, ".visibility.")
        remove_files_in_dir(CAM_DIR_PATH, ".fov.")

def clean_up_depth_camera_dirs():
    for DEPTH_CAM_DIR in DEPTH_CAM_DIRS:
        DEPTH_CAM_DIR_PATH = os.path.join(SOURCE_DIR, DEPTH_CAM_DIR)
        remove_files_in_dir(DEPTH_CAM_DIR_PATH, ".ply")
        remove_files_in_dir(DEPTH_CAM_DIR_PATH, ".fov.")
        remove_files_in_dir(DEPTH_CAM_DIR_PATH, ".visibility.")

def clean_up_depth_bev_dir():
    DEPTH_BEV_DIR_PATH = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR)
    remove_files_in_dir(DEPTH_BEV_DIR_PATH, ".")

def clean_up_depth_visibility_dir():
    DEPTH_VISIBILITY_DIR_PATH = os.path.join(SOURCE_DIR, DEPTH_VISIBILITY_DIR)
    remove_files_in_dir(DEPTH_VISIBILITY_DIR_PATH, ".")
        
def save_point_cloud(file_path, point_cloud):
    directory = os.path.dirname(file_path)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    if type(point_cloud) is o3d.geometry.PointCloud:
        o3d.io.write_point_cloud(file_path, point_cloud)
        return
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(point_cloud)
    o3d.io.write_point_cloud(file_path, pcd)

def read_point_cloud(file_path):
    point_cloud = o3d.io.read_point_cloud(file_path)
    return np.asarray(point_cloud.points)

def save_image(file_path, mask):
    directory = os.path.dirname(file_path)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    cv2.imwrite(file_path, mask)

### Ground Truth from depth camera point clouds

#### Depth image parsing methods

In [None]:
def calculate_depth_map_from_image(image):
    B = image[:, :, 0]
    G = image[:, :, 1]
    R = image[:, :, 2]
    normalized = (G + B * 256 + R * 256 * 256) / (256 * 256 - 1)
    depth_map = normalized * 1000
    return depth_map

def depth_image_to_point_cloud(depth_image, fov, max_distance=None):
    
    depth = calculate_depth_map_from_image(depth_image)
    if max_distance != None:
        depth[depth > max_distance] = 0.0 
    height, width = depth_image.shape[:2]

    # Create an intrinsic matrix from the camera parameters
    fx = fy = 0.5 * width / np.tan(0.5 * np.radians(fov))
    cx = width / 2.0
    cy = height / 2.0

    intrinsic = o3d.camera.PinholeCameraIntrinsic(width, height, fx, fy, cx, cy)

    depth_o3d = o3d.geometry.Image(depth.astype(np.float32))
    
    pcd = o3d.geometry.PointCloud.create_from_depth_image(depth_o3d, intrinsic)
    points = np.asarray(pcd.points)
    points[:, 0] = -points[:, 0]
    pcd.points = o3d.utility.Vector3dVector(points)
    return pcd

def create_color_mask(image, colors, inverted=False):
    mask = np.full((image.shape[0], image.shape[1]), 0, dtype=np.uint8)
    
    B, G, R = image[:, :, 0], image[:, :, 1], image[:, :, 2]
    # Iterate through the list of colors
    for color in colors:
        # Extract color channels
        r, g, b = color
        # Create boolean masks for each channel comparison
        r_mask = R == r
        g_mask = G == g
        b_mask = B == b
        # Combine channel masks to get the final color mask
        color_mask = r_mask & g_mask & b_mask
        # Update the overall mask where any color matches
        mask[color_mask] = 255

    if inverted:
        mask = np.where(mask == 0, 255, 0).astype(np.uint8)
        return mask
    return mask

def mask_image(image, image_mask):
    masked = np.copy(image)
    masked[image_mask == 0] = 0
    return masked

#### Methods for creating FOV masks, visibility masks and BEV maps from depth images

In [None]:
def generate_mask(grid, camera_matrix, T, image_size):
    homogeneous_grid = np.vstack([grid[i].flatten() for i in range(3)] + [np.ones(grid[0].size)])
    tfd_points = np.dot(T, homogeneous_grid)
    tfd_points = np.dot(camera_matrix, tfd_points)
    mask_z = tfd_points[2] > 0 # Exclude points behind the camera
    tfd_points[2][tfd_points[2] == 0] = np.nan  # Replace zeros with NaN to avoid division by zero
    projected_points = tfd_points / tfd_points[2]
    mask = ((0 <= projected_points[0]) & (projected_points[0] < image_size[0]) &
            (0 <= projected_points[1]) & (projected_points[1] < image_size[1]) &
            mask_z)
    return mask.reshape(grid[0].shape)

def get_camera_fov_masks(camera_calibs, lidar_to_cam_tf_list=[], grid_size_m=50, resolution=0.5):
    whole_mask = np.zeros((int(grid_size_m / resolution), int(grid_size_m / resolution)))
    x, y = np.meshgrid(np.arange(whole_mask.shape[1]), np.arange(whole_mask.shape[0]))
    x = x - whole_mask.shape[1] / 2
    y = y - whole_mask.shape[0] / 2
    z = np.zeros_like(x)
    bev_grid = np.array([x, y, z])
    bev_grid = np.expand_dims(bev_grid, axis=-1)

    cam_masks = {}

    for i, (camera, calib) in enumerate(camera_calibs.items()):
        intrinsic = calib['K']

        w, h = calib['w'], calib['h']

        if lidar_to_cam_tf_list:
            cam_T_lidar = np.linalg.inv(lidar_to_cam_tf_list[i])
        else:
            cam_T_lidar = np.linalg.inv(calib['T'])

        mask = generate_mask(bev_grid, camera_matrix=intrinsic, T=cam_T_lidar, image_size=(w, h))
        visible_bev = np.array([dim[mask] for dim in bev_grid])

        mask_ref = visible_bev[:2]
        cam_mask = np.zeros_like(mask)

        mask_ref[1] += mask.shape[1] / 2
        mask_ref[0] += mask.shape[0] / 2
        mask_ref = np.round(mask_ref).astype(int)

        cam_mask[mask_ref[1], mask_ref[0]] = 1
        cam_mask = cam_mask.squeeze(-1)

        cam_masks[camera] = cam_mask
    return cam_masks

def get_fov_mask(image, transformation_matrix, camera_intrinsics_matrix):
    # Example camera calibration data for two cameras
    camera_calibs = {
        'camera': {
            'K': camera_intrinsics_matrix,
            'w': image.shape[1],  # Image width
            'h': image.shape[0],  # Image height
            'T': transformation_matrix
        }
    }
    # Generate the camera FOV masks
    cam_masks = get_camera_fov_masks(camera_calibs, grid_size_m=GRID_SIZE, resolution=GRID_RESOLUTION)
    fov_mask = np.asarray(cam_masks["camera"], dtype=np.uint8) * 255
    
    return fov_mask

def calculate_fov(focal_length, image_width):
    fov_radians = 2 * np.arctan(image_width / (2 * focal_length))
    fov_degrees = np.degrees(fov_radians)
    return fov_degrees

def rasterize_to_bev(points, resolution=0.5, grid_size=25):
    bev_map = np.zeros((int(grid_size / resolution), int(grid_size / resolution)))
    # Converting to grid coordinates
    grid_coords = np.floor(points[:, :2] / resolution).astype(np.int32) + int(grid_size // (2 * resolution))
    
    # Ensure that grid coordinates are within the bounds of the BEV map
    valid_points = (grid_coords[:, 0] >= 0) & (grid_coords[:, 0] < bev_map.shape[0]) & \
                   (grid_coords[:, 1] >= 0) & (grid_coords[:, 1] < bev_map.shape[1])
    # Populate the BEV map with occupancy
    bev_map[grid_coords[valid_points, 0], grid_coords[valid_points, 1]] = 255
    return bev_map

#### Create point clouds, FOV masks, visibility masks and occupancy maps from depth images

In [None]:
def get_all_filenames(dir, no_extension=False):
    if no_extension:
        return [filename.split(".")[0] for filename in os.listdir(dir)]
    return [filename for filename in os.listdir(dir)]

def get_obstacle_point_cloud(depth_image, semantic_image, fov):
    obstacles_mask = create_color_mask(
        semantic_image, 
        colors=[
            SemanticColors.ROADLINE.value, SemanticColors.ROAD.value, SemanticColors.SIDEWALK.value,
            SemanticColors.GROUND.value, SemanticColors.WATER.value, SemanticColors.TERRAIN.value,
            SemanticColors.SKY.value
        ],
        inverted=True
    )
    obstacles_depth_image = mask_image(depth_image, obstacles_mask)
    obstacles_point_cloud = depth_image_to_point_cloud(obstacles_depth_image, fov=fov, max_distance=MAX_SENSOR_SCAN_DISTANCE)
    return obstacles_point_cloud

def get_ground_point_cloud(depth_image, semantic_image, fov):
    ground_mask = create_color_mask(semantic_image, colors=[
        SemanticColors.ROADLINE.value, SemanticColors.ROAD.value, SemanticColors.SIDEWALK.value,
        SemanticColors.GROUND.value, SemanticColors.WATER.value, SemanticColors.TERRAIN.value
    ])
    ground_depth_image = mask_image(depth_image, ground_mask)
    ground_point_cloud = depth_image_to_point_cloud(ground_depth_image, fov=fov, max_distance=MAX_SENSOR_SCAN_DISTANCE)
    return ground_point_cloud

def get_corrected_obstacle_point_cloud(obstacles_point_cloud, ground_point_cloud):
    def gen_mesh(pcd): 
        try:
            points = np.asarray(pcd.points)
        except:
            points = pcd
        tri = Delaunay(points[:, :2])  # We only use the X and Y coordinates
        mesh = o3d.geometry.TriangleMesh()
        mesh.vertices = o3d.utility.Vector3dVector(points)
        mesh.triangles = o3d.utility.Vector3iVector(tri.simplices)
        return mesh
    
    def mesh_to_cloud_signed_distances(o3d_mesh: o3d.t.geometry.TriangleMesh, cloud: o3d.t.geometry.PointCloud) -> np.ndarray:
        scene = o3d.t.geometry.RaycastingScene()
        _ = scene.add_triangles(o3d_mesh)
        sdf = scene.compute_signed_distance(cloud.point.positions)
        return sdf.numpy()

    def filter_points_far_from_mesh(pcd, distances, t1, t2):
        indices1 = np.where((distances > t1) & (distances <= t2))[0]
        indices2 = np.where(distances < t1)[0]
        objects = pcd.select_by_index(indices1)
        ground = pcd.select_by_index(indices2)
        return objects, ground

    def remove_points_far_from_mesh(pcd, mesh, height_range=(0.4, 2)):
        mesh_t = o3d.t.geometry.TriangleMesh.from_legacy(mesh)
        tpcd = o3d.t.geometry.PointCloud.from_legacy(pcd)
        sdf = mesh_to_cloud_signed_distances(mesh_t, tpcd)
        sdf = np.abs(sdf)
        obstacles, ground = filter_points_far_from_mesh(pcd, sdf, *height_range)
        return obstacles, ground
    try:
        ground_mesh = gen_mesh(ground_point_cloud)
        obstacles_point_cloud, ground_point_cloud = remove_points_far_from_mesh(obstacles_point_cloud, ground_mesh, height_range=(0.2, 3))
    except:
        pass
    return obstacles_point_cloud

clean_up_depth_camera_dirs()
clean_up_depth_bev_dir()
clean_up_depth_visibility_dir()

all_timestamps = get_all_filenames(os.path.join(SOURCE_DIR, DEPTH_CAM_DIRS[0]), no_extension=True)
unique_timestamps = set(all_timestamps)

print(f"Post-processing simulation data ...")
for i, timestamp in enumerate(unique_timestamps):
    cumulative_obstacle_point_cloud = o3d.geometry.PointCloud()
    cumulative_ground_point_cloud = o3d.geometry.PointCloud()
    
    for (SEM_DIR, DEPTH_DIR, CAM_DIR) in zip(SEMANTIC_CAM_DIRS, DEPTH_CAM_DIRS, CAM_DIRS):
        depth_camera_transform_path = os.path.join(SOURCE_DIR, DEPTH_DIR, f"{timestamp}.npy")
        depth_camera_transform = np.load(depth_camera_transform_path)
        
        depth_image_path = os.path.join(SOURCE_DIR, DEPTH_DIR, f"{timestamp}.png")
        depth_image = cv2.imread(depth_image_path)
        
        image_width = float(INTRINSICS.get(CAM_DIR).get("w"))
        focal_length = float(INTRINSICS.get(CAM_DIR).get("fl"))
        depth_camera_fov = calculate_fov(focal_length, image_width)
        
        semantic_image_path = os.path.join(SOURCE_DIR, SEM_DIR, f"{timestamp}.png")
        semantic_image = cv2.imread(semantic_image_path)
        
        obstacle_point_cloud = get_obstacle_point_cloud(depth_image, semantic_image, depth_camera_fov)
        obstacle_point_cloud.transform(depth_camera_transform)
        cumulative_obstacle_point_cloud.points.extend(obstacle_point_cloud.points)
        
        ground_point_cloud = get_ground_point_cloud(depth_image, semantic_image, depth_camera_fov)
        ground_point_cloud.transform(depth_camera_transform)
        cumulative_ground_point_cloud.points.extend(ground_point_cloud.points)

    cumulative_ground_point_cloud = cumulative_ground_point_cloud.voxel_down_sample(0.25) 
    cumulative_obstacle_point_cloud = get_corrected_obstacle_point_cloud(cumulative_obstacle_point_cloud, cumulative_ground_point_cloud)
    
    lidar_transform = np.load(os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.npy"))
    cumulative_obstacle_point_cloud.transform(np.linalg.inv(lidar_transform))
    
    obstacles_point_cloud_file_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.ply")
    save_point_cloud(obstacles_point_cloud_file_path, cumulative_obstacle_point_cloud)
    # print(f"Added: {obstacles_point_cloud_file_path}")
    
    cumulative_ground_point_cloud.transform(np.linalg.inv(lidar_transform))
    ground_point_cloud_file_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.ground.ply")
    save_point_cloud(ground_point_cloud_file_path, cumulative_ground_point_cloud)
    # print(f"Added: {ground_point_cloud_file_path}")

    occupancy_image = rasterize_to_bev(np.asarray(cumulative_obstacle_point_cloud.points), resolution=GRID_RESOLUTION, grid_size=GRID_SIZE)
    occupancy_image_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.bev.png")
    save_image(occupancy_image_path, occupancy_image)
    # print(f"Added: {occupancy_image_path}")

    def get_signed_distance_field_from_occupancy_image(occupancy_image):
        img = np.array(occupancy_image)
        inv_arr = (255 - np.array(img))
        sdf = distance_transform_edt(inv_arr).astype(np.float32)
        sdf += 1
        sdf = 1 / sdf 
        return sdf
    
    def get_sdf_as_image(sdf_array):
        sdf_array = (sdf_array / sdf_array.max() * 255).astype(np.uint8)
        sdf_image = cv2.applyColorMap(sdf_array, cv2.COLORMAP_JET)
        return sdf_image

    sdf = get_signed_distance_field_from_occupancy_image(occupancy_image)
    sdf_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.sdf.npy")
    np.save(sdf_path, sdf)
    # print(f"Added: {sdf_path}")
    sdf_image = get_sdf_as_image(sdf)
    sdf_image_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.sdf.png")
    save_image(sdf_image_path, sdf_image)
    # print(f"Added: {sdf_image_path}")


    transform_file_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.npy")
    np.save(transform_file_path, lidar_transform)
    # print(f"Added: {transform_file_path}")

    visible_point_cloud = cumulative_ground_point_cloud + cumulative_obstacle_point_cloud
    cumulative_visibility_mask = rasterize_to_bev(np.asarray(visible_point_cloud.points), resolution=GRID_RESOLUTION, grid_size=GRID_SIZE).T
    cumulative_visibility_mask_path = os.path.join(SOURCE_DIR, DEPTH_VISIBILITY_DIR, f"{timestamp}.png")
    save_image(cumulative_visibility_mask_path, cumulative_visibility_mask)
    # print(f"Added: {cumulative_visibility_mask_path}")

    transform_file_path = os.path.join(SOURCE_DIR, DEPTH_VISIBILITY_DIR, f"{timestamp}.npy")
    np.save(transform_file_path, lidar_transform)
    # print(f"Added: {transform_file_path}")

    for CAM_DIR, DEPTH_DIR in zip(CAM_DIRS, DEPTH_CAM_DIRS):
        depth_image_path = os.path.join(SOURCE_DIR, DEPTH_DIR, f"{timestamp}.png")
        depth_image = cv2.imread(depth_image_path)
        
        depth_camera_transform_path = os.path.join(SOURCE_DIR, DEPTH_DIR, f"{timestamp}.npy")
        depth_camera_transform = np.load(depth_camera_transform_path)
        
        lidar_transform_path = os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.npy")
        lidar_transform = np.load(lidar_transform_path)
        lidar_transform_inv = np.linalg.inv(lidar_transform)
        combined_transform = np.dot(lidar_transform_inv, depth_camera_transform)
        
        camera_fov_mask = get_fov_mask(depth_image, combined_transform, INTRINSICS_MATRICES[CAM_DIR])
        fov_mask_path = os.path.join(SOURCE_DIR, DEPTH_DIR, f"{timestamp}.fov.png")
        save_image(fov_mask_path, camera_fov_mask)
        # print(f"Added: {fov_mask_path}")

        camera_visibility_mask = np.array((cumulative_visibility_mask > 0) & (camera_fov_mask > 0), dtype=np.uint8) * 255
        visibility_mask_path = os.path.join(SOURCE_DIR, DEPTH_DIR, f"{timestamp}.visibility.png")
        save_image(visibility_mask_path, camera_visibility_mask)
        # print(f"Added: {visibility_mask_path}")

    def get_lidar_obstacle_point_cloud(semantic_point_cloud):
        # Extract colors from the point cloud
        semantic_colors = np.asarray(semantic_point_cloud.colors)
        r_channel = semantic_colors[:, 0] * 255
        mask = np.isin(
            r_channel, 
            [ 
                SemanticTags.ROADLINE.value, SemanticTags.ROAD.value, SemanticTags.SIDEWALK.value,
                SemanticTags.GROUND.value, SemanticTags.WATER.value, SemanticTags.TERRAIN.value, 
                SemanticTags.SKY.value
            ],
            invert=True
        )
        # Filter the points and colors based on the mask
        filtered_points = np.asarray(semantic_point_cloud.points)[mask]
        filtered_colors = semantic_colors[mask]
        # Create a new point cloud with the filtered points and colors
        filtered_point_cloud = o3d.geometry.PointCloud()
        filtered_point_cloud.points = o3d.utility.Vector3dVector(filtered_points)
        filtered_point_cloud.colors = o3d.utility.Vector3dVector(filtered_colors)
        return filtered_point_cloud
    
    def get_lidar_ground_point_cloud(semantic_point_cloud):
        # Extract colors from the point cloud
        semantic_colors = np.asarray(semantic_point_cloud.colors)
        r_channel = semantic_colors[:, 0] * 255
        mask = np.isin(
            r_channel, 
            [
                SemanticTags.ROADLINE.value, SemanticTags.ROAD.value, SemanticTags.SIDEWALK.value,
                SemanticTags.GROUND.value, SemanticTags.WATER.value, SemanticTags.TERRAIN.value
            ]
        )
        # Filter the points and colors based on the mask
        filtered_points = np.asarray(semantic_point_cloud.points)[mask]
        filtered_colors = semantic_colors[mask]
        # Create a new point cloud with the filtered points and colors
        filtered_point_cloud = o3d.geometry.PointCloud()
        filtered_point_cloud.points = o3d.utility.Vector3dVector(filtered_points)
        filtered_point_cloud.colors = o3d.utility.Vector3dVector(filtered_colors)
        return filtered_point_cloud
    
    
    lidar_point_cloud_path = os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.ply")
    lidar_point_cloud = o3d.io.read_point_cloud(lidar_point_cloud_path)
    lidar_ground_point_cloud = get_lidar_ground_point_cloud(lidar_point_cloud)
    lidar_ground_point_cloud_path = os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.ground.ply")
    save_point_cloud(lidar_ground_point_cloud_path, lidar_ground_point_cloud)
    # print(f"Added: {lidar_ground_point_cloud_path}")
    
    lidar_obstacle_point_cloud = get_lidar_obstacle_point_cloud(lidar_point_cloud)
    lidar_obstacle_point_cloud = get_corrected_obstacle_point_cloud(lidar_obstacle_point_cloud, lidar_ground_point_cloud)
    lidar_obstacle_point_cloud_path = os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.obstacles.ply")

    lidar_bev_image = rasterize_to_bev(np.asarray(lidar_obstacle_point_cloud.points), resolution=GRID_RESOLUTION, grid_size=GRID_SIZE)
    lidar_bev_image_path = os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.bev.png")
    save_image(lidar_bev_image_path, lidar_bev_image)
    # print(f"Added: {lidar_bev_image_path}")

    if i % 10 == 0:
        print(f"Processed {((i+1) / len(unique_timestamps) * 100):.6f}% of frames in the dataset")
print(f"Processed {((i+1) / len(unique_timestamps) * 100):.6f}% of frames in the dataset")
print(f"Processed data in {SOURCE_DIR}")

### Export processed files to target directory (in suitable directory tree format for machine learning pipeline) 

In [None]:
TARGET_CAM_DIRS = ["CAM_FRONT", "CAM_FRONT_LEFT", "CAM_FRONT_RIGHT", "CAM_BACK", "CAM_BACK_LEFT", "CAM_BACK_RIGHT"]
TARGET_LIDAR_DIR = "LIDAR_TOP"
TARGET_FOV_MASKS_DIR = "fov_masks"
TARGET_BEVS_DIR = "bevs"
TARGET_SDFS_DIR = "sdfs"
TARGET_VISIBILITY_MASKS_DIR = "visibility_masks"
TARGET_CUMULATIVE_MASKS_DIR = "cumulative_masks"

In [None]:
def clean_up_target_dir():
    if not os.path.exists(TARGET_DIR):
        return
    for subdir in os.listdir(TARGET_DIR):
        shutil.rmtree(os.path.join(TARGET_DIR, subdir))

In [None]:
def copy_file_to_target_dir(source_file_path, target_file_path):
    target_directory_path = os.path.dirname(target_file_path)
    os.makedirs(target_directory_path, exist_ok=True)
    shutil.copyfile(source_file_path, target_file_path)

clean_up_target_dir()

all_timestamps = get_all_filenames(os.path.join(SOURCE_DIR, DEPTH_CAM_DIRS[0]), no_extension=True)
unique_timestamps = set(all_timestamps)
sorted_timestamps = sorted(unique_timestamps, key=int)

print(f"Exporting post-processed data...")
for i, timestamp in enumerate(sorted_timestamps):
    if (i % N_FRAMES_PER_BAG) == 0:
        start_timestamp = sorted_timestamps[i]
        end_timestamp = sorted_timestamps[-1] if (i+N_FRAMES_PER_BAG-1) >= len(sorted_timestamps) else sorted_timestamps[i+N_FRAMES_PER_BAG-1]
        TARGET_DIRNAME = os.path.basename(TARGET_DIR)
        TARGET_BAG_DIR = f"{TARGET_DIRNAME}_frames_{start_timestamp}_{end_timestamp}"
    
    for CAM_DIR in CAM_DIRS:
        source_file_path = os.path.join(SOURCE_DIR, CAM_DIR, f"{timestamp}.png")
        target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, CAM_DIR, f"{timestamp}.png")
        copy_file_to_target_dir(source_file_path, target_file_path)
        source_file_path = os.path.join(SOURCE_DIR, CAM_DIR, f"{timestamp}.npy")
        target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, CAM_DIR, f"{timestamp}.npy")
        copy_file_to_target_dir(source_file_path, target_file_path)

    source_file_path = os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.ply")
    target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, LIDAR_DIR, f"{timestamp}.ply")
    copy_file_to_target_dir(source_file_path, target_file_path)
    source_file_path = os.path.join(SOURCE_DIR, LIDAR_DIR, f"{timestamp}.npy")
    target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, LIDAR_DIR, f"{timestamp}.npy")
    copy_file_to_target_dir(source_file_path, target_file_path)

    for CAM_DIR, DEPTH_CAM_DIR in zip(CAM_DIRS, DEPTH_CAM_DIRS):
        source_file_path = os.path.join(SOURCE_DIR, DEPTH_CAM_DIR, f"{timestamp}.fov.png")
        target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_FOV_MASKS_DIR, CAM_DIR, f"{timestamp}.png")
        copy_file_to_target_dir(source_file_path, target_file_path)
        source_file_path = os.path.join(SOURCE_DIR, DEPTH_CAM_DIR, f"{timestamp}.npy")
        target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_FOV_MASKS_DIR, CAM_DIR, f"{timestamp}.npy")
        copy_file_to_target_dir(source_file_path, target_file_path)

        source_file_path = os.path.join(SOURCE_DIR, DEPTH_CAM_DIR, f"{timestamp}.visibility.png")
        target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_VISIBILITY_MASKS_DIR, CAM_DIR, f"{timestamp}.png")
        copy_file_to_target_dir(source_file_path, target_file_path)
        source_file_path = os.path.join(SOURCE_DIR, DEPTH_CAM_DIR, f"{timestamp}.npy")
        target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_VISIBILITY_MASKS_DIR, CAM_DIR, f"{timestamp}.npy")
        copy_file_to_target_dir(source_file_path, target_file_path)

    source_file_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.bev.png")
    target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_BEVS_DIR, f"{timestamp}.png")
    copy_file_to_target_dir(source_file_path, target_file_path)

    source_file_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.sdf.npy")
    target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_SDFS_DIR, f"{timestamp}.npy")
    copy_file_to_target_dir(source_file_path, target_file_path)
    source_file_path = os.path.join(SOURCE_DIR, DEPTH_BEV_DIR, f"{timestamp}.sdf.png")
    target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_SDFS_DIR, f"{timestamp}.png")
    copy_file_to_target_dir(source_file_path, target_file_path)

    source_file_path = os.path.join(SOURCE_DIR, DEPTH_VISIBILITY_DIR, f"{timestamp}.png")
    target_file_path = os.path.join(TARGET_DIR, TARGET_BAG_DIR, TARGET_VISIBILITY_MASKS_DIR, TARGET_CUMULATIVE_MASKS_DIR, f"{timestamp}.png")
    copy_file_to_target_dir(source_file_path, target_file_path)

    if i % 10 == 0:
        print(f"Exported {((i+1) / len(unique_timestamps) * 100):.6f}% of frames in the dataset")

print(f"Exported {((i+1) / len(unique_timestamps) * 100):.6f}% of frames in the dataset")
print(f"Exported post-processed data to {TARGET_DIR}")

In [None]:
print(f"Post-processing completed!")