# Augmentations on hand gestures dataset

#### Import libraries

In [15]:
import open3d as o3d
import os
import glob
import re
import itertools
import json
import numpy as np
import mrob

#### Set filenames

In [2]:
DATA_DIR = "data"
SET_DIR = "example_select"
CAMERAS_DIR = ["cam_center", "cam_right", "cam_left"]  # front, right, left (really not matter)

CALIBRATION_DIR = os.path.join(DATA_DIR, "calib_params")
CALIBRATION_INTRINSIC = {
    "cam_center" : "1m.json",
    "cam_right" : "2s.json",
    "cam_left" : "9s.json",
}
CALIBRATION_EXTRINSIC  = {
    ("cam_center", "cam_right") : os.path.join("2022-03-28-14-42-43", "1-2", "calibration_blob.json"),
    ("cam_center", "cam_left") : os.path.join("2022-03-28-14-42-43", "1-9", "calibration_blob.json"),
    ("cam_right", "cam_left") : os.path.join("2022-03-28-14-42-43", "2-9", "calibration_blob.json"),
}

## Functional part

#### Read images

Map images indexes by timestamps

Get a list of indexes for each frame from main stream that are nearest by timestamps

In [3]:
def find_timestamp(s):
    """
    Returns indexes of start and the end (negative) of the longest numerical sequence
    
    Parameters:
    s (str): string with numerical timestamp

    Returns:
    timestamp_index (list): start and end (negative) of the timestamp sequence
    """
    longest = max(re.findall(r'\d+', s), key=len)
    start = s.find(longest)
    
    timestamp_index = [start, start + len(longest) - len(s)]

    return timestamp_index


def map_nearest(cameras_filenames, main_camera_index=0):
    """
    Returns an array [L x N] of indexes that matches every i frame (along L)
    from main camera to the nearest frame from other cameras by timestamps
    All filenames should contain timestamps and be sorted by them
    N = number of cameras
    L = number of filenames in cameras_filenames[main_camera_index]

    Parameters:
    cameras_filenames (Sequence[Sequence[str]]): filenames from N cameras
    main_camera_index (int, default=0): main camera index

    Returns:
    nearest (np.ndarray[int]): indexes of filenames that are nearest to i frame from main camera
    """
    assert all(cameras_filenames)
    
    nearest = np.zeros((len(cameras_filenames[main_camera_index]), len(cameras_filenames)), dtype=np.int64)
    pointers = np.zeros(len(cameras_filenames), dtype=np.int64)

    patterns = np.array([find_timestamp(filenames[0]) for filenames in cameras_filenames], dtype=np.int64)

    for i, main_filename in enumerate(cameras_filenames[main_camera_index]):
        now = int(main_filename[patterns[main_camera_index, 0]:patterns[main_camera_index, 1]])

        for j, (pointer, filenames) in enumerate(zip(pointers, cameras_filenames)):
            now_camera = int(filenames[pointer][patterns[j, 0]:patterns[j, 1]])

            while pointer + 1 < len(filenames) and \
                    now - now_camera > int(filenames[pointer + 1][patterns[j, 0]:patterns[j, 1]]) - now:
                pointer += 1
                now_camera = int(filenames[pointer][patterns[j, 0]:patterns[j, 1]])
            
            nearest[i, j] = pointer
    
    return nearest

#### Read calibration

In [4]:
def get_intrinsics(filenames):
    """
    Returns a list [2*N x 6] of calibration parameters for color and depth cameras
    in form of open3d.camera.PinholeCameraIntrinsic from JSON files
    [width, height, fx, fy, cx, cy]
    N = number of filenames

    Parameters:
    filenames (Sequence[str]): JSON filenames containing calibration

    Returns:
    intrinsics (np.ndarray[float]): calibration parameters
    """
    assert all(filenames)

    intrinsics = np.zeros((2 * len(filenames), 6), dtype=np.float64)

    for i, filename in enumerate(filenames):
        with open(filename) as json_file:
            data = json.load(json_file)

        for j, camera in enumerate(("color", "depth")):
            intrinsics[2 * i + j] = [
                data[f"{camera}_camera"]["resolution_width"],
                data[f"{camera}_camera"]["resolution_height"],
                data[f"{camera}_camera"]["intrinsics"]["parameters"]["parameters_as_dict"]["fx"],
                data[f"{camera}_camera"]["intrinsics"]["parameters"]["parameters_as_dict"]["fy"],
                data[f"{camera}_camera"]["intrinsics"]["parameters"]["parameters_as_dict"]["cx"],
                data[f"{camera}_camera"]["intrinsics"]["parameters"]["parameters_as_dict"]["cy"],
            ]

    return intrinsics


def get_extrinsics(filenames):
    """
    Returns a list [N x 4 x 4] of transforms
    N = number of filenames

    Parameters:
    filenames (Sequence[str]): JSON filenames containing calibration

    Returns:
    extrinsics (np.ndarray[float]): relative calibration parameters
    """
    assert all(filenames)

    extrinsics = np.zeros((len(filenames), 4, 4), dtype=np.float64)

    for i, filename in enumerate(filenames):
        with open(filename) as json_file:
            data = json.load(json_file)

        T = np.eye(4)
        T[:3, :3] = np.array(data['CalibrationInformation']['Cameras'][1]['Rt']['Rotation']).reshape(3, 3)
        T[:3, 3] = np.array(data['CalibrationInformation']['Cameras'][1]['Rt']['Translation'])

        extrinsics[i] = T

    return extrinsics

#### Transform Depth image on Color image

