In [None]:
import os
import subprocess
from pathlib import Path
import torch
from mmdet3d.evaluation.metrics import nuscenes_metric as nus_metric
from mmdet3d.evaluation.metrics.nuscenes_metric import output_to_nusc_box
import json 
from pyquaternion import Quaternion
from nuscenes.utils.data_classes import Box
from nuscenes.utils.geometry_utils import transform_matrix
import operator
from functools import reduce
from pathlib import Path
import numpy as np
from nuscenes.nuscenes import NuScenes
from nuscenes.eval.detection.config import config_factory
from nuscenes.eval.detection.evaluate import NuScenesEval
from nuscenes.utils.data_classes import RadarPointCloud
from classes import cls_attr_dist, class_names, mini_val_tokens
from custom_env import home_dir, output_dir, preds_dir, model_dir, is_set_to_mini
from custom_env import dataset_root as dataroot

import import_ipynb
import nuscenes_accumulate
import nuscenes_evaluate

In [None]:
eval_set_map = {
        'v1.0-mini': 'mini_val',
        'v1.0-trainval': 'val',
        'v1.0-test': 'test'
    }

dataset_version = 'v1.0-mini' if is_set_to_mini() else 'v1.0-trainval'
try:
    eval_version = 'detection_cvpr_2019'
    eval_config = config_factory(eval_version)
except:
    eval_version = 'cvpr_2019'
    eval_config = config_factory(eval_version)
    


In [None]:
DETECTION_THRESHOLD = 0.4

backend_args = None
nusc = NuScenes(version=dataset_version, dataroot = dataroot)
ann_file = f'{dataroot}nuscenes_infos_val.pkl'
metric='bbox'

pcd_path = f"{dataroot}/samples/LIDAR_TOP/"
mmdet_path = f"{home_dir}/software/mmdetection3d"
pcd_list = os.listdir(pcd_path)

# Instantiate evaluator:
evaluator = nus_metric.NuScenesMetric(dataroot, ann_file)

|Checkpoint.pth|config acc to links|log.json|config.py acc to log.json
|------------|----------|---|---|
|hv_pointpillars_secfpn_sbn-all_4x8_2x_nus-3d_20210826_225857-f19d00a3.pth|pointpillars_hv_secfpn_sbn-all_8xb4-2x_nus-3d.py|hv_pointpillars_secfpn_sbn-all_4x8_2x_nus-3d_20210826_225857.log.json|hv_pointpillars_secfpn_sbn-all_4x8_2x_nus-3d.py|
|hv_pointpillars_secfpn_sbn-all_fp16_2x8_2x_nus-3d_20201020_222626-c3f0483e.pth|pointpillars_hv_secfpn_sbn-all_8xb2-amp-2x_nus-3d.py|hv_pointpillars_secfpn_sbn-all_fp16_2x8_2x_nus-3d_20201020_222626.log.json||
|hv_pointpillars_fpn_sbn-all_4x8_2x_nus-3d_20210826_104936-fca299c1.pth|pointpillars_hv_fpn_sbn-all_8xb4-2x_nus-3d.py|hv_pointpillars_fpn_sbn-all_4x8_2x_nus-3d_20210826_104936.log.json|hv_pointpillars_fpn_sbn-all_4x8_2x_nus-3d.py|
|hv_pointpillars_fpn_sbn-all_fp16_2x8_2x_nus-3d_20201021_120719-269f9dd6.pth|pointpillars_hv_fpn_sbn-all_8xb2-amp-2x_nus-3d.py|hv_pointpillars_fpn_sbn-all_fp16_2x8_2x_nus-3d_20201021_120719.log.json||


In [None]:
# Modified PCDet functions:
# Box conversion
def boxes_lidar_to_nusenes(det_info):
    boxes3d = det_info['bboxes_3d']
    scores = det_info['scores_3d']
    labels = det_info['labels_3d']

    box_list = []
    for k in range(boxes3d.shape[0]):
        quat = Quaternion(axis=[0, 0, 1], radians=boxes3d[k, 6])
        velocity = (*boxes3d[k, 7:9], 0.0) if boxes3d.shape[1] == 9 else (0.0, 0.0, 0.0)
        box = Box(
            boxes3d[k, :3],
            boxes3d[k, [4, 3, 5]],  # wlh
            quat, label=labels[k], score=scores[k], velocity=velocity,
        )
        box_list.append(box)
    return box_list

def lidar_nusc_box_to_global(nusc, boxes, sample_token):
    s_record = nusc.get('sample', sample_token)
    sample_data_token = s_record['data']['LIDAR_TOP']

    sd_record = nusc.get('sample_data', sample_data_token)
    cs_record = nusc.get('calibrated_sensor', sd_record['calibrated_sensor_token'])
    sensor_record = nusc.get('sensor', cs_record['sensor_token'])
    pose_record = nusc.get('ego_pose', sd_record['ego_pose_token'])

    data_path = nusc.get_sample_data_path(sample_data_token)
    box_list = []
    for box in boxes:
        # Move box to ego vehicle coord system
        box.rotate(Quaternion(cs_record['rotation']))
        box.translate(np.array(cs_record['translation']))
        # Move box to global coord system
        box.rotate(Quaternion(pose_record['rotation']))
        box.translate(np.array(pose_record['translation']))
        box_list.append(box)
    return box_list

