In [1]:
%matplotlib inline

import os.path as osp
import cv2
# from nuscenes import NuScenes
from nuscenes import NuScenes
from nuscenes.utils.geometry_utils import BoxVisibility

nusc = NuScenes(version='v1.0-trainval', dataroot='/data/datasets/nuscenes', verbose=True)


Loading NuScenes tables for version v1.0-trainval...
Loading nuScenes-panoptic...
32 category,
8 attribute,
4 visibility,
64386 instance,
12 sensor,
10200 calibrated_sensor,
2631083 ego_pose,
68 log,
850 scene,
34149 sample,
2631083 sample_data,
1166187 sample_annotation,
4 map,
34149 panoptic,
Done loading in 45.436 seconds.
Reverse indexing ...
Done reverse indexing in 5.8 seconds.


In [2]:
from typing import Tuple
import numpy as np
from nuscenes.utils.geometry_utils import view_points
from nuscenes.utils.data_classes import Box

def cv2_put_multi_line_text(im: np.ndarray,
                            text: str,
                            center: Tuple,
                            color: Tuple,
                            font=cv2.FONT_HERSHEY_COMPLEX,
                            font_scale=0.5,
                            thickness=1) -> None:
    text_size, _ = cv2.getTextSize("dummy_text", font, font_scale, thickness)
    line_height = text_size[1] + 5
    y0 = int(center[1])
    for i, text_line in enumerate(text.split('\n')):
        y = y0 + i * line_height
        cv2.putText(im, text_line, (int(center[0]), y),
                    cv2.FONT_HERSHEY_COMPLEX, font_scale, color[::-1], thickness, cv2.LINE_AA)


In [3]:
from typing import List
from nuscenes.utils.data_classes import LidarPointCloud, RadarPointCloud
from nuscenes.lidarseg.lidarseg_utils import paint_points_label
from nuscenes.panoptic.panoptic_utils import paint_panop_points_label
from pyquaternion import Quaternion
from PIL import Image
import os

