## Imports

In [4]:
from itertools import product
import numpy as np
import networkx as nx
import heapq
import random
from networkx.exception import NetworkXUnbounded, NetworkXNoPath
import time
from tqdm import tqdm
from itertools import product

## Problem generator

In [5]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    """Problem generator for Lab3"""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round(), map

## Dijkstra algorithm

In [6]:
# Dijkstra shortest-path algorithm
# parameters: problem (np.ndarray NxN with weights), start and goal (start and end nodes)
# return value: (min_cost, path) tuple, or (np.inf, None) tuple if there's no path
def dijkstra (problem, start, goal):
    N = problem.shape[0]
    dist = np.full(N, np.inf)
    dist[start] = 0.0
    parent = [None] * N
    pq = []
    heapq.heappush(pq, (0.0, start))
    while pq:
        curr_dist, u = heapq.heappop(pq)
        if curr_dist > dist[u]:
            continue
        for v in range(N):
            weight = problem[u, v]
            if np.isinf(weight) or u == v:
                continue
            new_dist = curr_dist + weight
            if new_dist < dist[v]:
                dist[v] = new_dist
                parent[v] = u
                heapq.heappush(pq, (new_dist, v))
    # reconstructing the path:
    if np.isinf(dist[goal]):
        return np.inf, None
    path = []
    u = goal
    while u is not None:
        path.append(u)
        u = parent[u]
    path = path[::-1]
    return dist[goal], path

## A* algorithm

In [7]:
# A* shortest-path algorithm
# parameters: problem (np.ndarray NxN with weights), start and goal (start and end nodes),
#             h (heuristic function)
# return value: (min_cost, path) tuple, or (np.inf, None) tuple if there's no path
def a_star (problem, start, goal, h):
    N = problem.shape[0]
    g = np.full(N, np.inf)
    g[start] = 0
    parent = {start: None}
    pq = []
    heapq.heappush(pq, (0 + h(start, goal), start))
    while pq:
        _, u = heapq.heappop(pq)
        if u == goal:
            # reconstructing the path:
            path = []
            cur = goal
            while cur is not None:
                path.append(cur)
                cur = parent[cur]
            path.reverse()
            return g[goal], path
        # explore neighbors:
        for v in range(N):
            weight = problem[u][v]
            if np.isinf(weight):
                continue   # no edge
            new_cost = g[u] + weight
            if new_cost < g[v]:
                g[v] = new_cost
                parent[v] = u
                f = new_cost + h(v, goal)
                heapq.heappush(pq, (f, v))
    # no path found:
    return np.inf, None


## Bellman-Ford algorithm

In [8]:
# support function for Bellman-Ford: gives you all the edges
# parameters: matrix (matrix with weights)
# return value: edges (array of all the edges)
def matrix_to_edges (matrix):
    edges = []
    n = len(matrix)
    for i in range (n):
        for j in range (n):
            if matrix[i][j] not in (None, float('inf')):
                edges.append((i, j, matrix[i][j]))
    return edges

# Bellman-Ford shortest-path algorithm
# parameters: n_nodes (number of nodes), edges (array of edges), source (source node)
# return values: dist (list of minimum distances), pred (predecessors to recreate the path), 
#                has_negative_cycle (boolean to indicate whether there were negative cycles or not)
def bellman_ford (n_nodes, edges, source):
    dist = [float('inf')] * n_nodes
    dist[source] = 0
    pred = [None] * n_nodes
    # Relax all edges for n_nodes-1 times
    for _ in range(n_nodes - 1):
        updated = False
        for u, v, w in edges:
            if dist[u] + w < dist[v]:
                dist[v] = dist[u] + w
                pred[v] = u
                updated = True
        if not updated:
            break
    # Check negative cycles:
    has_negative_cycle = False
    for u, v, w in edges:
        if dist[u] + w < dist[v]:
            has_negative_cycle = True
            break
    return dist, pred, has_negative_cycle



## Problem evaluator function

