In [2]:
from itertools import product, combinations
import numpy as np
import networkx as nx
from icecream import ic

In [3]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    """Problem generator for Lab3"""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round()

In [4]:
problem = create_problem(10, density=0.15, noise_level=10, negative_values=False)

In [5]:
masked = np.ma.masked_array(problem, mask=np.isinf(problem))
G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

In [None]:
for s, d in combinations(range(problem.shape[0]), 2):
    try:
        path = nx.shortest_path(G, s, d, weight='weight')
        cost = cost = nx.path_weight(G, path, weight='weight')
    except nx.NetworkXNoPath:
        path = None
        cost = np.inf
    ic(s, d, path, cost)
None

__LAB_3__

__Configuration__

In [27]:
sizes = [10, 20, 50, 100, 200, 500, 1000]
densities = [0.2, 0.5, 0.8, 1.0]
noise_levels = [0.0, 0.1, 0.5, 0.8]
negative_values = [False, True]

__Dijkstra__

In [8]:
def dijkstra(graph, start_node):
    """
    Calculates the shortest path from start_node to all other nodes.
    Returns:
      - distances: dict {node: cost}
      - predecessors: dict {node: previous_node} (used to reconstruct the path)
    """
    # Initialization
    distances = {node: float('inf') for node in graph}
    distances[start_node] = 0
    predecessors = {node: None for node in graph}
    
    # Priority Queue: stores tuples (distance, node)
    pq = [(0, start_node)]
    
    while pq:
        current_dist, current_node = heapq.heappop(pq)
        
        # If a shorter path to this node was already found, skip
        if current_dist > distances[current_node]:
            continue
        
        # Edge relaxation
        for neighbor, weight in graph[current_node].items():
            distance = current_dist + weight
            
            # If a better (shorter) path is found
            if distance < distances[neighbor]:
                distances[neighbor] = distance
                predecessors[neighbor] = current_node
                heapq.heappush(pq, (distance, neighbor))
                
    return distances, predecessors

__Belman-Ford__

In [9]:
def bellman_ford(graph, start_node):
    """
    Manual implementation of Bellman-Ford.
    graph: adjacency dictionary {u: {v: weight}}
    start_node: starting node
    Returns: distances, predecessors
    Raises: ValueError if a negative cycle is found
    """
    # 1. Initialization
    distances = {node: float('inf') for node in graph}
    distances[start_node] = 0
    predecessors = {node: None for node in graph}
    num_nodes = len(graph)

    # 2. Repeated relaxation (V-1 times)
    # In each iteration, try to improve paths using every edge
    for _ in range(num_nodes - 1):
        changes = False # Optimization: if nothing changes in a round, we can exit early
        
        for u in graph: # For each source node u
            # If u hasn't been reached yet, it can't be used to reach v
            if distances[u] == float('inf'):
                continue
            
            for v, weight in graph[u].items(): # For each neighbor v of u
                if distances[u] + weight < distances[v]:
                    distances[v] = distances[u] + weight
                    predecessors[v] = u
                    changes = True
        
        if not changes:
            break # Optimal paths already found

    # 3. Negative Cycle Check (V-th iteration)
    # If a path can still be shortened, there is an infinite negative loop
    for u in graph:
        if distances[u] == float('inf'):
            continue
        for v, weight in graph[u].items():
            if distances[u] + weight < distances[v]:
                raise ValueError("Negative Cycle Detected")

    return distances, predecessors

__Utility__

In [10]:
def matrix_to_adj_list(matrix):
    """Converts the numpy matrix into a dictionary {node: {neighbor: weight}}"""
    graph = {}
    rows, cols = matrix.shape
    for i in range(rows):
        graph[i] = {}
        for j in range(cols):
            # Ignore self-loops and infinite weights (no connection)
            if i != j and not np.isinf(matrix[i, j]):
                graph[i][j] = matrix[i, j]
    return graph


def reconstruct_path(predecessors, target):
    """Reconstructs the path backwards from the predecessors"""
    if predecessors[target] is None:
        return None
    path = []
    curr = target
    while curr is not None:
        path.append(curr)
        curr = predecessors[curr]
    return path[::-1] # Reverse to get start -> end

__Uninformed strategy__

In [None]:
from itertools import product
import numpy as np
import networkx as nx
import heapq
import csv

filename = "uninformed_results.csv"

print(f"Processing started. Data will be saved in: {filename}")

