In [1]:
import os
import sys
import time
import json
import torch
import logging
from collections import defaultdict
import matplotlib.pyplot as plt

#%matplotlib inline
import datetime
import torch.utils.data
import torch.nn as nn
import torch.nn.functional as F
from PIL import Image, ImageFile
import cv2

%matplotlib inline

#%run notebooks/printer
%run functions/architectures
%run functions/printer

import numpy as np
from nuscenes.nuscenes import NuScenes
from nuscenes.utils import splits

### Definition of the paths and constants

In [2]:
# Trained MonoLoco checkpoints for the nuScenes boxes, and one display tag per
# checkpoint (the tag is only printed next to the results).
my_models = [
    "/home/bonnesoe/semester_project/monoloco/data/models/old/hyp-monoloco-boxes.pkl",
]
my_models_types = ["FULL"]

# Camera channels visited for every sample.
CAMERAS = (
    'CAM_FRONT',
    'CAM_FRONT_LEFT',
    'CAM_FRONT_RIGHT',
    'CAM_BACK',
    'CAM_BACK_LEFT',
    'CAM_BACK_RIGHT',
)

# One accumulator per phase; every phase owns its own, independent containers.
dic_jo = {
    phase: dict(X=[], Y=[], names=[], kps=[], boxes_3d=[], K=[],
                clst=defaultdict(lambda: defaultdict(list)))
    for phase in ('train', 'val', 'test')
}

# Ground-truth annotations collected per image name (filled by extract_from_token).
dic_names = defaultdict(lambda: defaultdict(list))

# Path to the nuScenes dataset and type of dataset (nuscenes_teaser or nuscenes).
dir_nuscenes = '/data/bonnesoeur-data/data/nuscenes/'
dataset = "nuscenes_teaser"

### CODE

