In [1]:
# SHORTEST PATH

In [2]:
from itertools import product, combinations
import numpy as np
import networkx as nx
from icecream import ic
from tqdm.auto import tqdm


In [3]:
### CODE GIVEN BY PROFESSOR

In [4]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    rng = np.random.default_rng(seed)
    map = rng.random(size=(size, 2))
    problem = rng.random((size, size))
    if negative_values:
        problem = problem * 2 - 1
    problem *= noise_level
    for a, b in product(range(size), repeat=2):
        if rng.random() < density:
            problem[a, b] += np.sqrt(
                np.square(map[a, 0] - map[b, 0]) + np.square(map[a, 1] - map[b, 1])
            )
        else:
            problem[a, b] = np.inf
    np.fill_diagonal(problem, 0)
    return (problem * 1_000).round()

In [5]:
def shortest_path_analysis(problem: np.ndarray) -> None: # by professor
    masked = np.ma.masked_array(problem, mask=np.isinf(problem))
    G = nx.from_numpy_array(masked, create_using=nx.DiGraph)
    #ic(G.edges(data=True))
    for s, d in combinations(range(problem.shape[0]), 2):
        try:
            # path = nx.shortest_path(G, s, d, weight='weight')
            path = nx.bellman_ford_path(G, s, d, weight='weight')
            cost = cost = nx.path_weight(G, path, weight='weight')
        except nx.NetworkXNoPath:
            # Nodes are not connected
            path = None
            cost = np.inf
        except nx.NetworkXUnbounded:
            # Negative cycle detected
            path = None
            cost = -np.inf
        #ic(s, d, path, cost)
    None

In [6]:
### MY SOLVING ALGORITHM

In [7]:
import heapq
from typing import List, Dict, Optional, Tuple

# Data structures for loops and cities

class Loop:
    def __init__(self):
        self.path: List[int] = []  
        self.exits: List[int] = []  

class City:
    def __init__(self):
        self.loops: List[int] = []  
        self.path: List[int] = []  
        self.cost: float = float('inf')  

loops: List[Loop] = []
cities: Dict[int, City] = {}

# Utility functions 

def has_negative_edges(graph: np.ndarray) -> bool:
    finite_edges = graph[np.isfinite(graph)]
    return np.any(finite_edges < 0)


def choose_next_edge(current: int, graph: np.ndarray, backtracking_from: Optional[int] = None) -> Optional[int]:
    
    outgoing_edges = graph[current, :].copy()
    
    # Exclude self-loop only if it has non-negative cost
    if graph[current, current] >= 0:
        outgoing_edges[current] = np.inf
    
    # Always exclude previous city from path information
    city_path = cities[current].path
    if len(city_path) >= 2:
        came_from = city_path[-2]
        outgoing_edges[came_from] = np.inf
    
    if backtracking_from is not None:
        # Backtracking case: choose edge following in cost order
        # Get all finite edges sorted by cost
        finite_mask = np.isfinite(outgoing_edges)
        finite_indices = np.where(finite_mask)[0]
        finite_costs = outgoing_edges[finite_indices]
        
        # Sort edges by cost
        sorted_indices = np.argsort(finite_costs)
        sorted_destinations = finite_indices[sorted_indices]
        
        # Find backtracking_from edge position in sorted order
        backtrack_pos = -1
        for i, dest in enumerate(sorted_destinations):
            if dest == backtracking_from:
                backtrack_pos = i
                break
        
        # Choose the next edge in cost order after backtracking_from
        if backtrack_pos >= 0 and backtrack_pos + 1 < len(sorted_destinations):
            return int(sorted_destinations[backtrack_pos + 1])
        else:
            return None  # No more edges available
    
    else:
        # Normal case: return cheapest destination
        finite_mask = np.isfinite(outgoing_edges)
        if not np.any(finite_mask):
            return None
        
        min_cost_idx = np.argmin(outgoing_edges)
        if np.isfinite(outgoing_edges[min_cost_idx]):
            return int(min_cost_idx)
        else:
            return None


def compute_min_edge_heuristic(current: int, goal: int, graph: np.ndarray) -> float:
    
    if current == goal:
        return 0
    
    # Get the next edge that would be chosen
    next_dest = choose_next_edge(current, graph)
    
    if next_dest is None:
        return 0
    
    return graph[current, next_dest]

