Cell 1: Import Libraries

This cell imports all the necessary Python libraries.

* shapely: Used for geometric operations.
* pathlib: Handles filesystem paths.
* matplotlib.pyplot: Used for plotting and visualizing the simulation setup.
* jupedsim, pedpy: The core simulation and pedestrian analysis libraries.
* numpy: Provides support for numerical arrays and fast mathematical operations.
* time: Used to measure the simulation's runtime.
* functools.lru_cache: A decorator that caches function results, so the geometry file is read only once.
* concurrent.futures.ThreadPoolExecutor: Runs tasks on a pool of threads; used here to distribute agents over the spawn areas concurrently.
* numba: A JIT (Just-In-Time) compiler that compiles numerical Python functions to machine code, which can speed them up considerably.

In [None]:
import shapely
import pathlib
import matplotlib.pyplot as plt
import jupedsim as jps
import pedpy
import numpy as np
from shapely.geometry import Point, Polygon
from jupedsim.internal.notebook_utils import animate, read_sqlite_file
import time
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor
import numba

Cell 2: Load Geometry

This cell loads the building geometry from the HC.wkt file and caches the result.

* The lru_cache decorator ensures the file is read only once per path, even if the function is called multiple times.

* Only the first line of the file is parsed, so the WKT geometry is expected on a single line.

* The geometry is used to define the walkable area for the simulation.

In [None]:
@lru_cache(maxsize=1)
def load_geometry(file_path):
    """Load geometry from a WKT file with caching."""
    with open(file_path) as f:
        return shapely.from_wkt(f.readline())

geometry_file = "HC.wkt"
area = load_geometry(geometry_file)
walkable_area = pedpy.WalkableArea(area)
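
The caching behaviour described above can be checked in isolation. The following is a minimal sketch using only the standard library; the temporary demo.wkt file, the load_first_line function and the read_count counter are stand-ins invented for illustration and are not part of the notebook.

```python
import tempfile
from functools import lru_cache
from pathlib import Path

read_count = 0  # counts real file reads, for illustration only


@lru_cache(maxsize=1)
def load_first_line(file_path):
    """Return the first line of a text file; repeat calls are served from the cache."""
    global read_count
    read_count += 1
    with open(file_path) as f:
        return f.readline().strip()


# Hypothetical stand-in for HC.wkt so the sketch runs anywhere.
with tempfile.TemporaryDirectory() as tmp:
    demo_file = str(Path(tmp) / "demo.wkt")
    Path(demo_file).write_text("POLYGON ((0 0, 10 0, 10 5, 0 5, 0 0))\n")

    first = load_first_line(demo_file)
    second = load_first_line(demo_file)  # cache hit, the file is not reopened

cache_stats = load_first_line.cache_info()
print(read_count, cache_stats.hits, cache_stats.misses)
```

The cache is keyed by the path string, so a file that is edited on disk is not re-read until load_first_line.cache_clear() is called.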

Cell 3: Define Exits

This cell defines the exit areas as a list of shapely.Polygon objects and computes their centroids. The centroids are used later for distance calculations.

In [None]:
def create_exits():
    """Create exit areas as polygons and precompute centroids."""
    exit_polygons = [
        Polygon([(30, 13), (32, 13), (32, 11.5), (30, 11.5)]),  # Exit 1
        Polygon([(54, 28.64), (55.5, 28.64), (55.5, 26.67), (54, 26.67)]),  # Exit 2
        Polygon([(11.5, 18.8), (13.5, 18.8), (13.5, 20.5), (11.5, 20.5)]),  # Exit 3
        Polygon([(19, 37.5), (21, 37.5), (21, 36.5), (19, 36.5)]),  # Exit 4
        Polygon([(37, 13), (40.25, 13), (40.25, 11.5), (37, 11.5)]),  # Exit 5
        Polygon([(45, 13), (43.5, 13), (43.5, 11.5), (45, 11.5)]),  # Exit 6
        Polygon([(49.15, 13), (50.25, 13), (50.25, 11.5), (49.15, 11.5)]),  # Exit 7
        Polygon([(51.5, 13), (52.5, 13), (52.5, 11.5), (51.5, 11.5)]),  # Exit 8
        Polygon([(30, 37.5), (32, 37.5), (32, 36.5), (30, 36.5)]),  # Exit 9
    ]
    
    # Precompute centroids for plotting and distance calculation
    exit_centroids = np.array([exit_area.centroid.coords[0] for exit_area in exit_polygons])
    return exit_polygons, exit_centroids

exit_areas, exit_centroids = create_exits()
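
The centroid used here is the area-weighted centre of each polygon. As a rough illustration only (this is not how shapely is implemented), the pure-Python sketch below applies the standard shoelace formula to the vertices of Exit 1; the helper name polygon_centroid is hypothetical.

```python
def polygon_centroid(vertices):
    """Area-weighted centroid of a simple polygon given as a list of (x, y) vertices."""
    twice_area = 0.0
    cx = 0.0
    cy = 0.0
    # Pair every vertex with its successor, wrapping around to close the ring.
    for (x0, y0), (x1, y1) in zip(vertices, vertices[1:] + vertices[:1]):
        cross = x0 * y1 - x1 * y0
        twice_area += cross
        cx += (x0 + x1) * cross
        cy += (y0 + y1) * cross
    # The sign of twice_area depends on vertex order and cancels out in the division.
    return cx / (3.0 * twice_area), cy / (3.0 * twice_area)


exit_1 = [(30, 13), (32, 13), (32, 11.5), (30, 11.5)]
centroid_1 = polygon_centroid(exit_1)
print(centroid_1)
```

For Exit 1 this gives (31.0, 12.25), the midpoint of the rectangle, which is the point the later distance calculations measure against.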

Cell 4: Define Spawn Areas

This cell defines the starting points for the pedestrians. Each Polygon represents a spawn area, and the num_agents list determines how many agents will be created in each respective area.

In [None]:
spawning_areas = [
    Polygon([(28, 35), (36, 35), (36, 15), (28, 15)]),  # Spawn 1
    Polygon([(12, 24.5), (26.5, 24.5), (26.5, 23), (12, 23)]),  # Spawn 2
    Polygon([(36, 28), (52, 28), (52, 26.5), (36, 26.5)]),  # Spawn 3
    Polygon([(51, 20), (55, 20), (55, 17.5), (51, 17.5)]),  # Spawn 4
    Polygon([(12, 12), (28, 12), (28, 18), (12, 18)]),  # Spawn 5
    Polygon([(37, 25), (51, 25), (51, 21), (37, 21)]),  # Spawn 6
]

num_agents = [300, 20, 20, 10, 150, 100]  # Number of agents for each spawning area
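
Because the two lists are matched by position, it can help to check that they have the same length and to look at the resulting density per area before distributing agents. The sketch below does this in plain Python with the coordinates copied from the cell above; the helper polygon_area and the variable names are assumptions made for illustration.

```python
def polygon_area(vertices):
    """Area of a simple polygon from its (x, y) vertices (shoelace formula)."""
    twice_area = 0.0
    for (x0, y0), (x1, y1) in zip(vertices, vertices[1:] + vertices[:1]):
        twice_area += x0 * y1 - x1 * y0
    return abs(twice_area) / 2.0


# Vertex lists copied from the spawning_areas definition.
spawn_vertices = [
    [(28, 35), (36, 35), (36, 15), (28, 15)],          # Spawn 1
    [(12, 24.5), (26.5, 24.5), (26.5, 23), (12, 23)],  # Spawn 2
    [(36, 28), (52, 28), (52, 26.5), (36, 26.5)],      # Spawn 3
    [(51, 20), (55, 20), (55, 17.5), (51, 17.5)],      # Spawn 4
    [(12, 12), (28, 12), (28, 18), (12, 18)],          # Spawn 5
    [(37, 25), (51, 25), (51, 21), (37, 21)],          # Spawn 6
]
agent_counts = [300, 20, 20, 10, 150, 100]

# One count per area, otherwise zip() would silently drop the extras.
assert len(spawn_vertices) == len(agent_counts)

spawn_areas_m2 = [polygon_area(vertices) for vertices in spawn_vertices]
for index, (area_m2, count) in enumerate(zip(spawn_areas_m2, agent_counts), start=1):
    print(f"Spawn {index}: {count} agents on {area_m2:.2f} m^2 "
          f"({count / area_m2:.2f} agents per m^2)")

total_agents = sum(agent_counts)
print(f"Total agents: {total_agents}")
```

A high density is worth noticing because the distribution step has to respect a minimum spacing between agents and to the area boundary, which may make placement slow or fail in a crowded area.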

Cell 5: Optimized Agent Distribution

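This function distributes agents over the spawn areas using a thread pool, submitting one placement task per area. executor.map returns the results in the same order as spawning_areas, so each result lines up with its entry in num_agents. Whether this is faster than a sequential loop depends on how much of the placement work releases Python's global interpreter lock; a gain is most likely with many areas or large agent counts.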

In [None]:
def distribute_agents_parallel(spawning_areas, num_agents):
    """Distribute agents in parallel for faster computation."""
    def distribute_single(args):
        i, spawning_area = args
        return jps.distribute_by_number(
            polygon=spawning_area,
            number_of_agents=num_agents[i],
            distance_to_agents=0.5,
            distance_to_polygon=0.5,
            seed=1,
        )
    
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(distribute_single, enumerate(spawning_areas)))
    
    return results

positions_in_spawning_areas = distribute_agents_parallel(spawning_areas, num_agents)
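
The pattern relies on executor.map returning results in input order rather than completion order. The following sketch shows this with the standard library only; place_agents is a hypothetical stand-in for jps.distribute_by_number that returns labels instead of coordinates, and the sleep exists solely to make the tasks finish out of order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def place_agents(area_name, count):
    """Hypothetical stand-in for the real placement call; returns labels, not positions."""
    time.sleep(0.01 * (5 - count))  # smaller jobs deliberately finish later
    return [f"{area_name}-{k}" for k in range(count)]


area_names = ["A", "B", "C"]
counts = [3, 1, 2]

with ThreadPoolExecutor() as executor:
    # map() accepts several iterables and yields results in input order,
    # regardless of which worker thread finishes first.
    placed = list(executor.map(place_agents, area_names, counts))

for name, labels in zip(area_names, placed):
    print(name, labels)
```

Passing the areas and the counts as two iterables keeps the worker free of variables from the enclosing scope, which is an alternative to the enumerate-based closure used in the cell.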

Cell 6: Precompute Nearest Exits

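This block precomputes the nearest exit for each agent using NumPy. The positions of each spawn area are converted to an array of shape (N, 2), and broadcasting against the (M, 2) array of exit centroids gives all N × M Euclidean distances in a single np.linalg.norm call. np.argmin then picks the index of the closest exit for each agent. The distance is a straight line between points, so walls are not taken into account.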

In [None]:
def precompute_nearest_exits_optimized(positions_list, exit_centroids):
    """Precompute nearest exits by finding the closest exit centroid."""
    all_nearest_exits = []
    
    # Find the nearest exit index for each agent
    for positions in positions_list:
        if not positions:
            all_nearest_exits.append([])
            continue
            
        # Convert the list of tuples to a NumPy array for efficient calculation
        positions_array = np.array(positions)
            
        distances = np.linalg.norm(positions_array[:, np.newaxis] - exit_centroids, axis=2)
        nearest_exit_indices = np.argmin(distances, axis=1)
        all_nearest_exits.append(nearest_exit_indices.tolist())

    return all_nearest_exits

nearest_exits_per_spawn = precompute_nearest_exits_optimized(positions_in_spawning_areas, exit_centroids)
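
To make the broadcasting step concrete, here is a small worked example with three agents and two exits. The coordinates are made up for illustration; only the array operations mirror the function above.

```python
import numpy as np

positions = np.array([[0.0, 0.0], [4.0, 0.0], [9.0, 1.0]])  # 3 agents, shape (3, 2)
centroids = np.array([[1.0, 0.0], [10.0, 0.0]])             # 2 exits, shape (2, 2)

# (3, 1, 2) minus (2, 2) broadcasts to (3, 2, 2): one offset vector per agent/exit pair.
offsets = positions[:, np.newaxis] - centroids

# Taking the norm over the last axis collapses each offset vector to a distance: shape (3, 2).
distances = np.linalg.norm(offsets, axis=2)

# For each agent (row), the column index of the smallest distance.
nearest = np.argmin(distances, axis=1)

print(offsets.shape, distances.shape)
print(nearest.tolist())
```

The first two agents are closest to exit 0 and the third to exit 1, so the result is [0, 0, 1].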

Cell 7: Define Scenarios

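This cell defines the simulation scenarios. Each scenario lists which spawn areas are active, an optional exit_assignments entry per spawn area, and the agent parameters. The find_nearest_exit_index helper is decorated with @numba.njit, which compiles it to machine code on its first call; it returns the closest exit among a given subset of exit indices. The exit_assignments list addresses the "wrong nearest exit" problem, where the exit that is closest in a straight line is not the sensible one for a spawn area, by restricting each area to a manually chosen list of exits. An entry of None means that the precomputed nearest exit is used.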

In [None]:
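# A helper function to find the nearest exit among a subset of available exits.
# exit_centroids is read as a global, which Numba treats as a compile-time constant.
@numba.njit
def find_nearest_exit_index(position, available_exits):
    """Return the index of the closest exit in available_exits, or -1 if it is empty."""
    min_distance = np.inf  # np.inf rather than float('inf'), to stay safe in nopython mode
    nearest_index = -1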
    for i in available_exits:
        distance = np.linalg.norm(position - exit_centroids[i])
        if distance < min_distance:
            min_distance = distance
            nearest_index = i
    return nearest_index

scenarios = {
    "Scenario 1": {
        "active_spawns": [False, True, False, False, False, False],
        "exit_assignments": [None, [3], None, None, None, None],
        "T_motivation": 5,
        "speed_mean": 1.2,
        "speed_std": 0.05,
    },
    "Scenario 2": {
        "active_spawns": [False, True, False, False, False, False],
        "exit_assignments": [None, [0], None, None, None, None],
        "T_motivation": 5,
        "speed_mean": 1.2,
        "speed_std": 0.05,
    },
    # This scenario demonstrates how to handle the "wrong nearest exit" problem
    # by specifying only the logically correct exits for each spawn area.
    "Scenario 3": {
        "active_spawns": [True, False, True, True, True, True],
        "exit_assignments": [None, None, [3], [0, 1], [4, 5, 6, 7], [4, 5, 6, 7]],
        "T_motivation": 5,
        "speed_mean": 1.2,
        "speed_std": 0.05,
    },
    "Scenario 4": {
        "active_spawns": [False, False, True, True, True, True],
        "exit_assignments": [None, None, [3], [0, 1], [4, 5, 6, 7], [4, 5, 6, 7]],
        "T_motivation": 5,
        "speed_mean": 1.2,
        "speed_std": 0.05,
    },
    "Scenario 5": {
        "active_spawns": [True, True, True, True, True, True],
        "exit_assignments": None,
        "T_motivation": 5,
        "speed_mean": 1.2,
        "speed_std": 0.05,
    },
    "Scenario 6": {
        "active_spawns": [True, True, True, True, True, True],
        "exit_assignments": None,
        "T_motivation": 5,
        "speed_mean": 2,
        "speed_std": 0.05,
    },
}
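
The effect of restricting the candidate exits can be seen in a small standalone example. This is a plain-Python sketch of the same idea, not the Numba helper itself; the function name nearest_allowed_exit and all coordinates are hypothetical.

```python
import math


def nearest_allowed_exit(position, centroids, allowed):
    """Index of the closest centroid among `allowed`; raises if `allowed` is empty."""
    if not allowed:
        raise ValueError("allowed must contain at least one exit index")
    return min(allowed, key=lambda i: math.dist(position, centroids[i]))


# Made-up exit centroids and one agent position.
centroids = [(0.0, 0.0), (10.0, 0.0), (5.0, 8.0)]
agent = (1.0, 1.0)

unrestricted = nearest_allowed_exit(agent, centroids, [0, 1, 2])
restricted = nearest_allowed_exit(agent, centroids, [1, 2])

print(unrestricted, restricted)
```

With every exit allowed the agent is sent to exit 0, but once exit 0 is excluded it goes to exit 2. The sketch raises on an empty candidate list, whereas a helper that returns -1 in that case would, when used as a list index, silently select the last exit.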

Cell 8: Plotting Function

This function generates a visual plot of the simulation's initial configuration, showing the walkable area, exits, and agent spawn points.

In [None]:
def plot_simulation_configuration_optimized(walkable_area, exit_areas, spawning_areas, 
                                          positions_in_spawning_areas, active_spawns, exit_assignments):
    """Optimized plot function with reduced overhead."""
    fig, axes = plt.subplots(figsize=(10, 8))
    
    # pedpy expects the target axes under the keyword `axes`
    pedpy.plot_walkable_area(walkable_area=walkable_area, axes=axes)
    
    for i, (area, centroid) in enumerate(zip(exit_areas, exit_centroids)):
        axes.fill(*area.exterior.xy, color="indianred", alpha=0.7)
        axes.text(centroid[0], centroid[1], f'Exit {i+1}', fontsize=8, ha='center')
    
    for i, (area, positions, active) in enumerate(zip(spawning_areas, positions_in_spawning_areas, active_spawns)):
        if active:
            axes.fill(*area.exterior.xy, color="grey", alpha=0.3)
            centroid = area.centroid
            axes.text(centroid.x, centroid.y, f'Spawn {i+1}', fontsize=8, ha='center')
            
            pos_array = np.array(positions)
            axes.scatter(pos_array[:, 0], pos_array[:, 1], s=10, alpha=0.7)
    
    axes.set_xlabel("x/m")
    axes.set_ylabel("y/m")
    axes.set_aspect("equal")
    plt.tight_layout()
    plt.show()

Cell 9: Animation Function

This function reads the trajectory data from the SQLite file and displays an interactive animation of the evacuation, drawing only every 20th recorded frame to keep the animation light.

In [None]:
def animate_trajectory_optimized(trajectory_file, evacuation_time, scenario_name):
    """Optimized trajectory animation with reduced frame processing."""
    trajectory_data, walkable_area = read_sqlite_file(trajectory_file)
    
    animate(
        trajectory_data,
        walkable_area,
        title_note=f"Scenario: {scenario_name}, Time needed: {evacuation_time:.2f} seconds",
        every_nth_frame=20,
    ).show()

Cell 10: Simulation Initialization

This cell sets up the simulation: the model, the geometry, and the SQLite trajectory writer. It also registers one exit stage per exit area and one journey per stage, so that each agent can later be assigned the journey leading to its exit.

In [None]:
def initialize_simulation_optimized(scenario_name):
    """Optimized simulation initialization."""
    simulation = jps.Simulation(
        model=jps.CollisionFreeSpeedModel(),
        geometry=area,
        trajectory_writer=jps.SqliteTrajectoryWriter(
            output_file=pathlib.Path(f"HC_{scenario_name}.sqlite")
        ),
    )

    exit_stages = [simulation.add_exit_stage(exit_area) for exit_area in exit_areas]
    journeys = [jps.JourneyDescription([stage]) for stage in exit_stages]
    journey_ids = [simulation.add_journey(journey) for journey in journeys]

    return simulation, exit_stages, journey_ids, walkable_area

Cell 11: Main Simulation Loop

This is the core logic of the script. For each scenario it adds the agents of every active spawn area with their assigned exits, then iterates the simulation until all agents have left or 10,000 iterations have been reached. If a scenario, or a single spawn area, has no exit assignment (None), the precomputed nearest exit is used instead; checking both cases in one condition avoids indexing into None.

In [None]:
def run_simulation_optimized(scenario_name, scenario_config):
    """Optimized simulation runner."""
    active_spawns = scenario_config["active_spawns"]
    exit_assignments = scenario_config["exit_assignments"]
    T_motivation = scenario_config["T_motivation"]
    speed_mean = scenario_config["speed_mean"]
    speed_std = scenario_config["speed_std"]

    simulation, exit_stages, journey_ids, walkable_area = initialize_simulation_optimized(scenario_name)

    print(f"Configuration for {scenario_name}:")
    plot_simulation_configuration_optimized(
        walkable_area, exit_areas, spawning_areas, positions_in_spawning_areas,
        active_spawns, exit_assignments
    )
    
    # Add agents one spawn area at a time, skipping inactive or empty areas
    for i, (positions, nearest_exits) in enumerate(zip(positions_in_spawning_areas, nearest_exits_per_spawn)):
        if not active_spawns[i]:
            continue
            
        num_agents_in_spawn = len(positions)
        if num_agents_in_spawn == 0:
            continue
            
        velocities = np.random.normal(speed_mean, speed_std, num_agents_in_spawn)
        positions_array = np.array(positions)
        
        # Determine exit assignments based on the scenario configuration
        if exit_assignments is None or exit_assignments[i] is None:
            # Use precomputed nearest exits when no specific assignment is given for the scenario or this spawn
            spawn_exit_ids = nearest_exits
        elif isinstance(exit_assignments[i], list):
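            # Find the nearest exit among the allowed ones; a NumPy array avoids
            # passing a plain Python list into the Numba-compiled helper
            allowed_exits = np.array(exit_assignments[i], dtype=np.int64)
            spawn_exit_ids = [find_nearest_exit_index(pos, allowed_exits) for pos in positions_array]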
        else:
            # Assign all agents to a single specified exit
            spawn_exit_id = exit_assignments[i]
            spawn_exit_ids = [spawn_exit_id] * num_agents_in_spawn
            
        # Add all agents for this spawn area
        for j in range(num_agents_in_spawn):
            simulation.add_agent(
                jps.CollisionFreeSpeedModelAgentParameters(
                    journey_id=journey_ids[spawn_exit_ids[j]],
                    stage_id=exit_stages[spawn_exit_ids[j]],
                    position=positions_array[j],
                    time_gap=T_motivation,
                    v0=velocities[j]
                )
            )

    start_time = time.time()
    dt = simulation.delta_time()
    total_iterations = 0
    max_iterations = 10000 
    
    while simulation.agent_count() > 0 and total_iterations < max_iterations:
        simulation.iterate()
        total_iterations += 1
        if total_iterations % 100 == 0:
            print(f"Iteration {total_iterations}, Agents remaining: {simulation.agent_count()}")

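    # The loop also stops at max_iterations, so flag runs that did not finish
    if simulation.agent_count() > 0:
        print(f"Warning: {simulation.agent_count()} agents remaining after {max_iterations} iterations")
    evacuation_time = total_iterations * dt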
    total_time = time.time() - start_time
    
    print(f"{scenario_name} evacuation time: {evacuation_time:.2f} seconds")
    print(f"Time needed to run the simulation: {total_time:.2f} seconds")
    print(f"Number of iterations: {total_iterations}")
    print("-----\n")

    trajectory_file = f"HC_{scenario_name}.sqlite"
    animate_trajectory_optimized(trajectory_file, evacuation_time, scenario_name)

    print(f"{scenario_name} complete.")
    return evacuation_time, total_time
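
The stopping logic of the loop can be studied without JuPedSim. The sketch below uses a hypothetical CountdownSimulation class that only mimics the three calls the loop relies on (agent_count, iterate, delta_time) and removes one agent every few steps; it is an illustration of the control flow, not of the real simulation.

```python
class CountdownSimulation:
    """Hypothetical stand-in: one agent leaves every `leave_every` iterations."""

    def __init__(self, agents, dt=0.01, leave_every=10):
        self._agents = agents
        self._dt = dt
        self._leave_every = leave_every
        self._steps = 0

    def delta_time(self):
        return self._dt

    def agent_count(self):
        return self._agents

    def iterate(self):
        self._steps += 1
        if self._steps % self._leave_every == 0 and self._agents > 0:
            self._agents -= 1


def run_until_empty(simulation, max_iterations):
    """Iterate until no agents remain or the cap is hit; report whether it finished."""
    iterations = 0
    while simulation.agent_count() > 0 and iterations < max_iterations:
        simulation.iterate()
        iterations += 1
    simulated_seconds = iterations * simulation.delta_time()
    completed = simulation.agent_count() == 0
    return simulated_seconds, completed


finished = run_until_empty(CountdownSimulation(agents=5), max_iterations=10_000)
capped = run_until_empty(CountdownSimulation(agents=5), max_iterations=20)
print(finished, capped)
```

In the capped run the returned time is only the simulated time up to the cap, not an evacuation time, which is why returning a completion flag alongside the time can be useful.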

Cell 12: Run All Scenarios

This simple loop iterates through the scenarios dictionary and runs the simulation for each one, storing the results.

In [None]:
results = {}
for scenario_name, scenario_config in scenarios.items():
    print(f"Running {scenario_name}...")
    evacuation_time, sim_time = run_simulation_optimized(scenario_name, scenario_config)
    results[scenario_name] = {
        "evacuation_time": evacuation_time,
        "simulation_time": sim_time
    }

Cell 13: Print Summary

The final cell prints the results of each simulation run, providing a clear summary of evacuation times and simulation times.

In [None]:
print("\n=== Simulation Results Summary ===")
for scenario, data in results.items():
    print(f"{scenario}: {data['evacuation_time']:.2f}s evacuation, {data['simulation_time']:.2f}s simulation time")