# Alternative Path Testing & Visualization

This notebook tests the new uni_lca-based alternative path algorithm using random edge pairs.

In [1]:
import sys
import os
import random
sys.path.append(os.path.abspath(".."))

from pyengine import Router
from pyengine.algorithms import query_uni_lca, query_alternative
from pyengine.core import expand_path
import folium

## Initialize Router

In [2]:
# Database path is relative, so it resolves against the kernel's working
# directory — NOTE(review): presumably the notebook's own folder; confirm.
db_path = "../../../data/Burnaby.db"
router = Router(db_path)
print("Router initialized and data loaded.")
# len() of fwd_adj counts its keys; the next cell uses those keys as edge IDs.
print(f"Total edges in fwd_adj: {len(router.data.fwd_adj)}")

Router initialized and data loaded.
Total edges in fwd_adj: 35217


## Select Random Source and Target Edges

In [3]:
# Get all edge IDs from the adjacency list
all_edges = list(router.data.fwd_adj.keys())
print(f"Total edges available: {len(all_edges)}")

# Seed the global RNG. The source/target draw itself happens in a later cell,
# so the same pair is only reproduced when the cells are run in order right
# after this one. Change or remove the seed to get different pairs.
random.seed(92)


Total edges available: 35217


In [28]:
# Peek at the first fwd_adj entry stored for edge 16115.
# NOTE(review): the ID is hard-coded (it equals the source edge printed further
# down) and `.get(...)[0]` raises TypeError if the key is absent — looks like a
# leftover debugging cell; confirm whether it should stay.
router.data.fwd_adj.get(16115)[0]

Shortcut(from_edge=16115, to_edge=4075, cost=29.020748138427734, via_edge=4060, cell=613208529142022143, inside=0, lca_res=8)

In [14]:
# Draw the source first, then keep drawing a target until it differs from it.
source_edge = random.choice(all_edges)
while True:
    target_edge = random.choice(all_edges)
    if target_edge != source_edge:
        break

print(f"Source edge: {source_edge}")
print(f"Target edge: {target_edge}")

Source edge: 16115
Target edge: 31960


## Test Shortest Path + Alternative Path (Direct API)

In [15]:
# Run shortest path using uni_lca directly
sp_cost, sp_path, sp_success = query_uni_lca(source_edge, target_edge, router.data)
print(f"Shortest Path: cost={sp_cost:.2f}, edges={len(sp_path)}, success={sp_success}")

if sp_success:
    sp_expanded = expand_path(sp_path, router.data.via_lookup)
    print(f"Expanded path: {len(sp_expanded)} base edges")

Shortest Path: cost=109.48, edges=6, success=True
Expanded path: 50 base edges


In [16]:
# Run alternative path with different penalty factors
if sp_success:
    print("\nTesting different penalty factors:")
    print("="*60)
    
    for pf in [1.5, 2.0, 3.0, 5.0, 10.0, 50.0, 100.0]:
        alt_cost, alt_path, alt_success = query_alternative(
            source_edge, target_edge, router.data,
            penalty_factor=pf,
            shortest_path_expanded=sp_expanded
        )
        
        if alt_success:
            alt_expanded = expand_path(alt_path, router.data.via_lookup)
            overlap = len(set(sp_expanded) & set(alt_expanded))
            overlap_pct = 100 * overlap / len(sp_expanded) if sp_expanded else 0
            cost_pct = 100 * (alt_cost / sp_cost - 1)
            print(f"penalty={pf:5.1f}: overlap={overlap_pct:5.1f}%, cost_diff={cost_pct:+6.1f}%, alt_edges={len(alt_expanded)}")
        else:
            print(f"penalty={pf:5.1f}: FAILED")


Testing different penalty factors:
penalty=  1.5: overlap= 46.0%, cost_diff=  +6.7%, alt_edges=69
penalty=  2.0: overlap= 46.0%, cost_diff=  +6.7%, alt_edges=69
penalty=  3.0: overlap= 46.0%, cost_diff=  +6.7%, alt_edges=69
penalty=  5.0: overlap= 46.0%, cost_diff=  +6.7%, alt_edges=69
penalty= 10.0: overlap= 46.0%, cost_diff=  +6.7%, alt_edges=69
penalty= 50.0: overlap= 46.0%, cost_diff=  +6.7%, alt_edges=69
penalty=100.0: overlap= 46.0%, cost_diff=  +6.7%, alt_edges=69


In [17]:
# Shortest path exactly as returned by query_uni_lca (before expand_path).
sp_path

[16115, 3990, 18067, 24819, 18077, 31960]

In [18]:
# The same shortest path after expand_path, i.e. the base-edge sequence.
sp_expanded

[16115,
 16113,
 4047,
 4048,
 16111,
 4055,
 15965,
 4062,
 15959,
 16716,
 4071,
 15953,
 4076,
 15950,
 3997,
 3990,
 3998,
 4001,
 4004,
 4007,
 4010,
 4013,
 4016,
 18386,
 22028,
 22080,
 33693,
 21104,
 2071,
 21107,
 16009,
 22014,
 24827,
 29895,
 7321,
 33799,
 29927,
 29923,
 33803,
 24816,
 18067,
 24819,
 24822,
 24826,
 18077,
 18829,
 31951,
 31953,
 31955,
 31960]

