In [None]:
import json
import os
from pyquaternion import Quaternion
import pandas as pd
import pickle
import numpy as np

In [None]:
EXPERIMENT = True

In [None]:
full_data_dir = '/work/apperception/data/nuScenes/full-dataset-v1.0/Trainval/'

In [None]:
base_dir = full_data_dir
folder = 'v1.0-trainval'

In [None]:
with open(os.path.join(base_dir, folder, 'calibrated_sensor.json')) as f:
    calibrated_sensor_json = json.load(f)

with open(os.path.join(base_dir, folder, 'category.json')) as f:
    category_json = json.load(f)

with open(os.path.join(base_dir, folder, 'sample.json')) as f:
    sample_json = json.load(f)

with open(os.path.join(base_dir, folder, 'sample_data.json')) as f:
    sample_data_json = json.load(f)

with open(os.path.join(base_dir, folder, 'sample_annotation.json')) as f:
    sample_annotation_json = json.load(f)

with open(os.path.join(base_dir, folder, 'instance.json')) as f:
    instance_json = json.load(f)

with open(os.path.join(base_dir, folder, 'scene.json')) as f:
    scene_json = json.load(f)

with open(os.path.join(base_dir, folder, 'ego_pose.json')) as f:
    ego_pose_json = json.load(f)

with open(os.path.join(base_dir, folder, 'sensor.json')) as f:
    sensor_json = json.load(f)

In [None]:
files = os.listdir(os.path.join(full_data_dir, 'experiment_data'))

In [None]:
import math
from typing import List

def normalizeAngle(angle) -> float:
    while angle > math.pi:
        angle -= math.tau
    while angle < -math.pi:
        angle += math.tau
    assert -math.pi <= angle <= math.pi
    return angle

def get_heading(rotation):
    yaw = rotation.yaw_pitch_roll[0]
    return normalizeAngle(yaw)

def get_heading_from_north(rotation):
    yaw = rotation.yaw_pitch_roll[0]
    return normalizeAngle(yaw - (math.pi / 2))

rot = Quaternion(axis=[1, 0, 0], angle=np.pi / 2)
def get_camera_heading(rotation):
    return -get_heading(rot.rotate(rotation)) + math.pi / 2
    
def get_camera_position(
    camera_translation: List[float],
    ego_translation: List[float],
    ego_rotation: List[float]
) -> np.ndarray:
    rotated_offset = Quaternion(ego_rotation) \
        .inverse \
        .rotate(np.array(camera_translation))
    return np.array(ego_translation) + rotated_offset

In [None]:
files_set = set(files)

In [None]:
if EXPERIMENT:
    sample_data_filter = [s for s in sample_data_json if s['filename'].split('/')[2] in files_set]
else:
    sample_data_filter = sample_data_json
# sample_data_filter = [s for s in sample_data_json if 'CAM_FRONT/' in s['filename']]

In [None]:
sample_data_filter[0]

In [None]:
sample_tokens = set([s['sample_token'] for s in sample_data_filter])

In [None]:
ego_pose_tokens = set([s['ego_pose_token'] for s in sample_data_filter])

In [None]:
calibrated_sensor_tokens = set([s['calibrated_sensor_token'] for s in sample_data_filter])

In [None]:
sample_filter = [
    {
        'sample_token': s['token'],
        'scene_token': s['scene_token']
    }
    for s in sample_json
    if s['token'] in sample_tokens
]
len(sample_filter)

In [None]:
scene_tokens = set([s['scene_token'] for s in sample_filter])

In [None]:
calibrated_sensor_filter = [
    {
        'calibrated_sensor_token': c['token'],
        'camera_translation': c['translation'],
        'camera_rotation': c['rotation'],
        'camera_intrinsic': c['camera_intrinsic']
    }
    for c in calibrated_sensor_json
    if c['token'] in calibrated_sensor_tokens
]
len(calibrated_sensor_filter)

In [None]:
ego_pose_filter = [
    {
        'ego_pose_token': e['token'],
        'ego_translation': e['translation'],
        'ego_rotation': e['rotation']
    }
    for e in ego_pose_json
    if e['token'] in ego_pose_tokens
]
len(ego_pose_filter)

In [None]:
scene_filter = [
    {
        'scene_token': s['token'],
        'scene_name': s['name'],
    }
    for s in scene_json
    if s['token'] in scene_tokens
]
len(scene_filter)

In [None]:
sample_map = {
    s['sample_token']: s
    for s in sample_filter
}
calibrated_sensor_map = {
    c['calibrated_sensor_token']: c
    for c in calibrated_sensor_filter
}
ego_pose_map = {
    e['ego_pose_token']: e
    for e in ego_pose_filter
}
scene_map = {
    s['scene_token']: s
    for s in scene_filter
}

