In [1]:
%load_ext cython

In [2]:
from timeit import timeit
import math
import time
from typing import Any, List, Tuple, Optional
import numpy as np
from pydantic import BaseModel
from matplotlib import pyplot as plt
import random
from pydantic import BaseModel

In [3]:
class Node(BaseModel):
    """
    VRPPL Node class: customers, depots, and lockers

    Storing the node data including the node id, x and y coordinates
    """
    node_id: int
    x: float
    y: float
    early: float
    late: float
    service: float

class Customer(Node):
    """
    VRPPL Customer class

    Storing the customer data including the customer id, x and y coordinates, early time, late time, service time, and demand
    """
    demand: float
    customer_type: int
    assigned_locker: Optional[Node] = None  # changed union operator to Optional
    locker_delivery: Optional[bool] = None    # changed union operator to Optional

class Vehicle(BaseModel):
    """
    VRPPL Vehicle class

    Storing the vehicle data including the vehicle id, capacity, and fixed cost
    """
    v_id: int
    capacity: float
    load: float
    current_time: float
    visited: list[int] = []

    # Ensuring that the vehicle load is not greater than the vehicle capacity
    def __init__(self, **data):
        super().__init__(**data)
        if self.load > self.capacity:
            raise ValueError("Vehicle load cannot be greater than vehicle capacity")