In [19]:
# NOTE: alt_path is whatever the most recent query_alternative call left
# behind; on a top-to-bottom run that is the last iteration of the
# penalty-factor loop (penalty_factor=100.0).
alt_path

[16115, 24197, 22107, 18036, 31962, 31960]

In [20]:
# Expand the leftover alt_path (from the most recent query_alternative call)
# through via_lookup and show the resulting base-edge sequence.
alt_expanded = expand_path(alt_path, router.data.via_lookup)
alt_expanded

[16115,
 16112,
 4039,
 4041,
 19310,
 3988,
 16120,
 31106,
 24197,
 3971,
 17807,
 31104,
 14488,
 8719,
 3406,
 14487,
 14484,
 26749,
 26745,
 23235,
 26744,
 26741,
 22104,
 3474,
 16189,
 22101,
 2120,
 22097,
 3508,
 19376,
 22095,
 22093,
 22042,
 7250,
 22110,
 22107,
 2092,
 2089,
 2086,
 2083,
 2081,
 2076,
 2074,
 2071,
 21107,
 16009,
 22014,
 24827,
 29895,
 7321,
 33799,
 29927,
 29923,
 33803,
 24816,
 18067,
 24819,
 24824,
 18036,
 24822,
 24826,
 18077,
 18829,
 31951,
 31953,
 31955,
 31959,
 31962,
 31960]

## Visualize Paths on Map

In [21]:
def get_edge_coords(edge_id, router):
    """Get list of (lat, lng) coordinates for an edge.

    Reads the ``geometry`` column of the ``edges`` table through
    ``router.con`` and parses it as a WKT ``LINESTRING``.

    Args:
        edge_id: ID of the edge, matched against ``edges.id``.
        router: Object exposing a database connection as ``router.con``.

    Returns:
        List of ``(lat, lng)`` float tuples in geometry order. The WKT text
        is read as ``lng lat`` pairs, so each pair is swapped. Returns an
        empty list when the edge is missing, has no geometry, or the geometry
        has no coordinate list (e.g. ``LINESTRING EMPTY``).

    Raises:
        ValueError: If a coordinate token cannot be parsed as a float.
    """
    # Bind the ID as a query parameter instead of formatting it into the SQL.
    row = router.con.execute(
        "SELECT geometry FROM edges WHERE id = ?", [edge_id]
    ).fetchone()
    if not row or not row[0]:
        return []

    wkt = row[0]
    # Locate the coordinate list by its parentheses so that both
    # "LINESTRING (...)" and "LINESTRING(...)" are accepted.
    start = wkt.find('(')
    end = wkt.rfind(')')
    if start == -1 or end <= start:
        return []

    coords = []
    # Split on bare commas and arbitrary whitespace; ", " is not guaranteed.
    for pair in wkt[start + 1:end].split(','):
        parts = pair.split()
        if len(parts) >= 2:
            lng, lat = float(parts[0]), float(parts[1])
            coords.append((lat, lng))
    return coords

def get_path_coords(path, router):
    """Concatenate the coordinates of every edge along a path.

    Args:
        path: Iterable of edge IDs, in travel order.
        router: Passed through to ``get_edge_coords`` for each edge.

    Returns:
        One flat list holding each edge's coordinates back to back, in path
        order. Edges for which ``get_edge_coords`` returns nothing contribute
        no points.
    """
    return [
        point
        for edge_id in path
        for point in get_edge_coords(edge_id, router)
    ]

