In [None]:
import os
import sys
import pickle
from pathlib import Path

import fdsvismap as fv
import jupedsim as jps
import matplotlib.pyplot as plt
import pedpy
import shapely
from matplotlib.patches import Circle
from shapely import Polygon, from_wkt
import numpy as np
from jupedsim.internal.notebook_utils import animate, read_sqlite_file
from typing import Any, List, Tuple

## TODO
- Fix journeys

## Config parameters

In [None]:
num_agents = 20
premovement_time = 1 # seconds
v0 = 1.0
seed = 1
c0 = 3.0
# Set times when the simulation should be evaluated
update_time = 1
max_vis_simulation_time = 1000 # FDS simulation time in seconds
times = range(premovement_time, max_vis_simulation_time, update_time)
trajectory_file = f"output_N{num_agents}.sqlite"  # output file

exits = [
    #left
    Polygon([(2, 15.5), (3, 15.5), (3, 16.5), (2, 16.5), (2, 15.5)]),
    # right
    Polygon([(25, 15.5), (27, 15.5), (27, 16.5), (25, 16.5), (25, 15.5)]),
]   
primary_exit = (exits[1].centroid.xy[0][0], exits[1].centroid.xy[1][0])
secondary_exit = (exits[0].centroid.xy[0][0], exits[0].centroid.xy[1][0])

distance_to_waypoints = 0.5
# waypoints: (x, y, angle)
waypoints = [
    (13.5, 8.5, 0), #1
    (10.5, 4.5, 180), #2
    (18.5, 6.5, 270), #3
    (25, 14.5, 180), #4
    (4, 6.5, 90), #5
    (2.5, 14.5, 180), #6 
]

## Vismap config

In [None]:
def load_or_compute_vis(sim_dir, waypoints, exits, times, pickle_path):
    if pickle_path.is_file():
        try:
            with open(pickle_path, 'rb') as file:
                vis = pickle.load(file)
            print("Vis object loaded successfully.")
        except FileNotFoundError:
            sys.exit(f"No file found at {pickle_path}, please check the file path.")
            
        except Exception as e:
            sys.exit(f"An error occurred while loading the visualization: {e}")
    else:
        # Read and process data if not existing
        vis = fv.VisMap()
        vis.read_fds_data(str(sim_dir))
        vis.set_start_point(8, 8)

        for wp in waypoints:
            vis.set_waypoint(x=wp[0], y=wp[1], c=c0, alpha=wp[2])

        # Set waypoint for exit
        #vis.set_waypoint(x=exits[1].centroid.xy[0][0], y=exits[1].centroid.xy[1][0], c=c0, alpha=180)

        # Define time points and compute the visualization
        vis.set_time_points(times)
        vis.compute_all()

        # Save results to pickle file
        pickle_path.parent.mkdir(exist_ok=True)  # Ensure directory exists
        try:
            # Serialize the visualization object to a pickle file
            with open(pickle_path, 'wb') as file:
                pickle.dump(vis, file)
                print("Visualization saved successfully.")
        except Exception as e:
            sys.exit(f"An error occurred while saving the visualization: {e}")

    return vis

project_root = Path(os.path.abspath('')).parent
sim_dir = project_root / "fds_data"
pickle_path = project_root / "processed_data" / "vismap.pkl"
vis = load_or_compute_vis(sim_dir, waypoints, exits, times, pickle_path)

In [None]:
fig, ax = vis.create_aset_map_plot(plot_obstructions=True)

In [None]:
fig, ax = vis.create_time_agg_wp_agg_vismap_plot(plot_obstructions=True)

# Check if waypoint is visible from given location at given time

time =  1
x = 6
y =  12#11.32
for wp_id, _ in enumerate(waypoints):
    print(f"{wp_id = }")
    print(vis.wp_is_visible(time=2, x=x, y=y, waypoint_id=wp_id))
    # Get distance from waypoint to given location
    print(f"Distance: {vis.get_distance_to_wp(x=x, y=y, waypoint_id=wp_id):.2f} [m]")
    # Calculate local visibility at given location and time, considering a specific c factor
    print("local visibility:", vis.get_local_visibility(time=100, x=x, y=y, c=3))
    print("----")