In [9]:
# Problem evaluator function
# Parameters: size (size of the problem -> num. of nodes), density (density of the edges),
#             noise_level, negative_values (boolean to indicate the presence/absence of negative values),
#             a_star_fn, bellman_ford_fn (A* and Bellman-Ford functions), heuristic_fn (heuristic function for A*),
#             n_sources, n_targets (number of source nodes and target nodes to consider for the evaluation)
# Functionality: tests my implementation of A* and Bellman-Ford comparing them to networkx library functions, 
#                printing the count of paths that were evaluated by my functions with a worse cost then nx's
def evaluate_problem (
    size,
    density,
    noise_level,
    negative_values,
    a_star_fn,
    bellman_ford_fn,
    heuristic_eucl=False,
    n_sources = 10,
    n_targets = 10,
    show_output = True
):
    # Problem generation:
    problem, map = create_problem (
        size=size,
        density=density,
        noise_level=noise_level,
        negative_values=negative_values
    )

    def h_default (u, goal):
        return 0
    
    def h_euclidean (u, goal):
        dx = map[u, 0] - map[goal, 0]
        dy = map[u, 1] - map[goal, 1]
        return np.sqrt(dx*dx + dy*dy)

    # Building NetworkX:
    G = nx.DiGraph()
    for u in range (size):
        for v in range (size):
            w = problem[u, v]
            if np.isfinite(w) and u != v:
                G.add_edge(u, v, weight=w)

    # Algorithm choice:
    if negative_values:
        algorithm = "BF"
    else:
        algorithm = "A*"

    # Choosing random n_sources*n_targets (s, t)
    sources = random.sample(range (size), n_sources)
    targets = random.sample(range (size), n_targets)
    pairs = [(s, t) for s in sources for t in targets if s != t]

    # Preparing edges list for Bellman-Ford
    edges = []
    if negative_values:
        for u in range (size):
            for v in range (size):
                w = problem[u, v]
                if np.isfinite(w) and u != v:
                    edges.append((u, v, w))

    # Test variables:
    ok = 0
    bad = 0
    neg_count = 0

    # Timer start:
    start_time = time.time()  

    iterator = tqdm(pairs, desc="Evaluating paths") if show_output else pairs

    for s, t in iterator:
        try:
            if negative_values:     # Bellman-Ford for graphs with possible negative weights:
                cost_nx = nx.bellman_ford_path_length(G, s, t, weight='weight')
            else:                   # Dijkstra / shortest_path for positive graphs:
                cost_nx = nx.shortest_path_length(G, s, t, weight='weight')
        except NetworkXNoPath:      # No path between s and t
            cost_nx = float ("inf")
        except NetworkXUnbounded:   # Negative cycle → indefinite path
            cost_nx = float ("inf")
            #print(f"[WARNING] Negative cycle between {s} → {t}")

        if algorithm == "A*":
            if heuristic_eucl == False:
                cost_custom, _ = a_star_fn (problem, s, t, h_default)
            else:
                cost_custom, _ = a_star_fn (problem, s, t, h_euclidean)
            if cost_custom == cost_nx:
                ok += 1
            else:
                bad += 1
        else:
            dist, _, neg = bellman_ford_fn (size, edges, s)
            cost_custom = dist[t]
            if neg:
                neg_count += 1
            else:
                if cost_custom == cost_nx:
                    ok += 1
                else:
                    bad += 1        

    # Timer end:
    elapsed_time = time.time() - start_time  

    # Final results:
    if show_output:
        total = len(pairs)
        if neg_count > 0:
            print(f"{total} paths tested: {ok} costs were ok, {bad} were wrong, {neg_count} negative cycles detected")
        else:
            print(f"{total} paths tested: {ok} costs were ok, {bad} were wrong")
        print(f"Elapsed time: {elapsed_time:.2f} seconds")

    return ok, bad, neg_count

## Single problem evaluation:

In [11]:
_ = evaluate_problem(
    size = 200,
    density = 0.2,
    noise_level = 0.2,
    negative_values = False,
    a_star_fn = a_star,
    bellman_ford_fn = bellman_ford,
    heuristic_eucl= True,
    n_sources = 10,
    n_targets = 10,
    show_output = True
)

Evaluating paths: 100%|██████████| 100/100 [00:02<00:00, 48.71it/s]

100 paths tested: 100 costs were ok, 0 were wrong
Elapsed time: 2.05 seconds





## All combination of problems evaluator:

In [15]:
def evaluate_all_problems (n_sources, n_targets):
    sizes = [10,20,50,100,200,500,1000]
    densities = [0.2,0.5,0.8,1.0]
    noise_levels = [0.0,0.1,0.5,0.8]
    possible_neg_values = [False, True]
    ok, bad, neg_count = 0, 0, 0
    all_combinations = list (product (sizes, densities, noise_levels, possible_neg_values))
    for s, d, n, neg in tqdm(all_combinations, desc="Evaluating all problems"):
        #print (f"\n***Size: {s}, density: {d}, noise level: {n}, negative values: {neg}***")
        this_ok, this_bad, this_nc = evaluate_problem(
                                size = s,
                                density = d,
                                noise_level = n,
                                negative_values = neg,
                                a_star_fn = a_star,
                                bellman_ford_fn = bellman_ford,
                                heuristic_eucl= True,
                                n_sources = n_sources,
                                n_targets = n_targets,
                                show_output=False
        )
        ok += this_ok
        bad += this_bad
        neg_count += this_nc
    print (f"***Final results: {ok} path costs were ok, {bad} path costs were wrong, {neg_count} negative cycles were detected***")

evaluate_all_problems (1, 1)       #testing on just one path for each problem

Evaluating all problems: 100%|██████████| 224/224 [17:12<00:00,  4.61s/it]

***Final results: 147 path costs were ok, 0 path costs were wrong, 70 negative cycles were detected***



