# Pathfinding Framework vs NetworkX Benchmark

This notebook demonstrates the capabilities of a custom **Pathfinding Framework** by comparing its performance and results against **NetworkX**, a widely-used graph library, on a shortest path problem in a weighted graph.

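The goal is to validate that the framework's implementations of **Uniform Cost Search (UCS)** and **A* Search** (with a zero heuristic, making it equivalent to UCS) yield the same optimal cost as the **Bellman-Ford algorithm** from NetworkX, and the same path whenever the optimal path is unique. Bellman-Ford is chosen as the reference because it correctly handles graphs with **negative edge weights**, which Dijkstra's and related algorithms do not handle in general. If a negative cycle is reachable from the start node, no shortest path exists; in that case both the framework and NetworkX are expected to report the cycle instead of returning a path.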

### Objectives:
1.  **Generate** a random weighted graph.
2.  **Implement** the problem using the framework's `AbstractProblem` interface.
3.  **Run** UCS and A* (h=0) from the custom framework.
4.  **Run** Bellman-Ford from NetworkX.
5.  **Compare** results for correctness and performance.

In [54]:
import sys
import os
import numpy as np
import networkx as nx
from itertools import product
from dataclasses import dataclass
from typing import List, Tuple, Dict
from functools import partial
import time

# --- Setup for framework import ---
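# Add the parent directory to sys.path so that the `src` package can be imported.
# NOTE: '__file__' below is a quoted string literal (notebooks do not define __file__),
# so dirname() returns '' and the inserted entry is simply '..', resolved relative to
# the current working directory of the kernel.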
sys.path.insert(0, os.path.join(os.path.dirname('__file__'), '..'))

# Importing framework components
try:
    from src.core import (
        AbstractProblem,
        AbstractState,
        AbstractHeuristic,
        SearchEngine,
        SearchResult
    )
    from src.data_structures import PriorityQueueFrontier
    from src.strategies import astar_priority, uniform_cost_priority
    print("Framework components loaded successfully.")
except ImportError as e:
    print(f"Error loading framework components. Ensure 'src' directory is correctly placed: {e}")
    # Handle the error gracefully if the framework is not available (e.g., use mock classes or exit)

from graph_strategies import (
        robust_uniform_cost_priority, 
        robust_astar_priority, 
        create_robust_priority_function,
        NegativeCycleDetectedError
    )


Framework components loaded successfully.


In [55]:
def create_problem(
    size: int,
    *,
    density: float = 1.0,
    negative_values: bool = False,
    noise_level: float = 0.0,
    seed: int = 42,
) -> np.ndarray:
    """
    Build a random directed, weighted graph as a dense cost matrix.

    Nodes are placed at random points in the unit square. Every ordered
    pair (i, j) independently receives an edge with probability ``density``;
    its cost is the Euclidean distance between the two points plus a random
    noise term. The result is scaled by 1000 and rounded to whole numbers.

    Args:
        size: Number of nodes in the graph.
        density: Probability that an ordered pair of nodes is connected
            (0.0 to 1.0).
        negative_values: If True, the noise term is drawn from [-1, 1)
            instead of [0, 1) before scaling, so costs may become negative.
        noise_level: Multiplier applied to the noise term; 0 disables noise.
        seed: Seed for the random generator (same seed, same matrix).

    Returns:
        A (size x size) array where entry [i][j] is the cost of the edge
        from node i to node j. ``np.inf`` marks a missing edge and the
        diagonal is always 0.
    """
    rng = np.random.default_rng(seed)

    # The order of the draws below (positions, noise, then one draw per
    # ordered pair) determines the output for a given seed.
    coords = rng.random(size=(size, 2))
    weights = rng.random((size, size))

    if negative_values:
        # Map [0, 1) onto [-1, 1)
        weights = weights * 2 - 1

    # Turn the raw draws into the noise term
    weights *= noise_level

    for src in range(size):
        for dst in range(size):
            edge_present = rng.random() < density
            if not edge_present:
                # Missing edge: overwrite the noise with the "no edge" marker
                weights[src, dst] = np.inf
                continue

            dx = coords[src, 0] - coords[dst, 0]
            dy = coords[src, 1] - coords[dst, 1]
            weights[src, dst] += np.sqrt(np.square(dx) + np.square(dy))

    # A node reaches itself at zero cost
    np.fill_diagonal(weights, 0)

    # Scale up and round so the costs are whole numbers
    scaled = weights * 1_000
    return scaled.round()

In [56]:
# =============================================================================
# FRAMEWORK IMPLEMENTATION
# =============================================================================

@dataclass(frozen=True)
class GraphNode(AbstractState):
    """
    Immutable search state that identifies one graph node by its id.

    Equality, hashing and the printable form are all defined explicitly
    in terms of ``node_id``.
    """

    node_id: int

    def __repr__(self) -> str:
        return f"Node({self.node_id})"

    def __eq__(self, other: object) -> bool:
        # Only another GraphNode can be equal; defer to the other operand otherwise.
        if isinstance(other, GraphNode):
            return self.node_id == other.node_id
        return NotImplemented

    def __hash__(self) -> int:
        # Hash on the same field that __eq__ compares.
        return hash(self.node_id)


class GraphProblem(AbstractProblem[GraphNode]):
    """
    Shortest-path problem over a dense cost matrix, expressed through the
    framework's problem interface.

    Edge costs are handed to the framework unchanged, so negative weights
    are passed through as-is.
    """

    def __init__(
        self,
        adjacency_matrix: np.ndarray,
        start_node: int,
        goal_node: int,
        node_positions: Dict[int, Tuple[float, float]] = None
    ):
        """
        Args:
            adjacency_matrix: Cost matrix; entry [i][j] is the cost of
                moving from node i to node j, non-finite meaning "no edge".
            start_node: Id of the node the search starts from.
            goal_node: Id of the node the search must reach.
            node_positions: Optional node coordinates. Stored on the
                instance but not read by any method of this class.
        """
        self.matrix = adjacency_matrix
        self.size = len(adjacency_matrix)
        self.start = GraphNode(start_node)
        self.goal = GraphNode(goal_node)
        self.node_positions = node_positions or {}

    def initial_state(self) -> GraphNode:
        """Return the state the search begins in."""
        return self.start

    def is_goal(self, state: GraphNode) -> bool:
        """Return True when ``state`` equals the goal node."""
        return state == self.goal

    def get_successors(self, state: GraphNode) -> List[Tuple[GraphNode, str, float]]:
        """
        List every node reachable from ``state`` over a single edge.

        Returns:
            One ``(next_state, action, cost)`` tuple per outgoing edge, in
            ascending order of target id. Self-loops and entries with a
            non-finite cost are left out.
        """
        origin = state.node_id
        row = self.matrix[origin]
        candidates = ((target, row[target]) for target in range(self.size))

        # float() turns numpy scalars into plain Python floats for the framework
        return [
            (GraphNode(target), f"{origin}->{target}", float(weight))
            for target, weight in candidates
            if np.isfinite(weight) and target != origin
        ]


class ZeroHeuristic(AbstractHeuristic[GraphNode]):
    """
    Zero heuristic (always returns 0).
    Makes A* equivalent to Uniform Cost Search (UCS).

    NOTE(review): the equivalence presumably relies on the A* priority being
    g + h; the priority function lives outside this notebook - confirm.
    """

    def h(self, state: GraphNode) -> float:
        """Return the estimated remaining cost for ``state``, which is always 0.0."""
        return 0.0

    def is_admissible(self) -> bool:
        """
        Report the heuristic as admissible (always True).

        NOTE(review): with negative edge weights the true remaining cost can
        be below 0, in which case h = 0 overestimates it - confirm that the
        framework does not rely on this flag for such graphs.
        """
        return True

    def is_consistent(self) -> bool:
        """Report the heuristic as consistent (always True)."""
        return True

In [57]:
# Benchmark execution functions
def _run_framework_search(problem, priority_fn, heuristic) -> Tuple[SearchResult, float]:
    """
    Run one framework search and time it.

    Shared implementation behind run_framework_ucs and run_framework_astar,
    which differ only in the priority function and the heuristic.

    Args:
        problem: The GraphProblem to solve.
        priority_fn: Priority function accepting a ``max_nodes`` keyword;
            it is bound to ``problem.size`` before being given to the engine.
        heuristic: Heuristic object handed to the engine, or None.

    Returns:
        ``(result, elapsed_seconds)``. If the search raises
        NegativeCycleDetectedError, the message is printed and a failed
        SearchResult with ``total_cost=-inf`` is returned instead.
    """
    # perf_counter() is monotonic and high-resolution, which matters because
    # these runs take well under a millisecond.
    start_time = time.perf_counter()
    engine = SearchEngine(
        problem=problem,
        frontier=PriorityQueueFrontier(),
        priority_fn=partial(priority_fn, max_nodes=problem.size),
        heuristic=heuristic,
        graph_search=True,
        allow_revisit=True  # negative edge weights
    )
    try:
        result = engine.search()
    except NegativeCycleDetectedError as e:
        print(e)
        # Placeholder result: the real expansion count is not available after
        # the exception, so size + 1 is reported, as this notebook always did.
        result = SearchResult(
            success=False,
            total_cost=-np.inf,
            nodes_expanded=problem.size + 1
        )
    return result, time.perf_counter() - start_time


def run_framework_ucs(problem: GraphProblem) -> Tuple[SearchResult, float]:
    """
    Run Uniform Cost Search from the framework.

    Returns:
        ``(result, elapsed_seconds)``; see _run_framework_search for how a
        detected negative cycle is reported.
    """
    return _run_framework_search(problem, robust_uniform_cost_priority, None)


def run_framework_astar(problem: GraphProblem) -> Tuple[SearchResult, float]:
    """
    Run A* with zero heuristic from the framework.

    Returns:
        ``(result, elapsed_seconds)``; see _run_framework_search for how a
        detected negative cycle is reported.
    """
    return _run_framework_search(problem, robust_astar_priority, ZeroHeuristic())
    


def run_networkx_bellman_ford(
    problem_matrix: np.ndarray,
    start_node: int,
    goal_node: int
) -> Tuple[List[int], float, float]:
    """
    Run Bellman-Ford from NetworkX on the graph described by a cost matrix.

    Args:
        problem_matrix: Square cost matrix; entry [i][j] is the cost of the
            edge i -> j, with non-finite entries meaning "no edge".
            Diagonal entries are ignored.
        start_node: Id of the source node.
        goal_node: Id of the target node.

    Returns:
        ``(path, cost, elapsed_seconds)`` where ``path`` is the list of node
        ids from start to goal. When no shortest path exists, ``path`` is
        None and ``cost`` tells why: ``-inf`` for a negative cycle reachable
        from the start node, ``+inf`` when the goal cannot be reached.
        Graph construction is not included in the elapsed time.
    """
    node_count = len(problem_matrix)

    # Build NetworkX graph. Nodes are added explicitly so that a node
    # without any edge is still part of the graph (otherwise NetworkX raises
    # NodeNotFound for it instead of reporting "no path").
    G = nx.DiGraph()
    G.add_nodes_from(range(node_count))
    for i in range(node_count):
        for j in range(node_count):
            cost = problem_matrix[i][j]
            if np.isfinite(cost) and i != j:
                G.add_edge(i, j, weight=float(cost))

    # Run Bellman-Ford once: single_source_bellman_ford returns both the
    # length and the path, so the algorithm is not executed (and timed) twice.
    start_time = time.perf_counter()
    try:
        cost, path = nx.single_source_bellman_ford(
            G, start_node, target=goal_node, weight='weight'
        )
        elapsed_time = time.perf_counter() - start_time
        return path, cost, elapsed_time
    except nx.NetworkXUnbounded as e:
        # Negative cycle reachable from the start node
        elapsed_time = time.perf_counter() - start_time
        print(f"NetworkX Error: {e}")
        return None, -np.inf, elapsed_time
    except nx.NetworkXNoPath as e:
        # Goal is not reachable from the start node
        elapsed_time = time.perf_counter() - start_time
        print(f"NetworkX Error: {e}")
        return None, np.inf, elapsed_time

In [58]:
# Benchmark configuration
print("PATHFINDING FRAMEWORK vs NETWORKX BENCHMARK")
print("-" * 60)

# 1. Generate the problem
print("Generating problem...")

# negative_values=True lets edge costs become negative, which is the case
# Bellman-Ford is needed for. With noise_level=10 the noise term (up to +/-10)
# is much larger than any distance inside the unit square (at most sqrt(2)),
# so negative cycles are likely. Set negative_values=False for a run in which
# every edge cost is non-negative.
problem_matrix = create_problem(
    size=10,
    density=0.5,  # each ordered node pair gets an edge with probability 0.5
    noise_level=10,
    negative_values=True,
    seed=42
)

# Define start and goal
start_node = 0
goal_node = 9

print(f"Problem Matrix: {problem_matrix.shape[0]} nodes")
# Diagonal entries are finite but are not edges, hence one is subtracted per node
print(f"Edges: {np.sum(np.isfinite(problem_matrix)) - len(problem_matrix)}")
# Only finite, non-zero costs are inspected for negative values
finite_costs = problem_matrix[np.isfinite(problem_matrix) & (problem_matrix != 0)]
has_negatives = np.any(finite_costs < 0)
print(f"Has negative edges (excluding self-loops): {has_negatives}")
print(f"Task: Find shortest path from Node {start_node} to Node {goal_node}")
print("-" * 60)

# Create problem instance
problem = GraphProblem(
    adjacency_matrix=problem_matrix,
    start_node=start_node,
    goal_node=goal_node
)


def print_outcome(succeeded, path_ids, cost, elapsed, nodes_expanded=None):
    """
    Print the report block for one algorithm run.

    Args:
        succeeded: Whether the run produced a path.
        path_ids: Node ids along the path (only read when ``succeeded``).
        cost: Total path cost; ``-inf`` marks a detected negative cycle.
        elapsed: Run time in seconds.
        nodes_expanded: Expansion count, or None to omit that line.
    """
    if succeeded:
        print("Solution found!")
        print(f"Path: {' -> '.join(map(str, path_ids))}")
    elif cost == -np.inf:
        # -inf is what the run_* helpers return for a negative cycle
        print("Infinite loop detected.")
    else:
        # Any other failure (e.g. goal not reachable) is not an infinite loop
        print("No solution found.")
    print(f"Cost: {cost:.2f}")
    if nodes_expanded is not None:
        print(f"Nodes expanded: {nodes_expanded}")
    print(f"Time: {elapsed*1000:.2f} ms")
    print("\n")


def framework_path_ids(result):
    """Return the node ids of a successful framework result, else None."""
    return [s.node_id for s in result.path] if result.success else None


# Run Framework UCS
print("FRAMEWORK: Uniform Cost Search (UCS)")
result_ucs, time_ucs = run_framework_ucs(problem)
print("-" * 30)
print_outcome(
    result_ucs.success,
    framework_path_ids(result_ucs),
    result_ucs.total_cost,
    time_ucs,
    nodes_expanded=result_ucs.nodes_expanded,
)


# Run Framework A*
print("FRAMEWORK: A* Search (Zero Heuristic)")
result_astar, time_astar = run_framework_astar(problem)
print("-" * 30)
print_outcome(
    result_astar.success,
    framework_path_ids(result_astar),
    result_astar.total_cost,
    time_astar,
    nodes_expanded=result_astar.nodes_expanded,
)


# Run NetworkX Bellman-Ford
print("NETWORKX: Bellman-Ford Algorithm")
path_nx, cost_nx, time_nx = run_networkx_bellman_ford(
    problem_matrix, start_node, goal_node
)
print("-" * 30)
print_outcome(path_nx is not None, path_nx, cost_nx, time_nx)

PATHFINDING FRAMEWORK vs NETWORKX BENCHMARK
------------------------------------------------------------
Generating problem...
Problem Matrix: 10 nodes
Edges: 47
Has negative edges (excluding self-loops): True
Task: Find shortest path from Node 0 to Node 9
------------------------------------------------------------
FRAMEWORK: Uniform Cost Search (UCS)
Negative cycle detected: path length (11) exceeds number of nodes (10). This indicates a cycle with negative total cost that would cause infinite looping.
------------------------------
Infinite loop detected.
Cost: -inf
Nodes expanded: 11
Time: 0.36 ms


FRAMEWORK: A* Search (Zero Heuristic)
Negative cycle detected: path length (11) exceeds number of nodes (10). This indicates a cycle with negative total cost that would cause infinite looping.
------------------------------
Infinite loop detected.
Cost: -inf
Nodes expanded: 11
Time: 0.24 ms


NETWORKX: Bellman-Ford Algorithm
NetworkX Error: Negative cycle detected.
---------------------

In [59]:
# Comparison summary
print("COMPARISON SUMMARY")
print("=" * 71)

# (label, result, elapsed seconds) for every framework run of the previous cell
framework_runs = [
    ("Framework UCS", result_ucs, time_ucs),
    ("Framework A* (h=0)", result_astar, time_astar),
]

# Only runs that produced a path get a table row
results_table = [
    [
        label,
        result.total_cost,
        len(result.path),
        result.nodes_expanded,
        f"{elapsed*1000:.2f} ms"
    ]
    for label, result, elapsed in framework_runs
    if result.success
]

if path_nx is not None:
    results_table.append([
        "NetworkX Bellman-Ford",
        cost_nx,
        len(path_nx),
        "N/A",
        f"{time_nx*1000:.2f} ms"
    ])

# Print table header
print(f"{'Algorithm':<25} {'Cost':>10} {'Path Len':>10} {'Expanded':>10} {'Time':>12}")
print("-" * 71)
# Print table rows
for row in results_table:
    print(f"{row[0]:<25} {row[1]:>10.2f} {row[2]:>10} {str(row[3]):>10} {row[4]:>12}")
print("\n")


# Validation
print("VALIDATION")
print("=" * 71)

any_framework_success = any(result.success for _, result, _ in framework_runs)

if any_framework_success and path_nx is not None:
    # Each framework algorithm is checked on its own, so a wrong A* result
    # cannot hide behind a correct UCS result.
    all_costs_match = True
    for label, result, _ in framework_runs:
        print(f"{label} vs NetworkX Bellman-Ford:")
        if not result.success:
            # NetworkX found a path but this algorithm did not
            print("Cost match: NO (no solution found)")
            print()
            all_costs_match = False
            continue

        # Check if costs are essentially the same (floating point tolerance)
        cost_match = abs(result.total_cost - cost_nx) < 0.01
        # Check if the path nodes are the same sequence
        path_match = [s.node_id for s in result.path] == path_nx
        all_costs_match = all_costs_match and cost_match

        print(f"Cost match: {'YES' if cost_match else 'NO'}")
        # NOTE: Path can differ if multiple optimal paths exist, but costs must match.
        print(f"Path match: {'YES' if path_match else 'NO'} (Optimal Path might not be unique)")
        print()

    if all_costs_match:
        print("VALIDATION SUCCESSFUL! Framework produces the same optimal cost as NetworkX.")
    else:
        print("RESULTS DIFFER! Check for negative cycles or implementation bugs.")
        if has_negatives:
            print("Note: UCS/A* (without potential function or re-weighting) is NOT guaranteed to be optimal with negative edge weights.")

else:
    print("Cannot validate - one or more algorithms failed to find a solution or a negative cycle was found.")

print("\n")

COMPARISON SUMMARY
Algorithm                       Cost   Path Len   Expanded         Time
-----------------------------------------------------------------------


VALIDATION
Cannot validate - one or more algorithms failed to find a solution or a negative cycle was found.


