In [36]:
from itertools import product, combinations
import numpy as np
import networkx as nx
from icecream import ic
from typing import Dict, List, Tuple, Any
import time
import json
import os
from tqdm import tqdm
import heapq
import random

In [3]:
SIZE = [10, 20, 50, 100, 200, 500, 1000]
DENSITY = [0.2, 0.5, 0.8, 1.0]
NOISE_LEVEL = [0.0, 0.1, 0.5, 0.8]
NEGATIVE_VALUES = [False, True]

## Code to create the problem instance

In [4]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    """Problem generator for Lab3"""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round(), map

## Utility functions

In [None]:
def euclidean_heuristic(u: int, v: int, pos_map: np.ndarray) -> float:
    #pos_map[u] gives the (x, y) coordinates for node u
    distance = np.linalg.norm(pos_map[u] - pos_map[v])
    return distance

def _reconstruct_path(came_from: Dict[int, int], current: int) -> List[int]:
    """Reconstructs the path from the 'came_from' dictionary."""
    path = [current]
    while current in came_from:
        current = came_from[current]
        path.append(current)
    return path[::-1] #Reverse to get path from start to goal


## A* algorithm

In [None]:
def a_star_search(graph: nx.DiGraph, start_node: int, goal_node: int, pos_map: np.ndarray) -> Tuple[List[int], float, int]:
    """
    Finds the shortest path between start_node and goal_node using A* search.

    Args:
        graph: The NetworkX DiGraph object (with 'weight' attributes).
        start_node: The source node.
        goal_node: The destination node.
        pos_map: The (N, 2) array of node coordinates for the heuristic.

    Returns:
        A tuple: (list of nodes in the path, total path cost, nodes_explored)
    """

    #g(n): Actual cost from start to n
    g_score: Dict[int, float] = {node: float('inf') for node in graph.nodes}
    g_score[start_node] = 0.0

    #f(n) = g(n) + h(n): Estimated total cost from start through n to goal
    f_score: Dict[int, float] = {node: float('inf') for node in graph.nodes}
    #Initial f_score = 0 (g(start)) + h(start)
    f_score[start_node] = euclidean_heuristic(start_node, goal_node, pos_map)

    #Dictionary to store the cheapest path found so far to reach a node
    came_from: Dict[int, int] = {}

    #The set of discovered nodes that may need to be explored.
    open_set: Dict[int, float] = {start_node: f_score[start_node]}

    #Counter for explored nodes
    nodes_explored = 0

    while open_set:
        #Get the node with the lowest f_score (Best-First Search component)
        current_node = min(open_set, key=open_set.get)
        nodes_explored += 1

        if current_node == goal_node:
            #Goal reached! Reconstruct and return the path.
            return _reconstruct_path(came_from, current_node), g_score[goal_node], nodes_explored

        #Remove current from the open set
        del open_set[current_node]

        #Explore neighbors
        for neighbor in graph.neighbors(current_node):
            weight = graph.get_edge_data(current_node, neighbor)['weight']

            #Tentative g_score is the cost to reach current + cost to reach neighbor
            tentative_g_score = g_score[current_node] + weight

            if tentative_g_score < g_score[neighbor]:
                #This path to neighbor is better than any previous one.
                came_from[neighbor] = current_node
                g_score[neighbor] = tentative_g_score
                f_score[neighbor] = tentative_g_score + euclidean_heuristic(neighbor, goal_node, pos_map)

                if neighbor not in open_set:
                    open_set[neighbor] = f_score[neighbor]

    #If the loop finishes without reaching the goal
    return [], float('inf'), nodes_explored

## Reduced Cost A*

In [None]:
def bellman_ford_potential(graph: nx.DiGraph, nodes: List[int]) -> Tuple[np.ndarray, bool]:
    """
    With this version we still have a dummy source but it is connected to everyone.
    (done by initializing all nodes to 0.0)
    """
    num_nodes = len(nodes)
    
    # FIX: initialize everyone to 0.0 instead of infinity.
    # by ensuring that disconnected nodes are active immediately, we catch all negative cycles.
    potentials = {node: 0.0 for node in nodes}

    #Relax edges to check for negative cycles.
    for i in range(num_nodes): 
        is_last_pass = (i == num_nodes - 1)
        changes_made = False
        
        for u, v, data in graph.edges(data=True):
            weight = data['weight']
            
            # relaxation Logic
            if potentials[u] + weight < potentials[v]:
                if is_last_pass:
                    # if we update on the Nth pass, there is a negative cycle
                    return np.array([]), True
                
                potentials[v] = potentials[u] + weight
                changes_made = True
            
            # optimization: If nothing changed this pass, we can stop early
            if not changes_made:
                break
    
    #No negative cycle detected
    potential_array = np.array([potentials[node] for node in nodes])
    return potential_array, False