In [None]:
# v= []
# for t in times:
#     v.append(vis.get_local_visibility(time=t, x=25, y=12, c=3))
#     #v.append(vis.get_local_visibility(time=t, x=7, y=4, c=3))

# plt.plot(times, v)
# plt.xlabel('Time [s]')
# plt.ylabel('Local Visibility')

In [None]:
def calculate_desired_speed(v, c, max_speed, range=2.0):
    # Calculate the proximity of v to c, and adjust speed accordingly
    if v <= c:
        return 0
    else:
        # Create a linear relation or any other functional form as needed
        # This example uses an exponential decay to decrease speed as v approaches c
        return max_speed * (1 - np.exp(-(v - c)/range ))
    
    
# desired_speeds = [calculate_desired_speed(visibility, 3, max_speed=1.0) for visibility in v]
# plt.plot(times,desired_speeds)
# plt.xlabel('Time [s]')
# plt.ylabel('Desired Speed [m/s]')


In [None]:
# CO2 = np.arange(0, 10, 0.001) 
# HV = 0.141 * np.exp(0.1930* CO2+ 2.0004)
# plt.plot(CO2, HV)



## Definition of Start Positions and Exit

Now we define the spawning area and way points for the leader to follow.

In [None]:
def plot_simulation_configuration(
    walkable_area, starting_positions, exits, path1=None, path2=None
):
    axes = pedpy.plot_walkable_area(walkable_area=walkable_area)
    for exit_area in exits:
        axes.fill(*exit_area.exterior.xy, color="indianred")

    for starting_position in starting_positions:
      
        axes.scatter(*zip(*starting_position), s=1, color='gray')    

    axes.set_xlabel("x/m")
    axes.set_ylabel("y/m")
    axes.set_aspect("equal")
    for idx, waypoint in enumerate(waypoints):
        axes.plot(waypoint[0], waypoint[1], "go")
        axes.annotate(
            f"WP {idx}",
            (waypoint[0], waypoint[1]),
            textcoords="offset points",
            xytext=(10, -15),
            ha="center",
        )
        circle = Circle(
            (waypoint[0], waypoint[1]),
            distance_to_waypoints,
            fc="green",
            ec="green",
            alpha=0.1,
        )
        axes.add_patch(circle)


    if path1:
        x_coords = [point[0] for point in path1]
        y_coords = [point[1] for point in path1]
        for idx, point in enumerate(path1):
            axes.plot(point[0], point[1], "bo",  markerfacecolor='none', markeredgewidth=1, markeredgecolor='blue')
        axes.plot(x_coords, y_coords, "b--")
   
    if path2:
        x_coords = [point[0] for point in path2]
        y_coords = [point[1] for point in path2]
        for idx, point in enumerate(path2):
            axes.plot(point[0], point[1], "go", markerfacecolor='none', markeredgewidth=1, markeredgecolor='red')
        axes.plot(x_coords, y_coords, "r--")
             

In [None]:
with open("geometry.wkt", "r") as file:
    data = file.readlines()

wkt_data = from_wkt(data)
area = wkt_data[0]
obstacles = wkt_data[1:]
obstacle = shapely.union_all(obstacles)
walkable_area = pedpy.WalkableArea(shapely.difference(area, obstacle))
routing = jps.RoutingEngine(walkable_area.polygon)
spawning_area1 = Polygon([(0, 0), (0, 5), (15, 5), (15, 0)])
spawning_area2 = Polygon([(4.1, 8), (19, 8), (19, 14.5), (4.1, 14.5)])
pos_in_spawning_areas = [
    jps.distributions.distribute_by_number(
        polygon=spawning_area2,
        number_of_agents=num_agents,
        distance_to_agents=0.4,
        distance_to_polygon=0.3,
        seed=seed,
    ),
    jps.distributions.distribute_by_number(
        polygon=spawning_area1,
        number_of_agents=num_agents,
        distance_to_agents=0.4,
        distance_to_polygon=0.3,
        seed=seed,
    ),
]
#starting_point = pos_in_spawning_areas[1][1]
starting_point = (6.4, 9)
path1 = routing.compute_waypoints(starting_point, primary_exit)
path2 = routing.compute_waypoints(starting_point, secondary_exit)
plot_simulation_configuration(
    walkable_area, pos_in_spawning_areas, exits, path1, path2=None
)

