# Tutorial: Ammonia Emergency Route Optimization

This notebook is a production-grade, ready-to-run walkthrough for planning **safe evacuation routes** around an ammonia release using `pyELDQM`.

It covers:
- scenario configuration (source, weather, AEGL thresholds, shelters)
- Gaussian dispersion simulation and AEGL zone extraction
- road network download and risk-based edge classification
- shelter ranking and optimal route selection
- interactive Folium map with safe/unsafe roads and best route displayed inline

## 1) Runtime Notes

This notebook supports two execution modes:
1. **Installed package mode** (`pip install pyeldqm`)
2. **Workspace mode** (running inside the project repository)

The import cell automatically detects and handles both.

In [1]:
from __future__ import annotations
import sys
from math import radians, cos, sin, asin, sqrt
from pathlib import Path
from datetime import datetime
import numpy as np
import folium
from IPython.display import display, Markdown, HTML

IMPORT_MODE = "package"
try:
    from pyeldqm.core.dispersion_models.gaussian_model import calculate_gaussian_dispersion
    from pyeldqm.core.meteorology.stability import get_stability_class
    from pyeldqm.core.chemical_database import ChemicalDatabase
    from pyeldqm.core.utils.features import setup_computational_grid
    from pyeldqm.core.utils.zone_extraction import extract_zones
    from pyeldqm.core.visualization import (
        add_zone_polygons,
        ensure_layer_control,
        fit_map_to_polygons,
    )
    from pyeldqm.core.evacuation import (
        build_road_graph,
        classify_edges_with_risk,
        rank_shelters,
    )
except ModuleNotFoundError:
    IMPORT_MODE = "workspace"
    cwd = Path.cwd().resolve()
    candidates = [cwd] + list(cwd.parents)
    pkg_dir = next((p / "pyeldqm" for p in candidates if (p / "pyeldqm").exists()), None)
    if pkg_dir is None:
        raise RuntimeError(
            "Could not resolve project root. Run from repository root or install pyeldqm via pip."
        )
    project_root = pkg_dir.parent
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))

    from pyeldqm.core.dispersion_models.gaussian_model import calculate_gaussian_dispersion
    from pyeldqm.core.meteorology.stability import get_stability_class
    from pyeldqm.core.chemical_database import ChemicalDatabase
    from pyeldqm.core.utils.features import setup_computational_grid
    from pyeldqm.core.utils.zone_extraction import extract_zones
    from pyeldqm.core.visualization import (
        add_zone_polygons,
        ensure_layer_control,
        fit_map_to_polygons,
    )
    from pyeldqm.core.evacuation import (
        build_road_graph,
        classify_edges_with_risk,
        rank_shelters,
    )

display(Markdown(f"**Import mode:** `{IMPORT_MODE}`"))


**Import mode:** `workspace`

## 2) Scenario Configuration

Update these values to model your own scenario.

In [2]:
# Chemical
CHEMICAL_NAME = "AMMONIA"
MOLECULAR_WEIGHT = 17.03              # g/mol
# Release location
SOURCE_LAT = 31.6911
SOURCE_LON = 74.0822
TIMEZONE_OFFSET_HRS = 5.0
# Release parameters
RELEASE_TYPE = "single"               # single | multi
SOURCE_TERM_MODE = "continuous"       # continuous | instantaneous
RELEASE_RATE = 800.0                  # g/s (continuous)
TANK_HEIGHT = 3.0                     # m
DURATION_MINUTES = 30.0               # min
MASS_RELEASED_KG = 500.0              # kg (instantaneous mode)
TERRAIN_ROUGHNESS = "URBAN"           # URBAN | RURAL
RECEPTOR_HEIGHT_M = 1.5               # m
# Weather
WEATHER_MODE = "manual"
WIND_SPEED = 2.5                      # m/s
WIND_DIRECTION = 90.0                 # degrees
TEMPERATURE_C = 25.0                  # C
HUMIDITY = 65.0                       # %
CLOUD_COVER = 30.0                    # %
# AEGL thresholds (ppm)
AEGL_THRESHOLDS = {
    "AEGL-1": 30.0,
    "AEGL-2": 160.0,
    "AEGL-3": 1100.0,
}
# Computational grid
X_MAX = 12000
Y_MAX = 4000
NX = 500
NY = 500
# Route optimization parameters
ROUTE_RADIUS_M = 4000.0               # road network download radius (m)
ROUTE_BUFFER_M = 150.0                # proximity buffer for unsafe edge classification (m)
SHOW_UNSAFE_ROADS = True              # draw unsafe road segments on map
# Shelter candidates
SHELTERS = [
    {"name": "North-West", "lat": 31.7111, "lon": 74.0622},
    {"name": "North",      "lat": 31.7211, "lon": 74.0822},
    {"name": "East-South", "lat": 31.6711, "lon": 74.1122},
]

