In [None]:
import pathlib
import os
import random
import jupedsim as jps
import pedpy
from matplotlib.patches import Circle
from shapely import GeometryCollection, Point, Polygon
from fdsvismap import VisMap
import shapely
import matplotlib.pyplot as plt
import matplotlib.patches as patches

## Config parameters

In [None]:
# Global configuration shared by all following cells.

# Number of agents placed in EACH spawning area (passed to distribute_by_number).
num_agents = 3
# c factor handed to the VisMap waypoint / local-visibility queries; also used as
# the visibility threshold below which calculate_desired_speed returns zero.
c0 = 3
# Time the simulation is advanced before the visibility-driven update loop starts.
premovement_time = 600 # seconds
# Maximum desired speed; plotted further down with the unit m/s.
v0 = 1.0
# Seed forwarded to the agent distribution so start positions are reproducible.
seed = 1
# Set times when the simulation should be evaluated
update_time = 50
max_vis_simulation_time = 1000
# Time points handed to the VisMap: 0, update_time, ... (max_vis_simulation_time excluded).
times = range(0, max_vis_simulation_time, update_time)
trajectory_file = f"output_N{num_agents}.sqlite"  # output file

# Exit polygons; index 0 is the left exit, index 1 the right exit.
exits = [
    # left
    Polygon([(2, 15.5), (3, 15.5), (3, 16.5), (2, 16.5), (2, 15.5)]),
    # right
    Polygon([(25, 15.5), (27, 15.5), (27, 16.5), (25, 16.5), (25, 15.5)]),
]   
# Distance passed to add_waypoint_stage; also drawn as the circle radius around
# each waypoint in the configuration plot.
distance_to_waypoints = 0.5

# Waypoint positions (x, y). The trailing number on each line matches the alpha
# value given to vis.set_waypoint for that waypoint in the VisMap cell.
waypoints = [
    (13.5, 8.5), # alpha = 180
    (10.5, 4.5), # alpha = 0
    (18.5, 6.5), # alpha = 270
]

## Vismap config

In [None]:
# FDS output directory, located relative to the current working directory.
project_root = pathlib.Path(os.path.abspath("")).joinpath("examples", "jupedsim_fire")
sim_dir = str(project_root / "fds_data_4")

vis = VisMap()
vis.read_fds_data(sim_dir)
vis.set_start_point(8, 8)

# Kristian: TODO: Flip the y-axis
# Register the three interior waypoints with y mirrored as 15 - y; the alpha of
# each waypoint is listed in the same order as the entries of `waypoints`.
for wp_index, wp_alpha in enumerate((180, 0, 270)):
    wp_x = waypoints[wp_index][0]
    wp_y = waypoints[wp_index][1]
    vis.set_waypoint(x=wp_x, y=15 - wp_y, c=c0, alpha=wp_alpha)

# The centroid of the right exit is added as a fourth waypoint (y NOT mirrored).
right_exit_centroid = exits[1].centroid
vis.set_waypoint(
    x=right_exit_centroid.xy[0][0],
    y=right_exit_centroid.xy[1][0],
    c=c0,
    alpha=0,
)

vis.set_time_points(times)
# Run all calculations required to build the Vismap.
vis.compute_all()
# pandas.to_pickle(vis, "vismap.pkl")
# if Path("vismap.pkl").is_file():
#     vis = pandas.read_pickle("vismap.pkl")
# else:


In [None]:
# Plot the map produced by VisMap.create_aset_map_plot with obstructions drawn.
# NOTE(review): "ASET" presumably means available safe egress time - confirm in fdsvismap.
fig, ax = vis.create_aset_map_plot(plot_obstructions=True)

In [None]:
# Plot the aggregated vismap (method name suggests aggregation over time and
# waypoints - confirm in fdsvismap), with obstructions drawn.
fig, ax = vis.create_time_agg_wp_agg_vismap(plot_obstructions=True)

# The three calls below are stand-alone examples of the VisMap query API with
# hard-coded sample coordinates; their results are only printed.

