# Andrea Mirenda LAB 2 CI

In [1]:
import pandas as pd
from dataclasses import dataclass
from collections import Counter
import random
import math
import numpy as np
from icecream import ic
from matplotlib import pyplot as plt
from itertools import accumulate
from itertools import combinations
from tqdm.auto import tqdm
import geopy.distance
from geopy.distance import geodesic
import networkx as nx
from collections import deque

  from .autonotebook import tqdm as notebook_tqdm


# Dataset import

In [2]:
CITIES = pd.read_csv('cities/italy.csv', header=None, names=['name', 'lat', 'lon'])
CITIES

Unnamed: 0,name,lat,lon
0,Ancona,43.6,13.5
1,Andria,41.23,16.29
2,Bari,41.12,16.87
3,Bergamo,45.7,9.67
4,Bologna,44.5,11.34
5,Bolzano,46.5,11.35
6,Brescia,45.55,10.22
7,Cagliari,39.22,9.1
8,Catania,37.5,15.08
9,Ferrara,44.84,11.61


In [3]:
DIST_MATRIX = np.zeros((len(CITIES), len(CITIES)))
for c1, c2 in combinations(CITIES.itertuples(), 2):
    DIST_MATRIX[c1.Index, c2.Index] = DIST_MATRIX[c2.Index, c1.Index] = geodesic(
        (c1.lat, c1.lon), (c2.lat, c2.lon)
    ).km

# Class for the individuals

In [4]:
@dataclass
class Individual:
    genome: list
    fitness : float = None

# Greedy algorithm

In [None]:
def find_closest(segments, city, visited):
    candidates = [
        (pair, distance) for pair, distance in segments
        if city in pair and (other_city := (pair - {city}).pop()) not in visited
    ]
    if len(candidates)==0:
        print("errore")
    closest_segment = min(candidates, key=lambda x: x[1])
    closest_city = (closest_segment[0] - {city}).pop()
    return (int(city), closest_city)

In [6]:
def greedy_sol(city, segments):
    solution = []
    solution.append(city)
    visited = []
    visited.append(int(city))
    while len(visited)<len(CITIES):
        _, c1 = find_closest(segments, city, visited)
        solution.append(c1)
        visited.append(c1)
        city=c1
    solution.append(solution[0])
    
    return solution

In [7]:
starting_city = np.random.randint(0, len(CITIES))
segments = [
        ({c1, c2}, float(DIST_MATRIX[c1, c2])) for c1, c2 in combinations(range(len(CITIES)), 2)
]
greedy_sol(starting_city, segments)

[24,
 8,
 37,
 31,
 17,
 35,
 21,
 14,
 15,
 34,
 39,
 26,
 0,
 33,
 12,
 30,
 9,
 4,
 19,
 32,
 25,
 28,
 18,
 20,
 3,
 6,
 44,
 45,
 23,
 43,
 41,
 5,
 40,
 22,
 42,
 13,
 16,
 29,
 10,
 27,
 11,
 1,
 2,
 38,
 7,
 36,
 24]

# Evolutionary algorithm

### Fitness definition

In [8]:
def fitness(solution):
    tot_dist=0
    for node in range(len(solution)-1):
        tot_dist -= DIST_MATRIX[solution[node], solution[node+1]]
    return tot_dist

### Mutations and Crossover

Follows some crossover and mutations functions that have been tested for the final version.

