In [1]:
import torch
import json
import os
from pathlib import Path
import numpy as np

In [4]:
def pairwise(x: torch.Tensor, dim=1):
    """Pair every entry along ``dim`` with all the *other* entries along ``dim``.

    For the usual N x A x D state tensor (``dim=1`` is the agent axis) this
    returns ``x_ego`` (N x A x D) and ``x_other`` (N x A x (A - 1) x D), where
    ``x_other[n, i]`` holds the states of every agent except agent ``i``, in
    ascending agent order.

    Args:
        x: input tensor with at least ``dim + 1`` dimensions.
        dim: axis that enumerates the agents; negative values count from the end.

    Returns:
        Tuple ``(x_ego, x_other)``. ``x_ego`` is ``x`` itself. ``x_other`` has
        the shape of ``x`` with the size-A axis at ``dim`` replaced by two axes
        of sizes ``A`` and ``A - 1``.

    Raises:
        IndexError: if ``dim`` is out of range for ``x``.
    """
    if dim < 0:
        dim += x.dim()
    if not 0 <= dim < x.dim():
        raise IndexError(f"dim out of range for a tensor with {x.dim()} dimensions")

    n_agents = x.shape[dim]
    n_others = max(n_agents - 1, 0)

    # Row i of `off_diagonal` selects every agent index j != i.
    agent_ids = torch.arange(n_agents, device=x.device)
    off_diagonal = agent_ids.unsqueeze(0) != agent_ids.unsqueeze(1)
    # Boolean indexing flattens row-major, so the ids come out grouped by ego
    # agent i with the remaining agents j in ascending order.
    other_ids = agent_ids.expand(n_agents, n_agents)[off_diagonal]

    # Gather the "other" entries along `dim`, then split that axis into (A, A - 1).
    gathered = x.index_select(dim, other_ids)
    out_shape = tuple(x.shape[:dim]) + (n_agents, n_others) + tuple(x.shape[dim + 1:])
    return x, gathered.reshape(out_shape)

In [2]:
class Env:
    """Problem instance described by a JSON file plus the map, agent and task files it names.

    The JSON file is expected to contain the keys ``mapFile``, ``agentFile`` and
    ``taskFile`` (paths relative to the JSON file's directory) as well as
    ``numTasksReveal`` and ``taskAssignmentStrategy``.

    Attributes populated by the constructor:
        file_path: ``Path`` of the JSON file.
        data: the decoded JSON document.
        height, width: integers read from the second and third map header lines.
        map_1darr: flat list with one character per map cell, row after row.
        n_agents, agents: count from the agent file's first line and a numpy
            array with the first integer of every following line.
        n_tasks, tasks: same layout, read from the task file.
        numTasksReveal, taskAssignmentStrategy: copied from the JSON document.
    """

    def __init__(self, envFile):
        """Load the JSON description at ``envFile`` and parse every file it references."""
        self.file_path = Path(envFile)
        with open(self.file_path) as f:
            self.data = json.load(f)
        self.parseMap()
        self.parseAgents()
        self.parseTasks()
        self.numTasksReveal = int(self.data["numTasksReveal"])
        self.taskAssignmentStrategy = self.data["taskAssignmentStrategy"]

    def _read_locations(self, key):
        """Read the count header and first-column integers of the file named by ``self.data[key]``.

        Blank lines (for example a trailing empty line) are skipped instead of
        failing on ``int("")``.

        Returns:
            Tuple ``(count, values)``: the first integer of the first non-blank
            line, and a 1-D integer numpy array holding the first integer of
            every following non-blank line.

        Raises:
            ValueError: if the file contains no non-blank line.
        """
        path = self.file_path.parent / self.data[key]
        with open(path) as f:
            rows = [line.split() for line in f]
        rows = [tokens for tokens in rows if tokens]
        if not rows:
            raise ValueError(f"{path} is empty")
        count = int(rows[0][0])
        values = np.array([int(tokens[0]) for tokens in rows[1:]], dtype=int)
        return count, values

    def parseMap(self):
        """Parse the map file into ``height``, ``width`` and the flat ``map_1darr`` cell list."""
        with open(self.file_path.parent / self.data["mapFile"]) as f:
            lines = f.readlines()
        # Four header lines come first: a type line, "height H", "width W" and
        # a marker line. The grid rows start at line index 4.
        self.height = int(lines[1].split()[1])
        self.width = int(lines[2].split()[1])
        self.map_1darr = [char for line in lines[4:] for char in line.strip()]

    def parseAgents(self):
        """Parse the agent file into ``n_agents`` and the ``agents`` array."""
        self.n_agents, self.agents = self._read_locations("agentFile")

    def parseTasks(self):
        """Parse the task file into ``n_tasks`` and the ``tasks`` array."""
        self.n_tasks, self.tasks = self._read_locations("taskFile")

