In [None]:
import osmnx as ox
import networkx as nx
import heapq
import json
import os

# Configure OSMnx through its settings module. `ox.config()` is deprecated
# (it emits a warning in recent 1.x releases and is removed in 2.0), while
# assigning the settings attributes directly works across versions.
ox.settings.use_cache = True      # reuse cached HTTP responses between runs
ox.settings.log_console = True    # echo OSMnx log messages to the console
ox.settings.cache_folder = '../../cache'

# Directory that receives the JSON artefacts written at the end of the notebook.
OUTPUT_DIR = '../../data'

  ox.config(use_cache=True, log_console=True, cache_folder='../cache')


# AStarSolver

In [2]:
class AStarSolver:
    """A* shortest-path search over a NetworkX-style street graph.

    The graph is expected to provide:

    * ``nodes`` - iterable mapping of node id -> attribute dict holding
      ``'x'`` (longitude) and ``'y'`` (latitude);
    * ``neighbors(node)`` - iterable of successor node ids;
    * ``get_edge_data(u, v)`` - edge attributes (keyed by edge key for
      multigraphs);
    * ``is_multigraph()`` - standard NetworkX predicate.

    Edge cost is the ``'length'`` edge attribute; when an edge has no
    ``'length'`` the great-circle distance between its endpoints is used.
    """

    def __init__(self, graph):
        """Store the graph that every search will run on."""
        self.graph = graph

    def heuristic(self, u, v):
        """
        Calculate the heuristic value between two nodes using the great-circle distance.

        Args:
            u: id of the first node.
            v: id of the second node.

        Returns:
            Whatever ``ox.distance.great_circle`` returns for the two
            nodes' (latitude, longitude) pairs.
        """
        u_attrs = self.graph.nodes[u]
        v_attrs = self.graph.nodes[v]
        return ox.distance.great_circle(
            u_attrs['y'], u_attrs['x'], v_attrs['y'], v_attrs['x']
        )

    def _edge_length(self, u, v):
        """Return the cheapest cost among all edges going from ``u`` to ``v``."""
        edge_data = self.graph.get_edge_data(u, v)
        # A multigraph returns {edge_key: attrs}; a simple graph returns the
        # attribute dict itself. Both are dicts, so ask the graph which it is.
        if self.graph.is_multigraph():
            candidates = edge_data.values()
        else:
            candidates = [edge_data]

        fallback = None  # great-circle distance, computed only if needed
        best = float('inf')
        for attrs in candidates:
            if 'length' in attrs:
                length = attrs['length']
            else:
                if fallback is None:
                    fallback = self.heuristic(u, v)
                length = fallback
            if length < best:
                best = length
        return best

    @staticmethod
    def _reconstruct_path(came_from, start, goal):
        """Walk the predecessor links back from ``goal`` and return start -> goal."""
        path = []
        node = goal
        while node in came_from:
            path.append(node)
            node = came_from[node]
        path.append(start)
        path.reverse()
        return path

    def astar_search(self, start, goal):
        """
        Perform the A* search algorithm from start to goal node.

        Args:
            start: id of the node to start from.
            goal: id of the node to reach.

        Returns:
            ``(path, steps)`` where ``path`` is the list of node ids from
            ``start`` to ``goal`` (``None`` if the goal is unreachable) and
            ``steps`` is a list with one dict per expanded node, holding
            ``'current_node'``, ``'f_score'``, ``'g_score'`` and
            ``'neighbors'``.
        """
        inf = float('inf')
        g_score = {start: 0}  # nodes not present are implicitly at infinity
        came_from = {}

        # Queue the start node with its real f-score so the first recorded
        # step reports a meaningful value instead of a hard-coded 0.
        open_heap = [(self.heuristic(start, goal), start)]
        # Nodes that currently have a valid (non-superseded) heap entry.
        pending = {start}

        steps = []

        while open_heap:
            current_f, current = heapq.heappop(open_heap)
            # heapq cannot decrease a key in place, so every improvement
            # pushes a new entry. The cheapest entry for a node is popped
            # first; later entries for the same node are stale and skipped.
            if current not in pending:
                continue
            pending.remove(current)

            neighbors = list(self.graph.neighbors(current))
            steps.append({
                'current_node': current,
                'f_score': current_f,
                'g_score': g_score[current],
                'neighbors': neighbors,
            })

            if current == goal:
                return self._reconstruct_path(came_from, start, goal), steps

            for neighbor in neighbors:
                tentative_g_score = g_score[current] + self._edge_length(current, neighbor)
                if tentative_g_score < g_score.get(neighbor, inf):
                    came_from[neighbor] = current
                    g_score[neighbor] = tentative_g_score
                    f_value = tentative_g_score + self.heuristic(neighbor, goal)
                    # Always push: leaving a node queued under an outdated,
                    # larger priority can let the goal be popped too early
                    # and yield a non-optimal path.
                    heapq.heappush(open_heap, (f_value, neighbor))
                    pending.add(neighbor)

        return None, steps