assert NX > 10 and NY > 10, "Grid resolution is too low."
assert X_MAX > 0 and Y_MAX > 0, "Grid extents must be positive."
assert WIND_SPEED > 0, "Wind speed must be > 0."
assert len(SHELTERS) > 0, "At least one shelter must be defined."
print("Scenario configured successfully.")
print("All outputs will be shown inline in this notebook.")

Scenario configured successfully.
All outputs will be shown inline in this notebook.


## 3) Helper Functions

Distance utilities for reporting zone reach and evaluating route lengths.

In [14]:
def haversine_km(lat1, lon1, lat2, lon2):
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * asin(sqrt(a)) * 6371.0


def max_distance_from_source_km(polygon, src_lat, src_lon):
    if polygon is None or polygon.is_empty:
        return None
    max_d = 0.0
    try:
        for lon, lat in polygon.exterior.coords:
            d = haversine_km(src_lat, src_lon, lat, lon)
            if d > max_d:
                max_d = d
    except Exception:
        return None
    return max_d if max_d > 0 else None


def path_length_m(G, path_nodes):
    """Accumulate edge lengths (m) along an ordered list of graph nodes."""
    if not path_nodes or len(path_nodes) < 2:
        return 0.0
    total = 0.0
    for u, v in zip(path_nodes[:-1], path_nodes[1:]):
        edge_data = G.get_edge_data(u, v)
        if not edge_data:
            continue
        lengths = [float(attrs.get("length", 0.0)) for _, attrs in edge_data.items()]
        if lengths:
            total += min(lengths)
    return total

## 4) Chemical Properties

Fetch core records from the chemical database for transparency and traceability.

In [15]:
print("=" * 72)
print("CHEMICAL PROPERTIES")
print("=" * 72)
try:
    db = ChemicalDatabase()
    chem_data = next(
        (c for c in db.get_all_chemicals() if c.get("name") == CHEMICAL_NAME),
        None,
    )
    if chem_data:
        fields = [
            ("Name",                  "name"),
            ("Molecular Weight",      "molecular_weight"),
            ("Boiling Point (K)",     "boiling_point_K"),
            ("Density (kg/m3)",       "density_kgm3"),
            ("Vapor Pressure (kPa)",  "vapor_pressure_kPa"),
            ("IDLH (ppm)",            "idlh_ppm"),
            ("LC50 (ppm)",            "lc50_ppm"),
            ("AEGL-1 (60 min)",       "aegl1_60min"),
            ("AEGL-2 (60 min)",       "aegl2_60min"),
            ("AEGL-3 (60 min)",       "aegl3_60min"),
            ("ERPG-1",                "erpg1"),
            ("ERPG-2",                "erpg2"),
            ("ERPG-3",                "erpg3"),
        ]
        for label, key in fields:
            value = chem_data.get(key)
            if value not in (None, "", 0):
                print(f"{label:<26}: {value}")
    else:
        print(f"Name                     : {CHEMICAL_NAME}")
        print(f"Molecular Weight         : {MOLECULAR_WEIGHT} g/mol")
        print("Database record not found; using configured molecular weight.")
except Exception as ex:
    print(f"Name                     : {CHEMICAL_NAME}")
    print(f"Molecular Weight         : {MOLECULAR_WEIGHT} g/mol")
    print(f"Database query failed     : {ex}")

CHEMICAL PROPERTIES
Name                      : AMMONIA
Molecular Weight          : 17.03
AEGL-1 (60 min)           : 30 ppm
AEGL-2 (60 min)           : 160 ppm
AEGL-3 (60 min)           : 1100 ppm
ERPG-1                    : 25
ERPG-2                    : 150
ERPG-3                    : 750


