In [1]:
import json
import os

import heapq
import networkx as nx
import osmnx as ox
import numpy as np

MAX_DEPTH = 100                          # Maximum depth for search (BFS)
MIN_STARTING_NODE_GRADE = 0.04           # Minimum gradient for starting node edges (e.g., 4%)
MAX_POSSIBLE_EDGE_GRADIENT = 0.15        # Maximum possible edge gradient for pruning (e.g., 15%)
EARLY_TERMINATION_GRADIENT_FACTOR = 0.66 # For pruning if cumulative average gradient falls below certain level
MAX_CANDIDATE_ROUTES = 75000             # Maximum number of candidate routes to find
FLAT_SECTION_MAX_LENGTH_FACTOR = 0.10    # Maximum allowed length of flat sections as a proportion of min_length
FLAT_SECTION_GRADIENT_THRESHOLD = 0.00   # Gradient below which a section is considered "flat"

def find_candidate_routes(
    graph,
    *,
    min_gradient,
    min_length,
    max_length,
    max_depth,
    early_termination_gradient_factor,
    min_starting_node_grade,
    flat_section_max_length_factor,
    flat_section_gradient_threshold,
    max_possible_edge_gradient,
    max_candidate_routes
):
    candidate_routes = []
    
    # Compute the maximum flat section length based on min_length
    max_flat_section_length = min_length * flat_section_max_length_factor

    # Preprocess edge data into a flat dictionary (for faster lookup)
    # Exclude edges with grade greater than something crazy
    edge_data_dict = {
        (u, v): data
        for u, v, data in graph.edges(data=True)
        if data.get('grade', 0) < 0.4
    }

    # Calculate termination gradient
    termination_gradient = min_gradient * early_termination_gradient_factor
    
    # Identify starting nodes connected to steep(ish) edges
    starting_nodes = set()
    for (u, v), data in edge_data_dict.items():
        edge_gradient = data.get('grade', 0)
        if edge_gradient >= min_starting_node_grade:
            starting_nodes.add(u)

    print(f"Starting nodes: {len(starting_nodes)} (total: {graph.number_of_nodes()})")

    # Using a priority queue to prioritize paths with longer lengths
    queue = []
    for start_node in starting_nodes:
        # (negative length, current_node, path, length, elevation_gain, flat_section_length)
        heapq.heappush(queue, (-0, start_node, [start_node], 0, 0, 0))
    
    while queue:
        neg_length, current_node, path, length, elevation_gain, flat_section_length = heapq.heappop(queue)
        current_depth = len(path)

        if current_depth > max_depth or length > max_length:
            continue
        
        for neighbor in graph.neighbors(current_node):
            if neighbor in path:
                continue

            # Try both (u, v) and (v, u) in case of undirected graph
            edge_data = edge_data_dict.get((current_node, neighbor)) or edge_data_dict.get((neighbor, current_node))
            if edge_data is None:
                continue
            
            edge_length = edge_data.get('length', 0)
            edge_gradient = edge_data.get('grade', 0)

            elevation_diff = edge_length * edge_gradient  # Calculated elevation gain (can be negative)

            new_length = length + edge_length
            new_elevation_gain = elevation_gain + elevation_diff
            new_avg_gradient = new_elevation_gain / new_length if new_length > 0 else 0

            # Update cumulative flat/downhill section length
            new_flat_section_length = flat_section_length
            if edge_gradient < flat_section_gradient_threshold:
                new_flat_section_length += edge_length

            # Exceeds allowed length of flat/downhill sections
            if new_flat_section_length > max_flat_section_length:
                continue

            # Early termination based on cumulative gradient
            if new_avg_gradient < termination_gradient:
                continue
            
            # Prune paths that cannot meet the minimum average gradient
            # Estimate the maximum possible elevation gain achievable
            remaining_edges = max_depth - current_depth
            average_edge_length = new_length / current_depth if current_depth > 0 else edge_length
            remaining_possible_gain = remaining_edges * average_edge_length * max_possible_edge_gradient
            max_possible_elevation_gain = new_elevation_gain + remaining_possible_gain
            max_possible_avg_gradient = max_possible_elevation_gain / new_length if new_length > 0 else 0

            # Cannot meet minimum average gradient
            if max_possible_avg_gradient < min_gradient:
                continue  
            
            new_path = path + [neighbor]

            # Add path to candidate_routes if it meets constraints
            if new_length >= min_length and new_avg_gradient >= min_gradient:
                candidate_routes.append((new_path, new_length, new_avg_gradient))
                if len(candidate_routes) >= max_candidate_routes:
                    print(f"Reached the maximum number of candidate routes ({max_candidate_routes}).")
                    return candidate_routes

            # Continue expanding the path, prioritizing by negative length to maximize route length
            heapq.heappush(queue, (
                -(new_length),
                neighbor,
                new_path,
                new_length,
                new_elevation_gain,
                new_flat_section_length,
            ))

    return candidate_routes