class Problem:
    """
    VRPPL Problem class

    Storing the problem data including nodes, vehicles, and demands
    """

    customers: list[Customer]
    depot: Node
    lockers: list[Node]
    num_vehicles: int
    num_lockers: int
    num_customers: int
    vehicle_capacity: int
    locker_cache = {}

    def load_data(self, path: str):
        """
        Load the data from the input file

        Parameters:
        path (str): the path to the input file
        
        ## Description of file content:       
        - <Number of customers>\t<Number of lockers> 
        - <Number of vehicles>\t<Vehicle capacity>
        - num_customers X <Customer demands>
        - <Depot'x coordinate>\t<Depot's y coordinate>\t<Depot's early time>\t<Depot's late time>\t<Depot's service time>\t<Customer type = 0>
        - num_customers X <Customer'x coordinate>\t<Customer's y coordinate>\t<Customer's early time>\t<Customer's late time>\t<Customer's service time>\t<Customer type = 1, 2, 3>
        - num_lockers X <Locker'x coordinate>\t<Locker's y coordinate>\t<Locker's early time>\t<Locker's late time>\t<Locker's service time>\t<Locker type = 4>
        - num_customers X <Binary encoded locker assignment for each customer>. Ex: 0 1 => customer 1 is assigned to locker 2
        """
        print("Loading data from", path)
        with open(path, 'r') as f:
            # Read the first line to get the number of customers and lockers
            self.num_customers, self.num_lockers = map(int, f.readline().split())
            self.num_vehicles, self.vehicle_capacity = map(int, f.readline().split())
            # Read customer demands
            demands = []
            for _ in range(self.num_customers):
                demands.append(int(f.readline()))
            # Read depot data
            x, y, early, late, service, _ = map(float, f.readline().split())
            self.depot = Node(node_id=0, x=x, y=y, early=early, late=late, service=service)
            
            # Read customer data
            self.customers = []
            for i in range(1, self.num_customers + 1):
                x, y, early, late, service, customer_type = map(float, f.readline().split())
                self.customers.append(Customer(node_id=i, x=x, y=y, early=early, late=late, service=service, demand=demands[i-1], customer_type=customer_type))
            # Read locker data
            self.lockers = []
            for i in range(1, self.num_lockers + 1):
                x, y, early, late, service, _ = map(float, f.readline().split())
                self.lockers.append(Node(node_id=self.num_customers + i, x=x, y=y, early=early, late=late, service=service))
            # Read locker assignment data
            for i in range(self.num_customers):
                for j, assignment in enumerate(map(int, f.readline().split())):
                    if assignment:
                        self.customers[i].assigned_locker = self.lockers[j]
                        break
        print("Data loaded successfully")

    @staticmethod
    def euclidean_distance(n1: Node, n2: Node) -> float:
        """Helper function to calculate Euclidean distance between two nodes."""
        return hypot(n1.x - n2.x, n1.y - n2.y)
    
    def initialize_position(self, p: float) -> list[float]:
        """
        Generate an initial continuous position vector for customers in [0,1) such that
        customers that are near each other (based on their chosen delivery node, taking into
        account locker assignment) get positions with similar values.
        
        The procedure is as follows:
        1. For each customer, decide its locker delivery preference via _locker_assignment.
        2. Starting at the depot, build a nearest-neighbor ordering using the chosen delivery node.
        3. Assign positions in increasing order along [0,1), with a small random perturbation.
        
        Returns:
            positions (list[float]): A list of continuous values (one per customer) in [0,1).
                                    When sorted, the customer order reflects the nearest-neighbor ordering.
        """
        # Step 1: Create a list of (customer, chosen_node) tuples using the locker assignment.
        assignment_list = []
        for customer in self.customers:
            chosen_node, _ = self._locker_assignment(customer, explore=False, p=p)
            assignment_list.append((customer, chosen_node))
        
        # Step 2: Build a nearest-neighbor ordering starting from the depot.
        ordered_customers = []
        current_node = self.depot
        while assignment_list:
            # Find the tuple (customer, chosen_node) whose chosen_node is nearest to current_node.
            best_tuple = min(assignment_list, key=lambda tup: self.euclidean_distance(current_node, tup[1]))
            ordered_customers.append(best_tuple[0])
            current_node = best_tuple[1]
            assignment_list.remove(best_tuple)
        
        # Step 3: Assign continuous positions based on the ordering.
        n = len(self.customers)
        positions = [0.0] * n  # Placeholder: one position per customer.
        
        # We assign positions in increasing order. Adding a small noise helps keep values unique.
        for order, customer in enumerate(ordered_customers):
            # Evenly space the positions in [0,1). For example, position base = order / n.
            base = order / n
            # Add a small noise in the range [-0.5/n, 0.5/n].
            noise = random.uniform(-0.5 / n, 0.5 / n)
            pos = base + noise
            # Ensure that the final value remains in [0,1)
            pos = max(0.0, min(1.0 - 1e-8, pos))
            # Place the position in the output list.
            # Here we assume that self.customers is stored in a fixed order and that the index of a customer
            # in self.customers corresponds to its dimension in the positions vector.
            index = self.customers.index(customer)
            positions[index] = pos
    
        return positions

    def initialize_solution(self, p: float) -> Tuple[List[int], List[Node]]:
        # Copy of customers to be assigned.
        sorted_customers = sorted(self.customers, key=lambda cust: cust.node_id)
        # For each customer, assign a delivery node based on type.
        # For type 1, assign the customer itself.
        # For type 2 and type 3, assign the selected (nearest) parcel locker station.
        assignment_list = []
        for customer in sorted_customers:
            chosen_node, _ = self._locker_assignment(customer, explore=True, p=p)
            assignment_list.append((customer, chosen_node))
        
        # --- Step 2: Nearest-Neighbor Ordering ---
        # Initialize the two-row representation.
        first_row = [0]             # Row 0: depot marker at the beginning.
        second_row = [self.depot]     # Row 1: actual visited nodes, starting with depot.
        current_node = self.depot

        # While there remain unassigned customers, select the one whose assigned node
        # is closest (in Euclidean distance) to the current node.
        while assignment_list:
            candidate_tuple = min(
                assignment_list,
                key=lambda tup: self.euclidean_distance(current_node, tup[1])
            )
            customer, chosen_node = candidate_tuple
            first_row.append(customer.node_id)
            second_row.append(chosen_node)
            current_node = chosen_node
            assignment_list.remove(candidate_tuple)
        
        # Terminate the route by appending the depot at the end.
        first_row.append(0)
        second_row.append(self.depot)
        
        return first_row, second_row
    
    def get_search_space(self) -> list[tuple[float, float]]:
        """
        Get the search space for the problem.

        Returns:
            search_space (list[tuple[float, float]]): The search space for the problem.
        """
        return [(0.0, 1.0) for _ in range(self.num_customers)]

    def position2route(self, positions: list[float]) -> tuple[float, list[list[Node]]]:
        """
        Decode the positions (continuous values) to a permutation of customer visits,
        perform locker assignment similar to permu2route, split the permutation into routes 
        for vehicles (with depot at the beginning and end of each route), and compute the total 
        travel distance. If a route is infeasible (due to time windows, capacity, or duplicate locker
        visits), a penalty (float('inf')) is returned.
        
        Parameters:
            positions (list[float]): A list of continuous values (one per customer) in [0,1).
        
        Returns:
            A tuple (total_distance, routes) where:
            - total_distance (float): The sum of travel distances of all routes (or float('inf') if infeasible).
            - routes (list[list[Node]]): A list of routes (each route is a list of Node objects).
        """
        # --- Step 1: Decode Positions to an Ordered List of Customers ---
        # Sort customer indices based on the continuous values.
        sorted_indices = sorted(range(len(positions)), key=lambda i: positions[i])
        ordered_customers = [self.customers[i] for i in sorted_indices]
        
        # --- Step 2: Initialize Route and Vehicle State ---
        routes = []
        total_distance = 0.0

        current_route = [self.depot]  # Each route starts with the depot.
        vehicle_index = 0
        current_vehicle = Vehicle(
            v_id=vehicle_index,
            capacity=self.vehicle_capacity,
            load=0.0,
            current_time=0.0
        )
        last_node = self.depot
        route_distance = 0.0
        visited: set[int] = set()  # To check duplicate locker visits.
        
        # --- Step 3: Process Each Customer in the Order ---
        for customer in ordered_customers:
            # Locker Assignment: decide whether to deliver to the customer or its assigned locker.
            chosen_node, _ = self._locker_assignment(customer, explore=False)
            
            # Compute travel time from the last node to the chosen node.
            travel_time = self.euclidean_distance(last_node, chosen_node)
            arrival_time = current_vehicle.current_time + travel_time
            if arrival_time < chosen_node.early:
                arrival_time = chosen_node.early

            # Check if this customer (via chosen node) can be feasibly served:
            # - Time window is met.
            # - Vehicle has enough capacity.
            # - A locker is not visited twice in a non-consecutive manner.
            duplicate_lockers = (
                (chosen_node.node_id in visited) and 
                (chosen_node.node_id != self.depot.node_id) and 
                (chosen_node.node_id != last_node.node_id)
            )
            if (arrival_time > chosen_node.late) or (current_vehicle.load + customer.demand > current_vehicle.capacity) or duplicate_lockers:
                # End current route: return to depot.
                return_to_depot = self.euclidean_distance(last_node, self.depot)
                route_distance += return_to_depot
                current_route.append(self.depot)
                total_distance += route_distance
                routes.append(current_route)

                # Switch to the next available vehicle.
                vehicle_index += 1
                if vehicle_index >= self.num_vehicles:
                    return float('inf'), []  # Infeasible: ran out of vehicles.
                
                current_vehicle = Vehicle(
                    v_id=vehicle_index,
                    capacity=self.vehicle_capacity,
                    load=0.0,
                    current_time=0.0
                )
                # Reset the route details.
                current_route = [self.depot]
                last_node = self.depot
                route_distance = 0.0
                visited.clear()

                # Recalculate travel details from the depot to the chosen node.
                travel_time = self.euclidean_distance(last_node, chosen_node)
                arrival_time = current_vehicle.current_time + travel_time
                if arrival_time < chosen_node.early:
                    arrival_time = chosen_node.early
                if (arrival_time > chosen_node.late) or (current_vehicle.load + customer.demand > current_vehicle.capacity):
                    return float('inf'), []  # Even after starting a new route, the node is infeasible.
            
            # --- Accept the Customer (via chosen_node) ---
            route_distance += travel_time
            current_route.append(chosen_node)
            current_vehicle.current_time = arrival_time + chosen_node.service
            current_vehicle.load += customer.demand
            visited.add(chosen_node.node_id)
            last_node = chosen_node

        # --- Step 4: Finalize the Last Route ---
        return_to_depot = self.euclidean_distance(last_node, self.depot)
        route_distance += return_to_depot
        current_route.append(self.depot)
        total_distance += route_distance
        routes.append(current_route)

        return total_distance, routes


    def _locker_assignment(self, customer: Customer, explore=False, p=0.5) -> tuple[Node, Customer]:
        """
        Assign a customer to a parcel locker station based on the customer's type.
        For type 1 customers, the assigned node is the customer itself.
        For type 2 and type 3 customers, the assigned node is the nearest parcel locker station.
        
        Parameters:
            customer (Customer): The customer object to assign to a parcel locker.
        
        Returns:
            assigned_node (Node): The assigned parcel locker station.
        """
        if customer.customer_type == 1:
            customer.locker_delivery = False
            chosen_node = customer
        elif customer.customer_type == 2:
            customer.assigned_locker
            customer.locker_delivery = True
            chosen_node = customer.assigned_locker
        elif customer.customer_type == 3:
            if not explore:
                chosen_node = self.locker_cache.get(customer.node_id)
                if chosen_node is not None:
                    return chosen_node, customer
            if random.random() < p: # If r < p then home delivery
                chosen_node = customer
                customer.locker_delivery = False
                self.locker_cache[customer.node_id] = customer
            else:
                chosen_node = customer.assigned_locker
                customer.locker_delivery = True
                self.locker_cache[customer.node_id] = customer.assigned_locker
        else: # In case 
            chosen_node = customer
        return chosen_node, customer

    def permu2route(self, permutation: list[int]) -> tuple[float, list[list[Node]]]:
        routes = []
        total_distance = 0.0

        # Initialize the current route with the depot.
        current_route = [self.depot]
        # Initialize vehicle state.
        vehicle_index = 0
        current_vehicle = Vehicle(v_id=vehicle_index, capacity=self.vehicle_capacity, load=0.0, current_time=0.0)
        last_node = self.depot
        route_distance = 0.0
        visited: set[int] = set()
        # Create a lookup dictionary for customers based on their node_id.
        customer_dict = {cust.node_id: cust for cust in self.customers}

        # Process each customer ID from the first row (ignoring the depot markers).
        for cid in permutation[1:-1]:
            customer = customer_dict.get(cid)
            if customer is None:
                # Skip if the customer is not found.
                continue

            # --- Locker Assignment ---
            chosen_node, customer = self._locker_assignment(customer, False)

            # --- Feasibility Check ---
            travel_time = self.euclidean_distance(last_node, chosen_node)
            arrival_time = current_vehicle.current_time + travel_time
            if arrival_time < chosen_node.early:
                arrival_time = chosen_node.early
            # If a locker station is visited more than once (not consecutive), skip it.
            duplicate_lockers = (chosen_node.node_id in visited) and (chosen_node.node_id != self.depot.node_id) and (chosen_node.node_id != last_node.node_id)
            # Check time window and capacity feasibility.
            if (arrival_time > chosen_node.late) or (current_vehicle.load + customer.demand > current_vehicle.capacity) or duplicate_lockers:
                # Terminate the current route: return to depot.
                return_to_depot = self.euclidean_distance(last_node, self.depot)
                route_distance += return_to_depot
                current_route.append(self.depot)
                total_distance += route_distance
                routes.append(current_route)

                # Switch to a new vehicle.
                vehicle_index += 1
                if vehicle_index >= self.num_vehicles:
                    return float('inf'), []
                current_vehicle = Vehicle(v_id=vehicle_index, capacity=self.vehicle_capacity, load=0.0, current_time=0.0)
                # Reset route details.
                current_route = [self.depot]
                last_node = self.depot
                route_distance = 0.0
                visited.clear()

                # Recalculate travel details from depot to the chosen node.
                travel_time = self.euclidean_distance(last_node, chosen_node)
                arrival_time = current_vehicle.current_time + travel_time
                if arrival_time < chosen_node.early:
                    arrival_time = chosen_node.early
                if (arrival_time > chosen_node.late) or (current_vehicle.load + customer.demand > current_vehicle.capacity):
                    return float('inf'), []

            # --- Accept the Customer ---
            route_distance += travel_time
            current_route.append(chosen_node)
            # Update vehicle state: add service time and customer demand.
            current_vehicle.current_time = arrival_time + chosen_node.service
            current_vehicle.load += customer.demand
            visited.add(chosen_node.node_id)
            last_node = chosen_node

        # Finalize the last route: return to depot.
        return_to_depot = self.euclidean_distance(last_node, self.depot)
        route_distance += return_to_depot
        current_route.append(self.depot)
        total_distance += route_distance
        routes.append(current_route)

        return total_distance, routes

    def node2routes(self, permutation: list[Node]) -> tuple[float, list[list[Node]]]:
        """
        Evaluate a permutation of node IDs (integers) that includes both customer and locker nodes
        and convert it into feasible routes. The permutation is expected to include the depot
        (node with id 0) as the first and last elements.

        Parameters:
            permutation (list[int]): A list of node IDs representing the visiting order.
        
        Returns:
            tuple: (total_distance, routes)
                total_distance (float): The sum of the travel distances for all routes. If any route is
                                        infeasible, returns float('inf').
                routes (list[list[Node]]): A list of routes, where each route is a list of Node objects
                                        (starting and ending with the depot).
        """
        routes = []
        total_distance = 0.0

        # Initialize the first route starting at the depot.
        current_route = [self.depot]
        vehicle_index = 0
        current_vehicle = Vehicle(
            v_id=vehicle_index, 
            capacity=self.vehicle_capacity, 
            load=0.0, 
            current_time=0.0
        )
        last_node = self.depot
        route_distance = 0.0
        visited: set[int] = set()

        # Build a lookup dictionary for all nodes by their node_id.
        node_dict = {}
        node_dict[self.depot.node_id] = self.depot
        for cust in self.customers:
            node_dict[cust.node_id] = cust
        for locker in self.lockers:
            node_dict[locker.node_id] = locker

        # Process each node ID from the permutation, skipping the first and last (depot markers)
        for nid in permutation[1:-1]:
            node: Node = node_dict.get(nid.node_id)
            if node is None:
                # If the node ID is not found, skip it.
                continue

            # --- Feasibility Check ---
            travel_time = self.euclidean_distance(last_node, node)
            arrival_time = current_vehicle.current_time + travel_time
            if arrival_time < node.early:
                arrival_time = node.early

            # Check for duplicate locker visits (if the same locker is visited non-consecutively).
            duplicate_lockers = (
                (node.node_id in visited) and 
                (node.node_id != self.depot.node_id) and 
                (node.node_id != last_node.node_id)
            )
            # For customer nodes, get the demand; for lockers or depot, assume zero demand.
            node_demand = node.demand if hasattr(node, "demand") else 0

            if (arrival_time > node.late) or (current_vehicle.load + node_demand > current_vehicle.capacity) or duplicate_lockers:
                # Terminate the current route by returning to the depot.
                return_to_depot = self.euclidean_distance(last_node, self.depot)
                route_distance += return_to_depot
                current_route.append(self.depot)
                total_distance += route_distance
                routes.append(current_route)

                # Switch to the next vehicle.
                vehicle_index += 1
                if vehicle_index >= self.num_vehicles:
                    # If no more vehicles are available, return an infeasible penalty.
                    return float('inf'), []
                current_vehicle = Vehicle(
                    v_id=vehicle_index, 
                    capacity=self.vehicle_capacity, 
                    load=0.0, 
                    current_time=0.0
                )
                # Reset route details.
                current_route = [self.depot]
                last_node = self.depot
                route_distance = 0.0
                visited.clear()

                # Recalculate travel details from the new route start (depot) to the current node.
                travel_time = self.euclidean_distance(last_node, node)
                arrival_time = current_vehicle.current_time + travel_time
                if arrival_time < node.early:
                    arrival_time = node.early
                if (arrival_time > node.late) or (current_vehicle.load + node_demand > current_vehicle.capacity):
                    return float('inf'), []

            # --- Accept the Node ---
            route_distance += travel_time
            current_route.append(node)
            current_vehicle.current_time = arrival_time + node.service
            current_vehicle.load += node_demand
            visited.add(node.node_id)
            last_node = node

        # Finalize the last route by returning to the depot.
        return_to_depot = self.euclidean_distance(last_node, self.depot)
        route_distance += return_to_depot
        current_route.append(self.depot)
        total_distance += route_distance
        routes.append(current_route)

        return total_distance, routes