# Check if waypoint is visible from given location at given time
print(vis.wp_is_visible(time=50, x=12.5, y=0.6, waypoint_id=1))

# Get distance from waypoint to given location
print(f"{vis.get_distance_to_wp(x=17, y=5, waypoint_id=1)=}")

# Calculate local visibility at given location and time, considering a specific c factor
print(vis.get_local_visibility(time=100, x=5, y=6, c=3))


In [None]:
# Local visibility over time at one fixed probe location.
# The c factor is taken from the configuration (c0) instead of a literal so that
# this diagnostic always uses the same value as the simulation loop.
probe_x, probe_y = 25, 12  # alternative probe location: (7, 4)
v = [
    vis.get_local_visibility(time=t, x=probe_x, y=probe_y, c=c0)
    for t in times
]

# Explicit figure/axes instead of the implicit pyplot state.
fig, ax = plt.subplots()
ax.plot(times, v)
ax.set_xlabel('Time [s]')
ax.set_ylabel('Local Visibility')
ax.set_title(f"Local visibility at ({probe_x}, {probe_y})")
# Show the raw values as the cell output.
v

In [None]:
import numpy as np
def calculate_desired_speed(v, c, max_speed, range=2.0):
    """Map a local visibility value to a desired walking speed.

    At or below the threshold ``c`` the speed is zero. Above it the speed
    rises towards ``max_speed`` following a saturating exponential::

        max_speed * (1 - exp(-(v - c) / range))

    Parameters
    ----------
    v : float
        Local visibility value.
    c : float
        Threshold; any ``v <= c`` yields a speed of 0.
    max_speed : float
        Upper bound that the speed approaches as ``v`` grows.
    range : float, optional
        Scale of the exponential; larger values make the speed rise more
        slowly. Must be positive. (The name shadows the builtin ``range``
        inside this function; it is kept for keyword compatibility.)

    Returns
    -------
    float
        Desired speed in ``[0, max_speed)``, always a plain Python float.

    Raises
    ------
    ValueError
        If ``range`` is not positive.
    """
    if range <= 0:
        # A zero scale divides by zero, a negative one yields negative speeds.
        raise ValueError(f"range must be positive, got {range!r}")
    if v <= c:
        return 0.0
    # Saturating growth: 0 at v == c, approaching max_speed for large v.
    return float(max_speed * (1.0 - np.exp(-(v - c) / range)))
    
# Desired speed over time derived from the visibility series `v` computed in the
# previous cell. Threshold and maximum speed come from the configuration (c0, v0)
# so the curve matches what the simulation loop applies to the agents.
desired_speeds = [
    calculate_desired_speed(visibility, c0, max_speed=v0) for visibility in v
]

fig, ax = plt.subplots()
ax.plot(times, desired_speeds)
ax.set_xlabel('Time [s]')
ax.set_ylabel('Desired Speed [m/s]');


## Definition of Start Positions and Exit

Now we define the geometry (walkable area and obstacles) and the spawning areas from which the agents' start positions are drawn.

In [None]:
# Outer bounding rectangle of the scenario.
area = Polygon([(0, 0), (30, 0), (30, 20), (0, 20)])

# Corner coordinates of every wall segment, one entry per wall.
wall_outlines = [
    [(0, 5), (10, 5), (10, 5.1), (0, 5.1)],
    [(11, 5), (19, 5), (19, 5.1), (11, 5.1)],
    [(4, 8), (4.1, 8), (4.1, 15), (4, 15)],
    [(0, 15), (2.0, 15), (2.0, 15.1), (0, 15.1)],
    [(3.0, 15), (4.1, 15), (4.1, 15.1), (3.0, 15.1)],
    [(4.1, 15), (19, 15), (19, 15.1), (4.1, 15.1)],
    [(4.1, 8), (13, 8), (13, 8.1), (4.1, 8.1)],
    [(14, 8), (18.99, 8), (18.99, 8.1), (14, 8.1)],
    [(18.9, 5.0), (19.0, 5.0), (19.0, 6), (18.9, 6), (18.9, 5.0)],
    [(18.9, 0), (18.9, 5), (19.0, 5), (19, 0), (18.9, 0)],
    [(18.9, 7.0), (19.0, 7.0), (19.0, 15.1), (18.9, 15.1), (18.9, 7.0)],
    [(15, 0), (15.1, 0), (15.1, 4.99), (15, 4.99)],
]
obstacles = [Polygon(outline) for outline in wall_outlines]

