In [2]:
from itertools import product, combinations
import numpy as np
import networkx as nx
from icecream import ic
from queue import PriorityQueue
from collections import deque
import time
import math
import statistics

In [19]:
sizes = [10, 20, 50, 100, 200, 500, 1000]
densities = [0.2, 0.5, 0.8, 1.0]
noise_levels = [0.0, 0.1, 0.5, 0.8]

In [3]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    """Problem generator for Lab3"""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round(), map

In [4]:
problem, map = create_problem(64, density=0.15, noise_level=10, negative_values=False)

In [5]:
masked = np.ma.masked_array(problem, mask=np.isinf(problem))
G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

for i, (x,y) in enumerate(map):
    G.nodes[i]['pos'] = (x, y)

#### A* algorithm

I used the A* algorithm in order to balance out the exploration with Dijkstra and the optimal greedy direction.<br>
In order to use A* an optimal heuristic ```h(n)``` is needed. In order to say so the heuristic should never overestimate the real cost to reach a node, so I decided to use the euclidean distance as ```h(n)```. This way the 

In [15]:
def path_cost(G: nx.DiGraph, path: list[int]) -> float:
    return nx.path_weight(G, path, weight='weight')

def possible_actions(graph: nx.DiGraph, state: int) -> list[int]:
    return list(graph.neighbors(state))

def h(graph: nx.DiGraph, state: int, goal: int) -> float:
    x1, y1 = graph.nodes[state]['pos']
    x2, y2 = graph.nodes[goal]['pos']
    return 1000 * np.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)

def search(graph: nx.DiGraph, start: int, goal: int) -> tuple[list[int], float]:
    # A* search implementation
    distances = [ np.inf for node in graph.nodes]
    distances[start] = 0
    previous_nodes = [None for node in graph.nodes]

    frontier = PriorityQueue()
    frontier.put((0 + h(graph, start, goal), 0, start))

    while not frontier.empty():
        state = frontier.get()
        _, current_cost, current_node = state

        if current_cost > distances[current_node]:
            continue

        if current_node == goal:
            break

        for a in possible_actions(graph, current_node):
            w = graph[current_node][a]['weight']
            if distances[a] > current_cost + w:
                distances[a] = current_cost + w
                previous_nodes[a] = current_node
                frontier.put((distances[a] + h(graph, a, goal), distances[a], a))

    # if the node is not reachable   
    if distances[goal] == np.inf:
        return None, np.inf

    # reconstruct path
    path = []
    node = goal
    while node is not None:
        path.append(node)
        node = previous_nodes[node]
    path.reverse()
    return path, distances[goal]



#### SLF


In [None]:
def search_negative(graph: nx.DiGraph, start: int, goal: int) -> tuple[list[int], float]:
    # Bellman-Ford algorithm variation implementation with SLF
    n = graph.number_of_nodes()
    distances = [ np.inf for node in graph.nodes]
    previous_nodes = [None for node in graph.nodes]
    in_queue = [False for node in graph.nodes]
    relax_count = [0 for node in graph.nodes]
    d = deque([start])

    distances[start] = 0
    in_queue[start] = True
    relax_count[start] = 1

    while d:
        current_node = d.popleft()
        in_queue[current_node] = False

        for a in possible_actions(graph, current_node):
            w = graph[current_node][a]['weight']
            if distances[a] > distances[current_node] + w:
                distances[a] = distances[current_node] + w
                previous_nodes[a] = current_node
                if not in_queue[a]:
                    # compare distances[a] to te distance of the front node to decide where to insert
                    if d and distances[a] < distances[d[0]]:
                        d.appendleft(a)
                    else:
                        d.append(a) 
                    in_queue[a] = True
                relax_count[a] += 1
                if relax_count[a] > n:
                    return None, -np.inf  # Negative cycle detected
    
    # if the node is not reachable
    if distances[goal] == np.inf:
        return None, np.inf
    
    # reconstruct path
    path = []
    node = goal
    while node is not None:
        path.append(node)
        node = previous_nodes[node]
    path.reverse()
    return path, distances[goal]


In [20]:
output_path = "positive_paths_statistics.txt"
with open(output_path, "w") as fout:
    fout.write("size,density,noise_level,accuracy(%),average_optimal_time(s),average_time(s)\n")


