In [27]:
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class TOPTWNode:
    i: int
    x: float
    y: float
    service: float    # service duration
    profit: float     # profit / score
    tw_open: float    # earliest service time
    tw_close: float   # latest service time
    f: int            # extra field from original format
    a: int            # length of aux_list in original file
    aux_list: List[int]
    demand: int = 0   # NEW: demand of this node (0 for depot)

@dataclass
class TOPTWInstance:
    path: str
    k: int            # number of vehicles / routes allowed
    v: int            # carried over from source format
    N: int            # number of customers (excl. depot)
    t: int            # carried over from source format
    D: Optional[float]
    Q: Optional[float]   # vehicle capacity we'll assign
    Tmax: float          # usually depot.tw_close in source data
    nodes: List[TOPTWNode]


In [28]:
import math
import pathlib
import re
from typing import Tuple

def _floor_to_decimals(x: float, decimals: int) -> float:
    if decimals is None:
        return float(x)
    f = 10 ** decimals
    return math.floor(x * f) / f

def _auto_rounding_decimals_from_name(path: str) -> int:
    # c101, r201, rc2xx -> Solomon (1 decimal); else Cordeau (2 decimals)
    name = pathlib.Path(path).name.lower()
    if re.search(r'(?:^|_)((c|r|rc)\d+)', name):
        return 1
    return 2

def build_distance_time_matrices(inst: TOPTWInstance,
                                 speed: float = 1.0,
                                 rounding: str = "auto"  # "auto"|"solomon"|"cordeau"|"none"
                                 ) -> Tuple[List[List[float]], List[List[float]]]:
    pts = [(n.x, n.y) for n in inst.nodes]
    n = len(pts)
    if rounding == "auto":
        dec = _auto_rounding_decimals_from_name(inst.path)
    elif rounding == "solomon":
        dec = 1
    elif rounding == "cordeau":
        dec = 2
    else:
        dec = None

    def euclid(a, b): return math.hypot(a[0]-b[0], a[1]-b[1])

    dist = [[0.0]*n for _ in range(n)]
    tmat = [[0.0]*n for _ in range(n)]
    for i in range(n):
        for j in range(n):
            if i == j:
                d = 0.0
            else:
                d = euclid(pts[i], pts[j])
                d = _floor_to_decimals(d, dec)
            dist[i][j] = d
            tmat[i][j] = d / speed if speed > 0 else d
    return dist, tmat

In [29]:
import json

def load_capacitated_instance(json_path: str) -> TOPTWInstance:
    """Load a JSON file created by convert_to_capacitated_json back into TOPTWInstance."""
    with open(json_path, "r") as f:
        data = json.load(f)

    nodes: List[TOPTWNode] = []
    for nd in data["nodes"]:
        nodes.append(
            TOPTWNode(
                i=nd["i"],
                x=nd["x"],
                y=nd["y"],
                service=nd["service"],
                profit=nd["profit"],
                tw_open=nd["tw_open"],
                tw_close=nd["tw_close"],
                f=nd["f"],
                a=nd["a"],
                aux_list=nd["aux_list"],
                demand=nd.get("demand",0),
            )
        )

    inst = TOPTWInstance(
        path=data.get("name", json_path),
        k=data["k"],
        v=data["v"],
        N=data["N"],
        t=data["t"],
        D=data["D"],
        Q=data["Q"],
        Tmax=data["Tmax"],
        nodes=nodes,
    )
    return inst


In [30]:
from dataclasses import dataclass
from typing import List, Tuple, Optional
import heapq
import numpy as np

@dataclass
class Route:
    """One vehicle route: list of node indices (0 = depot)"""
    nodes: List[int]          # e.g. [0, 5, 12, 0]
    load: int = 0
    time: float = 0.0         # arrival time at the *last* node (before returning)
    profit: float = 0.0
    cost: float = 0.0         # travel distance cost of the route (excluding fixed cost)

    def copy(self) -> "Route":
        return Route(nodes=self.nodes[:], load=self.load,
                     time=self.time, profit=self.profit, cost=self.cost)

In [31]:
def can_insert(inst: TOPTWInstance, dist: List[List[float]], tmat: List[List[float]],
               route: Route, pos: int, cust: int,
               hard_tw: bool = True, beta: float = 0.0) -> Tuple[bool, float, float]:
    """
    Check if customer `cust` can be inserted at position `pos` in `route`.
    Returns (feasible, new_load, new_arrival_at_cust, new_route_time, lateness_at_cust)
    """
    n = len(route.nodes)
    if pos == 0 or pos == n:               # cannot insert before start or after end
        return False, 0, 0, 0, 0

    prev = route.nodes[pos-1]
    nxt  = route.nodes[pos]

    # ----- capacity -----
    new_load = route.load + inst.nodes[cust].demand
    if new_load > inst.Q:
        return False, 0, 0, 0, 0

    # ----- time -----
    arrival_prev = route.time if pos == 1 else \
        route.time + inst.nodes[prev].service + tmat[prev][cust]

    # earliest possible start of service at cust
    earliest = max(arrival_prev, inst.nodes[cust].tw_open)

    if hard_tw and earliest > inst.nodes[cust].tw_close:
        return False, 0, 0, 0, 0

    # service start (wait if we arrive early)
    start_service = earliest
    lateness = 0.0
    if not hard_tw:
        if start_service > inst.nodes[cust].tw_close:
            lateness = start_service - inst.nodes[cust].tw_close
            start_service = start_service   # we still serve at arrival time

    # departure from cust
    departure = start_service + inst.nodes[cust].service

    # time to next node
    arrival_next = departure + tmat[cust][nxt]

    # check TW of the *next* node (must shift its start time)
    nxt_node = inst.nodes[nxt]
    earliest_next = max(arrival_next, nxt_node.tw_open)
    if hard_tw and earliest_next > nxt_node.tw_close:
        return False, 0, 0, 0, 0

    # new route time = arrival at the node *after* the inserted one
    new_route_time = arrival_next + nxt_node.service   # ready to leave nxt

    # optional global route duration limit
    if inst.Tmax is not None:
        return_time = new_route_time + tmat[nxt][0]   # back to depot
        if return_time > inst.Tmax + 1e-6:
            return False, 0, 0, 0, 0

    return True, new_load, start_service, new_route_time, lateness

In [32]:
def greedy_profit_density(inst: TOPTWInstance,
                         dist: List[List[float]],
                         tmat: List[List[float]],
                         K: int,
                         hard_tw: bool = True,
                         f: float = 0.0,
                         beta: float = 0.0) -> List[Route]:
    """
    Classic greedy: at each step insert the customer with highest
    p_i / (d_0i + s_i) into the *cheapest* feasible position of any route.
    """
    unvisited = set(range(1, inst.N+1))
    routes: List[Route] = [Route(nodes=[0, 0]) for _ in range(K)]  # empty routes

    def density(c):
        node = inst.nodes[c]
        d0 = dist[0][c]
        return node.profit / (d0 + node.service + 1e-9)

    while unvisited:
        # 1. candidate with best density
        best_c = max(unvisited, key=density, default=None)
        if best_c is None:
            break

        inserted = False
        best_route_idx = -1
        best_pos = -1
        best_extra_cost = np.inf

        for r_idx, route in enumerate(routes):
            # try every possible slot (between two consecutive nodes)
            for pos in range(1, len(route.nodes)):
                feasible, new_load, _, new_time, lateness = can_insert(
                    inst, dist, tmat, route, pos, best_c, hard_tw, beta)

                if not feasible:
                    continue

                # extra travel cost of the insertion
                prev = route.nodes[pos-1]
                nxt  = route.nodes[pos]
                old_cost = dist[prev][nxt]
                new_cost = dist[prev][best_c] + dist[best_c][nxt]
                extra = new_cost - old_cost

                if extra < best_extra_cost:
                    best_extra_cost = extra
                    best_route_idx = r_idx
                    best_pos = pos

        if best_route_idx == -1:                     # cannot insert anywhere
            unvisited.remove(best_c)
            continue

        # ----- perform insertion -----
        r = routes[best_route_idx]
        cnode = inst.nodes[best_c]
        r.nodes.insert(best_pos, best_c)
        r.load += cnode.demand
        r.profit += cnode.profit
        r.cost += best_extra_cost
        # update arrival time at the node *after* insertion
        _, _, _, new_time, _ = can_insert(inst, dist, tmat, r, best_pos+1, best_c,
                                          hard_tw, beta)
        r.time = new_time

        unvisited.remove(best_c)

    # remove completely empty routes (they cost a fixed fee if f>0)
    
    routes = [r for r in routes if len(r.nodes) > 2]
    return routes

