In [15]:
import networkx as nx
import numpy as np
import d3networkx as d3nx

In [23]:
# Read the weighted edge list from disk and wrap it in a D3Graph, which is
# the graph object later handed to the visualizer.
G = d3nx.d3graph.D3Graph(
    nx.read_weighted_edgelist("data/path.edgelist"),
)

In [56]:
# Hand-written 2-D coordinates for every node label, as plain tuples.
_node_pos = dict(
    A=(0, 1),
    S=(1, 0),
    B=(1, 3),
    D=(0, 4),
    H=(1, 5),
    F=(0, 6),
    G=(2, 6),
    C=(3, 0),
    I=(3, 4),
    E=(3, 6),
    L=(4, 2),
    K=(4, 4),
    J=(5, 3),
)
# Same table with each coordinate pair as a numpy array, so positions can be
# subtracted from one another directly.
node_pos = dict((label, np.array(coords)) for label, coords in _node_pos.items())

In [27]:
# Create the d3nx visualizer (an awaitable factory), wipe whatever it was
# showing, hand it the graph G, and push the change with update().
# NOTE(review): the captured output below suggests this starts a websocket
# server and waits for the browser-side visualizer to connect — confirm.
d3 = await d3nx.create_d3nx_visualizer()
d3.clear()
d3.set_graph(G)
d3.update()

websocket server started...

networkx connected...visualizer connected...

In [61]:
import networkx as nx
import heapq
from asyncio import sleep
from typing import Optional, Union, Hashable
from collections import deque
import more_itertools

# Type alias used in the signatures below: a graph node is any hashable label.
Node = Hashable


def astar_search(
    graph: Union[nx.Graph, nx.DiGraph],
    start: Node,
    end: Node,
    heuristic: Optional[dict[Node, float]]=None,
):
    """Search for the shortest path between two nodes of a weighted graph with A*.

    This is a generator so callers can observe (e.g. animate) the search: it
    yields the current search state after every node expansion, and once more
    when ``end`` is reached, and then stops normally.

    Parameters
    ----------
    graph: graph whose edges carry a numeric ``"weight"`` attribute
    start: node the search starts from
    end: node the search is looking for
    heuristic: estimated remaining distance to ``end`` for every node; when
        omitted (or empty) a zero heuristic is used, which turns the search
        into Dijkstra's algorithm

    Yields
    ------
    (predecessors, distance): ``predecessors`` maps each discovered node to
        the node it was best reached from; ``distance`` maps every node to
        the best known distance from ``start`` (``inf`` if undiscovered).
        The same two dict objects are yielded (and mutated) on every step.

    Raises
    ------
    ValueError: if the frontier is exhausted without reaching ``end``
    """
    # Determine if the graph is directed or undirected
    is_directed = isinstance(graph, nx.DiGraph)

    # Set the heuristic to zero if not provided
    if not heuristic:
        heuristic = {node: 0.0 for node in graph.nodes()}

    # Initialize the distances of all nodes to infinity
    distance: dict[Node, float] = {node: float("inf") for node in graph.nodes()}
    distance[start] = 0

    # Heap entries are (f, push_order, g, node). f = g + h(node) is the
    # priority; push_order breaks ties so node labels themselves are never
    # compared (they only need to be hashable, not orderable); g is the
    # distance at push time, used to recognise outdated entries.
    push_order = 0
    heap = [(0 + heuristic[start], push_order, 0, start)]

    # Initialize the predecessor dictionary
    predecessors = {}

    while heap:
        # Pop the node with the smallest estimated total path length
        _, _, dist_when_pushed, curr_node = heapq.heappop(heap)

        # A shorter route to this node was found after this entry was pushed;
        # the newer entry supersedes it, so skip the outdated one.
        if dist_when_pushed > distance[curr_node]:
            continue

        # Goal reached: emit the final state and finish normally, so that the
        # "no path" error below is only raised when the search truly fails.
        if curr_node == end:
            yield predecessors, distance
            return

        # Iterate over the neighbors of the current node
        neighbors = (
            graph.successors(curr_node) if is_directed else graph.neighbors(curr_node)
        )
        for neighbor in neighbors:
            # Calculate the distance to the neighbor through the current node
            edge_weight = graph[curr_node][neighbor]["weight"]
            new_dist = distance[curr_node] + edge_weight

            # Update the distance and predecessor of the neighbor if a shorter path was found
            if new_dist < distance[neighbor]:
                distance[neighbor] = new_dist
                predecessors[neighbor] = curr_node

                # Queue the neighbor with its own heuristic estimate:
                # the priority must be g(neighbor) + h(neighbor).
                push_order += 1
                heapq.heappush(
                    heap,
                    (new_dist + heuristic[neighbor], push_order, new_dist, neighbor),
                )

        yield predecessors, distance

    raise ValueError(f"No path found from {start} to {end}")

