In [None]:
import avstack
import avapi
import random

## Define Multi-Agent Algorithms

In [None]:
class _AgentModel:
    """Base class for agent fusion algorithms.

    Stores the perception and tracking components that subclasses use in
    their ``ingest`` implementations.

    Args:
        perception: Perception component(s); stored as-is.
        tracking: Tracking component(s); stored as-is.
    """

    def __init__(self, perception, tracking):
        self.perception = perception
        self.tracking = tracking

    def ingest(self, ego_pc=None, agents_pcs=None):
        """Process one step of ego and agent data; subclasses must override.

        The parameters mirror the ``ingest(ego_pc, agents_pcs)`` signature
        used by the subclasses, so calling the base through the common
        interface fails with ``NotImplementedError`` rather than a
        ``TypeError``. The defaults only keep a zero-argument call working.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError


class LidarPerceptionAndTracking(_AgentModel):
    """Only performs ego-based perception and tracking.

    Args:
        perception: Mapping with an ``'ego'`` entry that is callable on the
            ego input and returns detections.
        *args, **kwargs: Accepted and ignored.
    """

    def __init__(self, perception, *args, **kwargs):
        tracking = avstack.modules.tracking.tracker3d.BasicBoxTracker3D()
        # super() already binds self; passing it again would shift every
        # argument by one and raise TypeError.
        super().__init__(perception, tracking)

    def ingest(self, ego_pc, agents_pcs):
        """Run ego perception followed by tracking.

        Args:
            ego_pc: Input for the ego perception model (presumably the ego
                LiDAR point cloud).
            agents_pcs: Unused here; accepted so all models share one
                ``ingest`` signature.

        Returns:
            The tracker output for the ego detections.
        """
        dets_ego = self.perception['ego'](ego_pc)
        return self.tracking(dets_ego)


class FusionAtTrackingWithDetections(_AgentModel):
    """Treat detections from other agents as detections.

    A single tracker is updated once per detection set (ego plus every
    agent), in random order.

    Args:
        perception: Mapping with ``'ego'`` and ``'agents'`` entries, each
            callable on a single input and returning detections.
        n_agents: Unused by this model; kept for a uniform constructor.
        *args, **kwargs: Accepted and ignored.
    """

    def __init__(self, perception, n_agents, *args, **kwargs):
        tracking = avstack.modules.tracking.tracker3d.BasicBoxTracker3D()
        # super() already binds self; do not pass it explicitly.
        super().__init__(perception, tracking)

    def ingest(self, ego_pc, agents_pcs):
        """Feed ego and agent detections into the shared tracker.

        Args:
            ego_pc: Input for the ego perception model.
            agents_pcs: Sequence of inputs, one per agent, each run through
                the shared ``'agents'`` perception model.

        Returns:
            The tracker output after the last detection set was ingested.
        """
        dets_ego = self.perception['ego'](ego_pc)
        dets_all = [self.perception['agents'](agent_pc) for agent_pc in agents_pcs]
        dets_all.append(dets_ego)
        # random.shuffle works in place and returns None, so it must be
        # called before the loop rather than iterated over.
        random.shuffle(dets_all)  # shuffle to avoid bias
        # dets_all always holds at least the ego detections, so tracks_out
        # is guaranteed to be bound after the loop.
        for dets in dets_all:
            tracks_out = self.tracking(dets)  # TODO: need to adjust the configuration of the tracker?
        return tracks_out


class FusionAtTrackingWithTracks(_AgentModel):
    """Treat tracks from other agents as detections.

    Each agent has its own tracker; the per-agent tracker outputs are fed,
    together with the ego detections, into the ego tracker.

    Args:
        perception: Mapping with ``'ego'`` and ``'agents'`` entries, each
            callable on a single input and returning detections.
        n_agents: Number of per-agent trackers to create.
        *args, **kwargs: Accepted and ignored.
    """

    def __init__(self, perception, n_agents, *args, **kwargs):
        tracking = {
            'ego': avstack.modules.tracking.tracker3d.BasicBoxTracker3D(),
            'agents': [avstack.modules.tracking.tracker3d.BasicBoxTracker3D() for _ in range(n_agents)],
        }
        # super() already binds self; do not pass it explicitly.
        super().__init__(perception, tracking)

    def ingest(self, ego_pc, agents_pcs):
        """Track per agent, then fuse the results in the ego tracker.

        Args:
            ego_pc: Input for the ego perception model.
            agents_pcs: Sequence of inputs, one per agent; entry ``i`` is
                tracked by agent tracker ``i``, so its length must not
                exceed ``n_agents``.

        Returns:
            The ego tracker output after the last input set was ingested.
        """
        dets_ego = self.perception['ego'](ego_pc)
        pseudo_dets_all = [
            self.tracking['agents'][i](self.perception['agents'](agent_pc))
            for i, agent_pc in enumerate(agents_pcs)
        ]
        pseudo_dets_all.append(dets_ego)
        # random.shuffle works in place and returns None, so it must be
        # called before the loop rather than iterated over.
        random.shuffle(pseudo_dets_all)  # shuffle to avoid bias
        # self.tracking is a dict here; fusion happens in the ego tracker.
        ego_tracker = self.tracking['ego']
        for pseudo_dets in pseudo_dets_all:
            tracks_out = ego_tracker(pseudo_dets)  # TODO: need to adjust the configuration of the tracker?
        return tracks_out


class DedicatedFusionLiDAR(_AgentModel):
    """Performs distributed data fusion on tracks from other agents.

    The ego and every agent are tracked independently; the resulting track
    sets are then handed to ``fusion``.

    Args:
        perception: Mapping with ``'ego'`` and ``'agents'`` entries, each
            callable on a single input and returning detections.
        n_agents: Number of per-agent trackers to create.
        *args, **kwargs: Accepted and ignored.
    """

    def __init__(self, perception, n_agents, *args, **kwargs):
        tracking = {
            'ego': avstack.modules.tracking.tracker3d.BasicBoxTracker3D(),
            'agents': [avstack.modules.tracking.tracker3d.BasicBoxTracker3D() for _ in range(n_agents)],
        }
        # super() already binds self; do not pass it explicitly.
        super().__init__(perception, tracking)

    def fusion(self, tracks_ego, tracks_agents):
        """Fuse ego tracks with the per-agent tracks.

        Placeholder: not implemented yet, currently returns ``None``.
        """
        pass

    def ingest(self, ego_pc, agents_pcs):
        """Track ego and agents separately, then fuse the track sets.

        Args:
            ego_pc: Input for the ego perception model.
            agents_pcs: Sequence of inputs, one per agent; entry ``i`` is
                tracked by agent tracker ``i``, so its length must not
                exceed ``n_agents``.

        Returns:
            Whatever ``fusion`` returns for the ego and agent tracks.
        """
        tracks_ego = self.tracking['ego'](self.perception['ego'](ego_pc))
        tracks_agents = [
            self.tracking['agents'][i](self.perception['agents'](agent_pc))
            for i, agent_pc in enumerate(agents_pcs)
        ]
        # The fused result was previously computed but never returned.
        return self.fusion(tracks_ego, tracks_agents)

In [None]:
# Perception is always the same across all agent models: one detector for
# the ego and one shared detector for the other agents.
perception = {
    'ego': avstack.modules.perception.object3d.MMDetObjectDetector3D(model='pointpillars', dataset='carla'),
    'agents': avstack.modules.perception.object3d.MMDetObjectDetector3D(model='pointpillars', dataset='carla-infrastructure'),
}

## Run Multi-Agent Experiments