def reduced_cost_a_star_search(
    graph: nx.DiGraph,
    start_node: int,
    goal_node: int,
    pos_map: np.ndarray,
    p_map: np.ndarray #The potential function array (p(v))
) -> Tuple[List[int], float, int]:
    """Finds the shortest path using A* on reduced costs."""
    
    nodes = list(graph.nodes)
    node_to_index = {node: i for i, node in enumerate(nodes)}

    g_score_prime: Dict[int, float] = {node: float('inf') for node in graph.nodes}
    g_score_prime[start_node] = 0.0
    came_from: Dict[int, int] = {}
    nodes_explored = 0

    #Heap stores tuples: (f_score_prime, node)
    open_set_heap = []
    
    #f'(n) = g'(n) + h'(n) where h'(n) = h(n) + p(n)
    h_prime_start = euclidean_heuristic(start_node, goal_node, pos_map) + p_map[node_to_index[start_node]]
    initial_f_score = g_score_prime[start_node] + h_prime_start

    heapq.heappush(open_set_heap, (initial_f_score, start_node))

    DEBUG_LIMIT = 500000 
    pop_count = 0

    while open_set_heap:
        pop_count += 1

        if pop_count > DEBUG_LIMIT:
            print(f"!!! DEBUG BREAK: Exceeded pop limit ({DEBUG_LIMIT}) for path {start_node} -> {goal_node}. Abandoning search.")
            break

        #Get the node with the lowest f_score (O(log V) operation)
        f_score_current, current_node = heapq.heappop(open_set_heap)

        #Guard against stale entries (nodes pushed multiple times with higher costs)
        if f_score_current > g_score_prime[current_node] + euclidean_heuristic(current_node, goal_node, pos_map) + p_map[node_to_index[current_node]]:
            continue #Skip this stale entry

        nodes_explored += 1
        
        if current_node == goal_node:
            path = _reconstruct_path(came_from, current_node)
            #The true cost D(s, d) is recovered: D'(s, d) - p(s) + p(d)
            start_index = node_to_index[start_node]
            goal_index = node_to_index[goal_node]
            true_cost = g_score_prime[goal_node] - p_map[start_index] + p_map[goal_index]
            return path, true_cost, nodes_explored


        for neighbor in graph.neighbors(current_node):
            original_cost = graph.get_edge_data(current_node, neighbor)['weight']
            
            #Calculate the Reduced Cost of the edge
            u_idx = node_to_index[current_node]
            v_idx = node_to_index[neighbor]
            reduced_cost = original_cost + p_map[u_idx] - p_map[v_idx]
            
            tentative_g_score_prime = g_score_prime[current_node] + reduced_cost
            
            if tentative_g_score_prime < g_score_prime[neighbor]:
                came_from[neighbor] = current_node
                g_score_prime[neighbor] = tentative_g_score_prime
                
                #Calculate f' score
                h_prime_n = euclidean_heuristic(neighbor, goal_node, pos_map) + p_map[v_idx]
                f_score_prime = tentative_g_score_prime + h_prime_n

                #Push the neighbor onto the heap (O(log V) operation)
                heapq.heappush(open_set_heap, (f_score_prime, neighbor))

    return [], float('inf'), nodes_explored

## Approximation for large problem size
used for the last 55 runs (approximately starting with size 500)

