Copyright **`(c)`** 2025 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions â€” see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [10]:
!pip install icecream networkx

Defaulting to user installation because normal site-packages is not writeable


In [11]:
from itertools import product, combinations
import numpy as np
from icecream import ic
import heapq 
from typing import Tuple, List, Optional, Union

ModuleNotFoundError: No module named 'icecream'

In [None]:
import time
import networkx as nx
import numpy as np
import pandas as pd
from icecream import ic

def benchmark_algorithms(problem: np.ndarray, num_tests: int = 5):
    """
    Runs a comparative analysis between the custom SPFA solver, 
    Greedy Best-First Search, and NetworkX's Bellman-Ford (Ground Truth).
    
    Args:
        problem: The adjacency matrix of the graph.
        num_tests: Number of random source-target pairs to test.
    """
    print(f"\n--- Benchmarking Algorithms on {len(problem)} Nodes ({num_tests} iterations) ---")
    
    results = []

    masked_problem = np.ma.masked_array(problem, mask=np.isinf(problem))
    G = nx.from_numpy_array(masked_problem, create_using=nx.DiGraph)
    
    # Generate a random heuristic map for the tests
    heuristic_map = np.random.rand(problem.shape[0], 2)
    
    for _ in range(num_tests):
        source, target = np.random.choice(range(len(problem)), size=2, replace=False)
        
        # 1. Ground Truth (NetworkX Bellman-Ford)
        # We use Bellman-Ford to safely handle potential negative weights
        try:
            start_time = time.time()
            nx_path = nx.bellman_ford_path(G, source, target, weight='weight')
            nx_cost = nx.path_weight(G, nx_path, weight='weight')
            nx_time = (time.time() - start_time) * 1000 # ms
            valid_path_exists = True
        except nx.NetworkXNoPath:
            valid_path_exists = False
            nx_cost = np.inf
            nx_time = 0
        except nx.NetworkXUnbounded:
            # Negative cycle
            nx_cost = -np.inf
            valid_path_exists = False

        if not valid_path_exists:
            continue

        # 2. Custom SPFA Solver
        start_time = time.time()
        spfa_path, spfa_cost = solver(problem, source, target)
        spfa_time = (time.time() - start_time) * 1000 # ms
        
        # 3. Greedy Best-First Solver
        start_time = time.time()
        greedy_path, greedy_cost = best_first_solver(problem, heuristic_map, source, target)
        greedy_time = (time.time() - start_time) * 1000 # ms
        
        # Calculate Optimality Gap for Greedy
        if greedy_cost != np.inf and nx_cost != 0:
            gap = ((greedy_cost - nx_cost) / nx_cost) * 100
        else:
            gap = 0.0

        results.append({
            "Source -> Target": f"{source} -> {target}",
            "NX Cost (Optimal)": f"{nx_cost:.2f}",
            "SPFA Cost": f"{spfa_cost:.2f}",
            "SPFA Time (ms)": f"{spfa_time:.4f}",
            "Greedy Cost": f"{greedy_cost:.2f}",
            "Greedy Gap (%)": f"{gap:.2f}%",
            "Greedy Time (ms)": f"{greedy_time:.4f}"
        })

    # Create a DataFrame for nice visualization
    df = pd.DataFrame(results)
    
    if not df.empty:
        print(df.to_string(index=False))
        
        # Summary statistics
        print("\n--- Summary Statistics ---")
        avg_spfa_time = pd.to_numeric(df["SPFA Time (ms)"]).mean()
        avg_greedy_time = pd.to_numeric(df["Greedy Time (ms)"]).mean()
        print(f"Avg SPFA Time: {avg_spfa_time:.4f} ms")
        print(f"Avg Greedy Time: {avg_greedy_time:.4f} ms")
        print("Note: SPFA should match NX Cost (exact algorithm). Greedy trades optimality for speed.")
    else:
        print("No valid paths found in the random iterations.")



In [2]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    """Problem generator for Lab3"""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round()

In [3]:
#size: 10,20,50,100,200,500,1000
#density: 0.2,  0.5, 0.8, 1.0
#noise_level: 0, 0.1, 0.5, 0.8
#negative_values: True, False
problem = create_problem(1000, density=0.005, noise_level=10, negative_values=False)

In [6]:
masked = np.ma.masked_array(problem, mask=np.isinf(problem))
G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

NameError: name 'nx' is not defined