class Solver:
    def __init__(self, objective_function=None, num_iterations=None):
        self.objective_function = objective_function
        self.num_iterations = num_iterations
        self.global_best_position = None
        self.global_best_fitness = float('inf')
        self.global_best_routes = []
        self.fitness_history = []
    
    def optimize(self, verbose=True) -> tuple[list[any], float]:
        raise NotImplementedError
    
    def plot_fitness_history(self):
        """
        Plots the best fitness value over iterations.
        """
        plt.figure(figsize=(10, 6))
        plt.plot(range(1, self.num_iterations + 1), self.fitness_history, marker='o', linestyle='-', color='b')
        plt.title('Fitness History')
        plt.xlabel('Iteration')
        plt.ylabel('Best Fitness')
        plt.grid(True)
        plt.show()
        # Save the plot to a file
        plt.savefig("output/fitness_history.png")

    def plot_routes(self, routes: list[list[Node]] = None) -> None:
        """
        Plot the routes for the VRP.
        
        Parameters:
            routes (list[list[Node]]): A list of routes, where each route is a list of Node objects.
                                    It is assumed that each route starts and ends at the depot.
        """
        plt.figure(figsize=(10, 8))
        
        # Define a set of colors to cycle through for different routes.
        colors = ['blue', 'green', 'red', 'cyan', 'magenta', 'yellow', 'black', 'orange']
        if routes is None:
            routes = self.global_best_routes
            
        for i, route in enumerate(routes):
            # Extract the x and y coordinates for each node in the route.
            xs = [node.x for node in route]
            ys = [node.y for node in route]
            color = colors[i % len(colors)]
            
            # Plot the route as a line connecting the nodes.
            plt.plot(xs, ys, marker='o', linestyle='-', color=color, label=f"Route {i+1}")
            
            # Optionally, annotate each node with its node_id.
            for node in route:
                plt.text(node.x, node.y, f"{node.node_id}", fontsize=9, color=color,
                        verticalalignment='bottom', horizontalalignment='right')
        
        plt.title("VRP Routes")
        plt.xlabel("X coordinate")
        plt.ylabel("Y coordinate")
        plt.legend()
        plt.grid(True)
        plt.show()
        # Save the plot to a file
        plt.savefig("output/routes.png")

