# Designing AVs

Now that we've seen how to run perception and tracking in series, we can venture into designing AVs as a stand-alone class. Let's set things up in the usual way. Keep the autoreload if you want to play around with the core or api libraries in editable mode.

In [1]:
import os
import avstack
import avapi
from tqdm import tqdm

%load_ext autoreload
%autoreload 2

data_base = '../../lib-avstack-api/data/'
obj_data_dir_k = os.path.join(data_base, 'KITTI/object')
raw_data_dir_k = os.path.join(data_base, 'KITTI/raw')
obj_data_dir_n = os.path.join(data_base, 'nuScenes')

KSM = avapi.kitti.KittiScenesManager(obj_data_dir_k, raw_data_dir_k, convert_raw=False)
KDM = KSM.get_scene_dataset_by_index(scene_idx=0)

NSM = avapi.nuscenes.nuScenesManager(obj_data_dir_n)
NDM = NSM.get_scene_dataset_by_index(0)

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Subclassing VehicleEgoStack

To create an ego vehicle, we subclass off of AVstack's `VehicleEgoStack`. The only methods we need to implement are (1) how to initialize the modules, and (2) how to process the data as it comes in. All the other piping is handled for us by AVstack.

In [25]:
class OurTestAV(avstack.ego.vehicle.VehicleEgoStack):
    """Our own little AV"""
    
    def _initialize_modules(self, *args, framerate=10, lidar_perception="pointpillars",
        tracking="basic-box-tracker", dataset="kitti", **kwargs):
        """Initialize modules"""
        
        self.perception = {
            "object_3d": avstack.modules.perception.object3d.MMDetObjectDetector3D(
                model=lidar_perception, dataset=dataset, **kwargs
            )
        }
        self.tracking = avstack.modules.tracking.tracker3d.BasicBoxTracker3D(framerate=framerate, **kwargs)
        self.prediction = avstack.modules.prediction.KinematicPrediction(
            dt_pred=1.0/5, t_pred_forward=1, **kwargs,
        )

    def _tick_modules(self, frame, timestamp, data_manager, platform, *args, **kwargs):
        dets_3d = self.perception["object_3d"](
            data_manager.pop("lidar-0"), frame=frame, identifier="lidar_objects_3d")
        tracks_3d = self.tracking(detections=dets_3d, t=timestamp, frame=frame, platform=platform)
        # prediction_3d = self.prediction(tracks_3d, frame=frame)  # FYI prediction is currently slow...

        # standard packing of outputs
        control=None
        localization=None
        dets_2d=None
        tracks_2d=None
        prediction_2d=None
        prediction_3d=None
        plan=None
        return control, (localization, dets_2d, dets_3d, tracks_2d, tracks_3d,\
            prediction_2d, prediction_3d, plan)

## Testing our AV

To test our AV, we can make a simple data-source agnostic loop that stores data inside a `DataManager` class. Because of the design decision to attach sensor-identifying information onto the sensor data itself, the `DataManager` will be able to automatically allocate the sensor data into the appropriate bin - e.g., so that a sensor with very high rate doesn't impact the data buffer for any other sensor.

In [26]:
def run_our_av(DM, av):
    """Run an AV through a sequence of data"""
    data_manager = avstack.datastructs.DataManager(max_size=1)  # we either pop or we lose it
    for frame in tqdm(DM.frames):
        # -- add data --> sensor data has ID attached, so it knows where to go
        data_manager.push(DM.get_lidar(frame, sensor="main_lidar"))
        data_manager.push(DM.get_image(frame, sensor="main_camera"))
        
        # -- run next frame
        t = DM.framerate * frame
        av.tick(frame, t, data_manager, platform=DM.get_calibration(frame, sensor="main_lidar").reference)

t_init = 0
ego_init = None  # we don't need this for an agent without localization
result_folder = 'results_AV'

### KITTI

Once we define the test loop, all that's left is to initialize our AV and put it through the evaluation!

In [27]:
DM = KDM
AV = OurTestAV(t_init, ego_init=None, framerate=DM.framerate, dataset=DM.NAME,
                save_output=True, save_folder=result_folder)
run_our_av(KDM, AV)