In [33]:
def evaluate_solution(inst: TOPTWInstance,
                      routes: List[Route],
                      dist: List[List[float]],
                      f: float = 0.0,
                      beta: float = 0.0,
                      hard_tw: bool = True,
                      alpha: float = 0.1) -> float:
    """
    Returns the *objective value* (to be maximised):
        Σ profit – α * Σ distance – f * #vehicles – β * Σ lateness
    
    Args:
        alpha: Distance weight. Typical values:
               - 0.01: profit outweighs distance 100:1
               - 0.1:  profit outweighs distance 10:1  (DEFAULT)
               - 0.5:  profit outweighs distance 2:1
    """
    total_profit = sum(r.profit for r in routes)
    total_dist   = sum(r.cost for r in routes)
    n_veh        = len(routes)
    lateness     = 0.0

    if not hard_tw:                     # recompute lateness for soft-TW
        for r in routes:
            cur_time = 0.0
            for i in range(1, len(r.nodes)-1):
                prev = r.nodes[i-1]
                cur  = r.nodes[i]
                cur_time = max(cur_time + inst.nodes[prev].service + dist[prev][cur],
                               inst.nodes[cur].tw_open)
                if cur_time > inst.nodes[cur].tw_close:
                    lateness += cur_time - inst.nodes[cur].tw_close
                cur_time += inst.nodes[cur].service

    return total_profit - alpha * total_dist - f * n_veh - beta * lateness

In [34]:
import time
import copy
from typing import List, Tuple, Optional, Dict
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field

# Assumes your existing Route, TOPTWInstance, and helper functions are already defined

@dataclass
class SearchMetrics:
    """Track local search performance"""
    iteration: int = 0
    objective_history: List[float] = field(default_factory=list)
    time_history: List[float] = field(default_factory=list)
    operator_counts: Dict[str, int] = field(default_factory=lambda: {
        'insert': 0, 'relocate': 0, 'swap': 0, '2opt': 0
    })
    improvement_counts: Dict[str, int] = field(default_factory=lambda: {
        'insert': 0, 'relocate': 0, 'swap': 0, '2opt': 0
    })
    customers_served_history: List[int] = field(default_factory=list)
    
    def log_iteration(self, obj: float, elapsed: float, customers: int):
        self.iteration += 1
        self.objective_history.append(obj)
        self.time_history.append(elapsed)
        self.customers_served_history.append(customers)