In [3]:
class MonoLoco:
    """Inference wrapper around the MonoLoco linear model for nuScenes boxes.

    The wrapper loads (or receives) a model, runs it on box keypoints and turns
    the raw network outputs into a dictionary ready for visualization.
    """

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    INPUT_SIZE = 9 * 2                   # keypoints (borders and center of the bbox of nuscenes) x (u, v)
    LINEAR_SIZE = 256
    N_SAMPLES = 100                      # samples drawn per dropout pass in forward()

    def __init__(self, model, device=None, n_dropout=0, p_dropout=0.2):
        """Build the wrapper.

        model: either a path (str) to a saved state dict, or an already built model.
        device: torch device to run on; CPU when not provided.
        n_dropout: number of stochastic forward passes; 0 disables the epistemic estimate.
        p_dropout: dropout probability, only used when the model is built from a path.
        """
        self.device = device if device else torch.device('cpu')
        self.n_dropout = n_dropout
        self.epistemic = bool(self.n_dropout > 0)

        if isinstance(model, str):
            # A path was provided: build the architecture and load its parameters on CPU first
            self.model = LinearModel(p_dropout=p_dropout, input_size=self.INPUT_SIZE, linear_size=self.LINEAR_SIZE)
            self.model.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage))
        else:
            # The model is directly provided
            self.model = model
        self.model.eval()  # Default is train
        self.model.to(self.device)

    def forward(self, keypoints, kk):
        """Forward pass of the monoloco network.

        keypoints: list of [xs, ys, confidences] triplets (see prepare_kps).
        kk: camera matrix, as a nested list.
        Returns (outputs, varss), or (None, None) when there are no keypoints.
        ``varss`` is the standard deviation of the samples collected over the
        dropout passes, or zeros when n_dropout is 0.
        """
        if not keypoints:
            return None, None

        with torch.no_grad():
            inputs = preprocess_monoloco(torch.tensor(keypoints).to(self.device), torch.tensor(kk).to(self.device))
            if self.n_dropout > 0:
                self.model.dropout.training = True  # Manually reactivate dropout in eval
                total_outputs = torch.empty((0, inputs.size()[0])).to(self.device)

                for _ in range(self.n_dropout):
                    outputs = self.model(inputs)
                    outputs = unnormalize_bi(outputs)
                    samples = laplace_sampling(outputs, self.N_SAMPLES)
                    total_outputs = torch.cat((total_outputs, samples), 0)
                varss = total_outputs.std(0)
                self.model.dropout.training = False
            else:
                varss = torch.zeros(inputs.size()[0])

            #  Don't use dropout for the mean prediction
            outputs = self.model(inputs)
            outputs = unnormalize_bi(outputs)
        return outputs, varss

    @staticmethod
    def post_process(outputs, varss, boxes, keypoints, kk, dic_gt=None, iou_min=0.3):
        """Post process monoloco to output final dictionary with all information for visualizations.

        outputs, varss: values returned by forward(); an empty dictionary is
            returned when ``outputs`` is None.
        boxes: one box per detection; the first coordinate is used for the
            left-to-right ordering.
        keypoints: keypoints in the [xs, ys, confidences] layout, one entry per box.
        kk: camera matrix.
        dic_gt: optional dictionary with 'boxes' and 'dds'. Ground truth is
            expected index-aligned with ``boxes`` (detection i <-> ground truth i).
        iou_min: kept for interface compatibility, currently unused.
        """

        dic_out = defaultdict(list)
        if outputs is None:
            return dic_out

        # Every detection is paired with the ground truth at the same index,
        # whether or not a ground truth is provided.
        matches = [(idx, idx) for idx in range(len(boxes))]
        if dic_gt:
            boxes_gt, dds_gt = dic_gt['boxes'], dic_gt['dds']
            print("found {} matches with ground-truth".format(len(matches)))

        matches = reorder_matches(matches, boxes, mode='left_right')
        uv_shoulders = get_keypoints(keypoints, mode='shoulder')
        uv_centers = get_keypoints(keypoints, mode='center')
        xy_centers = pixel_to_camera(uv_centers, kk, 1)

        # Convert once instead of re-converting the whole tensor for every detection
        uv_shoulders_list = uv_shoulders.tolist()
        uv_centers_list = uv_centers.tolist()

        for idx, idx_gt in matches:
            dd_pred = float(outputs[idx][0])
            ale = float(outputs[idx][1])
            var_y = float(varss[idx])
            # Without ground truth the prediction doubles as the "real" distance
            dd_real = dds_gt[idx_gt] if dic_gt else dd_pred

            uu_s, vv_s = uv_shoulders_list[idx][0:2]
            uu_c, vv_c = uv_centers_list[idx][0:2]
            xyz_real = xyz_from_distance(dd_real, xy_centers[idx])
            xyz_pred = xyz_from_distance(dd_pred, xy_centers[idx])

            dic_out['boxes'].append(boxes[idx])
            dic_out['boxes_gt'].append(boxes_gt[idx_gt] if dic_gt else boxes[idx])
            dic_out['dds_real'].append(dd_real)
            dic_out['dds_pred'].append(dd_pred)
            dic_out['stds_ale'].append(ale)
            dic_out['stds_epi'].append(var_y)
            dic_out['xyz_real'].append(xyz_real.squeeze().tolist())
            dic_out['xyz_pred'].append(xyz_pred.squeeze().tolist())
            dic_out['uv_kps'].append(keypoints[idx])
            dic_out['uv_centers'].append([round(uu_c), round(vv_c)])
            dic_out['uv_shoulders'].append([round(uu_s), round(vv_s)])

        return dic_out


In [4]:
def preprocess_monoloco(keypoints, kk):
    """Turn pixel keypoints into the zero-centered input of the network.

    keypoints: torch tensor (or nested list) laid out as (m, 3, n_keypoints),
        the first two rows being the u and v pixel coordinates.
    kk: camera matrix as a tensor or nested list.
    Returns a tensor of shape (m, 2 * n_keypoints): every keypoint is
    back-projected with pixel_to_camera at z = 10, the back-projected center
    of the keypoints' bounding box is subtracted, and only the x and y
    components are kept.
    """
    keypoints = torch.tensor(keypoints) if isinstance(keypoints, list) else keypoints
    kk = torch.tensor(kk) if isinstance(kk, list) else kk

    # Back-project the box center and all keypoints on the same plane
    center_uv = get_keypoints(keypoints, mode='center')
    center_xyz = pixel_to_camera(center_uv, kk, 10)
    all_xyz = pixel_to_camera(keypoints[:, 0:2, :], kk, 10)

    # (m, n_keypoints, 3) - (m, 1, 3): broadcast the center over the keypoints
    centered = all_xyz - center_xyz.unsqueeze(1)

    # reshape (not view): the slice is not contiguous
    batch_size = centered.size()[0]
    return centered[:, :, 0:2].reshape(batch_size, -1)