## 5) Run Dispersion and Extract AEGL Zones

In [16]:
simulation_datetime = datetime.now()
weather = {
    "wind_speed":    WIND_SPEED,
    "wind_dir":      WIND_DIRECTION,
    "temperature_K": TEMPERATURE_C + 273.15,
    "humidity":      HUMIDITY / 100.0,
    "cloud_cover":   CLOUD_COVER / 100.0,
    "source":        "manual",
}
stability_class = get_stability_class(
    wind_speed=weather["wind_speed"],
    datetime_obj=simulation_datetime,
    latitude=SOURCE_LAT,
    longitude=SOURCE_LON,
    cloudiness_index=weather.get("cloud_cover", 0) * 10,
    timezone_offset_hrs=TIMEZONE_OFFSET_HRS,
)
X, Y, _, _ = setup_computational_grid(x_max=X_MAX, y_max=Y_MAX, nx=NX, ny=NY)
release_duration_s = DURATION_MINUTES * 60.0
source_q = RELEASE_RATE
dispersion_mode = "continuous" if SOURCE_TERM_MODE == "continuous" else "instantaneous"
concentration, U_local, resolved_stability, resolved_sources = calculate_gaussian_dispersion(
    weather=weather,
    X=X,
    Y=Y,
    source_lat=SOURCE_LAT,
    source_lon=SOURCE_LON,
    molecular_weight=MOLECULAR_WEIGHT,
    default_release_rate=source_q,
    default_height=TANK_HEIGHT,
    z_ref=3.0,
    z_measurement=RECEPTOR_HEIGHT_M,
    t=release_duration_s,
    t_r=release_duration_s,
    mode=dispersion_mode,
    sources=[{
        "lat": SOURCE_LAT, "lon": SOURCE_LON,
        "name": "Release Source",
        "height": TANK_HEIGHT, "rate": source_q,
        "color": "red",
    }],
    latitude=SOURCE_LAT,
    longitude=SOURCE_LON,
    timezone_offset_hrs=TIMEZONE_OFFSET_HRS,
    roughness=TERRAIN_ROUGHNESS,
    datetime_obj=simulation_datetime,
)
threat_zones = extract_zones(
    X, Y, concentration, AEGL_THRESHOLDS,
    SOURCE_LAT, SOURCE_LON, wind_dir=weather["wind_dir"],
)
print(f"Simulation time       : {simulation_datetime}")
print(f"Resolved wind speed   : {U_local:.2f} m/s")
print(f"Stability class       : {resolved_stability}")
print(f"Grid shape            : {concentration.shape}")

Simulation time       : 2026-02-23 21:54:40.069413
Resolved wind speed   : 2.50 m/s
Stability class       : F
Grid shape            : (500, 500)


## 6) Report Zone Distances

Maximum distance is measured from source to polygon boundary.

In [17]:
print("=" * 72)
print("METEOROLOGICAL CONDITIONS")
print("=" * 72)
print(f"Wind Speed            : {weather['wind_speed']:.2f} m/s")
print(f"Wind Direction        : {weather['wind_dir']:.0f} deg")
print(f"Temperature           : {weather['temperature_K'] - 273.15:.1f} C")
print(f"Humidity              : {weather['humidity'] * 100:.0f} %")
print(f"Cloud Cover           : {weather['cloud_cover'] * 100:.0f} %")
print(f"Stability Class       : {resolved_stability}")
print()
print("=" * 72)
print("AEGL THREAT ZONE DISTANCES FROM SOURCE")
print("=" * 72)
for zone_name in ["AEGL-3", "AEGL-2", "AEGL-1"]:
    poly = threat_zones.get(zone_name)
    threshold = AEGL_THRESHOLDS.get(zone_name, "N/A")
    threshold_str = f"{threshold:.0f} ppm" if isinstance(threshold, (int, float)) else str(threshold)
    dist_km = max_distance_from_source_km(poly, SOURCE_LAT, SOURCE_LON) if poly else None
    if dist_km is not None:
        print(
            f"{zone_name:<8} threshold: {threshold_str:<10} max distance: {dist_km:.3f} km ({dist_km * 1000:.0f} m)"
        )
    else:
        print(f"{zone_name:<8} threshold: {threshold_str:<10} max distance: -- (no zone formed)")