class LocalSearch:
    def __init__(self, inst: TOPTWInstance, dist: List[List[float]], 
                 tmat: List[List[float]], hard_tw: bool = True, 
                 f: float = 0.0, beta: float = 0.0, alpha: float = 0.1, seed: int = 42):
        self.inst = inst
        self.dist = dist
        self.tmat = tmat
        self.hard_tw = hard_tw
        self.f = f
        self.beta = beta
        self.alpha = alpha  # Distance weight (profit weight is implicitly 1.0)
        self.rng = np.random.default_rng(seed)
        self.metrics = SearchMetrics()
        
    def copy_solution(self, routes: List[Route]) -> List[Route]:
        """Deep copy of solution"""
        return [r.copy() for r in routes]
    
    def get_unvisited(self, routes: List[Route]) -> set:
        """Get set of unvisited customers"""
        visited = set()
        for r in routes:
            visited.update(r.nodes[1:-1])  # exclude depot
        return set(range(1, self.inst.N + 1)) - visited
    
    def recalculate_route_metrics(self, route: Route):
        """Recalculate load, cost, time, and profit for a route"""
        route.load = sum(self.inst.nodes[c].demand for c in route.nodes[1:-1])
        route.profit = sum(self.inst.nodes[c].profit for c in route.nodes[1:-1])
        
        # Calculate cost (distance)
        route.cost = 0.0
        for i in range(len(route.nodes) - 1):
            route.cost += self.dist[route.nodes[i]][route.nodes[i+1]]
        
        # Calculate time (arrival at last customer before depot)
        route.time = 0.0
        for i in range(1, len(route.nodes) - 1):
            prev = route.nodes[i-1]
            curr = route.nodes[i]
            route.time = max(route.time + self.inst.nodes[prev].service + self.tmat[prev][curr],
                           self.inst.nodes[curr].tw_open)
            if i == len(route.nodes) - 2:  # last customer
                route.time += self.inst.nodes[curr].service
    
    def is_route_feasible(self, route: Route) -> bool:
        """Check if route satisfies capacity and time window constraints"""
        # Check capacity
        if route.load > self.inst.Q:
            return False
        
        # Check time windows
        current_time = 0.0
        for i in range(1, len(route.nodes)):
            prev = route.nodes[i-1]
            curr = route.nodes[i]
            
            # Travel time + service at previous
            current_time += self.inst.nodes[prev].service + self.tmat[prev][curr]
            
            # Wait if arriving early
            current_time = max(current_time, self.inst.nodes[curr].tw_open)
            
            # Check if too late
            if current_time > self.inst.nodes[curr].tw_close + 1e-6:
                return False
        
        # Check route duration (time to return to depot)
        if self.inst.Tmax is not None:
            if current_time > self.inst.Tmax + 1e-6:
                return False
        
        return True
    
    # ==================== NEIGHBORHOOD OPERATORS ====================
    
    def try_insert(self, routes: List[Route], current_obj: float) -> Tuple[bool, List[Route], str, float]:
        """Insert operator: find BEST insertion of unvisited customer
        Returns: (feasible, best_routes, message, delta_obj)
        """
        unvisited = self.get_unvisited(routes)
        if not unvisited:
            return False, routes, "", 0.0
        
        best_routes = None
        best_delta = 0.0
        best_message = ""
        
        for customer in unvisited:
            for r_idx in range(len(routes)):
                route = routes[r_idx]
                # Try all positions in route
                for pos in range(1, len(route.nodes)):
                    new_route = route.copy()
                    new_route.nodes.insert(pos, customer)
                    self.recalculate_route_metrics(new_route)
                    
                    if self.is_route_feasible(new_route):
                        new_routes = self.copy_solution(routes)
                        new_routes[r_idx] = new_route
                        new_obj = evaluate_solution(self.inst, new_routes, self.dist, 
                                                   self.f, self.beta, self.hard_tw, self.alpha)
                        
                        delta = new_obj - current_obj
                        if delta > best_delta + 1e-6:
                            best_delta = delta
                            best_routes = new_routes
                            best_message = f"INSERT: Added customer {customer} to route {r_idx} at position {pos} (Δ={delta:.2f})"
        
        if best_routes is not None:
            return True, best_routes, best_message, best_delta
        return False, routes, "", 0.0
    
    def try_relocate(self, routes: List[Route], current_obj: float) -> Tuple[bool, List[Route], str, float]:
        """Relocate operator: find BEST relocation of a customer
        Returns: (feasible, best_routes, message, delta_obj)
        """
        best_routes = None
        best_delta = 0.0
        best_message = ""
        
        for r1_idx in range(len(routes)):
            route1 = routes[r1_idx]
            if len(route1.nodes) <= 2:  # Empty route
                continue
            
            for cust_pos in range(1, len(route1.nodes) - 1):
                customer = route1.nodes[cust_pos]
                
                # Try inserting into all routes (including same route)
                for r2_idx in range(len(routes)):
                    route2 = routes[r2_idx]
                    
                    for insert_pos in range(1, len(route2.nodes)):
                        # Skip if same position in same route
                        if r1_idx == r2_idx and insert_pos == cust_pos:
                            continue
                        
                        # Create new routes
                        new_route1 = route1.copy()
                        new_route1.nodes.pop(cust_pos)
                        self.recalculate_route_metrics(new_route1)
                        
                        new_route2 = route2.copy()
                        # Adjust insert position if same route and removing before insertion
                        adj_insert_pos = insert_pos
                        if r1_idx == r2_idx and cust_pos < insert_pos:
                            adj_insert_pos -= 1
                        new_route2.nodes.insert(adj_insert_pos, customer)
                        self.recalculate_route_metrics(new_route2)
                        
                        if self.is_route_feasible(new_route1) and self.is_route_feasible(new_route2):
                            new_routes = self.copy_solution(routes)
                            new_routes[r1_idx] = new_route1
                            if r1_idx != r2_idx:
                                new_routes[r2_idx] = new_route2
                            else:
                                new_routes[r1_idx] = new_route2
                            
                            new_obj = evaluate_solution(self.inst, new_routes, self.dist, 
                                                       self.f, self.beta, self.hard_tw, self.alpha)
                            
                            delta = new_obj - current_obj
                            if delta > best_delta + 1e-6:
                                best_delta = delta
                                best_routes = new_routes
                                best_message = f"RELOCATE: Moved customer {customer} from route {r1_idx} to route {r2_idx} (Δ={delta:.2f})"
        
        if best_routes is not None:
            return True, best_routes, best_message, best_delta
        return False, routes, "", 0.0
    
    def try_swap(self, routes: List[Route], current_obj: float) -> Tuple[bool, List[Route], str, float]:
        """Swap operator: find BEST swap of two customers between different routes
        Returns: (feasible, best_routes, message, delta_obj)
        """
        if len(routes) < 2:
            return False, routes, "", 0.0
        
        best_routes = None
        best_delta = 0.0
        best_message = ""
        
        for r1_idx in range(len(routes)):
            for r2_idx in range(r1_idx + 1, len(routes)):
                route1, route2 = routes[r1_idx], routes[r2_idx]
                
                if len(route1.nodes) <= 2 or len(route2.nodes) <= 2:
                    continue
                
                for pos1 in range(1, len(route1.nodes) - 1):
                    for pos2 in range(1, len(route2.nodes) - 1):
                        cust1 = route1.nodes[pos1]
                        cust2 = route2.nodes[pos2]
                        
                        # Create new routes with swap
                        new_route1 = route1.copy()
                        new_route1.nodes[pos1] = cust2
                        self.recalculate_route_metrics(new_route1)
                        
                        new_route2 = route2.copy()
                        new_route2.nodes[pos2] = cust1
                        self.recalculate_route_metrics(new_route2)
                        
                        if self.is_route_feasible(new_route1) and self.is_route_feasible(new_route2):
                            new_routes = self.copy_solution(routes)
                            new_routes[r1_idx] = new_route1
                            new_routes[r2_idx] = new_route2
                            new_obj = evaluate_solution(self.inst, new_routes, self.dist, 
                                                       self.f, self.beta, self.hard_tw, self.alpha)
                            
                            delta = new_obj - current_obj
                            if delta > best_delta + 1e-6:
                                best_delta = delta
                                best_routes = new_routes
                                best_message = f"SWAP: Exchanged customer {cust1} (route {r1_idx}) with {cust2} (route {r2_idx}) (Δ={delta:.2f})"
        
        if best_routes is not None:
            return True, best_routes, best_message, best_delta
        return False, routes, "", 0.0
    
    def try_2opt(self, routes: List[Route], current_obj: float) -> Tuple[bool, List[Route], str, float]:
        """2-opt operator: find BEST segment reversal within a single route
        Returns: (feasible, best_routes, message, delta_obj)
        """
        best_routes = None
        best_delta = 0.0
        best_message = ""
        
        for r_idx in range(len(routes)):
            route = routes[r_idx]
            n = len(route.nodes)
            
            if n <= 3:  # Need at least 2 customers to reverse
                continue
            
            # Try all possible segment reversals
            for i in range(1, n - 1):
                for j in range(i + 1, n - 1):
                    # Reverse segment between i and j (inclusive)
                    new_route = route.copy()
                    new_route.nodes[i:j+1] = reversed(new_route.nodes[i:j+1])
                    self.recalculate_route_metrics(new_route)
                    
                    if self.is_route_feasible(new_route):
                        new_routes = self.copy_solution(routes)
                        new_routes[r_idx] = new_route
                        new_obj = evaluate_solution(self.inst, new_routes, self.dist, 
                                                   self.f, self.beta, self.hard_tw, self.alpha)
                        
                        delta = new_obj - current_obj
                        if delta > best_delta + 1e-6:
                            best_delta = delta
                            best_routes = new_routes
                            best_message = f"2OPT: Reversed segment [{i},{j}] in route {r_idx} (Δ={delta:.2f})"
        
        if best_routes is not None:
            return True, best_routes, best_message, best_delta
        return False, routes, "", 0.0
    
    # ==================== MAIN SEARCH ====================
    
    def search(self, initial_routes: List[Route], max_time: float = 120.0, 
               strategy: str = 'best', 
               operator_set: Tuple[str, ...] = ('insert', 'relocate', 'swap', '2opt'),
               shuffle_operators: bool = True,
               verbose: bool = True) -> Tuple[List[Route], SearchMetrics]:
        """
        Main local search.
        
        Args:
            initial_routes: The starting solution
            max_time: Time limit in seconds
            strategy: 'best' (Best-Improvement) or 'first' (First-Improvement)
            operator_set: Tuple of operator names (strings) to use.
            shuffle_operators: Whether to shuffle the operator list each iteration.
            verbose: Print logs
        """
        if strategy not in ['best', 'first']:
            raise ValueError("Strategy must be 'best' or 'first'")
            
        routes = self.copy_solution(initial_routes)
        start_time = time.time()
        
        # Log initial solution
        initial_obj = evaluate_solution(self.inst, routes, self.dist, self.f, self.beta, self.hard_tw, self.alpha)
        initial_customers = sum(len(r.nodes) - 2 for r in routes)
        self.metrics.log_iteration(initial_obj, 0.0, initial_customers)
        
        if verbose:
            print(f"{'='*70}")
            print(f"LOCAL SEARCH STARTED (STRATEGY: {strategy.upper()})")
            print(f"{'='*70}")
            print(f"Operators: {operator_set}")
            print(f"Shuffle: {shuffle_operators}")
            print(f"Initial objective: {initial_obj:.2f}")
            print(f"Max time: {max_time}s")
            print(f"{'='*70}\n")
        
        # Define all *possible* operators
        all_operators = {
            'insert': self.try_insert,
            'relocate': self.try_relocate,
            'swap': self.try_swap,
            '2opt': self.try_2opt
        }
        
        # --- Filter operators based on the hyperparameter ---
        # This is the list of (name, function) tuples we will actually use
        operators_to_use = []
        for op_name in operator_set:
            if op_name in all_operators:
                operators_to_use.append((op_name, all_operators[op_name]))
                # Initialize counts for operators we are actually using
                if op_name not in self.metrics.operator_counts:
                    self.metrics.operator_counts[op_name] = 0
                if op_name not in self.metrics.improvement_counts:
                    self.metrics.improvement_counts[op_name] = 0
            else:
                print(f"Warning: Unknown operator '{op_name}' ignored.")
        
        improvement_found = True
        iteration = 0
        
        while improvement_found:
            improvement_found = False
            elapsed = time.time() - start_time
            
            # Check time limit
            if elapsed >= max_time:
                if verbose:
                    print(f"\n{'='*70}\nTIME LIMIT REACHED ({max_time}s)\n{'='*70}")
                break
            
            current_obj = evaluate_solution(self.inst, routes, self.dist, 
                                            self.f, self.beta, self.hard_tw, self.alpha)
            
            # --- STRATEGY-DEPENDENT LOGIC ---
            
            best_new_routes = None
            best_delta = 0.0
            best_operator = None
            best_message = ""

            # --- Use shuffle hyperparameter ---
            if shuffle_operators:
                self.rng.shuffle(operators_to_use) 
            
            for op_name, op_func in operators_to_use: # Use the filtered list
                self.metrics.operator_counts[op_name] += 1
                
                if verbose and iteration % 5 == 0:
                    print(f"   Evaluating {op_name.upper()} neighborhood...", end='\r')
                
                improved, new_routes, message, delta = op_func(routes, current_obj)
                
                if improved:
                    if strategy == 'best':
                        # BI: Find the best move across ALL operators
                        if delta > best_delta + 1e-6:
                            best_delta = delta
                            best_new_routes = new_routes
                            best_operator = op_name
                            best_message = message
                    
                    elif strategy == 'first':
                        # FI: Take the first improving move we find
                        # (which is the *best* from this *one* neighborhood)
                        best_delta = delta
                        best_new_routes = new_routes
                        best_operator = op_name
                        best_message = message
                        improvement_found = True  # Mark improvement
                        break # <-- Key difference: break operator loop
                
                if time.time() - start_time >= max_time:
                    break # Break operator loop if time's up
            
            # --- END STRATEGY LOGIC ---

            # Apply best move
            # For 'best' strategy, this applies the best move found (if any)
            # For 'first' strategy, this applies the first improving move found
            if best_new_routes is not None:
                routes = best_new_routes
                self.metrics.improvement_counts[best_operator] += 1
                improvement_found = True
                iteration += 1
                
                # Log metrics
                new_obj = evaluate_solution(self.inst, routes, self.dist, 
                                            self.f, self.beta, self.hard_tw, self.alpha)
                current_customers = sum(len(r.nodes) - 2 for r in routes)
                elapsed = time.time() - start_time
                self.metrics.log_iteration(new_obj, elapsed, current_customers)
                
                if verbose:
                    print(f"Iter {iteration:4d} | Time: {elapsed:6.2f}s | Obj: {new_obj:8.2f} | "
                          f"Customers: {current_customers:3d} | {best_message}")

        # Final statistics
        final_obj = evaluate_solution(self.inst, routes, self.dist, self.f, self.beta, self.hard_tw, self.alpha)
        final_customers = sum(len(r.nodes) - 2 for r in routes)
        total_time = time.time() - start_time
        
        if verbose:
            print(f"\n{'='*70}\nLOCAL SEARCH COMPLETED\n{'='*70}")
            print(f"Total iterations: {iteration}")
            print(f"Total time: {total_time:.2f}s")
            print(f"Final objective: {final_obj:.2f}")
            if initial_obj != 0:
                print(f"Improvement: {final_obj - initial_obj:.2f} ({((final_obj/initial_obj - 1)*100):.1f}%)")
            else:
                print(f"Improvement: {final_obj - initial_obj:.2f}")
            print(f"Final customers served: {final_customers}/{self.inst.N}")
            print(f"\nOperator statistics:")
            
            for op_name in operator_set:
                imp_count = self.metrics.improvement_counts.get(op_name, 0)
                att_count = self.metrics.operator_counts.get(op_name, 0)
                print(f"  {op_name.upper():<8}: {imp_count:4d} improvements / {att_count:5d} evaluations")
            print(f"{'='*70}\n")
            
        return routes, self.metrics