def prepare_kps(kps_in):
    """Split flat [x0, y0, x1, y1, ...] lists into [xs, ys, confidences] triplets.

    Every entry of ``kps_in`` must have an even length (e.g. 18 values become
    3 lists of 9). The confidence of every point is set to 1.
    """
    triplets = []
    for flat in kps_in:
        assert len(flat) % 2 == 0, "keypoints expected as a multiple of 2"
        xs = flat[0::2]
        ys = flat[1::2]  # from offset 1 every 2
        triplets.append([xs, ys, [1] * len(ys)])
    return triplets


def pixel_to_camera(uv_tensor, kk, z_met):
    """
    Back-project pixel coordinates to camera coordinates at depth ``z_met``.

    uv_tensor: list or torch/numpy tensor of (m, 2) or (m, x, 2), x being the
        number of keypoints. A 3-D tensor whose last dimension is not 2 is
        permuted from (m, 2, x) to (m, x, 2) first.
    kk: camera matrix, as a tensor or nested list.
    Returns a tensor with a last dimension of 3.
    """
    if isinstance(uv_tensor, (list, np.ndarray)):
        uv_tensor = torch.tensor(uv_tensor)
    if isinstance(kk, list):
        kk = torch.tensor(kk)

    if uv_tensor.shape[-1] != 2:
        # bring the (u, v) pair to the last dimension so it can be padded
        uv_tensor = uv_tensor.permute(0, 2, 1)
        assert uv_tensor.shape[-1] == 2, "Tensor size not recognized"

    # append a constant 1 to the last dimension: (u, v) -> (u, v, 1)
    uv_homogeneous = F.pad(uv_tensor, pad=(0, 1), mode="constant", value=1)

    # matmul instead of mm so that batched (m, x, 3) inputs work too
    rays = torch.matmul(uv_homogeneous, torch.inverse(kk).t())
    return rays * z_met


def get_keypoints(keypoints, mode):
    """
    Reduce every set of keypoints to a single (u, v) point.

    Input --> list or torch/numpy tensor [(m, 3, n) or (3, n)]
    Output --> torch.tensor [(m, 2)]

    mode:
        'center'   center of the bounding box of the keypoints
        'bottom'   horizontal center, maximum v (bottom center for kitti evaluation)
        'head'     mean of keypoints 0..4
        'shoulder' mean of keypoints 5..6
        'hip'      mean of keypoints 11..12
        'ankle'    mean of keypoints 15..16
    """
    if isinstance(keypoints, (list, np.ndarray)):
        keypoints = torch.tensor(keypoints)
    if keypoints.dim() == 2:  # add batch dim
        keypoints = keypoints.unsqueeze(0)

    assert keypoints.dim() == 3 and keypoints.shape[1] == 3, "tensor dimensions not recognized"
    assert mode in ['center', 'bottom', 'head', 'shoulder', 'hip', 'ankle']

    uv = keypoints[:, 0:2, :]  # (m, 2, n): drop the confidence row

    if mode in ('center', 'bottom'):
        upper = uv.max(2)[0]  # max/min return (values, indices)
        lower = uv.min(2)[0]
        if mode == 'center':
            return (upper - lower) / 2 + lower  # (m, 2) as keepdims is False
        mid_u = (upper[:, 0:1] - lower[:, 0:1]) / 2 + lower[:, 0:1]
        return torch.cat((mid_u, upper[:, 1:2]), -1)

    # the remaining modes average a fixed range of keypoint indices
    first, last = {
        'head': (0, 5),
        'shoulder': (5, 7),
        'hip': (11, 13),
        'ankle': (15, 17),
    }[mode]
    return uv[:, :, first:last].mean(2)  # (m, 2)

def xyz_from_distance(distances, xy_centers):
    """
    Scale normalized camera rays (z=1) so that their length equals the given distances.

    distances --> tensor (m,1) or (m) or float
    xy_centers --> tensor(m,3) or (3)
    Returns a tensor (m, 3) with the xyz position of every point.
    """
    if isinstance(distances, float):
        distances = torch.tensor(distances).unsqueeze(0)
    if distances.dim() == 1:
        distances = distances.unsqueeze(1)
    if xy_centers.dim() == 1:
        xy_centers = xy_centers.unsqueeze(0)

    assert xy_centers.size()[-1] == 3 and distances.size()[-1] == 1, "Size of tensor not recognized"

    # length of the ray (x, y, 1) that goes through each center
    x_squared = xy_centers[:, 0:1].pow(2)
    y_squared = xy_centers[:, 1:2].pow(2)
    ray_length = torch.sqrt(1 + x_squared + y_squared)

    return xy_centers * distances / ray_length