def transform_det_annos_to_nusc_annos(det_annos, nusc):
    nusc_annos = {
        'results': {},
        'meta': None,
    }

    for det in det_annos:
        annos = []
        try:
            box_list = boxes_lidar_to_nusenes(det)
            box_list = lidar_nusc_box_to_global(
                nusc=nusc, boxes=box_list, sample_token=det['metadata']['token']
            )
        except:
            print("TypeError in transform_det_annos_to_nusc_annos: string indices must be integers")
        for k, box in enumerate(box_list):
            name = det['name'][k]
            if np.sqrt(box.velocity[0] ** 2 + box.velocity[1] ** 2) > 0.2:
                if name in ['car', 'construction_vehicle', 'bus', 'truck', 'trailer']:
                    attr = 'vehicle.moving'
                elif name in ['bicycle', 'motorcycle']:
                    attr = 'cycle.with_rider'
                else:
                    attr = None
            else:
                if name in ['pedestrian']:
                    attr = 'pedestrian.standing'
                elif name in ['bus']:
                    attr = 'vehicle.stopped'
                else:
                    attr = None
            attr = attr if attr is not None else max(
                cls_attr_dist[name].items(), key=operator.itemgetter(1))[0]
            nusc_anno = {
                'sample_token': det['metadata']['token'],
                'translation': box.center.tolist(),
                'size': box.wlh.tolist(),
                'rotation': box.orientation.elements.tolist(),
                'velocity': box.velocity[:2].tolist(),
                'detection_name': name,
                'detection_score': box.score,
                'attribute_name': attr
            }
            if det['scores_3d'][k] >= DETECTION_THRESHOLD:
                annos.append(nusc_anno)

        nusc_annos['results'].update({det["metadata"]["token"]: annos})

    return nusc_annos


In [None]:
def construct_token_dict():
    """
    Constructs a dictionary mapping filenames to lidar and sample tokens
    """
    token_dict = dict()
    for scene in nusc.scene:
        sample_token = scene['first_sample_token']
        sample = nusc.get('sample', sample_token)
        lidar_data = nusc.get('sample_data', sample['data']["LIDAR_TOP"])
        while sample['next'] != "":
            filename = lidar_data['filename']
            file_str = filename[filename.rfind("/")+1:].replace("bin", "json")
            token_dict[file_str] = {"lidar_token": lidar_data['token'], "sample_token": lidar_data['sample_token']}
            sample = nusc.get("sample", sample['next'])
            lidar_data = nusc.get('sample_data', sample['data']["LIDAR_TOP"])
        
        filename = lidar_data['filename']
        file_str = filename[filename.rfind("/")+1:].replace("bin", "json")
        token_dict[file_str] = {"lidar_token": lidar_data['token'], "sample_token": lidar_data['sample_token']}

    with open(model_dir + "/token_dict.json", 'w') as f:
        json.dump(token_dict, f)
    f.close()

In [None]:
def get_sample_token(fn):
    with open(model_dir + "/token_dict_mini.json", 'r') as f:
        token_dict = json.load(f)
    sample_token = token_dict[fn]['sample_token']
    f.close()
    return sample_token

# Read json file:
def read_preds_file(fn):
    full_fn = os.path.join(preds_dir, fn)
    with open(full_fn, 'r') as f:
        result = json.load(f)
        result['bboxes_3d'] = torch.Tensor(result['bboxes_3d']).numpy()
        result['scores_3d'] = torch.Tensor(result['scores_3d']).numpy()
        class_labels = [class_names[k] for k in result['labels_3d']]
        result['labels_3d'] = torch.Tensor(result['labels_3d']).numpy()
        sample_token = dict()
        sample_token['token'] = get_sample_token(fn)
        result.update({'metadata':sample_token})
        result.update({'name':class_labels})
        
    f.close()
    return result

def read_results():
    """
    Reads the results from prediction files.

    Returns:
        list: A list of dictionaries containing the processed prediction data.
    """
    preds_fn = os.listdir(preds_dir)
    results = []
    count = 1
    for fn in preds_fn:
        if count%1000 == 0:
            print("Read results count: ", str(count))
        results.append(read_preds_file(fn))
        count += 1
    return results

def custom_result(fn):
    results = [read_preds_file(fn)]
    return results