In [7]:
def solver(problem: np.ndarray, source: int, target: int) -> Tuple[Optional[List[int]], Union[float, int]]:
    """
    Find the shortest path using the SPFA algorithm (Shortest Path Faster Algorithm),
    an optimization of Bellman-Ford that uses a queue.
    Handles negative weights and detects negative cycles.
    """
    num_nodes = len(problem)
    
    dist = {node: np.inf for node in range(num_nodes)}
    parent = {node: None for node in range(num_nodes)}
    # Count how many times a node has been relaxed
    relax_count = {node: 0 for node in range(num_nodes)}

    dist[source] = 0
    
    # Priority queue (min-heap) based on path cost (g)
    # Like Dijkstra/Uniform-Cost Search 
    pq = [(0, source)]  # (g(source), source)
    
    while pq:
        # Extract the node with the lowest g_score
        current_g, current_node = heapq.heappop(pq)
        
        # Optimization: if we found a shorter path to this node
        # *after* it was added to the queue, this entry is stale. Skip it.
        if current_g > dist[current_node]:
            continue
            
        # Edge relaxation (like Bellman-Ford)
        for neighbor in range(num_nodes):
            cost = problem[current_node, neighbor]
            
            # If an edge exists (cost not infinite)
            if cost != np.inf:
                new_dist_g = current_g + cost
                
                # Found a shorter path?
                if new_dist_g < dist[neighbor]:
                    dist[neighbor] = new_dist_g
                    parent[neighbor] = current_node
                    
                    # Negative cycle check (like SPFA)
                    relax_count[neighbor] += 1
                    
                    # A simple path has at most V-1 edges.
                    # If a node is relaxed V times, we have a negative cycle.
                    if relax_count[neighbor] >= num_nodes:
                        # Negative cycle detected!
                        return None, -np.inf
                    
                    heapq.heappush(pq, (new_dist_g, neighbor))

    if dist[target] == np.inf:
        # No path found
        return None, np.inf  

    path = []
    curr = target
    while curr is not None:
        path.append(curr)
        if curr == source:
            break  # We reached the start
        curr = parent[curr]
        
    # If the loop stopped because curr is None and we didn't find the source,
    # there is no path (handled by the infinite-distance check).
    if path[-1] != source:
        return None, np.inf 

    # The path is built backwards (from target to source),
    # so we need to reverse it.
    return path[::-1], dist[target]

NameError: name 'Tuple' is not defined

In [None]:
        # CLARIFICATION: Greedy Best-First uses f(n) = h(n). 
        # It ignores the accumulated path cost g(n), so it is NOT optimal 
        # for weighted graphs, but often expands fewer nodes than Dijkstra

In [8]:
import heapq 
import numpy as np
from typing import Tuple, List, Optional, Union

def best_first_solver(
    problem: np.ndarray, 
    heuristic_map: np.ndarray,
    source: int, 
    target: int
) -> Tuple[Optional[List[int]], Union[float, int]]:
    """
    Find a (non-optimal) path using Greedy Best-First Search.
    Fast but not optimal.
    """
    num_nodes = len(problem)
    
    # Compute the heuristic (straight-line distance) from node 'n' to 'target'
    def h(n):
        return np.sqrt(
            np.square(heuristic_map[n, 0] - heuristic_map[target, 0]) + 
            np.square(heuristic_map[n, 1] - heuristic_map[target, 1])
        )

    # Priority queue (min-heap)
    # Format: (h_score, g_score, current_node, path_list)
    # Priority 1: h_score (heuristic) - lower is better
    # Priority 2: g_score (actual cost) - tie-breaker prefers cheaper paths
    
    start_h = h(source)
    pq = [(start_h, 0, source, [source])]  # (h, g, node, path)
    
    # Use a set to avoid visiting the same node multiple times
    # This prevents infinite cycles
    visited = set()

    while pq:
        h_score, g_score, current_node, path = heapq.heappop(pq)
        
        # Found the target
        if current_node == target:
            return path, g_score
            
        # Skip if we've already processed this node
        if current_node in visited:
            continue
        visited.add(current_node)
        
        # Explore neighbors
        for neighbor in range(num_nodes):
            cost = problem[current_node, neighbor]
            
            # If an edge exists and the neighbor has not been visited
            if cost != np.inf and neighbor not in visited:
                new_g = g_score + cost
                new_h = h(neighbor)
                new_path = path + [neighbor]
                
                # Push into queue with priority h(n)
                heapq.heappush(pq, (new_h, new_g, neighbor, new_path))

    # If the loop finishes, no path exists
    return None, np.inf

In [None]:
for s, d in combinations(range(problem.shape[0]), 2):
    try:
        #path = nx.shortest_path(G, s, d, weight='weight')
        path = nx.bellman_ford_path(G, s, d, weight='weight') 
        cost = cost = nx.path_weight(G, path, weight='weight')
        ic(d)
    except nx.NetworkXNoPath:
        # Nodes are not connected
        path = None
        cost = np.inf
        path1 = None
        cost1 = np.inf
    except nx.NetworkXUnbounded:
        # Negative cycle detected
        path = None
        cost = -np.inf
        path1 = None
        cost1 = -np.inf
None

In [None]:
for s, d in combinations(range(problem.shape[0]), 2):
    
    try:
        #path = nx.shortest_path(G, s, d, weight='weight')
        heuristic_map = np.random.rand(problem.shape[0], 2)
        path, cost = best_first_solver(problem, heuristic_map, s, d)
        #ic(s, d, cost,  cost)
        ic(d)
    except nx.NetworkXNoPath:
        # Nodes are not connected
        path = None
        cost = np.inf
    except nx.NetworkXUnbounded:
        # Negative cycle detected
        path = None
        cost = -np.inf
None