def print_routes(routes: list[list[Node]]):
    print("Number of routes: ", len(routes))
    print("Longest route: ", max([len(route) for route in routes]))
    print("Shortest route: ", min([len(route) for route in routes]))
    for route in routes:
        for node in route:
            print(node.model_dump()["node_id"], end=" ")
        print()

## Solvers

### ACO

In [4]:
%%cython

import asyncio
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import matplotlib.pyplot as plt  # <-- new import
cimport cython
from libc.math cimport hypot
import random

@cython.inline
cdef double euclidean_distance(double x1, double y1, double x2, double y2) nogil:
    return hypot(x1 - x2, y1 - y2)

def export_pheromones_heatmap(pheromones, filename="pheromones_heatmap.png"):
    # Average over delivery modes to get a 2D matrix.
    heatmap = np.mean(pheromones, axis=2)
    plt.figure(figsize=(8,6))
    plt.imshow(heatmap, cmap="hot", interpolation="nearest")
    plt.colorbar()
    plt.title("Pheromone Heat Map")
    plt.savefig(filename)
    plt.close()

class SACO(Solver):
    def __init__(self, problem: Problem, num_ants=20, num_iterations=100,
                 alpha=1.0, beta=2.0, evaporation_rate=0.1, Q=1.0):
        """
        Initializes the Sequential ACO optimizer with 3D pheromone.
        """
        super().__init__(problem.node2routes, num_iterations=num_iterations)
        self.problem: Problem = problem
        self.num_ants = num_ants
        self.num_iterations = num_iterations
        self.alpha = alpha
        self.beta = beta
        self.evaporation_rate = evaporation_rate
        self.Q = Q

        self.n = self.problem.num_customers

        # Initialize 3D pheromone matrix.
        self.pheromones = np.ones((self.n + 1, self.n + 1, 2))
        for j, customer in enumerate(self.problem.customers, start=1):
            if customer.customer_type == 1:
                self.pheromones[:, j, 1] = 0.0
            elif customer.customer_type == 2:
                self.pheromones[:, j, 0] = 0.0

        self.global_best_fitness = float('inf')
        self.best_solution = None  # List of Node objects.
        self.best_solution_transitions = None  # Store transitions for elitist update.
        self.global_best_routes = None
        self.fitness_history = []

    def construct_solution(self) -> tuple:
        """
        Constructs a solution for one ant using a 3D pheromone matrix.
        Returns:
            transitions (list[tuple]): List of (prev_index, j, decision) tuples.
            final_route (list[Node]): The constructed route (with depot start and end).
        """
        cdef int j, d, idx, n_option
        cdef double cx, cy, tx, ty, dist, heuristic, tau, value, total_value, r, cumulative
        cdef list candidate_options, probs
        cdef tuple candidate_coord, selected

        # Initialize current location from the depot.
        cx = self.problem.depot.x
        cy = self.problem.depot.y
        current_location = (cx, cy)
        prev_index = 0  # Depot index
        unvisited = list(range(1, self.n + 1))
        transitions = []
        final_route = [self.problem.depot]

        while unvisited:
            candidate_options = []
            # Use the current location stored in cx,cy
            for j in unvisited:
                customer = self.problem.customers[j - 1]
                # Determine allowed delivery modes based on customer type.
                if customer.customer_type == 1:
                    allowed = [0]  # Only home delivery allowed.
                elif customer.customer_type == 2:
                    allowed = [1]  # Only locker delivery allowed.
                else:
                    allowed = [0, 1]  # Both modes allowed.
                # For each allowed mode, compute the candidate option.
                for d in allowed:
                    if d == 0:
                        tx = customer.x
                        ty = customer.y
                    else:
                        tx = customer.assigned_locker.x
                        ty = customer.assigned_locker.y
                    # Compute Euclidean distance using our inline function.
                    dist = euclidean_distance(cx, cy, tx, ty)
                    heuristic = 1.0 / dist if dist > 0 else 1e6
                    # Get pheromone value; raise it to self.alpha.
                    tau = self.pheromones[prev_index, j, d] ** self.alpha
                    value = tau * (heuristic ** self.beta)
                    candidate_options.append((j, d, value, (tx, ty)))
            # Sum up the candidate values.
            total_value = 0.0
            for option in candidate_options:
                total_value += option[2]
            # Compute probabilities.
            if total_value > 0:
                probs = [option[2] / total_value for option in candidate_options]
            else:
                n_option = len(candidate_options)
                probs = [1.0 / n_option] * n_option
            # Roulette wheel selection.
            r = random.random()
            cumulative = 0.0
            selected = None
            for option, prob in zip(candidate_options, probs):
                cumulative += prob
                if r <= cumulative:
                    selected = (option[0], option[1], option[3])
                    break
            if selected is None:
                # Fallback: choose the last option.
                last_option = candidate_options[-1]
                selected = (last_option[0], last_option[1], last_option[3])
            j, d, candidate_coord = selected
            transitions.append((prev_index, j, d))
            # Retrieve the chosen node.
            if d == 0:
                node = self.problem.customers[j - 1]
            else:
                node = self.problem.customers[j - 1].assigned_locker
            final_route.append(node)
            # Update current location.
            cx, cy = candidate_coord
            current_location = candidate_coord
            prev_index = j
            unvisited.remove(j)
        final_route.append(self.problem.depot)
        return transitions, final_route

    def update_pheromones(self, solutions: list):
        """
        Updates 3D pheromones using the transitions from ant solutions.
        Each solution is a tuple (transitions, fitness).
        """
        # Evaporate pheromones.
        self.pheromones *= (1 - self.evaporation_rate)
        for transitions, fitness in solutions:
            deposit = self.Q / (fitness if fitness > 0 else 1e-8)
            for (i, j, d) in transitions:
                self.pheromones[i, j, d] += deposit

        # Elitist update using best solution transitions.
        if self.best_solution_transitions is not None:
            deposit = self.Q / (self.global_best_fitness if self.global_best_fitness > 0 else 1e-8)
            for (i, j, d) in self.best_solution_transitions:
                self.pheromones[i, j, d] += deposit

        # Enforce fixed settings for customer types.
        for j, customer in enumerate(self.problem.customers, start=1):
            if customer.customer_type == 1:
                self.pheromones[:, j, 1] = 0.0
            elif customer.customer_type == 2:
                self.pheromones[:, j, 0] = 0.0

    def optimize(self, verbose=True):
        """
        Executes the ACO optimization process using 3D pheromone.
        """
        for iteration in range(self.num_iterations):
            solutions = []
            for ant in range(self.num_ants):
                transitions, route = self.construct_solution()
                fitness, routes = self.problem.node2routes(route)
                solutions.append((transitions, fitness))
                if fitness < self.global_best_fitness:
                    self.global_best_fitness = fitness
                    self.best_solution = route
                    self.best_solution_transitions = transitions
                    self.global_best_routes = routes
            self.update_pheromones(solutions)
            self.fitness_history.append(self.global_best_fitness)
            if verbose:
                print(f"Iteration {iteration + 1}: Best Fitness = {self.global_best_fitness}")
        return self.best_solution, self.global_best_fitness, self.global_best_routes