# Merge all walls into one geometry and cut it out of the bounding rectangle.
obstacle = shapely.union_all(obstacles)
walkable_area = pedpy.WalkableArea(shapely.difference(area, obstacle))
# Uncomment to inspect the geometry:
# pedpy.plot_walkable_area(walkable_area=walkable_area)

In [None]:
spawning_area1 = Polygon([(0, 0), (0, 5), (15, 5), (15, 0)])
spawning_area2 = Polygon([(4.1, 8), (19, 8), (19, 14.5), (4.1, 14.5)])

# One list of start positions per spawning area, drawn with identical settings.
# Index 0 belongs to spawning_area2, index 1 to spawning_area1.
pos_in_spawning_areas = [
    jps.distributions.distribute_by_number(
        polygon=spawn_polygon,
        number_of_agents=num_agents,
        distance_to_agents=0.4,
        distance_to_polygon=0.3,
        seed=seed,
    )
    for spawn_polygon in (spawning_area2, spawning_area1)
]

In [None]:
def plot_simulation_configuration(
    walkable_area,
    starting_positions,
    exits,
    waypoint_positions=None,
    waypoint_radius=None,
):
    """Plot the walkable area together with exits, start positions and waypoints.

    Parameters
    ----------
    walkable_area
        Walkable area handed to ``pedpy.plot_walkable_area``.
    starting_positions
        Iterable of position groups; each group is an iterable of ``(x, y)``
        pairs and is drawn as small gray dots. Empty groups are skipped.
    exits
        Iterable of polygons (objects with ``.exterior.xy``), filled in red.
    waypoint_positions : optional
        Iterable of ``(x, y)`` waypoint positions. Defaults to the
        notebook-level ``waypoints``.
    waypoint_radius : optional
        Radius of the translucent circle drawn around each waypoint. Defaults
        to the notebook-level ``distance_to_waypoints``.
    """
    # Fall back to the notebook configuration so existing three-argument calls
    # keep producing the same figure.
    if waypoint_positions is None:
        waypoint_positions = waypoints
    if waypoint_radius is None:
        waypoint_radius = distance_to_waypoints

    axes = pedpy.plot_walkable_area(walkable_area=walkable_area)
    for exit_area in exits:
        axes.fill(*exit_area.exterior.xy, color="indianred")

    for starting_position in starting_positions:
        # zip(*[]) yields nothing and scatter() would fail without x/y data.
        if not starting_position:
            continue
        axes.scatter(*zip(*starting_position), s=1, color='gray')

    axes.set_xlabel("x/m")
    axes.set_ylabel("y/m")
    axes.set_aspect("equal")
    for idx, waypoint in enumerate(waypoint_positions):
        axes.plot(waypoint[0], waypoint[1], "ro")
        axes.annotate(
            f"WP {idx+1}",
            (waypoint[0], waypoint[1]),
            textcoords="offset points",
            xytext=(10, -15),
            ha="center",
        )
        # Translucent disc showing the distance used for the waypoint.
        circle = Circle(
            (waypoint[0], waypoint[1]),
            waypoint_radius,
            fc="red",
            ec="red",
            alpha=0.1,
        )
        axes.add_patch(circle)


# Draw geometry, exits, start positions and waypoints of the configured scenario.
plot_simulation_configuration(
    walkable_area=walkable_area,
    starting_positions=pos_in_spawning_areas,
    exits=exits,
)

## Specification of Parameters and Running the Simulation