def predecessor_path(predecessors: dict, start: Node, end: Node):
    """Walk a predecessor map backwards from ``end`` to ``start``, step by step.

    Yields a snapshot of the partial path after every step. Each snapshot is
    a fresh list ordered from ``end`` towards ``start``; the first one is
    ``[end]`` and the last one is the complete (still reversed) path. Because
    ``[end]`` is always yielded, the generator produces at least one item
    even when ``start == end``.

    Parameters
    ----------
    predecessors: maps a node to the node it was reached from
    start: node at which the backwards walk stops
    end: node at which the backwards walk begins

    Raises
    ------
    KeyError: if a node on the walk has no entry in ``predecessors``
    """
    # Construct the shortest path from the predecessor dictionary
    path = [end]
    # Yield copies so that consumers may keep or mutate (e.g. reverse) a
    # snapshot without corrupting the walk that is still in progress.
    yield list(path)
    while path[-1] != start:
        path.append(predecessors[path[-1]])
        yield list(path)

    return path

def astar(
    graph: Union[nx.Graph, nx.DiGraph],
    start: Node,
    end: Node,
    heuristic: Optional[dict[Hashable, float]] = None
):
    """Run the a* pathfinding algorithm on a graph.

    Drives ``astar_search`` to completion, then rebuilds the path with
    ``predecessor_path``. Any exception raised by those two generators
    propagates to the caller.

    Returns
    -------
    final_path: list of Nodes, ordered from start to end
    distance: length of path (sum of edgeweights)
    num_nodes_searched: number of nodes that were assigned a predecessor
        during the search (i.e. discovered nodes, not counting ``start``)
    """
    # Fallbacks cover a search that finishes without yielding any state,
    # which can only mean the start node already is the end node.
    predecessors: dict = {}
    distance: dict = {start: 0}
    for predecessors, distance in astar_search(graph, start, end, heuristic):
        pass  # only the final search state is needed

    # Same idea for the path walk: with start == end the path is just [end].
    path = [end]
    for path in predecessor_path(predecessors, start, end):
        pass  # only the complete path is needed

    # The walk runs end -> start; return a start -> end copy.
    return path[::-1], distance[end], len(predecessors)

async def astar_d3(
    d3: d3nx.D3NetworkxRenderer,
    graph: Union[nx.Graph, nx.DiGraph],
    start: Node,
    end: Node,
    heuristic: Optional[dict[Hashable, float]] = None,
    sleep_time=0.5,
):
    """Visualize the a* pathfinding algorithm on a graph with d3nx

    Phase one animates the search: after every state yielded by
    ``astar_search`` the nodes discovered so far are highlighted. Phase two
    resets the highlights and animates the backwards walk from ``end`` to
    ``start``, highlighting the nodes and edges of the partial path. The
    coroutine sleeps ``sleep_time`` seconds between frames.

    Returns
    -------
    final_path: list of Nodes, ordered from start to end
    distance: length of path (sum of edgeweights)
    num_nodes_searched: number of nodes that were assigned a predecessor
        during the search (i.e. discovered nodes, not counting ``start``)
    """
    d3.clear_highlights()
    d3.highlight_nodes((start,))
    d3.update()

    # Fallbacks cover a search that finishes without yielding any state,
    # which can only mean the start node already is the end node.
    predecessors = {}
    distance = {start: 0}
    for predecessors, distance in astar_search(graph, start, end, heuristic):
        d3.highlight_nodes(predecessors.keys())
        d3.update()
        await sleep(sleep_time)

    # Reset to just the start node before animating the final path.
    d3.clear_highlights()
    d3.highlight_nodes((start,))
    d3.update()
    await sleep(sleep_time)

    # With start == end the walk may yield nothing; the path is then [end].
    path = [end]
    for path in predecessor_path(predecessors, start, end):
        d3.highlight_nodes(path)
        # Consecutive node pairs of the partial path are its edges
        # (listed in end -> start direction, as the walk runs backwards).
        d3.highlight_edges(more_itertools.pairwise(path))
        d3.update()
        await sleep(sleep_time)

    # The walk runs end -> start; return a start -> end copy.
    return path[::-1], distance[end], len(predecessors)

async def dijkstra_d3(
    d3: d3nx.D3NetworkxRenderer,
    graph: Union[nx.Graph, nx.DiGraph],
    start: Node,
    end: Node,
    sleep_time=0.5,
):
    """Visualize Dijkstra's algorithm on a graph with d3nx.

    Delegates to ``astar_d3`` with no heuristic and returns exactly what it
    returns: ``(final_path, distance, num_nodes_searched)``.
    """
    # The inner coroutine must be awaited here; returning it un-awaited would
    # hand the caller a coroutine object instead of the result tuple.
    return await astar_d3(d3, graph, start, end, heuristic=None, sleep_time=sleep_time)


In [62]:
# await dijkstra_d3(d3, G, "S", "E")
node_dist = {node: float(np.linalg.norm(pos-node_pos["E"])) for node, pos in node_pos.items()} # type: dict[Node, float]
await astar_d3(d3, G, "S", "E", node_dist)

(['S', 'B', 'H', 'G', 'E'], 7.0)

In [44]:
# Remove all node/edge highlights left over from the last run and push the
# change to the visualizer.
d3.clear_highlights()
d3.update()