def get_edge_center(edge_id, router):
    """Return the middle vertex of an edge's geometry.

    This is the coordinate at index ``len(coords) // 2`` of the list returned
    by ``get_edge_coords``, not an interpolated geometric midpoint.

    Returns:
        The selected coordinate, or ``None`` when the edge has no coordinates.
    """
    points = get_edge_coords(edge_id, router)
    if not points:
        return None
    return points[len(points) // 2]

In [22]:
# Best alternative with high penalty
best_pf = 10.0
best_pf = 10.0

if sp_success:
    # Query the alternative only once a shortest path exists: sp_expanded is
    # assigned solely on the success branch of the shortest-path cell, so
    # calling query_alternative before this check would fail (or silently use
    # a stale value) instead of reaching the "Shortest path failed" message.
    alt_cost, alt_path, alt_success = query_alternative(
        source_edge, target_edge, router.data,
        penalty_factor=best_pf,
        shortest_path_expanded=sp_expanded
    )

    # Get center for map
    src_center = get_edge_center(source_edge, router)
    tgt_center = get_edge_center(target_edge, router)

    if src_center and tgt_center:
        # Centre the map halfway between the two endpoint edges.
        center_lat = (src_center[0] + tgt_center[0]) / 2
        center_lng = (src_center[1] + tgt_center[1]) / 2
        m = folium.Map(location=[center_lat, center_lng], zoom_start=14)

        # Shortest path (blue)
        sp_coords = get_path_coords(sp_expanded, router)
        if sp_coords:
            folium.PolyLine(
                sp_coords,
                color='blue',
                weight=5,
                opacity=0.8,
                popup=f"Shortest: {sp_cost:.1f}m"
            ).add_to(m)

        # Alternative path (red, dashed) — drawn only when one was found.
        if alt_success:
            alt_expanded = expand_path(alt_path, router.data.via_lookup)
            alt_coords = get_path_coords(alt_expanded, router)
            if alt_coords:
                folium.PolyLine(
                    alt_coords,
                    color='red',
                    weight=4,
                    opacity=0.7,
                    dash_array='10',
                    popup=f"Alternative: {alt_cost:.1f}m"
                ).add_to(m)

        # Markers
        folium.Marker(src_center, popup=f'Start: edge {source_edge}', icon=folium.Icon(color='green')).add_to(m)
        folium.Marker(tgt_center, popup=f'End: edge {target_edge}', icon=folium.Icon(color='red')).add_to(m)

        display(m)

    else:
        print("Could not get edge coordinates for visualization")
else:
    print("Shortest path failed")

## Test Multiple Random Pairs

In [13]:
# Test 10 random pairs
# Tunables for this batch test; the header line below is built from them so
# the printed description cannot drift from what is actually run.
N_PAIRS = 10
PAIR_PENALTY_FACTOR = 5.0
PAIR_SEED = 123

print(f"Testing {N_PAIRS} random source/target pairs with penalty_factor={PAIR_PENALTY_FACTOR}:")
print("="*70)

# A private generator yields the same draws as random.seed(PAIR_SEED) followed
# by random.choice, without resetting the notebook-wide RNG state.
pair_rng = random.Random(PAIR_SEED)
successes = 0
total_overlap_pct = 0

for i in range(N_PAIRS):
    src = pair_rng.choice(all_edges)
    tgt = pair_rng.choice(all_edges)
    # Make sure the two edges differ.
    while tgt == src:
        tgt = pair_rng.choice(all_edges)

    # Loop-specific names: sp_cost / sp_path / alt_cost / alt_path belong to
    # the single-pair analysis in the earlier cells (the map popups read
    # them), so they must not be overwritten here.
    pair_sp_cost, pair_sp_path, sp_ok = query_uni_lca(src, tgt, router.data)
    if sp_ok:
        sp_exp = expand_path(pair_sp_path, router.data.via_lookup)
        pair_alt_cost, pair_alt_path, alt_ok = query_alternative(
            src, tgt, router.data,
            penalty_factor=PAIR_PENALTY_FACTOR,
            shortest_path_expanded=sp_exp
        )

        if alt_ok:
            alt_exp = expand_path(pair_alt_path, router.data.via_lookup)
            # Distinct base edges shared by both paths, relative to the
            # shortest path's length.
            overlap = len(set(sp_exp) & set(alt_exp))
            overlap_pct = 100 * overlap / len(sp_exp) if sp_exp else 0
            cost_pct = 100 * (pair_alt_cost / pair_sp_cost - 1)
            print(f"{i+1:2}. src={src:6}, tgt={tgt:6}: sp_edges={len(sp_exp):3}, overlap={overlap_pct:5.1f}%, cost_diff={cost_pct:+5.1f}%")
            successes += 1
            total_overlap_pct += overlap_pct
        else:
            print(f"{i+1:2}. src={src:6}, tgt={tgt:6}: Alt FAILED")
    else:
        print(f"{i+1:2}. src={src:6}, tgt={tgt:6}: SP FAILED")

# The average covers only pairs where both queries succeeded.
if successes > 0:
    print(f"\nAverage overlap: {total_overlap_pct/successes:.1f}%")

Testing 10 random source/target pairs with penalty_factor=5.0:
 1. src=  6430, tgt=  2394: sp_edges= 53, overlap= 18.9%, cost_diff=+10.0%
 2. src= 16114, tgt=  5589: sp_edges=109, overlap= 58.7%, cost_diff=+10.0%
 3. src= 10466, tgt= 26523: sp_edges=167, overlap= 10.2%, cost_diff= +9.0%
 4. src= 18485, tgt= 18815: sp_edges= 50, overlap= 54.0%, cost_diff=+60.2%
 5. src= 33977, tgt= 33935: sp_edges= 53, overlap=  5.7%, cost_diff=+21.0%
 6. src= 10283, tgt= 14740: sp_edges= 50, overlap= 90.0%, cost_diff=+11.0%
 7. src=  8355, tgt= 16033: sp_edges= 48, overlap= 20.8%, cost_diff=+33.5%
 8. src=  1573, tgt= 16877: sp_edges= 50, overlap= 16.0%, cost_diff=+14.3%
 9. src=  9897, tgt= 33934: sp_edges= 88, overlap= 20.5%, cost_diff=+26.8%
10. src=  6963, tgt= 10387: sp_edges= 55, overlap= 30.9%, cost_diff= +2.0%

Average overlap: 32.6%
