In [1]:
import osmnx as ox
from shapely.geometry import LineString, mapping
import geopandas as gpd
from ipyleaflet import *
import timeit
import ipywidgets as widgets
from queue import *
from ipywidgets import Layout
from IPython.display import display, Image
from haversine import haversine
from sklearn.neighbors import BallTree
import pickle

In [2]:
#Callbacks to set markers to nearest node
def set_nearest_node(marker):
    """
    takes a marker object and updates the nearest 
    node attribute to the node nearest to the markers location.
    
    Parameters:
    ----------
    marker : Marker
             A marker object
    """
    marker.nearest_node = ox.nearest_nodes(graph, marker.location[1], marker.location[0], return_dist=False)
    return

def fix_location(event):
    """
    takes a marker object's event infromation
    and updates the markers location to nearest node.
    
    Parameters:
    ----------
    event : Event
            A marker objects event information 
    """
    global dropdown
    marker = event['owner']
    set_nearest_node(marker)
    try:
        alg = dropdown.value
    except:
        alg = 'Dijkstra'
    if valid(from_marker, to_marker, alg):
        marker.location = graph.nodes[marker.nearest_node]["y"], graph.nodes[marker.nearest_node]["x"]
        time_label.value = "changed location to nearest valid node"
        count_label.value = ""
        distance_label.value = ""
    else:
        old = event['old']
        marker.location = old[0], old[1]
        if alg == "Dijkstra" or alg == "Breadth-first Search":
            time_label.value = "Can't exceed 6000m for " + alg
        elif alg == "A* Search":
            time_label.value = "Can't exceed 15000m for " + alg
        elif alg == "Depth-first Search":
            time_label.value = "Can't exceed 3500m for " + alg
        count_label.value = ""
        distance_label.value = ""
    
def valid(mark, other, alg):
    """
    Returns whether the given algorithm can calculate the
    distance between the given markers based on their distance
    
    Parameters:
    ----------
    mark : Marker
           A marker object
           
    other : Marker
            A marker object
            
    alg : str
          string that represents selected algorithm
    
    Returns:
    -------
    bool
        True if selected algorithm is in its distance range
    """
    if alg == "Unsafe A* Search":
        return True
    
    pair1 = graph.nodes[mark.nearest_node]["y"], graph.nodes[mark.nearest_node]['x']
    pair2 = graph.nodes[other.nearest_node]["y"], graph.nodes[other.nearest_node]['x']
       
    dist = haversine(pair1, pair2, unit='m')
    if (dist >= 15000 and alg == "A* Search"):
        return False
    elif (dist >= 6000 and (alg == "Dijkstra" or alg == "Breadth-first Search")):
        return False
    elif (dist >= 3500 and alg == "Depth-first Search"):
        return False
    
    return True

#"Dijkstra", 'A* Search', 'Unsafe A* Search', 'Breadth-first Search', 'Depth-first Search'

In [3]:
#Pathfinder Algorithms

def min_node(dists, openn):
    """
    Returns node with shortest distance in dists
    iff it is in openn
    
    Parameters:
    ----------
    dists : dic
            A dictionary that stores distance info
            
    openn : set 
            A set containing unexplored nodes
    
    Returns:
    int
        The nearest node to the marker
    """
    n, d = -1, float('inf')
    
    for node in openn:
        dist = dists[node]
        if d > dist:
            n, d = node, dist
    
    return n


