Copyright **`(c)`** 2025 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# Import and setup

In [None]:
import heapq
import os
import time
from itertools import product, combinations
from typing import Dict, List, Tuple, Optional

import networkx as nx
import numpy as np
import pandas as pd

# Problem generator

```python
create_problem(size, density, negative_values, noise_level, seed) -> (problem_matrix, node_coordinates)
```

Generates a random weighted directed graph as an adjacency matrix, together with the 2D coordinates of its nodes.

In [None]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> Tuple[np.ndarray, np.ndarray]:
    """Build a random weighted directed graph as an adjacency matrix.

    Nodes are placed at random points in the unit square. For every ordered
    pair ``(a, b)`` one random draw is compared with ``density``: if it is
    smaller, the Euclidean distance between the two points is added to the
    (noise-scaled) random entry, otherwise the entry is set to ``np.inf``.

    Args:
        size: Number of nodes.
        density: Threshold compared against a uniform draw in ``[0, 1)`` to
            decide whether an ordered pair receives a finite weight.
        negative_values: If true, the random noise is mapped from ``[0, 1)``
            to ``[-1, 1)`` before being scaled.
        noise_level: Factor multiplying the random noise matrix.
        seed: Seed for ``np.random.default_rng``.

    Returns:
        A tuple ``(weights, coordinates)`` where ``weights`` is the
        ``size x size`` matrix multiplied by 1000 and rounded, with a zero
        diagonal, and ``coordinates`` is the ``size x 2`` array of points.
    """
    rng = np.random.default_rng(seed)

    # The order of the draws below determines the generated instance.
    coordinates = rng.random(size=(size, 2))
    weights = rng.random((size, size))

    if negative_values:
        weights = weights * 2 - 1
    weights *= noise_level

    for src, dst in product(range(size), repeat=2):
        if rng.random() < density:
            delta_x = coordinates[src, 0] - coordinates[dst, 0]
            delta_y = coordinates[src, 1] - coordinates[dst, 1]
            weights[src, dst] += np.sqrt(np.square(delta_x) + np.square(delta_y))
        else:
            weights[src, dst] = np.inf

    np.fill_diagonal(weights, 0)
    scaled = (weights * 1_000).round()
    return scaled, coordinates

# Utility functions

```python
path_reconstruct(predecessors, current) -> path
```

Reconstructs path by backtracking through predecessor map until reaching start node (where predecessor is None).

```python
has_negative_edges(graph) -> bool
```

Iterates through all edges checking if any weight is negative. Used to determine which algorithm to apply.

In [None]:
def path_reconstruct(predecessors: Dict[int, Optional[int]], current: int) -> List[int]:
    """Rebuild the path that ends at ``current`` from a predecessor map.

    Args:
        predecessors: Maps each node to the node that precedes it, or to
            ``None`` for the node where the path begins.
        current: Last node of the path.

    Returns:
        The nodes from the beginning of the path up to ``current``.
    """
    backwards = []
    node = current
    # Walk back until a node without a predecessor is reached.
    while node is not None:
        backwards.append(node)
        node = predecessors[node]
    return backwards[::-1]

def has_negative_edges(graph: nx.DiGraph) -> bool:
    """Tell whether at least one edge of ``graph`` has a negative weight.

    An edge without a ``'weight'`` attribute is treated as having weight 1.
    """
    return any(graph[u][v].get('weight', 1) < 0 for u, v in graph.edges())

# Algorithms

## Dijkstra

`dijkstra(graph, start) -> (distances, predecessors)`

Single-source shortest path using priority queue. Returns distances from start to all nodes and predecessor map. 

**Complexity**: O((V + E) log V) with priority queue  
**Limitation**: Does not work with negative weights

In [None]:
def dijkstra(graph: nx.DiGraph, start: int) -> Tuple[Dict[int, float], Dict[int, Optional[int]]]:
    """Compute single-source shortest paths with a binary heap.

    Args:
        graph: Directed graph; an edge without a ``'weight'`` attribute
            counts as weight 1.
        start: Source node.

    Returns:
        A tuple ``(distances, predecessors)``. ``distances`` maps every node
        to its distance from ``start`` (``np.inf`` if never reached) and
        ``predecessors`` maps every node to the previous node on its path
        (``None`` for ``start`` and for nodes never reached).
    """
    best = dict.fromkeys(graph.nodes, np.inf)
    parent = dict.fromkeys(graph.nodes)
    best[start] = 0

    settled = set()
    frontier = [(0, start)]

    while frontier:
        dist_here, node = heapq.heappop(frontier)

        # A node can be pushed several times; only its first pop is expanded.
        if node in settled:
            continue
        settled.add(node)

        for nxt in graph.successors(node):
            candidate = dist_here + graph[node][nxt].get('weight', 1)
            if candidate < best[nxt]:
                best[nxt] = candidate
                parent[nxt] = node
                heapq.heappush(frontier, (candidate, nxt))

    return best, parent

