In [362]:
import numpy as np
from collections import namedtuple
import random
from numba import njit

## Simple Test Problem

In [363]:
Individual = namedtuple('Individuals', ['solution', 'cost'])

In [364]:
test_problem = np.load('lab2/problem_r2_1000.npy')
CITIES = np.array(range(test_problem.shape[0]))
POPULATION_SIZE = CITIES.shape[0] 
OFFSPRING_SIZE = int(0.5 * POPULATION_SIZE)
MUTATION_RATE = 0.5
MAX_ITERATIONS = CITIES.shape[0] * 10
MAX_ITERATIONS_WITHOUT_IMPROVEMENT = MAX_ITERATIONS // 10

In [365]:
def check(problem):
    if np.any(problem < 0): # Negative values?
        return False
    if not np.allclose(problem, problem.T): # Symmetric matrix?
        return False
    if not np.allclose(np.diag(problem), 0.0): # Diagonal is all zero?
        return False
    return True

In [366]:
@njit(nopython=True)
def cost(solution):
    total_cost = 0
    for i in range(len(solution)):
        city_from = solution[i]
        city_to = solution[(i + 1) % len(solution)]
        total_cost += test_problem[city_from][city_to]
    return total_cost



In [367]:
def create_random_solution(num_cities):
    solution = list(range(num_cities))
    np.random.shuffle(solution)
    return solution

@njit(nopython=True)
def create_nearest_neighbour_solution(start_city):
    num_cities = len(CITIES)
    tour = [start_city]
    visited = np.zeros(num_cities, dtype=np.bool_)
    visited[start_city] = True
    
    current_city = start_city
    
    while len(tour) < num_cities:
        best_dist = float('inf')
        best_city = -1
        
        for next_city in range(num_cities):
            if not visited[next_city]:
                dist = test_problem[current_city][next_city]
                if dist < best_dist:
                    best_dist = dist
                    best_city = next_city
        
        current_city = best_city
        tour.append(current_city)
        visited[current_city] = True
        
    return tour

def initialize_population():
    population = []
    num_cities = len(CITIES)

    for i in range(num_cities):
        if len(population) < POPULATION_SIZE:
            nn_solution = create_nearest_neighbour_solution(start_city=i)
            population.append(Individual(nn_solution, 0))

    remaining_size = POPULATION_SIZE - len(population)
    for _ in range(remaining_size):
        random_sol = create_random_solution(num_cities)
        population.append(Individual(random_sol, 0))

    population = [Individual(ind.solution, cost(ind.solution)) for ind in population]

    return population


In [368]:
def inverse_mutation_delta(individual):
    solution = individual.solution
    old_cost = individual.cost
    size = len(solution)

    a, b = np.random.choice(size, size=2, replace=False)
    if a > b:
        a, b = b, a
    if a == 0 and b == size - 1:
        return individual

    a_prev_city = solution[a - 1]
    city_a = solution[a]
    city_b = solution[b]
    b_next_city = solution[(b + 1) % size]

    old_edges_cost = test_problem[a_prev_city][city_a] + test_problem[city_b][b_next_city]
    new_edges_cost = test_problem[a_prev_city][city_b] + test_problem[city_a][b_next_city]
    
    cost_delta = new_edges_cost - old_edges_cost
    new_cost = old_cost + cost_delta

    new_solution = solution.copy()
    new_solution[a:b+1] = new_solution[a:b+1][::-1]
    return Individual(new_solution, new_cost)

def inverse_mutation_delta_asymmetric(individual):
    solution = individual.solution
    old_cost = individual.cost
    size = len(solution)

    a, b = np.random.choice(size, size=2, replace=False)
    if a > b:
        a, b = b, a
        
    if a == 0 and b == size - 1:
        new_solution = solution[::-1]
        return Individual(new_solution, cost(new_solution))

    a_prev_city = solution[a - 1]
    city_a = solution[a]
    city_b = solution[b]
    b_next_city = solution[(b + 1) % size]

    old_outer_cost = test_problem[a_prev_city][city_a] + test_problem[city_b][b_next_city]
    new_outer_cost = test_problem[a_prev_city][city_b] + test_problem[city_a][b_next_city]
    
    outer_delta = new_outer_cost - old_outer_cost
    
    old_inner_cost = 0
    for i in range(a, b): 
        old_inner_cost += test_problem[solution[i]][solution[i+1]]
    new_inner_cost = 0
    for i in range(b, a, -1): 
        new_inner_cost += test_problem[solution[i]][solution[i-1]]
        
    inner_delta = new_inner_cost - old_inner_cost
    
    new_cost = old_cost + outer_delta + inner_delta
    
    new_solution = solution.copy()
    new_solution[a:b+1] = new_solution[a:b+1][::-1]
    return Individual(new_solution, new_cost)