class BatchedEnvironment:
    """Batch of environments stacked into tensors with a leading environment axis.

    Attributes:
        maps: float32 tensor, one stacked map per environment.
        curr_states: stacked ``env.agents`` arrays (current agent states).
        goal_locations: stacked ``env.tasks`` arrays.
    """

    def __init__(self, environments, num_envs):
        """Stack the per-environment data.

        Args:
            environments: iterable of objects exposing ``agents``, ``tasks`` and
                either a numeric ``map`` or a character list ``map_1darr``.
                All environments must share the same shapes so they can be stacked.
            num_envs: currently unused; the batch size is taken from
                ``environments``. Kept so existing callers keep working.
        """
        environments = list(environments)
        self.maps = torch.stack([self._map_tensor(env) for env in environments])
        self.curr_states = torch.stack([torch.tensor(env.agents) for env in environments])
        self.goal_locations = torch.stack([torch.tensor(env.tasks) for env in environments])

    @staticmethod
    def _map_tensor(env):
        """Return the environment's map as a float32 tensor."""
        grid = getattr(env, "map", None)
        if grid is None:
            # Env only provides the raw character grid. Encode it as an
            # occupancy vector: 0.0 for '.', 1.0 for every other character.
            # NOTE(review): assumes '.' is the only free-cell symbol - TODO confirm.
            grid = [0.0 if cell == "." else 1.0 for cell in env.map_1darr]
        return torch.as_tensor(grid, dtype=torch.float32)

    @staticmethod
    def _as_float(t):
        """Cast integer tensors to float; ``torch.linalg.norm`` rejects integer input."""
        return t if t.is_floating_point() else t.float()

    def compute_goal_achieved(self):
        """Return a boolean tensor that is True where the state equals the goal.

        The Euclidean norm of ``curr_states - goal_locations`` is taken over the
        last axis and compared with zero, so the result drops that axis.
        Assumes ``goal_locations`` is broadcastable against ``curr_states`` -
        TODO confirm that tasks and agents are aligned one-to-one.
        """
        # Subtract before casting so integer states are compared exactly.
        offsets = self._as_float(self.curr_states - self.goal_locations)
        return torch.linalg.norm(offsets, dim=-1) == 0

    def compute_collision(self):
        """Construct the agent by agent distance matrix and check for two agents occupying the same cell.

        Returns a boolean tensor (environment x agent) that is True for every
        agent closer than 0.01 to at least one other agent.
        """
        states = self.curr_states
        if states.dim() == 2:
            # Scalar (e.g. linearised cell index) states: add a coordinate axis
            # so each agent's state is a length-1 vector.
            states = states.unsqueeze(-1)
        x_ego, x_other = pairwise(states)
        offsets = self._as_float(x_ego.unsqueeze(dim=-2) - x_other)
        collisions = torch.linalg.norm(offsets, dim=-1) < 0.01
        return collisions.any(dim=-1)

In [5]:
# Problem description files to load; one Env is built per entry, in order.
p = ["./example_problems/city.domain/paris_200.json"]
envs = list()
for e in p:
    envs += [Env(e)]

(200,)