In [None]:
%load_ext autoreload
%autoreload 2

In [37]:
from nuscenes import NuScenes
from nuscenes.utils.data_io import load_bin_file
import numpy as np
np.set_printoptions(suppress=True)
import glob
import pathlib
import os
import json
from collections import Counter

In [38]:
from adperf_utils.perturb_utils import PerturbType as PT
from adperf_utils.perturb_utils import perturb_center_y
from adperf_utils.perturb_utils import add_noise, add_obs
from adperf_utils.perturb_utils import PT_LOOKUP

In [39]:
def read_nuscenes(file):
    return np.fromfile(file, dtype=np.float32).reshape((-1, 5))

### Setting parameters

In [None]:
nusc = NuScenes(version='v1.0-mini', dataroot='data/nuscenes', verbose=True)
corruption_folder = pathlib.Path('data/corruptions')
if not corruption_folder.exists():
    corruption_folder.mkdir(parents=True, exist_ok=False)

root_path = pathlib.Path('data/nuscenes')


In [41]:
# choose from ['noise_left', 'noise_right', 'obs_left', 'obs_right', 'center_y']
chosen_per_type = 'noise_left'
# choose from [0.1, 0.2, 0.3, 0.4, 0.5]
chosen_noise_level = 0.1

### Generate PCDs

In [None]:
path_not_exist = []
num_obs_pts = []
saving_to = ''

data_types = ['lidarseg', 'panoptic']
dataset = None
chosen_type = 'panoptic'

assert chosen_type in data_types

if chosen_type == 'lidarseg':
    dataset = nusc.lidarseg
elif chosen_type == 'panoptic':
    dataset = nusc.panoptic


for frame in dataset:
    # get obstacle info
    file_path = root_path / frame['filename']
    if not os.path.exists(file_path):
        path_not_exist.append(file_path)    
    label_array = load_bin_file(file_path, chosen_type)
    # masking code for filtering
    if chosen_type == 'lidarseg':
        label_array_simple = label_array
    elif chosen_type == 'panoptic':
        label_array_simple = label_array // 1000

    # filter vehicles
    mask = ((label_array_simple >= 1) & (label_array_simple <= 8)) | ((label_array_simple >= 14) & (label_array_simple <= 23))
    
    # get lidar file
    sample_data_token = frame['sample_data_token']
    sample_data_metadata = nusc.get('sample_data', sample_data_token)
    lidar_filename = sample_data_metadata['filename']
    lidar_file_path = root_path / lidar_filename
    if not os.path.exists(lidar_file_path):
        path_not_exist.append(lidar_file_path)

    # filter points
    pcd_scan = read_nuscenes(lidar_file_path)
    points_in_obs = pcd_scan[mask]
    num_obs_pts.append(len(points_in_obs))


    # extract labels for obstacle points
    label_array_in_obs = label_array[mask]
    assert len(points_in_obs) == len(label_array_in_obs)

    # add labels as a new column in pcd
    label_array_in_obs = np.reshape(label_array_in_obs, (len(label_array_in_obs), 1))
    points_in_obs_w_labels = np.hstack((points_in_obs, label_array_in_obs))

    # verify correctness
    for idx in range(len(points_in_obs)):
        if points_in_obs[idx][0] == points_in_obs_w_labels[idx][0] and points_in_obs[idx][1] == points_in_obs_w_labels[idx][1] and \
           points_in_obs[idx][2] == points_in_obs_w_labels[idx][2] and points_in_obs[idx][3] == points_in_obs_w_labels[idx][3] and \
           points_in_obs[idx][4] == points_in_obs_w_labels[idx][4] and label_array_in_obs[idx][0] == points_in_obs_w_labels[idx][5]:
            continue
        else:
            print('Incorrect labels')
            print(idx, points_in_obs[idx], label_array_in_obs[idx], points_in_obs_w_labels[idx])

    # modify PCD
    per_type = PT_LOOKUP[chosen_per_type]
   
    if not isinstance(per_type, PT):
        print('Invalid per type')
        break

    # modifications
    if per_type == PT.NOISE_LEFT or per_type == PT.NOISE_RIGHT:
        curr_corr_pcds_folder_path = corruption_folder / 'pcds' / f'{per_type.name.lower()}_{round(chosen_noise_level * 10)}'
        saving_to = curr_corr_pcds_folder_path
        corr_arr = add_noise(points_in_obs_w_labels, chosen_noise_level, per_type)
        new_pcd_scan = np.concatenate((pcd_scan, corr_arr[:, :5]), axis=0)
    elif per_type == PT.OBS_LEFT or per_type == PT.OBS_RIGHT:
        chosen_noise_level = 3
        curr_corr_pcds_folder_path = corruption_folder / 'pcds' / f'{per_type.name.lower()}_{round(chosen_noise_level * 10)}'
        saving_to = curr_corr_pcds_folder_path
        corr_arr = add_obs(points_in_obs, chosen_noise_level, per_type)
        new_pcd_scan = np.concatenate((pcd_scan, corr_arr), axis=0)
    elif per_type == PT.CENTER_Y:
        curr_corr_pcds_folder_path = corruption_folder / 'pcds' / f'{per_type.name.lower()}_{round(chosen_noise_level * 10)}'
        saving_to = curr_corr_pcds_folder_path
        corr_arr = perturb_center_y(points_in_obs_w_labels, chosen_noise_level)
        # remove current obstacles from pcd
        non_obs_mask = ~mask
        remaining_points = pcd_scan[non_obs_mask]
        # add new obstacles to pcd (without labels)
        new_pcd_scan = np.concatenate((remaining_points, corr_arr[:, :5]), axis=0)
    else:
        print(f'Invalid perturbation {per_type}')

    # write new pcd with modifications to file
    if not curr_corr_pcds_folder_path.exists():
        curr_corr_pcds_folder_path.mkdir(parents=True, exist_ok=False)
    # new_pcd_scan = np.concatenate((pcd_scan, corr_arr), axis=0)
    new_pcd_scan.astype(np.float32).tofile(curr_corr_pcds_folder_path / lidar_file_path.name)

# print('Summary')
# print(f'LIDAR_TOP not exist: {path_not_exist}!')
# print(f'Num pts in obs: {num_obs_pts}')
with open('trajectory_modification.txt', 'w') as fd:
    fd.write(f'{chosen_per_type},{chosen_noise_level}')

print(f'Saved PCD to {saving_to}')