Loads checkpoint by local backend from path: /home/spencer/Documents/Projects/AVstack/avstack-docs/lib-avstack-core/third_party/mmdetection3d/checkpoints/kitti/hv_pointpillars_secfpn_6x8_160e_kitti-3d-3class_20220301_150306-37dc2420.pth


100%|█████████████████████████████████████████████████████████████████| 108/108 [00:06<00:00, 17.08it/s]


In [36]:
# %%capture
percep_res_frames, percep_res_seq, _ = avapi.evaluation.get_percep_results_from_folder(
    DM, AV.perception['object_3d'].save_folder, sensor_eval='main_lidar', multiprocess=True)
# track_res_frames, track_res_seq, track_res_exp = avapi.evaluation.get_track_results_from_folder(
#     DM, AV.tracking.save_folder, sensor_eval='main_lidar')

100%|█████████████████████████████████████████████████████████████████| 108/108 [00:21<00:00,  4.97it/s]


In [37]:
print('Aggregate perception results:\n', percep_res_seq, '\n')
# print('Aggregate tracking results:\n', track_res_seq, '\n')
# print(f'     e.g., HOTA_LocA(0): {track_res_exp["HOTA_LocA(0)"]}')

Aggregate perception results:
 {'tot_TP': 0, 'tot_FP': 0, 'tot_FN': 450, 'tot_T': 450, 'mean_precision': 0.0, 'mean_recall': 0.0} 



### nuScenes

Note that the only difference here is that we have to pass in nuScenes as the dataset so the perception models can initialize to the nuScenes weights.

In [38]:
DM = NDM
AV = OurTestAV(t_init, ego_init=None, framerate=DM.framerate, dataset=DM.NAME,
                save_output=True, save_folder=result_folder)
run_our_av(DM, AV)

Loads checkpoint by local backend from path: /home/spencer/Documents/Projects/AVstack/avstack-docs/lib-avstack-core/third_party/mmdetection3d/checkpoints/nuscenes/hv_pointpillars_fpn_sbn-all_fp16_2x8_2x_nus-3d_20201021_120719-269f9dd6.pth


100%|███████████████████████████████████████████████████████████████████| 39/39 [00:02<00:00, 14.32it/s]


In [39]:
# %%capture
percep_res_frames, percep_res_seq, _ = avapi.evaluation.get_percep_results_from_folder(
    DM, AV.perception['object_3d'].save_folder, sensor_eval='main_lidar', multiprocess=True)
# track_res_frames, track_res_seq, track_res_exp = avapi.evaluation.get_track_results_from_folder(
#     DM, AV.tracking.save_folder, sensor_eval='main_lidar')

100%|███████████████████████████████████████████████████████████████████| 39/39 [00:27<00:00,  1.40it/s]


In [40]:
print('Aggregate perception results:\n', percep_res_seq, '\n')
# print('Aggregate tracking results:\n', track_res_seq, '\n')
# print(f'     e.g., HOTA_LocA(0): {track_res_exp["HOTA_LocA(0)"]}')

Aggregate perception results:
 {'tot_TP': 0, 'tot_FP': 0, 'tot_FN': 2057, 'tot_T': 2057, 'mean_precision': 0.0, 'mean_recall': 0.0} 



## Exploring Pre-Made AVs

We've already designed a few AVs we think will be useful while you're spinning up AVstack. 

In [41]:
import sys, inspect

clsmembers = [item[0] for item in reversed(inspect.getmembers(avstack.ego.vehicle, inspect.isclass))]
clsmembers

['VehicleEgoStack',
 'PassthroughAutopilotVehicle',
 'LidarPerceptionAndTrackingVehicle',
 'LidarCollabPerceptionAndTrackingVehicle',
 'LidarCameraPerceptionAndTrackingVehicle',
 'LidarCamera3DFusionVehicle',
 'Level2LidarBasedVehicle',
 'Level2GtPerceptionGtLocalization',
 'GroundTruthBehaviorAgent',
 'GoStraightEgo',
 'AutopilotWithCameraPerception']

Later, we'll get to how we can design "active" AVs that use perception and tracking information to make plans and control actuators in our environment. But for that, we'll need an AV simulator (or real hardware).