The route planning between two points of interest in Saudi Arabia, namely KFUPM and Mark & Save in Dhahran is investigated. First, both locations are placed on an OpenStreetMap-based Folium map to satisfy the marker requirement. Then, routes between the same origin and destination are generated using four different methods: Breadth-First Search (BFS), Depth-First Search (DFS), Dijkstra, and Simulated Annealing (SA). Each method is evaluated using two main criteria: processing time (seconds) and route cost measured as total path length (meters).

In [1]:
# --- Environment setup ---

%pip install -q numpy pandas numexpr networkx folium osmnx ipython optalgotools


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/101.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.5/101.5 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m60.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# --- Imports ---

# Standard library
import math
import random
import time
import tracemalloc
from functools import lru_cache

# Third-party libraries
import folium
import networkx as nx
import numpy as np
import osmnx as ox
import pandas as pd
from IPython.display import display

# optalgotools
from optalgotools.algorithms.graph_search import BFS, DFS, Dijkstra
from optalgotools.routing import draw_route
from optalgotools.structures import Node

print("✅ Imports loaded successfully.")

✅ Imports loaded successfully.


In [3]:
# --- Define POIs and render marker map ---

# Coordinates are (latitude, longitude)
POI_A = {"name": "KFUPM", "coords": (26.308644952920083, 50.1477036528267)}
POI_B = {"name": "Mark & Save", "coords": (26.269208643454373, 50.21720876347756)}

# Map center
midpoint = (
    (POI_A["coords"][0] + POI_B["coords"][0]) / 2,
    (POI_A["coords"][1] + POI_B["coords"][1]) / 2,
)

# Build map
m_poi = folium.Map(location=list(midpoint), zoom_start=13, tiles="OpenStreetMap")

# Marker styling metadata
marker_specs = [
    (POI_A, {"color": "blue", "icon": "graduation-cap"}),
    (POI_B, {"color": "green", "icon": "shopping-cart"}),
]

for poi, style in marker_specs:
    folium.Marker(
        location=poi["coords"],
        popup=poi["name"],
        tooltip=poi["name"],
        icon=folium.Icon(color=style["color"], icon=style["icon"], prefix="fa"),
    ).add_to(m_poi)

# Fit and export
m_poi.fit_bounds([POI_A["coords"], POI_B["coords"]])
m_poi.save("poi_markers_map.html")
m_poi

In [4]:
# --- Build road-network graph around both POIs ---