def dijkstra(G, source, target, astar = False, unsafe = False):
    """
    Computes the shortest path between a source node and a target node in a
    weighted graph.
    
    Parameters
    ----------
    G : networkx.MultiDiGraph
        The graph in which the shortest path needs to be computed.
    
    source : int
        The source node in the graph.
    
    target : int
        The target node in the graph.
    
    astar : bool, optional
        If True, the A* algorithm is used to compute the shortest path.
        Otherwise, the Dijkstra algorithm is used.
        Defaults to False.
    
    unsafe : bool, optional
        If True, the path is computed using unsafe Heuristic
        function which does not result in shortest path
        Defaults to False.
    
    Returns
    -------
    dict
        A dictionary with the following keys:
        
        - 'distances': A dictionary with the distances from the nodes to start node.
        - 'prev_vertices': A dictionary with the previous vertices in the shortest path.
        - 'count': The number of nodes traversed.
    """
    
    openn = {source}
    closed = set()
    dists = {source: 0}
    prev = {source: source}
    
    if not unsafe:
        def heuristic(node, target, dists):

            to = graph.nodes[target]["y"], graph.nodes[target]["x"],
            fr = graph.nodes[node]["y"], graph.nodes[node]["x"]
            return haversine(to, fr, unit='m')
    else:
        def heuristic(node, target, dists):

            to = graph.nodes[target]["y"], graph.nodes[target]["x"],
            fr = graph.nodes[node]["y"], graph.nodes[node]["x"]
            return haversine(to, fr)*(10000)
    
    if astar:
        f = {source: heuristic(source, target, dists) + dists[source]}
    
    curr = source
    count = 0
    
    while openn:
        
        openn.remove(curr)
        closed.add(curr)
        
        circle = Circle(location = (graph.nodes[curr]["y"], graph.nodes[curr]["x"]),
                        radius = 1,
                        color = node_color.value,
                        fill_color = node_color.value)
        m.add_layer(circle)
        #layer_list.add(circle)
        
        count += 1
        
        for v in G.neighbors(curr):
            
            if v not in closed:
                openn.add(v)
                
                dist = graph[curr][v][0]['length']

                if v not in dists or dists[v] > float(dists[curr] + dist):
                    dists[v] = float(dists[curr] + dist)
                    prev[v] = curr
                
                if astar:
                    new_f = dists[v] + heuristic(v, target, dists)
                    if v not in f or new_f < f[v]:
                        f[v] = new_f
                
        
        if curr == target:
            break
            
        if astar:
            curr = min_node(f, openn)
        else:
            curr = min_node(dists, openn)
            
        if curr == -1:
            break
    
    del openn
    del closed    
    return {'distances': dists, 'prev_vertices': prev,
           'count': count}
            
            
def breadth(G, source, target):
    """
    Computes path between a source node and a target node in a
    weighted graph.
    
    Parameters
    ----------
    G : networkx.MultiDiGraph
        The graph in which the shortest path needs to be computed.
    
    source : int
        The source node in the graph.
    
    target : int
        The target node in the graph.
    
    Returns
    -------
    dict
        A dictionary with the following keys:
        
        - 'distances': A dictionary with the distances from the nodes to start node.
        - 'prev_vertices': A dictionary with the previous vertices in the shortest path.
        - 'count': The number of nodes traversed.
    """
    
    q = Queue()
    q.put(source)
    
    visited = {source: True}
    dists = {source: 0}
    prev = {source: source}
    
    count = 0
    curr = source
    while curr != target:
        curr = q.get()

        circle = Circle(location = (graph.nodes[curr]["y"], graph.nodes[curr]["x"]),
                    radius = 1,
                    color = node_color.value,
                    fill_color = node_color.value)
        m.add_layer(circle)
        #layer_list.add(circle)

        count += 1

        for v in G.neighbors(curr):
            if v not in visited or not visited[v]:
                q.put(v)
                visited[v] = True
                prev[v] = curr

                dist = graph[curr][v][0]['length']
                dists[v] = float(dists[curr] + dist)
    
    del visited
    return {'distances': dists, 'prev_vertices': prev,
               'count': count}

def depth(G, source, target):
    """
    Computes path between a source node and a target node in a
    weighted graph.
    
    Parameters
    ----------
    G : networkx.MultiDiGraph
        The graph in which the shortest path needs to be computed.
    
    source : int
        The source node in the graph.
    
    target : int
        The target node in the graph.
    
    Returns
    -------
    dict
        A dictionary with the following keys:
        
        - 'distances': A dictionary with the distances from the nodes to start node.
        - 'prev_vertices': A dictionary with the previous vertices in the shortest path.
        - 'count': The number of nodes traversed.
    """
    
    s = []
    s.append(source)
    
    visited = set()
    dists = {source: 0}
    prev = {source: source}
    
    count = 0
    curr = source
    while curr != target:
        curr = s.pop()
        circle = Circle(location = (graph.nodes[curr]["y"], graph.nodes[curr]["x"]),
                    radius = 1,
                    color = node_color.value,
                    fill_color = node_color.value)
        m.add_layer(circle)
        #layer_list.add(circle)

        count += 1
        
        if curr not in visited:
            visited.add(curr)
            for v in G.neighbors(curr):
                if v not in visited:
                    s.append(v)
                    prev[v] = curr
                    dist = graph[curr][v][0]['length']
                    dists[v] = float(dists[curr] + dist)
        
    del visited
    return {'distances': dists, 'prev_vertices': prev,
               'count': count}
    


