In [1]:
from itertools import product, combinations
import numpy as np
import networkx as nx
from icecream import ic
import heapq
import time
from itertools import combinations

I slightly modified the create_problem function to save several minutes in the graph creation

In [2]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    """Numpy Optimized problem generator for Lab3"""
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    
    # Vectorized distance calculation
    x_diff = map[:, 0].reshape(-1, 1) - map[:, 0].reshape(1, -1)
    y_diff = map[:, 1].reshape(-1, 1) - map[:, 1].reshape(1, -1)
    distances = np.sqrt(x_diff**2 + y_diff**2)
    
    # Vectorized density mask
    density_mask = rng.random((size, size)) < density
    
    # Apply operations vectorially
    problem += distances * density_mask
    problem[~density_mask] = np.inf

    heuristic = lambda a, b: distances[a, b] * 1000   # Multiplied by 1000 to match scaling
    
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round(), heuristic

# A* with euclidean distance heuristic
Used for positive weight problems

In [3]:
def astar_shortest_path(G, source, target, heuristic):
    """
    A* with heuristic support
    """
    # PQ by f_score (g_score + h_score)
    pq = [(heuristic(source, target), source)]
    
    # Cost from start to each node (g_score)
    g_score = {node: float('inf') for node in G.nodes}
    g_score[source] = 0
    
    previous_nodes = {node: None for node in G.nodes}

    visited = set()
    
    while pq:
        _, current_node = heapq.heappop(pq)
        
        if current_node in visited:
            continue
            
        visited.add(current_node)
        
        if current_node == target:
            path = []
            while current_node is not None:
                path.append(current_node)
                current_node = previous_nodes[current_node]
            path.reverse()
            return path, g_score[target]
        
        for neighbor in G.neighbors(current_node):
            if neighbor in visited:
                continue
                
            edge_weight = G[current_node][neighbor]['weight']
            tentative_g_score = g_score[current_node] + edge_weight
            
            if tentative_g_score < g_score[neighbor]:
                # This path to neighbor is better than any previous one
                previous_nodes[neighbor] = current_node
                g_score[neighbor] = tentative_g_score
                f_score = tentative_g_score + heuristic(neighbor, target)
                heapq.heappush(pq, (f_score, neighbor))
    
    # No path found
    return None, float('inf')

# Bellman Ford implmentation
Used for negative weight problems

In [4]:
def bellman_ford_shortest_path(G, source, target):
    distances = {node: float('inf') for node in G.nodes}
    distances[source] = 0
    previous_nodes = {node: None for node in G.nodes}
    for _ in range(len(G.nodes) - 1):
        for u, v, data in G.edges(data=True):
            weight = data['weight']
            if distances[u] + weight < distances[v]:
                distances[v] = distances[u] + weight
                previous_nodes[v] = u
    for u, v, data in G.edges(data=True):
        weight = data['weight']
        if distances[u] + weight < distances[v]:
            return None, -np.inf
    path = []
    current_node = target
    while current_node is not None:
        path.append(current_node)
        current_node = previous_nodes[current_node]
    path.reverse()
    return path, distances[target]

# Problems

In [5]:
size = [10, 20, 50, 100, 200, 500, 1000]
density = [0.2, 0.5, 0.8, 1.0]
noise_level = [0.0, 0.1, 0.5, 0.8]
negative_values = [False, True]

I will use A* for positive values, BF for negative values.

I will compare with Dijsktra of nx for positive problems, BF for negative problems