class PACO(Solver):
    def __init__(self, problem: Problem, num_ants=1000, num_iterations=100, batch_size=100, alpha=1.0, beta=2.0, evaporation_rate=0.1, Q=1.0):
        """
        Initializes the ACO optimizer for the VRPPL problem using a 3D pheromone matrix and sets up key parameters for
        ant colony optimization with batch processing for parallel execution.

        Parameters:
            problem (Problem): The routing problem instance containing customer data and related properties.
            num_ants (int, optional): The number of ants to simulate per iteration, influencing exploration. Defaults to 1000.
            num_iterations (int, optional): The number of iterations the algorithm will perform to refine solutions. Defaults to 100.
            batch_size (int, optional): The number of iterations to process in each parallel batch. Defaults to 100.
            alpha (float, optional): The factor controlling the influence of pheromone trails in ant decision-making. Defaults to 1.0.
            beta (float, optional): The factor controlling the influence of heuristic information (e.g., distance) in ant decisions. Defaults to 2.0.
            evaporation_rate (float, optional): The rate at which pheromone intensity diminishes over time, promoting exploration. Defaults to 0.1.
        """
        super().__init__(problem.node2routes, num_iterations=num_iterations)
        self.problem: Problem = problem
        self.num_iterations = num_iterations
        self.num_ants = num_ants
        self.batch_size = batch_size
        self.alpha = alpha
        self.beta = beta
        self.evaporation_rate = evaporation_rate
        self.Q = Q

        # n: number of customers
        self.n = self.problem.num_customers

        # Initialize a single 3D pheromone matrix with dimensions (n+1) x (n+1) x 2.
        self.pheromones = np.ones((self.n + 1, self.n + 1, 2))
        for j, customer in enumerate(self.problem.customers, start=1):
            if customer.customer_type == 1:
                self.pheromones[:, j, 1] = 0.0
            elif customer.customer_type == 2:
                self.pheromones[:, j, 0] = 0.0

        self.global_best_fitness = float('inf')
        self.global_best_solution = None
        self.global_best_routes = None
        self.fitness_history = []

    def construct_solution(self):
        """
        Constructs one ant's solution using the integrated 3D pheromone matrix.
        """
        current_location = (self.problem.depot.x, self.problem.depot.y)
        prev_index = 0
        unvisited = list(range(1, self.n + 1))
        transitions = []
        final_route = [self.problem.depot]

        while unvisited:
            candidate_options = []
            for j in unvisited:
                customer = self.problem.customers[j - 1]
                allowed = [0] if customer.customer_type == 1 else [1] if customer.customer_type == 2 else [0, 1]
                for d in allowed:
                    candidate_coord = (customer.x, customer.y) if d == 0 else (customer.assigned_locker.x, customer.assigned_locker.y)
                    distance = euclidean_distance(current_location, candidate_coord)
                    heuristic = 1.0 / distance if distance > 0 else 1e6
                    tau = self.pheromones[prev_index, j, d]
                    value = (tau ** self.alpha) * (heuristic ** self.beta)
                    candidate_options.append((j, d, value, candidate_coord))

            total_value = sum(option[2] for option in candidate_options)
            probs = [option[2] / total_value for option in candidate_options] if total_value > 0 else [1 / len(candidate_options)] * len(candidate_options)
            r = random.random()
            cumulative = 0.0
            selected = None
            for (j, d, _, candidate_coord), prob in zip(candidate_options, probs):
                cumulative += prob
                if r <= cumulative:
                    selected = (j, d, candidate_coord)
                    break
            if selected is None:
                selected = candidate_options[-1][0:2] + (candidate_options[-1][3],)

            j, d, candidate_coord = selected
            transitions.append((prev_index, j, d))
            final_route.append(self.problem.customers[j - 1] if d == 0 else self.problem.customers[j - 1].assigned_locker)
            current_location = candidate_coord
            prev_index = j
            unvisited.remove(j)
        final_route.append(self.problem.depot)
        return transitions, final_route

    def update_pheromones(self, solutions):
        """Updates the integrated 3D pheromone matrix based on the ant solutions."""
        self.pheromones *= (1 - self.evaporation_rate)
        for transitions, fitness in solutions:
            deposit = self.Q / (fitness if fitness > 0 else 1e-8)
            for (i, j, d) in transitions:
                self.pheromones[i, j, d] += deposit

        for j, customer in enumerate(self.problem.customers, start=1):
            if customer.customer_type == 1:
                self.pheromones[:, j, 1] = 0.0
            elif customer.customer_type == 2:
                self.pheromones[:, j, 0] = 0.0

    def single_ant_solution(self):
        """Constructs and evaluates a single ant's solution."""
        transitions, final_route = self.construct_solution()
        fitness, routes = self.problem.node2routes(final_route)
        return transitions, fitness, final_route, routes

    def optimize(self, verbose=True):
        """Executes the ACO optimization process using parallel processing."""
        with ThreadPoolExecutor() as executor:
            for iteration in range(self.num_iterations):
                solutions = []
                for i in range(0, self.num_ants, self.batch_size):
                    futures = [executor.submit(self.single_ant_solution) for _ in range(self.batch_size)]
                    batch_results = [future.result() for future in futures]
                    solutions.extend(batch_results)
                self.update_pheromones([(transitions, fitness) for (transitions, fitness, _, _) in solutions])
                for (transitions, fitness, final_route, routes) in solutions:
                    if fitness < self.global_best_fitness:
                        self.global_best_fitness = fitness
                        self.global_best_solution = final_route
                        self.global_best_routes = routes
                self.fitness_history.append(self.global_best_fitness)
                if verbose:
                    print(f"Iteration {iteration + 1}: Best Fitness = {self.global_best_fitness}")
        return self.global_best_solution, self.global_best_fitness, self.global_best_routes
    