def insertion_mutation_delta(individual):
    solution = individual.solution
    old_cost = individual.cost
    size = len(solution)

    a, b = np.random.choice(size, size=2, replace=False)
    city_a = solution[a]
    a_prev = solution[a - 1]
    a_next = solution[(a + 1) % size]
    
    city_b = solution[b]
    b_next = solution[(b + 1) % size]

    if a == b or a == (b + 1) % size:
        return individual 
    
    
    old_edges = (
        test_problem[a_prev][city_a] +  
        test_problem[city_a][a_next] +  
        test_problem[city_b][b_next]    
    )
    
    new_edges = (
        test_problem[a_prev][a_next] +   
        test_problem[city_b][city_a] +    
        test_problem[city_a][b_next]       
    )
    
    cost_delta = new_edges - old_edges
    new_cost = old_cost + cost_delta

    new_solution = solution.copy()
    
    city_to_move = new_solution.pop(a)
    
    b_index_new = new_solution.index(city_b)
    
    new_solution.insert(b_index_new + 1, city_to_move)
    return Individual(new_solution, new_cost)

def swap_mutation_delta(individual, is_real_problem=True):
    solution = individual.solution
    old_cost = individual.cost
    size = len(solution)

    a, b = np.random.choice(size, size=2, replace=False)

    if a > b:
        a, b = b, a

    if a == 0 and b == size - 1:
        return individual

    city_a = solution[a]
    prev_a = solution[a - 1]
    next_a = solution[(a + 1) % size]

    city_b = solution[b]
    prev_b = solution[b - 1]
    next_b = solution[(b + 1) % size]

    old_edges_cost = 0
    new_edges_cost = 0

    if b == a + 1:
        old_edges_cost = test_problem[prev_a][city_a] + test_problem[city_b][next_b]
        new_edges_cost = test_problem[prev_a][city_b] + test_problem[city_a][next_b]
        
        old_edges_cost += test_problem[city_a][city_b]
        new_edges_cost += test_problem[city_b][city_a]
    else:
        old_edges_cost = (test_problem[prev_a][city_a] + test_problem[city_a][next_a] +
                          test_problem[prev_b][city_b] + test_problem[city_b][next_b])
        new_edges_cost = (test_problem[prev_a][city_b] + test_problem[city_b][next_a] +
                          test_problem[prev_b][city_a] + test_problem[city_a][next_b])
    
    if not is_real_problem and not (b == a + 1):
         pass 
    
    cost_delta = new_edges_cost - old_edges_cost
    new_cost = old_cost + cost_delta

    new_solution = solution.copy()
    new_solution[a], new_solution[b] = new_solution[b], new_solution[a]
    
    return Individual(new_solution, new_cost)



In [369]:
def crossover_ox(parent1, parent2): 
    size = len(parent1)
    child = [-1] * size
    
    child_cities_set = set()
    
    start, end = sorted(np.random.choice(range(size), size=2, replace=False))
    
    child[start:end] = parent1[start:end]
    
    for i in range(start, end):
        child_cities_set.add(parent1[i])
    
    current_pos = end
    for city in parent2:
        if city not in child_cities_set:
            if current_pos >= size:
                current_pos = 0
            child[current_pos] = city
            child_cities_set.add(city) 
            current_pos += 1
            
    return Individual(child, cost(child))