def map_pointcloud_to_image_box(nusc: NuScenes,
                        pointsensor_token: str,
                        camera_token: str,
                        min_dist: float = 1.0,
                        render_intensity: bool = False,
                        show_lidarseg: bool = False,
                        filter_lidarseg_labels: List = None,
                        lidarseg_preds_bin_path: str = None,
                        show_panoptic: bool = False) -> Tuple:
    """
    Given a point sensor (lidar/radar) token and camera sample_data token, load pointcloud and map it to the image
    plane.
    :param pointsensor_token: Lidar/radar sample_data token.
    :param camera_token: Camera sample_data token.
    :param min_dist: Distance from the camera below which points are discarded.
    :param render_intensity: Whether to render lidar intensity instead of point depth.
    :param show_lidarseg: Whether to render lidar intensity instead of point depth.
    :param filter_lidarseg_labels: Only show lidar points which belong to the given list of classes. If None
        or the list is empty, all classes will be displayed.
    :param lidarseg_preds_bin_path: A path to the .bin file which contains the user's lidar segmentation
                                    predictions for the sample.
    :param show_panoptic: When set to True, the lidar data is colored with the panoptic labels. When set
        to False, the colors of the lidar data represent the distance from the center of the ego vehicle.
        If show_lidarseg is True, show_panoptic will be set to False.
    :return (pointcloud <np.float: 2, n)>, coloring <np.float: n>, image <Image>).
    """

    cam = nusc.nusc.get('sample_data', camera_token)
    pointsensor = nusc.nusc.get('sample_data', pointsensor_token)
    pcl_path = osp.join(nusc.nusc.dataroot, pointsensor['filename'])
    if pointsensor['sensor_modality'] == 'lidar':
        if show_lidarseg or show_panoptic:
            gt_from = 'lidarseg' if show_lidarseg else 'panoptic'
            assert hasattr(nusc.nusc, gt_from), f'Error: nuScenes-{gt_from} not installed!'

            # Ensure that lidar pointcloud is from a keyframe.
            assert pointsensor['is_key_frame'], \
                'Error: Only pointclouds which are keyframes have lidar segmentation labels. Rendering aborted.'

            assert not render_intensity, 'Error: Invalid options selected. You can only select either ' \
                                            'render_intensity or show_lidarseg, not both.'

        pc = LidarPointCloud.from_file(pcl_path)
    else:
        pc = RadarPointCloud.from_file(pcl_path)
    im = Image.open(osp.join(nusc.nusc.dataroot, cam['filename']))

    # Points live in the point sensor frame. So they need to be transformed via global to the image plane.
    # First step: transform the pointcloud to the ego vehicle frame for the timestamp of the sweep.
    cs_record = nusc.nusc.get('calibrated_sensor', pointsensor['calibrated_sensor_token'])
    pc.rotate(Quaternion(cs_record['rotation']).rotation_matrix)
    pc.translate(np.array(cs_record['translation']))

    # Second step: transform from ego to the global frame.
    poserecord = nusc.nusc.get('ego_pose', pointsensor['ego_pose_token'])
    pc.rotate(Quaternion(poserecord['rotation']).rotation_matrix)
    pc.translate(np.array(poserecord['translation']))

    # Third step: transform from global into the ego vehicle frame for the timestamp of the image.
    poserecord = nusc.nusc.get('ego_pose', cam['ego_pose_token'])
    pc.translate(-np.array(poserecord['translation']))
    pc.rotate(Quaternion(poserecord['rotation']).rotation_matrix.T)

    # Fourth step: transform from ego into the camera.
    cs_record = nusc.nusc.get('calibrated_sensor', cam['calibrated_sensor_token'])
    pc.translate(-np.array(cs_record['translation']))
    pc.rotate(Quaternion(cs_record['rotation']).rotation_matrix.T)

    # Fifth step: actually take a "picture" of the point cloud.
    # Grab the depths (camera frame z axis points away from the camera).
    depths = pc.points[2, :]

    if render_intensity:
        assert pointsensor['sensor_modality'] == 'lidar', 'Error: Can only render intensity for lidar, ' \
                                                            'not %s!' % pointsensor['sensor_modality']
        # Retrieve the color from the intensities.
        # Performs arbitary scaling to achieve more visually pleasing results.
        intensities = pc.points[3, :]
        intensities = (intensities - np.min(intensities)) / (np.max(intensities) - np.min(intensities))
        intensities = intensities ** 0.1
        intensities = np.maximum(0, intensities - 0.5)
        coloring = intensities
    elif show_lidarseg or show_panoptic:
        assert pointsensor['sensor_modality'] == 'lidar', 'Error: Can only render lidarseg labels for lidar, ' \
                                                            'not %s!' % pointsensor['sensor_modality']

        gt_from = 'lidarseg' if show_lidarseg else 'panoptic'
        semantic_table = getattr(nusc.nusc, gt_from)

        if lidarseg_preds_bin_path:
            sample_token = nusc.nusc.get('sample_data', pointsensor_token)['sample_token']
            lidarseg_labels_filename = lidarseg_preds_bin_path
            assert os.path.exists(lidarseg_labels_filename), \
                'Error: Unable to find {} to load the predictions for sample token {} (lidar ' \
                'sample data token {}) from.'.format(lidarseg_labels_filename, sample_token, pointsensor_token)
        else:
            if len(semantic_table) > 0:  # Ensure {lidarseg/panoptic}.json is not empty (e.g. in case of v1.0-test).
                lidarseg_labels_filename = osp.join(nusc.nusc.dataroot,
                                                    nusc.nusc.get(gt_from, pointsensor_token)['filename'])
            else:
                lidarseg_labels_filename = None

        if lidarseg_labels_filename:
            # Paint each label in the pointcloud with a RGBA value.
            if show_lidarseg:
                coloring = paint_points_label(lidarseg_labels_filename,
                                                filter_lidarseg_labels,
                                                nusc.nusc.lidarseg_name2idx_mapping,
                                                nusc.nusc.colormap)
            else:
                coloring = paint_panop_points_label(lidarseg_labels_filename,
                                                    filter_lidarseg_labels,
                                                    nusc.nusc.lidarseg_name2idx_mapping,
                                                    nusc.nusc.colormap)

        else:
            coloring = depths
            print(f'Warning: There are no lidarseg labels in {nusc.nusc.version}. Points will be colored according '
                    f'to distance from the ego vehicle instead.')
    else:
        # Retrieve the color from the depth.
        coloring = depths

    # Take the actual picture (matrix multiplication with camera-matrix + renormalization).
    points = view_points(pc.points[:3, :], np.array(cs_record['camera_intrinsic']), normalize=True)

    # Remove points that are either outside or behind the camera. Leave a margin of 1 pixel for aesthetic reasons.
    # Also make sure points are at least 1m in front of the camera to avoid seeing the lidar points on the camera
    # casing for non-keyframes which are slightly out of sync.
    mask = np.ones(depths.shape[0], dtype=bool)
    mask = np.logical_and(mask, depths > min_dist)
    mask = np.logical_and(mask, points[0, :] > 1)
    mask = np.logical_and(mask, points[0, :] < im.size[0] - 1)
    mask = np.logical_and(mask, points[1, :] > 1)
    mask = np.logical_and(mask, points[1, :] < im.size[1] - 1)
    points = points[:, mask]
    coloring = coloring[mask]

    return points, coloring, im