In [46]:
def hub_based_apsp_approximation(
    G: nx.DiGraph,
    nodes: List[int],
    pos_map: np.ndarray,
    p_map: np.ndarray,
    k: int = 5  #Number of hubs to select
) -> Tuple[Dict[str, Any], int]:
    """
    Approximates All-Pairs Shortest Path using k hubs.
    
    Returns: (apsp_paths_approximation, total_nodes_explored)
    """
    
    #Hub Selection (Random Sampling)
    #Ensure k is not larger than the total number of nodes
    num_hubs = min(k, len(nodes))
    hubs = random.sample(nodes, num_hubs)
    
    #Stores the distances D(u, h) - Shortest path from u to hub h
    dist_to_hubs = {h: {} for h in hubs}
    #Stores the distances D(h, v) - Shortest path from hub h to v
    dist_from_hubs = {h: {} for h in hubs}
    
    total_nodes_explored = 0
    
    #SSSP calculations (2k runs of A* total)
    
    #Calculate D(h, v) for all h in hubs (k runs)
    for h in hubs:
        #Run A* from hub h to all other nodes v
        for v in nodes:
            if h == v:
                dist_from_hubs[h][v] = 0.0
                continue
                
            #Reuse single-source A* function
            _, cost_hv, nodes_explored_hv = reduced_cost_a_star_search(G, h, v, pos_map, p_map)
            dist_from_hubs[h][v] = cost_hv
            total_nodes_explored += nodes_explored_hv

    #Calculate D(u, h) for all h in hubs (k runs)
    for h in hubs:
        #Run A* from all other nodes u to hub h
        for u in nodes:
            if h == u:
                dist_to_hubs[h][u] = 0.0
                continue
            
            _, cost_uh, nodes_explored_uh = reduced_cost_a_star_search(G, u, h, pos_map, p_map)
            dist_to_hubs[h][u] = cost_uh
            total_nodes_explored += nodes_explored_uh

    #Final approximation for all pairs
    apsp_paths = {}
    
    for s, d in combinations(nodes, 2):
        #Calculate min_h (D(s, h) + D(h, d))
        min_approx_cost_sd = float('inf')
        min_approx_cost_ds = float('inf')
        
        for h in hubs:
            #Approximation for s -> d
            D_sh = dist_to_hubs[h].get(s, float('inf'))
            D_hd = dist_from_hubs[h].get(d, float('inf'))
            min_approx_cost_sd = min(min_approx_cost_sd, D_sh + D_hd)
            
            #Approximation for d -> s
            D_dh = dist_to_hubs[h].get(d, float('inf'))
            D_hs = dist_from_hubs[h].get(s, float('inf'))
            min_approx_cost_ds = min(min_approx_cost_ds, D_dh + D_hs)

        #Store results (path is unavailable in this method, only cost)
        apsp_paths[f"{s}->{d}"] = {"path": None, "cost": min_approx_cost_sd, "cycle": False}
        apsp_paths[f"{d}->{s}"] = {"path": None, "cost": min_approx_cost_ds, "cycle": False}
        
    return apsp_paths, total_nodes_explored

## Run for all combinations

In [47]:
def run_experiment_and_save(
    problem_matrix: np.ndarray, 
    pos_map: np.ndarray, 
    instance_params: Dict[str, Any],
    output_filename: str,
    approximation_threshold: int = 200, #Use approximation for N > 200
    num_hubs: int = 5,
):
    """
    Runs A* APSP, collects all metrics, and saves the results to a file.
    """
    
    #Setup Graph
    masked = np.ma.masked_array(problem_matrix, mask=np.isinf(problem_matrix))
    G = nx.from_numpy_array(masked, create_using=nx.DiGraph)
    nodes = list(G.nodes())
    N = len(nodes)

    #Calculate potentials (p_map) and check for negative cycles
    p_map, cycle_detected = bellman_ford_potential(G, nodes)
    
    #Run A* and Time Measurement
    
    apsp_paths = {}
    total_nodes_explored = 0
    total_time = 0

    #Determine whether to use exact A* or the approximation
    is_approximation = N > approximation_threshold
    
    if cycle_detected:
        #NEGATIVE CYCLE DETECTED
        
        #Save placeholder results for all pairs in this instance
        for s, d in combinations(nodes, 2):
            #Path s -> d and d -> s
            apsp_paths[f"{s}->{d}"] = {"path": None, "cost": -float('inf'), "cycle": True}
            apsp_paths[f"{d}->{s}"] = {"path": None, "cost": -float('inf'), "cycle": True}
        
    elif is_approximation:
        #APPROXIMATION MODE (for large N)
        start_time = time.time()
        
        apsp_paths, total_nodes_explored = hub_based_apsp_approximation(
            G, nodes, pos_map, p_map, k=num_hubs
        )
        
        end_time = time.time()
        total_time = end_time - start_time

    else:
        #NO CYCLE: Run Reduced Cost A*
        start_time = time.time()
        
        for s, d in combinations(nodes, 2):
            #Path s -> d
            path_sd, cost_sd, nodes_sd = reduced_cost_a_star_search(G, s, d, pos_map, p_map)
            apsp_paths[f"{s}->{d}"] = {"path": path_sd, "cost": cost_sd, "cycle": False}
            total_nodes_explored += nodes_sd
            
            #Path d -> s
            path_ds, cost_ds, nodes_ds = reduced_cost_a_star_search(G, d, s, pos_map, p_map)
            apsp_paths[f"{d}->{s}"] = {"path": path_ds, "cost": cost_ds, "cycle": False}
            total_nodes_explored += nodes_ds

        end_time = time.time()
        total_time = end_time - start_time

    #Data Structuring and Saving
    
    experiment_data = {
        "parameters": instance_params,
        "metrics": {
            "total_time_seconds": total_time,
            "total_nodes_explored": total_nodes_explored,
            "number_of_pairs": N * (N - 1),
            "negative_cycle_detected": cycle_detected
        },
        #Store the matrix data so to can recreate the graph later
        "problem_matrix": problem_matrix.tolist(), 
        "node_coordinates": pos_map.tolist(), 
        "a_star_paths": apsp_paths,
    }

    #Append to a file containing multiple experimental runs
    #It's safest to append to a line-delimited JSON file (JSONL) for batch experiments
    with open(output_filename, 'a') as f:
        json.dump(experiment_data, f, default=lambda x: str(x) if isinstance(x, float) and abs(x) == float('inf') else x)
        f.write('\n')
    