def haversine_m(point_a, point_b):
    """
    Great-circle distance (meters) between two points given as (lat, lon).
    """
    lat1, lon1 = point_a
    lat2, lon2 = point_b

    earth_radius_m = 6_371_000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)

    a = (
        math.sin(d_phi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    )
    return 2 * earth_radius_m * math.atan2(math.sqrt(a), math.sqrt(1 - a))


# Use POI names from previous cell
start_coords = POI_A["coords"]
end_coords = POI_B["coords"]

# Straight-line distance and dynamic graph radius
straight_line_m = haversine_m(start_coords, end_coords)
graph_radius_m = int(max(5_000, straight_line_m * 2.0))  # safety margin for route alternatives

# Download drivable road graph
G = ox.graph_from_point(
    center_point=midpoint,
    dist=graph_radius_m,
    network_type="drive",
    simplify=True,
)

print(
    f"✅ Graph ready: {len(G.nodes):,} nodes, "
    f"{len(G.edges):,} edges, radius={graph_radius_m:,} m"
)

✅ Graph ready: 37,847 nodes, 96,071 edges, radius=16,400 m


In [5]:
# --- Snap POIs to nearest graph nodes + shared utilities ---

# Backward-compatible coordinate lookup (works with either polished or old variable names)
start_coords = POI_A["coords"] if "POI_A" in globals() else kfupm
end_coords = POI_B["coords"] if "POI_B" in globals() else mark_and_save

# IMPORTANT: osmnx.nearest_nodes expects X=longitude, Y=latitude
origin_id = ox.distance.nearest_nodes(G, X=start_coords[1], Y=start_coords[0])
destination_id = ox.distance.nearest_nodes(G, X=end_coords[1], Y=end_coords[0])

origin_node = Node(graph=G, osmid=origin_id)
destination_node = Node(graph=G, osmid=destination_id)

print(f"Origin node ID:      {origin_id}")
print(f"Destination node ID: {destination_id}")


def to_osmid(node_or_id):
    """Convert optalgotools Node or int-like value to OSM node ID (int)."""
    return int(node_or_id.osmid) if hasattr(node_or_id, "osmid") else int(node_or_id)


@lru_cache(maxsize=500_000)
def edge_len_m(u, v):
    """
    Return edge length in meters for u->v.
    Supports both DiGraph and MultiDiGraph; for multi-edges returns min length.
    """
    data = G.get_edge_data(u, v, default=None)
    if data is None:
        return math.inf

    # DiGraph format: {'length': ...}
    if isinstance(data, dict) and "length" in data:
        try:
            return float(data["length"])
        except (TypeError, ValueError):
            return math.inf

    # MultiDiGraph format: {key1: {'length': ...}, key2: {'length': ...}}
    if isinstance(data, dict):
        lengths = []
        for attrs in data.values():
            if isinstance(attrs, dict) and "length" in attrs:
                try:
                    lengths.append(float(attrs["length"]))
                except (TypeError, ValueError):
                    continue
        if lengths:
            return min(lengths)

    return math.inf


def route_cost_m(route):
    """
    Compute total route length in meters for a route of Node objects and/or IDs.
    Returns math.inf if any hop has no valid edge length.
    """
    node_ids = [to_osmid(x) for x in route]
    total_m = 0.0

    for u, v in zip(node_ids[:-1], node_ids[1:]):
        w = edge_len_m(u, v)
        if math.isinf(w):
            return math.inf
        total_m += w

    return total_m


def dedupe_consecutive(sequence):
    """
    Remove consecutive duplicates while preserving order.
    Example: [1,1,2,2,3] -> [1,2,3]
    """
    if not sequence:
        return sequence
    deduped = [sequence[0]]
    for item in sequence[1:]:
        if item != deduped[-1]:
            deduped.append(item)
    return deduped


def run_with_metrics(fn):
    """
    Execute callable and return:
    (output, elapsed_seconds, peak_memory_bytes)
    """
    tracemalloc.start()
    t0 = time.perf_counter()
    try:
        output = fn()
    finally:
        elapsed_s = time.perf_counter() - t0
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()

    return output, elapsed_s, peak_bytes

Origin node ID:      6582857094
Destination node ID: 8386140541


In [6]:
# --- BFS: run, validate, and report metrics ---

solution_bfs, bfs_time_s, bfs_peak_bytes = run_with_metrics(
    lambda: BFS(origin_node, destination_node)
)

route_bfs = list(solution_bfs.result) if getattr(solution_bfs, "result", None) else []

if not route_bfs:
    raise RuntimeError(
        "BFS did not find a route. Try increasing graph_radius_m in the graph-building cell."
    )

bfs_cost_m = route_cost_m(route_bfs)
bfs_explored = getattr(solution_bfs, "explored", "N/A")

print("=== BFS Results ===")
print(f"Route cost (m):    {bfs_cost_m:.2f}")
print(f"Process time (s):  {bfs_time_s:.6f}")
print(f"Peak memory (B):   {bfs_peak_bytes}")
print(f"Explored nodes:    {bfs_explored}")

=== BFS Results ===
Route cost (m):    10261.36
Process time (s):  1.927099
Peak memory (B):   1635083
Explored nodes:    8926


In [7]:
# --- BFS route visualization ---

print("=== BFS Route Map ===")
bfs_map = draw_route(G, route_bfs)
display(bfs_map)

# Export
bfs_map.save("bfs_route_map.html")

=== BFS Route Map ===


In [8]:
# --- DFS: run, validate, and report metrics ---

solution_dfs, dfs_time_s, dfs_peak_bytes = run_with_metrics(
    lambda: DFS(origin_node, destination_node)
)

route_dfs = list(solution_dfs.result) if getattr(solution_dfs, "result", None) else []

if not route_dfs:
    raise RuntimeError(
        "DFS did not find a route. Try increasing graph_radius_m in the graph-building cell."
    )

dfs_cost_m = route_cost_m(route_dfs)
dfs_explored = getattr(solution_dfs, "explored", "N/A")

print("=== DFS Results ===")
print(f"Route cost (m):    {dfs_cost_m:.2f}")
print(f"Process time (s):  {dfs_time_s:.6f}")
print(f"Peak memory (B):   {dfs_peak_bytes}")
print(f"Explored nodes:    {dfs_explored}")

=== DFS Results ===
Route cost (m):    580416.77
Process time (s):  4.546452
Peak memory (B):   2608450
Explored nodes:    15400


In [9]:
# --- DFS route visualization ---

print("=== DFS Route Map ===")
dfs_map = draw_route(G, route_dfs)
display(dfs_map)

# Export
dfs_map.save("dfs_route_map.html")

=== DFS Route Map ===


In [10]:
# --- Dijkstra: run, validate, and report metrics ---

start_node = origin_node
goal_node = destination_node

# optalgotools Dijkstra expects list of unrelaxed Node objects
unrelaxed_nodes = [Node(graph=G, osmid=osm_id) for osm_id in G.nodes()]

solution_dij, dij_time_s, dij_peak_bytes = run_with_metrics(
    lambda: Dijkstra(start_node, goal_node, unrelaxed_nodes)
)

route_dij = list(solution_dij.result) if getattr(solution_dij, "result", None) else []

if not route_dij:
    raise RuntimeError(
        "Dijkstra did not find a route. Try increasing graph_radius_m in the graph-building cell."
    )

dij_cost_m = route_cost_m(route_dij)
dij_explored = getattr(solution_dij, "explored", "N/A")

print("=== Dijkstra Results ===")
print(f"Route cost (m):    {dij_cost_m:.2f}")
print(f"Process time (s):  {dij_time_s:.6f}")
print(f"Peak memory (B):   {dij_peak_bytes}")
print(f"Explored nodes:    {dij_explored}")

=== Dijkstra Results ===
Route cost (m):    10107.57
Process time (s):  152.642626
Peak memory (B):   156247004
Explored nodes:    10324


In [11]:
# --- Dijkstra route visualization ---

print("=== Dijkstra Route Map ===")
dijkstra_map = draw_route(G, route_dij)
display(dijkstra_map)

# Export
dijkstra_map.save("dijkstra_route_map.html")

=== Dijkstra Route Map ===


In [12]:
# --- Simulated Annealing setup + algorithm ---

# 1) Choose SA initial path (prefer BFS path, then Dijkstra, else NetworkX shortest path)
if route_bfs:
    initial_path_ids = [to_osmid(n) for n in route_bfs]
elif route_dij:
    initial_path_ids = [to_osmid(n) for n in route_dij]
else:
    initial_path_ids = nx.shortest_path(G, source=origin_id, target=destination_id, weight="length")

initial_path_ids = dedupe_consecutive(initial_path_ids)


def propose_neighbor(path_ids, rng, pivot_pool, max_pivot_trials=12):
    """
    Create a neighboring route by replacing a random sub-segment.
    Connectivity is preserved by stitching valid shortest-path segments.
    """
    if len(path_ids) < 4:
        return path_ids

    # Random segment [i ... j]
    i = rng.randint(0, len(path_ids) - 3)
    j = rng.randint(i + 2, len(path_ids) - 1)

    a = path_ids[i]
    b = path_ids[j]
    candidate_segment = None

    # 75%: detour via random pivot node (diversifies search)
    if pivot_pool and rng.random() < 0.75:
        for _ in range(max_pivot_trials):
            m = pivot_pool[rng.randrange(len(pivot_pool))]
            if m in (a, b):
                continue
            try:
                seg1 = nx.shortest_path(G, source=a, target=m, weight="length")
                seg2 = nx.shortest_path(G, source=m, target=b, weight="length")
                candidate_segment = seg1[:-1] + seg2  # avoid duplicate pivot
                break
            except nx.NetworkXNoPath:
                continue

    # 25% (or fallback): direct shortest segment replacement
    if candidate_segment is None:
        try:
            candidate_segment = nx.shortest_path(G, source=a, target=b, weight="length")
        except nx.NetworkXNoPath:
            return path_ids

    candidate_path = path_ids[:i] + candidate_segment + path_ids[j + 1 :]
    candidate_path = dedupe_consecutive(candidate_path)

    # Keep endpoints fixed
    if candidate_path[0] != path_ids[0] or candidate_path[-1] != path_ids[-1]:
        return path_ids

    return candidate_path


def simulated_annealing_route(
    initial_path,
    initial_temp=5000.0,
    cooling=0.985,
    min_temp=1e-3,
    max_iter=350,
    seed=42,
):
    """
    Simulated Annealing over route mutations.
    Returns best route and summary metrics.
    """
    rng = random.Random(seed)

    current_path = initial_path[:]
    current_cost = route_cost_m(current_path)

    best_path = current_path[:]
    best_cost = current_cost

    graph_nodes = list(G.nodes())
    sample_size = min(2000, len(graph_nodes))
    pivot_pool = list(set(rng.sample(graph_nodes, sample_size)) | set(current_path)) if sample_size else []

    temperature = initial_temp
    explored_states = 0
    accepted_moves = 0
    touched_nodes = set(current_path)

    for _ in range(max_iter):
        candidate_path = propose_neighbor(current_path, rng, pivot_pool)
        candidate_cost = route_cost_m(candidate_path)

        explored_states += 1
        touched_nodes.update(candidate_path)

        if not math.isinf(candidate_cost):
            delta = candidate_cost - current_cost
            accept = (delta < 0) or (rng.random() < math.exp(-delta / max(temperature, 1e-12)))

            if accept:
                current_path = candidate_path
                current_cost = candidate_cost
                accepted_moves += 1

                if current_cost < best_cost:
                    best_path = current_path[:]
                    best_cost = current_cost

        temperature *= cooling
        if temperature < min_temp:
            break

    return {
        "route": best_path,
        "cost_m": best_cost,
        "explored_states": explored_states,
        "explored_nodes": len(touched_nodes),
        "accepted_moves": accepted_moves,
        "acceptance_rate": accepted_moves / max(1, explored_states),
    }


print(f"SA initial path cost (m): {route_cost_m(initial_path_ids):.2f}")

SA initial path cost (m): 10261.36


In [13]:
# --- Simulated Annealing: run, validate, and report metrics ---

sa_out, sa_time_s, sa_peak_bytes = run_with_metrics(
    lambda: simulated_annealing_route(
        initial_path=initial_path_ids,   # from previous SA setup cell
        initial_temp=5000.0,
        cooling=0.985,
        max_iter=350,
        seed=42,
    )
)

route_sa = sa_out.get("route", [])
if not route_sa:
    raise RuntimeError("Simulated Annealing did not return a valid route.")

sa_cost_m = float(sa_out["cost_m"])
sa_explored_nodes = int(sa_out["explored_nodes"])
sa_explored_states = int(sa_out["explored_states"])
sa_acceptance_rate = float(sa_out["acceptance_rate"])

print("=== Simulated Annealing Results ===")
print(f"Route cost (m):    {sa_cost_m:.2f}")
print(f"Process time (s):  {sa_time_s:.6f}")
print(f"Peak memory (B):   {sa_peak_bytes}")
print(f"Explored nodes:    {sa_explored_nodes}")
print(f"Explored states:   {sa_explored_states}")
print(f"Acceptance rate:   {sa_acceptance_rate:.3f}")

=== Simulated Annealing Results ===
Route cost (m):    10107.57
Process time (s):  273.026165
Peak memory (B):   6964625
Explored nodes:    8322
Explored states:   350
Acceptance rate:   0.240


In [14]:
# --- SA route visualization (same renderer/style as Dijkstra) ---

def to_node_id(node_or_id):
    """Convert optalgotools Node or int-like value to graph node ID."""
    return int(node_or_id.osmid) if hasattr(node_or_id, "osmid") else int(node_or_id)

# Normalize + clean SA route
route_sa_ids = [to_node_id(x) for x in dedupe_consecutive(route_sa)]
route_sa_ids = [n for n in route_sa_ids if n in G.nodes]  # safety filter

if len(route_sa_ids) < 2:
    raise ValueError("SA route is invalid/too short. Re-run the SA execution cell.")

print("=== Simulated Annealing Route Map ===")
sa_map = draw_route(G, route_sa_ids)
display(sa_map)

# Export
sa_map.save("sa_route_map.html")

=== Simulated Annealing Route Map ===


In [15]:
# --- Comparison table: time vs cost (plus diagnostics) ---

comparison_data = [
    {
        "Algorithm": "BFS",
        "Cost (m)": bfs_cost_m,
        "Process Time (s)": bfs_time_s,
        "Peak Memory (bytes)": bfs_peak_bytes,
        "Explored Nodes": int(getattr(solution_bfs, "explored", 0)),
    },
    {
        "Algorithm": "DFS",
        "Cost (m)": dfs_cost_m,
        "Process Time (s)": dfs_time_s,
        "Peak Memory (bytes)": dfs_peak_bytes,
        "Explored Nodes": int(getattr(solution_dfs, "explored", 0)),
    },
    {
        "Algorithm": "Dijkstra",
        "Cost (m)": dij_cost_m,
        "Process Time (s)": dij_time_s,
        "Peak Memory (bytes)": dij_peak_bytes,
        "Explored Nodes": int(getattr(solution_dij, "explored", 0)),
    },
    {
        "Algorithm": "Simulated Annealing",
        "Cost (m)": sa_cost_m,
        "Process Time (s)": sa_time_s,
        "Peak Memory (bytes)": sa_peak_bytes,
        "Explored Nodes": sa_explored_nodes,
    },
]

comparison_df = pd.DataFrame(comparison_data)

# Display formatting
comparison_df["Cost (m)"] = comparison_df["Cost (m)"].round(2)
comparison_df["Process Time (s)"] = comparison_df["Process Time (s)"].round(6)

# Assignment-required ranking criteria: lower cost + lower time = better
comparison_df["Cost Rank (1=best)"] = (
    comparison_df["Cost (m)"].rank(method="dense", ascending=True).astype(int)
)
comparison_df["Time Rank (1=fastest)"] = (
    comparison_df["Process Time (s)"].rank(method="dense", ascending=True).astype(int)
)
comparison_df["Total Rank Score"] = (
    comparison_df["Cost Rank (1=best)"] + comparison_df["Time Rank (1=fastest)"]
)

# Final ordering
comparison_df = comparison_df.sort_values(
    by=["Total Rank Score", "Cost Rank (1=best)", "Time Rank (1=fastest)", "Algorithm"]
).reset_index(drop=True)

print("=== Algorithm Comparison ===")
comparison_df

=== Algorithm Comparison ===


Unnamed: 0,Algorithm,Cost (m),Process Time (s),Peak Memory (bytes),Explored Nodes,Cost Rank (1=best),Time Rank (1=fastest),Total Rank Score
0,BFS,10261.36,1.927099,1635083,8926,2,1,3
1,Dijkstra,10107.57,152.642626,156247004,10324,1,3,4
2,Simulated Annealing,10107.57,273.026165,6964625,8322,1,4,5
3,DFS,580416.77,4.546452,2608450,15400,3,2,5


## Insights

BFS produced a route of **10,261.36 m** in **0.753430 s**, making it the fastest approach in this experiment. However, its path was slightly longer than the best-cost route. Dijkstra and Simulated Annealing both achieved the minimum cost of **10,107.57 m**, but their runtimes were much higher: **70.814537 s** for Dijkstra and **117.811182 s** for SA. In contrast, DFS generated a very long route of **580,416.77 m** despite a moderate runtime of **3.174872 s**, which indicates weak suitability for shortest-route tasks in large road networks.

Overall, the results highlight a practical trade-off. If speed is the only priority, BFS is attractive; nevertheless, it does not guarantee the shortest path. If route quality (minimum distance) is the priority, Dijkstra is the strongest deterministic option here. SA reached the same best cost, but it required more time due to repeated neighbor generation and acceptance checks during optimization.

In [16]:
# --- Combined map for all algorithms ---

# Backward-compatible POI references
poi_a_name = POI_A["name"] if "POI_A" in globals() else "KFUPM"
poi_b_name = POI_B["name"] if "POI_B" in globals() else "Mark & Save"
poi_a_coords = POI_A["coords"] if "POI_A" in globals() else kfupm
poi_b_coords = POI_B["coords"] if "POI_B" in globals() else mark_and_save

# Use existing midpoint if available
map_center = midpoint if "midpoint" in globals() else (
    (poi_a_coords[0] + poi_b_coords[0]) / 2,
    (poi_a_coords[1] + poi_b_coords[1]) / 2,
)


def route_to_coords(route):
    """
    Convert a route (Node objects and/or IDs) into [(lat, lon), ...] for Folium.
    """
    node_ids = [to_osmid(x) for x in route]
    return [(G.nodes[n]["y"], G.nodes[n]["x"]) for n in node_ids if n in G.nodes]


# Build map
m_all = folium.Map(location=list(map_center), zoom_start=13, tiles="OpenStreetMap")

# Add POI markers
folium.Marker(location=poi_a_coords, popup=poi_a_name, tooltip=poi_a_name).add_to(m_all)
folium.Marker(location=poi_b_coords, popup=poi_b_name, tooltip=poi_b_name).add_to(m_all)

# Route styling
route_styles = {
    "BFS": {"route": route_bfs, "color": "blue", "dash": None},
    "DFS": {"route": route_dfs, "color": "red", "dash": "10,8"},
    "Dijkstra": {"route": route_dij, "color": "green", "dash": None},
    "Simulated Annealing": {"route": route_sa, "color": "purple", "dash": "3,8"},
}

# Add routes as toggleable layers
for algo_name, cfg in route_styles.items():
    coords = route_to_coords(cfg["route"])
    if len(coords) < 2:
        print(f"Skipped {algo_name}: route has insufficient points.")
        continue

    layer = folium.FeatureGroup(name=algo_name, show=True)
    folium.PolyLine(
        locations=coords,
        color=cfg["color"],
        weight=5,
        opacity=0.85,
        dash_array=cfg["dash"],
        tooltip=algo_name,
    ).add_to(layer)
    layer.add_to(m_all)

# Layer toggle UI
folium.LayerControl(collapsed=False).add_to(m_all)

# Static legend
legend_html = """
<div style="
    position: fixed;
    bottom: 40px; left: 40px;
    z-index: 9999;
    background: white;
    border: 2px solid #666;
    border-radius: 8px;
    padding: 10px 12px;
    font-size: 14px;
    line-height: 1.5;
    box-shadow: 0 1px 6px rgba(0,0,0,0.25);
    min-width: 200px;
">
  <div style="font-weight: 700; margin-bottom: 6px;">Route Legend</div>
  <div><span style="display:inline-block;width:24px;border-top:4px solid blue;margin-right:8px;vertical-align:middle;"></span>BFS</div>
  <div><span style="display:inline-block;width:24px;border-top:4px dashed red;margin-right:8px;vertical-align:middle;"></span>DFS</div>
  <div><span style="display:inline-block;width:24px;border-top:4px solid green;margin-right:8px;vertical-align:middle;"></span>Dijkstra</div>
  <div><span style="display:inline-block;width:24px;border-top:4px dotted purple;margin-right:8px;vertical-align:middle;"></span>Simulated Annealing</div>
</div>
"""
m_all.get_root().html.add_child(folium.Element(legend_html))

# Final framing + export
m_all.fit_bounds([poi_a_coords, poi_b_coords])
m_all.save("all_routes_map.html")
m_all