METEOROLOGICAL CONDITIONS
Wind Speed            : 2.50 m/s
Wind Direction        : 90 deg
Temperature           : 25.0 C
Humidity              : 65 %
Cloud Cover           : 30 %
Stability Class       : F

AEGL THREAT ZONE DISTANCES FROM SOURCE
AEGL-3   threshold: 1100 ppm   max distance: -- (no zone formed)
AEGL-2   threshold: 160 ppm    max distance: 0.622 km (622 m)
AEGL-1   threshold: 30 ppm     max distance: 1.769 km (1769 m)


## 7) Build Road Network and Classify Edges

Downloads the road graph within `ROUTE_RADIUS_M` of the source using OSMnx, then labels each
edge as **safe** (outside threat zones) or **unsafe** (within `ROUTE_BUFFER_M` of any zone).

In [18]:
print(f"Downloading road graph  : radius = {ROUTE_RADIUS_M:.0f} m")
G = build_road_graph(SOURCE_LAT, SOURCE_LON, ROUTE_RADIUS_M)
print(f"Graph nodes             : {len(G.nodes):,}")
print(f"Graph edges             : {len(G.edges):,}")

print(f"Classifying edges       : buffer = {ROUTE_BUFFER_M:.0f} m")
safe_gdf, unsafe_gdf = classify_edges_with_risk(
    G, threat_zones, proximity_buffer_m=ROUTE_BUFFER_M
)
print(f"Safe road segments      : {len(safe_gdf):,}")
print(f"Unsafe road segments    : {len(unsafe_gdf):,}")

Downloading road graph  : radius = 4000 m
Graph nodes             : 560
Graph edges             : 1,406
Classifying edges       : buffer = 150 m
Safe road segments      : 1,402
Unsafe road segments    : 4


## 8) Rank Shelters and Select Best Route

Ranks each shelter by distance from the source via the road network, then selects the
recommended shelter with the shortest traversable path.

In [19]:
shelter_points = [(s["lat"], s["lon"]) for s in SHELTERS]
ranking = rank_shelters(G, (SOURCE_LAT, SOURCE_LON), shelter_points)
best = next((r for r in ranking if "path" in r), None)

print("=" * 72)
print("SHELTER RANKING")
print("=" * 72)
for i, r in enumerate(ranking):
    dist_m = path_length_m(G, r.get("path", []))
    name = next(
        (s["name"] for s in SHELTERS
         if abs(s["lat"] - r.get("lat", -999)) < 1e-9 and abs(s["lon"] - r.get("lon", -999)) < 1e-9),
        f"Shelter {i + 1}",
    )
    tag = "  <-- RECOMMENDED" if r is best else ""
    if dist_m > 0:
        print(f"  {i + 1}. {name:<14} distance: {dist_m / 1000:.3f} km ({dist_m:.0f} m){tag}")
    else:
        print(f"  {i + 1}. {name:<14} distance: -- (no path found){tag}")

if best:
    best_dist_m = path_length_m(G, best.get("path", []))
    print()
    print(f"Best route distance     : {best_dist_m / 1000:.3f} km ({best_dist_m:.0f} m)")
else:
    print()
    print("No routable shelter found.")

SHELTER RANKING
  1. North-West     distance: 4.038 km (4038 m)  <-- RECOMMENDED
  2. East-South     distance: 4.657 km (4657 m)
  3. North          distance: 5.637 km (5637 m)

Best route distance     : 4.038 km (4038 m)


## 9) Build Route Map (Inline)

In [20]:
m = folium.Map(location=[SOURCE_LAT, SOURCE_LON], zoom_start=13, tiles="OpenStreetMap")

# AEGL hazard zones
add_zone_polygons(
    m,
    {k: v for k, v in threat_zones.items() if k in ["AEGL-1", "AEGL-2", "AEGL-3"]},
    thresholds_context={"AEGL": AEGL_THRESHOLDS},
    name_prefix=None,
)