# Open the file in write mode ('w')
with open(filename, mode='w', newline='', encoding='utf-8') as csv_file:
    # Create the writer object
    writer = csv.writer(csv_file, delimiter= ';')
    
    # --- MAIN LOOP ---
    # product automatically creates all possible combinations
    for size, density, noise, neg in product(sizes, densities, noise_levels, negative_values):

        print(f"Processing: Size={size}, Density={density}, Noise={noise}, Neg={neg}")
        
        # 1. Write the column HEADER
        # (Note: Writing the header inside the loop will repeat it for every configuration)
        header = [{'Size': size}, {'Density': density}, {'Noise': noise}, {'Negative': neg}]
        writer.writerow(header)
        row = ['Source', 'Dest', 'Cost', 'Path']
        writer.writerow(row)

        try:
            # Problem generation
            problem_matrix = create_problem(size, density=density, noise_level=noise, negative_values=neg)
            adj_graph = matrix_to_adj_list(problem_matrix)
            nodes = list(adj_graph.keys())

            # Calculate All-Pairs Shortest Path by running Dijkstra N times
            for source in nodes:

                algo_name = "Dijkstra"
                
                try:
                    # --- ALGORITHM SELECTION ---
                    if not neg:
                        # Positive Case -> Dijkstra
                        dists, preds = dijkstra(adj_graph, source)
                    else:
                        # Negative Case -> Bellman-Ford
                        algo_name = "Bellman-Ford"
                        dists, preds = bellman_ford(adj_graph, source)
                
                    for dest in nodes:
                        if source == dest: 
                            continue
                            
                        cost = dists[dest]
                        path = reconstruct_path(preds, dest)
                        
                        # Output formatting
                        cost_str = f"{cost:.1f}" if cost != float('inf') else "Inf"
                        path_str = str(path) if path else "None"
                        
                        # print(f"{source:^10} | {dest:^12} | {cost_str:^10} | {path_str}")
                        row = [source, dest, cost_str, path_str]
                        writer.writerow(row)

                except ValueError:
                    # Catch the "Negative Cycle" error raised by bellman_ford_custom
                    # Write a special row to indicate it
                    for dest in nodes:
                        if source == dest: continue
                        row = [size, density, noise, neg, source, dest, "-Inf", "Negative Cycle Detected", algo_name]
                        writer.writerow(row)
            
            print(f"Completed for Size={size}, Density={density}, Noise={noise}, Negative={neg}")
            
        except Exception as e:
            print(f"Unexpected error in config {size}/{neg}: {e}")

print("Finished! File generated successfully.")

__A*__

In [11]:
def get_hidden_coords(size: int, seed: int = 42) -> np.ndarray:
    """
    Reconstructs the coordinates internally used by create_problem.
    Works because the seed makes the generation deterministic.
    """
    rng = np.random.default_rng(seed)
    # This is the first call made inside create_problem, so 
    # it returns exactly the same map!
    map_coords = rng.random(size=(size, 2))
    return map_coords

def heuristic(node_u, node_v, coords):
    # Euclidean Distance * 1000 (to align with the problem's weight scale)
    dist = np.sqrt(np.sum((coords[node_u] - coords[node_v])**2))
    return dist * 1000

def a_star(graph, start, goal, coords):
    """
    Standard A* algorithm.
    Returns: (total_cost, path_list)
    """
    # Priority queue: (f_score, current_node)
    open_set = [(0, start)]
    
    # Where we came from (to reconstruct the path)
    came_from = {}
    
    # g_score: Real cost from start to node n
    g_score = {node: float('inf') for node in graph}
    g_score[start] = 0
    
    # f_score: g_score + heuristic
    f_score = {node: float('inf') for node in graph}
    f_score[start] = heuristic(start, goal, coords)
    
    visited = set()

    while open_set:
        # Get the node with the lowest f_score
        current_f, current = heapq.heappop(open_set)
        
        if current == goal:
            # Found! Reconstruct the path
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            return g_score[goal], path[::-1] # Return cost and reversed path
        
        if current in visited:
            continue
        visited.add(current)
        
        # Explore neighbors
        for neighbor, weight in graph[current].items():
            tentative_g = g_score[current] + weight
            
            if tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                f = tentative_g + heuristic(neighbor, goal, coords)
                f_score[neighbor] = f
                heapq.heappush(open_set, (f, neighbor))
                
    return float('inf'), None # No path found

__Johnson__

In [12]:
def get_johnson_potentials(graph):
    """
    Calculates the potentials h(v) using Bellman-Ford.
    Used to remove negative weights through reweighting.
    """
    # 1. Initialize h[v] = 0 for all nodes
    # (Equivalent to having a dummy source node connected to all nodes with weight 0)
    h = {node: 0 for node in graph}
    
    num_nodes = len(graph)
    
    # 2. Relaxation (V-1 times)
    for _ in range(num_nodes - 1):
        changes = False
        for u in graph:
            for v, weight in graph[u].items():
                if h[u] + weight < h[v]:
                    h[v] = h[u] + weight
                    changes = True
        if not changes:
            break
            
    # 3. Negative Cycle Check
    for u in graph:
        for v, weight in graph[u].items():
            if h[u] + weight < h[v]:
                raise ValueError("Negative Cycle")
                
    return h