In [9]:
font = cv2.FONT_HERSHEY_COMPLEX
font_scale = 0.5
thickness = 1

def box_render_cv2_text(box: Box,
                    nusc: NuScenes,
                    im: np.ndarray,
                    view: np.ndarray = np.eye(3),
                    normalize: bool = False,
                    colors: Tuple = ((0, 0, 255), (255, 0, 0), (155, 155, 155)),
                    linewidth: int = 2,
                    ) -> None:
    """
    Renders box using OpenCV2.
    :param Box: Box to be rendered
    :param nusc: Active NuScenes object
    :param im: <np.array: width, height, 3>. Image array. Channels are in BGR order.
    :param view: <np.array: 3, 3>. Define a projection if needed (e.g. for drawing projection in an image).
    :param is_key_frame
    :param normalize: Whether to normalize the remaining coordinate.
    :param colors: ((R, G, B), (R, G, B), (R, G, B)). Colors for front, side & rear.
    :param linewidth: Linewidth for plot.
    """
    corners = view_points(box.corners(), view, normalize=normalize)[:2, :]

    def draw_rect(selected_corners, color):
        prev = selected_corners[-1]
        for corner in selected_corners:
            cv2.line(im,
                     (int(prev[0]), int(prev[1])),
                     (int(corner[0]), int(corner[1])),
                     color, linewidth)
            prev = corner

    # Draw the sides
    for i in range(4):
        cv2.line(im,
                 (int(corners.T[i][0]), int(corners.T[i][1])),
                 (int(corners.T[i + 4][0]), int(corners.T[i + 4][1])),
                 colors[2][::-1], linewidth)

    # Draw front (first 4 corners) and rear (last 4 corners) rectangles(3d)/lines(2d)
    draw_rect(corners.T[:4], colors[0][::-1])
    draw_rect(corners.T[4:], colors[1][::-1])

    # Draw line indicating the front
    center_bottom_forward = np.mean(corners.T[2:4], axis=0)
    center_bottom = np.mean(corners.T[[2, 3, 7, 6]], axis=0)
    cv2.line(im,
             (int(center_bottom[0]), int(center_bottom[1])),
             (int(center_bottom_forward[0]), int(center_bottom_forward[1])),
             colors[0][::-1], linewidth)

    # Add some text to the bounding box
    # Create text to be written
    attribute_tokens =  nusc.get('sample_annotation', box.token)['attribute_tokens']
    text = box.name
    for attribute_token in attribute_tokens:
        attribute = nusc.get('attribute', attribute_token)
        text += '\n' + attribute['name']
    visibility = nusc.get('sample_annotation', box.token)['visibility_token']
    text += '\nvisibility ' + visibility
    # Setup multi line text
    font = cv2.FONT_HERSHEY_COMPLEX
    font_scale = 0.5
    thickness = 1
    text_size, _ = cv2.getTextSize(box.name, font, font_scale, thickness)
    line_height = text_size[1] + 5
    center = np.mean(corners.T[:], axis=0)
    y0 = int(center[1])
    for i, text_line in enumerate(text.split('\n')):
        y = y0 + i * line_height
        cv2.putText(im, text_line, (int(center[0]), y),
                    cv2.FONT_HERSHEY_COMPLEX, font_scale, colors[0][::-1], thickness, cv2.LINE_AA)
    