#### More utility functions
The first part of the code computes all the combinations for the parameters

The second part is a function to recover already completed runs and restart the algorithm from new ones

In [None]:
EXPERIMENT_RUNS = []

#Use itertools.product to get every combination of the four lists
#The product will yield a tuple like (size, density, noise_level, negative_values)
for size, density, noise_level, negative_values in product(
    SIZE, DENSITY, NOISE_LEVEL, NEGATIVE_VALUES
):
    #Construct the dictionary for this specific run
    run_params = {
        'size': size,
        'density': density,
        'noise_level': noise_level,
        'negative_values': negative_values,
    }
    EXPERIMENT_RUNS.append(run_params)

print(f"Total number of experiment runs generated: {len(EXPERIMENT_RUNS)}")
#Print the first few runs to verify the structure
print("Example of the first 3 runs:")
for i in range(3):
    print(EXPERIMENT_RUNS[i])

#Resilience Check: Load Completed Runs

def load_completed_keys(filename: str) -> set:
    """Reads the JSONL file and returns a set of completed parameter tuples."""
    if not os.path.exists(filename):
        return set()
    
    completed = set()
    with open(filename, 'r') as f:
        for line in f:
            try:
                data = json.loads(line)
                p = data['parameters']
                #Create a hashable key for parameter matching
                key = (p['size'], p['density'], p['noise_level'], p['negative_values'])
                completed.add(key)
            except json.JSONDecodeError:
                #Skip corrupted lines
                continue
    return completed

Total number of experiment runs generated: 224
Example of the first 3 runs:
{'size': 10, 'density': 0.2, 'noise_level': 0.0, 'negative_values': False}
{'size': 10, 'density': 0.2, 'noise_level': 0.0, 'negative_values': True}
{'size': 10, 'density': 0.2, 'noise_level': 0.1, 'negative_values': False}


In [None]:
OUTPUT_FILE = "a_star_experiment_results.jsonl"
TOTAL_RUNS = len(EXPERIMENT_RUNS)

completed_keys = load_completed_keys(OUTPUT_FILE)

RESUMABLE_RUNS = []
for params in EXPERIMENT_RUNS:
    key = (params['size'], params['density'], params['noise_level'], params['negative_values'])
    if key not in completed_keys:
        RESUMABLE_RUNS.append(params)

print(f"Total defined runs: {TOTAL_RUNS}")
print(f"Completed runs found: {len(completed_keys)}")
print(f"Remaining runs to execute: {len(RESUMABLE_RUNS)}")
print("-" * 50)

pbar = tqdm(RESUMABLE_RUNS, desc="Running APSP Experiments")

for params in pbar:
    try:
        #Generate the problem instance
        matrix, coordinates = create_problem(**params)
        
        #Run the A* search and save the data
        run_experiment_and_save(
            problem_matrix=matrix,
            pos_map=coordinates,
            instance_params=params,
            output_filename=OUTPUT_FILE
        )
        
        #Update the progress bar status with current run details
        pbar.set_postfix_str(f"Size: {params['size']}, Neg: {params['negative_values']}")

    except Exception as e:
        #Crucial for resilience: Print error, but continue to the next run
        print(f"\n‚ÄºÔ∏è ERROR processing run: {params}. Error: {e}")
        continue

print("\n\nüöÄ All scheduled experiments complete!")

Total defined runs: 224
Completed runs found: 218
Remaining runs to execute: 6
--------------------------------------------------


Running APSP Experiments:   0%|          | 0/6 [1:06:52<?, ?it/s]


KeyboardInterrupt: 