for size in sizes:
    for density in densities:
        for noise_level in noise_levels:

            problem, map = create_problem(size, density=density, noise_level=noise_level, negative_values=False)

            masked = np.ma.masked_array(problem, mask=np.isinf(problem))
            G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

            for i, (x,y) in enumerate(map):
                G.nodes[i]['pos'] = (x, y)

            # used for statistics
            tested = 0
            correct = 0
            times_opt = []
            times_my = []


            for s, d in combinations(range(problem.shape[0]), 2):
                tested += 1

                # optimal path computation

                t0 = time.perf_counter()    # used for timing

                try:
                    opt_cost = nx.bellman_ford_path_length(G, s, d, weight='weight')
                except nx.NetworkXNoPath:
                    opt_cost = np.inf
                except nx.NetworkXUnbounded:
                    opt_cost = -np.inf

                t1 = time.perf_counter()
                times_opt.append(t1 - t0)       


                
                # my algorithm computation
                
                t0 = time.perf_counter()    # used for timing

                path, cost = search_negative(G, s, d)

                t1 = time.perf_counter()
                times_my.append(t1 - t0)

                equal = False
                if cost is None:
                    equal = False
                elif cost == -np.inf and opt_cost == -np.inf:
                    equal = True
                elif math.isinf(cost) and math.isinf(opt_cost) and cost > 0 and opt_cost > 0:
                    equal = True
                elif abs(cost - opt_cost) < 1e-6:
                        equal = True

                if equal:
                    correct += 1

            accuracy = (correct / tested) * 100 if tested > 0 else 0.0
            avg_opt = statistics.mean(times_opt) if times_opt else float('nan')
            avg_my = statistics.mean(times_my) if times_my else float('nan')

            with open(output_path, "a") as fout:
                fout.write(f"{size},{density},{noise_level},{accuracy:.2f},{avg_opt:.8f},{avg_my:.8f}\n")

            
    
None

KeyboardInterrupt: 

In [None]:
output_path = "negative_paths_statistics.txt"
with open(output_path, "w") as fout:
    fout.write("size,density,noise_level,accuracy(%),average_optimal_time(s),average_time(s)\n")


for size in sizes:
    for density in densities:
        for noise_level in noise_levels:

            problem, map = create_problem(size, density=density, noise_level=noise_level, negative_values=True)

            masked = np.ma.masked_array(problem, mask=np.isinf(problem))
            G = nx.from_numpy_array(masked, create_using=nx.DiGraph)

            for i, (x,y) in enumerate(map):
                G.nodes[i]['pos'] = (x, y)

            # used for statistics
            tested = 0
            correct = 0
            times_opt = []
            times_my = []


            for s, d in combinations(range(problem.shape[0]), 2):
                tested += 1

                # optimal path computation

                t0 = time.perf_counter()    # used for timing

                try:
                    opt_cost = nx.bellman_ford_path_length(G, s, d, weight='weight')
                except nx.NetworkXNoPath:
                    opt_cost = np.inf
                except nx.NetworkXUnbounded:
                    opt_cost = -np.inf

                t1 = time.perf_counter()
                times_opt.append(t1 - t0)       


                
                # my algorithm computation
                
                t0 = time.perf_counter()    # used for timing

                path, cost = search_negative(G, s, d)

                t1 = time.perf_counter()
                times_my.append(t1 - t0)

                equal = False
                if cost is None:
                    equal = False
                elif cost == -np.inf and opt_cost == -np.inf:
                    equal = True
                elif math.isinf(cost) and math.isinf(opt_cost) and cost > 0 and opt_cost > 0:
                    equal = True
                elif abs(cost - opt_cost) < 1e-6:
                        equal = True

                if equal:
                    correct += 1

            accuracy = (correct / tested) * 100 if tested > 0 else 0.0
            avg_opt = statistics.mean(times_opt) if times_opt else float('nan')
            avg_my = statistics.mean(times_my) if times_my else float('nan')

            with open(output_path, "a") as fout:
                fout.write(f"{size},{density},{noise_level},{accuracy:.2f},{avg_opt:.8f},{avg_my:.8f}\n")

            
    
None