def mask_pixels_from_box_cv2(box: Box,
                      width: int,
                      height: int,
                      view: np.ndarray = np.eye(3),
                      normalize: bool = False
                      ) -> None:
    """
    Renders box using OpenCV2.
    :param Box: Box to be rendered
    :param nusc: Active NuScenes object
    :param im: <np.array: width, height, 3>. Image array. Channels are in BGR order.
    :param view: <np.array: 3, 3>. Define a projection if needed (e.g. for drawing projection in an image).
    :param is_key_frame
    :param normalize: Whether to normalize the remaining coordinate.
    :param colors: ((R, G, B), (R, G, B), (R, G, B)). Colors for front, side & rear.
    :param linewidth: Linewidth for plot.
    """

    corners = view_points(box.corners(), view, normalize=normalize)[:2, :] # all x, y picture coordinates

    # simply fill all 6 sides of the cuboid without differentiating which sides are in the fore- or background
    mask = np.zeros((height, width), dtype=np.uint8)
    front = np.ix_([0, 1, 2, 3], [0, 1])
    rear = np.ix_([4, 5, 6, 7], [0, 1]) 
    bottom = np.ix_([2, 3, 7, 6], [0, 1])
    top = np.ix_([0, 1, 5, 4], [0, 1])
    left = np.ix_([0, 3, 7, 4], [0, 1])
    right = np.ix_([1, 2, 6, 5], [0, 1])
    cv2.fillPoly(mask, pts=[corners.T[front].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[rear].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[bottom].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[top].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[left].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[right].astype(int)], color=(255))
    
    pixels_tuple = np.where(mask == 255)
    # pixels = np.array(())
    # pixels = np.transpose(np.asarray(mask == 255).nonzero())
    return mask, pixels_tuple



def contour_from_box_cv2(box: Box,
                      width: int,
                      height, int,
                      view: np.ndarray = np.eye(3),
                      normalize: bool = False
                      ) -> None:
    """
    Renders box using OpenCV2.
    :param Box: Box to be rendered
    :param nusc: Active NuScenes object
    :param im: <np.array: width, height, 3>. Image array. Channels are in BGR order.
    :param view: <np.array: 3, 3>. Define a projection if needed (e.g. for drawing projection in an image).
    :param is_key_frame
    :param normalize: Whether to normalize the remaining coordinate.
    :param colors: ((R, G, B), (R, G, B), (R, G, B)). Colors for front, side & rear.
    :param linewidth: Linewidth for plot.
    """

    corners = view_points(box.corners(), view, normalize=normalize)[:2, :]  # all x, y picture coordinates

    # simply fill all 6 sides of the cuboid without differentiating which sides are in the fore- or background
    mask = np.zeros((height, width), dtype=np.uint8)
    front = np.ix_([0, 1, 2, 3], [0, 1])
    rear = np.ix_([4, 5, 6, 7], [0, 1])
    bottom = np.ix_([2, 3, 7, 6], [0, 1])
    top = np.ix_([0, 1, 5, 4], [0, 1])
    left = np.ix_([0, 3, 7, 4], [0, 1])
    right = np.ix_([1, 2, 6, 5], [0, 1])
    cv2.fillPoly(mask, pts=[corners.T[front].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[rear].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[bottom].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[top].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[left].astype(int)], color=(255))
    cv2.fillPoly(mask, pts=[corners.T[right].astype(int)], color=(255))

    return mask