def a_star_johnson(graph, start, goal, coords, potentials):
    """
    Runs A* on a virtually reweighted graph (Johnson's logic).
    """
    
    # --- STANDARD A* LOGIC ---
    open_set = [(0, start)]
    came_from = {}
    
    # g_score and f_score use REWEIGHTED weights
    g_score = {node: float('inf') for node in graph}
    g_score[start] = 0
    
    f_score = {node: float('inf') for node in graph}
    # The heuristic is fine as is (Euclidean distance)
    f_score[start] = heuristic(start, goal, coords)
    
    visited = set()

    while open_set:
        current_f, current = heapq.heappop(open_set)
        
        if current == goal:
            # --- RECONSTRUCTION AND COST CORRECTION PHASE ---
            path = [current]
            curr = current
            while curr in came_from:
                curr = came_from[curr]
                path.append(curr)
            
            # g_score[goal] is the cost in the reweighted graph.
            # We must revert to the original cost:
            # OriginalCost = ReweightedCost - h(start) + h(goal)
            reweighted_cost = g_score[goal]
            real_cost = reweighted_cost - potentials[start] + potentials[goal]
            
            return real_cost, path[::-1]
        
        if current in visited: continue
        visited.add(current)
        
        for neighbor, old_weight in graph[current].items():
            # --- ON-THE-FLY REWEIGHTING HAPPENS HERE ---
            # New weight = OldWeight + h(u) - h(v)
            # This value is guaranteed to be >= 0 (if there are no negative cycles)
            new_weight = old_weight + potentials[current] - potentials[neighbor]
            
            tentative_g = g_score[current] + new_weight
            
            if tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                f = tentative_g + heuristic(neighbor, goal, coords)
                f_score[neighbor] = f
                heapq.heappush(open_set, (f, neighbor))
                
    return float('inf'), None

__Informed strategy__

In [None]:
import csv

filename = "informed_results.csv"

with open(filename, mode='w', newline='', encoding='utf-8') as csv_file:
    writer = csv.writer(csv_file, delimiter=';')
    writer.writerow(['Size', 'Density', 'Noise', 'Negative', 'Source', 'Dest', 'Cost', 'Path', 'Algorithm'])

    for size, density, noise, neg in product(sizes, densities, noise_levels, negative_values):

        try:
            # 1. Problem Generation
            problem_matrix = create_problem(size, density=density, noise_level=noise, negative_values=neg)
            adj_graph = matrix_to_adj_list(problem_matrix)
            nodes = list(adj_graph.keys())
            coords = get_hidden_coords(size, seed=42)

            # 2. PRE-PROCESSING (Only if potentials are needed)
            potentials = None
            algorithm_name = "A*"
            
            if neg:
                # If there are negative edges, calculate potentials (Johnson's Algo)
                # This might fail if there is a Negative Cycle
                
                try:
                    potentials = get_johnson_potentials(adj_graph)
                    algorithm_name = "Johnson + A*"
                except ValueError:
                    # If get_johnson_potentials raises an error, there is a negative cycle.
                    # Write error for all pairs and skip to the next config
                    for s in nodes:
                        for d in nodes:
                            if s==d: continue
                            writer.writerow([size, density, noise, neg, s, d, "-Inf", "Global Negative Cycle", "Bellman-Check"])
                    continue # Skip to the next iteration of product()

            # 3. PATH CALCULATION
            for source in nodes:
                for dest in nodes:
                    if source == dest: continue
                    
                    cost_str = "Inf"
                    path_str = "None"

                    if not neg:
                        # Simple case: positive weights -> Standard A*
                        # (We could pass an empty potentials dict, 
                        # but for cleanliness we use the standard function)
                        cost, path = a_star(adj_graph, source, dest, coords)
                    else:
                        # Complex case: negative weights -> A* with Johnson Reweighting
                        # Pass the potentials calculated earlier
                        cost, path = a_star_johnson(adj_graph, source, dest, coords, potentials)
                    
                    if path:
                        cost_str = f"{cost:.1f}"
                        path_str = str(path)

                    writer.writerow([size, density, noise, neg, source, dest, cost_str, path_str, algorithm_name])

        except Exception as e:
            print(f"Critical error: {e}")

