In [None]:
# Move the working directory up one level; relative paths in later cells are resolved from there.
# NOTE: not idempotent - re-running this cell moves up one more level each time.
%cd ..

In [None]:
import json
import os
from nuscenes_types import from_dicts, SampleData, Scene, Sample, SampleAnnotation
from cleanup_types import InterpolatedAnnotation, SampleDataWithAnnotations, SceneWithFrames
from typing import Dict, List, TypeVar

In [None]:
# Directory that read_data scans for the dataset's JSON tables (relative to the working directory).
data_dir = 'data/sets/nuscenes/v1.0-mini/'

In [None]:
# Inspect the directory tree: one (root, dirs, files) triple per directory under data_dir.
[*os.walk(data_dir)]

In [None]:
T = TypeVar('T')
def get_index(data: List[T]) -> Dict[str, T]:
    """Build a lookup table from each record's ``token`` to the record itself.

    Args:
        data: Records that each expose a ``token`` attribute.

    Returns:
        A dict mapping ``record.token`` to ``record``. When several records
        share a token, the one that appears last in ``data`` wins.
    """
    return {datum.token: datum for datum in data}

In [None]:
def read_data(data_dir: str) -> dict:
    """Load every JSON table under ``data_dir`` and derive the camera-only sample data.

    Each ``<name>.json`` file is parsed and every entry is converted with the
    matching factory ``from_dicts[name]``; the resulting list is stored under
    ``name``. An extra ``'sample_images'`` entry holds the ``sample_data``
    records whose sensor modality is ``'camera'``.

    Args:
        data_dir: Directory to scan (recursively) for ``.json`` table files.

    Returns:
        Dict mapping table name to the list of converted records, plus the
        derived ``'sample_images'`` list.

    Raises:
        KeyError: If a JSON file has no converter in ``from_dicts``, or if the
            ``sensor``, ``calibrated_sensor`` or ``sample_data`` table is missing.
    """
    suffix = '.json'
    data = {}
    for root, _, files in os.walk(data_dir):
        for file in files:
            # Only JSON tables are parsed; any other file found in the tree is ignored.
            if not file.endswith(suffix):
                continue
            name = file[:-len(suffix)]
            from_dict = from_dicts[name]
            # Join with the directory os.walk is currently visiting (not data_dir)
            # so that files located in sub-directories are opened at their real path.
            with open(os.path.join(root, file), 'r') as f:
                content = json.load(f)
            data[name] = [from_dict(d) for d in content]

    sensor_index = get_index(data['sensor'])
    calibrated_sensor_index = get_index(data['calibrated_sensor'])

    def is_camera(d) -> bool:
        # sample_data -> calibrated_sensor -> sensor, then check the sensor's modality.
        calibrated = calibrated_sensor_index[d.calibrated_sensor_token]
        return sensor_index[calibrated.sensor_token].modality == 'camera'

    data['sample_images'] = [d for d in data['sample_data'] if is_camera(d)]
    return data

In [None]:
# Load all tables once; later cells read from this dict.
data = read_data(data_dir)

In [None]:
# data['calibrated_sensor']

In [None]:
# Display the loaded 'visibility' table.
data['visibility']

In [None]:
# Display the loaded 'sensor' table.
data['sensor']

In [None]:
# Camera-only sample data: the 'sample_images' entry that read_data derives from 'sample_data'.
sample_data = data['sample_images']

In [None]:
def get_sample_data_index(sample_data: List["SampleData"]) -> Dict[str, List["SampleData"]]:
    """Group key-frame sample data by the sample it belongs to.

    Args:
        sample_data: Records exposing ``is_key_frame`` and ``sample_token``.

    Returns:
        Dict mapping ``sample_token`` to the key-frame records carrying that
        token, in input order. Records whose ``is_key_frame`` is falsy are
        left out, so a sample without any key frame has no entry at all.
    """
    sample_data_index: Dict[str, List["SampleData"]] = {}
    for s in sample_data:
        if s.is_key_frame:
            sample_data_index.setdefault(s.sample_token, []).append(s)
    return sample_data_index