def s_map(s):
    sample = sample_map[s['sample_token']]
    calibrated_sensor = calibrated_sensor_map[s['calibrated_sensor_token']]
    ego_pose = ego_pose_map[s['ego_pose_token']]
    ego_heading = get_heading_from_north(Quaternion(ego_pose['ego_rotation']))
    camera_heading = get_camera_heading(Quaternion(calibrated_sensor['camera_rotation']))
    ret = {
        **s,
        **sample,
        **calibrated_sensor,
        **ego_pose,
        **scene_map[sample['scene_token']],
        'ego_heading': ego_heading * 180 / math.pi,
        'camera_heading': normalizeAngle(camera_heading + ego_heading) * 180 / math.pi,
        'camera_translation_absolute': get_camera_position(
            calibrated_sensor['camera_translation'],
            ego_pose['ego_translation'],
            ego_pose['ego_rotation']
        )
    }
    del ret['ego_pose_token']
    del ret['calibrated_sensor_token']
    del ret['fileformat']
    del ret['height']
    del ret['width']
    del ret['prev']
    del ret['next']
    del ret['scene_token']
    return ret

sample_data_res = [*map(s_map, sample_data_filter)]

len(sample_data_res)

In [None]:
sample_annotation_filter = [
    sa
    for sa in sample_annotation_json
    if sa['sample_token'] in sample_tokens
]
len(sample_annotation_filter)

In [None]:
instance_tokens = set([
    sa['instance_token']
    for sa in sample_annotation_filter
])

instance_filter = [
    {
        'instance_token': i['token'],
        'category_token': i['category_token']
    }
    for i in instance_json
    if i['token'] in instance_tokens
]
len(instance_filter)

In [None]:
category_tokens = set([
    i['category_token']
    for i in instance_filter
])

category_filter = [
    {
        'category_token': c['token'],
        'category': c['name']
    }
    for c in category_json
    if c['token'] in category_tokens
]
len(category_filter)

In [None]:
instance_map = {
    i['instance_token']: i
    for i in instance_filter
}
category_map = {
    c['category_token']: c
    for c in category_filter
}

def sa_map(sa):
    instance = instance_map[sa['instance_token']]
    ret = {
        **sa,
        **instance,
        **category_map[instance['category_token']],
        'heading': (get_heading_from_north(Quaternion(sa['rotation']))) * 180 / math.pi
    }
    
    del ret['visibility_token']
    del ret['attribute_tokens']
    del ret['prev']
    del ret['next']
    del ret['num_lidar_pts']
    del ret['num_radar_pts']
    del ret['category_token']
    
    return ret

sample_annotation_res = [*map(sa_map, sample_annotation_filter)]
len(sample_annotation_res)

In [None]:
sample_data_final = pd.DataFrame(sample_data_res)

In [None]:
sample_annotation_final = pd.DataFrame(sample_annotation_res)

In [None]:
df_sample_data_keyframe = sample_data_final[sample_data_final["is_key_frame"]][
    ["token", "sample_token"]
]
sample_annotation_final = sample_annotation_final.set_index("sample_token").join(
    df_sample_data_keyframe.set_index("sample_token"), on="sample_token", rsuffix="_sample_data"
)

In [None]:
sample_data_final = sample_data_final.sort_values(by=['scene_name', 'timestamp'])

In [None]:
sample_data_final["frame_order"] = 0
scene = ""
i = 1
for index, row in sample_data_final.iterrows():
    if row['scene_name'] != scene:
        scene = row['scene_name']
        sample_data_final.loc[index, "frame_order"] = 1
        i = 2
    else:
        sample_data_final.loc[index, "frame_order"] = i
        i += 1


In [None]:
print(len(sample_data_final))

In [None]:
print(len(sample_annotation_final))

In [None]:
if EXPERIMENT:
    suffix = '_experiment'
else:
    suffix = ''

In [None]:
with open(os.path.join(base_dir, f"sample_data{suffix}.pickle"), "wb") as f:
    pickle.dump(sample_data_final, f)
with open(os.path.join(base_dir, f"annotation{suffix}.pickle"), "wb") as f:
    pickle.dump(sample_annotation_final, f)

In [None]:
sample_data_final.to_csv(os.path.join(base_dir, f"sample_data{suffix}.csv"), index=False)

In [None]:
sample_annotation_final.to_csv(os.path.join(base_dir, f"annotation{suffix}.csv"), index=False)

In [None]:
calibrated_sensor = [
    {
        **c,
        **[s for s in sensor_json if s['token'] == c['sensor_token']][0]
    }
    for c in calibrated_sensor_json
]
calibrated_sensor = [
    c
    for c in calibrated_sensor
    if c['modality'] == 'camera'
]
rot = Quaternion(axis=[1, 0, 0], angle=np.pi / 2)
[
    normalizeAngle(get_heading(rot.rotate(Quaternion(c['rotation'])))) * 180 / math.pi
    for c in calibrated_sensor
    if c['channel'] == "CAM_FRONT"
]

In [None]:
sample_data_final