print("Finished! File generated successfully.")

__Some stats...__

In [None]:
import time
import heapq

def dijkstra_p2p_stats(graph, start, target):
    """
    Point-to-Point Dijkstra modified to return statistics.
    """
    t_start = time.perf_counter()
    
    pq = [(0, start)]
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    predecessors = {node: None for node in graph}
    visited_count = 0
    
    while pq:
        d, u = heapq.heappop(pq)
        
        visited_count += 1 # Visited a node
        
        if u == target:
            t_end = time.perf_counter()
            return distances[target], reconstruct_path(predecessors, target), t_end - t_start, visited_count
        
        if d > distances[u]: continue
        
        for v, weight in graph[u].items():
            if distances[u] + weight < distances[v]:
                distances[v] = distances[u] + weight
                predecessors[v] = u
                heapq.heappush(pq, (distances[v], v))
                
    t_end = time.perf_counter()
    return float('inf'), None, t_end - t_start, visited_count

def a_star_stats(graph, start, target, coords):
    """
    Modified A* to return statistics.
    """
    t_start = time.perf_counter()
    
    open_set = [(0, start)]
    came_from = {}
    g_score = {node: float('inf') for node in graph}
    g_score[start] = 0
    f_score = {node: float('inf') for node in graph}
    f_score[start] = heuristic(start, target, coords)
    
    visited_set = set()
    visited_count = 0
    
    while open_set:
        _, current = heapq.heappop(open_set)
        
        if current in visited_set: continue
        
        visited_count += 1 # Visited a node
        visited_set.add(current)
        
        if current == target:
            # Path reconstruction
            path = [current]
            while current in came_from:
                current = came_from[current]
                path.append(current)
            t_end = time.perf_counter()
            return g_score[target], path[::-1], t_end - t_start, visited_count
        
        for neighbor, weight in graph[current].items():
            tentative_g = g_score[current] + weight
            if tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                f = tentative_g + heuristic(neighbor, target, coords)
                heapq.heappush(open_set, (f, neighbor))
                
    t_end = time.perf_counter()
    return float('inf'), None, t_end - t_start, visited_count

In [None]:
from itertools import product
import time

# --- CONFIGURATIONS ---
sizes = [1000]
densities = [0.2]
noise_levels = [0.0]
negative_values_opts = [False] 

print("Benchmark Started (Console Output Only)")
print("-" * 130)
print(f"{'CONFIG':<22} | {'NODES (Dijkstra)':<18} | {'NODES (A*)':<12} | {'IMPROV %':<10} | {'TIME Dijkstra (s)':<18} | {'TIME A* (s)':<15}")
print("-" * 130)

for size, density, noise, neg in product(sizes, densities, noise_levels, negative_values_opts):

    sum_u_nodes = 0
    sum_i_nodes = 0
    sum_u_time = 0
    sum_i_time = 0
    count_paths = 0
    
    # Problem Generation
    print(f"Generating stats...", end='\r') 
    
    problem_matrix = create_problem(size, density=density, noise_level=noise, negative_values=neg)
    adj_graph = matrix_to_adj_list(problem_matrix)
    nodes = list(adj_graph.keys())
    coords = get_hidden_coords(size, seed=42)

    # Loop over pairs
    for source in nodes:
        for dest in nodes:
            if source == dest: continue
            
            # Run Algorithms
            u_cost, u_path, u_time, u_nodes = dijkstra_p2p_stats(adj_graph, source, dest)
            i_cost, i_path, i_time, i_nodes = a_star_stats(adj_graph, source, dest, coords)
            
            if u_path is None: continue 

            # Update Totals
            sum_u_nodes += u_nodes
            sum_i_nodes += i_nodes
            sum_u_time += u_time
            sum_i_time += i_time
            count_paths += 1

    if count_paths > 0:
        avg_u_nodes = sum_u_nodes / count_paths
        avg_i_nodes = sum_i_nodes / count_paths
        avg_u_time = sum_u_time / count_paths
        avg_i_time = sum_i_time / count_paths
        
        avg_improv = 0.0
        if avg_u_nodes > 0:
            avg_improv = ((avg_u_nodes - avg_i_nodes) / avg_u_nodes) * 100
        
        config_str = f"S={size}, D={density}, N={noise}"
        
        print(f"\r{config_str:<22} | {avg_u_nodes:<18.1f} | {avg_i_nodes:<12.1f} | {avg_improv:<10.1f} | {avg_u_time:<18.6f} | {avg_i_time:<15.6f}")
    else:
        print(f"\rS={size}, D={density}, N={noise}... No path found.")

print("-" * 130)
print("Benchmark Completed.")