In [45]:
from collections import deque
from dataclasses import dataclass
import copy

import numpy as np
import mesa


AREA_STATIC_BONUS = 0.5

MAX_GROUPS = 10
GATE = "G"
OBSTACLE = "#"
GROUP = [str(i) for i in range(MAX_GROUPS)]
EMPTY = " "

MAP_SYMBOLS = {str(i): i for i in range(MAX_GROUPS)}
MAP_SYMBOLS[GATE] = 100
MAP_SYMBOLS[OBSTACLE] = -1
MAP_SYMBOLS[EMPTY] = 0

MAP_VALUES = {v: k for k, v in MAP_SYMBOLS.items()}

In [2]:
def create_map(width, height, gate=None):
    if width < 3 or height < 3:
        raise ValueError("Map cannot have dimensions lower than 3 due to walls")
    dimensions = (height, width)
    grid = np.zeros(shape=dimensions)
    grid[:,0] = -1
    grid[:,-1] = -1
    grid[0, :] = -1
    grid[-1, :] = -1
    if gate:
        gate = (gate[1], gate[0])
        grid[gate] = 100
    return grid

In [3]:
def load_map(filename):
    grid = None
    with open(filename) as f:
        dimensions = list(map(int, f.readline().split()))
        width, height = dimensions
        dimensions = height, width
        grid = np.zeros(shape=dimensions)
        for row in range(height):
            line = f.readline().strip()
            for column, c in enumerate(line):
                if c not in MAP_SYMBOLS:
                    raise ValueError("Unknown character when loading map from file")
                np_coords = (row, column)
                grid[np_coords] = MAP_SYMBOLS[c]
    return grid

In [4]:
def save_map(grid, filename):
    columns, rows = grid.shape
    with open(filename, "w+") as f:
        f.write(str(columns)+" "+str(rows)+"\n")
        for row in range(rows):
            for c in grid[row]:
                f.write(MAP_VALUES[c])
            f.write("\n")