In [8]:
def render_birth_and_prebirth(prev_im: np.ndarray,
                              current_im: np.ndarray,
                              resize_factor: float) -> np.ndarray:
    
    prev_im_shrink = cv2.resize(prev_im, None, fx=resize_factor, fy=resize_factor, interpolation=cv2.INTER_AREA)
    current_im_shrink = cv2.resize(current_im, None, fx=resize_factor, fy=resize_factor, interpolation=cv2.INTER_AREA)
    return np.vstack([prev_im_shrink, current_im_shrink])

def render_birth_and_heatmap(current_im: np.ndarray,
                             heatmap: np.ndarray,
                             resize_factor: float) -> np.ndarray:

    current_im_shrink = cv2.resize(current_im, None, fx=resize_factor, fy=resize_factor, interpolation=cv2.INTER_AREA)
    heatmap_shrink = cv2.resize(heatmap, None, fx=resize_factor, fy=resize_factor, interpolation=cv2.INTER_AREA)
    heatmap_shrink = np.uint8(heatmap_shrink * 255 / np.max(heatmap_shrink))
    heatmap_shrink = cv2.applyColorMap(heatmap_shrink, cv2.COLORMAP_JET)
    return np.vstack([current_im_shrink, heatmap_shrink])

def overlay_birth_and_heatmap(current_im: np.ndarray, 
                              heatmap: np.ndarray,
                              resize_factor: float) ->np.ndarray:

    current_im_shrink = cv2.resize(current_im, None, fx=resize_factor, fy=resize_factor, interpolation=cv2.INTER_AREA)
    heatmap_shrink = cv2.resize(heatmap, None, fx=resize_factor, fy=resize_factor, interpolation=cv2.INTER_AREA)
    alpha = 0.8
    beta = 0.5
    gamma = 1.0
    return cv2.addWeighted(current_im_shrink, alpha, heatmap_shrink, beta, gamma)


In [10]:

from nuscenes.utils.geometry_utils import points_in_box

channel = 'CAM_FRONT'

width = 1600
height = 900
heatmap_histogramm = np.zeros((height, width), dtype=np.float64)