In [None]:
import math
def distance(point1, point2):
    return math.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2)

def get_next_waypoint(point, waypoints):
    """Return the waypoint that is closest to point."""
    min_distance = float('inf')
    next_waypoint = None
    next_waypoint_id = -1
    #print(point)
    for wp in waypoints:
        d = distance(point, wp)
        if d < min_distance:
            min_distance = d
            next_waypoint = wp
            next_waypoint_id = waypoints.index(wp)

    #print(f"Next waypoint: {next_waypoint}")
    return next_waypoint_id, next_waypoint

In [None]:
def check_and_update_journeys(
    simulation: jps.Simulation,
    time: float,
    primary_exit: Tuple[float, float],
    new_journey: int,
    waypoints: List[Tuple[float, float, float]],
    new_exit_id: int,
    vis: Any,
):
    for num, agent in enumerate(simulation.agents()):
        x, y = agent.position
        #############
        path = routing.compute_waypoints((x, y), primary_exit)
        wp_id, wp = get_next_waypoint(point=path[1], waypoints=waypoints)
        wp_is_visible = vis.wp_is_visible(time=time, x=x, y=y, waypoint_id=wp_id)
        #############
        # 1. Set the speed
        v = vis.get_local_visibility(time=time, x=x, y=y, c=c0)
        agent.model.desiredSpeed = calculate_desired_speed(v, c0, v0)
        if num == 1:
            print(
                f"{time =  :.2f}, {x = :.2f}, {y =  :.2f}, {v =  :.2f}, {agent.id = }, {wp_id = }, {wp = } {wp_is_visible =}, {agent.model.desiredSpeed = :.2f}",
                end="\n",
            )
        # 2. Redirection
        if not wp_is_visible:
            simulation.switch_agent_journey(agent.id, new_journey, new_exit_id)

## Specification of Parameters und Running the Simulation

Now we just need to define the details of the operational model as well as the exit.

In [None]:
def run_simulation(trajectory_file, walkable_area, exits, vis):
    simulation = jps.Simulation(
        model=jps.SocialForceModel(),
        geometry=walkable_area.polygon,
        trajectory_writer=jps.SqliteTrajectoryWriter(
            output_file=Path(trajectory_file)
        ),
    )
    exit_ids = [simulation.add_exit_stage(exit_area.exterior.coords[:-1]) for exit_area in exits]
    waypoint_ids = [
        simulation.add_waypoint_stage(waypoint[:-1], distance_to_waypoints)
        for waypoint in waypoints
    ]
    # Notausgang
    journey_secondary = jps.JourneyDescription([*waypoint_ids, exit_ids[0]])
    journey_secondary_id = simulation.add_journey(journey_secondary)
    # Hauptausgang
    journey_primary = jps.JourneyDescription([*waypoint_ids, exit_ids[1]])
    journey_primary_id = simulation.add_journey(journey_primary)
    # Add agents to the upper room
    ids_up = set(
        [
            simulation.add_agent(
                jps.SocialForceModelAgentParameters(
                    desiredSpeed=0,
                    radius=0.1,
                    journey_id=journey_primary_id,
                    stage_id=exit_ids[1],
                    position=pos,
                )
            )
            for pos in pos_in_spawning_areas[0]
        ]
    )
    # Add agents to the lower room
    ids_down = set(
        [
            simulation.add_agent(
                jps.SocialForceModelAgentParameters(
                    desiredSpeed=0,
                    radius=0.1,
                    journey_id=journey_primary_id,
                    stage_id=exit_ids[1],
                    position=pos,
                )
            )
            for pos in pos_in_spawning_areas[1]
        ]
    )
    #first_agents = list(ids_up)[0]
    simulation.iterate(premovement_time*int(1/simulation.delta_time()))
    i = 0
    while simulation.agent_count() > 0 and simulation.elapsed_time() < 400:#max_vis_simulation_time:
        t = simulation.elapsed_time() # seconds
        if t > times[i]:
            check_and_update_journeys(simulation=simulation, 
                                      time=t,
                                      primary_exit=primary_exit, 
                                      new_journey=journey_secondary_id, 
                                      new_exit_id=exit_ids[0], 
                                      waypoints=waypoints, vis=vis)
            i += 1
        simulation.iterate()

    print(f"Simulation finished after {simulation.elapsed_time():.2f} seconds.")