In [None]:
def add_annotation(sample_data: List["SampleData"], annotations: List["SampleAnnotation"]) -> List["SampleDataWithAnnotations"]:
    """Wrap sample data so it can carry annotations and attach them to key frames.

    Every record is converted with ``SampleDataWithAnnotations.from_sample_data``.
    Each annotation is then appended to the ``annotations`` list of every
    key-frame record that has the same ``sample_token``; the same annotation
    object is shared by all of those records. Records that are not key frames
    are not touched after conversion.

    Args:
        sample_data: Records to wrap.
        annotations: Annotations exposing ``sample_token``.

    Returns:
        The wrapped records, in the same order as ``sample_data``.
    """
    with_annotation = [SampleDataWithAnnotations.from_sample_data(sd) for sd in sample_data]
    sample_data_index = get_sample_data_index(with_annotation)

    for a in annotations:
        # An annotation whose sample has no key frame among `sample_data`
        # (e.g. when only a subset of the data is passed in) has nothing to
        # attach to, so it is skipped instead of raising KeyError.
        for s in sample_data_index.get(a.sample_token, ()):
            s.annotations.append(a)
    return with_annotation

In [None]:
# Attach every annotation to the key frames of its sample (see add_annotation).
sample_data_with_annotation = add_annotation(sample_data, data['sample_annotation'])

In [None]:
# Sanity check: show the first annotation attached to the first key frame.
[s for s in sample_data_with_annotation if s.is_key_frame][0].annotations[0]

In [None]:
def annotation_token(a: "SampleAnnotation") -> str:
    """Return the ``token`` attribute of the given annotation."""
    token = a.token
    return token

In [None]:
import numpy as np
def interpolate_array(start: "SampleAnnotation", end: "SampleAnnotation", field: str, weight: float):
    """Linearly interpolate a numeric array field between two annotations.

    Args:
        start: Object whose ``field`` attribute is the value at ``weight == 0``.
        end: Object whose ``field`` attribute is the value at ``weight == 1``.
        field: Name of the attribute to read from both objects.
        weight: Position between ``start`` and ``end``.

    Returns:
        ``start + (end - start) * weight`` computed element-wise, as a plain list.
    """
    start_value = np.array(getattr(start, field))
    end_value = np.array(getattr(end, field))
    # Proper lerp: the previous (start + end) * weight gave zeros at weight 0
    # and start + end at weight 1, and was only right at weight 0.5.
    # NOTE(review): for 'rotation' (presumably a quaternion - confirm) this
    # component-wise blend is not re-normalised.
    return (start_value + (end_value - start_value) * weight).tolist()

In [None]:
def stack(start: "SampleAnnotation", end: "SampleAnnotation", field: str):
    """Return the value of ``field`` on both objects as the list ``[start value, end value]``."""
    first = getattr(start, field)
    second = getattr(end, field)
    return [first, second]

In [None]:
def interpolate_annotations(start: List["SampleAnnotation"], end: List["SampleAnnotation"], weight: float) -> List["InterpolatedAnnotation"]:
    """Blend the annotations of two frames for a frame that lies between them.

    Annotations are paired by ``instance_token``; an instance that appears in
    only one of the two lists is dropped. For every pair an
    ``InterpolatedAnnotation`` is built with:

    * ``interpolated_weight`` set to ``weight``;
    * ``translation``, ``size`` and ``rotation`` from ``interpolate_array``;
    * ``attribute_tokens`` as the de-duplicated union of both sides (start's
      tokens first, in first-seen order);
    * ``token2``, ``sample_token2``, ``visibility_token2``, ``num_lidar_pts2``
      and ``num_radar_pts2`` as ``[start value, end value]`` pairs from ``stack``.

    Args:
        start: Annotations of the earlier frame.
        end: Annotations of the later frame.
        weight: Interpolation weight forwarded to ``interpolate_array``.

    Returns:
        The interpolated annotations ordered by ``instance_token``; an empty
        list when either input is empty.
    """
    if len(start) == 0 or len(end) == 0:
        return []

    def instance_token(a):
        return a.instance_token

    # The merge below walks both lists comparing instance tokens, so both must
    # be ordered by instance token. Ordering by the annotation's own token (as
    # before) made the merge skip instances that exist on both sides.
    start = sorted(start, key=instance_token)
    end = sorted(end, key=instance_token)
    interpolated: List["InterpolatedAnnotation"] = []
    i = 0
    j = 0
    while i < len(start) and j < len(end):
        s = start[i]
        e = end[j]
        if s.instance_token == e.instance_token:
            interpolated.append(InterpolatedAnnotation(
                interpolated_weight=weight,
                instance_token=s.instance_token,
                # dict.fromkeys de-duplicates while keeping a deterministic order.
                attribute_tokens=[*dict.fromkeys([*s.attribute_tokens, *e.attribute_tokens])],
                **{
                    field: interpolate_array(s, e, field, weight)
                    for field in ['translation', 'size', 'rotation']
                },
                **{
                    field + '2': stack(s, e, field)
                    for field in ['token', 'sample_token', 'visibility_token', 'num_lidar_pts', 'num_radar_pts']
                }
            ))
            i += 1
            j += 1
        elif s.instance_token > e.instance_token:
            # Instance only present in `end`: skip it.
            j += 1
        else:
            # Instance only present in `start`: skip it.
            i += 1
    return interpolated