Now we just need to define the details of the operational model as well as the exit.

In [None]:
# Operational model. Alternatives that were tried before:
#   jps.GeneralizedCentrifugalForceModel(
#       max_neighbor_repulsion_force=8,
#       max_geometry_repulsion_force=5,
#       max_neighbor_interaction_distance=2,
#       max_neighbor_interpolation_distance=0.1,
#       strength_neighbor_repulsion=0.3,
#   )
#   jps.CollisionFreeSpeedModel()
operational_model = jps.SocialForceModel()

# Trajectories are written to the SQLite file named in the configuration.
sqlite_writer = jps.SqliteTrajectoryWriter(
    output_file=pathlib.Path(trajectory_file)
)

simulation = jps.Simulation(
    model=operational_model,
    geometry=walkable_area.polygon,
    trajectory_writer=sqlite_writer,
)

# One exit stage per exit polygon; the closing vertex of each ring is dropped.
exit_ids = [
    simulation.add_exit_stage(exit_polygon.exterior.coords[:-1])
    for exit_polygon in exits
]

## Define Journey for people in the upper room going right


In [None]:
# One waypoint stage per configured waypoint, all with the same distance.
waypoint_ids = [
    simulation.add_waypoint_stage(position, distance_to_waypoints)
    for position in waypoints
]

# Route: waypoint 1 -> waypoint 3 -> right exit.
journey_up_right = jps.JourneyDescription(waypoint_ids + exit_ids)
for from_stage, to_stage in (
    (waypoint_ids[0], waypoint_ids[2]),
    (waypoint_ids[2], exit_ids[1]),
):
    journey_up_right.set_transition_for_stage(
        from_stage, jps.Transition.create_fixed_transition(to_stage)
    )
journey_up_right_id = simulation.add_journey(journey_up_right)

In [None]:
# Unused draft: direct journeys to the left exit (journey1) and to the right
# exit (journey2). Their ids are not referenced by any other visible cell, but
# the journeys are still registered with the simulation.
journey1 = jps.JourneyDescription([exit_ids[0]])
journey2 = jps.JourneyDescription([exit_ids[1]])
journey1_id = simulation.add_journey(journey1)
journey2_id = simulation.add_journey(journey2)

## Define Journey for people in the upper room going left


In [None]:
# Route: waypoint 1 -> left exit.
journey_up_left = jps.JourneyDescription(waypoint_ids + exit_ids)
to_left_exit = jps.Transition.create_fixed_transition(exit_ids[0])
journey_up_left.set_transition_for_stage(waypoint_ids[0], to_left_exit)
journey_up_left_id = simulation.add_journey(journey_up_left)

## Define Journey for people in the lower room going right

In [None]:
# Route: waypoint 2 -> waypoint 3 -> right exit.
journey_down_right = jps.JourneyDescription(
    [waypoint_ids[1], waypoint_ids[2]] + exit_ids
)
for from_stage, to_stage in (
    (waypoint_ids[1], waypoint_ids[2]),
    (waypoint_ids[2], exit_ids[1]),
):
    journey_down_right.set_transition_for_stage(
        from_stage, jps.Transition.create_fixed_transition(to_stage)
    )
journey_down_right_id = simulation.add_journey(journey_down_right)

## Define Journey for people in the lower room going left

In [None]:
# Route: waypoint 2 -> left exit.
journey_down_left = jps.JourneyDescription(waypoint_ids + exit_ids)
to_left_exit = jps.Transition.create_fixed_transition(exit_ids[0])
journey_down_left.set_transition_for_stage(waypoint_ids[1], to_left_exit)
journey_down_left_id = simulation.add_journey(journey_down_left)

## Add agents

First, add the agents in the upper room, then the agents in the lower room.

In [None]:
# Upper-room agents: start at rest, heading for waypoint 1 on the right-going journey.
ids_up = {
    simulation.add_agent(
        jps.SocialForceModelAgentParameters(
            desiredSpeed=0,
            radius=0.1,
            journey_id=journey_up_right_id,
            stage_id=waypoint_ids[0],
            position=start_position,
        )
    )
    for start_position in pos_in_spawning_areas[0]
}