In [None]:
def save_nusc_results(det_annos, **kwargs):
    nusc_annos = transform_det_annos_to_nusc_annos(det_annos, nusc)
    nusc_annos['meta'] = {
        'use_camera': False,
        'use_lidar': True,
        'use_radar': False,
        'use_map': False,
        'use_external': False,
    }

    output_path = Path(kwargs['output_path'])
    output_path.mkdir(exist_ok=True, parents=True)
    res_path = str(output_path / 'results_nusc.json')
    with open(res_path, 'w') as f:
        json.dump(nusc_annos, f)
    
    print(f'The predictions of NuScenes have been saved to {res_path}')
    return output_path, res_path

def get_metrics(output_path, res_path):
    try:
        eval_version = 'detection_cvpr_2019'
        eval_config = config_factory(eval_version)
    except:
        eval_version = 'cvpr_2019'
        eval_config = config_factory(eval_version)

    nusc_eval = NuScenesEval(
        nusc,
        config=eval_config,
        result_path=res_path,
        eval_set=eval_set_map[dataset_version],
        output_dir=str(output_path),
        verbose=True,
    )
    metrics_summary = nusc_eval.main(plot_examples=0, render_curves=False)

    with open(output_path / 'metrics_summary.json', 'r') as f:
        metrics = json.load(f)
    return metrics, metrics_summary, nusc_eval

In [None]:
def filter_results(results):
    return [
        result
        for result in results
        if result['metadata']['token'] in mini_val_tokens
    ]

In [None]:
construct_token_dict()
results = read_results()
results = filter_results(results)
nusc_results = transform_det_annos_to_nusc_annos(results, nusc)

In [None]:
print(len(nusc_results['results']))
output_path, res_path = save_nusc_results(results, output_path=model_dir)
metrics, metrics_summary, obj = get_metrics(output_path, res_path)

In [None]:
metrics

In [None]:
metrics_summary

In [None]:
import random

sample = "b5989651183643369174912bc5641d3b"
sample = nusc.get('sample', sample)

nusc.render_pointcloud_in_image(sample['token'], pointsensor_channel='LIDAR_TOP')

In [None]:
nusc.render_sample(sample['token'])

In [None]:
from nuscenes_render import render_sample_data_with_predictions

box_list = []
for annotation in nusc_results['results'][sample['token']]:
    box_list.append(Box(center=annotation['translation'], size=annotation['size'], orientation=Quaternion(annotation['rotation'])))
    
# for ann in sample['anns']:
#     details = nusc.get('sample_annotation', ann)
#     box_list.append(Box(center=details['translation'], size = details['size'], orientation=Quaternion(details['rotation'])))
    
render_sample_data_with_predictions(sample['data']['LIDAR_TOP'], underlay_map=False, pred_boxes=box_list, nusc=nusc)

In [None]:
len(sample['anns'])
for ann in sample['anns']:
    details = nusc.get('sample_annotation', ann)
    print(f"translation = {details['translation']}, \t size = {details['size']} \t class = {details['category_name']}")

In [None]:
for annotation in nusc_results['results'][sample['token']]:
    print(f"translation = {annotation['translation']}, \t size = {annotation['size']} \t class = {annotation['detection_name']}")

In [None]:
# for annotation in nusc_results['results'][sample['token']]:
#     plot_custom_bounding_boxes(sample['token'], [Box(annotation['translation'], annotation['size'], Quaternion(annotation['rotation']), name=annotation['detection_name'])])

In [30]:
from nuscenes_evaluate import DetectionEval as CustomDetectionEval

#   def __init__(self,
#                  nusc: NuScenes,
#                  config: DetectionConfig,
#                  result_path: str,
#                  eval_set: str,
#                  output_dir: str = None,
#                  verbose: bool = True):

eval = CustomDetectionEval(
    nusc=nusc,
    config=eval_config,
    result_path=f'{model_dir}/results_nusc.json',
    eval_set=eval_set_map[dataset_version],
    output_dir=os.getcwd(),
    verbose=True,
)

eval.custom_evaluate()

Initializing nuScenes detection evaluation
Loaded results from /home/ranai/nuscenes_dataset/inference_results_mini/model2_good/results_nusc.json. Found detections for 81 samples.
Loading annotations for mini_val split from nuScenes version: v1.0-mini


100%|██████████| 81/81 [00:00<00:00, 250.25it/s]


Loaded ground truth annotations for 81 samples.
Filtering predictions
=> Original number of boxes: 1741
=> After distance based filtering: 1741
=> After LIDAR and RADAR points based filtering: 1741
=> After bike rack filtering: 1741
Filtering ground truth annotations
=> Original number of boxes: 4441
=> After distance based filtering: 3785
=> After LIDAR and RADAR points based filtering: 3393
=> After bike rack filtering: 3393
Accumulating metric data...
Calculating metrics...


(<nuscenes.eval.detection.data_classes.DetectionMetrics at 0x7fcc6b390880>,
 <nuscenes.eval.detection.data_classes.DetectionMetricDataList at 0x7fcd38293b20>)