In [9]:
def inver_over_mutation(parent1):
    child = parent1[:]
    
    start_node = child[0]
    end_node = child[-1]

    if start_node != end_node:
        child.append(start_node)
    
    
    num_inversions = random.randint(1, len(parent1) // 2)  
    
    for _ in range(num_inversions):
        i, j = sorted(random.sample(range(1, len(child) - 1), 2))
        
        child[i:j + 1] = reversed(child[i:j + 1])
    
    if child[-1] != child[0]:
        child[-1] = child[0]
    
    return child

In [None]:
def scramble_mutation(solution, strength = 0.4):
    # use the beta distribution to get a number n considering the strength:
    alpha = (1 - strength) * 5 + 1 
    beta = strength * 5 + 1
    # n between 1 and len(solution)-2
    max_n = len(solution) - 2
    n = int(1 + (max_n - 2) * random.betavariate(alpha, beta))
    indices = random.sample(range(1, len(solution)-3), n) #not the last one nor the first
    # shuffle the value of the indices:
    value_shuffle = [solution[i] for i in indices]
    random.shuffle(value_shuffle)
    for i, indice in enumerate(indices):
        solution[indice] = value_shuffle[i]
    return solution

In [11]:
def swap_mutation(solution):
    index = random.randint(1, len(solution)-3) #not the last one nor the first
    index2=index #should be higher
    while index2<=index:
        index2 = random.randint(1, len(solution)-2)
    selected_edge1 = solution[index]
    selected_edge2 = solution[index2]
    solution[index] = selected_edge2
    solution[index2] = selected_edge1
    return solution

In [12]:
def mutate_strength(individual, mutationRate=0.5):
    for swapped in range(1,len(individual)-1):
        if(random.random() < mutationRate):
            swapWith = int(random.random() * len(individual)-1)
            
            city1 = individual[swapped]
            city2 = individual[swapWith]
            
            individual[swapped] = city2
            individual[swapWith] = city1
    return individual

In [None]:
def inver_over_crossover(parent1, parent2):
    child = parent1[:]
    
    crossover_point = random.randint(1, len(parent1) - 2)
    
    first_segment = parent1[:crossover_point]
    
    second_segment = []
    for city in parent2:
        if city not in first_segment:
            second_segment.append(city)
    
    child = first_segment + second_segment
    
    return child

In [14]:
def crossover_inversion(seq1, seq2):
    dim = len(seq1) - 1  
    pos1, pos2 = sorted(random.sample(range(dim), 2))

    inverted_segment = seq1[pos1:pos2 + 1][::-1] 
    new_son = [None] * dim
    new_son[pos1:pos2 + 1] = inverted_segment

    indice_seq2 = 0
    for k in range(dim):
        if new_son[k] is None:
            while seq2[indice_seq2] in new_son:
                indice_seq2 += 1
            new_son[k] = seq2[indice_seq2]

    new_son.append(new_son[0])
    return new_son

In [15]:
def pmx_crossover(parent1, parent2):
    size = len(parent1)
    p1, p2 = [-1]*size, [-1]*size

    # Select crossover points
    cx_point1 = random.randint(0, size - 2)
    cx_point2 = random.randint(cx_point1 + 1, size - 1)

    # Copy the selected slice from first parent to child
    for i in range(cx_point1, cx_point2 + 1):
        p1[i] = parent1[i]
        p2[i] = parent2[i]

    # Map the values from parent1 to parent2 and vice versa
    for i in range(cx_point1, cx_point2 + 1):
        if parent2[i] not in p1:
            v = parent2[i]
            while p1[parent1.index(v)] != -1:
                v = parent2[parent1.index(v)]
            p1[parent1.index(v)] = parent2[i]

        if parent1[i] not in p2:
            v = parent1[i]
            while p2[parent2.index(v)] != -1:
                v = parent1[parent2.index(v)]
            p2[parent2.index(v)] = parent1[i]

    # Fill the remaining positions with corresponding parent genes
    for i in range(size):
        if p1[i] == -1:
            p1[i] = parent2[i]
        if p2[i] == -1:
            p2[i] = parent1[i]

    return p1, p2

In [16]:
def order_crossover(parent1, parent2):
    size = len(parent1)
    start, end = sorted([random.randint(0, size - 1) for _ in range(2)])

    # Create the child with -1 placeholders
    child = [-1] * size

    # Copy the crossover segment from parent1 to child
    child[start:end + 1] = parent1[start:end + 1]

    # Fill remaining positions with parent2 values in the order they appear
    p2_index = 0
    for i in range(size):
        if child[i] == -1:
            # Ensure that p2_index stays within bounds of parent2
            while p2_index < size and parent2[p2_index] in child:
                p2_index += 1
            # Now assign the value from parent2 if within bounds
            if p2_index < size:
                child[i] = parent2[p2_index]

    return child

### Helping function

In [17]:
def create_random_solution():
    solution =[i for i in range(len(CITIES))]
    np.random.shuffle(solution)
    solution.append(solution[0])
    return solution

Tournament selection

In [18]:
def parent_selection(population):
    candidates = sorted(np.random.choice(population, int(len(population)/8)), key=lambda e: e.fitness, reverse=True)
    return candidates[0]

### Simulated annealing 

In [None]:
def simulated_annealing(solution):
    initial_temperature = 100
    heating_rate = 1.02
    iteration = 0

    # One out of five approach
    recent_improvements = deque(maxlen=5)
    required_improvements = 1  

    # Stopping criteria:
    recent_improvements_stop = deque(maxlen=1000)
    recent_improvements_stop.append(True)

    # Initial solution: greedy one!
    current_solution = solution
    current_cost = fitness(current_solution)
    best_cost = current_cost
    best_solution = current_solution

    temperature = initial_temperature
    while iteration < 1_000:
        iteration += 1
        # Tweak the solution
        random_number = random.random()
        first_time = True
        while random_number > 0.8 or first_time:
            first_time = False
            new_solution = swap_mutation(current_solution.copy())
            new_cost = fitness(new_solution)
            random_number = random.random()
        
        # Variation of fitness by changing sign
        cost_delta = new_cost * (-1) - current_cost * (-1)
        # We are sure the solution after swap mutation is valid if the previous one was.

        if cost_delta < 0 or (random.random() < math.exp(-cost_delta / temperature) and cost_delta != 0):
            current_solution = new_solution
            current_cost = new_cost
            recent_improvements.append(True)
            recent_improvements_stop.append(True)
            if current_cost * (-1) < best_cost * (-1):
                best_cost = current_cost
                best_solution = current_solution
        else:
            recent_improvements.append(False)
            recent_improvements_stop.append(False)

        if recent_improvements.count(True) > required_improvements:
            temperature *= heating_rate  # More exploration
        if recent_improvements.count(True) < required_improvements:
            temperature /= heating_rate

        if recent_improvements_stop.count(True) == 0:  # Stop condition
            break
    return best_solution

## EA final implementation

Since, sometimes it does not reach the best solution, the idea is to simplify the number of iteration of the code and execute it more time and choose the best result over all the executions.

In some dataset is better to continue running the instance for a longher time, in others, instead, is better to execute more than once different instances to reach to an optimum solution (it would be better to to it in parallele using thread).

To consider all these elements together, we execute the EA algorithm considering the length of the city space.

In [20]:
def greedy_2(startingcity: int):
    visited = np.full(len(CITIES), False)
    dist = DIST_MATRIX.copy()
    city = startingcity
    visited[city] = True
    tsp = list()
    tsp.append(int(city))
    while not np.all(visited):
        dist[:, city] = np.inf
        closest = np.argmin(dist[city])
        visited[closest] = True
        city = closest
        tsp.append(int(city))
    tsp.append(tsp[0])
    return tsp


In [21]:

def execute_EA(small_db = False, POPULATION_SIZE=100, OFFSPRING_SIZE=200, MAX_GENERATIONS=200):
    segments = [
        ({c1, c2}, float(DIST_MATRIX[c1, c2])) for c1, c2 in combinations(range(len(CITIES)), 2)
    ]
    population = [Individual(simulated_annealing(greedy_sol(random.randint(0, len(CITIES)-1), segments))) for _ in range(int(POPULATION_SIZE))]
    # while len(population)<POPULATION_SIZE:
    #     population.append(Individual(genome=simulated_annealing(create_random_solution())))
    for i in population:
        i.fitness = fitness(i.genome)
    population.sort(key=lambda i: i.fitness, reverse=True)


    for g in range(MAX_GENERATIONS):
        offspring = []
        for _ in range (OFFSPRING_SIZE):
            if np.random.random()<0.4:#mutation probability:
                p=parent_selection(population)
                if small_db:
                    o=swap_mutation(p.genome.copy())
                else:
                    o=mutate_strength(p.genome.copy())
            else:
                i1 = parent_selection(population)
                i2 = parent_selection(population)
                o = crossover_inversion(i1.genome.copy(), i2.genome.copy())
            offspring.append(Individual(genome=o, fitness =fitness(o)))
            if small_db:
                o2 = swap_mutation(o.copy())
            else:
                o2 = mutate_strength(o.copy())
            if np.random.random()<0.05:
                o3 = simulated_annealing(o2.copy())
                offspring.append(Individual(genome=o3, fitness =fitness(o3)))
            offspring.append(Individual(genome=o2, fitness =fitness(o2)))

        population.extend(offspring)
        population.sort(key=lambda i: i.fitness, reverse=True)
        population = population[:POPULATION_SIZE]
        if g%50==0:
            print("sol so far at gen: ", g, " is: ",fitness(population[0].genome)*(-1))

    population.sort(key=lambda i: i.fitness, reverse=True)
    population = population[:POPULATION_SIZE]
    return population[0]

best_fitness = float('inf')*(-1)
best_sol = None

instances0 = [(100, 200, 20)]
instances1 = [(100, 305, 200), (100, 305, 200), (50, 80, 300)]
instances2 = [(100, 305, 1_500)]
instances3 = [(100, 305, 2_450)]

best_fitness = float('-inf')
best_sol = None

if len(CITIES)<100:
    instances=instances1
if len(CITIES)<30:
    instances=instances0
if len(CITIES)>100:
    instances=instances2
if len(CITIES)>200:
    instances=instances3


for current in instances:
    if len(CITIES)<50:
        valore = execute_EA(True,current[0], current[1], current[2]) 
    else:
        valore = execute_EA(False,current[0], current[1], current[2]) 
    if valore.fitness > best_fitness:
        best_fitness = valore.fitness
        best_sol = valore.genome

print("best fitness: ", best_fitness*(-1))
print(best_sol)

sol so far at gen:  0  is:  4305.478333870562
sol so far at gen:  50  is:  4181.619799758897
sol so far at gen:  100  is:  4172.762613916409
sol so far at gen:  150  is:  4172.762613916409


KeyboardInterrupt: 