# Autonomous Perception Robustness Testing Framework (APRTF)
### Development Journal

We show that our general framework can be applied to the [nuScenes](https://www.nuscenes.org/) dataset, using the multi-stage analysis proposed in ["Perception robustness testing at different levels of generality"](https://www.journalfieldrobotics.org/FR/Papers_files/10_Pezzementi.pdf).

In [None]:
import os

import numpy as np
# Seed NumPy's global RNG so the random perturbations drawn later in the
# notebook (np.random.rand) are repeatable between runs.
np.random.seed(42)

from PIL import Image

import matplotlib.pyplot as plt

from nuscenes.utils.geometry_utils import view_points
from nuscenes import NuScenes
# Root directory of the nuScenes data; also used below to build image paths.
data_dir = './data/sets/nuscenes'
# Dataset handle for the 'v1.0-mini' version; later cells query it for
# scenes, samples and boxes.
nusc = NuScenes(version='v1.0-mini', dataroot=data_dir, verbose=True)

import torch
# Release cached, unused CUDA memory held by PyTorch (helps when the notebook
# is re-run in the same kernel).
torch.cuda.empty_cache()

import torchvision.transforms as T
# torchvision reference code (collate function and train/evaluate loops)
from aprtf.references import utils, engine

import warnings
# Silence all Python warnings for the rest of the session.
warnings.filterwarnings('ignore')

print("All packages imported!")

## I. Pedestrian Detection

### Data and Labels

Time-ordered iterator of images and bounding boxes.

In [None]:
def box2bb(box, cam_intrinsic):
    """Return the axis-aligned 2D extent of a box's projected corners.

    The box corners are projected with ``view_points`` using
    ``cam_intrinsic`` (with normalisation enabled) and only the first two
    rows of the result are kept.

    Args:
        box: Object exposing ``corners()``; presumably a nuScenes ``Box``.
        cam_intrinsic: View matrix forwarded to ``view_points``.

    Returns:
        1-D array ``[min0, min1, max0, max1]`` holding the per-row minimum
        and maximum of the projected corners; the rest of the notebook
        reads it as ``[xmin, ymin, xmax, ymax]``.
    """
    projected = view_points(box.corners(), view=cam_intrinsic, normalize=True)
    xy = projected[:2, :]
    lower = xy.min(axis=1)
    upper = xy.max(axis=1)
    return np.concatenate([lower, upper])

def bb2rect(bb):
    """Expand an ``[xmin, ymin, xmax, ymax]`` box into its four corners.

    Args:
        bb: Sequence of exactly four scalars.

    Returns:
        ``(4, 2)`` array with rows ``(xmin, ymin)``, ``(xmin, ymax)``,
        ``(xmax, ymax)``, ``(xmax, ymin)``, so consecutive rows are
        adjacent corners of the rectangle.
    """
    x_lo, y_lo, x_hi, y_hi = bb
    return np.array([
        [x_lo, y_lo],
        [x_lo, y_hi],
        [x_hi, y_hi],
        [x_hi, y_lo],
    ])

def draw_rect(axis, selected_corners, color):
    """Draw the closed outline through ``selected_corners`` on ``axis``.

    One line segment is plotted per corner, starting with the segment that
    joins the last corner back to the first.

    Args:
        axis: Object with a Matplotlib-style ``plot`` method.
        selected_corners: Indexable, sliceable sequence of ``(x, y)`` points.
        color: Colour passed to every ``plot`` call.

    Raises:
        IndexError: If ``selected_corners`` is empty.
    """
    closing_point = selected_corners[-1]
    # Pair every corner with its predecessor; the first corner's predecessor
    # is the last corner, which closes the outline.
    predecessors = [closing_point, *selected_corners[:-1]]
    for start, end in zip(predecessors, selected_corners):
        xs = [start[0], end[0]]
        ys = [start[1], end[1]]
        axis.plot(xs, ys, color=color, linewidth=1)

def render_annotations(axis, scene_data_fp, scene_data_bb, color='b'):
    """Show an image on ``axis`` and outline each bounding box on top of it.

    Args:
        axis: Matplotlib axes to draw on.
        scene_data_fp: Path of the image file to display.
        scene_data_bb: Iterable of ``[xmin, ymin, xmax, ymax]`` boxes.
        color: Matplotlib colour of the outlines. Defaults to ``'b'``, the
            colour that used to be hard-coded.
    """
    # Close the file as soon as the image has been handed to Matplotlib;
    # imshow converts the PIL image to an array when it is called.
    with Image.open(scene_data_fp) as im:
        axis.imshow(im)
    axis.axis('off')
    axis.set_aspect('equal')

    for bb in scene_data_bb:
        draw_rect(axis, bb2rect(bb), color)

def render_results(axis, scene_data_fp, scene_data_bb, scene_data_pred):
    """Show an image with annotated boxes in blue and predictions in orange.

    Args:
        axis: Matplotlib axes to draw on.
        scene_data_fp: Path of the image file to display.
        scene_data_bb: Iterable of annotated ``[xmin, ymin, xmax, ymax]``
            boxes.
        scene_data_pred: Iterable of predicted boxes in the same format.
    """
    # Image display and annotation outlines are identical to the
    # annotation-only view, so reuse it instead of duplicating the code.
    render_annotations(axis, scene_data_fp, scene_data_bb)

    for bb in scene_data_pred:
        draw_rect(axis, bb2rect(bb), 'orange')

In [None]:
# Walk the first scene sample by sample and collect, per front-camera frame,
# the image path plus the 2D boxes and annotation tokens of the pedestrians.
# The three scene_data_* lists stay index-aligned: one entry per sample.
category = 'pedestrian'
sensor = 'CAM_FRONT'
scene = nusc.scene[0]

scene_data_fps = []
scene_data_anns = []
scene_data_bbs = []

# Samples form a linked list; the loop stops when 'next' is falsy.
next_sample_token = scene['first_sample_token']
while next_sample_token:
    sample = nusc.get('sample', next_sample_token)
    sample_data = nusc.get('sample_data', sample['data'][sensor])

    # image filepaths
    sample_data_fp = os.path.join(data_dir,sample_data['filename'])
    scene_data_fps.append(sample_data_fp)

    # bounding boxes
    sample_data_bbs = []
    sample_data_anns = []
    for ann in sample['anns']:
        # Query one annotation at a time so each returned box can be paired
        # with its annotation token.
        # NOTE(review): presumably the box list is empty when the annotation
        # is not visible in this camera -- confirm against the devkit.
        _, box, cam_intrinsic = nusc.get_sample_data(sample['data'][sensor], selected_anntokens=[ann])
        if len(box) > 1:
            raise ValueError('more than one annotation')
        # Substring match on the box name, so any name containing
        # 'pedestrian' is kept.
        elif (len(box) == 1) and (category in box[0].name):
            bb = box2bb(box[0], cam_intrinsic)
            sample_data_bbs.append(bb)
            sample_data_anns.append(ann)
    # Appended even when empty so indices keep matching scene_data_fps.
    scene_data_bbs.append(sample_data_bbs)
    scene_data_anns.append(sample_data_anns)
    
    next_sample_token = sample['next']

In [None]:
# Index of the frame to visualise.
idx = 15
# Stand-in "detections": the annotated boxes with uniform noise in [0, 20)
# added to every coordinate.
fake_results = np.array(scene_data_bbs[idx]) + 20*np.random.rand(*np.shape(scene_data_bbs[idx]))

# simple testing
fig, (ax1, ax2) = plt.subplots(2,1)

# Top: annotations only. Bottom: annotations (blue) plus the fake
# predictions (orange).
render_annotations(ax1, scene_data_fps[idx], scene_data_bbs[idx])
render_results(ax2, scene_data_fps[idx], scene_data_bbs[idx], fake_results)

### Augmentations

Composition of augmentations (TODO: not yet specified).

In [None]:
#sample_file = os.path.join(data_dir,sample_data_fp)

### Model

Takes in a batch of images or a single image and outputs pedestrian bounding boxes.

In [None]:
from torchvision.models.detection import retinanet_resnet50_fpn_v2, fasterrcnn_resnet50_fpn_v2
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.retinanet import RetinaNetClassificationHead

num_classes = 2  # 1 class (person) + background

retinanet = retinanet_resnet50_fpn_v2(weights='DEFAULT')
# replace the pre-trained head with a new one
in_features = retinanet.head.classification_head.cls_logits.in_channels
num_anchors = retinanet.head.classification_head.num_anchors
# The constructor's positional order is (in_channels, num_anchors,
# num_classes). Pass by keyword: swapping the last two keeps the conv shapes
# identical (same product) but makes the head reshape its logits with the
# wrong class count.
retinanet.head.classification_head = RetinaNetClassificationHead(
    in_channels=in_features,
    num_anchors=num_anchors,
    num_classes=num_classes,
)

rcnn = fasterrcnn_resnet50_fpn_v2(weights='DEFAULT')
# replace the pre-trained head with a new one
in_features = rcnn.roi_heads.box_predictor.cls_score.in_features
rcnn.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

Training:

In [None]:
class PedestrianDetectionDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing image files with pedestrian bounding boxes.

    Each item is ``(image, target)``. ``image`` is the RGB image converted
    to a float tensor. ``target`` is a dict with the keys ``boxes``,
    ``labels``, ``image_id``, ``area`` and ``iscrowd``.
    """

    def __init__(self, images, boxes):
        """Store the parallel image/box lists and build the image transform.

        Args:
            images: Sequence of image file paths.
            boxes: Sequence parallel to ``images``; each entry holds that
                image's ``[xmin, ymin, xmax, ymax]`` boxes (may be empty).

        Raises:
            ValueError: If ``images`` and ``boxes`` differ in length.
        """
        # Explicit check rather than ``assert`` so it survives ``python -O``.
        if len(images) != len(boxes):
            raise ValueError(
                f"images and boxes must have the same length "
                f"(got {len(images)} and {len(boxes)})"
            )
        self.images = images
        self.boxes = boxes

        # image processing: PIL image -> uint8 tensor -> float tensor
        self.image_transforms = T.Compose([
            T.PILToTensor(),
            T.ConvertImageDtype(torch.float),
        ])

    def __getitem__(self, idx):
        """Load image ``idx`` and assemble its detection target dict."""
        # image: convert() returns a fully loaded copy, so the file can be
        # closed before the transform runs.
        with Image.open(self.images[idx]) as raw:
            img = raw.convert("RGB")
        img = self.image_transforms(img)

        # boxes: the reshape gives an image without boxes the shape (0, 4)
        bbs = torch.as_tensor(self.boxes[idx], dtype=torch.float32)
        bbs = torch.reshape(bbs, (-1, 4))

        num_objs = len(bbs)
        heights = bbs[..., 3] - bbs[..., 1]
        widths = bbs[..., 2] - bbs[..., 0]
        target = {
            "boxes": bbs,
            # single foreground class: every box gets label 1
            "labels": torch.ones((num_objs,), dtype=torch.int64),
            "image_id": torch.tensor([idx]),
            "area": heights * widths,
            "iscrowd": torch.zeros((num_objs,), dtype=torch.int64),
        }

        return img, target

    def __len__(self):
        """Return the number of images."""
        return len(self.images)

Forward test:

In [None]:
# Disabled forward-pass smoke test: the code below is wrapped in a string
# literal, so nothing in it is executed.
"""
model = rcnn
dataset = PedestrianDetectionDataset(scene_data_fps, scene_data_bbs)
data_loader = torch.utils.data.DataLoader(
 dataset, batch_size=2, shuffle=True, num_workers=4,
 collate_fn=utils.collate_fn)
# For Training
images,targets = next(iter(data_loader))
images = list(image for image in images)
targets = [{k: v for k, v in t.items()} for t in targets]
output = model(images,targets)   # Returns losses and detections
# For inference
model.eval()
x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)]
predictions = model(x)           # Returns predictions
"""

In [None]:
def _split_thresholds(dataset_len, train_frac, test_frac):
    """Validate the fractions and return the two cut points for ``np.split``.

    Raises:
        ValueError: If either fraction is not positive or they sum to more
            than 1.
    """
    are_positive = (0 < train_frac) and (0 < test_frac)
    in_range = (train_frac + test_frac <= 1)
    if not (are_positive and in_range):
        raise ValueError("Dataset splits are impossible")

    train_end = int(train_frac * dataset_len)
    # The test set is the last ``test_frac`` of the indices and validation
    # gets whatever lies in between. (Previously the second cut was placed at
    # train_frac + test_frac, so ``test_frac`` sized the validation set and
    # e.g. train=0.8 / test=0.2 produced an empty test set.)
    # max() keeps the cut points ordered under floating-point rounding.
    test_start = max(train_end, int((1 - test_frac) * dataset_len))
    return [train_end, test_start]


def generate_random_set_splits(dataset_len, train_frac=0.8, test_frac=0.1, seed=42):
    """Split ``range(dataset_len)`` into shuffled train/validate/test indices.

    Args:
        dataset_len: Number of items in the dataset.
        train_frac: Fraction of items assigned to the training set.
        test_frac: Fraction of items assigned to the test set; the
            validation set receives the remainder.
        seed: Seed for the NumPy generator that permutes the indices.

    Returns:
        Tuple ``(train, validate, test)`` of index arrays.

    Raises:
        ValueError: If the fractions are not positive or sum to more than 1.
    """
    thresholds = _split_thresholds(dataset_len, train_frac, test_frac)
    rng = np.random.default_rng(seed)
    rand_idxs = rng.permutation(np.arange(dataset_len))
    train, validate, test = np.split(rand_idxs, thresholds)
    return train, validate, test


def generate_set_splits(dataset_len, train_frac=0.8, test_frac=0.1):
    """Split ``range(dataset_len)`` into contiguous train/validate/test runs.

    The original order is preserved: training indices come first, then
    validation, then test.

    Args:
        dataset_len: Number of items in the dataset.
        train_frac: Fraction of items assigned to the training set.
        test_frac: Fraction of items assigned to the test set; the
            validation set receives the remainder.

    Returns:
        Tuple ``(train, validate, test)`` of index arrays.

    Raises:
        ValueError: If the fractions are not positive or sum to more than 1.
    """
    thresholds = _split_thresholds(dataset_len, train_frac, test_frac)
    train, validate, test = np.split(np.arange(dataset_len), thresholds)
    return train, validate, test

Actual training:

In [None]:
def main(model, num_epochs=10):
    """Train ``model`` on the collected scene data and evaluate it.

    Reads the notebook-level ``scene_data_fps`` and ``scene_data_bbs`` lists,
    splits them in order into train/validation/test subsets, then runs the
    training loop and a final evaluation on the test subset.

    Args:
        model: Detection model accepted by ``engine.train_one_epoch`` and
            ``engine.evaluate``.
        num_epochs: Number of training epochs. Defaults to 10, the value
            that used to be hard-coded.
    """
    # train on the GPU or on the CPU, if a GPU is not available
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # One dataset instance is enough: each Subset only selects indices from it.
    dataset = PedestrianDetectionDataset(scene_data_fps, scene_data_bbs)
    train_idxs, val_idxs, test_idxs = generate_set_splits(len(dataset))

    def make_loader(idxs, shuffle):
        # Same loader settings for every split; only the indices and the
        # shuffle flag differ.
        return torch.utils.data.DataLoader(
            torch.utils.data.Subset(dataset, idxs),
            batch_size=1, shuffle=shuffle, num_workers=4,
            collate_fn=utils.collate_fn)

    data_loader_train = make_loader(train_idxs, shuffle=True)
    data_loader_val = make_loader(val_idxs, shuffle=True)
    data_loader_test = make_loader(test_idxs, shuffle=False)

    # move model to the right device
    model.to(device)

    # construct an optimizer over the trainable parameters only
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005,
                                momentum=0.9, weight_decay=0.0005)
    # and a learning rate scheduler: multiply the lr by 0.1 every 3 epochs
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                   step_size=3,
                                                   gamma=0.1)

    for epoch in range(num_epochs):
        # Validation runs before the epoch's training step, so the first
        # evaluation measures the model exactly as it was passed in.
        engine.evaluate(model, data_loader_val, device=device)
        # train for one epoch, printing every 10 iterations
        engine.train_one_epoch(model, optimizer, data_loader_train, device, epoch, print_freq=10)
        # update the learning rate
        lr_scheduler.step()
    # evaluation on test dataset
    engine.evaluate(model, data_loader_test, device=device)

    print("That's it!")

In [None]:
# Train and evaluate the Faster R-CNN model built above.
main(rcnn)

### Metric

Recall and $FPR_A$.

In [None]:
def recall():
    """Placeholder for the recall metric; not implemented, returns ``None``."""
    return None

def FPRa():
    """Placeholder for the FPR_A metric; not implemented, returns ``None``."""
    return None