In [480]:
%pip install folium osmium

Note: you may need to restart the kernel to use updated packages.


In [481]:
class Node:

  def __init__(self, name, value=None):
    self.name = name
    self.value = value
    self.domain = set()

  def is_assigned(self):
    return self.value != None
    
  def assign(self, value):
    self.value = value

  def unassign(self):
    self.value = None

  def get_domain(self):
    return self.domain

  def __str__(self):
    return self.name

  def __lt__(self, other):
    return True

In [482]:
class Graph:
  def __init__(self):
    self.adj_list = {}
    self.nodes = {}

  def add_node(self, name, value=None):
    self.nodes[name] = Node(name, value)
    self.adj_list[name] = {}

  def add_vertex(self, name_a, name_b, w=1, directed=False):
    if name_a not in self.adj_list.keys(): self.add_node(name_a)
    if name_b not in self.adj_list.keys(): self.add_node(name_b)
    self.adj_list[name_a][name_b] = w
    if not directed: self.adj_list[name_b][name_a] = w

  def get_nodes(self):
    return set(self.nodes.values())

  def get_node_by_name(self, name):
    return self.nodes[name]

  def get_neighbors_of(self, node_name):
    return set([self.get_node_by_name(neigh_name) for neigh_name in self.adj_list[node_name].keys()])
  
  def get_cost(self, node_a, node_b):
    return self.adj_list[node_a.name][node_b.name]

In [483]:
class GeoNode(Node):
    def __init__(self, id, x, y, lat, lon):
        self.id = id
        self.x = x
        self.y = y
        self.lat = lat
        self.lon = lon
        self.available = True
        self.is_primary = False
        self.n_lanes = 0
        super().__init__(id)
    
    def __str__(self):
        return f"GeoNode {self.id} at pos {self.x, self.y}, coords {self.lat, self.lon}"

In [484]:
import osmium as osm

class OSMHandler(osm.SimpleHandler):
    def __init__(self):
        osm.SimpleHandler.__init__(self)
        self.nodes = []
        self.ways = []
        self.node_highway_values = set()
        self.way_highway_values = set()
        self.way_highway_values_list = []

        #self.allowed_ways = set(['primary_link', 'tertiary', 'primary', 'path', 'secondary', 'trunk', 'corridor', 'pedestrian', 'service', 'raceway', 'construction', 'residential', 'steps', 'trunk_link', 'cycleway', 'tertiary_link', 'motorway', 'footway', None, 'living_street', 'motorway_link', 'secondary_link', 'unclassified', 'road', 'track'])
        #self.allowed_ways = set(['primary_link', 'tertiary', 'primary', 'secondary', 'trunk', 'service', 'raceway', 'construction', 'residential', 'trunk_link', 'tertiary_link', 'motorway', 'living_street', 'motorway_link', 'secondary_link', 'track'])
        #self.allowed_ways = set(['primary_link', 'primary', 'secondary', 'trunk', 'trunk_link', 'tertiary_link' 'secondary_link'])
        #self.allowed_ways = set(['primary_link', 'primary'])
        #self.allowed_ways = set(['primary', 'secondary', 'tertiary', 'trunk','primary_link', 'motorway', 'motorway_link'])
        #https://wiki.openstreetmap.org/wiki/Key:highway?uselang=en#Highway
        self.allowed_ways = set(['primary', 'secondary', 'tertiary', 'trunk','primary_link', 'motorway', 'motorway_link', 'residential'])

    def node(self, node):

        self.node_highway_values.add(node.tags.get('highway'))

        #if(node.tags.get('highway') == 'crossing'):
        self.nodes.append((node.id, node.location.x, node.location.y, node.location.lat, node.location.lon))

    def way(self, way):

        self.way_highway_values.add(way.tags.get('highway'))
        self.way_highway_values_list.append(way.tags.get('highway'))

        if(way.tags.get('highway') in self.allowed_ways):
            
            nodes = []

            for node in way.nodes:
                nodes.append(node)
            
            #is_closed = way.is_closed()
            directed = way.tags.get('oneway') != 'yes'
            is_primary = way.tags.get('highway') != 'residential'
            if way.tags.get('lanes'):
                n_lanes = int(way.tags.get('lanes'))
            else:
                n_lanes = 0
            self.ways.append((nodes, directed, is_primary, n_lanes))

# Haversine Distance

![haversine](haversine.png)

In [485]:
from math import sin, cos, sqrt, atan2, radians

#haversine
def calculate_distance_coords(lat1, lon1, lat2, lon2):

    # Approximate radius of earth in km
    R = 6373.0

    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c

    return distance