# Path resolver

## Download graph

In [16]:
# Download the unsimplified drivable street network around the centre point.
# NOTE(review): `dist` is presumably in metres (see OSMnx docs) - confirm.
graph = ox.graph_from_point(
    (50.444482, 30.449471),
    dist=10000,
    network_type='drive',
    simplify=False,
)

  G = graph_from_bbox(


## Define start and end points

In [17]:
# Route endpoints as (latitude, longitude) pairs.
start_point = (50.449507, 30.450145)
end_point = (50.466327, 30.524171)

# nearest_nodes takes X (longitude) before Y (latitude), hence the swapped
# indices relative to the (lat, lon) tuples above.
start_node = ox.distance.nearest_nodes(graph, X=start_point[1], Y=start_point[0])
end_node = ox.distance.nearest_nodes(graph, X=end_point[1], Y=end_point[0])

start_node, end_node

(6049114772, 347862570)

## Find path

In [18]:
# Run A* between the two snapped nodes; `steps` records every expanded node.
solver = AStarSolver(graph)
print("Running A* algorithm...")
path, steps = solver.astar_search(start_node, end_node)
if path is None:
    # Raise instead of calling exit(): inside a notebook exit() asks the
    # kernel to shut down, whereas an exception just stops this cell (and a
    # "Run All") while keeping the kernel and its state available.
    raise RuntimeError(f"No path found between nodes {start_node} and {end_node}.")
print("Path found.")

Running A* algorithm...
Path found.


# Save results

In [19]:
# Create the output directory if needed; exist_ok avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Per-expansion search trace produced by the solver.
print(f"Saving steps to '{OUTPUT_DIR}/astar_steps.json'...")
with open(f'{OUTPUT_DIR}/astar_steps.json', 'w') as f:
    json.dump(steps, f)
print("Steps saved.")

# Final route as an ordered list of node ids.
print(f"Saving path to '{OUTPUT_DIR}/astar_path.json'...")
with open(f'{OUTPUT_DIR}/astar_path.json', 'w') as f:
    json.dump(path, f)
print("Path saved.")

print(f"Saving graph data to '{OUTPUT_DIR}/graph_nodes_edges.json'...")
nodes = [{'id': node, 'x': data['x'], 'y': data['y']} for node, data in graph.nodes(data=True)]
# On a multigraph, graph.edges() yields (u, v) once per parallel edge. Each
# exported entry already carries the minimum length over all parallel edges,
# so emit every (u, v) pair only once (dict.fromkeys keeps first-seen order).
unique_pairs = dict.fromkeys(graph.edges())
edges = [
    {
        'source': u,
        'target': v,
        'length': min(
            data.get('length', 1) for data in graph.get_edge_data(u, v).values()
        ),
    }
    for u, v in unique_pairs
]
with open(f'{OUTPUT_DIR}/graph_nodes_edges.json', 'w') as f:
    json.dump({'nodes': nodes, 'edges': edges}, f)
print("Graph data saved.")

Saving steps to '../../data/astar_steps.json'...
Steps saved.
Saving path to '../../data/astar_path.json'...
Path saved.
Saving graph data to '../../data/graph_nodes_edges.json'...
Graph data saved.