def open_image(path_image):
    """Read the file at ``path_image`` and return its content as an RGB PIL image."""
    with open(path_image, 'rb') as handle:
        rgb_image = Image.open(handle).convert('RGB')
    return rgb_image
    
def reorder_matches(matches, boxes, mode='left_right'):
    """
    Reorder a list of (idx, idx_gt) matches based on position of the detections in the image.

    matches: list of (idx, idx_gt) pairs, idx being an index into ``boxes``.
    boxes: list of boxes; the first value of each box is used for the ordering.
    mode: only 'left_right' is supported.

    Example: with the first coordinates of the boxes being (30, 10, 20) and
    matches = [(0, x), (2, y)], the output is [(2, y), (0, x)].
    Boxes without a match are skipped; if an idx appears in several matches,
    the first one is kept.
    """

    assert mode == 'left_right'

    # indices of the boxes ordered from left to right
    ordered_boxes = np.argsort([box[0] for box in boxes]).tolist()

    # idx -> first match using it, for a constant-time lookup per box
    first_match = {}
    for match in matches:
        first_match.setdefault(match[0], match)

    return [first_match[idx_box] for idx_box in ordered_boxes if idx_box in first_match]



In [5]:
def unnormalize_bi(outputs):
    """Turn the relative value stored in column 1 into an absolute one.

    Column 1 is overwritten in place with exp(column 1) * column 0; the same
    tensor is returned.
    """
    reference = outputs[:, 0]
    outputs[:, 1] = reference * outputs[:, 1].exp()
    return outputs

In [6]:
class ImageList(torch.utils.data.Dataset):
    """It defines transformations to apply to images and outputs of the dataloader"""

    def __init__(self, image_paths, scale):
        """
        image_paths: sequence of paths of the images to load.
        scale: resize factor; values within [0.99, 1.01] leave the image untouched.
        """
        self.image_paths = image_paths
        self.scale = scale

    def __getitem__(self, index):
        """Return (image_path, original_image, image) for the image at ``index``.

        ``original_image`` is the (possibly rescaled) image as a tensor and
        ``image`` the output of ``image_transform``.
        """
        # NOTE(review): `torchvision` and `image_transform` are not imported or
        # defined in the visible part of the notebook -- confirm they are
        # provided by the %run scripts before using this dataset.
        image_path = self.image_paths[index]
        ImageFile.LOAD_TRUNCATED_IMAGES = True  # tolerate truncated files
        with open(image_path, 'rb') as f:
            image = Image.open(f).convert('RGB')

        if self.scale > 1.01 or self.scale < 0.99:
            # PIL size is (width, height) while resize expects (height, width)
            image = torchvision.transforms.functional.resize(image,
                                                             (round(self.scale * image.size[1]),
                                                              round(self.scale * image.size[0])),
                                                             interpolation=Image.BICUBIC)
        # PIL images are not iterables
        original_image = torchvision.transforms.functional.to_tensor(image)  # 0-255 --> 0-1
        image = image_transform(image)

        return image_path, original_image, image

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_paths)