class Colony:
    """
    A helper class representing an independent ACO colony.
    Each colony maintains its own (local) pheromone matrix and runs for a number of iterations.
    """
    def __init__(self, colony_id, problem, num_iterations, sync_interval,
                 num_ants, alpha, beta, evaporation_rate, Q, initial_pheromones):
        self.colony_id = colony_id
        self.problem = problem
        self.num_iterations = num_iterations
        self.sync_interval = sync_interval  # How often to synchronize with the global pheromone matrix
        self.num_ants = num_ants
        self.alpha = alpha
        self.beta = beta
        self.evaporation_rate = evaporation_rate
        self.Q = Q
        # Start with a local copy of the global pheromone matrix.
        self.pheromones = np.copy(initial_pheromones)
        self.local_best_fitness = float('inf')
        self.local_best_solution = None

    def construct_solution(self):
        """
        Constructs one ant's solution using the local 3D pheromone matrix.
        Returns:
            transitions: list of (prev_index, j, decision) tuples.
            final_route: list of Node objects representing the constructed route.
        """
        current_location = (self.problem.depot.x, self.problem.depot.y)
        prev_index = 0
        unvisited = list(range(1, self.problem.num_customers + 1))
        transitions = []
        final_route = [self.problem.depot]

        while unvisited:
            candidate_options = []
            for j in unvisited:
                customer = self.problem.customers[j - 1]
                # Determine allowed delivery modes based on customer type.
                if customer.customer_type == 1:
                    allowed = [0]
                elif customer.customer_type == 2:
                    allowed = [1]
                else:  # Flexible customer (type 3)
                    allowed = [0, 1]
                for d in allowed:
                    # Use home coordinates if d==0, locker coordinates if d==1.
                    candidate_coord = (customer.x, customer.y) if d == 0 else (customer.assigned_locker.x, customer.assigned_locker.y)
                    distance = euclidean_distance(current_location, candidate_coord)
                    heuristic = 1.0 / distance if distance > 0 else 1e6
                    tau = self.pheromones[prev_index, j, d]
                    value = (tau ** self.alpha) * (heuristic ** self.beta)
                    candidate_options.append((j, d, value, candidate_coord))
            total_value = sum(option[2] for option in candidate_options)
            if total_value > 0:
                probs = [option[2] / total_value for option in candidate_options]
            else:
                probs = [1 / len(candidate_options)] * len(candidate_options)
            r = random.random()
            cumulative = 0.0
            selected = None
            for (j, d, _, candidate_coord), prob in zip(candidate_options, probs):
                cumulative += prob
                if r <= cumulative:
                    selected = (j, d, candidate_coord)
                    break
            if selected is None:
                selected = candidate_options[-1][0:2] + (candidate_options[-1][3],)
            j, d, candidate_coord = selected
            transitions.append((prev_index, j, d))
            # Append the chosen node: home (if d==0) or the assigned locker (if d==1).
            if d == 0:
                final_route.append(self.problem.customers[j - 1])
            else:
                final_route.append(self.problem.customers[j - 1].assigned_locker)
            current_location = candidate_coord
            prev_index = j
            unvisited.remove(j)
        final_route.append(self.problem.depot)
        return transitions, final_route

    def update_local_pheromones(self, solutions):
        """Local pheromone update for the colony."""
        # Evaporate pheromones.
        self.pheromones *= (1 - self.evaporation_rate)
        for transitions, fitness in solutions:
            deposit = self.Q / (fitness if fitness > 0 else 1e-8)
            for (i, j, d) in transitions:
                self.pheromones[i, j, d] += deposit
        # Enforce the fixed pheromone settings for customer types.
        for j, customer in enumerate(self.problem.customers, start=1):
            if customer.customer_type == 1:
                self.pheromones[:, j, 1] = 0.0
            elif customer.customer_type == 2:
                self.pheromones[:, j, 0] = 0.0

    def single_ant_solution(self):
        """
        Constructs one ant's solution and evaluates its fitness.
        Returns:
            (transitions, fitness)
        """
        transitions, final_route = self.construct_solution()
        fitness, _ = self.problem.node2routes(final_route)
        return transitions, fitness

    async def run(self, global_pheromone_queue):
        print(f"[Colony {self.colony_id}] Starting optimization for {self.num_iterations} iterations.")
        for iteration in range(self.num_iterations):
            solutions = [self.single_ant_solution() for _ in range(self.num_ants)]
            self.update_local_pheromones(solutions)
            # Update local best.
            for transitions, fitness in solutions:
                if fitness < self.local_best_fitness:
                    self.local_best_fitness = fitness

            # Yield control to allow other colonies to run.
            await asyncio.sleep(0)
            
            if iteration % 10 == 0:
                print(f"[Colony {self.colony_id}] Iteration {iteration+1}/{self.num_iterations} complete. Local best fitness: {self.local_best_fitness}")
            if iteration % self.sync_interval == 0:
                print(f"[Colony {self.colony_id}] Pushing pheromone update at iteration {iteration+1}.")
                await global_pheromone_queue.put((self.colony_id, np.copy(self.pheromones)))
        print(f"[Colony {self.colony_id}] Optimization complete. Final local best fitness: {self.local_best_fitness}")
        return self.local_best_fitness