TODO (currently provided by dataset)

NOTE: better to pre-process this step rather to integrate in pipeline

#### Create Point clouds

NOTE: better to pre-process this step rather to integrate in pipeline

In [5]:
def get_rgbd_images(images_paths, depth_scale=1000, depth_trunc=5.0):
    """
    Returns a list [N // 2] of open3d.geometry.RGBDImage
    Input list should contain paths in the same order as all pipeline
    N = number of paths

    Parameters:
    images_paths (Sequence[str]): filenames for color and depth images
    depth_scale (float, default=1000.0): ratio to scale depth
    depth_trunc (float, default=5.0): values larger depth_trunc gets truncated.
        The depth values will first be scaled and then truncated.

    Returns:
    rgbd_images (list[open3d.geometry.RGBDImage]): RGB-D images
    """
    assert all(images_paths)
    assert len(images_paths) % 2 == 0
    
    rgbd_images = [[] for _ in range(len(images_paths) // 2)]

    for i in range(len(rgbd_images)):
        rgbd_images[i] = o3d.geometry.RGBDImage.create_from_color_and_depth(
            color=o3d.io.read_image(images_paths[2 * i]),
            depth=o3d.io.read_image(images_paths[2 * i + 1]),
            depth_scale=depth_scale,
            depth_trunc=depth_trunc,
            convert_rgb_to_intensity=False
        )
    
    return rgbd_images


def create_point_clouds(rgbd_images, intrinsics, extrinsics):
    """
    Returns a list [N] of open3d.geometry.PointCloud
    Input list should contain RGB-D images as open3d.geometry.RGBDImage
    All inputs should have the same shape
    N = number of images

    Parameters:
    rgbd_images (Sequence[open3d.geometry.RGBDImage]): RGB-D images
    intrinsics (np.ndarray[float]): intrinsics of RGB-D images
    extrinsics (np.ndarray[float]): extrinsics of RGB-D images

    Returns:
    rgbd_images (list[open3d.geometry.RGBDImage]): RGB-D images
    """
    assert len(rgbd_images) == len(intrinsics) == len(extrinsics)
    assert all(rgbd_images)

    point_clouds = [[] for _ in range(len(rgbd_images))]
    to_filter = [1, 1, 0, 0, 0, 0]

    for i in range(len(point_clouds)):
        point_clouds[i] = o3d.geometry.PointCloud.create_from_rgbd_image(
            image=rgbd_images[i],
            intrinsic=o3d.camera.PinholeCameraIntrinsic(
                *list(map(lambda val: int(val[1]) if to_filter[val[0]] else val[1], enumerate([*intrinsics[i]])))
            ),
            extrinsic=np.linalg.inv(extrinsics[i]),
            project_valid_depth_only=True
        )
    
    return point_clouds

#### Concatenate Point clouds

In [39]:
def concatenate_point_clouds(point_clouds):
    return np.sum(point_clouds)

## Overall pipeline

Read images

In [7]:
# list of [..., CAMERAS_DIR[i - 1]/depth/*.png, CAMERAS_DIR[i]/color/*.jpg, CAMERAS_DIR[i]/depth/*.png, ...]
images_paths = [sorted(glob.glob(os.path.join(DATA_DIR, SET_DIR, camera, type, '*')))
    for camera, type in itertools.product(CAMERAS_DIR, ('color', 'depth'))]
mapped_indexes = map_nearest(images_paths, main_camera_index=0)

Read calibration

In [8]:
intrinsics_paths = [os.path.join(CALIBRATION_DIR, CALIBRATION_INTRINSIC[camera])
    for camera in CAMERAS_DIR]
intrinsics = get_intrinsics(intrinsics_paths)

extrinsics_paths = [os.path.join(CALIBRATION_DIR, CALIBRATION_EXTRINSIC[camera])
    for camera in itertools.combinations(CAMERAS_DIR, 2)]
extrinsics = get_extrinsics(extrinsics_paths)

Create and Concatenate Point clouds

In [55]:
for group in mapped_indexes:
    paths = [''] * len(group)
    for i, index in enumerate(group):
        paths[i] = images_paths[i][index]
    
    rgbd_images = get_rgbd_images(
        paths,
        depth_scale=1000,
        depth_trunc=5.0
    )

    point_clouds = create_point_clouds(
        rgbd_images,
        intrinsics[::2],
        np.concatenate([[np.eye(4)], extrinsics[:len(CAMERAS_DIR) - 1]])
    )
    
    concatenated_pc = concatenate_point_clouds(point_clouds)

    T = np.array([
        [1., 0., 0., 0.],
        [0., np.cos(np.pi/6), -np.sin(np.pi/6), 0.],
        [0., np.sin(np.pi/6), np.cos(np.pi/6), 0.],
        [0., 0., 0., 1.]],
        dtype=np.float64
    )
    concatenated_pc.transform(T)
    
    break

In [56]:
import plotly.graph_objects as go

In [57]:
depth_points = np.asarray(concatenated_pc.points)
colors_points = np.asarray(concatenated_pc.colors)

step = 200

fig = go.Figure(
    data=[
        go.Scatter3d(
            x=depth_points[::step, 0],
            y=depth_points[::step, 1],
            z=depth_points[::step, 2],
            mode='markers',
            marker=dict(size=1, color=colors_points[::step])
        )
    ],
    layout=dict(
        scene=dict(
            xaxis=dict(visible=True),
            yaxis=dict(visible=True),
            zaxis=dict(visible=True)
        )
    )
)
fig.show()