# Base Parser

# Brief Explanation

The following notebook provides a converter for setting up a lidar scene for the Dataloop Platform (In this example for: [Pandaset](https://www.kaggle.com/datasets/usharengaraju/pandaset-dataset)), and it can be extended to support Custom Lidar Scenes by updating the following available `Customizable` methods as needed.

`Notice:` The converter assumes that the remote dataset has the Pandaset `.ply` files converted to either `.pcd` files (for the point clouds) or `.csv` files (for the annotations).

## Imports (Customizable)

The libraries required to parse the Lidar Scene Data. Add more libraries as needed for custom scenes.

In [None]:
from dtlpylidar.parser_base import extrinsic_calibrations
from dtlpylidar.parser_base import images_and_pcds, camera_calibrations, lidar_frame, lidar_scene
import dtlpylidar.utilities.transformations as transformations
import dtlpy as dl
import pandas as pd
import numpy as np
import os
import json
import logging
import shutil
import pathlib

# Obtain the logger through logging.getLogger() so it is registered in the logging hierarchy and honours
# whatever handlers/levels the application configures (e.g. logging.basicConfig(level=logging.INFO)).
# Instantiating logging.Logger directly creates a detached logger that ignores that configuration.
logger = logging.getLogger("lidar_base_parser")

## Get Dataloop Dataset and Remote Path (On the Dataloop Dataset)

Set the target Dataloop Dataset to build the lidar scene for, with the path to the folder where the lidar scene files are located in the target Dataloop Dataset.

In [None]:
# ID of the target Dataloop dataset - fill it in before running this cell
dataset_id = ""
dataset = dl.datasets.get(dataset_id=dataset_id)
# Folder inside the dataset where the lidar scene files are located
# (presumably "/" means the dataset root - confirm against the dataset layout)
remote_path: str = "/"

## Download Data (Customizable)

A method that defines which binaries and annotation JSONs need to be downloaded for the parse functions that follow.

In [None]:
def download_data(dataset: dl.Dataset, remote_path: str, download_path) -> tuple:
    """
    Download the required data for the parser
    :param dataset: Input dataset
    :param remote_path: Path to the remote folder where the Lidar data is uploaded
    (leading and trailing "/" are ignored, so "/scene/", "/scene" and "scene" are equivalent)
    :param download_path: Path to the downloaded data
    :return: (items_path, json_path) Paths to the downloaded items and annotations JSON files directories
    """
    # Download items dataloop annotation JSONs
    # (PCD and Image annotation JSONs contains the Dataloop platform references (Like: ID) to the remote files)
    annotations_filters = dl.Filters(field="metadata.system.mimetype", values="*pcd*", method=dl.FiltersMethod.OR)
    annotations_filters.add(field="metadata.system.mimetype", values="*image*", method=dl.FiltersMethod.OR)
    dataset.download_annotations(local_path=download_path, filters=annotations_filters)

    # Download required binaries (Calibrations Data)
    # Pandaset Calibration Data is saved in JSON files (Like: poses.json, intrinsics.json, timestamps.json)
    calibrations_filters = dl.Filters(field="metadata.system.mimetype", values="*json*")
    dataset.items.download(local_path=download_path, filters=calibrations_filters)

    # Download required binaries (Annotations Data)
    # Pandaset Annotations Data is saved in CSV files (Like: 01.csv in cuboids folder)
    cuboids_filters = dl.Filters(field="metadata.system.mimetype", values="*csv*")
    dataset.items.download(local_path=download_path, filters=cuboids_filters)

    # os.path.join() throws away every component that precedes an absolute one, so a remote path such as
    # "/" or "/scene" would silently drop download_path from the result. Make the remote path relative first.
    relative_remote_path = remote_path.strip("/")
    items_path = os.path.join(download_path, "items", relative_remote_path)
    json_path = os.path.join(download_path, "json", relative_remote_path)
    return items_path, json_path

In [None]:
# Drop one leading and one trailing "/" so the remote path can be joined under the local download folders
remote_path = remote_path[1:] if remote_path.startswith("/") else remote_path
remote_path = remote_path[:-1] if remote_path.endswith("/") else remote_path

# Everything is downloaded into a folder named after the dataset, inside the current working directory
download_path = os.path.join(os.getcwd(), dataset.name)

items_path, json_path = download_data(dataset=dataset, remote_path=remote_path, download_path=download_path)
print(f"{items_path}\n{json_path}")

## Parse Lidar Data (Customizable)

A method to parse the Lidar Sensor data from the downloaded files 
(Extrinsic and Timestamps).

In [None]:
def parse_lidar_data(items_path, json_path) -> dict:
    """
    Build the LidarPcdData object of every point cloud frame in the scene
    :param items_path: Path to the downloaded items directory
    :param json_path: Path to the downloaded annotations JSON files directory
    :return: lidar_data: Dictionary mapping each frame number to its LidarPcdData object
    """
    pcd_items_dir = os.path.join(items_path, "lidar")
    pcd_jsons_dir = os.path.join(json_path, "lidar")

    # poses.json holds the Extrinsic (Translation and Rotation) of the Lidar Scene per frame
    with open(os.path.join(pcd_items_dir, "poses.json"), 'r') as poses_file:
        frame_poses: list = json.load(poses_file)

    # timestamps.json holds the Timestamps of the Lidar Scene per frame
    with open(os.path.join(pcd_items_dir, "timestamps.json"), 'r') as timestamps_file:
        frame_timestamps: list = json.load(timestamps_file)

    # The annotation JSON file names are parsed as integers, so the numeric sort yields the frame order
    pcd_json_files = list(pathlib.Path(pcd_jsons_dir).rglob('*.json'))
    pcd_json_files.sort(key=lambda json_file: int(json_file.stem))

    lidar_data = dict()
    for frame_idx, pcd_json_file in enumerate(pcd_json_files):
        with open(pcd_json_file, 'r') as pcd_file:
            pcd_item_json = json.load(pcd_file)

        # Optional ground map reference stored in the item user metadata (None when it is not set)
        user_metadata = pcd_item_json.get("metadata", dict()).get("user", dict())
        ground_map_id = user_metadata.get("lidar_ground_detection", dict()).get("groundMapId", None)

        # Missing pose fields fall back to a zero translation and an identity quaternion
        frame_pose = frame_poses[frame_idx]
        position = frame_pose.get("position", dict())
        translation = extrinsic_calibrations.Translation(
            x=position.get("x", 0),
            y=position.get("y", 0),
            z=position.get("z", 0),
        )
        heading = frame_pose.get("heading", dict())
        rotation = extrinsic_calibrations.QuaternionRotation(
            x=heading.get("x", 0),
            y=heading.get("y", 0),
            z=heading.get("z", 0),
            w=heading.get("w", 1)
        )
        timestamp = str(frame_timestamps[frame_idx])

        lidar_data[frame_idx] = images_and_pcds.LidarPcdData(
            item_id=pcd_item_json.get("id"),
            ground_id=ground_map_id,
            remote_path=pcd_item_json.get("filename"),
            extrinsic=extrinsic_calibrations.Extrinsic(rotation=rotation, translation=translation),
            timestamp=timestamp
        )

    return lidar_data

In [None]:
# Build the frame number -> LidarPcdData mapping from the downloaded files
lidar_data = parse_lidar_data(items_path=items_path, json_path=json_path)
# Uncomment to inspect the parsed result:
# print(lidar_data)

## Parse Cameras Data (Customizable)

A method to parse the Cameras data from all the available cameras downloaded files 
(Intrinsic, Extrinsic, Timestamps and Distortion).

In [None]:
def parse_cameras_data(items_path, json_path) -> dict:
    """
    Build the LidarCameraData and LidarImageData objects of every camera frame in the scene
    :param items_path: Path to the downloaded items directory
    :param json_path: Path to the downloaded annotations JSON files directory
    :return: cameras_data: Dictionary mapping each camera folder name to a dictionary of frame number to
    {"lidar_camera": LidarCameraData, "lidar_image": LidarImageData}
    """
    def read_json(file_path):
        # Load and return the content of a single JSON file
        with open(file_path, 'r') as json_file:
            return json.load(json_file)

    cameras_jsons_dir = os.path.join(json_path, "camera")
    cameras_items_dir = os.path.join(items_path, "camera")

    cameras_data = dict()
    # Every entry found under the camera annotations directory is handled as one camera
    for camera_folder in sorted(os.listdir(cameras_jsons_dir)):
        camera_frames = dict()
        cameras_data[camera_folder] = camera_frames

        camera_items_dir = os.path.join(cameras_items_dir, camera_folder)
        # intrinsics.json holds the Intrinsics (fx, fy, cx, cy) of the current camera
        intrinsics: dict = read_json(os.path.join(camera_items_dir, "intrinsics.json"))
        # poses.json holds the Extrinsic (Translation and Rotation) of the current camera per frame
        frame_poses: list = read_json(os.path.join(camera_items_dir, "poses.json"))
        # timestamps.json holds the Timestamps of the current camera per frame
        frame_timestamps: list = read_json(os.path.join(camera_items_dir, "timestamps.json"))

        # The annotation JSON file names are parsed as integers, so the numeric sort yields the frame order
        image_json_files = list(pathlib.Path(os.path.join(cameras_jsons_dir, camera_folder)).rglob('*.json'))
        image_json_files.sort(key=lambda json_file: int(json_file.stem))

        for frame_idx, image_json_file in enumerate(image_json_files):
            image_item_json = read_json(image_json_file)

            # A separate camera entry is created per frame, identified by the camera folder and frame index
            cam_id = f"{camera_folder}_frame_{frame_idx}"
            intrinsic = camera_calibrations.Intrinsic(
                fx=intrinsics.get("fx", 0),
                fy=intrinsics.get("fy", 0),
                cx=intrinsics.get("cx", 0),
                cy=intrinsics.get("cy", 0)
            )

            # Missing pose fields fall back to an identity quaternion and a zero translation
            frame_pose = frame_poses[frame_idx]
            heading = frame_pose.get("heading", dict())
            rotation = extrinsic_calibrations.QuaternionRotation(
                x=heading.get("x", 0),
                y=heading.get("y", 0),
                z=heading.get("z", 0),
                w=heading.get("w", 1)
            )
            position = frame_pose.get("position", dict())
            translation = extrinsic_calibrations.Translation(
                x=position.get("x", 0),
                y=position.get("y", 0),
                z=position.get("z", 0)
            )
            # No distortion values are read from the downloaded files, all coefficients are set to zero
            distortion = camera_calibrations.Distortion(k1=0, k2=0, k3=0, p1=0, p2=0)
            timestamp = str(frame_timestamps[frame_idx])

            camera_entry = camera_calibrations.LidarCameraData(
                intrinsic=intrinsic,
                extrinsic=extrinsic_calibrations.Extrinsic(rotation=rotation, translation=translation),
                channel=image_item_json.get("filename"),
                distortion=distortion,
                cam_id=cam_id,
            )
            image_entry = images_and_pcds.LidarImageData(
                item_id=image_item_json.get("id"),
                lidar_camera=camera_entry,
                remote_path=image_item_json.get("filename"),
                timestamp=timestamp
            )

            camera_frames[frame_idx] = {"lidar_camera": camera_entry, "lidar_image": image_entry}

    return cameras_data

In [None]:
# Build the camera folder -> frame number -> camera/image objects mapping from the downloaded files
cameras_data = parse_cameras_data(items_path=items_path, json_path=json_path)
# Uncomment to inspect the parsed result:
# print(cameras_data)

## Build Lidar Scene

Combine the `lidar_data` and `cameras_data` together to build the `frames.json`, a LiDAR video file with all the point cloud and image files stitched together, where each frame contains the following information:

1. `PCD file:` For further information about how a PCD file must look, refer to [Why a new file format?](https://pointclouds.org/documentation/tutorials/pcd_file_format.html#why-a-new-file-format).

2. `JPEG/PNG files:` The images of the available cameras for the given frame.

In [None]:
def build_lidar_scene(lidar_data: dict, cameras_data: dict):
    """
    Merge all the objects of lidar_data and cameras_data into a LidarScene object
    :param lidar_data: Dictionary mapping each frame number to its LidarPcdData object
    :param cameras_data: Dictionary mapping each camera to a dictionary of frame number to
    {"lidar_camera": LidarCameraData, "lidar_image": LidarImageData}
    :return: scene_data: LidarScene data as JSON, to be uploaded to the dataloop platform as the frames.json item
    """
    scene = lidar_scene.LidarScene()

    # Frames are looked up by the consecutive keys 0..len(lidar_data)-1
    for frame_number in range(len(lidar_data)):
        logger.info(f"Processing PCD data [Frame: {frame_number}]")
        frame_pcd = lidar_data[frame_number]

        frame_images = list()
        for camera_idx, camera_frames in enumerate(cameras_data.values()):
            logger.info(f"Processing Camera data [Frame: {frame_number}, Camera Index: {camera_idx}]")
            frame_entry = camera_frames.get(frame_number, dict())
            camera_entry = frame_entry.get("lidar_camera", None)
            image_entry = frame_entry.get("lidar_image", None)

            # A camera contributes to the frame only when both its calibration and its image are available
            if camera_entry is not None and image_entry is not None:
                scene.add_camera(camera_entry)
                frame_images.append(image_entry)

        scene.add_frame(
            lidar_frame.LidarSceneFrame(
                lidar_frame_pcd=frame_pcd,
                lidar_frame_images=frame_images
            )
        )

    return scene.to_json()

In [None]:
# Build the scene JSON from the parsed lidar and cameras data
scene_data = build_lidar_scene(lidar_data=lidar_data, cameras_data=cameras_data)

# Upload the scene as the frames.json item, straight from memory (the serialized JSON bytes are passed as
# local_path), into the same remote folder as the scene files. overwrite=True replaces an existing frames.json.
frames_item = dataset.items.upload(
    remote_name="frames.json",
    remote_path=f"/{remote_path}",
    local_path=json.dumps(scene_data).encode(),
    overwrite=True,
    item_metadata={
        "system": {
            "shebang": {
                # presumably the type marker the platform uses to open the item as a lidar scene - confirm
                "dltype": "PCDFrames"
            }
        },
        "fps": 1
    }
)
print(frames_item)

## Parse Annotations (Customizable)

A method to parse the Annotations data from all the downloaded files.
Notices:

1. It is possible to modify the function to upload 3D annotations to the 3D point cloud files and 2D annotations to the images.

2. Annotations uploaded to the separated point cloud and images files will not be visible on the frames.json file. 

In [None]:
def parse_annotations(frames_item: dl.Item, items_path, json_path):
    """
    Build the cuboid annotations from the downloaded CSV files and upload them to the frames.json item
    :param frames_item: The frames.json item that the annotations are uploaded to
    :param items_path: Path to the downloaded items directory
    :param json_path: Path to the downloaded annotations JSON files directory (currently not used)
    :return: None
    """
    cuboids_dir = os.path.join(items_path, "annotations", "cuboids")

    builder = frames_item.annotations.builder()
    scene_json = json.loads(s=frames_item.download(save_locally=False).getvalue())

    # Maps every cuboid uuid to a running object id, so the same object keeps its id across frames
    object_ids = dict()
    used_labels = set()

    # The CSV file names are parsed as integers, so the numeric sort yields the frame order
    cuboids_files = list(pathlib.Path(cuboids_dir).rglob('*.csv'))
    cuboids_files.sort(key=lambda csv_file: int(csv_file.stem))

    for frame_idx, cuboids_file in enumerate(cuboids_files):
        # Translation and Rotation of the matching Lidar Scene Frame
        scene_frame = scene_json["frames"][frame_idx]
        translation = scene_frame["translation"]
        scene_position = np.array([translation[axis] for axis in ("x", "y", "z")])
        rotation = scene_frame["rotation"]
        scene_quaternion = np.array([rotation[axis] for axis in ("x", "y", "z", "w")])

        # Every row of the frame CSV file describes one cuboid
        frame_cuboids = pd.read_csv(filepath_or_buffer=cuboids_file)
        for _, cuboid in frame_cuboids.iterrows():
            # A uuid seen for the first time gets the next free id (equal to the number of known uuids)
            object_id = object_ids.setdefault(cuboid["uuid"], len(object_ids))

            label = cuboid["label"]
            position = np.array([cuboid["position.x"], cuboid["position.y"], cuboid["position.z"]])
            # Only the yaw angle is taken from the CSV, the two other euler angles are set to zero
            quaternion = transformations.quaternion_from_euler(0, 0, cuboid["yaw"])
            scale = np.array([cuboid["dimensions.x"], cuboid["dimensions.y"], cuboid["dimensions.z"]])

            # Transform matrix of the cuboid relatively to the Scene Frame
            transform_matrix = transformations.calc_cuboid_scene_transform_matrix(
                cuboid_position=position,
                cuboid_quaternion=quaternion,
                cuboid_scale=scale,
                scene_position=scene_position,
                scene_quaternion=scene_quaternion
            )

            # The uploaded position and rotation are the ones extracted back from the transform matrix
            position = transformations.translation_vector_from_transform_matrix(
                transform_matrix=transform_matrix
            )
            rotation_matrix = transformations.rotation_matrix_from_transform_matrix(
                transform_matrix=transform_matrix
            )
            euler_rotation = transformations.euler_from_rotation_matrix(rotation_matrix=rotation_matrix)

            builder.add(
                annotation_definition=dl.Cube3d(
                    label=label,
                    position=position,
                    scale=scale,
                    rotation=euler_rotation
                ),
                object_id=object_id,
                frame_num=frame_idx
            )
            used_labels.add(label)

    builder.upload()
    # Add every label that was used to the dataset labels
    frames_item.dataset.update_labels(label_list=list(used_labels), upsert=True)
In [None]:
# Parse the downloaded cuboids CSV files and upload them as annotations on the frames.json item
parse_annotations(frames_item=frames_item, items_path=items_path, json_path=json_path)
# Uncomment to open the annotated frames.json item in the browser:
# frames_item.open_in_web()

## Cleanup

Delete the directory with all the downloaded files.

In [None]:
# Remove the local download folder; ignore_errors=True keeps the cleanup from raising if the removal fails
shutil.rmtree(path=download_path, ignore_errors=True)