async def global_pheromone_manager(global_pheromones, num_colonies, global_pheromone_queue, sync_interval):
    """
    Aggregates updates from colonies and updates the global pheromone matrix periodically.
    """
    print("[Global Manager] Starting global pheromone management.")
    iteration = 0
    while True:
        updates = []
        # Try to gather updates from all colonies with a timeout to prevent freezing.
        for _ in range(num_colonies):
            try:
                update = await asyncio.wait_for(global_pheromone_queue.get(), timeout=30)
                updates.append(update[1])
                print(f"[Global Manager] Received update from Colony {update[0]}.")
            except asyncio.TimeoutError:
                print("[Global Manager] Timeout waiting for colony update. Proceeding with available updates.")
                break
        if updates:
            aggregated = np.mean(updates, axis=0)
            global_pheromones[:] = (global_pheromones + aggregated) / 2
            print(f"[Global Manager] Aggregated pheromone update at sync iteration {iteration+1}.")
        else:
            print("[Global Manager] No updates received in this cycle.")
        iteration += 1
        await asyncio.sleep(sync_interval)

async def run_multicolony(problem, num_colonies, num_iterations, num_ants, sync_interval,
                           alpha, beta, evaporation_rate, Q, global_pheromones):
    """
    Launches multiple colonies asynchronously and returns the best fitness found and the final global pheromones.
    """
    print(f"[Multicolony] Starting optimization with {num_colonies} colonies, each running {num_iterations} iterations.")
    global_pheromone_queue = asyncio.Queue()
    colonies = [
        Colony(
            colony_id=i,
            problem=problem,
            num_iterations=num_iterations,
            sync_interval=sync_interval,
            num_ants=num_ants,
            alpha=alpha,
            beta=beta,
            evaporation_rate=evaporation_rate,
            Q=Q,
            initial_pheromones=global_pheromones
        )
        for i in range(num_colonies)
    ]
    colony_tasks = [asyncio.create_task(colony.run(global_pheromone_queue)) for colony in colonies]
    manager_task = asyncio.create_task(
        global_pheromone_manager(global_pheromones, num_colonies, global_pheromone_queue, sync_interval)
    )
    # Wait for all colonies to finish.
    results = await asyncio.gather(*colony_tasks)
    # Cancel the manager task once colonies have finished.
    manager_task.cancel()
    best_overall = min(results)
    print(f"[Multicolony] All colonies complete. Best overall fitness: {best_overall}")
    return best_overall, global_pheromones

