In [1]:
import osmnx as ox
import networkx as nx
from heapq import heappop, heappush
from math import sqrt
import ipyleaflet
from ipyleaflet import Map, Marker, Polyline, LayerGroup, FullScreenControl, basemaps
import ipywidgets as widgets
from IPython.display import display
import time

# Define the area of interest
place = 'Manhattan, New York, USA'
G = ox.graph_from_place(place, network_type='drive')

# Create an ipyleaflet map for interactive pin placement and visualization
m_ipy = Map(center=(40.748817, -73.985428), zoom=13, basemap=basemaps.CartoDB.DarkMatter)
m_ipy.add_control(FullScreenControl())

# Initialize variables to store start and end coordinates
start_coords = None
end_coords = None
markers = LayerGroup(name='Markers')
m_ipy.add_layer(markers)

# Event handler for map clicks
def handle_map_click(**kwargs):
    global start_coords, end_coords
    if kwargs.get('type') == 'click':
        coords = kwargs.get('coordinates')
        if start_coords is None:
            start_coords = coords
            marker = Marker(location=start_coords, draggable=False, title='Start')
            markers.add_layer(marker)
        elif end_coords is None:
            end_coords = coords
            marker = Marker(location=end_coords, draggable=False, title='End')
            markers.add_layer(marker)
        else:
            print("Both start and end coordinates are set")

m_ipy.on_interaction(handle_map_click)

display(m_ipy)

# Define the Euclidean heuristic function
def euclidean_heuristic(node1, node2):
    lat1, lon1 = G.nodes[node1]['y'], G.nodes[node1]['x']
    lat2, lon2 = G.nodes[node2]['y'], G.nodes[node2]['x']
    return sqrt((lat1 - lat2)**2 + (lon1 - lon2)**2)

# Define the enhanced heuristic function
def enhanced_heuristic(node, end, initial_distance):
    current_distance = euclidean_heuristic(node, end)
    penalty_factor = 1000  # Adjust this factor to control the penalty strength
    
    # Penalize paths that increase distance significantly
    if current_distance > initial_distance:
        return current_distance * penalty_factor
    else:
        return current_distance

# Define the bidirectional A* search algorithm
def bidirectional_a_star_search(G, start, end):
    # Initialize forward search
    open_set_start = []
    initial_distance = euclidean_heuristic(start, end)
    heappush(open_set_start, (0, start))
    came_from_start = {}
    g_score_start = {node: float('inf') for node in G.nodes}
    g_score_start[start] = 0
    f_score_start = {node: float('inf') for node in G.nodes}
    f_score_start[start] = enhanced_heuristic(start, end, initial_distance)
    open_set_hash_start = {start}

    # Initialize backward search
    open_set_end = []
    heappush(open_set_end, (0, end))
    came_from_end = {}
    g_score_end = {node: float('inf') for node in G.nodes}
    g_score_end[end] = 0
    f_score_end = {node: float('inf') for node in G.nodes}
    f_score_end[end] = enhanced_heuristic(end, start, initial_distance)
    open_set_hash_end = {end}

    explored_edges = []
    meeting_node = None

    while open_set_start and open_set_end:
        # Expand from start node
        current_start = heappop(open_set_start)[1]
        open_set_hash_start.remove(current_start)

        if current_start in open_set_hash_end:
            meeting_node = current_start
            break

        for neighbor in G.neighbors(current_start):
            tentative_g_score = g_score_start[current_start] + G[current_start][neighbor][0]['length']
            if tentative_g_score < g_score_start[neighbor]:
                came_from_start[neighbor] = current_start
                g_score_start[neighbor] = tentative_g_score
                f_score_start[neighbor] = g_score_start[neighbor] + enhanced_heuristic(neighbor, end, initial_distance)
                if neighbor not in open_set_hash_start:
                    heappush(open_set_start, (f_score_start[neighbor], neighbor))
                    open_set_hash_start.add(neighbor)
                    explored_edges.append((current_start, neighbor))

        # Expand from end node
        current_end = heappop(open_set_end)[1]
        open_set_hash_end.remove(current_end)

        if current_end in open_set_hash_start:
            meeting_node = current_end
            break

        for neighbor in G.neighbors(current_end):
            tentative_g_score = g_score_end[current_end] + G[current_end][neighbor][0]['length']
            if tentative_g_score < g_score_end[neighbor]:
                came_from_end[neighbor] = current_end
                g_score_end[neighbor] = tentative_g_score
                f_score_end[neighbor] = g_score_end[neighbor] + enhanced_heuristic(neighbor, start, initial_distance)
                if neighbor not in open_set_hash_end:
                    heappush(open_set_end, (f_score_end[neighbor], neighbor))
                    open_set_hash_end.add(neighbor)
                    explored_edges.append((current_end, neighbor))

    if meeting_node:
        path_start_to_meeting = reconstruct_path(came_from_start, meeting_node)
        path_meeting_to_end = reconstruct_path(came_from_end, meeting_node)[::-1]
        path = path_start_to_meeting + path_meeting_to_end[1:]
        return path, explored_edges, True  # Final result

    return None, explored_edges, False  # No path found

def animate_bidirectional_a_star():
    global start_coords, end_coords
    assert start_coords is not None and end_coords is not None, "Set start and end coordinates on the map."
    start_node = ox.distance.nearest_nodes(G, start_coords[1], start_coords[0])
    end_node = ox.distance.nearest_nodes(G, end_coords[1], end_coords[0])

    explored_layer = LayerGroup()
    path_layer = LayerGroup()
    m_ipy.add_layer(explored_layer)
    m_ipy.add_layer(path_layer)

    # Run bidirectional A* search and handle results
    path, new_edges, is_final = bidirectional_a_star_search(G, start_node, end_node)
    if path is None:
        print("No path found")
        return

    # Animate explored edges
    for edge in new_edges:
        line = Polyline(
            locations=[
                (G.nodes[edge[0]]['y'], G.nodes[edge[0]]['x']),
                (G.nodes[edge[1]]['y'], G.nodes[edge[1]]['x'])
            ],
            color='blue',
            opacity=0.5
        )
        explored_layer.add_layer(line)
        time.sleep(0.1)  # Adjust delay for animation speed

    # Display the final path
    if path:
        path_coords = [(G.nodes[node]['y'], G.nodes[node]['x']) for node in path]
        final_polyline = Polyline(locations=path_coords, color='yellow', weight=5)
        path_layer.add_layer(final_polyline)

        # Handle deprecated `route_to_gdf`
        try:
            route_gdf = ox.routing.route_to_gdf(G, path)
            path_length = route_gdf['length'].sum()
            print(f'Path Length: {path_length:.2f} meters')
        except Exception as e:
            print(f'Error generating route GeoDataFrame: {e}')

    time.sleep(0.1)  # Adjust delay for animation speed

def reconstruct_path(came_from, current):
    total_path = [current]
    while current in came_from:
        current = came_from[current]
        total_path.append(current)
    return total_path[::-1]

# Button to trigger Bidirectional A* visualization
button = widgets.Button(description="Run Animated Bidirectional A* Search")
button.on_click(lambda x: animate_bidirectional_a_star())
display(button)


Map(center=[40.748817, -73.985428], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title'…

Button(description='Run Animated Bidirectional A* Search', style=ButtonStyle())

Error generating route GeoDataFrame: 42427787