# ==================== VISUALIZATION ====================

def plot_search_progress(metrics: SearchMetrics, instance_name: str = ""):
    """Create comprehensive visualization of search progress"""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    fig.suptitle(f'Local Search Progress: {instance_name}', fontsize=14, fontweight='bold')
    
    # Plot 1: Objective value over time
    ax1 = axes[0, 0]
    ax1.plot(metrics.time_history, metrics.objective_history, 'b-', linewidth=2)
    ax1.set_xlabel('Time (seconds)', fontsize=10)
    ax1.set_ylabel('Objective Value', fontsize=10)
    ax1.set_title('Objective Value Over Time', fontsize=11)
    ax1.grid(True, alpha=0.3)
    
    # Add improvement annotation
    if len(metrics.objective_history) > 1:
        improvement = metrics.objective_history[-1] - metrics.objective_history[0]
        pct_improvement = (improvement / metrics.objective_history[0]) * 100 if metrics.objective_history[0] != 0 else 0
        ax1.text(0.02, 0.98, f'Improvement: {improvement:.1f} ({pct_improvement:.1f}%)', 
                transform=ax1.transAxes, fontsize=9, verticalalignment='top',
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
    
    # Plot 2: Objective value over iterations
    ax2 = axes[0, 1]
    ax2.plot(range(len(metrics.objective_history)), metrics.objective_history, 'g-', linewidth=2)
    ax2.set_xlabel('Iteration', fontsize=10)
    ax2.set_ylabel('Objective Value', fontsize=10)
    ax2.set_title('Objective Value Over Iterations', fontsize=11)
    ax2.grid(True, alpha=0.3)
    
    # Plot 3: Customers served over time
    ax3 = axes[1, 0]
    ax3.plot(metrics.time_history, metrics.customers_served_history, 'r-', linewidth=2)
    ax3.set_xlabel('Time (seconds)', fontsize=10)
    ax3.set_ylabel('Customers Served', fontsize=10)
    ax3.set_title('Customers Served Over Time', fontsize=11)
    ax3.grid(True, alpha=0.3)
    
    # Plot 4: Operator effectiveness
    ax4 = axes[1, 1]
    operators = list(metrics.improvement_counts.keys())
    improvements = [metrics.improvement_counts[op] for op in operators]
    attempts = [metrics.operator_counts[op] for op in operators]
    
    x = np.arange(len(operators))
    width = 0.35
    
    bars1 = ax4.bar(x - width/2, attempts, width, label='Attempts', alpha=0.7)
    bars2 = ax4.bar(x + width/2, improvements, width, label='Improvements', alpha=0.7)
    
    ax4.set_xlabel('Operator', fontsize=10)
    ax4.set_ylabel('Count', fontsize=10)
    ax4.set_title('Operator Statistics', fontsize=11)
    ax4.set_xticks(x)
    ax4.set_xticklabels([op.upper() for op in operators], fontsize=9)
    ax4.legend(fontsize=9)
    ax4.grid(True, alpha=0.3, axis='y')
    
    # Add success rate text
    for i, (op, att, imp) in enumerate(zip(operators, attempts, improvements)):
        success_rate = (imp / att * 100) if att > 0 else 0
        ax4.text(i, max(att, imp) * 1.05, f'{success_rate:.1f}%', 
                ha='center', fontsize=8)
    
    plt.tight_layout()
    return fig

In [35]:
import math
import random

# NEW CLASS: Simulated Annealing
# This class inherits from LocalSearch to reuse all its helper methods
# (recalculate_route_metrics, is_route_feasible, get_unvisited, etc.)

class SimulatedAnnealing(LocalSearch):
    
    def __init__(self, inst: TOPTWInstance, dist: List[List[float]], 
                 tmat: List[List[float]], hard_tw: bool = True, 
                 f: float = 0.0, beta: float = 0.0, alpha: float = 0.1, seed: int = 42):
        # Initialize the parent class (LocalSearch)
        # This sets up self.inst, self.dist, self.rng, self.metrics, etc.
        super().__init__(inst, dist, tmat, hard_tw, f, beta, alpha, seed)

    # ==================== RANDOM NEIGHBORHOOD OPERATORS ====================
    # These functions generate ONE random, feasible neighbor, unlike the 
    # LocalSearch operators which found the BEST neighbor.

    def _get_random_insert(self, routes: List[Route]) -> Optional[List[Route]]:
        """Try to find one random, feasible insertion move."""
        unvisited = self.get_unvisited(routes)
        if not unvisited:
            return None
        
        customer = self.rng.choice(list(unvisited))
        r_idx = self.rng.integers(0, len(routes))
        route = routes[r_idx]
        
        # Need at least [0, 0] to insert into
        if len(route.nodes) < 2:
            return None # Should not happen with [0,0] routes
            
        pos = self.rng.integers(1, len(route.nodes)) # e.g., for [0, 0], pos=1
        
        new_route = route.copy()
        new_route.nodes.insert(pos, customer)
        self.recalculate_route_metrics(new_route)
        
        if self.is_route_feasible(new_route):
            new_routes = self.copy_solution(routes)
            new_routes[r_idx] = new_route
            return new_routes
            
        return None # Failed this random attempt

    def _get_random_relocate(self, routes: List[Route]) -> Optional[List[Route]]:
        """Try to find one random, feasible relocate move."""
        # Find routes with at least one customer
        valid_r1_indices = [i for i, r in enumerate(routes) if len(r.nodes) > 2]
        if not valid_r1_indices:
            return None

        r1_idx = self.rng.choice(valid_r1_indices)
        route1 = routes[r1_idx]
        
        cust_pos = self.rng.integers(1, len(route1.nodes) - 1)
        customer = route1.nodes[cust_pos]
        
        r2_idx = self.rng.integers(0, len(routes))
        route2 = routes[r2_idx]
        
        insert_pos = self.rng.integers(1, len(route2.nodes)) # e.g. for [0,0] -> pos=1
        
        # Create new routes
        new_route1 = route1.copy()
        new_route1.nodes.pop(cust_pos)
        self.recalculate_route_metrics(new_route1)
        
        new_route2 = route2.copy()
        adj_insert_pos = insert_pos
        if r1_idx == r2_idx and cust_pos < insert_pos:
            adj_insert_pos -= 1
        
        new_route2.nodes.insert(adj_insert_pos, customer)
        self.recalculate_route_metrics(new_route2)
        
        if self.is_route_feasible(new_route1) and self.is_route_feasible(new_route2):
            new_routes = self.copy_solution(routes)
            new_routes[r1_idx] = new_route1
            new_routes[r2_idx] = new_route2
            return new_routes
            
        return None # Failed this random attempt

    def _get_random_swap(self, routes: List[Route]) -> Optional[List[Route]]:
        """Try to find one random, feasible swap move (inter-route only)."""
        valid_r_indices = [i for i, r in enumerate(routes) if len(r.nodes) > 2]
        if len(valid_r_indices) < 2:
            return None # Need two different routes with customers

        r1_idx, r2_idx = self.rng.choice(valid_r_indices, 2, replace=False)
        route1, route2 = routes[r1_idx], routes[r2_idx]
        
        pos1 = self.rng.integers(1, len(route1.nodes) - 1)
        pos2 = self.rng.integers(1, len(route2.nodes) - 1)
        cust1, cust2 = route1.nodes[pos1], route2.nodes[pos2]
        
        new_route1 = route1.copy()
        new_route1.nodes[pos1] = cust2
        self.recalculate_route_metrics(new_route1)
        
        new_route2 = route2.copy()
        new_route2.nodes[pos2] = cust1
        self.recalculate_route_metrics(new_route2)
        
        if self.is_route_feasible(new_route1) and self.is_route_feasible(new_route2):
            new_routes = self.copy_solution(routes)
            new_routes[r1_idx] = new_route1
            new_routes[r2_idx] = new_route2
            return new_routes
            
        return None # Failed this random attempt

    def _get_random_2opt(self, routes: List[Route]) -> Optional[List[Route]]:
        """Try to find one random, feasible 2-opt move (intra-route)."""
        # Need at least 2 customers (4 nodes) to perform 2-opt
        valid_r_indices = [i for i, r in enumerate(routes) if len(r.nodes) > 3]
        if not valid_r_indices:
            return None
            
        r_idx = self.rng.choice(valid_r_indices)
        route = routes[r_idx]
        n = len(route.nodes)
        
        # i is first break, j is second break.
        # i from 1 to n-3 (inclusive)
        # j from i+1 to n-2 (inclusive)
        # e.g. [0, 1, 2, 3, 0], n=5
        # i in [1, 2]. 
        #   if i=1, j in [2, 3]. 
        #     j=2: reverse [1, 2] -> [0, 2, 1, 3, 0]
        #     j=3: reverse [1, 2, 3] -> [0, 3, 2, 1, 0]
        #   if i=2, j in [3, 3].
        #     j=3: reverse [2, 3] -> [0, 1, 3, 2, 0]
        
        i = self.rng.integers(1, n - 2)
        j = self.rng.integers(i + 1, n - 1)

        new_route = route.copy()
        new_route.nodes[i:j+1] = reversed(new_route.nodes[i:j+1])
        self.recalculate_route_metrics(new_route)
        
        if self.is_route_feasible(new_route):
            new_routes = self.copy_solution(routes)
            new_routes[r_idx] = new_route
            return new_routes
            
        return None # Failed this random attempt

    def get_random_neighbor(self, routes: List[Route], 
                            operator_set: Tuple[str, ...]) -> Tuple[Optional[List[Route]], Optional[str]]:
        """
        Picks a random operator and tries to generate one random, feasible neighbor.
        Tries all operators in a random order until one succeeds.
        """
        all_operators = {
            'insert': self._get_random_insert,
            'relocate': self._get_random_relocate,
            'swap': self._get_random_swap,
            '2opt': self._get_random_2opt
        }
        
        # Use only the operators specified in the set
        ops_to_use = [op for op in operator_set if op in all_operators]
        self.rng.shuffle(ops_to_use) # Randomize operator order

        for op_name in ops_to_use:
            op_func = all_operators[op_name]
            new_routes = op_func(routes)
            
            if new_routes is not None:
                # Found a feasible neighbor
                return new_routes, op_name
                
        return None, None # No feasible neighbor found
    
    def tune_initial_temperature(self, 
                             initial_routes: List[Route], 
                             target_ratio: float = 0.8, 
                             n_samples: int = 1000) -> float:
        """
        Calculates a T_start to achieve the target_ratio for worsening moves.
        
        This is done by sampling N random moves from the initial solution,
        calculating the average objective change (delta) for all worsening moves,
        and then solving T_start = avg_delta / ln(target_ratio).
        """
        print(f"Tuning T_start for {target_ratio*100}% acceptance ratio...")
        worsening_deltas = []
        
        # Use the full operator set for a representative sample of moves
        full_op_set = ('insert', 'relocate', 'swap', '2opt')
        
        # Get the objective of the starting solution
        initial_obj = evaluate_solution(self.inst, initial_routes, self.dist, 
                                        self.f, self.beta, self.hard_tw, self.alpha)
        
        # Sample N random moves
        for _ in range(n_samples):
            # Get a random neighbor
            new_routes, op_name = self.get_random_neighbor(initial_routes, full_op_set)
            
            if new_routes is None:
                continue # Infeasible or invalid move
            
            # Evaluate the new neighbor
            new_obj = evaluate_solution(self.inst, new_routes, self.dist,
                                        self.f, self.beta, self.hard_tw, self.alpha)
            
            delta = new_obj - initial_obj
            
            # Store the delta if it's a worsening move
            if delta < 0:
                worsening_deltas.append(delta)
        
        # Check if we found any worsening moves
        if not worsening_deltas:
            print("  > WARNING: No worsening moves found in sample. Solution might be a local optimum.")
            print("  > Returning default T_start=100.0")
            return 100.0
            
        # Calculate the average delta for worsening moves
        avg_worse_delta = sum(worsening_deltas) / len(worsening_deltas)
        
        # Solve for T_start: T = delta / ln(ratio)
        if target_ratio <= 0 or target_ratio >= 1:
            raise ValueError("Target ratio must be between 0 and 1 (exclusive).")
            
        T_start = avg_worse_delta / math.log(target_ratio)
        
        print(f"  > Sampled {len(worsening_deltas)} worsening moves from {n_samples} attempts.")
        print(f"  > Average worsening delta: {avg_worse_delta:.2f}")
        print(f"  > Calculated T_start for {target_ratio*100}% acceptance: {T_start:.2f}")
        return T_start

    # ==================== MAIN SA SEARCH ====================
    
    def search(self, 
               initial_routes: List[Route], 
               max_time: float = 120.0,
               operator_set: Tuple[str, ...] = ('insert', 'relocate', 'swap', '2opt'),
               T_start: float = 100.0,
               T_end: float = 0.01,
               alpha_cooling: float = 0.99,
               max_iter_per_temp: int = 200,
               verbose: bool = True) -> Tuple[List[Route], SearchMetrics]:
        """
        Main Simulated Annealing search.
        """
        start_time = time.time()
        
        # --- 1. Initialization ---
        current_routes = self.copy_solution(initial_routes)
        current_obj = evaluate_solution(self.inst, current_routes, self.dist, 
                                        self.f, self.beta, self.hard_tw, self.alpha)
        
        best_routes = self.copy_solution(current_routes)
        best_obj = current_obj
        
        T = T_start
        iteration = 0
        
        # Log initial solution
        initial_customers = sum(len(r.nodes) - 2 for r in current_routes)
        self.metrics.log_iteration(current_obj, 0.0, initial_customers)
        
        if verbose:
            print(f"\n{'='*70}")
            print(f"SIMULATED ANNEALING STARTED")
            print(f"{'-'*70}")
            print(f"Operators: {operator_set}")
            print(f"Params: T_start={T_start}, T_end={T_end}, alpha={alpha_cooling}, iter/temp={max_iter_per_temp}")
            print(f"Initial objective: {initial_obj:.2f} | Max time: {max_time}s")
            print(f"{'='*70}\n")
        
        # --- 2. Main SA Loop ---
        while T > T_end:
            elapsed = time.time() - start_time
            if elapsed >= max_time:
                if verbose:
                    print(f"\n{'='*70}\nTIME LIMIT REACHED ({max_time}s)\n{'='*70}")
                break
            
            if verbose:
                print(f"--- Temp: {T:8.3f} | Iterations for this temp: {max_iter_per_temp} ---", end='\r')

            iter_at_temp = 0
            while iter_at_temp < max_iter_per_temp:
                iter_at_temp += 1
                iteration += 1
                
                # Check time limit inside inner loop
                if time.time() - start_time >= max_time:
                    break
                
                # --- 2a. Generate Neighbor ---
                new_routes, op_name = self.get_random_neighbor(current_routes, operator_set)
                
                if new_routes is None:
                    continue # No feasible neighbor found, try again
                
                self.metrics.operator_counts[op_name] += 1
                
                # --- 2b. Evaluate Neighbor ---
                new_obj = evaluate_solution(self.inst, new_routes, self.dist,
                                            self.f, self.beta, self.hard_tw, self.alpha)
                
                delta_obj = new_obj - current_obj
                
                # --- 2c. Acceptance Criterion ---
                accepted = False
                if delta_obj > 0:
                    # Always accept improvements
                    accepted = True
                    self.metrics.improvement_counts[op_name] += 1
                else:
                    # Accept worse solutions with probability
                    acceptance_prob = math.exp(delta_obj / T)
                    if self.rng.random() < acceptance_prob:
                        accepted = True
                
                # --- 2d. Update Solution ---
                if accepted:
                    current_routes = new_routes
                    current_obj = new_obj
                    
                    # Log this accepted move
                    elapsed = time.time() - start_time
                    customers = sum(len(r.nodes) - 2 for r in current_routes)
                    self.metrics.log_iteration(current_obj, elapsed, customers)
                    
                    # Update best-so-far
                    if current_obj > best_obj:
                        best_routes = self.copy_solution(current_routes)
                        best_obj = current_obj
                        if verbose:
                            print(f"Iter {iteration:5d} | Time: {elapsed:6.2f}s | NEW BEST: {best_obj:8.2f} | "
                                  f"Temp: {T:7.2f} | Move: {op_name.upper()}")
                
                if verbose and iteration % 200 == 0 and not (accepted and delta_obj > 0):
                    print(f"Iter {iteration:5d} | Time: {elapsed:6.2f}s | Obj: {current_obj:8.2f} | "
                          f"Best: {best_obj:8.2f} | Temp: {T:7.2f}")

            # --- 2e. Cool Temperature ---
            T *= alpha_cooling

        # --- 3. Final Statistics ---
        final_obj = best_obj
        final_customers = sum(len(r.nodes) - 2 for r in best_routes)
        total_time = time.time() - start_time
        
        if verbose:
            print(f"\n{'='*70}\nSIMULATED ANNEALING COMPLETED\n{'='*70}")
            print(f"Total iterations: {iteration}")
            print(f"Total time: {total_time:.2f}s")
            print(f"Final objective: {final_obj:.2f}")
            if initial_obj != 0:
                print(f"Improvement: {final_obj - initial_obj:.2f} ({((final_obj/initial_obj - 1)*100):.1f}%)")
            else:
                print(f"Improvement: {final_obj - initial_obj:.2f}")
            print(f"Final customers served: {final_customers}/{self.inst.N}")
            print(f"\nOperator statistics (Successful moves):")
            
            for op_name in operator_set:
                imp_count = self.metrics.improvement_counts.get(op_name, 0)
                att_count = self.metrics.operator_counts.get(op_name, 0)
                print(f"  {op_name.upper():<8}: {imp_count:4d} improvements / {att_count:5d} evaluations")
            print(f"{'='*70}\n")
            
        return best_routes, self.metrics

In [36]:
# ==================== DEMO USAGE ====================

# MODIFIED for Simulated Annealing

def run_sa_experiment(json_path: str, K_modifier: int = 0, seed: int = 42, 
                      max_time: float = 60.0, alpha: float = 0.1, 
                      operator_set: Tuple[str, ...] = ('insert', 'relocate', 'swap', '2opt'),
                      T_start: float = 100.0,
                      T_end: float = 0.01,
                      alpha_cooling: float = 0.99,
                      max_iter_per_temp: int = 200,
                      verbose: bool = False, plot: bool = False):
    """
    Complete experiment: load, construct, run SA, and RETURN results.
    """
    
    # Load instance
    inst = load_capacitated_instance(json_path)
    dist, tmat = build_distance_time_matrices(inst, rounding="auto")
    
    # Apply K_modifier
    K = inst.k + K_modifier
    if K <= 0:
        K = 1 # Need at least one vehicle
    
    # Parameters
    hard_tw = True
    f = 0.0
    beta = 0.0
    instance_name = os.path.basename(json_path)
    
    if verbose:
        print(f"\n{'='*70}")
        print(f"Running SA: {instance_name} | Alpha: {alpha} | Seed: {seed}")
        print(f"Operators: {operator_set} | K: {K}")
        print(f"{'='*70}")
    
    # Set seed
    np.random.seed(seed)
    random.seed(seed) # Also set python's random seed for SA
    
    # Construct initial solution
    start_construct = time.time()
    initial_routes = greedy_profit_density(inst, dist, tmat, K, hard_tw, f, beta)
    construct_time = time.time() - start_construct
    
    initial_obj = evaluate_solution(inst, initial_routes, dist, f, beta, hard_tw, alpha)
    
    # Run Simulated Annealing
    sa = SimulatedAnnealing(inst, dist, tmat, hard_tw, f, beta, alpha, seed)
    
    # Pass the new parameters to the search method
    final_routes, metrics = sa.search(
        initial_routes, 
        max_time=max_time, 
        operator_set=operator_set,
        T_start=T_start,
        T_end=T_end,
        alpha_cooling=alpha_cooling,
        max_iter_per_temp=max_iter_per_temp,
        verbose=verbose
    )
    
    # --- Collect Final Results ---
    final_obj = metrics.objective_history[-1] if metrics.objective_history else initial_obj
    final_profit = sum(r.profit for r in final_routes)
    final_distance = sum(r.cost for r in final_routes)
    final_customers = metrics.customers_served_history[-1] if metrics.customers_served_history else 0
    total_time = metrics.time_history[-1] if metrics.time_history else 0
    total_iterations = metrics.iteration # SA iteration count is different
    
    if plot:
        fig = plot_search_progress(metrics, f"{instance_name} (SA)")
        # In a notebook, plt.show() is needed to display
        plt.show()

    results = {
        'initial_obj': initial_obj,
        'final_obj': final_obj,
        'total_time': total_time,
        'iterations': total_iterations,
        'customers_served': final_customers,
        'final_profit': final_profit,
        'final_distance': final_distance,
        'construction_time': construct_time,
        # Add SA params for analysis
        'T_start': T_start,
        'alpha_cooling': alpha_cooling,
        'max_iter_per_temp': max_iter_per_temp,
    }
    
    return results

In [37]:
# === Example Usage for T_start Tuning ===

import glob
import os

BASE_DIR = os.getcwd()
BENCHMARK_FOLDER = os.path.join(BASE_DIR, "benchmarks_cap_json")

# Get the first json file found (or specify one)
try:
    instance_files = glob.glob(os.path.join(BENCHMARK_FOLDER, "*.json"))
    if not instance_files:
        print("ERROR: No .json files found in 'benchmarks_cap_json'.")
        print("Please create this folder and add instance files to run the example.")
    else:
        json_path = instance_files[0]
        print(f"--- Using instance: {os.path.basename(json_path)} ---")

        # 2. Load instance and build initial solution
        inst = load_capacitated_instance(json_path)
        dist, tmat = build_distance_time_matrices(inst, rounding="auto")
        # Use K=inst.k + 1 as in your experiment
        initial_routes = greedy_profit_density(inst, dist, tmat, K=inst.k + 1, hard_tw=True)
        
        # 3. Create SA object
        sa = SimulatedAnnealing(inst, dist, tmat, seed=42, alpha=0.1) # Use same alpha as experiment
        
        # 4. Run the new tuning function
        tuned_t_start = sa.tune_initial_temperature(initial_routes, target_ratio=0.8)
        
        print(f"\n--- Running SA with tuned T_start: {tuned_t_start:.2f} ---")
        
        # 5. Run the search using this tuned T_start
        final_routes, metrics = sa.search(
            initial_routes,
            max_time=10.0, # Short run for demo
            T_start=tuned_t_start,
            alpha_cooling=0.99, # Using a value from your test
            verbose=True
        )

except Exception as e:
    if 'instance_files' not in locals() or not instance_files:
        pass # Error already printed
    else:
        print(f"An error occurred during the example run: {e}")

--- Using instance: c108_cap.json ---
Tuning T_start for 80.0% acceptance ratio...
  > Sampled 326 worsening moves from 1000 attempts.
  > Average worsening delta: -2.88
  > Calculated T_start for 80.0% acceptance: 12.89

--- Running SA with tuned T_start: 12.89 ---

SIMULATED ANNEALING STARTED
----------------------------------------------------------------------
Operators: ('insert', 'relocate', 'swap', '2opt')
Params: T_start=12.890131380948423, T_end=0.01, alpha=0.99, iter/temp=200
Initial objective: 229.98 | Max time: 10.0s

Iter     1 | Time:   0.00s | NEW BEST:   553.60 | Temp:   12.89 | Move: SWAP
Iter     8 | Time:   0.00s | NEW BEST:   568.91 | Temp:   12.89 | Move: RELOCATE
Iter    11 | Time:   0.00s | NEW BEST:   578.91 | Temp:   12.89 | Move: RELOCATE
Iter    13 | Time:   0.00s | NEW BEST:   592.27 | Temp:   12.89 | Move: INSERT
Iter    15 | Time:   0.00s | NEW BEST:   605.29 | Temp:   12.89 | Move: INSERT
Iter    19 | Time:   0.00s | NEW BEST:   615.24 | Temp:   12.89 | M

In [39]:
import os
import glob
import time
import pandas as pd
from tqdm import tqdm
import itertools  # To help create all combinations

# 1. Path
BASE_DIR = os.getcwd()   # project root (when script is run from repo)
# --- IMPORTANT ---
# Create this folder and add your .json instance files
BENCHMARK_FOLDER = os.path.join(BASE_DIR, "benchmarks_cap_json")

# 2. Objective Function Params
ALPHAS_TO_TEST = [0.1]

# 3. Statistical Params
SEEDS_TO_TEST = [42]

# 4. Neighborhood Params
OPERATOR_SETS_TO_TEST = [
    ('insert', 'relocate', 'swap', '2opt'), 
    #('insert', 'relocate', '2opt'),         
    #('insert', 'relocate'),
]

# 5. Problem Params
K_MODIFIERS_TO_TEST = [3]

# 6. SA Hyperparameters
T_START_TO_TEST = [20.0, 50.0, 100.0]
ALPHA_COOLING_TO_TEST = [0.80, 0.90, 0.95, 0.99]
L_TO_TEST = [200, 400]
T_END = 0.01

# 7. Other settings
MAX_TIME_PER_RUN = 60.0  # Shorter time for more experiments
OUTPUT_CSV = "experiment_results_SA_detailed.csv"

# ==================================================================

def run_sa_experiments():
    print("Starting TOPTW Simulated Annealing Experiment...")
    
    # Check if benchmark folder exists
    if not os.path.isdir(BENCHMARK_FOLDER):
        print(f"Error: Benchmark folder not found: '{BENCHMARK_FOLDER}'")
        print("Please create this folder and add your .json instance files to it.")
        return

    instance_files = glob.glob(os.path.join(BENCHMARK_FOLDER, "*.json"))
    # select a random sample of 10 files
    random.seed(SEEDS_TO_TEST[0])
    instance_files = random.sample(instance_files, 10)
    if not instance_files:
        print(f"Error: No .json files found in '{BENCHMARK_FOLDER}'")
        print("Please add your .json instance files to this folder.")
        return

    print(f"Using {len(instance_files)} instances.")
    
    # --- Create all experiment combinations ---
    experiment_configs = list(itertools.product(
        ALPHAS_TO_TEST,
        SEEDS_TO_TEST,
        OPERATOR_SETS_TO_TEST,
        K_MODIFIERS_TO_TEST,
        T_START_TO_TEST,
        ALPHA_COOLING_TO_TEST,
        L_TO_TEST
    ))
    
    experiment_runs = []
    for instance_path in instance_files:
        for config in experiment_configs:
            alpha, seed, op_set, k_mod, t_start, alpha_cool, iter_temp = config
            experiment_runs.append({
                'path': instance_path,
                'alpha': alpha,
                'seed': seed,
                'op_set': op_set,
                'k_mod': k_mod,
                't_start': t_start,
                'alpha_cool': alpha_cool,
                'iter_temp': iter_temp
            })

    print(f"Total experiments to run: {len(experiment_runs)}")
    
    all_results = []
    start_total_time = time.time()
    
    for run in tqdm(experiment_runs, desc="Running SA Experiments"):
        try:
            # Pass all the new parameters to the worker function
            result_dict = run_sa_experiment(
                json_path=run['path'],
                K_modifier=run['k_mod'],
                seed=run['seed'],
                max_time=MAX_TIME_PER_RUN,
                alpha=run['alpha'],
                operator_set=run['op_set'],
                T_start=run['t_start'],
                T_end=T_END,
                alpha_cooling=run['alpha_cool'],
                max_iter_per_temp=run['iter_temp'],
                verbose=False,
                plot=False
            )
            
            # Add config back to results for the CSV
            result_dict.update(run)
            all_results.append(result_dict)
            
        except Exception as e:
            print(f"!!! ERROR running {os.path.basename(run['path'])} with {run['alpha']} !!!")
            print(f"Error: {e}\n")

    end_total_time = time.time()
    print(f"\n\n{'='*70}")
    print(f"All experiments completed in {(end_total_time - start_total_time) / 60:.2f} minutes.")
    
    if not all_results:
        print("No results to analyze.")
        return

    df = pd.DataFrame(all_results)
    
    # Save results
    df.to_csv(OUTPUT_CSV, index=False)
    print(f"Results saved to '{OUTPUT_CSV}'")
    
    # --- Updated Analysis ---
    
    print("\n--- Operator Set Comparison (Averaged) ---")
    df['op_set_str'] = df['op_set'].astype(str)
    op_set_summary = df.groupby('op_set_str')[['final_obj', 'total_time', 'iterations']].mean()
    print(op_set_summary.sort_values('final_obj', ascending=False))
    
    print("\n--- T_start Comparison (Averaged) ---")
    t_start_summary = df.groupby('T_start')[['final_obj', 'total_time', 'iterations']].mean()
    print(t_start_summary.sort_values('final_obj', ascending=False))
    
    print("\n--- Alpha (Cooling) Comparison (Averaged) ---")
    alpha_summary = df.groupby('alpha_cooling')[['final_obj', 'total_time', 'iterations']].mean()
    print(alpha_summary.sort_values('final_obj', ascending=False))

    print("\n--- Max Iterations per Temp Comparison (Averaged) ---")
    iter_temp_summary = df.groupby('iter_temp')[['final_obj', 'total_time', 'iterations']].mean()
    print(iter_temp_summary.sort_values('final_obj', ascending=False))
    
    print(f"\n{'='*70}")
    print(f"Analysis complete. Open '{OUTPUT_CSV}' for full details.")

# Run the new experiment script
run_sa_experiments()

Starting TOPTW Simulated Annealing Experiment...
Using 10 instances.
Total experiments to run: 240


Running SA Experiments: 100%|██████████| 240/240 [34:26<00:00,  8.61s/it] 



All experiments completed in 34.44 minutes.
Results saved to 'experiment_results_SA_detailed.csv'

--- Operator Set Comparison (Averaged) ---
                                         final_obj  total_time  iterations
op_set_str                                                                
('insert', 'relocate', 'swap', '2opt')  708.655792    8.566138  13814.1375

--- T_start Comparison (Averaged) ---
          final_obj  total_time  iterations
T_start                                    
100.0    716.724125    9.506038  15870.2250
20.0     709.951500    7.529412  11541.8875
50.0     699.291750    8.662964  14030.3000

--- Alpha (Cooling) Comparison (Averaged) ---
                final_obj  total_time    iterations
alpha_cooling                                      
0.99           711.317000   25.655092  40996.666667
0.90           709.998333    2.437251   4051.400000
0.95           709.168333    5.009311   8261.083333
0.80           704.139500    1.162898   1947.400000

--- Max Iter




In [44]:
import os
import glob
import pandas as pd
import numpy as np
import random
from tqdm import tqdm
import time

# --- 1. SET FIXED HYPERPARAMETERS ---
FIXED_SEED = 42

# Problem & Objective Params
ALPHA_OBJ_FUNCTION = 0.1  # The 'alpha' for evaluate_solution (profit vs. distance)

# SA Algorithm Params
MAX_TIME_PER_RUN = 60.0   # Time limit in seconds for each instance
OPERATOR_SET = ('insert', 'relocate', 'swap', '2opt')
T_START = 20.0           # Fixed Initial Temperature
T_END = 0.01
ALPHA_COOLING = 0.90
L = 200

# --- 2. SETUP ---
BASE_DIR = os.getcwd()
BENCHMARK_FOLDER = os.path.join(BASE_DIR, "benchmarks_cap_json")

# Fix seeds for reproducibility
np.random.seed(FIXED_SEED)
random.seed(FIXED_SEED)

instance_files = glob.glob(os.path.join(BENCHMARK_FOLDER, "*.json"))
# instance_files = random.sample(instance_files, 30)  # Sample 30 instances for demo

if not instance_files:
    print(f"Error: No .json files found in '{BENCHMARK_FOLDER}'")
    print("Please create this folder and add your .json instance files to run this cell.")
else:
    print(f"Found {len(instance_files)} instances. Running SA with fixed seed {FIXED_SEED}...")
    all_results = []

    # --- 3. RUN ON ALL BENCHMARKS ---
    for json_path in tqdm(instance_files, desc="Processing Benchmarks"):
        instance_name = os.path.basename(json_path)
        try:
            # 1. Load
            inst = load_capacitated_instance(json_path)
            dist, tmat = build_distance_time_matrices(inst, rounding="auto")
            K = inst.k
            if K <= 0: K = 1

            # 2. Initial Solution
            # Note: greedy_profit_density is deterministic
            initial_routes = greedy_profit_density(inst, dist, tmat, K, hard_tw=True)
            initial_obj = evaluate_solution(inst, initial_routes, dist, 
                                            f=0.0, beta=0.0, hard_tw=True, 
                                            alpha=ALPHA_OBJ_FUNCTION)

            # 3. Setup SA
            # We pass the FIXED_SEED to the SA object to make its RNG reproducible
            sa = SimulatedAnnealing(inst, dist, tmat, 
                                    f=0.0, beta=0.0, 
                                    alpha=ALPHA_OBJ_FUNCTION, 
                                    seed=FIXED_SEED)
            
            start_time = time.time()
            # 4. Run SA
            final_routes, metrics = sa.search(
                initial_routes, 
                max_time=MAX_TIME_PER_RUN,
                operator_set=OPERATOR_SET,
                T_start=T_START,
                T_end=T_END,
                alpha_cooling=ALPHA_COOLING,
                max_iter_per_temp=L,
                verbose=False  # Keep output clean
            )
            end_time = time.time()
            total_time = end_time - start_time

            # 5. Get results
            final_obj = metrics.objective_history[-1]
            improvement = final_obj - initial_obj
            
            # Handle division by zero or negative initial obj
            if initial_obj == 0.0:
                pct_improvement = float('inf') if improvement > 0 else 0.0
            elif initial_obj < 0.0:
                 # Be careful with percentages on negative numbers
                 # Here we'll just calculate based on the magnitude of change
                 pct_improvement = (improvement / abs(initial_obj)) * 100.0
            else:
                pct_improvement = (improvement / initial_obj) * 100.0
                
            all_results.append({
                "instance": instance_name,
                "initial_obj": initial_obj,
                "final_obj": final_obj,
                "improvement_pct": pct_improvement,
                "total_time": total_time,
            })
            
        except Exception as e:
            print(f"Error processing {instance_name}: {e}")

    # --- 4. DISPLAY RESULTS ---
    if all_results:
        df_results = pd.DataFrame(all_results)
        # add a row with the average values
        avg_row = {
            "instance": "AVERAGE",
            "initial_obj": df_results["initial_obj"].mean(),
            "final_obj": df_results["final_obj"].mean(),
            "improvement_pct": df_results["improvement_pct"].mean(),
            "total_time": df_results["total_time"].mean(),
        }
        all_results.append(avg_row)
        df_results = pd.DataFrame(all_results)
        # Format for readability
        pd.set_option('display.float_format', lambda x: '%.2f' % x)
        
        df_results = df_results.sort_values(by="improvement_pct", ascending=False)
        
        print(f"\n--- SA Run Complete (Seed: {FIXED_SEED}, Time: {MAX_TIME_PER_RUN}s) ---")
        print(df_results.to_string(index=False))

Found 58 instances. Running SA with fixed seed 42...


Processing Benchmarks: 100%|██████████| 58/58 [02:02<00:00,  2.11s/it]


--- SA Run Complete (Seed: 42, Time: 60.0s) ---
         instance  initial_obj  final_obj  improvement_pct  total_time
   rc101_cap.json       170.41     662.98           289.05        2.35
   rc103_cap.json       214.26     679.10           216.95        2.36
 50_r105_cap.json       140.10     392.52           180.17        2.05
50_rc102_cap.json       193.08     538.76           179.03        1.89
   rc107_cap.json       237.45     651.84           174.52        2.44
    r106_cap.json       269.37     730.28           171.11        2.48
50_rc103_cap.json       215.60     578.51           168.33        1.93
    r101_cap.json       178.39     471.92           164.54        2.40
   rc102_cap.json       204.63     497.63           143.19        2.52
    r102_cap.json       231.51     555.02           139.74        2.17
   rc105_cap.json       194.38     462.38           137.87        2.17
    r105_cap.json       229.98     533.78           132.10        2.24
50_rc106_cap.json       205.