def dedupe_routes(candidate_routes):
    # Sort routes by length (longest first)
    candidate_routes.sort(key=lambda x: x[1], reverse=True)

    final_routes = []
    used_edges = set()

    for path, length, avg_gradient in candidate_routes:
        # Check if any edge in the path has already been used
        route_edges = [(u, v) for u, v in zip(path[:-1], path[1:])]
        if any(edge in used_edges or (edge[1], edge[0]) in used_edges for edge in route_edges):
            continue  # Skip this route as it overlaps with an assigned route

        # Assign the route and mark its edges as used
        final_routes.append((path, length, avg_gradient))
        for edge in route_edges:
            used_edges.add(edge)
            used_edges.add((edge[1], edge[0]))  # Add both directions if the graph is undirected

    return final_routes

def get_route_name(path, graph):
    street_names = []
    last_name = None

    for u, v in zip(path[:-1], path[1:]):
        edge_data = graph.get_edge_data(u, v, 0)
        names = edge_data.get('name', None)

        if names:
            if isinstance(names, list):
                names = [name.strip() for name in names if name.strip()]
                street_name = ', '.join(names)
            else:
                street_name = str(names).strip()

            if street_name and street_name != last_name:
                street_names.append(street_name)
                last_name = street_name

    return ' - '.join(street_names) if street_names else 'Unknown Streets'

# Load graph from file
graph = ox.load_graphml("../data/places/asheville.graphml")

# Get bounding box of the graph by computing min/max of node coordinates
latitudes = [data['y'] for _, data in graph.nodes(data=True)]
longitudes = [data['x'] for _, data in graph.nodes(data=True)]

bbox = {
    'min_lat': min(latitudes),
    'max_lat': max(latitudes),
    'min_lng': min(longitudes),
    'max_lng': max(longitudes)
}

# Define minimum gradients (from 5% to 10%)
min_gradients = [0.05, 0.06, 0.07, 0.08, 0.09, 0.10]

# Define distance ranges in meters
distance_ranges = [
    (1609.34, 3218.68),        # 1-2 miles
    (3218.68, 4828.02),        # 2-3 miles
    (4828.02, 8046.70),        # 3-5 miles
    (8046.70, 16093.40),       # 5-10 miles
]

# Initialize a list to hold all results
all_results = []