In [7]:
def factory_for_gt(im_size, name=None, path_gt=None):
    """Return the calibration matrix and the ground truth of an image.

    im_size: (width, height) of the image.
    name: key of the image in the ground-truth file.
    path_gt: path of the json ground-truth file.
    Returns (kk, dic_gt). When the file holds an entry for ``name``, both come
    from that entry. Otherwise dic_gt is None and kk is a standard matrix:
    the Kitti one for wide images (aspect ratio above 2.5), else the nuScenes
    one rescaled from 1600x900 to ``im_size``.
    """
    separator = '-' * 120

    try:
        with open(path_gt, 'r') as f:
            dic_names = json.load(f)
    except (FileNotFoundError, TypeError):  # TypeError: path_gt is None
        print(separator + "\nGround-truth file not found")
        dic_names = {}
    else:
        print(separator + "\nGround-truth file opened")

    try:
        dic_gt = dic_names[name]
        kk = dic_gt['K']
    except KeyError:
        pass
    else:
        print("Matched ground-truth file!")
        return kk, dic_gt

    # No annotation for this image: fall back on a standard calibration
    x_factor = im_size[0] / 1600
    y_factor = im_size[1] / 900
    pixel_factor = (x_factor + y_factor) / 2   # TODO remove and check it
    is_wide = im_size[0] / im_size[1] > 2.5
    if is_wide:
        kk = [[718.3351, 0., 600.3891], [0., 718.3351, 181.5122], [0., 0., 1.]]  # Kitti calibration
    else:
        focal = 1266.4 * pixel_factor
        kk = [[focal, 0., 816.27 * x_factor],
              [0, focal, 491.5 * y_factor],
              [0., 0., 1.]]  # nuScenes calibration

    print("Using a standard calibration matrix...")
    return kk, None

In [8]:
def factory_outputs(args, images_outputs, output_path, pifpaf_outputs, dic_out=None, kk=None):
    """Output images according to the choice.

    args: namespace providing output_types, n_dropout, draw_box and show.
    images_outputs: sequence whose element 1 is the image handed to the printer.
    output_path: output path handed to the printer.
    pifpaf_outputs: unused, kept for interface compatibility.
    dic_out: dictionary produced by MonoLoco.post_process; nothing is drawn
        when it is missing or holds no boxes.
    kk: camera matrix handed to the printer.
    """
    output_types = args.output_types
    if not any(xx in output_types for xx in ['front', 'bird', 'combined']):
        return

    # Only print in case of detections
    if not dic_out or not dic_out['boxes']:
        return

    epistemic = args.n_dropout > 0
    printer = Printer(images_outputs[1], output_path, kk, output_types=output_types,
                      z_max=22, epistemic=epistemic)
    figures, axes = printer.factory_axes()
    printer.draw(figures, axes, dic_out, images_outputs[1], draw_box=args.draw_box,
                 save=True, show=args.show)

    # The json export is currently disabled:
    # if 'json' in args.output_types:
    #     with open(os.path.join(output_path + '.monoloco.json'), 'w') as ff:
    #         json.dump(dic_out, ff)

### Extraction of the relevant information from the nuScenes dataset

In [9]:
def extract_from_token(sd_token):
    """Collect the annotations of the 'car' categories for one sample_data token.

    Uses the global ``nusc`` to read the image path, the boxes and the camera
    matrix. For every kept box the projected keypoints, the 2D box enclosing
    them, the distance (norm of the box center) and the 3D box (center + wlh)
    are stored. The 2D box, the distance and the camera matrix are also
    recorded in the global ``dic_names`` under the image name.

    Returns (name, path_im, boxes_gt, boxes_3d, dds, kk, keypoints).
    """
    path_im, boxes_obj, kk = nusc.get_sample_data(sd_token, box_vis_level=1)  # At least one corner
    kk = kk.tolist()
    name = os.path.basename(path_im)

    boxes_gt, boxes_3d, dds, keypoints = [], [], [], []

    for box_obj in boxes_obj:
        # Keep the first two levels of the category name, except for animals
        if box_obj.name[:6] == 'animal':
            general_name = 'animal'
        else:
            general_name = box_obj.name.split('.')[0] + '.' + box_obj.name.split('.')[1]

        if general_name not in select_categories('car'):
            continue

        # Edges and center of the box in 2D pixel coordinates
        keypoint = project_2d(box_obj, kk)
        dd = np.linalg.norm(box_obj.center)
        us, vs = prepare_kps([keypoint])[0][0:2]
        box_gt = [min(us), min(vs), max(us), max(vs)]

        dds.append(dd)
        boxes_3d.append(box_obj.center.tolist() + box_obj.wlh.tolist())
        keypoints.append(keypoint)
        boxes_gt.append(box_gt)

        per_image = dic_names[name]
        per_image['boxes'].append(box_gt)
        per_image['dds'].append(dd)
        per_image['K'] = kk

    return name, path_im, boxes_gt, boxes_3d, dds, kk, keypoints

