Copyright **`(c)`** 2024 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free under certain conditions — see the [`license`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [6]:
import logging
from itertools import combinations
import pandas as pd
import numpy as np
from geopy.distance import geodesic
import networkx as nx
import random
from icecream import ic
from matplotlib import pyplot as plt
logging.basicConfig(level=logging.DEBUG)

Changing the name of the file in read_csv() attributes it is possible running the function with the five different countries.

In [7]:
CITIES = pd.read_csv('cities/russia.csv', header=None, names=['name', 'lat', 'lon'])
DIST_MATRIX = np.zeros((len(CITIES), len(CITIES)))
for c1, c2 in combinations(CITIES.itertuples(), 2):
    DIST_MATRIX[c1.Index, c2.Index] = DIST_MATRIX[c2.Index, c1.Index] = geodesic(
        (c1.lat, c1.lon), (c2.lat, c2.lon)
    ).km
CITIES.head()

Unnamed: 0,name,lat,lon
0,Abakan,53.72,91.43
1,Achinsk,56.28,90.5
2,Almetyevsk,54.9,52.31
3,Angarsk,52.57,103.91
4,Arkhangelsk,64.57,40.53


## Lab2 - TSP

https://www.wolframcloud.com/obj/giovanni.squillero/Published/Lab2-tsp.nb

- tsp_cost() function verify that the path selected starts and finishes with the same city and it contains all of the cities in the file csv. After that, tsp_cost calculates the sum of the distances between every step of the selected path;
- Individual class represents a TSP path with two attributes: genome (the order of the visited cities) and the fitness (the opposite of the cost);
- roulette_wheel_selection(), selects and individual in base of its fitness;
- xover() is a crossover operator that combines the genomes of two parents to produce a child. It copies a subsequence of p1 into the child_genome, than it fills the rest of the child_genome with unique elements from p2.
- cycle_crossover() is called by xover() with probability of 50%. It uses an alternative cyclic crossover to produce the child, ensuring that the it receives information from both parents and respecting the order of visiting the cities in p1.
- mutation() choices a selected segments random number between 3 and 5, it creates all the possibie inversion combinations and with a 30% probability it traslates a segment.

In [8]:
def tsp_cost(tsp):
    assert tsp[0] == tsp[-1]
    assert set(tsp) == set(range(len(CITIES)))

    tot_cost = 0
    for c1, c2 in zip(tsp, tsp[1:]):
        tot_cost += DIST_MATRIX[c1, c2]
    return tot_cost

class Individual:
    def __init__(self, genome):
        self.genome = genome
        self.fitness = -tsp_cost(genome)


def roulette_wheel_selection(population):
    total_fitness = sum(ind.fitness for ind in population)
    pick = random.uniform(0, total_fitness)
    current = 0
    for ind in population:
        current += ind.fitness
        if current >= pick:
            return ind 
    return random.choice(population)    


def xover(p1: Individual, p2: Individual):
    start, end = sorted(random.sample(range(1, len(CITIES)), 2))
    
    child_genome = [None] * len(CITIES)
    child_genome[start:end] = p1.genome[start:end]

    p2_pointer = 0
    for i in range(len(CITIES)):
        if child_genome[i] is None:
            while p2.genome[p2_pointer] in child_genome:
                p2_pointer += 1
            child_genome[i] = p2.genome[p2_pointer]

    child_genome.append(child_genome[0])

    if random.random() < 0.5:
        child_genome = cycle_crossover(p1.genome, p2.genome)

    return Individual(child_genome)

def cycle_crossover(p1_genome, p2_genome):
    child_genome = [None] * len(p1_genome)
    cycle = set()
    start = 1  
    while len(cycle) < len(p1_genome) - 1:
        if start not in cycle:
            cycle.add(start)
            index = p1_genome.index(p2_genome[start])
            while index not in cycle:
                cycle.add(index)
                index = p1_genome.index(p2_genome[index])
        start += 1
    for i in cycle:
        child_genome[i] = p1_genome[i]
    for i in range(len(child_genome)):
        if child_genome[i] is None:
            child_genome[i] = p2_genome[i]
    child_genome[0] = child_genome[-1] = p1_genome[0]
    return child_genome

def mutation(p: Individual):
    genome = p.genome.copy()

    points = sorted(random.sample(range(1, len(genome) - 1), random.randint(3, 5)))
    segments = [genome[points[i]:points[i + 1]] for i in range(len(points) - 1)]
    
    inverted_segments = [seg[::-1] if random.random() < 0.5 else seg for seg in segments]
    new_genome = genome[:points[0]] + sum(inverted_segments, []) + genome[points[-1]:]

    if random.random() < 0.3: 
        start, end = sorted(random.sample(range(1, len(new_genome) - 1), 2))
        segment = new_genome[start:end]
        new_genome = new_genome[:start] + new_genome[end:]
        insert_pos = random.randint(1, len(new_genome) - 1)
        new_genome = new_genome[:insert_pos] + segment + new_genome[insert_pos:]

    return Individual(new_genome)

## Greedy Algorithm (Nearest Neighbour)

It generates an initial solution with the euristic Nearest Neighbour, it consists in choosing the closest city to the current one as following step.

In [9]:
def greedy_nearest_neighbor():
    visited = np.full(len(CITIES), False)
    dist = DIST_MATRIX.copy()
    city = 0
    visited[city] = True
    tsp = list()
    tsp.append(int(city))
    while not np.all(visited):
        dist[:, city] = np.inf
        closest = np.argmin(dist[city])
        logging.debug(
            f"step: {CITIES.at[city,'name']} -> {CITIES.at[closest,'name']} ({DIST_MATRIX[city,closest]:.2f}km)"
        )
        visited[closest] = True
        city = closest
        tsp.append(int(city))
    logging.debug(
        f"step: {CITIES.at[tsp[-1],'name']} -> {CITIES.at[tsp[0],'name']} ({DIST_MATRIX[tsp[-1],tsp[0]]:.2f}km)"
    )
    tsp.append(tsp[0])
    logging.info(f"result: Found a path of {len(tsp)-1} steps, total length {tsp_cost(tsp):.2f}km")
    return Individual(tsp)
    





## Evolutionary Algorithm

In order to solve the TSP problem with many cities, it would be better choosing an elevated number of POPULATION_SIZE and OFFSPRING_SIZE.

The evolve() function accepts this arguments:
- population, a list of individuals (pathways) with their genomes and fitness;
- generations, the maximum number of generations for this evolutionary algorithm;
- mutation rate, the initial mutation probability of every individual;
- cooling rate, a paramether uses by the Simulated Annealing algorithms in order to reduce the probability of explorate worse solutions after each iteration;
- stagnation limit, the maximum number of iterations without improvements, useful to increase the mutation rate;
- adaptive_increase, the adaptive increase after a stagnation period;

Each child of the current generation offspring is generated muting a random parent (with a probability equal to the mutation rate) or with the xover function.
The algorithm is executed a second time, with the best path found the first time also included in the initial population.

In [None]:
POPULATION_SIZE = 500  
OFFSPRING_SIZE = 100
population = []   
num_greedy_routes = int(POPULATION_SIZE * 0.1)
for start_city in range(num_greedy_routes):
    population.append(greedy_nearest_neighbor())

for _ in range(POPULATION_SIZE - len(population)):
    random_path = list(range(len(CITIES)))
    random.shuffle(random_path)
    random_path.append(random_path[0])
    population.append(Individual(random_path))


def evolve(population, generations=150_000, mutation_rate=0.3, initial_temperature=10_000.0, cooling_rate=0.999, stagnation_limit=5_000, adaptive_increase=0.1):
    best_path = max(population, key=lambda ind: ind.fitness)
    current_best = best_path
    temperature = initial_temperature
    stagnation_counter = 0  
    base_mutation_rate = mutation_rate  
    history = []
    
    logging.info(f"Initial Best Path: Path cost: {-best_path.fitness:.2f} km")
    
    for generation in range(generations):
        offspring = []
        
        for _ in range(OFFSPRING_SIZE):
           
            if random.random() < mutation_rate:
                parent = roulette_wheel_selection(population)
                child = mutation(parent)
            else:
                parent1 = roulette_wheel_selection(population)
                parent2 = roulette_wheel_selection(population)
                
                child = xover(parent1, parent2)

            offspring.append(child)

        
        improved = False
        for child in offspring:
            child.fitness = -tsp_cost(child.genome)
            delta_fitness = child.fitness - current_best.fitness

            if delta_fitness > 0 or random.random() < np.exp(delta_fitness / temperature):
                current_best = child
                if child.fitness > best_path.fitness:
                    best_path = child
                    improved = True  
                    stagnation_counter = 0  

        if not improved:
            stagnation_counter += 1

        if stagnation_counter >= stagnation_limit:
            mutation_rate = min(1.0, mutation_rate + adaptive_increase)

        if improved:
            mutation_rate = base_mutation_rate

        population.extend(offspring)
        population.sort(key=lambda ind: ind.fitness, reverse=True)
        population = population[:POPULATION_SIZE]

        temperature *= cooling_rate

        history.append(-best_path.fitness)
        
        if generation % 10_000 == 0:
            logging.info(f"Generation {generation}: Best Path Cost: {-best_path.fitness:.2f} km")
            random_path = list(range(len(CITIES)))
            random.shuffle(random_path)
            random_path.append(random_path[0])
            population.append(Individual(random_path))
    
    return best_path, history

best_path, history = evolve(population)

logging.info(f"Best Path Found in the final phase: {best_path.genome}")
logging.info(f"Final Best Path Cost: {tsp_cost(best_path.genome):.2f} km")

plt.figure(figsize=(14, 8))
plt.plot(range(len(history)), history, color="blue", label="Cost over generations")
plt.scatter(range(len(history)), history, color="red", marker=".", label="Individual cost")
plt.title("Progression of Cost over Generations")
plt.xlabel("Generations")
plt.ylabel("Cost (km)")
plt.legend()
plt.grid(True)
plt.show()


DEBUG:root:step: Abakan -> Krasnoyarsk (276.58km)
DEBUG:root:step: Krasnoyarsk -> Achinsk (161.71km)
DEBUG:root:step: Achinsk -> Kemerovo (296.59km)
DEBUG:root:step: Kemerovo -> Leninsk‐Kuznetskiy (74.76km)
DEBUG:root:step: Leninsk‐Kuznetskiy -> Prokopyevsk (91.87km)
DEBUG:root:step: Prokopyevsk -> Novokuznetsk (30.63km)
DEBUG:root:step: Novokuznetsk -> Biysk (187.38km)
DEBUG:root:step: Biysk -> Barnaul (132.82km)
DEBUG:root:step: Barnaul -> Novosibirsk (194.50km)
DEBUG:root:step: Novosibirsk -> Tomsk (206.90km)
DEBUG:root:step: Tomsk -> Seversk (14.97km)
DEBUG:root:step: Seversk -> Rubtsovsk (613.13km)
DEBUG:root:step: Rubtsovsk -> Omsk (647.47km)
DEBUG:root:step: Omsk -> Tobolsk (475.40km)
DEBUG:root:step: Tobolsk -> Tyumen (200.98km)
DEBUG:root:step: Tyumen -> Kurgan (189.69km)
DEBUG:root:step: Kurgan -> Kopeysk (236.87km)
DEBUG:root:step: Kopeysk -> Chelyabinsk (14.72km)
DEBUG:root:step: Chelyabinsk -> Miass (87.20km)
DEBUG:root:step: Miass -> Zlatoust (33.88km)
DEBUG:root:step: Zl