In [486]:
class GeoGraph(Graph):

    def __init__(self, osm_file_path='santo_domingo.osm'):
        
        super().__init__()

        self.osmHandler = OSMHandler()
        self.osmHandler.apply_file(osm_file_path)

        #print('nodes highway: ', self.osmHandler.node_highway_values)
        #print('ways highway: ', self.osmHandler.way_highway_values)
        #print('Qty each way: ')

        #for value in self.osmHandler.way_highway_values:
        #    print(f'for {value} there are: {self.osmHandler.way_highway_values_list.count(value)}')

        #print('intersection: ', self.osmHandler.node_highway_values.intersection(self.osmHandler.way_highway_values))

        for node in self.osmHandler.nodes:
            self.add_geo_node(node[0], node[1], node[2], node[3], node[4])

        for way in self.osmHandler.ways:
            self.add_way(way[0], way[1], way[2], way[3])

        for node in self.get_nodes():
            if len(self.get_neighbors_of(node.name)) == 0:
                self.nodes[node.name].available = False

    def add_geo_node(self, id, x, y, lat, lon):
        self.nodes[id] = GeoNode(id, x, y, lat, lon)
        self.adj_list[id] = {}

    def calculate_euclidian_distance(self, node_name_a, node_name_b):
        node_a: GeoNode = self.get_node_by_name(node_name_a)
        node_b: GeoNode = self.get_node_by_name(node_name_b)

        distance = calculate_distance_coords(node_a.lat, node_a.lon, node_b.lat, node_b.lon)#math.sqrt((node_a.lat - node_b.lat)**2 + (node_a.lon - node_b.lon)**2 )
        return distance

    def add_way(self, nodes, directed, is_primary, n_lanes):

        for i in range(len(nodes)-1):

            name_a = nodes[i].ref
            name_b = nodes[i+1].ref

            if is_primary:
                self.nodes[name_a].is_primary = is_primary
                self.nodes[name_b].is_primary = is_primary

            if n_lanes:
                self.nodes[name_a].n_lanes = n_lanes
                self.nodes[name_b].n_lanes = n_lanes

            distance = self.calculate_euclidian_distance(name_a, name_b)
            
            try:
                self.adj_list[name_a][name_b] = distance
                if directed: self.adj_list[name_b][name_a] = distance
            except KeyError:
                print('node not found')
    
    def get_nearest_geoNode(self, lat, lon):
        
        min_distance = 0.05

        for geoNode in self.get_nodes():
            if geoNode.available:
                distance = calculate_distance_coords(geoNode.lat, geoNode.lon, lat, lon)

                if distance < min_distance:
                    return geoNode

In [487]:
import heapq

class PriorityQueue():
  def __init__(self, data=[]):
    self.data = data
  
  def put(self, value):
    heapq.heappush(self.data, value)

  def get(self):
    return heapq.heappop(self.data)

  def isEmpty(self):
    return len(self.data) == 0
  
  def view(self):
    return heapq.heapify(self.data)

  def putReplace(self, value):
    for data in self.data:
      if value[1] == data[1]:
        if value[0] < data[0]:
          self.data.remove(data)
          self.put(value)
          return True
    return False

In [488]:
def a_star_solver(graph: Graph, initial_node: GeoNode, goal_node: GeoNode, heuristic_func):
    
  frontier = PriorityQueue()
  frontier.put((0, initial_node))

  #Stores the order on which the nodes were visited
  visited = []
  
  #Stores the cost from the start node to the current node
  cost_of_path = {}
  cost_of_path[initial_node] = 0

  parents = {}
  parents[initial_node] = None

  while not frontier.isEmpty():
    value, current_node = frontier.get()

    visited.append(current_node)

    if current_node == goal_node:
      return True, visited, parents,cost_of_path#[goal_node]

    for neighbor in graph.get_neighbors_of(current_node.name):

      neighbor_cost = cost_of_path[current_node] + graph.get_cost(current_node, neighbor)
      nodes_in_frontier = [node[1] for node in frontier.data]

      #if it is not in the frontier and has not been visited add it to the frontier with its value
      if(neighbor not in nodes_in_frontier and neighbor not in visited):
        #Calculate the heuristic cost for this node and add it to the PriorityQueue
        neighbor_node_heuristic_cost = heuristic_func(neighbor, goal_node)
        frontier.put((neighbor_cost + neighbor_node_heuristic_cost, neighbor))
        parents[neighbor] = current_node
        cost_of_path[neighbor] = neighbor_cost
      #elif it is in the frontier but you found a lower value, replace it in the frontier
      elif neighbor in nodes_in_frontier:
        if frontier.putReplace((neighbor_cost, neighbor)):
          parents[neighbor] = current_node
          cost_of_path[neighbor] = neighbor_cost

  return False, visited, parents, cost_of_path#[goal_node]