def detect_loop(current_path: List[int], current_city: int) -> bool:
    """Check if current_city is already in the path (forms a loop)"""
    if current_city in current_path:
        return True
    return False

# solver functions

def a_star_shortest_path(graph: np.ndarray, start: int, goal: int) -> Tuple[Optional[List[int]], float]:
    
    # Initialize
    frontier = []
    
    # Initialize cities dictionary
    cities.clear()
    start_city = City()
    start_city.path = [start]
    start_city.cost = 0
    cities[start] = start_city
    
    start_f_score = cities[start].cost + compute_min_edge_heuristic(start, goal, graph)
    heapq.heappush(frontier, (start_f_score, start))
    
    while frontier:
        current_f, current = heapq.heappop(frontier)
        
        if current == goal:
            return cities[current].path, cities[goal].cost
        
        # Get neighbors
        neighbors = []
        for neighbor in range(graph.shape[0]):
            if np.isfinite(graph[current, neighbor]) and neighbor != current:
                neighbors.append(neighbor)
        
        for neighbor in neighbors:
            # Check for loop before processing neighbor
            if detect_loop(cities[current].path, neighbor):
                # Skip this neighbor as it would create a loop
                continue
                
            tentative_cost = cities[current].cost + graph[current, neighbor]
            
            if neighbor not in cities or tentative_cost < cities[neighbor].cost:
                # Update cities dictionary for neighbor
                if neighbor not in cities:
                    cities[neighbor] = City()
                
                cities[neighbor].path = cities[current].path + [neighbor]
                cities[neighbor].cost = tentative_cost
                
                h_score = compute_min_edge_heuristic(neighbor, goal, graph)
                f_score = tentative_cost + h_score
                
                # Add to frontier if not already there with better score
                in_frontier = False
                for i, (f, node) in enumerate(frontier):
                    if node == neighbor:
                        if f_score < f:
                            frontier[i] = (f_score, neighbor)
                            heapq.heapify(frontier)
                        in_frontier = True
                        break
                
                if not in_frontier:
                    heapq.heappush(frontier, (f_score, neighbor))
    
    return None, float('inf')


In [None]:
# Parameters from README
negative_values_list = [False, True]
noise_level_list = [0, 0.1, 0.5, 0.8]
density_list = [0.2, 0.5, 0.8, 1]
size_list = [10, 20, 50, 100, 200, 500, 1000]

# Create all combinations of problems
for negative_values, noise_level, density, size in product(negative_values_list, noise_level_list, density_list, size_list):
    print(negative_values, noise_level, density, size)
    problem = create_problem(size=size, density=density, noise_level=noise_level, negative_values=negative_values)
    for start in range(size): 
        for goal in range(size) :
            a_star_shortest_path(problem,start,goal)


False 0 0.2 10
False 0 0.5 10
False 0 0.8 10
False 0 1 10
False 0.1 0.2 10
False 0.1 0.5 10
False 0.1 0.8 10
False 0.1 1 10
False 0.5 0.2 10
False 0.5 0.5 10
False 0.5 0.8 10
False 0.5 1 10
False 0.8 0.2 10
False 0.8 0.5 10
False 0.8 0.8 10
False 0.8 1 10
True 0 0.2 10
True 0 0.5 10
True 0 0.8 10
True 0 1 10
True 0.1 0.2 10
True 0.1 0.5 10
True 0.1 0.8 10
True 0.1 1 10
True 0.5 0.2 10
True 0.5 0.5 10
True 0.5 0.8 10
True 0.5 1 10
True 0.8 0.2 10
True 0.8 0.5 10
False 0.8 0.5 10
False 0.8 0.8 10
False 0.8 1 10
True 0 0.2 10
True 0 0.5 10
True 0 0.8 10
True 0 1 10
True 0.1 0.2 10
True 0.1 0.5 10
True 0.1 0.8 10
True 0.1 1 10
True 0.5 0.2 10
True 0.5 0.5 10
True 0.5 0.8 10
True 0.5 1 10
True 0.8 0.2 10
True 0.8 0.5 10
True 0.8 0.8 10
True 0.8 1 10
True 0.8 0.8 10
True 0.8 1 10