def crossover_pmx(parent1, parent2):
    size = len(parent1)
    child = [-1] * size
    
    start, end = sorted(np.random.choice(range(size), size=2, replace=False))
    
    mapping = {}
    for i in range(start, end + 1):
        child[i] = parent1[i]
        mapping[parent1[i]] = parent2[i] 
        
    for i in list(range(start)) + list(range(end + 1, size)):
        city = parent2[i]
        
        while city in mapping:
            city = mapping[city]
            
        child[i] = city
        
    return Individual(child, cost(child))


In [370]:
# Numba
@njit
def _local_search_core(solution, initial_cost, test_problem):
    size = len(solution)
    current_cost = initial_cost
    
    improved = True
    while improved:
        improved = False
        
        for a in range(size - 1):
            for b in range(a + 1, size):
                
                if a == 0 and b == size - 1:
                    continue 

                a_prev_city = solution[a - 1]
                city_a = solution[a]
                city_b = solution[b]
                b_next_city = solution[(b + 1) % size]

                old_edges_cost = test_problem[a_prev_city][city_a] + test_problem[city_b][b_next_city]
                new_edges_cost = test_problem[a_prev_city][city_b] + test_problem[city_a][b_next_city]
                
                cost_delta = new_edges_cost - old_edges_cost         

                if cost_delta < -1e-9: 
                    current_cost += cost_delta
                    
                    
                    solution_segment = solution[a:b+1].copy()
                    solution[a:b+1] = solution_segment[::-1]
                    
                    improved = True
                    break
            if improved:
                break
                
    return solution, current_cost 

# "WRAPPER" - Python 
def local_search_systematic(individual):

    solution_copy = individual.solution.copy()
    initial_cost = individual.cost
    
    new_solution, new_cost = _local_search_core(
        solution_copy, 
        initial_cost, 
        test_problem
    )
    
    return Individual(new_solution, new_cost)

# Numba - Asymmetric
@njit
def _local_search_insertion_core(solution, initial_cost, test_problem):

    size = len(solution)
    current_cost = initial_cost
    current_solution = solution.copy() 

    improved = True
    while improved:
        improved = False
        best_delta = 1e-9 
        best_move = (-1, -1) 

        for a in range(size):
            city_a = current_solution[a]
            a_prev = current_solution[a - 1]
            a_next = current_solution[(a + 1) % size]
            
            cost_removed = test_problem[a_prev][city_a] + test_problem[city_a][a_next] - test_problem[a_prev][a_next]

            for b in range(size):
                if b == a or b == (a - 1 + size) % size:
                    continue
                
                city_b = current_solution[b]
                b_next = current_solution[(b + 1) % size]
                
                cost_added = test_problem[city_b][city_a] + test_problem[city_a][b_next] - test_problem[city_b][b_next]
                
                cost_delta = cost_added - cost_removed
                
                if cost_delta < best_delta:
                    best_delta = cost_delta
                    best_move = (a, b) 

        if best_move[0] != -1:
            a_idx, b_idx = best_move
            city_to_move = current_solution[a_idx]
            
            temp_sol = current_solution.copy()
            
            for k in range(a_idx, size - 1):
                temp_sol[k] = current_solution[k+1]

            new_b_idx = -1
            for k in range(size - 1): 
                if temp_sol[k] == current_solution[b_idx]:
                    new_b_idx = k
                    break
            
            insert_pos = new_b_idx + 1
            
            for k in range(size - 1, insert_pos, -1):
                temp_sol[k] = temp_sol[k-1]
            
            temp_sol[insert_pos] = city_to_move
            
            current_solution = temp_sol
            current_cost += best_delta
            improved = True
            
    return current_solution, current_cost

# WRAPPER Python - asymmetric
def local_search_systematic_insertion(individual):

    solution_copy = individual.solution.copy()
    initial_cost = individual.cost

    new_solution, new_cost = _local_search_insertion_core(
        solution_copy, 
        initial_cost, 
        test_problem
    )
    
    return Individual(new_solution, new_cost)

In [371]:
def tournament_selection(population, tournament_size=3):
    tournament = random.sample(population, k=tournament_size)
    tournament_sorted = sorted(tournament, key=lambda ind: ind.cost)
    return tournament_sorted[0]