In [5]:
def hex_to_rgb(value):
    """Return (red, green, blue) for the color given as #rrggbb."""
    value = value.lstrip('#')
    lv = len(value)
    return tuple(int(value[i:i + lv // 3], 16) for i in range(0, lv, lv // 3))

def rgb_to_hex(red, green, blue):
    """Return color as #rrggbb for the given color values."""
    return '#%02x%02x%02x' % (red, green, blue)

In [6]:
def agent_portrayal(agent):
    portrayal = {
        "Shape": "rect",
        "Color": agent.color,
        "Filled": "true",
        "Layer": 0,
        "w": 1,
        "h": 1}
    if agent.name.startswith("A"):
        portrayal = {
            "Shape": "circle",
            "Color": agent.color,
            "Filled": "true",
            "Layer": 1,
            "r": 0.5
        }
    
    if agent.name.startswith("F"):
        portrayal = {
            "Shape": "circle",
            "Color": agent.color,
            "Filled": "true",
            "Layer": 2,
            "r": 0.5
        }    
    
    return portrayal


In [7]:
def compute_static_field(grid, normalize=True):
    @dataclass
    class Node():
        coords: (int, int)
        obstacle: bool
        price: int = float("inf")
        
        def enter(self, parent):
            self.price = parent.price + self.distance(parent)
        
        def distance(self, other):
            sigma = np.power(self.coords[0] - other.coords[0], 2)
            sigma += np.power(self.coords[1] - other.coords[1], 2)
            return np.sqrt(sigma)
            
        def neighbours(self, width, height):
            valid_coords = []
            for x in [-1, 0, 1]:
                for y in [-1, 0, 1]:
                    if 0 <= self.coords[0] + x < width and 0 <= self.coords[1] + y < height:
                        if x == 0 and y == 0:
                            continue
                        valid_coords.append((self.coords[0]+x, self.coords[1]+y))
            return valid_coords          
        
    q = deque()
    grid_nodes = []
    gate = None
    height, width = grid.shape
    for y in range(height):
        grid_nodes.append([])
        for x in range(width):
            coords = (x, y)
            np_coords = (y, x)
            node = Node(coords, grid[np_coords] < 0)
            if grid[np_coords] == 100:
                gate = Node(coords, False)
                node = gate
            grid_nodes[y].append(node)
            
    if not gate:
        raise ValueError("Gate is not present in the map. Can't compute static field.")
    
    gate.price = 0
    q.append(gate)
    while q:
        current_node = q.pop()
        while current_node.obstacle:
            current_node = q.pop()
        for coords in current_node.neighbours(width, height):
            x, y = coords
            np_coords = (y, x)
            other_node = grid_nodes[y][x]
            if not other_node.obstacle:
                distance = current_node.distance(other_node)
                if current_node.price + distance < other_node.price:
                    other_node.enter(current_node)
                    q.append(other_node)
    static_field = np.zeros(grid.shape)
    for x in range(width):
        for y in range(height):
            np_coords = (y, x)
            static_field[np_coords] = grid_nodes[y][x].price
    if normalize:
        return  static_field / np.nanmax(static_field[static_field != np.inf])
    return static_field

In [53]:
class LeaderAgent(mesa.Agent):
    def __init__(self, name, model):
        super().__init__(name, model)
        self.name = "A"+name
        self.model = model
        self.color = "red"

    def step(self):
        values = []
        c = []
        for coords in self.model.grid.get_neighborhood(self.pos, moore=True):
            np_coords = coords[1], coords[0]
            cell = self.model.grid[coords[0]][coords[1]][0]
            values.append(cell.static)
            c.append(coords)
        choice = np.argmin(values)
        coords = c[choice]
        cell = self.model.grid[coords[0]][coords[1]][0]
        if cell.enter(self):
            self.model.grid.move_agent(self, coords)
            self.model.update_static(coords)


class Cell(mesa.Agent):
    def __init__(self, name, model, coords, static):
        super().__init__(name, model)
        self.name = "C"+name
        self.model = model
        self.coords = coords
        self.static = static
        color = [0, 255*np.abs(static), 128]
        if color[1] > 255 :
            color[1] = 255
            color[2] = 255
        color[1] = round(color[1])
        self.color = rgb_to_hex(*color)
        self.empty = True
        
    def enter(self, agent):
        if self.empty:
            self.empty = False
            self.agent = agent
            return True
        else:
            return False
    
    def step(self):
        self.empty = True

    
class RoomModel(mesa.Model):
    def __init__(self, n_agents, width, height, gate, planner
        super().__init__()
        self.schedule = mesa.time.BaseScheduler(self)
        self.grid = mesa.space.MultiGrid(width, height, torus=False)
        self.dimensions = (width, height)
        self.gate = gate
        self.planner = planner
        self.planner.add_goal([gate])
        self.sff()
        
        leader_positions = [ (11, 3) ]
        
        follower_positions = [(12, 3), (12, 2), (12, 4), (13, 3), (13, 2), (13, 4)]
                
        # cells
        for x in range(width):
            for y in range(height):
                coords = (x, y)
                np_coords = (y, x)
                static_bonus = 0
                if self.in_checkpoint(coords, checkpoint):
                    static_bonus = AREA_STATIC_BONUS
                cell = Cell(str(x)+"C"+str(y), self, coords, self.static[np_coords] - static_bonus)
                self.grid.place_agent(cell, coords)
                self.schedule.add(cell)
        # leader        
        for coords in leader_positions:
            a = LeaderAgent(str(coords[0])+"A"+str(coords[1]), self)
            self.grid.place_agent(a, coords)
            cell = self.grid[coords[0]][coords[1]][0]
            cell.enter(a)
            self.schedule.add(a)
            self.update_static(coords)
        
        # followers     
        for coords in follower_positions:
            a = FollowerAgent(str(coords[0])+"F"+str(coords[1]), self)
            self.grid.place_agent(a, coords)
            cell = self.grid[coords[0]][coords[1]][0]
            cell.enter(a)
            self.schedule.add(a)
            
    def sff(self):
        width, height = self.dimensions
        room = create_map(width, height, gate=self.gate)
        static_values = compute_static_field(room)
        
            
    def update_static(self, coords):
        np_coords = coords[1], coords[0]
        copy_room = copy.deepcopy(self.room)
        copy_room[np_coords] = 100
        static_values = compute_static_field(copy_room)
        self.follower_static = static_values
        
     
    def in_checkpoint(self, coords, checkpoint):
        x, y = coords
        tlX, tlY = checkpoint[0]
        rbX, rbY = checkpoint[1]
        if tlX <= x <= rbX and tlY <= y <= rbY:
            return True
        else:
            return False
        
    
    def static_potential(self, coords):
        return self.grid[coords[0]][coords[1]].static
        
            
    def step(self):
        self.schedule.step()


In [55]:
dimensions = (15, 7)
width, height = dimensions
gate = (0,3)
grid = mesa.visualization.CanvasGrid(agent_portrayal, width, height, 500, 500)
server = mesa.visualization.ModularServer(
    RoomModel, [grid], "Room Model", {"n_agents": 1, "width": width, 
                                      "height": height, "gate": gate}
)
server.port = np.random.randint(8000, 9000) # The default
server.launch()

Interface starting at http://127.0.0.1:8683


RuntimeError: This event loop is already running

libva error: /usr/lib/x86_64-linux-gnu/dri/iHD_drv_video.so init failed


Opening in existing browser session.
Socket opened!
{"type":"reset"}




{"type":"get_step","step":1}
{"type":"get_step","step":2}
{"type":"get_step","step":3}
{"type":"get_step","step":4}
{"type":"get_step","step":5}
{"type":"get_step","step":6}
{"type":"get_step","step":7}
{"type":"get_step","step":8}
{"type":"get_step","step":9}
{"type":"get_step","step":10}
{"type":"get_step","step":11}
{"type":"get_step","step":12}
{"type":"get_step","step":13}
{"type":"get_step","step":14}
{"type":"get_step","step":15}
{"type":"get_step","step":16}
{"type":"get_step","step":17}
{"type":"get_step","step":18}
{"type":"get_step","step":19}
{"type":"get_step","step":20}
{"type":"get_step","step":21}
{"type":"get_step","step":22}
{"type":"get_step","step":23}
{"type":"get_step","step":24}
{"type":"get_step","step":25}
{"type":"get_step","step":26}
{"type":"get_step","step":27}
{"type":"get_step","step":28}
{"type":"get_step","step":29}
{"type":"get_step","step":30}
{"type":"get_step","step":31}
{"type":"get_step","step":32}
