Copyright **`(c)`** 2025 Giosuè Pinto `<s342711@studenti.polito.it>`  
[`https://github.com/giosuepinto`](https://github.com/giosuepinto)   


In [166]:
from itertools import combinations, product
import numpy as np
import random
import time
from numba import jit

In [167]:
# --- Problem Configuration ---
# Choose your problem file here
problem_filename = 'lab2/test_problem.npy' # <--- CHANGE THIS FILE

# --- Algorithm Configuration ---
# WARNING: Your PC will perform AT MOST this many operations
# approx. 6 min of computation for me (ONLY on 1000 cities)
COMPUTATIONAL_BUDGET = 1e10  # Max estimated operations

# --- Load Problem ---

problem = np.load(problem_filename)
problem_length = problem.shape[0]
N = problem_length  # number of cities
print(f"Loaded problem '{problem_filename}' with N={N} cities.")

# Ensure problem matrix is float64 for Numba compatibility
problem = problem.astype(np.float64)

Loaded problem 'lab2/test_problem.npy' with N=20 cities.


In [168]:
# --- Problem Analysis ---
is_symmetric = np.allclose(problem, problem.T)
has_negatives = np.any(problem < 0)
diagonal_zeros = np.allclose(np.diag(problem), 0.0)

print(f"Problem Analysis:")
print(f"  - Symmetric: {is_symmetric}")
print(f"  - Has Negative Costs: {has_negatives}")
print(f"  - Diagonal All Zeros: {diagonal_zeros}")

# Check triangular inequality only for small symmetric problems
if is_symmetric and N <= 100:
    print("  - Checking Triangular Inequality (can be slow)...")
    has_triangular_inequality = all(
        problem[x, y] <= problem[x, z] + problem[z, y] + 1e-9 # Add tolerance
        for x, y, z in combinations(range(N), 3)
    )
    print(f"  - Obeys Triangular Inequality: {has_triangular_inequality}")
elif not is_symmetric:
     print(f"  - Triangular Inequality: Not applicable for asymmetric problems.")
else:
     print(f"  - Triangular Inequality: Check skipped for N={N} (too slow).")

# --- Strategy Parameters ---
# These parameters control the adaptive algorithm's behavior
GA_PURE_STALL_LIMIT = 200     # Stall limit for the GA-Pure phase (Hybrid strategy)
MEMETIC_STALL_LIMIT = 200     # Stall limit for Memetic strategies
TOURNAMENT_SIZE = 4
MUTATION_RATE = 0.5           # General mutation probability
PURE_MEMETIC_THRESHOLD = 100  # N <= threshold uses Pure Memetic if budget allows
MEMETIC_K_PER_GEN = 50        # k children optimized in Restricted Memetic
K_NEAREST = 5                 # For smart_probabilistic_init
EPSILON = 1e-10               # Small value for float comparisons and divisions

Problem Analysis:
  - Symmetric: True
  - Has Negative Costs: False
  - Diagonal All Zeros: True
  - Checking Triangular Inequality (can be slow)...
  - Obeys Triangular Inequality: True


In [None]:
# --- Initialization Functions ---

# SLOW O(N^2 log N) INIT: Used for Memetic strategies if affordable
def get_next_city_probabilistic(current_city, remaining_cities):
    candidates = []
    for city in remaining_cities:
        distance = problem[current_city, city]
        candidates.append((city, distance))
    candidates.sort(key=lambda x: x[1])
    top_k_candidates = candidates[:K_NEAREST]
    cities = [city for city, dist in top_k_candidates]
    weights = [1.0 / (dist + EPSILON) for city, dist in top_k_candidates]
    # Handle case where all top K have zero distance (or weights are invalid)
    if not any(w > 0 for w in weights):
         if cities: return random.choice(cities)
         else: return random.choice(list(remaining_cities)) # Fallback if no candidates
    chosen_city = random.choices(cities, weights=weights, k=1)[0]
    return chosen_city

# NOTE: This is O(N^3 log N) for the whole population. Only use for small N.
def smart_probabilistic_init():
    solution = np.zeros(problem_length, dtype=np.int64) # Use int64 for Numba
    remaining_cities = set(range(problem_length))
    initial_city = random.randint(0, problem_length - 1)
    solution[0] = initial_city
    remaining_cities.remove(initial_city)
    for i in range(1, problem_length):
        next_city = get_next_city_probabilistic(solution[i-1], remaining_cities)
        solution[i] = next_city
        remaining_cities.remove(next_city)
    return solution

# FAST O(N) INIT: Used for Hybrid GA-ILS strategy
def fast_random_init():
    return np.random.permutation(problem_length).astype(np.int64)

In [170]:
# --- Fitness Function (JIT-compiled) ---

@jit(nopython=True)
def calculate_fitness(solution, problem_matrix): # Pass problem matrix explicitly
    fitness = 0.0
    problem_len = solution.shape[0]
    for i in range(problem_len - 1):
        fitness += problem_matrix[solution[i], solution[i+1]]
    fitness += problem_matrix[solution[-1], solution[0]]
    return fitness

In [171]:
# --- Symmetric TSP Operators (Used if is_symmetric is True) ---

# Optimizes using systematic 2-Opt local descent
@jit(nopython=True)
def local_search_2opt(solution, problem_matrix):
    best_solution = np.copy(solution)
    problem_len = solution.shape[0] # Get N locally for Numba
    improved = True
    while improved:
        improved = False
        for i in range(problem_len - 1):
            for j in range(i + 2, problem_len):
                idx_i_plus_1 = i + 1
                idx_j_plus_1 = (j + 1) % problem_len
                A, B = best_solution[i], best_solution[idx_i_plus_1]
                C, D = best_solution[j], best_solution[idx_j_plus_1]
                current_cost = problem_matrix[A, B] + problem_matrix[C, D]
                new_cost = problem_matrix[A, C] + problem_matrix[B, D]
                if new_cost < current_cost - EPSILON:
                    sub_array = best_solution[idx_i_plus_1:j+1].copy()
                    best_solution[idx_i_plus_1:j+1] = sub_array[::-1]
                    improved = True
                    break
            if improved: break
    return best_solution

# Inversion Mutation (effectively a random 2-Opt)
@jit(nopython=True)
def inversion_mutation(solution):
    problem_len = solution.shape[0]
    idx1, idx2 = np.sort(np.random.randint(0, problem_len, 2))
    if idx1 == idx2: return solution
    sub_array = solution[idx1:idx2+1].copy()
    solution[idx1:idx2+1] = sub_array[::-1]
    return solution

# Perturbation for Symmetric TSP (uses multiple inversions)
@jit(nopython=True)
def perturbation_tsp(solution):
    sol = np.copy(solution)
    problem_len = solution.shape[0]
    num_mutations = max(2, problem_len // 20)
    for _ in range(num_mutations):
        sol = inversion_mutation(sol) # Call the specific symmetric mutation
    return sol

In [172]:
# --- Asymmetric TSP Operators (Used if is_symmetric is False) ---

# Optimizes using systematic Insertion local descent (1-insertion, best improvement)
@jit(nopython=True)
def local_search_insertion(solution, problem_matrix):
    best_solution = np.copy(solution)
    problem_len = solution.shape[0]
    improved = True

    while improved:
        improved = False
        best_delta = 0.0
        # Store best move: (index_to_move, value_to_move, index_to_insert_AFTER)
        best_move = (-1, -1, -1)

        for i in range(problem_len):
            city_to_move = best_solution[i]
            prev_i = best_solution[(i - 1 + problem_len) % problem_len]
            next_i = best_solution[(i + 1) % problem_len]
            cost_removed = (problem_matrix[prev_i, city_to_move] +
                            problem_matrix[city_to_move, next_i] -
                            problem_matrix[prev_i, next_i])

            for j in range(problem_len):
                 # Cannot insert after itself or its direct predecessor
                if i == j or (i - 1 + problem_len) % problem_len == j:
                    continue

                insert_after_city = best_solution[j]
                insert_before_city = best_solution[(j + 1) % problem_len]

                cost_added = (problem_matrix[insert_after_city, city_to_move] +
                              problem_matrix[city_to_move, insert_before_city] -
                              problem_matrix[insert_after_city, insert_before_city])

                delta = cost_added - cost_removed

                if delta < best_delta - EPSILON:
                    best_delta = delta
                    best_move = (i, city_to_move, j) # Store index i, value city_to_move, insert after index j

        if best_move[0] != -1:
            idx_to_move, value_to_move, idx_insert_after = best_move

            # --- Numba-compatible insertion/deletion ---
            # Create a new array to build the result
            new_solution = np.empty(problem_len, dtype=best_solution.dtype)
            
            write_idx = 0
            # Copy elements before the removed city
            for k in range(idx_to_move):
                new_solution[write_idx] = best_solution[k]
                write_idx += 1
            # Copy elements after the removed city
            for k in range(idx_to_move + 1, problem_len):
                new_solution[write_idx] = best_solution[k]
                write_idx += 1

            # Find the actual index *in the new_solution* where idx_insert_after is
            # This requires iterating through the partially built new_solution
            insert_pos_in_new = -1
            for k in range(problem_len -1): # new_solution is one element shorter logically here
                if new_solution[k] == best_solution[idx_insert_after]:
                    insert_pos_in_new = k + 1 # Insert *after* this position
                    break
            
            # Shift elements to make space for insertion
            for k in range(problem_len - 1, insert_pos_in_new, -1):
                new_solution[k] = new_solution[k-1]
                
            # Insert the city
            new_solution[insert_pos_in_new] = value_to_move
            # --- End Numba-compatible section ---

            best_solution = new_solution # Update best_solution
            improved = True # Continue searching from the new solution

    return best_solution

# Swap Mutation (swaps two random cities)
@jit(nopython=True)
def swap_mutation(solution):
    problem_len = solution.shape[0]
    idx1, idx2 = np.random.randint(0, problem_len, 2)
    if idx1 == idx2: return solution # No change if indices are the same
    
    # Swap elements directly
    solution[idx1], solution[idx2] = solution[idx2], solution[idx1]
    return solution

# Perturbation for Asymmetric TSP (uses multiple swaps)
@jit(nopython=True)
def perturbation_atsp(solution):
    sol = np.copy(solution)
    problem_len = solution.shape[0]
    num_mutations = max(2, problem_len // 20) # Apply multiple swaps
    for _ in range(num_mutations):
        sol = swap_mutation(sol) # Call the specific asymmetric mutation
    return sol

In [173]:
# --- Common GA Components ---

# Tournament Selection (Not JITted)
def tournament_selection(population, k=TOURNAMENT_SIZE):
    contenders_indices = np.random.randint(0, len(population), size=k)
    best_fitness = float('inf')
    best_contender = None
    for idx in contenders_indices:
        fitness, solution = population[idx]
        if fitness < best_fitness:
            best_fitness = fitness
            best_contender = solution
    return best_contender

# O(N) Order Crossover (OX1) - Numba-compatible version
@jit(nopython=True)
def order_crossover(parent1, parent2):
    problem_len = parent1.shape[0] # Get N locally
    child = np.full(problem_len, -1, dtype=np.int64)
    start, end = np.sort(np.random.randint(0, problem_len, 2))
    if start == end:
        end = (end + 1) % problem_len
        if end < start: start, end = end, start
    
    segment_len = end - start
    if segment_len < 0: segment_len += problem_len
    
    in_slice = np.full(problem_len, False)
    for i in range(segment_len):
        city = parent1[(start + i) % problem_len]
        child_idx = (start + i) % problem_len
        child[child_idx] = city
        in_slice[city] = True

    parent2_idx = 0
    child_idx = 0
    while child_idx < problem_len:
        if child[child_idx] != -1:
            child_idx += 1
            continue
        city_from_p2 = parent2[parent2_idx % problem_len] # Handle wrap around for p2 idx
        parent2_idx += 1
        if not in_slice[city_from_p2]:
            child[child_idx] = city_from_p2
            child_idx += 1
    return child

In [174]:
# --- Main Budgeted Hybrid Algorithm Function ---

def run_evolutionary_algorithm():
    
    # --- Parameter Scaling ---
    POPULATION_SIZE = int(N * 1.5)
    OFFSPRING_SIZE = int (POPULATION_SIZE * 1.7)
    
    print(f"--- Starting Budgeted Hybrid Algorithm ({'Symmetric' if is_symmetric else 'Asymmetric'}) ---")
    print(f"Problem Size N: {N}")
    print(f"Total Budget (ops): {COMPUTATIONAL_BUDGET:.0e}")
    print(f"μ: {POPULATION_SIZE}, λ: {OFFSPRING_SIZE}")
    
    # --- Budget Calculation ---
    # Adjust cost estimate based on symmetry (Insertion might be faster/slower than 2Opt)
    # We will use N^3 as a generic estimate for both for simplicity here.
    # Numba speedup factor (conservative estimate)
    numba_speedup_factor = 10 if N > 50 else 1 # Assume less impact for very small N
    cost_per_local_search = (N**3 / numba_speedup_factor) + 1
    budget_in_ls_calls = int(COMPUTATIONAL_BUDGET / cost_per_local_search)
    
    # --- Select Operators based on Symmetry ---
    if is_symmetric:
        local_search_func = local_search_2opt
        mutation_func = inversion_mutation
        perturbation_func = perturbation_tsp
        ls_name = "2-Opt"
    else:
        local_search_func = local_search_insertion
        mutation_func = swap_mutation # Use swap for ATSP mutation
        perturbation_func = perturbation_atsp
        ls_name = "Insertion"
        
    print(f"Using {ls_name} local search and corresponding mutation/perturbation.")

    # --- Strategy Selection ---
    strategy = "Hybrid GA-ILS"
    k_to_optimize = 0
    STALL_LIMIT = GA_PURE_STALL_LIMIT
    init_function = fast_random_init

    # Check if smart init can even be used (only if NO negative costs)
    can_use_smart_init = not has_negatives

    memetic_k_cost = (MEMETIC_K_PER_GEN * MEMETIC_STALL_LIMIT)
    if budget_in_ls_calls >= memetic_k_cost:
        strategy = f"Restricted Memetic (k={MEMETIC_K_PER_GEN})"
        k_to_optimize = MEMETIC_K_PER_GEN
        STALL_LIMIT = MEMETIC_STALL_LIMIT
        # Use smart init ONLY if allowed AND N is reasonably small
        init_function = smart_probabilistic_init if can_use_smart_init and N <= 150 else fast_random_init

    pure_memetic_cost = (OFFSPRING_SIZE * MEMETIC_STALL_LIMIT)
    if N <= PURE_MEMETIC_THRESHOLD and budget_in_ls_calls >= pure_memetic_cost:
        strategy = "Pure Memetic (k=all)"
        k_to_optimize = OFFSPRING_SIZE
        STALL_LIMIT = MEMETIC_STALL_LIMIT
        # Use smart init ONLY if allowed
        init_function = smart_probabilistic_init if can_use_smart_init else fast_random_init
    

    print(f"STRATEGY: {strategy}")
    print(f"Total {ls_name} Call Budget (JIT-adjusted): {budget_in_ls_calls}")
    print("-" * 30)

    # --- 1. Initialization ---
    start_time_global = time.time()
    
    # --- JIT WARM-UP ---
    print("Running Numba JIT compilation (first call)...")
    _ = calculate_fitness(fast_random_init(), problem) # Pass problem
    _ = order_crossover(fast_random_init(), fast_random_init())
    if is_symmetric:
        _ = local_search_2opt(fast_random_init(), problem)
        _ = perturbation_tsp(fast_random_init())
    else:
        # Warm up ATSP functions - may take longer if first time
        _ = local_search_insertion(fast_random_init(), problem)
        _ = perturbation_atsp(fast_random_init())
    print("JIT compilation complete.")
    
    population = []
    
    # Initial population is ALWAYS raw for Hybrid, potentially optimized for Memetic
    # Optimization during init is costly, only do it if strategy is Memetic AND N is small
    optimize_initial_pop = (strategy != "Hybrid GA-ILS") and (N <= 150)

    if optimize_initial_pop:
        print(f"Initializing (optimized {init_function.__name__}, cost {POPULATION_SIZE} calls)...")
        progress_step = max(1, POPULATION_SIZE // 10)
        for i in range(POPULATION_SIZE):
            # Pass problem matrix to JIT function
            sol = local_search_func(init_function(), problem)
            population.append((calculate_fitness(sol, problem), sol))
            if (i + 1) % progress_step == 0 and i < POPULATION_SIZE - 1:
                print(f"  ... initialized {i+1}/{POPULATION_SIZE}")
    else:
        print(f"Initializing ('raw' population, using {init_function.__name__})... (Fast)")
        for _ in range(POPULATION_SIZE):
            sol = init_function()
            population.append((calculate_fitness(sol, problem), sol))
            
    population.sort(key=lambda x: x[0])
    best_champion_overall = population[0]
        
    
    print(f"Initial fitness: {best_champion_overall[0]:.2f}")
    
    # --- 2. Evolutionary Loop ---
    print(f"Starting Evolution (Stall limit: {STALL_LIMIT} gens)...")
    start_time_evo = time.time()
    generation_count = 0
    stall_count = 0
    
    # Store optimized genomes using tuples (hashable)
    optimized_genomes = {tuple(best_champion_overall[1])}
    
    while stall_count < STALL_LIMIT:
        generation_count += 1
        offspring_population = []
        
        k_actual = min(k_to_optimize, OFFSPRING_SIZE)
        indices_to_optimize = random.sample(range(OFFSPRING_SIZE), k_actual)
        
        for i in range(OFFSPRING_SIZE):
            p1_solution = tournament_selection(population)
            p2_solution = tournament_selection(population)
            
            child = order_crossover(p1_solution, p2_solution)
            
            if random.random() < MUTATION_RATE:
                # Call the correct mutation function based on symmetry
                child = mutation_func(child)
            
            if i in indices_to_optimize: # Only true for Memetic modes
                child = local_search_func(child, problem)
            
            offspring_population.append((calculate_fitness(child, problem), child))
            
        # --- Champion Selection ---
        champion = population[0]
        
        # In Hybrid mode, champion remains raw until end of Phase 1
        # In Memetic mode, we optimize the champion if it's new
        if strategy != "Hybrid GA-ILS":
            champion_hash = tuple(champion[1])
            if champion_hash not in optimized_genomes:
                champion_sol_opt = local_search_func(champion[1], problem)
                champion = (calculate_fitness(champion_sol_opt, problem), champion_sol_opt)
                optimized_genomes.add(tuple(champion[1]))
        
        contender_pool = offspring_population + [champion]
        contender_pool.sort(key=lambda x: x[0])
        population = contender_pool[:POPULATION_SIZE]
        
        # --- Logging and Stall Check ---
        if population[0][0] < best_champion_overall[0]:
            best_champion_overall = population[0]
            stall_count = 0
            # Print new champions more often if in Memetic mode
            print_freq = 1 if strategy != "Hybrid GA-ILS" else 10 # More verbose for Memetic
            if generation_count % print_freq == 0:
                 print(f"Gen {generation_count}: New champion! Fitness: {best_champion_overall[0]:.2f}")
        else:
            stall_count += 1
            
        if generation_count % 50 == 0: # Reduced log frequency
             print(f"Gen {generation_count}: Best: {best_champion_overall[0]:.2f} (Stall: {stall_count}/{STALL_LIMIT})")

    evo_time = time.time() - start_time_evo
    print(f"Evolution Phase finished after {generation_count} generations ({evo_time:.2f}s).")
    print(f"Best champion fitness (pre-ILS): {best_champion_overall[0]:.2f}")

    # --- 3. Finalization & Post-Processing ---
    final_solution = best_champion_overall[1]
    final_fitness = best_champion_overall[0]
    
    # Run ILS phase only if we were in the Hybrid strategy
    if strategy == "Hybrid GA-ILS":
        print("-" * 30)
        print("PHASE 2: Starting ILS (Iterated Local Search)...")
        
        # The champion from Phase 1 might be raw, optimize it first
        current_solution = local_search_func(final_solution, problem)
        best_fitness = calculate_fitness(current_solution, problem)
        print(f"Fitness after 1st optimization: {best_fitness:.2f}")
        
        # Budget for ILS 
        ils_calls_remaining = max(0, budget_in_ls_calls)
        print(f"Budget for ILS: {ils_calls_remaining} {ls_name} calls.")
        
        progress_step = max(1, ils_calls_remaining // 20)
        
        start_ils_time = time.time()
        for i in range(ils_calls_remaining):
            # Call the correct perturbation function
            perturbed_solution = perturbation_func(current_solution)
            optimized_solution = local_search_func(perturbed_solution, problem)
            optimized_fitness = calculate_fitness(optimized_solution, problem)
            
            if optimized_fitness < best_fitness:
                current_solution = optimized_solution
                best_fitness = optimized_fitness
                print(f"ILS (Iter {i+1}/{ils_calls_remaining}): New champion! Fitness: {best_fitness:.2f}")
            else:
                 # Update current solution even if no improvement (common ILS strategy)
                 current_solution = optimized_solution

            if (i + 1) % progress_step == 0:
                print(f"  ILS Iter {i+1}/{ils_calls_remaining}... (Best: {best_fitness:.2f})")
        
        ils_time = time.time() - start_ils_time
        final_solution = current_solution
        final_fitness = best_fitness
        print(f"PHASE 2 (ILS) finished ({ils_time:.2f}s).")
        
    else:
        # We were in a Memetic mode, final check is good practice
        print("Running final optimization check on champion...")
        final_check_sol = local_search_func(final_solution, problem)
        final_check_fit = calculate_fitness(final_check_sol, problem)
        if final_check_fit < final_fitness:
            final_fitness = final_check_fit
            final_solution = final_check_sol
            print(f"Final check found improvement: {final_fitness:.2f}")
        else:
            print("Final check: Solution was already locally optimal.")

    # --- Final Result ---
    end_time_global = time.time()
    print("-" * 30)
    print("Algorithm finished.")
    print(f"Total time: {end_time_global - start_time_global:.2f} seconds")
    print(f"Final (optimized) fitness: {final_fitness}")
    # Convert solution to list for easier reading
    print(f"Solution: {list(final_solution)}")
    
    return final_solution, final_fitness

In [175]:
# Run the main algorithm function
run_evolutionary_algorithm()

--- Starting Budgeted Hybrid Algorithm (Symmetric) ---
Problem Size N: 20
Total Budget (ops): 1e+10
μ: 30, λ: 51
Using 2-Opt local search and corresponding mutation/perturbation.
STRATEGY: Pure Memetic (k=all)
Total 2-Opt Call Budget (JIT-adjusted): 1249843
------------------------------
Running Numba JIT compilation (first call)...
JIT compilation complete.
Initializing (optimized smart_probabilistic_init, cost 30 calls)...
  ... initialized 3/30
  ... initialized 6/30
  ... initialized 9/30
  ... initialized 12/30
  ... initialized 15/30
  ... initialized 18/30
  ... initialized 21/30
  ... initialized 24/30
  ... initialized 27/30
Initial fitness: 2823.79
Starting Evolution (Stall limit: 200 gens)...
Gen 50: Best: 2823.79 (Stall: 50/200)
Gen 100: Best: 2823.79 (Stall: 100/200)
Gen 150: Best: 2823.79 (Stall: 150/200)
Gen 200: Best: 2823.79 (Stall: 200/200)
Evolution Phase finished after 200 generations (0.15s).
Best champion fitness (pre-ILS): 2823.79
Running final optimization check

(array([ 2,  4,  9, 12, 15,  8, 14, 10, 13, 11, 16,  1,  3,  5, 18, 19,  6,
        17,  7,  0]),
 2823.7899999999995)

In [176]:
# BEST RESULTS

# Type G problems (Symmetric):
# test_problem.npy -> 2823.79
# problem_g_10.npy -> 1497.66
# problem_g_20.npy -> 1755.51
# problem_g_50.npy -> 2629.99
# problem_g_100.npy -> 3952.51
# problem_g_200.npy -> 5421.97
# problem_g_500.npy -> 8354.22
# problem_g_1000.npy -> 12172.21

# Type R1 problems (Asymmetric):
# problem_r1_10.npy -> 184.27
# problem_r1_20.npy -> 337.29
# problem_r1_50.npy -> 539.67
# problem_r1_100.npy -> 689.44
# problem_r1_200.npy -> 1033.63
# problem_r1_500.npy -> 3432.31
# problem_r1_1000.npy -> 7114.05

# Type R2 problems (Asymmetric, Negative Costs):
# problem_r2_10.npy -> -411.70
# problem_r2_20.npy -> -855.42
# problem_r2_50.npy -> -2295.51
# problem_r2_100.npy -> -4745.97
# problem_r2_200.npy -> -9558.61
# problem_r2_500.npy -> -23017.70
# problem_r2_1000.npy -> -46442.27