def draw_path(G, source, target, algorithm):
    """
    Draws path, which is determined using given algorithm,
    between source node and target node
    
    Parameters
    ----------
    G : networkx.MultiDiGraph
        The graph in which the shortest path needs to be computed.
    
    source : int
        The source node in the graph.
    
    target : int
        The target node in the graph.
    
    algorithm : str
        The algorithm to use to find path
    
    Returns
    -------
    dict
        A dictionary with the following keys:
        
        - 'distances': A dictionary with the distances from the nodes to start node.
        - 'prev_vertices': A dictionary with the previous vertices in the shortest path.
        - 'count': The number of nodes traversed.
        
    float
        A float representing time spent to calculate path
    
    int
        An integer representing the number of nodes traversed
        by the algorithm
    
    float
        A float representing the distance in meters of the path
    """
    
    start_time = timeit.default_timer()
    
    if source==target:
        alg = {'distances': {source: 0}, 'prev_vertices': {source: source}, 'count': 0}
    if algorithm == "Dijkstra":
        alg = dijkstra(G, source, target)
    elif algorithm == "Fibonacci Heap Dijkstra":
        alg = dijkstra2(G, source, target)
    elif algorithm == "A* Search":
        alg = dijkstra(G, source, target, True)
    elif algorithm == "Breadth-first Search":
        alg = breadth(G, source, target)
    elif algorithm == "Unsafe A* Search":
        alg = dijkstra(G, source, target, True, True)
    elif algorithm == 'Depth-first Search':
        alg = depth(G, source, target)
        
    prev_vertex = alg['prev_vertices']
    
    curr = target
    path = [curr]
    if len(prev_vertex) <= 1:
        return None
    try:
        while curr != source:
            curr = prev_vertex[curr]
            path.append(curr)
    except:
        return None
        
        
    final_path = path[::-1]
#    print(final_path)
    final_distance = alg['distances'][target]
        
    elapsed = (timeit.default_timer() - start_time) * 1000
#     print('>> elapsed time', elapsed, 'ms')
#     print('>> count: ', alg['count'])
#     print('>> distance: ', alg['distances'][target])
    
    shortest_path_points = nodes.loc[final_path]
    path = gpd.GeoDataFrame([LineString(shortest_path_points.geometry.values)], columns=['geometry'])
    path_layer = GeoData(geo_dataframe=path, style={'color': path_color.value, 'weight': 2})
    m.add_layer(path_layer)

    del shortest_path_points
    del final_path
    del path
    
    return elapsed, alg['count'], alg['distances'][target]
                
    
    

In [4]:
#Building map, graph, nodes, edges, etc.

#Global Variables
graph = nodes = edges = center = m = from_marker = to_marker = dropdown = None
base = basemap_to_tiles(basemaps.OpenStreetMap.Mapnik)
loading = True

preloaded = {"toronto": 0, "paris": 1, "new york": 2, "mississauga": 3}

mapLayout=Layout(width='98%', height='690px')
from_marker_style = AwesomeIcon(
                        name='circle',
                        icon_color='white',
                        marker_color='blue',
                        spin=False
                        )

to_marker_style = AwesomeIcon(
                    name='circle',
                    icon_color='white',
                    marker_color='red',
                    spin=False
                    )

color_layout = widgets.Layout(width='240px')

path_color = widgets.ColorPicker(
                concise=False,
                description='Path Colour:',
                value='black',
                disabled=False,
                layout = color_layout
                )

path_control = WidgetControl(widget=path_color, position='bottomright')

node_color = widgets.ColorPicker(
                concise=False,
                description='Node Colour:',
                value='red',
                disabled=False,
                layout = color_layout
                )

node_control = WidgetControl(widget=node_color, position='bottomright')

time_label = widgets.Label(value="")
count_label = widgets.Label(value="")
distance_label = widgets.Label(value="")