In [6]:
def process_positive_problem(size: int, density: float, noise_level: float, seed: int):
    problem, heuristic = create_problem(
        size,
        density=density,
        negative_values=False,
        noise_level=noise_level,
        seed=seed,
    )
    G = nx.from_numpy_array(problem, create_using=nx.DiGraph)

    results = []

    i = 0

    if size >= 500:
        limit = 5
    elif size >= 200:
        limit = 25
    elif size >= 100:
        limit = 40
    else:
        limit = size * (size - 1) // 2

    for source, target in combinations(range(size), 2):
        my_start = time.time()
        path_astar, cost_astar = astar_shortest_path(G, source, target, heuristic)
        my_end = time.time()
        nx_start = time.time()
        cost_dijkstra = nx.dijkstra_path_length(G, source, target)
        nx_end = time.time()

        results.append({
            "source": source,
            "target": target,
            "my_cost": cost_astar,
            "nx_cost": cost_dijkstra,
            "my_time": (my_end - my_start),
            "nx_time": (nx_end - nx_start),
            "equal": cost_astar == cost_dijkstra
        })

        i+=1
        # Sanity check limit
        if i == limit:
            break
    return results

In [7]:
# results = process_positive_problem(500, 0.1, 0.0, 42)
# print(results[0])

In [None]:
def process_negative_problem(size: int, density: float, noise_level: float, seed: int):
    problem, _ = create_problem(
        size,
        density=density,
        negative_values=True,
        noise_level=noise_level,
        seed=seed,
    )
    G = nx.from_numpy_array(problem, create_using=nx.DiGraph)

    results = []

    i = 0
    if size >= 1000:
        limit = 2
    elif size >= 500:
        limit = 2
    elif size >= 200:
        limit = 10
    elif size >= 100:
        limit = 50
    else:
        limit = size * (size - 1) // 2


    for source, target in combinations(range(size), 2):
        my_start = time.time()
        path_bf, cost_bf = bellman_ford_shortest_path(G, source, target)
        my_end = time.time()
        nx_start = time.time()
        try:
            cost_bf_nx = nx.bellman_ford_path_length(G, source, target)
        except nx.NetworkXUnbounded:
            cost_bf_nx = -np.inf
        except nx.NetworkXNoPath:
            cost_bf_nx = np.inf

        nx_end = time.time()

        results.append({
            "source": source,
            "target": target,
            "my_cost": cost_bf,
            "nx_cost": cost_bf_nx,
            "my_time": (my_end - my_start),
            "nx_time": (nx_end - nx_start),
            "equal": cost_bf == cost_bf_nx
        })

        i+=1 
        # Sanity check limit
        if i == limit:
            break
    return results

In [9]:
# results = process_negative_problem(250, 0.1, 0.0, 42)
# print(results[0])

In [10]:
import concurrent.futures
from tqdm import tqdm
from itertools import product
import os
import json
import math

size = [10, 20, 50, 100, 200, 500, 1000]
density = [0.2, 0.5, 0.8, 1.0]
noise_level = [0.0, 0.1, 0.5, 0.8]
negative_values = [False, True]


def process_problem(params):
    size, density, noise_level, negative_values, seed = params
    problem_string = f"problem_size={size}_density={density}_noise={noise_level}_negative={negative_values}"

    if not os.path.exists(f"results/{problem_string}.json"):
        # print(f"Processing {problem_string}...")
        if negative_values:
            results = process_negative_problem(size, density, noise_level, seed)
        else:
            results = process_positive_problem(size, density, noise_level, seed)

        for result in results:
            if math.isinf(result["my_cost"]):
                result["my_cost"] = "inf" if result["my_cost"] > 0 else "-inf"
            if math.isinf(result["nx_cost"]):
                result["nx_cost"] = "inf" if result["nx_cost"] > 0 else "-inf"
        with open(f"results/{problem_string}.json", "w") as f:
            json.dump(results, f, indent=2)
        
    return f"Completed {problem_string}"
        
        
    
combination_list = list(product(size, density, noise_level, negative_values, [42]))

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = {executor.submit(process_problem, params): params for params in combination_list}

    for future in tqdm(concurrent.futures.as_completed(futures), total=len(combination_list), desc="Processing problems"):
        future.result()


Processing problems:   0%|          | 0/224 [00:00<?, ?it/s]

Processing problems: 100%|██████████| 224/224 [00:56<00:00,  3.99it/s]
