In [None]:
import heapq
import pprint
import random
from dataclasses import dataclass
from itertools import combinations, product
from typing import List, Optional

import networkx as nx
import numpy as np
import numpy.typing as npt
from icecream import ic

In [31]:
np.random.seed(1)
random.seed(1)

## Initialize the problems


In [32]:
SIZES = [10, 20, 50, 100, 200, 500, 1000]
DENSITIES = [0.2, 0.5, 0.8, 1.0]
NOISE_LEVELS = [0, 0.1, 0.5, 0.8]
NEGTIVE_VALUES = [False, True]

In [33]:
def create_problem(
    size: int,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> npt.NDArray:
    """Problem generator for Lab3"""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round()

In [34]:
all_combinations = list(product(SIZES, DENSITIES, NOISE_LEVELS, NEGTIVE_VALUES))

To generated a diverse set of problems, we sample randomly 20 combinations from the se of all possible combinations of the parameters. We save it as a dictionary just so we can print the parameters used to generate each graph.

In [35]:
problems = [
    {
        "_size": s,
        "_density": d,
        "_noise_level": n,
        "_negative_values": v,
        "grid": create_problem(s, d, v, n),
    }
    for s, d, n, v in random.sample(all_combinations, 20)
]

In [41]:
pprint.pprint(problems)

[{'_density': 0.2,
  '_negative_values': False,
  '_noise_level': 0.1,
  '_size': 20,
  'grid': array([[   0.,  355.,   inf,  379.,   inf,   inf,   inf,   inf,  503.,
          inf,  106.,   inf,   inf,  567.,  737.,  608.,   inf,   inf,
          inf,   inf],
       [  inf,    0.,   inf,   inf,   inf,   inf,   inf,   inf,  706.,
          inf,   inf,   inf,  594.,  785.,   inf,   inf,   inf,   inf,
          inf,   inf],
       [  inf,   inf,    0.,   inf,  529.,   inf,   inf,   inf,   inf,
          inf,   inf,   inf,   inf,   inf,   inf,   inf,   inf,   inf,
          inf,   inf],
       [  inf,   inf,   inf,    0.,   inf,   inf,   inf,   inf,   inf,
         258.,  477.,   inf,  622.,   inf,   inf,  268.,   inf,   inf,
         747.,   inf],
       [  inf,   inf,  535.,   inf,    0.,   inf,   inf,   inf,   inf,
          inf,   inf,   inf,   inf,   inf,  293.,   inf,   inf,  478.,
         104.,   inf],
       [  inf,   inf,   inf,   inf,  582.,    0.,   inf,  755.,   inf,
        

In [37]:
masked = [np.ma.masked_array(p["grid"], mask=np.isinf(p["grid"])) for p in problems]
Gs = [nx.from_numpy_array(m, create_using=nx.DiGraph) for m in masked]

## Using the A-star algorithm to find the shortest path

We implement the A-star algorithm in a classical way but we use a heuristic based on the Bellman-Ford algorithm to handle graphs with negative edge weights.


In [38]:
@dataclass
class Node:
    """
    Represents a node in the A* search algorithm.

    Attributes:
        id (int): The identifier of the node.
        g (float): The cost from the start node to this node.
        h (float): The heuristic estimate from this node to the target node.
        f (float): The total estimated cost (g + h).
        parent (Optional[Node]): The parent node in the path.
    """

    id: int
    g: float
    h: float
    parent: Optional["Node"] = None

    def __post_init__(self):
        self.f = self.g + self.h

    def __lt__(self, other: "Node"):
        return self.f < other.f


def heuristic(G: nx.DiGraph, node: int, target: int) -> float:
    """
    Computes an heuristic for the A* algorithm

    Uses networkx's implementation of the Bellman-Ford algorithm to compute the
    shortest-path distances to the target node from all other nodes by
    running the algorithm on a reverse view of the graph. This heuristic should
    handle negative edge weights and detect negative cycles. When a negative
    cycle is detected, the returned heuristic is zero, which is admissible.

    Args:
        G (nx.DiGraph): the graph. The object should be assumed mutable. A cache
            of the Bellman-Ford results is stored in
            `G.graph["_bf_to_target_cache"]` for each target node to avoid
            recomputing the same distances multiple times.
        node (int): the starting node
        target (int): the target node

    Returns:
        float: the heuristic estimate of the distance from `node` to `target`
    """

    cache = G.graph.setdefault("_bf_to_target_cache", {})
    if target not in cache:
        try:
            dist = nx.single_source_bellman_ford_path_length(
                G.reverse(copy=False), target
            )
        except nx.NetworkXUnbounded:
            cache[target] = {}
        else:
            cache[target] = dist

    return float(cache[target].get(node, 0.0))


def astar(
    G: nx.DiGraph, source: int, target: int
) -> tuple[Optional[List[int]], float] | None:
    """
    Implements the A* search algorithm.

    Args:
        G (nx.DiGraph): The directed graph on which to perform the search.
        source (int): The starting node identifier.
        target (int): The target node identifier.

    Returns:
        (tuple[Optional[List[int]], float] | None): A tuple containing the list
            of node identifiers representing the shortest path from source to
            target and the total cost of that path. If no path exists, returns
            None.
    """

    start = Node(id=source, g=0, h=0)
    open_set = []
    closed_set = set()
    heapq.heappush(open_set, start)

    while open_set:
        current = heapq.heappop(open_set)

        if current.id == target:
            path = []
            cost = current.g
            while current:
                path.append(current.id)
                current = current.parent
            return path[::-1], cost

        closed_set.add(current.id)

        for neighbor in G.neighbors(current.id):
            if neighbor in closed_set:
                continue

            g_cost = current.g + G[current.id][neighbor]["weight"]
            h_cost = heuristic(G, neighbor, target)

            neighbor_node = Node(id=neighbor, g=g_cost, h=h_cost, parent=current)

            if any(
                open_node.id == neighbor and open_node.g <= g_cost
                for open_node in open_set
            ):
                continue

            heapq.heappush(open_set, neighbor_node)

Here we compare the results from networkx's built-in Dijkstra's algorithm and our implementation of the A-star. With Dijkstra, if we find negative weights, a `ValueError` exception is raised so in that case we skip the comparison. We randomly sample 20 source-destination pairs from each problem. In the assertions, we use a "greater or equal" comparison because A-star might find a shorter path by traversing negative edges.


In [39]:
for p, G in zip(problems, Gs):
    for s, d in random.sample(list(combinations(range(p["grid"].shape[0]), 2)), 20):
        try:
            path_dijkstra = nx.shortest_path(G, s, d, weight="weight")
            cost_dijkstra = nx.path_weight(G, path_dijkstra, weight="weight")

            path_astar, cost_astar = astar(G, s, d)  # type: ignore
        except nx.NetworkXNoPath:
            path_dijkstra = None
            cost_dijkstra = np.inf
        except ValueError:
            # negative weights with Dijkstra are not admissible in networkx, but
            # we can still work with it using A*
            path_dijkstra = None
            cost_dijkstra = np.inf

        ic(s, d, path_dijkstra, cost_dijkstra, path_astar, cost_astar)
        assert cost_dijkstra >= cost_astar or cost_dijkstra == np.inf

[38;5;247mic[39m[38;5;245m|[39m[38;5;245m [39m[38;5;247ms[39m[38;5;245m:[39m[38;5;245m [39m[38;5;36m6[39m
[38;5;245m    [39m[38;5;247md[39m[38;5;245m:[39m[38;5;245m [39m[38;5;36m7[39m
[38;5;245m    [39m[38;5;247mpath_dijkstra[39m[38;5;245m:[39m[38;5;245m [39m[38;5;245m[[39m[38;5;36m6[39m[38;5;245m,[39m[38;5;245m [39m[38;5;36m7[39m[38;5;245m][39m
[38;5;245m    [39m[38;5;247mcost_dijkstra[39m[38;5;245m:[39m[38;5;245m [39m[38;5;36m709.0[39m
[38;5;245m    [39m[38;5;247mpath_astar[39m[38;5;245m:[39m[38;5;245m [39m[38;5;245m[[39m[38;5;36m6[39m[38;5;245m,[39m[38;5;245m [39m[38;5;36m7[39m[38;5;245m][39m
[38;5;245m    [39m[38;5;247mcost_astar[39m[38;5;245m:[39m[38;5;245m [39m[38;5;36m709.0[39m
[38;5;247mic[39m[38;5;245m|[39m[38;5;245m [39m[38;5;247ms[39m[38;5;245m:[39m[38;5;245m [39m[38;5;36m6[39m
[38;5;245m    [39m[38;5;247md[39m[38;5;245m:[39m[38;5;245m [39m[38;5;36m18[39m
[38;5;245m  