In [489]:
def heuristic_function(current_node: GeoNode, goal_node: GeoNode):
    distance = calculate_distance_coords(current_node.lat, current_node.lon, goal_node.lat, goal_node.lon)#math.sqrt((current_node.lat - goal_node.lat)**2 + (current_node.lon - goal_node.lon)**2 )
    is_primary = 0 if not current_node.is_primary else 1
    n_lanes = current_node.n_lanes
    return distance*5 - is_primary*0.5 - n_lanes*0.5

In [490]:
geoGraph = GeoGraph()

In [491]:
initial_node = geoGraph.get_nearest_geoNode(18.497205, -69.895533) #geoGraph.get_node_by_name(248926634)
goal_node = geoGraph.get_nearest_geoNode(18.452902, -69.967182) #geoGraph.get_node_by_name(4747207392)

#initial_node = geoGraph.get_nearest_geoNode(18.491504, -69.892999)
#goal_node = geoGraph.get_nearest_geoNode(18.464901, -69.945862)

print('initial_node: ', initial_node)
print('goal_node: ', goal_node)

initial_node:  GeoNode 309000866 at pos (-698958112, 184975126), coords (18.4975126, -69.8958112)
goal_node:  GeoNode 310521273 at pos (-699675560, 184528156), coords (18.4528156, -69.967556)


In [492]:
SUCCESS, visited, parent_node, cost_of_path = a_star_solver(geoGraph, initial_node, goal_node, heuristic_function)
print('Success: ', SUCCESS)
print('Visited: ', [node.name for node in visited])

Success:  True
Visited:  [309000866, 309000830, 622720638, 521074797, 622720639, 521074798, 309000729, 309000718, 309000883, 309000746, 308981264, 309000741, 309000723, 309000757, 308972818, 308972826, 308972806, 309000682, 309000761, 6689804139, 308972815, 308923892, 309000736, 6235487144, 308924006, 7044735503, 308923930, 3052411089, 308924242, 7329213337, 7329213338, 7329213330, 2384418672, 7329213331, 7329213339, 308923967, 7329213333, 7329213340, 7329213332, 7329213341, 308923966, 308923965, 308923964, 308923963, 308973169, 308923872, 308923893, 310225872, 310225871, 310225870, 309249107, 310225869, 310225863, 309775061, 309775063, 6346208366, 6346208367, 4147222201, 309216526, 309216528, 309216529, 309216531, 309216532, 309216533, 310225891, 310225890, 310225850, 1763126005, 2088100807, 309216546, 1763125972, 310225889, 309216555, 309216562, 309216567, 2088100815, 1706781817, 310225887, 1706781816, 2088100801, 310225885, 310225883, 309248730, 310239611, 3377051819, 309197945, 309

In [493]:
def reconstruct_path(start_node, goal_node, parent_node):
  path = [goal_node]

  current_parent = parent_node[goal_node]
  path.append(current_parent)

  while current_parent:

    next_parent = parent_node[current_parent]
    path.append(next_parent)

    if next_parent == start_node:
      break
    
    current_parent = next_parent
  
  path.reverse()

  cost = 0

  for i in range(len(path)-1):
    cost += geoGraph.get_cost(path[i], path[i+1])

  return path, cost

In [494]:
path, distance = reconstruct_path(initial_node,goal_node, parent_node)
print(f'Distance: {distance:.2f} km')

Distance: 13.94 km


In [495]:
import folium

initial_marker = [initial_node.lat, initial_node.lon]
goal_marker = [goal_node.lat, goal_node.lon]

map = folium.Map(location=initial_marker, zoom_start=17)

folium.Marker(initial_marker).add_to(map)
folium.Marker(goal_marker).add_to(map)

points = []

for node in path:
    point = [node.lat, node.lon]
    points.append(point)

folium.PolyLine(points, weight=5, opacity=1).add_to(map)
map

In [496]:
import folium

initial_marker = [initial_node.lat, initial_node.lon]

map = folium.Map(location=initial_marker, zoom_start=17)

for visited_node in visited:
    point = [visited_node.lat, visited_node.lon]
    folium.Marker(point).add_to(map)

map

In [498]:
test_nodes = []

centroid_node = geoGraph.get_nearest_geoNode(18.472998, -69.891905)

min_distance = 0.6

for geoNode in geoGraph.get_nodes():
    distance = calculate_distance_coords(geoNode.lat, geoNode.lon, centroid_node.lat, centroid_node.lon)

    if distance < min_distance and geoNode.available:
        test_nodes.append(geoNode)

import folium

initial_marker = [18.4708214, -69.895577]

map = folium.Map(location=initial_marker, zoom_start=17)

for test_node in test_nodes:
    point = [test_node.lat, test_node.lon]
    folium.Marker(point).add_to(map)

map