In [10]:
def select_categories(cat):
    """
    Return the list of annotation categories selected by ``cat``.

    ``cat`` must be one of 'person', 'all', 'car' or 'cyclist'. Note that 'all'
    covers pedestrians, bicycles and motorcycles but not the 'car' categories.
    """
    assert cat in ['person', 'all', 'car', 'cyclist']

    # rebuilt on every call so that callers always get a fresh list
    per_choice = {
        'person': ['human.pedestrian'],
        'all': ['human.pedestrian', 'vehicle.bicycle', 'vehicle.motorcycle'],
        'cyclist': ['vehicle.bicycle'],
        'car': ['vehicle.car', 'vehicle.truck'],
    }
    return per_choice[cat]

In [15]:
def factory(dataset, dir_nuscenes):
    """Load nuScenes and split its scenes between training and validation.

    dataset: 'nuscenes', 'nuscenes_mini' or 'nuscenes_teaser'.
    dir_nuscenes: root directory of the nuScenes data.
    Returns (nusc, scenes, split_train, split_val), the splits being lists of
    scene names. For the teaser, scenes and splits are restricted to the
    tokens listed in the files of ./splits; otherwise the splits come from
    the nuScenes devkit.
    """
    assert dataset in ['nuscenes', 'nuscenes_mini', 'nuscenes_teaser']
    version = 'v1.0-mini' if dataset == 'nuscenes_mini' else 'v1.0-trainval'

    nusc = NuScenes(version=version, dataroot=dir_nuscenes, verbose=True)
    scenes = nusc.scene

    if dataset != 'nuscenes_teaser':
        split_scenes = splits.create_splits_scenes()
        return nusc, scenes, split_scenes['train'], split_scenes['val']

    # Teaser: keep only the scenes whose token is listed in the teaser file
    with open("./splits/nuscenes_teaser_scenes.txt", "r") as handle:
        teaser_scenes = handle.read().splitlines()
    scenes = [scene for scene in scenes if scene['token'] in teaser_scenes]

    with open("./splits/split_nuscenes_teaser.json", "r") as handle:
        dic_split = json.load(handle)

    def names_of(phase):
        """Names of the teaser scenes whose token belongs to ``phase``."""
        return [scene['name'] for scene in scenes if scene['token'] in dic_split[phase]]

    return nusc, scenes, names_of('train'), names_of('val')

In [16]:
def process_keypoints(box):
    """Return the 3D keypoints of a box as [xs, ys, zs].

    Each list holds the coordinate of every corner of the box (as returned by
    ``box.corners()``, one row per axis) followed by the coordinate of its center.
    """
    xs, ys, zs = box.corners().tolist()
    xc, yc, zc = box.center

    # the center comes last, after the corners
    xs.append(xc)
    ys.append(yc)
    zs.append(zc)

    return [xs, ys, zs]

In [17]:
def project_2d(box, kk, im_size=(1600, 900)):
    """
    Project a 3D bounding box into the pixel frame using the center and the corners of the box.

    box: object exposing ``corners()`` (one row per axis) and ``center``.
    kk: camera matrix.
    im_size: (width, height) of the image; projected points are clamped to
        [0, width] x [0, height]. Defaults to 1600x900.
    Returns a flat list [u0, v0, u1, v1, ...] with the corners first and the
    center last.
    """
    width, height = im_size

    xs, ys, zs = box.corners().tolist()
    xc, yc, zc = box.center
    points_3d = np.array(list(zip(xs + [xc], ys + [yc], zs + [zc])))

    # Project them and convert into pixel coordinates
    box_2d = []
    for xyz in points_3d:
        xx, yy, zz = np.dot(kk, xyz)
        box_2d.append(min(max(xx / zz, 0), width))
        box_2d.append(min(max(yy / zz, 0), height))
    return box_2d

In [18]:
# Load the nuScenes tables and the train/val scene names for the configured dataset
nusc, scenes, split_train, split_val = factory(dataset,dir_nuscenes)

Loading NuScenes tables for version v1.0-trainval...
23 category,
8 attribute,
4 visibility,
64386 instance,
12 sensor,
10200 calibrated_sensor,
2631083 ego_pose,
68 log,
850 scene,
34149 sample,
2631083 sample_data,
1166187 sample_annotation,
4 map,
Done loading in 36.0 seconds.
Reverse indexing ...
Done reverse indexing in 9.4 seconds.


## Processing routine to extract the image information along with the path of each image