In [None]:
def easy_algorithm():
    best_individual = None
    iteration_without_improvement = 0
    num_restarts = 0
    epsilon = 1e-9
    is_real_problem = check(test_problem)
    print("Real problem:", is_real_problem)

    for iteration in range(MAX_ITERATIONS):
        if iteration == 0:
            population = initialize_population()
            population.sort(key=lambda ind: ind.cost)
            best_individual = population[0]
            print(f"Best Cost = {best_individual.cost:.3f}, Solution = {best_individual.solution}")

        # Generate offspring
        offspring = []
        for _ in range(OFFSPRING_SIZE):
            parent1 = tournament_selection(population)

            if np.random.rand() < MUTATION_RATE:
                parent2 = tournament_selection(population)

                if np.random.rand() < 0.5:
                    child = crossover_ox(parent1.solution, parent2.solution)
                else:
                    child = crossover_pmx(parent1.solution, parent2.solution) 
            else:
                child = parent1
                
            if np.random.rand() < 0.33:
                if is_real_problem:
                    child = inverse_mutation_delta(child)
                else:
                    child = inverse_mutation_delta_asymmetric(child)
            elif np.random.rand() < 0.66:
                child = swap_mutation_delta(child)
            else:
                child = insertion_mutation_delta(child)

            if np.random.rand() < 0.1:
                if is_real_problem:
                    child = local_search_systematic(child)
                else:
                    child = local_search_systematic_insertion(child)
                    
            offspring.append(child)
        
        # Combine and select the next generation
        combined_population = population + offspring
        combined_population.sort(key=lambda ind: ind.cost)
        population = combined_population[:POPULATION_SIZE]
        
        # Print best solution 
        if best_individual is None or population[0].cost < (best_individual.cost-epsilon):
            best_individual = population[0]
            print(f"Iteration {iteration + 1}: Best Cost = {best_individual.cost:.3f}, Solution = {best_individual.solution}")
            iteration_without_improvement = 0
        else:
            iteration_without_improvement += 1

        if iteration_without_improvement >= MAX_ITERATIONS_WITHOUT_IMPROVEMENT:
            num_restarts += 1
            print(f"No improvement, restarting population.")
            print(f"Best Cost = {best_individual.cost:.3f}, Solution = {best_individual.solution}")
            best_to_keep = best_individual

            population = initialize_population()

            population[0] = best_to_keep
            iteration_without_improvement = 0
        
        if num_restarts >= 3:
            print("Maximum number of restarts reached. Stopping.")
            print(f"Best Cost = {best_individual.cost:.3f}, Solution = {best_individual.solution}")
            break
        

## Common tests

In [None]:
easy_algorithm()

Real problem: False
Iteration 1
Best Cost = -49477.872, Solution = [697, 522, 442, 287, 794, 977, 112, 420, 754, 839, 857, 397, 344, 100, 64, 973, 4, 953, 32, 168, 971, 591, 53, 669, 166, 747, 940, 186, 604, 840, 531, 934, 172, 619, 159, 984, 547, 340, 46, 951, 463, 651, 388, 339, 200, 709, 514, 245, 276, 752, 682, 926, 119, 812, 329, 602, 979, 917, 739, 97, 770, 162, 725, 981, 935, 549, 193, 275, 109, 704, 149, 538, 472, 479, 174, 900, 226, 185, 196, 424, 767, 724, 202, 278, 157, 99, 201, 580, 399, 824, 633, 553, 15, 18, 77, 391, 690, 645, 439, 831, 356, 308, 147, 447, 749, 854, 902, 516, 62, 908, 486, 579, 562, 872, 274, 434, 687, 660, 180, 435, 233, 417, 587, 3, 365, 314, 810, 576, 369, 628, 615, 898, 657, 176, 163, 423, 332, 57, 991, 932, 203, 811, 748, 844, 95, 990, 736, 985, 8, 777, 481, 617, 597, 484, 877, 320, 65, 910, 813, 216, 360, 393, 826, 177, 370, 187, 916, 309, 289, 138, 215, 915, 783, 818, 91, 632, 764, 377, 263, 236, 404, 242, 667, 16, 380, 659, 10, 239, 758, 252, 939,