### **Exit Choice Models**

This notebook describes two models for computing an agent's probability of selecting an exit based on **distance, congestion, flow, and visibility**.

---

### **Model 1: Standard Flow-Based Choice**
This model considers the effect of **flow only when the exit is visible**. The utility function is:

$$
U_i = \beta_1 \cdot \text{DIST}_i + \beta_2 \cdot \text{CONG}_i + \beta_3 \cdot (\text{FLTOEXIT}_i \cdot \text{VIS}_i)
$$

where:
- $ \text{DIST}_i $ : Distance to exit $ i $ (negative effect)
- $ \text{CONG}_i $ : Congestion at exit $ i $ (negative effect)
- $ \text{FLTOEXIT}_i $ : Flow to exit $ i $ (positive or negative depending on $ \beta_3 $
- $ \text{VIS}_i $ : Visibility indicator (1 if visible, 0 otherwise)
- $ \beta_1, \beta_2, \beta_3 $ : Model parameters

In this model, **flow is ignored for non-visible exits**, meaning that agents only react to visible movement patterns.

---

### **Model 2: Visibility-Specific Flow Sensitivity**
This model **differentiates the effect of flow** based on whether the exit is visible or not. The utility function is:

$$
U_i = \beta_1 \cdot \text{DIST}_i + \beta_2 \cdot \text{CONG}_i + \beta_3 \cdot (\text{FLTOEXIT}_i \cdot \text{VIS}_i) + \beta_4 \cdot (\text{FLTOEXIT}_i \cdot (1 - \text{VIS}_i))
$$

where:
- $ \beta_4 $ allows **flow to have a different influence when the exit is not visible**.
- If ** $ \beta_3 = \beta_4 $**, the model is equivalent to **Model 1**.
- This model accounts for the possibility that agents may still infer congestion even when they **cannot directly see** an exit.

---

### **Choosing Between Models**
- **If agents primarily react to visible flow**, Model 1 is sufficient.
- **If agents are influenced by unseen movement (e.g., hearing or indirect cues)**, Model 2 provides a more nuanced approach.
- Model 2 allows testing whether unseen flow has a **weaker, equal, or stronger** influence compared to visible flow.


In [None]:
import jupedsim as jps
from shapely import wkt
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from shapely.geometry import Point, LineString
import numpy as np
from jupedsim.internal.notebook_utils import animate, read_sqlite_file

In [None]:
geometry_file = "files/routing.wkt"
with open(geometry_file, "r") as myfile:
    data = myfile.read().strip()

# Load the main GEOMETRYCOLLECTION
geometry_collection = wkt.loads(data)

# Extract main collections
polygons = list(geometry_collection.geoms)  # Unpack main-level geometries

# Extract walkable area
area = polygons[0]  # if len(polygons) > 0 else None

# Extract exits (second collection)
exit_polygons = []
if len(polygons) > 1:
    exits = polygons[1]  # GEOMETRYCOLLECTION of exits
    exit_polygons = list(exits.geoms)

# Extract distributions (third collection)
distribution_polygons = []
if len(polygons) > 2:
    distributions = polygons[2]  # GEOMETRYCOLLECTION of distributions
    distribution_polygons = list(distributions.geoms)

print(f"Exits: {len(exit_polygons)} polygons")
print(f"Distributions: {len(distribution_polygons)} polygons")

In [None]:
def plot_geometry(geometry_collection, positions=None):
    """Plot the geometry collection with different colors for each geometry type."""
    fig, ax = plt.subplots(figsize=(8, 8))
    if geometry_collection.is_empty:
        print("Skipping empty geometry.")
        return
    polygons = list(geometry_collection.geoms)  # Unpack main-level geometries
    area = polygons[0] if len(polygons) > 0 else None
    # Extract exits (second collection)
    if len(polygons) > 1:
        exits = polygons[1]

    # Extract distributions (third collection)
    if len(polygons) > 2:
        distributions = polygons[2]

    for geom in area.geoms:
        x, y = geom.exterior.xy
        plt.fill(x, y, alpha=0.1, color="gray")
        for hole in geom.interiors:
            hx, hy = hole.xy
            ax.fill(hx, hy, alpha=1, edgecolor="gray", facecolor="white")

    for e in exits.geoms:
        x, y = e.exterior.xy
        plt.fill(x, y, alpha=0.3, color="red")

    for d in distributions.geoms:
        x, y = d.exterior.xy
        plt.fill(x, y, alpha=0.3, color="green")

    for position in positions:
        x, y = position[0], position[1]
        plt.plot(x, y, "o", color="blue")    

    legend_elements = {
        "area": mpatches.Patch(color="gray", label="Walkable Area"),
        "exits": mpatches.Patch(color="red", label="Exits"),
        "distribution": mpatches.Patch(color="green", label="Distribution Zones"),
    }
    handles = [
        legend_elements[key]
        for key in ["area", "exits", "distribution"]
        if key in legend_elements
    ]
    ax.legend(
        handles=handles,
        loc="upper center",
        bbox_to_anchor=(0.5, 1.05),
        ncol=len(handles),
        frameon=False,
    )
    ax.set_xlabel("X [m]")
    ax.set_ylabel("Y [m]")
    plt.show()

In [None]:
num_agents = 100
simulation = jps.Simulation(
    model=jps.CollisionFreeSpeedModel(),
    geometry=area,
    trajectory_writer=jps.SqliteTrajectoryWriter(output_file="trajectories.sqlite"),
)
exit_ids = []
for e in exit_polygons:
    exit_id = simulation.add_exit_stage(e)
    exit_ids.append(exit_id)

journey_ids = {}
for exit_id in exit_ids:
    journey = jps.JourneyDescription([exit_id])
    journey_id = simulation.add_journey(journey)
    journey_ids[exit_id] = journey_id

positions = jps.distributions.distribute_by_number(
    polygon=distribution_polygons[0],
    number_of_agents=num_agents,
    distance_to_agents=0.4,
    distance_to_polygon=0.15,
    seed=1234,
)

In [None]:
plot_geometry(geometry_collection, positions)

In [None]:
N2 = len(positions)
for position in positions[0:N2]:
    simulation.add_agent(
        jps.CollisionFreeSpeedModelAgentParameters(
            position=position,
            stage_id=exit_ids[1],
            journey_id=journey_ids[exit_ids[1]],
            radius=0.15,
        )
    )

for position in positions[N2:]:
    simulation.add_agent(
        jps.CollisionFreeSpeedModelAgentParameters(
            position=position, 
            stage_id=exit_ids[2],
            journey_id=journey_ids[exit_ids[2]], 
            radius = 0.15,
        )
    )

In [None]:
def distance(agent_position: Point, exit_point:Point) -> float:
    """Calculate the distance between an agent and an exit."""
    p1 = agent_position #Point(agent_position.x, agent_position.y)
    return p1.distance(exit_point)

In [None]:
def congestion(simulation, radius):
    """Count how many agents are within a given radius around each exit."""
    congestion = {}
    
    for exit_id, exit_polygon in zip(exit_ids, exit_polygons):
        exit_point = exit_polygon.centroid
        congestion[exit_id] = 0  # Initialize count
        #print(f"\nChecking Exit {exit_id} at {exit_point}")
        for agent in simulation.agents():
            agent_pos = Point(agent.position)  # Convert agent position to Point
            dist = distance(exit_point, agent_pos)
            #print(f"Agent {agent.id} at {agent_pos}, Distance to exit: {dist}, Radius: {radius}")
            if dist <= radius:
                congestion[exit_id] += 1
                #print(f"  -> Agent {agent.id} is within radius!")
        
        #print(f"Total agents near Exit {exit_id}: {congestion[exit_id]}\n")
    
    return congestion


In [None]:
def flow(simulation):
    flow = {}
    for agent in simulation.agents():
        if agent.stage_id in flow:
            flow[agent.stage_id] += 1
        else:
            flow[agent.stage_id] = 1
    return flow

In [None]:
def visibility(point, exit_point, area):
    """Check if there is a clear line-of-sight between a point and the exit.
    Returns True if visible, False if blocked by the area geometry.
    """
    line_of_sight = LineString([point, exit_point])
    polygon_coords = list(area.geoms[0].exterior.coords)
    # Check each segment of the boundary
    for i in range(len(polygon_coords) - 1):
        segment = LineString([polygon_coords[i], polygon_coords[i + 1]])
        if line_of_sight.intersects(segment):
            #print(
            #    f"Line intersects polygon segment from {polygon_coords[i]} to {polygon_coords[i+1]}"
            #)
            #print(f"Intersection: {line_of_sight.intersection(segment)}")
            return False

    # Also check the interior rings (holes) if any
    for interior in area.geoms[0].interiors:
        interior_coords = list(interior.coords)
        for i in range(len(interior_coords) - 1):
            segment = LineString([interior_coords[i], interior_coords[i + 1]])
            if line_of_sight.intersects(segment):
                #print(
                #    f"Line intersects hole segment from {interior_coords[i]} to {interior_coords[i+1]}"
                #)
                #print(f"Intersection: {line_of_sight.intersection(segment)}")
                return False
    return True

In [None]:
def compute_agent_exit_probabilities0(
    agent, area, exit_ids, exit_polygons, beta_dist, beta_cong, beta_flow, radius, simulation
):
    """Compute the probability of a single agent choosing each exit using a logit model."""
    congestion_counts = congestion(simulation, radius=radius)    
    flow_counts = flow(simulation)
    utilities = {}

    for exit_id, exit_polygon in zip(exit_ids, exit_polygons):
        exit_point = exit_polygon.centroid
        dist = distance(Point(agent.position), exit_point)
        cong = congestion_counts.get(exit_id, 0)
        flw = flow_counts.get(exit_id, 0)
        vis = 1 if visibility(Point(agent.position), exit_point, area) else 0

        # Compute utility
        utilities[exit_id] = (
            beta_dist * dist + beta_cong * cong + beta_flow * (flw * vis)
        )

    # Compute probabilities using softmax
    exp_utilities = {eid: np.exp(U) for eid, U in utilities.items()}
    sum_exp_U = sum(exp_utilities.values())
    probabilities = {eid: exp_utilities[eid] / sum_exp_U for eid in utilities.keys()}

    return probabilities

In [None]:
def compute_agent_exit_probabilities(
    agent, area, exit_ids, exit_polygons, beta_dist, beta_cong, beta_flow, beta_flow_invis, radius, simulation, model_type="model1"
):
    """Compute the probability of a single agent choosing each exit using a logit model.
    Supports different model types: 'model1' (single flow term) or 'model2' (separate flow terms for visible/invisible exits)."""
    congestion_counts = congestion(simulation, radius=radius)
    flow_counts = flow(simulation)
    utilities = {}

    for exit_id, exit_polygon in zip(exit_ids, exit_polygons):
        exit_point = exit_polygon.centroid
        dist = distance(Point(agent.position), exit_point)
        cong = congestion_counts.get(exit_id, 0)
        flw = flow_counts.get(exit_id, 0)
        vis = 1 if visibility(Point(agent.position), exit_point, area) else 0

        if model_type == "model1":
            # Standard model: Flow only when visible
            utilities[exit_id] = beta_dist * dist + beta_cong * cong + beta_flow * (flw * vis)
        elif model_type == "model2":
            # Extended model: Different impact of flow when visible vs not visible
            utilities[exit_id] = (
                beta_dist * dist + beta_cong * cong + beta_flow * (flw * vis) + beta_flow_invis * (flw * (1 - vis))
            )
        else:
            raise ValueError("Invalid model_type. Choose 'model1' or 'model2'.")

    # Compute probabilities using softmax
    exp_utilities = {eid: np.exp(U) for eid, U in utilities.items()}
    sum_exp_U = sum(exp_utilities.values())
    probabilities = {eid: exp_utilities[eid] / sum_exp_U for eid in utilities.keys()}

    return probabilities

In [None]:
# Dictionary to track commitment time per agent
commitment_time = {}
def update_exit_choices(simulation, exit_ids, journey_ids, probabilities, min_commit_time=5):
    """Update agent exit choices with inertia (commitment time)."""
    global commitment_time  # Store commitment times

    for agent in simulation.agents():   
        max_prob = max(probabilities.values())

        # Identify all exits with the highest probability
        best_exits = [eid for eid, prob in probabilities.items() if prob == max_prob]

        # If multiple exits have the same probability, skip switching
        if len(best_exits) > 1:
            continue  # Prevents unnecessary switching when probabilities are tied

        new_exit_id = best_exits[0]  # Choose the best exit (single highest)
           
#        new_exit_id = max(probabilities, key=probabilities.get)
        # Ensure the agent stays committed for min_commit_time steps
        if agent.id in commitment_time:
            if commitment_time[agent.id] > 0:
                commitment_time[agent.id] -= 1  # Decrease counter
                continue  # Skip switching
        
        # Skip switch if the agent is already at the best exit
        if new_exit_id == agent.stage_id:
            continue

        # Commit agent to the new exit and reset the commitment timer
        simulation.switch_agent_journey(agent.id, journey_ids[new_exit_id], new_exit_id)
        commitment_time[agent.id] = min_commit_time  # Reset commitment timer

In [None]:
beta_dist = -1.0
beta_cong=-0.
beta_flow=0.8
beta_flow_invis=0.5
radius=2
min_commit_time=1
while simulation.agent_count() > 0 and simulation.iteration_count() < 10000:
    simulation.iterate()
    if simulation.iteration_count() % 1000 == 0:
        print(f"Time: {simulation.elapsed_time()}s",end="\r")
        for agent in simulation.agents():
            probabilities = compute_agent_exit_probabilities(
                agent,
                area,
                exit_ids,
                exit_polygons,
                model_type="model2",
                beta_dist=beta_dist,
                beta_cong=beta_cong,
                beta_flow=beta_flow,
                beta_flow_invis=beta_flow_invis,            
                radius=radius,
                simulation=simulation,
            )
        update_exit_choices(simulation, exit_ids, journey_ids, probabilities, min_commit_time=min_commit_time)

In [None]:
traj, geo = read_sqlite_file("trajectories.sqlite")
note = f"CONGESTION: {beta_cong}, FLOW: {beta_flow}, DISTANCE: {beta_dist}"
animate(traj, geo, title_note=note)