In [None]:
# Camera names used by join_and_interpolate: every scene is expanded into one
# entry per name (suffix "-<name>" on token and name), and frames are routed to
# an entry by the second '/'-separated component of their filename.
CAMERA_SUFFIX = [
    'CAM_FRONT',
    'CAM_BACK',
    'CAM_BACK_LEFT',
    'CAM_FRONT_LEFT',
    'CAM_FRONT_RIGHT',
    'CAM_BACK_RIGHT'
]

def join_and_interpolate(
    scenes: List["Scene"],
    samples: List["Sample"],
    sample_data: List["SampleData"],
    sample_annotations: List["SampleAnnotation"]
) -> List["SceneWithFrames"]:
    """Split scenes per camera, attach their frames and interpolate annotations.

    Steps:

    1. Every scene is expanded into one ``SceneWithFrames`` per entry of
       ``CAMERA_SUFFIX``, with ``"-<camera>"`` appended to its token and name.
    2. ``add_annotation`` attaches the annotations to the key frames.
    3. Each frame is appended to the entry matching its sample's scene and the
       camera name taken from the second ``'/'``-separated part of ``filename``.
    4. Per entry, frames are visited in timestamp order; every non-key frame
       lying between two consecutive key frames gets
       ``interpolate_annotations(previous key, next key, weight)`` where
       ``weight`` is its relative position in time between the two key frames.
       Frames before the first or after the last key frame are left as they are.

    Args:
        scenes: Scenes to expand.
        samples: Samples, used to resolve a frame's ``sample_token`` to a scene.
        sample_data: Frames to distribute over the per-camera scenes.
        sample_annotations: Annotations to attach to the key frames.

    Returns:
        The per-camera scenes, ordered scene by scene and, within a scene, in
        ``CAMERA_SUFFIX`` order.

    Raises:
        KeyError: If a frame refers to an unknown sample, scene or camera name.
    """
    scenes_with_frames: List["SceneWithFrames"] = []
    # Iterate the `scenes` argument; reading the notebook-level `data` dict here
    # would ignore the caller's input and depend on hidden kernel state.
    for s in scenes:
        for c in CAMERA_SUFFIX:
            scene = SceneWithFrames.from_scene(s)
            scene.token = scene.token + "-" + c
            scene.name = scene.name + "-" + c
            scenes_with_frames.append(scene)

    scene_index = get_index(scenes_with_frames)
    sample_index = get_index(samples)

    sample_data_with_annotations = add_annotation(sample_data, sample_annotations)

    for s in sample_data_with_annotations:
        camera_name = s.filename.split('/')[1]
        scene_token = sample_index[s.sample_token].scene_token
        scene_index[scene_token + '-' + camera_name].frames.append(s)

    for scene in scenes_with_frames:
        frames = sorted(scene.frames, key=lambda x: x.timestamp)
        start_frame = None
        pending = []  # non-key frames seen since the previous key frame
        for frame in frames:
            if not frame.is_key_frame:
                pending.append(frame)
                continue

            if start_frame is not None:
                start_time = start_frame.timestamp
                time_range = frame.timestamp - start_time
                for f in pending:
                    assert len(f.annotations) == 0
                    # Two key frames with the same timestamp would otherwise
                    # divide by zero; in that case stay on the start key frame.
                    weight = (f.timestamp - start_time) / time_range if time_range else 0.0
                    f.annotations = interpolate_annotations(
                        start_frame.annotations,
                        frame.annotations,
                        weight
                    )
            # Frames collected before the first key frame have nothing to
            # interpolate from and are simply dropped from the pending list.
            pending = []
            start_frame = frame

    return scenes_with_frames

In [None]:
# Build the per-camera scenes from the loaded tables, using only the camera sample data.
interpolated_sample_data = join_and_interpolate(data['scene'], data['sample'], data['sample_images'], data['sample_annotation'])

In [None]:
# Inspect the first entry returned by join_and_interpolate.
interpolated_sample_data[0]

In [None]:
def camera_translation_rotation(ego_pose: dict, calibrated_sensor: dict):
    """Return the ``(translation, rotation)`` pair for a camera.

    For now this simply forwards the ``'translation'`` and ``'rotation'``
    entries of ``ego_pose``; ``calibrated_sensor`` is accepted but not used.
    """
    # TODO: implement the transformation
    translation = ego_pose['translation']
    rotation = ego_pose['rotation']
    return translation, rotation