for idx_scene, scene in enumerate(nusc.scene):
    first_sample_rec = nusc.get('sample', scene['first_sample_token'])
    first_sd_rec = nusc.get('sample_data', first_sample_rec['data'][channel])
    current_sd_rec = first_sd_rec
    prev_sd_rec = None
    prev_im = None

    window_name= '{}'.format(scene['name'])
    # cv2.namedWindow(window_name)
    # cv2.moveWindow(window_name, 0, 0)
    
    has_more_frames = True
    break_all = False
    break_scene = False
    milliseconds = 0
    unique_instances = []
    minimum_visibility = 2
    while has_more_frames:

        # Get annotations, boxes, camera_intrinsic
        # When using BoxVisibility.ALL objects are detected too late in certain cases
        impath, boxes, camera_intrinsic = nusc.get_sample_data(current_sd_rec['token'], box_vis_level=BoxVisibility.ANY)
        if not osp.exists(impath):
            raise Exception('Error: Missing image %s' % impath)

        current_im = cv2.imread(impath)
        if current_sd_rec['is_key_frame']:
            cv2_put_multi_line_text(current_im, "KEYFRAME", (int(current_im.shape[1]/2), int(current_im.shape[0]/2-80)), (255, 0, 0))
        im_contains_unique_instance = False
        is_first_frame = False
        # if prev_im is None:
        #     prev_im = current_im.copy()
        #     is_first_frame = True
        #     cv2_put_multi_line_text(prev_im, "First frame", (int(prev_im.shape[1]/2), int(prev_im.shape[0]/2-25)), (0, 0, 0))
        #     cv2_put_multi_line_text(current_im, "First frame", (int(prev_im.shape[1]/2), int(prev_im.shape[0]/2-25)), (0, 0, 0))

        for box in boxes:
            sample_annotation = nusc.get('sample_annotation', box.token)
            instance = nusc.get('instance', sample_annotation['instance_token'])
            category = nusc.get('category', instance['category_token'])
            # Exclude certain objects
            if category['index'] > 8 and category['index'] < 14:
                continue
            # Only keep objects with certain visibility
            if int(sample_annotation['visibility_token']) < minimum_visibility:
                continue
            skip_box = False
            for attribute_token in sample_annotation['attribute_tokens']:
                attribute_name = nusc.get('attribute', attribute_token)['name']
                if 'without_rider' in attribute_name or 'sitting' in attribute_name:
                    skip_box = True
            if skip_box:
                continue

            # Gather unique instances
            instance_token = sample_annotation['instance_token']

            # print(current_sd_rec)
            # if instance_token not in unique_instances:
            #     im_contains_unique_instance = True
            #     unique_instances.append(instance_token)
            #     if current_sd_rec['timestamp'] != first_sd_rec['timestamp']:
                    # This box can mean a potential object birth
            c = nusc.explorer.get_color(box.name)
            box_render_cv2_text(box, nusc, current_im, view=camera_intrinsic, normalize=True, colors=(c, c, c))
            mask, pixels_tuple = mask_pixels_from_box_cv2(box, width, height, view=camera_intrinsic, normalize=True)
            current_im[pixels_tuple[0], pixels_tuple[1]] = (0, 0, 0)
            heatmap_histogramm[pixels_tuple[0], pixels_tuple[1]] += 1
            
        # birth_im = render_birth_and_prebirth(current_im, heatmap_histogramm, 0.7)
        current_im_and_heatmap = render_birth_and_heatmap(current_im, heatmap_histogramm, 0.7)
        cv2.imshow("birth image and heatmap", current_im_and_heatmap)
        if im_contains_unique_instance or is_first_frame:
            key = cv2.waitKey(1)
        else:
            key = cv2.waitKey(1)
        if key == 27:  # if ESC is pressed, exit.
            cv2.destroyAllWindows()
            break_all = True
            break_scene = True
            break

        # cv2.imshow(window_name, current_im)
        # key = cv2.waitKey(milliseconds)
        # if key == 32: # if space is pressed, pause.
        #     key = cv2.waitKey()
        # if key == 27: # if ESC is pressed, exit.
        #     break_all = True
        #     cv2.destroyAllWindows()
        #     break
        # if key == 110: # if n is pressed, skip current scene.
        #     cv2.destroyAllWindows()
        #     break

        if break_scene:
            break

        if current_sd_rec['next'] == '':
            has_more_frames = False
            # cv2.destroyAllWindows()
        else:
            prev_im = current_im    
            prev_sd_rec = current_sd_rec
            current_sd_rec = nusc.get('sample_data', current_sd_rec['next'])

    filename_im = "/home/serov/code/python/nuscenes-devkit/results/object_heatmaps/birth_heatmap_" + '{0:04d}'.format(idx_scene) + ".png"
    cv2.imwrite(filename_im, cv2.applyColorMap(np.uint8(heatmap_histogramm * 255 / np.max(heatmap_histogramm)), cv2.COLORMAP_JET))
    filename_npy = "/home/serov/code/python/nuscenes-devkit/results/object_heatmaps/birth_heatmap_" + \
        '{0:04d}'.format(idx_scene) + ".npy"
    np.save(filename_npy, heatmap_histogramm)
    if break_all:
        break

cv2.destroyAllWindows()
 