# Safe road segments (green)
for _, row in safe_gdf.iterrows():
    geom = row.geometry
    if geom is None:
        continue
    folium.PolyLine(
        [(lat, lon) for lon, lat in geom.coords],
        color="#00AA00", weight=3, opacity=0.9,
    ).add_to(m)

# Unsafe road segments (red, faded)
if SHOW_UNSAFE_ROADS:
    for _, row in unsafe_gdf.iterrows():
        geom = row.geometry
        if geom is None:
            continue
        folium.PolyLine(
            [(lat, lon) for lon, lat in geom.coords],
            color="#CC0000", weight=3, opacity=0.3,
        ).add_to(m)

# Best route (blue)
if best and best.get("path"):
    route_coords = [
        (G.nodes[n]["y"], G.nodes[n]["x"])
        for n in best["path"] if n in G.nodes
    ]
    if route_coords:
        folium.PolyLine(route_coords, color="#0066FF", weight=5, opacity=1.0).add_to(m)

# Release source marker
folium.Marker([SOURCE_LAT, SOURCE_LON], tooltip="Release Source").add_to(m)

# Shelter markers
for s in SHELTERS:
    is_best = (
        best is not None
        and abs(s["lat"] - best.get("lat", -999)) < 1e-9
        and abs(s["lon"] - best.get("lon", -999)) < 1e-9
    )
    folium.Marker(
        [s["lat"], s["lon"]],
        tooltip=f"RECOMMENDED: {s['name']}" if is_best else s["name"],
    ).add_to(m)

ensure_layer_control(m)
fit_map_to_polygons(m, threat_zones.values())
print("Route map generated. Displaying inline below.")

Route map generated. Displaying inline below.


## 10) Display Route Map Inline

In [21]:
if 'm' in globals() and m is not None:
    display(HTML(m._repr_html_()))
else:
    print("Map object not found. Run Cell 19 first.")

## 11) Inline Results Summary

Quick run summary for tutorial users.

In [22]:
peak_ppm = float(np.nanmax(concentration))
zone_stats = {}
for zone_name in ["AEGL-3", "AEGL-2", "AEGL-1"]:
    poly = threat_zones.get(zone_name)
    zone_stats[zone_name] = max_distance_from_source_km(poly, SOURCE_LAT, SOURCE_LON) if poly else None

best_name = next(
    (s["name"] for s in SHELTERS
     if best and abs(s["lat"] - best.get("lat", -999)) < 1e-9
     and abs(s["lon"] - best.get("lon", -999)) < 1e-9),
    "none",
)
best_dist_m = path_length_m(G, best.get("path", [])) if best else 0.0

print("Completed successfully.")
print(f"Chemical              : {CHEMICAL_NAME}")
print(f"Peak concentration    : {peak_ppm:.2f} ppm")
print(f"Safe road segments    : {len(safe_gdf):,}")
print(f"Unsafe road segments  : {len(unsafe_gdf):,}")
print(f"Recommended shelter   : {best_name}")
print(f"Best route distance   : {best_dist_m / 1000:.3f} km ({best_dist_m:.0f} m)")
print("Max zone distances:")
for z, d in zone_stats.items():
    if d is None:
        print(f"  {z}: no zone formed")
    else:
        print(f"  {z}: {d:.3f} km ({d * 1000:.0f} m)")

Completed successfully.
Chemical              : AMMONIA
Peak concentration    : 852.65 ppm
Safe road segments    : 1,402
Unsafe road segments  : 4
Recommended shelter   : North-West
Best route distance   : 4.038 km (4038 m)
Max zone distances:
  AEGL-3: no zone formed
  AEGL-2: 0.622 km (622 m)
  AEGL-1: 1.769 km (1769 m)


## Troubleshooting

- If imports fail, run from repository root **or** install package mode: `pip install pyeldqm`.

- If the map does not render inline, rerun Cell 19 then Cell 21.

- If zones are empty, increase release rate/duration, reduce thresholds, or expand the grid extent.

- If road graph download fails, check internet connectivity (OSMnx queries OpenStreetMap).

- If no shelter path is found, increase `ROUTE_RADIUS_M` or move shelter coordinates closer to the populated road network.