In [None]:
import heapq
import pickle

import networkx
import osmnx
from pyrosm import get_data, OSM
from haversine import haversine

Here we import the necessary libraries. Heapq is for the a* search algorithm, pickle allows us to serialize the graph data, pyrosm parses open street map data, and haversine calculates the distance between two coordinates.

In [1]:
data_seattle = get_data("Seattle", directory="/home/alec/Desktop/code/personal_projects/safe-path-finder/src")
osm_seattle = OSM(data_seattle)
nodes, edges = osm_seattle.get_network(nodes=True, network_type="driving", graph_type="networkx")
graph = osm_seattle.to_graph(nodes, edges)
with open("src/graph.pkl", "wb") as out_file:
    pickle.dump(graph, out_file)

with open("src/graph.pkl", "rb") as in_file:
    seattle_graph = pickle.load(in_file)

NameError: name 'get_data' is not defined

Here we get the data for the seattle map and convert the data to a graph. Lastly we save the graph to a pickle file for later use. Then we load the pickle file as seattle_graph.

In [None]:
def a_star_search(seattle_graph, start, end):
    """
    parameters
    seattle_graph: graph data for Seattle
    start: starting vertex in the graph
    end: ending/target vertex in the graph
    return: path through the graph
    """
    entry = 1
    minheap = [(0, 0, start, 0)] # add first item
    distances = {start: (0, None)} # also the visited set
    end_lat, end_lon = end['lat'], end['lon']

    # need to create node object with parents to find path
    while minheap:
        _, _, curr_node, curr_dist = heapq.heappop(minheap)

        if curr_node == end:
            break

        for out_edge_idx in seattle_graph.incident(curr_node, mode="out"):
            out_edge = seattle_graph.es[out_edge_idx]
            new_dist = curr_dist + out_edge["length"]
            neighbor = seattle_graph.vs.find(id=out_edge["v"])
            dist_to_end = haversine((neighbor['lat'], neighbor['lon']), (end_lat, end_lon)) * 1000 # kilometers to meters

            if neighbor not in distances or new_dist < distances[neighbor][0]:
                distances[neighbor] = (new_dist, curr_node)
            else: # visited
                continue
            
            weight = 2 * (new_dist + dist_to_end)
            heapq.heappush(minheap, (weight, entry, neighbor, new_dist))
            entry += 1


    path = []
    curr = distances[end][1]
    while curr is not None:
        path.append(curr)
        curr = distances[curr][1]

    return path[::-1]


Here is my implementation of the a* search algorithm to find the shortest path between two points in the city of Seattle. Unfortunately my implementation is too slow to be useful in the web application due to the high level nature of python. Because of this I will use an external library (OSMnx) that has a shortest path function.

In [None]:
source_address = ""
target_address = ""

source = osmnx.geocode(source_address)
target = osmnx.geocode(target_address)

source_node = osmnx.get_nearest_node(seattle_graph, source)
target_node = osmnx.get_nearest_node(seattle_graph, target)

route = networkx.shortest_path(seattle_graph, source_node, target_node, weight="length")

Here is the code for the shortest path algorithm utilizing two libraries for working with open street maps data, networkx and osmnx.