## A* (A-star)

`a_star(graph, start, goal, coords_map) -> (path, distance)`

A* search from start to goal using f(n) = g(n) + h(n). Returns optimal path and its cost.

**Heuristic**: Euclidean distance between nodes  
**Complexity**: O(E) best case, O(b^d) worst case  
**Limitation**: Does not work with negative weights



In [None]:
def heuristic(node: int, goal: int, coords_map: np.ndarray) -> float:
    """Return the Euclidean distance from ``node`` to ``goal``, times 1000, rounded.

    Args:
        node: Index of the node being evaluated.
        goal: Index of the target node.
        coords_map: Array indexed by node whose rows are the node coordinates.
    """
    offset = coords_map[node] - coords_map[goal]
    return np.round(np.linalg.norm(offset) * 1_000)

def a_star(graph: nx.DiGraph, start: int, goal: int, coords_map: np.ndarray) -> Tuple[Optional[List[int]], float]:
    """Search a path from ``start`` to ``goal`` ordering nodes by g + h.

    Args:
        graph: Directed graph; an edge without a ``'weight'`` attribute
            counts as weight 1.
        start: Source node.
        goal: Target node.
        coords_map: Node coordinates, forwarded to ``heuristic``.

    Returns:
        ``(path, cost)`` for the path found, ``([start], 0)`` when
        ``start == goal``, or ``(None, np.inf)`` when the heap empties
        before ``goal`` is expanded.
    """
    if start == goal:
        return [start], 0

    # Heap entries are (f_score, push_order, node): the push order is compared
    # before the node, so entries with equal f_score come out in push order.
    open_heap = [(heuristic(start, goal, coords_map), 0, start)]
    pushed = 1

    came_from = {}
    cost_so_far = {start: 0}
    closed = set()

    while open_heap:
        node = heapq.heappop(open_heap)[2]

        # Each node is expanded at most once.
        if node in closed:
            continue
        closed.add(node)

        if node == goal:
            # Follow the recorded predecessors back to the start.
            backwards = []
            step = node
            while step in came_from:
                backwards.append(step)
                step = came_from[step]
            backwards.append(start)
            return backwards[::-1], cost_so_far[goal]

        for nxt in graph.successors(node):
            if nxt in closed:
                continue

            new_cost = cost_so_far[node] + graph[node][nxt].get('weight', 1)
            if nxt in cost_so_far and not new_cost < cost_so_far[nxt]:
                continue

            came_from[nxt] = node
            cost_so_far[nxt] = new_cost
            priority = new_cost + heuristic(nxt, goal, coords_map)
            heapq.heappush(open_heap, (priority, pushed, nxt))
            pushed += 1

    return None, np.inf

## Bellman Ford

`bellman_ford(graph, start) -> (distances, predecessors)`

More general algorithm that handles negative weights and detects negative cycles. Single-source shortest path using edge relaxation. Returns `(None, None)` if a negative cycle is detected. 

**Complexity**: O(V * E)  
**Advantage**: Works with negative weights, detects negative cycles


In [None]:
def bellman_ford(graph: nx.DiGraph, start: int) -> Tuple[Optional[Dict[int, float]], Optional[Dict[int, Optional[int]]]]:
    """Compute single-source shortest paths by repeated edge relaxation.

    Args:
        graph: Directed graph; an edge without a ``'weight'`` attribute
            counts as weight 1. Weights may be negative.
        start: Source node.

    Returns:
        ``(distances, predecessors)`` with the same layout as ``dijkstra``
        (``np.inf`` / ``None`` for nodes never reached), or ``(None, None)``
        if an edge can still be relaxed after ``V - 1`` passes, i.e. a
        negative cycle was detected.
    """
    distances = dict.fromkeys(graph.nodes, np.inf)
    predecessors = dict.fromkeys(graph.nodes)
    distances[start] = 0

    # The edge set does not change: collect (u, v, weight) once instead of
    # looking every weight up again in each pass.
    edges = list(graph.edges(data='weight', default=1))

    for _ in range(len(graph.nodes) - 1):
        relaxed = False
        for u, v, weight in edges:
            candidate = distances[u] + weight
            if candidate < distances[v]:
                distances[v] = candidate
                predecessors[v] = u
                relaxed = True
        if not relaxed:
            # A pass without changes means later passes and the final check
            # would not change anything either.
            return distances, predecessors

    # Any edge that can still be relaxed reveals a negative cycle.
    for u, v, weight in edges:
        if distances[u] + weight < distances[v]:
            return None, None

    return distances, predecessors