In [19]:
# Output path handed to the Printer when the figures are drawn
output_path = '/data/bonnesoeur-data/vizualization_monoloco/results'

In [20]:
def factory_outputs(images_outputs, output_path, pifpaf_outputs, dic_out=None, kk=None,
                    output_types=('combined',), n_dropout=0, draw_box=True, show=True):
    """Output images according to the choice.

    NOTE: this definition shadows the earlier ``factory_outputs`` of the
    notebook, which takes ``args`` as first parameter.

    images_outputs: sequence whose element 1 is the image handed to the printer.
    output_path: output path handed to the printer.
    pifpaf_outputs: unused, kept for interface compatibility.
    dic_out: dictionary produced by MonoLoco.post_process; nothing is drawn
        when it is missing or holds no boxes.
    kk: camera matrix handed to the printer.
    output_types: requested outputs; a figure is drawn only if it contains
        'front', 'bird' or 'combined'. The printer itself always receives 'combined'.
    n_dropout: number of dropout passes used for the prediction; a positive
        value sets the ``epistemic`` flag handed to the printer.
    draw_box, show: forwarded to ``printer.draw``.
    """
    if not any(xx in output_types for xx in ['front', 'bird', 'combined']):
        return

    # Only print in case of detections
    if not dic_out or not dic_out['boxes']:
        return

    epistemic = n_dropout > 0
    printer = Printer(images_outputs[1], output_path, kk, output_types='combined',
                      z_max=33, epistemic=epistemic)
    figures, axes = printer.factory_axes()
    printer.draw(figures, axes, dic_out, images_outputs[1], draw_box=draw_box,
                 save=True, show=show)

In [21]:
# Run on the GPU when one is available, otherwise fall back to the CPU
inference_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One MonoLoco wrapper per trained model listed in my_models
monolocos = []

for my_model in my_models:

    monoloco = MonoLoco(my_model, device=inference_device)

    monolocos.append(monoloco)


def run(first_scene=1, last_scene=4):
    """Run every loaded model on the annotated boxes of the selected scenes and draw the results.

    first_scene, last_scene: inclusive range of scene indices (positions in
        the global ``scenes`` list) to process; the other scenes are skipped.

    For every sample of a selected scene and every camera of CAMERAS, the
    ground-truth boxes are extracted, fed to each model of ``monolocos`` and
    the predictions are drawn with the Printer (figures are saved and shown).
    """
    for ii, scene in enumerate(scenes):
        print(ii)

        # Select the scenes that you want
        if ii < first_scene or ii > last_scene:
            continue

        current_token = scene['first_sample_token']
        while current_token != "":  # the last sample of a scene has an empty 'next'
            sample_dic = nusc.get('sample', current_token)

            # Extract all the sample_data tokens for each sample
            for cam in CAMERAS:
                sd_token = sample_dic['data'][cam]

                # Extract all the annotations of the selected category
                name, path_img, boxes_gt, boxes_3d, dds, kk, keypoints = extract_from_token(sd_token)

                print(kk)
                if not keypoints:
                    continue

                inputs = prepare_kps(keypoints)
                dic_gt = {
                    "boxes": boxes_gt,
                    "dds": dds
                }
                # The image does not depend on the model: load it once per camera
                image_output = open_image(path_img)

                # `model` rather than `nn`, which would shadow the torch.nn alias
                for model, model_type in zip(monolocos, my_models_types):
                    print("_" * 150)
                    print(model_type)

                    outputs, varss = model.forward(inputs, kk)
                    print(outputs)
                    dic_out = model.post_process(outputs, varss, boxes_gt, inputs, kk, dic_gt)

                    # TODO: modification of printer.py to receive my inputs as ground truth
                    printer = Printer(image_output, output_path, kk, output_types='combined',
                                      z_max=40, epistemic=False)
                    figures, axes = printer.factory_axes()
                    printer.draw(figures, axes, dic_out, image_output, draw_box=True,
                                 save=True, show=True)
                    plt.show()
                    # Release the figures so that memory does not grow with every image
                    plt.close('all')

            current_token = sample_dic['next']
            
        

FileNotFoundError: [Errno 2] No such file or directory: '/home/bonnesoe/semester_project/monoloco/data/models/old/hyp-monoloco-boxes.pkl'

In [None]:
# Process the selected scenes and draw the predictions for every camera image
run()