In [11]:

from __future__ import annotations

import math
import random
import time
from dataclasses import dataclass
from itertools import product
from typing import Dict, Hashable, Iterable, List, Optional, Tuple

import networkx as nx
import numpy as np



### Problem data structure

In [None]:
@dataclass
class ProblemInstance:
    cost_matrix: np.ndarray   # (n, n) with +inf for missing edges
    coords: np.ndarray        # (n, 2) city coordinates
    graph: nx.DiGraph         # directed graph with 'weight' attribute


def build_problem(
    n_nodes: int,
    *,
    density: float = 1.0,
    allow_negative: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> ProblemInstance:
    rng = np.random.default_rng(seed)

    # coordinates of cities
    coords = rng.random((n_nodes, 2))

    # initially all weights are +inf (no edge)
    weights = np.full((n_nodes, n_nodes), np.inf, dtype=float)

    for i, j in product(range(n_nodes), repeat=2):
        if i == j:
            weights[i, j] = 0.0
            continue

        # apply density (probability that the edge exists)
        if rng.random() > density:
            continue

        dx, dy = coords[i] - coords[j]
        dist = math.hypot(dx, dy)

        if allow_negative:
            noise = (rng.random() * 2.0 - 1.0) * noise_level
        else:
            noise = rng.random() * noise_level

        weights[i, j] = dist + noise

    scaled = np.round(weights * 1_000.0)

    # build the NetworkX graph
    masked = np.ma.masked_array(scaled, mask=np.isinf(scaled))
    G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

    return ProblemInstance(cost_matrix=scaled, coords=coords, graph=G)


### Shortest path algorithms

In [13]:


def dijkstra_all(
    G: nx.DiGraph, source: Hashable
) -> Tuple[Dict[Hashable, float], Dict[Hashable, Optional[Hashable]]]:
    import heapq

    dist: Dict[Hashable, float] = {u: math.inf for u in G.nodes}
    parent: Dict[Hashable, Optional[Hashable]] = {u: None for u in G.nodes}
    dist[source] = 0.0

    visited = set()
    heap: List[Tuple[float, Hashable]] = [(0.0, source)]

    while heap:
        d, u = heapq.heappop(heap)
        if u in visited:
            continue
        visited.add(u)

        for v, data in G[u].items():
            w = float(data.get("weight", 1.0))
            nd = d + w
            if nd < dist[v]:
                dist[v] = nd
                parent[v] = u
                heapq.heappush(heap, (nd, v))

    return dist, parent



In [None]:
def bellman_ford_all(
    G: nx.DiGraph, source: Hashable
) -> Tuple[Optional[Dict[Hashable, float]], Optional[Dict[Hashable, Optional[Hashable]]]]:

    nodes = list(G.nodes)
    dist: Dict[Hashable, float] = {u: math.inf for u in nodes}
    parent: Dict[Hashable, Optional[Hashable]] = {u: None for u in nodes}
    dist[source] = 0.0

    # |V|-1 relaxation iterations
    for _ in range(len(nodes) - 1):
        updated = False
        for u, v, data in G.edges(data=True):
            w = float(data.get("weight", 1.0))
            if dist[u] + w < dist[v]:
                dist[v] = dist[u] + w
                parent[v] = u
                updated = True
        if not updated:
            break

    # negative cycle check
    for u, v, data in G.edges(data=True):
        w = float(data.get("weight", 1.0))
        if dist[u] + w < dist[v]:
            return None, None

    return dist, parent


In [15]:

def heuristic_euclidean(u: int, v: int, coords: np.ndarray) -> float:
    dx, dy = coords[u] - coords[v]
    return math.hypot(dx, dy) * 1_000.0


In [None]:
def astar_one_pair(
    G: nx.DiGraph,
    source: int,
    target: int,
    coords: np.ndarray,
) -> Tuple[Optional[List[int]], float]:

    import heapq

    if source == target:
        return [source], 0.0

    open_heap: List[Tuple[float, float, int]] = []  # (f, g, node)
    heapq.heappush(open_heap, (0.0, 0.0, source))

    came_from: Dict[int, Optional[int]] = {source: None}
    g_score: Dict[int, float] = {source: 0.0}
    closed: set[int] = set()

    while open_heap:
        f, g, u = heapq.heappop(open_heap)
        if u in closed:
            continue
        closed.add(u)

        if u == target:
            # reconstruct path
            path: List[int] = []
            cur: Optional[int] = u
            while cur is not None:
                path.append(cur)
                cur = came_from[cur]
            path.reverse()
            return path, g

        for v, data in G[u].items():
            w = float(data.get("weight", 1.0))
            tentative_g = g + w
            best_g = g_score.get(v, math.inf)
            if tentative_g < best_g:
                g_score[v] = tentative_g
                came_from[v] = u
                h = heuristic_euclidean(v, target, coords)
                heapq.heappush(open_heap, (tentative_g + h, tentative_g, v))

    return None, math.inf


### Comparison and statistics utilities

In [None]:
@dataclass
class RunStats:
    n_nodes: int
    density: float
    noise_std: float
    has_negative_edges: bool

    algorithm: str
    n_pairs: int
    reachable_pairs: int
    mismatches: int
    max_abs_error: float
    runtime_sec: float


def graph_has_negative_edge(G: nx.DiGraph) -> bool:
    """True if there exists at least one edge with negative weight."""
    for _, _, data in G.edges(data=True):
        if float(data.get("weight", 1.0)) < 0:
            return True
    return False


def compare_with_networkx(
    G: nx.DiGraph,
    source: int,
    my_dist: Dict[int, float],
    *,
    use_bellman_ford: bool,
) -> Tuple[int, int, float]:
    
    if use_bellman_ford:
        nx_dist, _ = nx.single_source_bellman_ford(G, source)
    else:
        nx_dist, _ = nx.single_source_dijkstra(G, source)

    nodes = list(G.nodes)
    n_pairs = len(nodes)
    mismatches = 0
    max_err = 0.0

    for t in nodes:
        d_my = my_dist.get(t, math.inf)
        d_nx = nx_dist.get(t, math.inf)  # NetworkX uses dict only for reachable nodes

        if math.isinf(d_my) and math.isinf(d_nx):
            continue

        diff = abs(d_my - d_nx)
        if diff > 1e-6:
            mismatches += 1
            if diff > max_err:
                max_err = diff

    return n_pairs, mismatches, max_err


In [None]:
def run_all_pairs_on_instance(
    inst: ProblemInstance,
    *,
    sample_astar: bool = True,
    astar_samples: int = 200,
) -> List[RunStats]:
    
    G = inst.graph
    coords = inst.coords
    n = G.number_of_nodes()

    has_neg = graph_has_negative_edge(G)

    results: List[RunStats] = []

    # ---------------- Dijkstra or Bellman-Ford (all sources) ----------------
    algo_name = "bellman_ford" if has_neg else "dijkstra"
    use_bf = has_neg

    t0 = time.perf_counter()
    total_pairs = 0
    total_mismatch = 0
    max_error = 0.0
    reachable_pairs = 0

    for s in G.nodes:
        if use_bf:
            dist, parent = bellman_ford_all(G, s)
            if dist is None:
                print(f"[WARN] negative cycle reachable from source {s}, ")
                print("results ignored for this source.")
                continue
        else:
            dist, parent = dijkstra_all(G, s)

        reachable_pairs += sum(1 for d in dist.values() if not math.isinf(d))

        n_pairs, mismatches, err = compare_with_networkx(
            G, s, dist, use_bellman_ford=use_bf
        )
        total_pairs += n_pairs
        total_mismatch += mismatches
        max_error = max(max_error, err)

    t1 = time.perf_counter()

    stats_main = RunStats(
        n_nodes=n,
        density=float(nx.density(G)),
        noise_std=float(np.std(inst.cost_matrix[np.isfinite(inst.cost_matrix)])),
        has_negative_edges=has_neg,
        algorithm=algo_name,
        n_pairs=total_pairs,
        reachable_pairs=reachable_pairs,
        mismatches=total_mismatch,
        max_abs_error=max_error,
        runtime_sec=t1 - t0,
    )
    results.append(stats_main)

    # ---------------- A* on a sample of pairs ----------------
    if (not has_neg) and sample_astar:
        rng = random.Random(123)
        nodes = list(G.nodes)

        t0 = time.perf_counter()
        n_pairs = 0
        mismatches = 0
        max_err = 0.0
        reach_astar = 0

        for _ in range(astar_samples):
            s = rng.choice(nodes)
            t = rng.choice(nodes)
            if s == t:
                continue

            path, cost = astar_one_pair(G, s, t, coords)
            my_cost = cost if path is not None else math.inf
            if path is not None:
                reach_astar += 1

            # reference with NetworkX (Dijkstra)
            try:
                ref_cost = nx.shortest_path_length(
                    G, s, t, weight="weight", method="dijkstra"
                )
            except nx.NetworkXNoPath:
                ref_cost = math.inf

            if math.isinf(my_cost) and math.isinf(ref_cost):
                pass
            else:
                diff = abs(my_cost - ref_cost)
                if diff > 1e-6:
                    mismatches += 1
                    max_err = max(max_err, diff)

            n_pairs += 1

        t1 = time.perf_counter()

        stats_astar = RunStats(
            n_nodes=n,
            density=float(nx.density(G)),
            noise_std=float(np.std(inst.cost_matrix[np.isfinite(inst.cost_matrix)])),
            has_negative_edges=False,
            algorithm="astar_sample",
            n_pairs=n_pairs,
            reachable_pairs=reach_astar,
            mismatches=mismatches,
            max_abs_error=max_err,
            runtime_sec=t1 - t0,
        )
        results.append(stats_astar)

    return results


### Parameter grid + print statistics

In [19]:


def main() -> None:
    sizes = [10, 20, 50, 100, 200]
    densities = [0.2, 0.5, 0.8, 1.0]
    noise_levels = [0.0, 0.1, 0.5, 0.8]
    negative_flags = [False, True]

    experiments_total = (
        len(sizes) * len(densities) * len(noise_levels) * len(negative_flags)
    )
    exp_id = 0
    all_stats: List[RunStats] = []

    for n, dens, noise, neg in product(sizes, densities, noise_levels, negative_flags):
        exp_id += 1
        print(f"\n=== Esperimento {exp_id}/{experiments_total} ===")
        print(f"n={n}, density={dens}, noise={noise}, allow_negative={neg}")

        inst = build_problem(
            n_nodes=n,
            density=dens,
            allow_negative=neg,
            noise_level=noise,
            seed=42,
        )

        stats_list = run_all_pairs_on_instance(inst)
        for st in stats_list:
            all_stats.append(st)
            print(
                f"[{st.algorithm:12s}] pairs={st.n_pairs:5d}, "
                f"reachable={st.reachable_pairs:5d}, "
                f"mismatches={st.mismatches:3d}, "
                f"max_err={st.max_abs_error:.3g}, "
                f"time={st.runtime_sec:.2f}s"
            )

    print("\n--- Riepilogo finale ---")
    for st in all_stats:
        print(
            f"n={st.n_nodes:4d} | alg={st.algorithm:12s} | "
            f"dens={st.density:.3f} | noise_std={st.noise_std:.2f} | "
            f"neg={st.has_negative_edges} | "
            f"pairs={st.n_pairs:6d} | mis={st.mismatches:4d} | "
            f"max_err={st.max_abs_error:.3g} | time={st.runtime_sec:.2f}s"
        )


if __name__ == "__main__":
    main()
    


=== Esperimento 1/160 ===
n=10, density=0.2, noise=0.0, allow_negative=False
[dijkstra    ] pairs=  100, reachable=   76, mismatches=  0, max_err=0, time=0.00s
[astar_sample] pairs=  183, reachable=  136, mismatches=  0, max_err=0, time=0.02s

=== Esperimento 2/160 ===
n=10, density=0.2, noise=0.0, allow_negative=True
[dijkstra    ] pairs=  100, reachable=   76, mismatches=  0, max_err=0, time=0.00s
[astar_sample] pairs=  183, reachable=  136, mismatches=  0, max_err=0, time=0.01s

=== Esperimento 3/160 ===
n=10, density=0.2, noise=0.1, allow_negative=False
[dijkstra    ] pairs=  100, reachable=   76, mismatches=  0, max_err=0, time=0.00s
[astar_sample] pairs=  183, reachable=  136, mismatches=  0, max_err=0, time=0.01s

=== Esperimento 4/160 ===
n=10, density=0.2, noise=0.1, allow_negative=True
[dijkstra    ] pairs=  100, reachable=   76, mismatches=  0, max_err=0, time=0.00s
[astar_sample] pairs=  183, reachable=  136, mismatches=  0, max_err=0, time=0.02s

=== Esperimento 5/160 ===

KeyboardInterrupt: 