# All-Pairs Shortest Paths

Functions that run algorithms for all pairs of nodes

`all_pairs_dijkstra(graph) -> (all_distances, all_paths)`

Runs Dijkstra from each node as source. Returns lists of distance and path dictionaries.

`all_pairs_astar(graph, coords_map) -> (all_distances, all_paths)`

Runs A* for all pairs of nodes. Requires node coordinates for heuristic.

`all_pairs_bellman_ford(graph) -> (all_distances, all_paths)`

Runs Bellman-Ford from each node as source. Handles negative weights.

In [None]:
def all_pairs_dijkstra(graph: nx.DiGraph) -> Tuple[List[Dict], List[Dict]]:
    """Run ``dijkstra`` from every node of ``graph``.

    Returns:
        ``(all_distances, all_paths)``: two lists with one dictionary per
        source node, in the iteration order of ``graph.nodes``. Each
        dictionary maps a target to its distance (``np.inf`` if unreachable)
        or to its path (``None`` if unreachable).
    """
    all_distances, all_paths = [], []

    for source in graph.nodes:
        distances, predecessors = dijkstra(graph, source)

        all_paths.append({
            target: path_reconstruct(predecessors, target) if distances[target] != np.inf else None
            for target in graph.nodes
        })
        all_distances.append({
            target: distances[target] if distances[target] != np.inf else np.inf
            for target in graph.nodes
        })

    return all_distances, all_paths

def all_pairs_astar(graph: nx.DiGraph, coords_map: np.ndarray) -> Tuple[List[Dict], List[Dict]]:
    """Run ``a_star`` for every ordered pair of nodes of ``graph``.

    Args:
        graph: Directed graph to search.
        coords_map: Node coordinates, forwarded to ``a_star``.

    Returns:
        ``(all_distances, all_paths)``: two lists with one dictionary per
        source node, in the iteration order of ``graph.nodes``, mapping each
        target to the distance and to the path returned by ``a_star``.
    """
    all_distances, all_paths = [], []

    for source in graph.nodes:
        outcomes = {
            target: a_star(graph, source, target, coords_map)
            for target in graph.nodes
        }
        all_paths.append({target: found[0] for target, found in outcomes.items()})
        all_distances.append({target: found[1] for target, found in outcomes.items()})

    return all_distances, all_paths

def all_pairs_bellman_ford(graph: nx.DiGraph) -> Tuple[List[Dict], List[Dict]]:
    """Run ``bellman_ford`` from every node of ``graph``.

    Returns:
        ``(all_distances, all_paths)``: two lists with one dictionary per
        source node, in the iteration order of ``graph.nodes``. Unreachable
        targets get ``np.inf`` / ``None``. When ``bellman_ford`` reports a
        negative cycle for a source, every target of that source maps to
        ``None`` in both dictionaries.
    """
    all_distances, all_paths = [], []

    for source in graph.nodes:
        distances, predecessors = bellman_ford(graph, source)

        if distances is None:
            # Negative cycle: no result is available for this source.
            all_distances.append(dict.fromkeys(graph.nodes))
            all_paths.append(dict.fromkeys(graph.nodes))
            continue

        source_paths, source_distances = {}, {}
        for target in graph.nodes:
            reachable = distances[target] != np.inf
            source_paths[target] = path_reconstruct(predecessors, target) if reachable else None
            source_distances[target] = distances[target] if reachable else np.inf

        all_distances.append(source_distances)
        all_paths.append(source_paths)

    return all_distances, all_paths

# Problem Execution

`run_problem(size, density, noise_level, negative_values, save_results)`

Solves a problem with all appropriate algorithms. Automatically chooses Bellman-Ford if negative edges exist, otherwise runs Dijkstra and A*. Saves results to CSV.