# Loop through each combination of min_gradient and distance_range
for min_gradient in min_gradients:
    for min_length, max_length in distance_ranges:
        print(f"\nProcessing min_gradient={min_gradient*100:.1f}%, distance range={min_length:.1f}m to {max_length if max_length != float('inf') else 'infinity'}m")
        
        # Find all candidate routes
        candidate_routes = find_candidate_routes(
            graph,
            min_gradient=min_gradient,
            min_length=min_length,
            max_length=max_length,
            max_depth=MAX_DEPTH,
            early_termination_gradient_factor=EARLY_TERMINATION_GRADIENT_FACTOR,
            min_starting_node_grade=MIN_STARTING_NODE_GRADE,
            flat_section_max_length_factor=FLAT_SECTION_MAX_LENGTH_FACTOR,
            flat_section_gradient_threshold=FLAT_SECTION_GRADIENT_THRESHOLD,
            max_possible_edge_gradient=MAX_POSSIBLE_EDGE_GRADIENT,
            max_candidate_routes=MAX_CANDIDATE_ROUTES
        )
        print(f"Found {len(candidate_routes)} candidate routes.")
        
        # Deduplicate routes (remove overlaps)
        final_routes = dedupe_routes(candidate_routes)
        print(f"Finalized {len(final_routes)} routes after removing overlaps.")

        # Collect results along with parameters
        routes_info = []
        for idx, (path, length, gradient) in enumerate(final_routes):
            route_name = get_route_name(path, graph)
            segments = []
            for u, v in zip(path[:-1], path[1:]):
                edge_data = graph.get_edge_data(u, v, 0)
                edge_length = edge_data.get('length', 0)
                edge_gradient = edge_data.get('grade', 0)

                u_data = graph.nodes[u]
                v_data = graph.nodes[v]

                segment_info = {
                    'start_node': u,
                    'start_lat': u_data.get('y', None),
                    'start_lng': u_data.get('x', None),
                    'start_elevation': u_data.get('elevation', None),
                    'end_node': v,
                    'end_lat': v_data.get('y', None),
                    'end_lng': v_data.get('x', None),
                    'end_elevation': v_data.get('elevation', None),
                    'edge_length': edge_length,
                    'edge_grade': edge_gradient
                }
                segments.append(segment_info)

            route_info = {
                'route_number': idx + 1,
                'length_m': length,
                'avg_gradient': gradient,
                'route_name': route_name,
                'path': path,
                'segments': segments
            }
            routes_info.append(route_info)
            print(f"{idx + 1}. length = {length:.1f} m, avg gradient = {gradient:.2%}, route name: {route_name}")
        
        # Append to all_results
        result = {
            'parameters': {
                'min_gradient': min_gradient,
                'min_length': min_length,
                'max_length': max_length,
                'max_depth': MAX_DEPTH,
                'early_termination_gradient_factor': EARLY_TERMINATION_GRADIENT_FACTOR,
                'min_starting_node_grade': MIN_STARTING_NODE_GRADE,
                'flat_section_max_length_factor': FLAT_SECTION_MAX_LENGTH_FACTOR,
                'flat_section_gradient_threshold': FLAT_SECTION_GRADIENT_THRESHOLD,
                'max_possible_edge_gradient': MAX_POSSIBLE_EDGE_GRADIENT,
                'max_candidate_routes': MAX_CANDIDATE_ROUTES
            },
            'bbox': bbox,  # Include bounding box information
            'routes': routes_info
        }
        all_results.append(result)

# Output all results to a JSON file
output_file = "../data/results/asheville-routes.json"

# Convert numpy types to Python types for JSON serialization
def convert_types(obj):
    if isinstance(obj, (np.integer, np.int64)):
        return int(obj)
    elif isinstance(obj, (np.float64, np.float32)):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    else:
        return obj

with open(output_file, 'w') as f:
    json.dump(all_results, f, default=convert_types, indent=4)

print(f"\nAll results have been saved to {output_file}")


Processing min_gradient=5.0%, distance range=1609.3m to 3218.68m
Starting nodes: 17247 (total: 38629)
Reached the maximum number of candidate routes (75000).
Found 75000 candidate routes.
Finalized 92 routes after removing overlaps.
1. length = 4644.0 m, avg gradient = 5.26%, route name: Lynwood Road - Griffing Boulevard - North Griffing Boulevard - Patton Mountain Road
2. length = 4432.7 m, avg gradient = 5.65%, route name: Edwin Place - Charlotte Street - Cherokee Road - Orchard Road - Albemarle Road - Baird Street - Oak Park Road - Westview Road - Town Mountain Road
3. length = 4312.9 m, avg gradient = 5.90%, route name: Elk Mountain Scenic Highway
4. length = 4259.5 m, avg gradient = 5.10%, route name: Baird Cove Road - Versant Drive - Starling Pass
5. length = 4190.3 m, avg gradient = 7.09%, route name: Ox Creek Road - Elk Mountain Scenic Highway
6. length = 4158.4 m, avg gradient = 5.75%, route name: Webb Cove Road
7. length = 4002.0 m, avg gradient = 5.62%, route name: Sycamore