class MultiColony(Solver):
    """
    An asynchronous, multicolony ACO solver.
    Each colony runs independently and periodically updates a shared global pheromone matrix.
    """
    def __init__(self, problem, num_colonies=4, num_iterations=100, num_ants=100, sync_interval=10,
                 alpha=1.0, beta=2.0, evaporation_rate=0.1, Q=1.0):
        """
        Initializes the asynchronous multicolony ACO solver.
        
        Parameters:
            problem (Problem): The routing problem instance.
            num_colonies (int): Number of independent colonies.
            num_iterations (int): Iterations per colony.
            num_ants (int): Number of ants per colony per iteration.
            sync_interval (int): Frequency (in iterations) at which colonies push updates.
            alpha (float): Influence of pheromone.
            beta (float): Influence of heuristic.
            evaporation_rate (float): Pheromone evaporation rate.
            Q (float): Constant used for pheromone deposit.
        """
        super().__init__(problem.node2routes, num_iterations=num_iterations)
        self.problem = problem
        self.num_colonies = num_colonies
        self.num_iterations = num_iterations
        self.num_ants = num_ants
        self.sync_interval = sync_interval
        self.alpha = alpha
        self.beta = beta
        self.evaporation_rate = evaporation_rate
        self.Q = Q
        self.n = self.problem.num_customers

        # Initialize the global pheromone matrix.
        self.global_pheromones = np.ones((self.n + 1, self.n + 1, 2))
        for j, customer in enumerate(self.problem.customers, start=1):
            if customer.customer_type == 1:
                self.global_pheromones[:, j, 1] = 0.0
            elif customer.customer_type == 2:
                self.global_pheromones[:, j, 0] = 0.0

        self.global_best_fitness = float('inf')
        self.global_best_solution = None  # Extend this if you want to store routes/solutions.
        self.fitness_history = []

    def optimize(self, verbose=True):
        """
        Runs the asynchronous multicolony ACO optimization and returns the best solution found.
        """
        if verbose:
            print("[AsyncMulticolonyACO] Starting optimization.")
        best_fitness, final_pheromones = asyncio.run(
            run_multicolony(
                self.problem,
                num_colonies=self.num_colonies,
                num_iterations=self.num_iterations,
                num_ants=self.num_ants,
                sync_interval=self.sync_interval,
                alpha=self.alpha,
                beta=self.beta,
                evaporation_rate=self.evaporation_rate,
                Q=self.Q,
                global_pheromones=self.global_pheromones
            )
        )
        # Export pheromone matrix as heat map after run completion.
        self.global_best_fitness = best_fitness
        self.fitness_history.append(best_fitness)
        if verbose:
            print("[AsyncMulticolonyACO] Optimization complete. Best fitness =", best_fitness)
        # Return best solution (if stored) and best fitness.
        return self.global_best_solution, self.global_best_fitness


Error compiling Cython file:
------------------------------------------------------------
...
    plt.colorbar()
    plt.title("Pheromone Heat Map")
    plt.savefig(filename)
    plt.close()

class SACO(Solver):
           ^
------------------------------------------------------------

/home/aphuc/.cache/ipython/cython/_cython_magic_6f860be4247fe6e44707ab4a3a3151a17e0362f5.pyx:24:11: undeclared name not builtin: Solver

Error compiling Cython file:
------------------------------------------------------------
...
            for j in unvisited:
                customer = self.problem.customers[j - 1]
                allowed = [0] if customer.customer_type == 1 else [1] if customer.customer_type == 2 else [0, 1]
                for d in allowed:
                    candidate_coord = (customer.x, customer.y) if d == 0 else (customer.assigned_locker.x, customer.assigned_locker.y)
                    distance = euclidean_distance(current_location, candidate_coord)
                         

In [5]:
instance = Problem()
instance.load_data("data/25/C101_co_25.txt")

start_time = time.time()
# Use PACO as example (SACO can be used similarly after modification)
aco = PACO(instance, num_iterations=50, num_ants=1000, batch_size=100,
            alpha=2.5, beta=1.0, evaporation_rate=0.1, Q=1.0)
export_pheromones_heatmap(aco.pheromones, "output/initial_pheromones.png")
aco.optimize()
print("Elapsed time (s):", time.time() - start_time)
print("Distance: ", aco.global_best_fitness)
print([node.node_id for node in aco.global_best_solution])
print_routes(aco.global_best_routes)
aco.plot_fitness_history()
aco.plot_routes()
export_pheromones_heatmap(aco.pheromones, "output/pheromones_heatmap.png")

NameError: name 'Problem' is not defined