In [None]:
def run_problem(size: int, density: float, noise_level: float, negative_values: bool, save_results: bool = True):
    """Generate one problem, solve it for all pairs of nodes, and optionally save a CSV.

    If the generated graph has at least one negative edge only Bellman-Ford
    is run; otherwise Dijkstra and A* are both run. Timings are printed.

    Args:
        size: Number of nodes, forwarded to ``create_problem``.
        density: Forwarded to ``create_problem``.
        noise_level: Forwarded to ``create_problem``.
        negative_values: Forwarded to ``create_problem``.
        save_results: When true, write one row per ``(source, target)`` pair
            to a CSV file under ``ResultsCSV/`` (created if missing).
    """
    print(f"Running: size={size}, density={density}, noise={noise_level}, neg={negative_values}")

    problem, coords_map = create_problem(
        size,
        density=density,
        noise_level=noise_level,
        negative_values=negative_values,
    )

    # Infinite entries are masked before building the graph, presumably so
    # that they are not turned into edges.
    # NOTE(review): zero entries also seem to be skipped by from_numpy_array,
    # which would drop zero-weight edges as well — confirm.
    masked = np.ma.masked_array(problem, mask=np.isinf(problem))
    G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

    has_neg_edges = has_negative_edges(G)

    # Results of the algorithms that are not run stay None.
    dijkstra_distances, dijkstra_paths = None, None
    astar_distances, astar_paths = None, None
    bford_distances, bford_paths = None, None

    # perf_counter is a monotonic, high-resolution clock meant for intervals.
    if not has_neg_edges:
        print("  Running Dijkstra...")
        start_time = time.perf_counter()
        dijkstra_distances, dijkstra_paths = all_pairs_dijkstra(G)
        dijkstra_time = time.perf_counter() - start_time

        print("  Running A*...")
        start_time = time.perf_counter()
        astar_distances, astar_paths = all_pairs_astar(G, coords_map)
        astar_time = time.perf_counter() - start_time

        print(f"  Dijkstra: {dijkstra_time:.4f}s | A*: {astar_time:.4f}s")
    else:
        print("  Running Bellman-Ford (negative edges detected)...")
        start_time = time.perf_counter()
        bford_distances, bford_paths = all_pairs_bellman_ford(G)
        bford_time = time.perf_counter() - start_time

        print(f"  Bellman-Ford: {bford_time:.4f}s")

    if not save_results:
        return

    result_for_csv = []
    for source in range(size):
        for target in range(size):
            result_for_csv.append({
                "source": source,
                "target": target,
                "dijkstra_distance": dijkstra_distances[source][target] if not has_neg_edges else None,
                "astar_distance": astar_distances[source][target] if not has_neg_edges else None,
                "bford_distance": bford_distances[source][target] if has_neg_edges else None,
                "dijkstra_path": str(dijkstra_paths[source][target]) if not has_neg_edges else None,
                "astar_path": str(astar_paths[source][target]) if not has_neg_edges else None,
                "bford_path": str(bford_paths[source][target]) if has_neg_edges else None,
            })

    df = pd.DataFrame(result_for_csv)
    filename = f"ResultsCSV/metrics_size{size}_density{density}_noise{noise_level}_neg{negative_values}.csv"
    # Make sure the output folder exists so the computed results are not lost
    # to an OSError at write time.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    df.to_csv(filename, index=False)
    print(f"  Saved: {filename}\n")

# Configurations

**sizes**: Dimensions of test problems (10 to 1000 nodes)

**densities**: Graph sparsity levels (0.2 = sparse, 1.0 = complete graph)

**noise_levels**: Perturbation applied to base Euclidean distances

**negative_values_options**: Toggle negative edge weights

In [None]:
# Parameter grid for the experiments: every combination of the four lists
# below is passed to run_problem.

# Number of nodes of each generated problem.
sizes = [10, 20, 50, 100, 200, 500, 1000]
# Threshold used by create_problem to decide whether an ordered pair of
# nodes gets a finite weight (1.0 keeps every pair).
densities = [0.2, 0.5, 0.8, 1.0]
# Factor applied to the random noise added to the Euclidean distances.
noise_levels = [0.0, 0.1, 0.5, 0.8]
# Whether the noise may be negative.
negative_values_options = [False, True]

In [None]:
# Run every combination of the configured parameters, printing a progress
# counter before each run.
print("=" * 80)
print("All tests execution")
print("=" * 80 + "\n")

total_configs = len(sizes) * len(densities) * len(noise_levels) * len(negative_values_options)
current = 0

# product() varies the last list fastest, which is the same order as four
# nested loops over sizes, densities, noise levels and negative-value flags.
for current, (size, density, noise_level, negative_values) in enumerate(
    product(sizes, densities, noise_levels, negative_values_options), start=1
):
    print(f"[{current}/{total_configs}]", end=" ")
    run_problem(size, density, noise_level, negative_values)

print("=" * 80)
print("Execution completed")
print("=" * 80)