# Lower-room agents: start with desired speed v0, heading for waypoint 2.
# NOTE(review): unlike the upper-room agents these are not created with speed 0;
# confirm that moving during the pre-movement phase is intended.
ids_down = {
    simulation.add_agent(
        jps.SocialForceModelAgentParameters(
            desiredSpeed=v0,
            radius=0.1,
            journey_id=journey_down_right_id,
            stage_id=waypoint_ids[1],
            position=start_position,
        )
    )
    for start_position in pos_in_spawning_areas[1]
}

# Some upper-room agent id (sets are unordered, so which one is arbitrary).
first_agents = list(ids_up)[0]

## Simulation loop

In [None]:
# --- Pre-movement phase -----------------------------------------------------
# Advance the simulation by `premovement_time` seconds before any
# visibility-driven update. Rounding the total iteration count avoids the
# truncation of int(1 / dt) when 1 / dt is not an integer.
dt = simulation.delta_time()
simulation.iterate(round(premovement_time / dt))

# --- Visibility-driven movement ---------------------------------------------
# Agents are re-evaluated every `update_time` seconds. The schedule is counted
# in iterations: a float modulo on elapsed_time() can miss update instants
# because of rounding error.
iterations_per_update = max(1, round(update_time / dt))

# When the waypoint-1 check fails, agents on a right-going journey are sent to
# the matching left-going journey.
left_journey_for = {
    journey_up_right_id: journey_up_left_id,
    journey_down_right_id: journey_down_left_id,
}

# The time bound prevents an endless loop: agents whose desired speed is set to
# zero never leave, and the VisMap was only computed for the time points in
# `times` (all below max_vis_simulation_time).
while (
    simulation.agent_count() > 0
    and simulation.elapsed_time() < max_vis_simulation_time
):
    if simulation.iteration_count() % iterations_per_update == 0:
        t = simulation.elapsed_time()  # seconds
        print(f"Time: {t:3.2f}, Agents: {simulation.agent_count()}")
        for agent in simulation.agents():
            x, y = agent.position
            # Redirect only agents that cannot see waypoint 1 and are still
            # left of the third configured waypoint.
            condition = (
                not vis.wp_is_visible(time=t, x=x, y=y, waypoint_id=1)
            ) and (x < waypoints[2][0])

            # Named so that the notebook-level list `v` is not overwritten.
            local_visibility = vis.get_local_visibility(time=t, x=x, y=y, c=c0)
            desired_speed = calculate_desired_speed(local_visibility, c0, v0)
            print(
                f"{t = }, {x = }, {y = }, v = {local_visibility!r}, "
                f"calculate_desired_speed(v, c0, v0) = {desired_speed!r}"
            )

            # Getting slower as visibility drops.
            agent.model.desiredSpeed = desired_speed

            # Redirection towards the left exit.
            if condition and agent.journey_id in left_journey_for:
                simulation.switch_agent_journey(
                    agent.id, left_journey_for[agent.journey_id], exit_ids[0]
                )

    simulation.iterate()

if simulation.agent_count() > 0:
    print(
        f"Stopped at the end of the VisMap time range with "
        f"{simulation.agent_count()} agent(s) still in the simulation."
    )
print(f"Simulation finished after {simulation.elapsed_time()} seconds.")

## Visualization

Let's have a look at the visualization of the simulated trajectories:

In [None]:
from jupedsim.internal.notebook_utils import animate, read_sqlite_file

# Load the trajectories written during the simulation and animate them.
# Note: this rebinds `walkable_area`, which was defined in the geometry cell, to
# the value read back from the SQLite file.
trajectory_data, walkable_area = read_sqlite_file(trajectory_file)
# every_nth_frame=100 is passed through to animate (frame subsampling).
animate(trajectory_data, walkable_area, every_nth_frame=100)