In [34]:
# import libraries
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.animation import FuncAnimation
import mesa
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.datacollection import DataCollector
import numpy as np
import math
from IPython.display import HTML
from matplotlib import rc
import pandas as pd
import numpy.lib.recfunctions as rf
from queue import Queue
import collections
import time
import sys
import toml

rngbeta = np.random.default_rng()
rngspeed = np.random.default_rng()


This code runs with mesa version 3.00 or higher

In [35]:
# check mesa version
print("Mesa version: ", mesa.__version__)

Mesa version:  3.1.0


In [36]:
# Function to calculate Euclidean distance (distance between two points)
def euclidean_distance(pos1, pos2):
    # Use the formula to calculate distance between two coordinates
    return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)

# Function to calculate integer grid points between two coordinates
def connect2(ends):
    d0, d1 = np.diff(ends, axis=0)[0]
    if np.abs(d0) > np.abs(d1):
        return np.c_[np.arange(ends[0, 0], ends[1,0] + np.sign(d0), np.sign(d0), dtype=np.int32),
                     np.arange(ends[0, 1] * np.abs(d0) + np.abs(d0)//2,
                               ends[0, 1] * np.abs(d0) + np.abs(d0)//2 + (np.abs(d0)+1) * d1, d1, dtype=np.int32) // np.abs(d0)]
    else:
        return np.c_[np.arange(ends[0, 0] * np.abs(d1) + np.abs(d1)//2,
                               ends[0, 0] * np.abs(d1) + np.abs(d1)//2 + (np.abs(d1)+1) * d0, d0, dtype=np.int32) // np.abs(d1),
                     np.arange(ends[0, 1], ends[1,1] + np.sign(d1), np.sign(d1), dtype=np.int32)]

# Transforms the raw model grid data into usable arrays for the unblocked vision function
def prep_vision(vision_area, obstacles):
    # Find all obstacles within the vision range
    vision_array = np.asarray(vision_area)
    obstacles = np.asarray(obstacles)
    nrows, ncols = vision_array.shape
    dtype={'names':['f{}'.format(i) for i in range(ncols)], 'formats':ncols * [vision_array.dtype]}
    obstacles_in_sight = np.intersect1d(vision_array.view(dtype), obstacles.view(dtype))
    obstacles_in_sight = rf.structured_to_unstructured(obstacles_in_sight).tolist()

    # Trim vision_array down to only the edges (optimisation)
    xtrim = np.array([np.min(vision_array[:, 0]), np.max(vision_array[:, 0])])
    ytrim = np.array([np.min(vision_array[:, 1]), np.max(vision_array[:, 1])])

    xmask = np.isin(element = vision_array[:, 0], test_elements=xtrim)
    ymask = np.isin(element = vision_array[:, 1], test_elements=ytrim)

    xarray = vision_array[xmask]
    yarray = vision_array[ymask]
    vision_array = np.vstack((xarray, yarray))

    return vision_array, obstacles_in_sight

# Function to find all visible tiles to an agent around obstacles; call this function for enhanced vision behaviour
def unblocked_vision(pos, vision_area, obstacles):
    # Prepare data
    vision_array, obstacles_in_sight = prep_vision(vision_area, obstacles)
    x0 = pos[0]
    y0 = pos[1]
    vis_area = []

    # Iterate over all the tiles within the vision area
    for tile in vision_array[:, 0:2]:
        x1 = tile[0]
        y1 = tile[1]
        # Calculate simple sightline for straight lines (2D array); not covered by function
        if x0 == x1:
            if y0 > y1:
                liny = np.vstack(np.arange(y1, y0 + 1)[::-1])
            else:
                liny = np.vstack(np.arange(y0, y1 + 1))
            linx = (np.ones((len(liny[:, 0]), 1))*x0).astype(int)
            visline = np.hstack((linx, liny))
        elif y0 == y1:
            if x0 > x1:
                linx = np.vstack(np.arange(x1, x0 + 1)[::-1])
            else:
                linx = np.vstack(np.arange(x0, x1 + 1))
            liny = (np.ones((len(linx[:, 0]), 1))*y0).astype(int)
            visline = np.hstack((linx, liny))

        # Use function to obtain sightline for diagonals (2D array)
        else:
            coords = np.array([[x0, y0],
                               [x1, y1]])
            visline = connect2(coords)

        # Add all possible sighted tiles (not blocked by an obstacle) to new vision list of tuples
        for loc in visline[:, 0:2]:
            vis_area.append((int(loc[0]), int(loc[1])))
            if [loc[0], loc[1]] in obstacles_in_sight:
                break

    # Remove duplicates from the vision list
    vision_area = list(set(vis_area))

    return vision_area, obstacles_in_sight

# Tool to plot the vision area of a single agent; only use for development purposes
def plot_vision(pos, vision, obstacles, vision_area):
    viz = np.zeros((vision*2 + 1, vision*2 + 1))
    posx = pos[0] - vision
    posy = pos[1] - vision
    for i in range(len(viz[0, :])):
        for j in range(len(viz[:, 0])):
            y = vision*2 - j
            if (i + posx, j + posy) in vision_area:
                viz[y, i] = 1
            if (i + posx, j + posy) in obstacles:
                viz[y, i] = 2
            if (i + posx, j + posy) == pos:
                viz[y, i] = 3

    fig, ax = plt.subplots(figsize=(4, 4))
    cmap = mcolors.ListedColormap(['black', 'green', 'white', 'red'])
    bounds = [0, 1, 2, 3]
    norm = mcolors.BoundaryNorm(bounds, cmap.N, extend='max')
    ax.imshow(viz, cmap=cmap, norm=norm)

# Function that generates the shortest path in grid tile coordinates to a given index point on the grid
def exit_path(grid, pos, index):
    queue = collections.deque([[pos]])
    seen = set([pos])
    while queue:
        path = queue.popleft()
        x, y = path[-1]
        if grid[y][x] == index:
            return path
        for x2, y2 in ((x+1,y), (x-1,y), (x,y+1), (x,y-1)):
            if 0 <= x2 < len(grid[:, 0]) and 0 <= y2 < len(grid[0, :]) and grid[y2][x2] != 1 and (x2, y2) not in seen:
                queue.append(path + [(x2, y2)])
                seen.add((x2, y2))

# Function to generate the speed of an agent from the symmetric 2, 2 beta distribution
def beta_dist(lobound, hibound):
    # Get the randomly generated number and parameters for transformation
    speed = rngbeta.beta(2, 2)
    dfact = (hibound - lobound)

    # Transform to desired bounds and return float
    speed = speed*dfact+lobound

    return speed

# Convert a random speed number to a number of tiles moved through random number generation
def speed_convert(speed):
    # Move twice
    if rngspeed.uniform() < speed - 1 and speed >= 1:
        move = 2
    # Do not move
    elif rngspeed.uniform() < 1 - speed and speed < 1:
        move = 0
    # Normal; move once
    else:
        move = 1

    return move

# Base Model

In [40]:
# import simulation settings
settings = toml.load("settings.toml")

###################
### AGENT CLASS ###
###################

# Define the StaffAgent class
class StaffAgent(Agent):
    def __init__(self, model, agent_vision): # Default vision range of 5 cells
        super().__init__(model)  # MESA `Agent` class initialization, auto-assigns unique_id in Mesa 3.0
        # Attributes of each agent
        self.found_exit = False  # Track if agent has reached the exit
        self.previous_pos = None  # Previous position of the agent
        self.real_vision = settings['run_params']['real_vision']
        self.vision = agent_vision  # Vision range of the agent
        self.floor = None
        self.path = None # stores path to entrance?
        self.changefloor = False
        self.speed = beta_dist(settings['speed']['vr_staff'][0], settings['speed']['vr_staff'][1])

    # Function to move the agent towards the exit
    def move_towards(self, locations, maxpop=8, moore=True):
        self.previous_pos = self.pos  # Store the current position before moving
        # MESA `get_neighborhood` function retrieves nearby cells based on vision range
        possible_steps = self.model.grid.get_neighborhood(self.pos, moore=moore, include_center=False)

        min_distance = float('inf')  # Start with a very large distance
        best_step = None  # Initialize best step as None

        # Check each possible step to find the one closest to the exit
        for step in possible_steps:
            # Only consider steps that don't have obstacles and have less than 8 agents
            if step not in self.model.obstacles and len(self.model.grid.get_cell_list_contents(step)) < maxpop:
                dist = []
                for loc in locations:
                    dist.append(euclidean_distance(step, loc))
                dist = np.min(dist)  # Distance to the exit
                if dist < min_distance:
                    min_distance = dist
                    best_step = step  # Update best step to be closer to the exit
        if best_step:
            # MESA `move_agent` function moves the agent to the new cell
            self.model.grid.move_agent(self, best_step)

    # Function to move the agent towards a desired exit based on memory or signs
    def move_to_exits(self):
        self.previous_pos = self.pos
        # Call and store new path if no path is set yet, else use set path to assigned exit
        if self.path and self.changefloor == False:
            path = self.path[1:]
            self.path = path
        else:
            # Staff always go for emergency exit
            index = 4
            path = exit_path(self.model.sgrid, self.pos, index)[1:]
            self.path = path
            self.changefloor = False
        # If there is a spot open in the desired path cell use that, else move to random cell
        if len(self.model.grid.get_cell_list_contents(path[0])) < settings['run_params']['tile_pop']:
            self.model.grid.move_agent(self, path[0])
        else:
            possible_steps = self.model.grid.get_neighborhood(self.pos, moore=True, include_center=False)
            if self.floor == 0:
                valid_steps = [step for step in possible_steps if step not in self.model.obstacles and step not in self.model.stairs and len(self.model.grid.get_cell_list_contents(step)) < settings['run_params']['tile_pop']]
            else:
                valid_steps = [step for step in possible_steps if step not in self.model.obstacles and len(self.model.grid.get_cell_list_contents(step)) < settings['run_params']['tile_pop']]
            if valid_steps:
                # Randomly choose a valid position and move there
                random_step = self.random.choice(valid_steps)
                # MESA `move_agent` function moves the agent to the chosen position
                self.model.grid.move_agent(self, random_step)

    # Define the actions the agent will take in each step
    def step(self):
        move = speed_convert(self.speed)
        # If the agent is at the exit, mark as exited
        if self.pos[1] > settings['map_params']['map_yhalf']:
            self.floor = 1
        if self.pos[1] <= settings['map_params']['map_yhalf']:
            self.floor = 0
        if self.pos in self.model.exit_location:
            self.found_exit = True  # Set the agent's exit status to True
            self.model.grid.remove_agent(self)  # MESA function to remove the agent from the grid
            self.remove()  #self.remove() to remove from AgentSet
            self.model.cumulative_exited += 1  # Count this agent in cumulative exited agents
        else:
            # MESA `get_neighborhood` checks the agent's vision area for the exit
            vision_area = self.model.grid.get_neighborhood(self.pos, moore=True, radius=self.vision, include_center=False)

            # Obtain all obstacles within the vision area (2D array)
            if self.real_vision:
                vision_area, obstacles_in_sight = unblocked_vision(self.pos, vision_area, self.model.obstacles)

            # Tool for visualisation of vision area
            # ONLY USE WITH 1 AGENT AND 1 TIMESTEP
            # ------------------------------------ START -----------------------------------

            check_vis = False
            if check_vis:
                plot_vision(self.pos, self.vision, self.model.obstacles, vision_area)

            # ------------------------------------ END -----------------------------------
            # Find exits, staff, stairs, floor area within vision range
            exits = []
            for loc in self.model.exit_location:
                if loc in vision_area:
                    exits.append(loc)
            exit_in_vision = exits

            stairs = []
            for loc in self.model.stairs:
                if loc in vision_area:
                    stairs.append(loc)
            stairs_in_vision = stairs

            floor = []
            for loc in vision_area:
                if loc not in self.model.stairs and loc not in list(map(tuple, obstacles_in_sight)):
                    floor.append(loc)
            floor_in_vision = floor

            # Move towards the exit if it's in sight
            if exit_in_vision:
                self.move_towards(exit_in_vision, settings['run_params']['tile_pop'])
            # Move down the stairs if on the stairs
            elif self.pos in self.model.stairs and self.floor == 1:
                self.model.grid.move_agent(self, (self.pos[0], self.pos[1]+settings['map_params']['stairs_jump']))
                self.floor = 0
                self.changefloor = True
            # Move towards stairs if in sight
            elif self.floor == 1 and stairs_in_vision:
                self.move_towards(stairs_in_vision, settings['run_params']['tile_pop'])
            # If on the stairs on the ground floor, move away from stairs
            elif self.pos in self.model.stairs and self.floor == 0:
                self.move_towards(floor_in_vision, settings['run_params']['stair_pop'], moore=False)
            # Move to building main entrance or emergency exits if the agent can see signs
            else:
                self.move_to_exits()


# Define the MobileAgent class
class MobileAgent(Agent):
    def __init__(self, model, agent_vision): # Default vision range of 5 cells
        super().__init__(model)  # MESA `Agent` class initialization, auto-assigns unique_id in Mesa 3.0
        # Attributes of each agent
        self.found_exit = False  # Track if agent has reached the exit
        self.previous_pos = None  # Previous position of the agent
        self.real_vision = settings['run_params']['real_vision']
        self.vision = agent_vision  # Vision range of the agent
        self.floor = None
        self.path = None
        self.changefloor = False
        self.can_see_sign = np.random.random() # Gives a random value to the agent which is compared with sign_vis
        self.following_staff = None #unique id of the StaffAgent it is following
        self.sign_vis = settings['run_params']['sign_vision']
        self.speed = beta_dist(settings['speed']['vr_mobile'][0], settings['speed']['vr_mobile'][1])

    # Function to move the agent towards the exit
    def move_towards(self, locations, maxpop=8, moore=True):
        self.previous_pos = self.pos  # Store the current position before moving
        # MESA `get_neighborhood` function retrieves nearby cells based on vision range
        possible_steps = self.model.grid.get_neighborhood(self.pos, moore=moore, include_center=False)

        min_distance = float('inf')  # Start with a very large distance
        best_step = None  # Initialize best step as None

        # Check each possible step to find the one closest to the exit
        for step in possible_steps:
            # Only consider steps that don't have obstacles and have less than 8 agents
            if step not in self.model.obstacles and len(self.model.grid.get_cell_list_contents(step)) < maxpop:
                dist = []
                for loc in locations:
                    dist.append(euclidean_distance(step, loc))
                dist = np.min(dist)  # Distance to the exit
                if dist < min_distance:
                    min_distance = dist
                    best_step = step  # Update best step to be closer to the exit
        if best_step:
            # MESA `move_agent` function moves the agent to the new cell
            self.model.grid.move_agent(self, best_step)

    # Function to move the agent towards a desired exit based on memory or signs
    def move_to_exits(self):
        self.previous_pos = self.pos
        # Call and store new path if no path is set yet, else use set path to assigned exit
        if self.path and self.changefloor == False:
            path = self.path[1:]
            self.path = path
        else:
            # Decide whether to go for emergency or main exit
            if self.can_see_sign <= self.sign_vis:
                # Emergency exit indicator for path finding algorithm
                index = 4
            else:
                # Main entrance indicator for path finding algorithm
                index = 3
            path = exit_path(self.model.sgrid, self.pos, index)[1:]
            self.path = path
            self.changefloor = False
        # If there is a spot open in the desired path cell use that, else move to random cell
        if len(self.model.grid.get_cell_list_contents(path[0])) < settings['run_params']['tile_pop']:
            self.model.grid.move_agent(self, path[0])
        else:
            possible_steps = self.model.grid.get_neighborhood(self.pos, moore=True, include_center=False)
            if self.floor == 0:
                valid_steps = [step for step in possible_steps if step not in self.model.obstacles and step not in self.model.stairs and len(self.model.grid.get_cell_list_contents(step)) < settings['run_params']['tile_pop']]
            else:
                valid_steps = [step for step in possible_steps if step not in self.model.obstacles and len(self.model.grid.get_cell_list_contents(step)) < settings['run_params']['tile_pop']]
            if valid_steps:
                # Randomly choose a valid position and move there
                random_step = self.random.choice(valid_steps)
                # MESA `move_agent` function moves the agent to the chosen position
                self.model.grid.move_agent(self, random_step)

    # Define the actions the agent will take in each step
    def step(self):
        move = speed_convert(self.speed)
        # If the agent is at the exit, mark as exited
        if self.pos[1] > settings['map_params']['map_yhalf']:
            self.floor = 1
        if self.pos[1] <= settings['map_params']['map_yhalf']:
            self.floor = 0
        if self.pos in self.model.exit_location:
            self.found_exit = True  # Set the agent's exit status to True
            self.model.grid.remove_agent(self)  # MESA function to remove the agent from the grid
            self.remove()  #self.remove() to remove from AgentSet
            self.model.cumulative_exited += 1  # Count this agent in cumulative exited agents
        else:
            # MESA `get_neighborhood` checks the agent's vision area for the exit
            vision_area = self.model.grid.get_neighborhood(self.pos, moore=True, radius=self.vision, include_center=False)

            # Obtain all obstacles within the vision area (2D array)
            if self.real_vision:
                vision_area, obstacles_in_sight = unblocked_vision(self.pos, vision_area, self.model.obstacles)

            # Tool for visualisation of vision area
            # ONLY USE WITH 1 AGENT AND 1 TIMESTEP
            # ------------------------------------ START -----------------------------------

            check_vis = False
            if check_vis:
                plot_vision(self.pos, self.vision, self.model.obstacles, vision_area)

            # ------------------------------------ END -----------------------------------
            # Find exits, staff, stairs, floor area within vision range
            exits = []
            for loc in self.model.exit_location:
                if loc in vision_area:
                    exits.append(loc)
            exit_in_vision = exits

            staff_dict = {}
            for staff in self.model.staff_locations:
                if staff.pos in vision_area:
                    staff_dict[self.unique_id] = staff.pos
            staff_in_vision = staff_dict

            stairs = []
            for loc in self.model.stairs:
                if loc in vision_area:
                    stairs.append(loc)
            stairs_in_vision = stairs

            floor = []
            for loc in vision_area:
                if loc not in self.model.stairs and loc not in list(map(tuple, obstacles_in_sight)):
                    floor.append(loc)
            floor_in_vision = floor

            # Move towards the exit if it's in sight
            if exit_in_vision:
                self.move_towards(exit_in_vision, settings['run_params']['tile_pop'])
            # move towards staff if already following one
            elif self.following_staff:
                self.move_towards(self.model.staff_locations[self.following_staff])
            # Move down the stairs if on the stairs
            elif self.pos in self.model.stairs and self.floor == 1:
                self.model.grid.move_agent(self, (self.pos[0], self.pos[1]+settings['map_params']['stairs_jump']))
                self.floor = 0
                self.changefloor = True
            # Move towards stairs if in sight
            elif self.floor == 1 and stairs_in_vision:
                self.move_towards(stairs_in_vision, settings['run_params']['tile_pop'])
            # If on the stairs on the ground floor, move away from stairs
            elif self.pos in self.model.stairs and self.floor == 0:
                self.move_towards(floor_in_vision, settings['run_params']['stair_pop'], moore=False)
            # Get tagged to a staff if staff in sight and move towards it
            elif staff_in_vision:
                self.following_staff = next(iter(staff_in_vision)) # unique id of the first staff in staff_in_vision
                self.move_towards(self.model.staff_loactions[self.following_staff])
            else:
                self.move_to_exits()

###################
### MODEL CLASS ###
###################

# Define the model class to handle the overall environment
class FloorPlanModel(Model):
    def __init__(self, num_staff, num_agents, tot_agents, agent_vision):
        super().__init__()  # `Model` class initialization

        # Basic model settings
        self.num_staff = num_staff
        self.num_agents = num_agents
        self.agent_vision = agent_vision
        self.total_agents = tot_agents
        self.grid = MultiGrid(settings['map_params']['grid_x'], settings['map_params']['grid_y'], False)  # MESA grid with dimensions; False means no wrapping
        self.static_grid = None

        # -----------------------------------------------------------------------------------------------------------------------------
        # Initiate the floorplan and mutate the csv to fit the required array
        # -----------------------------------------------------------------------------------------------------------------------------
        floor0 = pd.read_csv(settings['run_params']['floorplan'], header=None)
        floor0 = pd.DataFrame.to_numpy(floor0)
        floor0 = np.flipud(floor0)

        # Make lists of all the different tile types
        GreenTiles = []
        BlackTiles = []
        WhiteTiles = []
        BlueTiles = []
        PurpleTiles = []
        YellowTiles = []
        OrangeTiles = []
        RedTiles = []

        for i in range(np.shape(floor0)[0]):
            for j in range(np.shape(floor0)[1]):
                if floor0[i, j] == 0:
                    GreenTiles.append((j, i))
                if floor0[i, j] == 1:
                    BlackTiles.append((j, i))
                if floor0[i, j] == 2:
                    WhiteTiles.append((j, i))
                if floor0[i, j] == 3:
                    BlueTiles.append((j, i))
                if floor0[i, j] == 4:
                    PurpleTiles.append((j, i))
                if floor0[i, j] == 5:
                    YellowTiles.append((j, i))
                if floor0[i, j] == 6:
                    OrangeTiles.append((j, i))
                if floor0[i, j] == 7:
                    RedTiles.append((j, i))

        # Save tile type lists to class objects
        self.outdoors = GreenTiles
        self.obstacles = BlackTiles
        self.indoors = WhiteTiles + BlueTiles
        self.exit_location = PurpleTiles
        self.stairs = YellowTiles
        self.lifts = OrangeTiles

        self.staff_locations = self.get_staff_locations()

        # Adapt grid for maze finding algorithm
        # Outside + obstacles = 1, Walking space = 2, entrance finding = 3, emergency exit finding = 4
        np.set_printoptions(threshold=sys.maxsize)
        floor0[floor0 == 0] = 1
        floor0[(floor0 == 3) | (floor0 == 6) | (floor0 == 7)] = 2
        topfloor = floor0[63:127]
        topfloor[topfloor == 5] = 4
        floor0[63:127] = topfloor
        floor0[30, 125] = 3
        floor0[90, 45] = 3
        self.sgrid = floor0

        # -----------------------------------------------------------------------------------------------------------------------------
        # Initiate the spawn map and mutate the csv to fit the required array
        # -----------------------------------------------------------------------------------------------------------------------------
        spawn = pd.read_csv("spawnmap.csv", header=None)
        spawn = pd.DataFrame.to_numpy(spawn)
        spawn = np.flipud(spawn)
        self.spawnmap = spawn

        # Initialize cumulative exited count
        self.cumulative_exited = 0

        # Initialize DataCollector (MESA tool for tracking metrics across steps)
        self.datacollector = DataCollector(
            model_reporters={
                "Active Agents": lambda m: len(m.agents),  # Count of agents still active
                "Exited Agents": lambda m: sum(1 for agent in m.agents if agent.found_exit == True),
                "Cumulative Exited Agents": lambda m: m.cumulative_exited,  # Cumulative exited count
                "Staff per Cell": self.count_staff_per_cell, # Counts staff in each cell
                "Agents per Cell": self.count_agents_per_cell  # Counts agents in each cell
            },
            agent_reporters={
                "Found Exit": lambda a: a.found_exit if isinstance(a, (MobileAgent, StaffAgent)) else None  # Reports exit status per agent
            }
        )

        self.place_agents(agent_vision)  # Place agents on the grid
        self.datacollector.collect(self) # Collect data at the start of the simulation

    # Function to randomly place agents in the grid
    def place_agents(self, agent_vision):
        if settings['run_params']['use_map'] == False:
            for i in range(self.num_staff):
                agent = StaffAgent(self, agent_vision)
                placed = False  # Track if the agent is successfully placed
                while not placed:
                    x = self.random.randrange(self.grid.width)
                    y = self.random.randrange(self.grid.height)
                    cell_contents = self.grid.get_cell_list_contents((x, y))
                    # Only place agent if cell has no obstacles and fewer than 8 agents
                    if (x, y) in self.indoors and len(cell_contents) < settings['run_params']['tile_pop']:
                        self.grid.place_agent(agent, (x, y))  # MESA function to place agent in grid
                        placed = True  # Mark as placed

            for i in range(self.num_agents):
                agent = MobileAgent(self, agent_vision)
                placed = False  # Track if the agent is successfully placed
                while not placed:
                    x = self.random.randrange(self.grid.width)
                    y = self.random.randrange(self.grid.height)
                    cell_contents = self.grid.get_cell_list_contents((x, y))
                    # Only place agent if cell has no obstacles and fewer than 8 agents
                    if (x, y) in self.indoors and len(cell_contents) < settings['run_params']['tile_pop']:
                        self.grid.place_agent(agent, (x, y))  # MESA function to place agent in grid
                        placed = True  # Mark as placed
        else:
            for i in range(self.total_agents):
                placed = False
                while not placed:
                    x = self.random.randrange(self.grid.width)
                    y = self.random.randrange(self.grid.height)
                    cell_contents = self.grid.get_cell_list_contents((x, y))
                    # Only place agent if cell has no obstacles and fewer than 8 agents
                    if (x, y) in self.indoors and len(cell_contents) < settings['run_params']['tile_pop']:
                        # Get the type of room from the spawn map and its spawn distribution
                        index  = self.spawnmap[y, x]
                        frac_mobile = settings['spawn_rules'][str(index)]['mobile']
                        frac_wheelchair = settings['spawn_rules'][str(index)]['wheelchair']
                        frac_staff = settings['spawn_rules'][str(index)]['staff']

                        # Decide on the type of agent
                        agent_frac = np.random.random()
                        if agent_frac <= frac_mobile:
                            agent = MobileAgent(self, agent_vision)
                        elif agent_frac <= frac_mobile + frac_wheelchair:
                            # TODO: change to wheelchair agent
                            agent = MobileAgent(self, agent_vision)
                        elif agent_frac <= frac_mobile + frac_wheelchair + frac_staff:
                            agent = StaffAgent(self, agent_vision)

                        # Place agent
                        self.grid.place_agent(agent, (x, y))  # MESA function to place agent in grid
                        placed = True  # Mark as placed


    # Function to count staff in each cell
    def count_staff_per_cell(self):
        staff_counts = {}  # Dictionary to store staff counts by cell position
        # MESA `coord_iter` function iterates over grid cells and their contents
        for cell in self.grid.coord_iter():
            cell_contents, (x, y) = cell  # Unpack cell contents and coordinates
            # Count StaffAgents in each cell
            staff_agent_count = sum(1 for obj in cell_contents if isinstance(obj, StaffAgent))
            if staff_agent_count > 0:
                staff_counts[(x, y)] = staff_agent_count
        return staff_counts

    # Function to count agents in each cell
    def count_agents_per_cell(self):
        agent_counts = {}  # Dictionary to store agent counts by cell position
        # MESA `coord_iter` function iterates over grid cells and their contents
        for cell in self.grid.coord_iter():
            cell_contents, (x, y) = cell  # Unpack cell contents and coordinates
            # Count MobileAgents in each cell
            nav_agent_count = sum(1 for obj in cell_contents if isinstance(obj, MobileAgent))
            if nav_agent_count > 0:
                agent_counts[(x, y)] = nav_agent_count
        return agent_counts

    # Function to get the staff location
    def get_staff_locations(self):
        staff_locations_dict = {}
        for agent in self.agents:
            if isinstance(agent, StaffAgent):
                staff_locations_dict[agent.unique_id] = agent.pos
        return staff_locations_dict

    # Function to get the grid data for visualization
    def get_grid(self):
        grid_data = np.zeros((self.grid.height, self.grid.width))
        # Mark obstacles on the grid
        for x, y in self.obstacles:
            grid_data[y, x] = 1
        for x, y in self.indoors:
            grid_data[y, x] = 2

        # Mark special areas on the grid
        for x, y in self.stairs:
            grid_data[y, x] = 5
        for x, y in self.lifts:
            grid_data[y, x] = 6

        # Mark staff on the grid
        for agent in self.agents:
            if isinstance(agent, StaffAgent):
                x, y = agent.pos
                grid_data[y, x] = 7

        # Mark agents on the grid
        for agent in self.agents:
            if isinstance(agent, MobileAgent):
                x, y = agent.pos
                grid_data[y, x] = 8

        # Mark exit
        for x, y in self.exit_location:
            grid_data[y, x] = 4
        self.static_grid = grid_data

        return grid_data

    # Model step function to update the simulation
    def step(self):
        self.agents.do("step")  # MESA 3.0 function to execute the `step` function of each agent
        self.datacollector.collect(self)  # MESA DataCollector collects metrics at each step



In [41]:
# Run the model and see the results
model = FloorPlanModel(settings['run_params']['n_staff'], settings['run_params']['n_mobile'], settings['run_params']['n_total'], settings['run_params']['vis_range']) # run the model with 30x30 grid and 15 staff and 100 agents and a vision of 5
for i in range(settings['run_params']['steps']): # run the model for 100 steps
    model.step() # step the model by 1

# Collect the data from the model
model_data = model.datacollector.get_model_vars_dataframe()
agent_data = model.datacollector.get_agent_vars_dataframe()

In [42]:
agent_data # print the agent data

Unnamed: 0_level_0,Unnamed: 1_level_0,Found Exit
Step,AgentID,Unnamed: 2_level_1
0,1,False
0,2,False
0,3,False
0,4,False
0,5,False
...,...,...
10,96,False
10,97,False
10,98,False
10,99,False


In [43]:
model_data # print the model data

Unnamed: 0,Active Agents,Exited Agents,Cumulative Exited Agents,Staff per Cell,Agents per Cell
0,100,0,0,"{(1, 123): 1, (7, 51): 1, (8, 114): 1, (14, 11...","{(1, 52): 1, (2, 54): 1, (3, 56): 1, (15, 119)..."
1,100,0,0,"{(2, 123): 1, (6, 51): 1, (9, 114): 1, (15, 11...","{(0, 51): 1, (3, 54): 1, (4, 56): 1, (16, 119)..."
2,99,0,1,"{(3, 123): 1, (5, 51): 1, (9, 115): 1, (16, 11...","{(4, 54): 1, (4, 55): 1, (17, 115): 1, (17, 11..."
3,99,0,1,"{(4, 51): 1, (4, 123): 1, (9, 116): 1, (17, 11...","{(4, 54): 1, (5, 54): 1, (17, 116): 1, (18, 11..."
4,99,0,1,"{(3, 51): 1, (5, 123): 1, (9, 117): 1, (17, 11...","{(4, 53): 1, (5, 53): 1, (17, 117): 1, (19, 11..."
5,99,0,1,"{(2, 51): 1, (5, 122): 1, (9, 118): 1, (17, 54...","{(4, 52): 2, (17, 118): 1, (20, 119): 1, (22, ..."
6,98,0,2,"{(1, 51): 1, (5, 121): 1, (9, 119): 1, (17, 55...","{(3, 51): 2, (17, 119): 1, (21, 52): 1, (21, 1..."
7,98,0,2,"{(0, 51): 1, (5, 120): 1, (8, 119): 1, (16, 55...","{(2, 51): 2, (18, 119): 1, (20, 52): 1, (22, 1..."
8,97,0,3,"{(4, 120): 1, (7, 119): 1, (15, 55): 1, (17, 1...","{(1, 51): 2, (19, 52): 1, (19, 119): 1, (23, 1..."
9,97,0,3,"{(3, 120): 1, (6, 119): 1, (14, 55): 1, (17, 1...","{(0, 51): 2, (18, 52): 1, (20, 119): 1, (24, 7..."


# Visualization with User Interface
You don't have to understand every single line in the visualisation code below. Please understand the code to extend that you can introduce changes to it when needed

In [44]:
# Visualization Function
def plot_grid(model, ax):
    rc("animation", embed_limit=4096)  # Set a higher limit in MB to allow smoother animation playback
    grid_data = model.get_grid()  # Retrieve the current state of the grid from the model
    ax.clear()  # Clear any previous plots on the axes to prevent overlap in visualizations

    # Define color mappings:
    # 0 (outdoors) -> green, 1 (obstacle) -> black, 2 (indoors) -> white,
    # 3 (door) -> white, 4 (exit) -> purple/mediumorchid, 5 (yellow) -> stairs, 6 (lifts) -> orange, 7 (signs) -> red, 8 (agent) -> blue, 9 (staff) -> pink
    cmap = mcolors.ListedColormap(['green', 'black', 'white', 'white', 'mediumorchid', 'yellow', 'orange', 'red', 'blue'])
    bounds = [0, 1, 2, 3, 4, 5, 6, 7, 8]  # Boundaries to separate each category
    norm = mcolors.BoundaryNorm(bounds, cmap.N, extend='max')  # Normalizes values to assign colors to each category

    # Display the grid data with color mapping applied.
    # Setting 'origin' to 'lower' places the (0,0) coordinate at the bottom-left.
    ax.imshow(grid_data, cmap=cmap, norm=norm, origin='lower')

    # Customize grid display: set grid to start from -0.5 with labels at intervals of 1
    # Set minor ticks for cell boundaries, with thicker and darker lines
    ax.set_xticks(np.arange(-0.5, model.grid.width, 1), minor=True)
    ax.set_yticks(np.arange(-0.5, model.grid.height, 1), minor=True)
    ax.grid(which='minor', color='black', linestyle='-', linewidth=0.3)  # Thicker, darker lines for cell boundaries

    # Set major ticks for labels at intervals of 5, with lighter and thinner lines
    ax.set_xticks(np.arange(0, model.grid.width, 5), minor=False)
    ax.set_yticks(np.arange(0, model.grid.height, 5), minor=False)
    ax.grid(which='major', color='lightgray', linestyle='-', linewidth=0)  # invisible line


    # Label the exit point on the grid in red text for easy identification
    #exit_x, exit_y = model.exit_location
    #ax.text(exit_x, exit_y, 'EXIT', ha='center', va='center', fontsize=12, color='red', fontweight='bold')

    # Add labels for each sign location in black text for visibility
    #for sign_pos in model.signs:
        #x, y = sign_pos
        #ax.text(x, y, 'SIGN', ha='center', va='center', fontsize=10, color='black', fontweight='bold')

    # Set the title to show the current step number in the model
    ax.set_title(f"Step {model.steps}", fontsize=16)
    ax.tick_params(axis='both', which='major', labelsize=7)  # Control tick label size

    # Draw movement arrows for agents to show the direction they are traveling
    for agent in model.agents:
        # Draw arrow if the agent has moved and is a StaffAgent
        if isinstance(agent, StaffAgent) and agent.previous_pos:
            start_x, start_y = agent.previous_pos  # Previous position of the agent
            end_x, end_y = agent.pos  # Current position of the agent

            # Draw an arrow from the previous position to the current position
            ax.arrow(
                start_x, start_y,
                end_x - start_x, end_y - start_y,
                head_width=0.3, head_length=0.3, fc='yellow', ec='yellow'  # Yellow arrow for movement direction
            )
        # Draw arrow if the agent has moved and is a MobileAgent
        if isinstance(agent, MobileAgent) and agent.previous_pos:
            start_x, start_y = agent.previous_pos  # Previous position of the agent
            end_x, end_y = agent.pos  # Current position of the agent

            # Draw an arrow from the previous position to the current position
            ax.arrow(
                start_x, start_y,
                end_x - start_x, end_y - start_y,
                head_width=0.3, head_length=0.3, fc='yellow', ec='yellow'  # Yellow arrow for movement direction
            )

# Animation Update Function
def update(frame, model, ax):
    # For every frame, update the model state if it's not the first frame
    if frame > 0:
        model.step()  # Run one step of the model simulation
    plot_grid(model, ax)  # Redraw the grid with updated agent positions

# Run the Animation with a larger figure size
def run_animation(model, steps):
    fig, ax = plt.subplots(figsize=(settings['run_params']['figsize'], settings['run_params']['figsize']))  # Create a 10x10 figure for the plot
    fig.tight_layout()
    plot_grid(model, ax)  # Plot the initial grid state
    # Create an animation that updates the grid for each step
    anim = FuncAnimation(fig, update, frames=steps+1, fargs=(model, ax), repeat=False)
    plt.close(fig)  # Close the figure after animation creation to avoid duplicate displays
    return anim

# Your imports and function definitions remain the same, up until where you initialize and run the model
import ipywidgets as widgets  # For interactive widgets (slider, button)
from IPython.display import display, HTML  # To display widgets and HTML animations
import time  # For tracking elapsed time

# Slider to choose the number of individual agents
if settings['run_params']['use_map'] == False:
    staff_slider = widgets.IntSlider(
        value=settings['run_params']['n_staff'],      # Default starting number of agents
        min=10,        # Minimum number of agents allowed
        max=200,       # Maximum number of agents allowed
        step=5,       # Step size for slider increments
        description='Num Staff:',  # Label for the slider
        continuous_update=False     # Only update value when slider is released
    )

    # Slider to choose the number of agents
    mobile_slider = widgets.IntSlider(
        value=settings['run_params']['n_mobile'],      # Default starting number of agents
        min=10,        # Minimum number of agents allowed
        max=500,       # Maximum number of agents allowed
        step=10,       # Step size for slider increments
        description='Num Mobile:',  # Label for the slider
        continuous_update=False     # Only update value when slider is released
    )

else:
    agent_slider = widgets.IntSlider(
        value=settings['run_params']['n_total'],      # Default starting number of agents
        min=10,        # Minimum number of agents allowed
        max=1000,       # Maximum number of agents allowed
        step=10,       # Step size for slider increments
        description='Num Agents:',  # Label for the slider
        continuous_update=False     # Only update value when slider is released
    )

# Slider to choose the number of time steps (frames) for the animation
time_step_slider = widgets.IntSlider(
    value=settings['run_params']['steps'],      # Default starting number of steps
    min=1,         # Minimum time steps allowed
    max=600,       # Maximum time steps allowed
    step=1,        # Step size for slider increments
    description='Time Steps:',  # Label for the slider
    continuous_update=False     # Only update value when slider is released
)

# Slider to choose the vision range (vision radius) of agents
vision_slider = widgets.IntSlider(
    value=settings['run_params']['vis_range'],      # Default starting vision range
    min=1,        # Minimum vision range
    max=20,       # Maximum vision range
    step=1,       # Step size for slider increments
    description='Vision:',  # Label for the slider
    continuous_update=False     # Only update value when slider is released
)

# Button to start the simulation with current slider settings
run_button = widgets.Button(description="Run Simulation")

# Output widget for displaying the animation and elapsed time
output_widget = widgets.Output()

# Label to show elapsed time during the simulation
elapsed_time_label = widgets.Label(value="Elapsed time: 0.0 seconds")

# Flag variable to control timing function
stop_timer = False

# Display the interface with sliders, button, label, and output area
if settings['run_params']['use_map'] == False:
    display(staff_slider, mobile_slider, time_step_slider, vision_slider, run_button, elapsed_time_label, output_widget)
else:
    display(agent_slider, time_step_slider, vision_slider, run_button, elapsed_time_label, output_widget)

# Define the function to initialize and run the model
def run_model(change):
    global stop_timer, model_data, agent_data  # Allow variables to be accessed globally
    with output_widget:  # Use output widget to display the animation
        output_widget.clear_output()  # Clear any previous output
        time_steps = time_step_slider.value  # Get the number of steps from the slider
        agent_vision = vision_slider.value  # Get the vision range from the slider
        if settings['run_params']['use_map'] == False:
            num_staff = staff_slider.value
            num_mobile = mobile_slider.value
            model = FloorPlanModel(num_staff=num_staff, num_agents=num_mobile, tot_agents=settings['run_params']['n_total'] , agent_vision=agent_vision)  # Initialize the model
        else:
            num_total = agent_slider.value
            model= FloorPlanModel(num_staff=settings['run_params']['n_staff'], num_agents=settings['run_params']['n_mobile'], tot_agents=num_total, agent_vision=agent_vision)


        # Reset timer flag and start timing for the animation
        stop_timer = False
        start_time = time.time()

        def update_time_label():
            while not stop_timer:  # Keep updating time label until timer stops
                elapsed_time = time.time() - start_time
                elapsed_time_label.value = f"Elapsed time: {elapsed_time:.1f} seconds"
                time.sleep(0.1)  # Update every 0.1 seconds for real-time effect

        # Start elapsed time tracking in a separate thread
        import threading
        timer_thread = threading.Thread(target=update_time_label, daemon=True)
        timer_thread.start()

        # Run the animation with selected number of steps
        anim = run_animation(model, steps=time_steps)

        # Display the animation output in HTML format
        output = HTML(anim.to_jshtml())
        display(output)

        # Stop the timer after the simulation completes
        stop_timer = True
        elapsed_time = time.time() - start_time
        elapsed_time_label.value = f"Total elapsed time: {elapsed_time:.1f} seconds"

        # Retrieve and display model and agent data after simulation completes
        model_data = model.datacollector.get_model_vars_dataframe()  # Data for the model over time
        agent_data = model.datacollector.get_agent_vars_dataframe()  # Data for each agent over time
    return model, model_data, agent_data  # Return model and data for further inspection

# Attach the run_model function to the run button click event
run_button.on_click(run_model)

IntSlider(value=100, continuous_update=False, description='Num Agents:', max=600, min=10, step=10)

IntSlider(value=10, continuous_update=False, description='Time Steps:', max=600, min=1)

IntSlider(value=10, continuous_update=False, description='Vision:', max=20, min=1)

Button(description='Run Simulation', style=ButtonStyle())

Label(value='Elapsed time: 0.0 seconds')

Output()

In [None]:
print("\nModel Data:")
model_data

In [None]:
print("\nAgent Data:")
agent_data.head(40)


In [None]:
#  Uncomment the code below to display the 'Agents per Cell' data for each step