def loadmap(wdgt):
    """
    Draws interface which includes map, buttons, and outputs
    in Hbox and sets up callback functions used as the
    backend of the interactive map
    
    Parameters
    ----------
    wdgt : Textfield
        textfield to input city/region
    
    """
    
    global graph
    global nodes
    global edges
    global center
    global m
    global from_marker
    global to_marker
    global dropdown
        
    loading = widgets.IntProgress(
                value=10,
                min=0,
                max=10,
                description='Loading:',
                bar_style='', # 'success', 'info', 'warning', 'danger' or ''
                style={'bar_color': 'maroon'},
                orientation='horizontal'
                )
    
    box.children = [box.children[0], loading]
    

    try:
        
        if graph is not None:
            del graph
            del nodes
            del edges
            del center
            del m
            del from_marker
            del to_marker
        
        #Loading graph
        
        if (wdgt.value.lower() in preloaded):
            
            if preloaded[wdgt.value.lower()] == 0:
                graph = pickle.load(open("data/Toronto.pickle", "rb"))
            elif preloaded[wdgt.value.lower()] == 1:
                graph = pickle.load(open("data/Paris.pickle", "rb"))
            elif preloaded[wdgt.value.lower()] == 2:
                graph = pickle.load(open("data/New_York.pickle", "rb"))
            elif preloaded[wdgt.value.lower()] == 3:
                graph = pickle.load(open("data/Mississauga.pickle", "rb"))
            
        else:  
            graph = ox.graph_from_place(wdgt.value, network_type = 'drive')
        nodes, edges = ox.graph_to_gdfs(graph)
    
        #Setting up map
        center = graph.nodes[nodes.index[0]]["y"], graph.nodes[nodes.index[0]]["x"]
        m = Map(center = center, 
                basemap = basemaps.OpenStreetMap.Mapnik,
                zoom = 18,
                max_zoom = 18,
                min_zoom = 8,
                scroll_wheel_zoom = True, 
                layout = mapLayout)
    
        #Setting up markers
        from_marker = Marker(location=center, icon=from_marker_style)
        to_marker = Marker(location=center, icon=to_marker_style)
        from_marker.observe(lambda event: fix_location(event), 'location')
        to_marker.observe(lambda event: fix_location(event), 'location')
        m.add_layer(from_marker)
        m.add_layer(to_marker)
        set_nearest_node(from_marker)
        set_nearest_node(to_marker)
    
        #Setting up map widgets and interactivity
               
        dropdown = widgets.Dropdown(
                    options=["Dijkstra", 'A* Search', 'Unsafe A* Search', 'Breadth-first Search', 'Depth-first Search'],
                    value="Dijkstra",
                    description='Algorithm:',
                    tooltip='Select algorithm to visualize',
                    disabled=False,
                    )

        dropdown_control = WidgetControl(widget=dropdown, position='topright')


        layout = widgets.Layout(width='100px', height='30px')

        button = widgets.Button(
                description='Visualize',
                disabled=False,
                button_style='', # 'success', 'info', 'warning', 'danger' or ''
                tooltip='Click to visualize selected pathfinding algorithm',
                icon='play', # (FontAwesome names without the `fa-` prefix)
                layout = layout
                )

        def button_action(arg):
            if not valid(from_marker, to_marker, dropdown.value):
                to_marker.location = from_marker.location
                if dropdown.value == "Dijkstra" or dropdown.value == "Breadth-first Search":
                    time_label.value = "Can't exceed 6000m for " + dropdown.value
                elif dropdown.value == "A* Search":
                    time_label.value = "Can't exceed 15000m for " + dropdown.value
                elif dropdown.value == "Depth-first Search":
                    time_label.value = "Can't exceed 3500m for " + dropdown.value
                count_label.value = ""
                distance_label.value = ""
                return
            
            
            x = draw_path(graph, from_marker.nearest_node, to_marker.nearest_node, dropdown.value)
            if x is not None:
                time_label.value = "elapsed time: " + str(x[0]) + ' ms'
                count_label.value = "nodes traversed: " + str(x[1])
                distance_label.value = "total path distance: " + str(x[2]) + " m"
            
            else:
                time_label.value = "No path found"
                count_label.value = ""
                distance_label.value = ""
                


        button.on_click(button_action)
        button_control = WidgetControl(widget=button, position='topright')

        clear = widgets.Button(
                description='Clear Map',
                disabled=False,
                button_style='', # 'success', 'info', 'warning', 'danger' or ''
                tooltip='Clear nodes and path from map',
                icon='trash', # (FontAwesome names without the `fa-` prefix)
                layout = layout
                )

        def clear_action(arg):
            m.clear_layers()
            m.add_layer(from_marker)
            m.add_layer(to_marker)
            m.add_layer(base)
            


        clear.on_click(clear_action)
        clear_control = WidgetControl(widget=clear, position='topright')

        m.add_control(dropdown_control)
        m.add_control(button_control)
        m.add_control(clear_control)
        m.add_control(path_control)
        m.add_control(node_control)

        box.children = [box.children[0], widgets.Label(value="Total Nodes: " + str(len(nodes))),
                        widgets.Label(value="Total Edges: " + str(len(edges))), 
                        m, time_label, count_label, distance_label]

    except:
        box.children = [box.children[0], widgets.Label(value="Couldn't find location try again (e.g. Monaco)")]



<h1><center>Pathfinder Visualizer</center></h1>

In [5]:
#Textfield and Hbox setup

regionInput = widgets.Text(
                value = 'Type an address and hit enter (e.g. Monaco)',
                placeholder = 'Type a Location',
                description = 'Location:',
                disabled = False,
                )   

regionInput.on_submit(loadmap)

box_layout = widgets.Layout(display='flex',
                flex_flow='column',
                width='100%',
                height='100%')

box = widgets.HBox([regionInput], layout=box_layout)
display(box)

HBox(children=(Text(value='Type an address and hit enter (e.g. Monaco)', description='Location:', placeholderâ€¦

<a href='https://github.com/SyedTahaA/PathFinderOnMap'>Instructions/Code</a>