In [None]:
run_simulation(walkable_area=walkable_area, exits=exits, trajectory_file=trajectory_file, vis=vis)
trajectory_data, walkable_area = read_sqlite_file(trajectory_file)
animate(trajectory_data, walkable_area, every_nth_frame=100)

# Plot Trajectories

In [None]:
# from pedpy import plot_walkable_area
# from pedpy.column_identifier import (
#     ID_COL,
#     X_COL,
#     Y_COL,
# )

# def plot_trajectories(traj_data, walkable_area = None, axes = None, **kwargs):
#     traj_color = kwargs.pop("traj_color", 'red')
#     traj_width = kwargs.pop("traj_width", 1.0)
#     traj_alpha = kwargs.pop("traj_alpha", 1.0)

#     traj_start_marker = kwargs.pop("traj_start_marker", "")
#     traj_end_marker = kwargs.pop("traj_end_marker", "")

#     if axes is None:
#         axes = plt.gca()

#     if walkable_area is not None:
#         axes = plot_walkable_area(
#             walkable_area=walkable_area, axes=axes, **kwargs
#         )

#     for _, ped in traj_data.groupby(ID_COL):
#         axes.plot(
#             ped[X_COL],
#             ped[Y_COL],
#             alpha=traj_alpha,
#             color=traj_color,
#             linewidth=traj_width,
#         )
#         axes.scatter(
#             ped[ped.frame == ped.frame.min()][X_COL],
#             ped[ped.frame == ped.frame.min()][Y_COL],
#             color=traj_color,
#             marker=traj_start_marker,
#         )
#         axes.scatter(
#             ped[ped.frame == ped.frame.max()][X_COL],
#             ped[ped.frame == ped.frame.max()][Y_COL],
#             color=traj_color,
#             marker=traj_end_marker,
#         )

#     axes.set_xlabel(r"x/m")
#     axes.set_ylabel(r"y/m")

#     return axes



# trajectory_data_blue = trajectory_data.data[trajectory_data.data['id'].isin(smoke_non_affected_agent_ids)]
# trajectory_data_red = trajectory_data.data[trajectory_data.data['id'].isin(smoke_affected_agent_ids)]

# plot_trajectories(
#     traj_data=trajectory_data_blue,
#     walkable_area= walkable_area,
#     traj_alpha=0.5,
#     traj_width=1,
#     traj_start_marker='.',
#     traj_color='blue'
# ).set_aspect("equal")
# plot_trajectories(
#     traj_data=trajectory_data_red,
#     traj_alpha=0.5,
#     traj_width=1,
#     traj_start_marker='.',
#     traj_color='red'
# ).set_aspect("equal")
# plt.plot([], [], 'r', label='smoke affected agents')
# plt.plot([], [], 'b', label='smoke non affected agents')
# plt.legend()
# plt.xlim(0, 30)
# plt.ylim(0, 15)
# plt.xlabel("X / m")
# plt.ylabel("Y / m")
# plt.yticks([0, 5, 10, 15])
# plt.tight_layout()
# plt.savefig('trajectories.png', dpi=300, bbox_inches='tight')