In [None]:
import logging
from itertools import combinations
import pandas as pd
from tqdm.auto import tqdm

import random

import numpy as np
from geopy.distance import geodesic
import networkx as nx

from icecream import ic

logging.basicConfig(level=logging.DEBUG)

## Lab2 - TSP



In [None]:
# Calculate the distance matrix

CITIES = pd.read_csv('../cities/china.csv', header=None, names=['name', 'lat', 'lon'])
DIST_MATRIX = np.zeros((len(CITIES), len(CITIES)))

for c1, c2 in combinations(CITIES.itertuples(), 2):
    DIST_MATRIX[c1.Index, c2.Index] = DIST_MATRIX[c2.Index, c1.Index] = geodesic(
        (c1.lat, c1.lon), (c2.lat, c2.lon)
    ).km
CITIES.head()


# US : 3_901_662_065_726,7 km shortees tour 
# italy : 4_172.76km shortest tour
# russia : 32_722.5km km shortest tour

In [None]:
def tsp_cost(tsp):
    assert tsp[0] == tsp[-1]
    assert set(tsp) == set(range(len(CITIES)))

    tot_cost = 0
    for c1, c2 in zip(tsp, tsp[1:]):
        tot_cost += DIST_MATRIX[c1, c2]
    return tot_cost





# GREEDY : 


In [None]:
best_tsp = None
best_cost = float('inf')



for i in range(len(CITIES)):


    visited = np.full(len(CITIES), False)
    dist = DIST_MATRIX.copy()
    city = i
    visited[city] = True
    tsp = [int(city)]

    while not np.all(visited):

        dist[:, city] = np.inf
        closest = np.argmin(dist[city])

        visited[closest] = True
        city = closest
        tsp.append(int(city))
    
    tsp.append(tsp[0])  # Chiudo  percorso
    cost = tsp_cost(tsp)

    
    if cost < best_cost:
        best_cost = cost
        best_tsp = tsp

# Output finale del miglior percorso
logging.info(f"Best path found has length {best_cost:.2f} km" ) 


for c1, c2 in zip(best_tsp, best_tsp[1:]):
    logging.info(
        f"step: {CITIES.at[c1,'name']} -> {CITIES.at[c2,'name']} ({DIST_MATRIX[c1,c2]:.2f}km)"
    )



# GREEDY with mutations


In [None]:
#  mutazione 
def insert_mutation(path):
    path = path.copy()
    i, j = random.sample(range(1, len(path) - 1), 2 ) 

    if i < j:
        element = path.pop(j)
        path.insert(i + 1, element)
    else:
        element = path.pop(j)
        path.insert(i, element)

    return path



initial_attempts = 1000
improvement_threshold = 0.001  # Soglia di miglioramento minimo percentuale
num_attempts = initial_attempts


# Algoritmo Greedy con miglioramento post-mutazione
for i in range(len(CITIES)):
    visited = np.full(len(CITIES), False)
    dist = DIST_MATRIX.copy()
    city = i
    visited[city] = True
    tsp = [int(city)]

    # Costruzione del percorso con algoritmo Greedy
    while not np.all(visited):
        dist[:, city] = np.inf
        closest = np.argmin(dist[city])
        visited[closest] = True
        city = closest
        tsp.append(int(city))

    tsp.append(tsp[0])  # Chiude il percorso
    cost = tsp_cost(tsp)

    if cost < best_cost:
        best_cost = cost
        best_tsp = tsp

    # Miglioramento post-mutazione con riduzione dinamica degli attempt
    last_cost = cost  
    for _ in range(num_attempts):
        new_tsp = insert_mutation(tsp)  # Usa insert_mutation
        new_cost = tsp_cost(new_tsp)
        
        if new_cost < cost:
            tsp = new_tsp
            cost = new_cost
            
            if cost < best_cost:
                best_cost = cost
                best_tsp = tsp

            # Riduci il numero di tentativi se i miglioramenti sono piccoli
            if abs(last_cost - cost) / last_cost < improvement_threshold:
                num_attempts = max(100, int(num_attempts * 0.9))  #  - 10% 
            last_cost = cost


# Output finale del miglior percorso
logging.info(f"Best path found has length {best_cost:.2f} km")

for c1, c2 in zip(best_tsp, best_tsp[1:]):
    logging.info(
        f"step: {CITIES.at[c1, 'name']} -> {CITIES.at[c2, 'name']} ({DIST_MATRIX[c1, c2]:.2f}km)"
    )


# SIMULATED ANNEALING


In [None]:
def fitness(tsp):
    assert tsp[0] == tsp[-1]
    assert set(tsp) == set(range(len(CITIES)))

    tot_cost = 0
    for c1, c2 in zip(tsp, tsp[1:]):
        tot_cost += DIST_MATRIX[c1, c2]
    return tot_cost



#  mutazione 
def insert_mutation(path):
    path = path.copy()
    i, j = random.sample(range(1, len(path) - 1), 2 ) 

    if i < j:
        element = path.pop(j)
        path.insert(i + 1, element)
    else:
        element = path.pop(j)
        path.insert(i, element)

    return path


# mutazione con num_swaps dinamico
def insert_mutation_swap(path, num_swaps):
    path = path.copy()
    
    for _ in range(num_swaps):
        i, j = random.sample(range(1, len(path) - 1), 2)
        
        # Inserisce path[j] in posizione i (controllando l'ordine di i e j)
        if i < j:
            element = path.pop(j)
            path.insert(i + 1, element)
        else:
            element = path.pop(j)
            path.insert(i, element)
    
    return path


# non usata 
def insert_mutation_temp2(path, temperature):
    path = path.copy()
    
    i = random.randint(1, len(path) - 2)
    
    # Calcola una distanza massima tra i e j basata sulla temperatura
    max_distance = int((temperature / 1000) * (len(path) - 2))
    max_distance = max(1, min(max_distance, len(path) - 2))  # Limita max_distance all'intervallo valido

    # j con una distanza casuale fino a max_distance da i
    j = random.randint(max(1, i - max_distance), min(len(path) - 2, i + max_distance))

    # mutazione: 
    if i < j:
        element = path.pop(j)
        path.insert(i + 1, element)
    else:
        element = path.pop(j)
        path.insert(i, element)

    return path




# non usata
def scrambling_mutation(path):
    path = path.copy()
    # subset da mischiare
    i, j = sorted(random.sample(range(1, len(path) - 1), 2))  # Evita punti iniziale e finale

    subset = path[i:j+1]
    random.shuffle(subset)

    path[i:j+1] = subset

    return path



# SIMULATED ANNEALING 


In [None]:


initial_temp = 1000  
cooling_rate = 0.995
#num_iterations = 700_000  # iterazioni
num_iterations = 1_000_000  # iterazioni # china


#buffer_size = 4                         # russia - italy  
#buffer_size = 5                         # US 
buffer_size = 8                          # china


current_path = list(range(len(CITIES))) + [0]    # Percorso iniziale che parte e torna alla città 0   *** DA SISTEMARE  . inizia con un percorso meglio?????? 


best_path = current_path
current_cost = fitness(current_path)
best_cost = current_cost

temperature = initial_temp
cost_buffer = []

for iteration in tqdm (range(num_iterations) ): 
        
    new_path = insert_mutation(current_path)  ## 
    new_cost = fitness(new_path)

    # Calcola la differenza di costo
    delta_cost = new_cost - current_cost

    # Decide se accettare il nuovo percorso
    if delta_cost < 0 or np.random.random() < np.exp(-delta_cost / temperature):
        current_path = new_path
        current_cost = new_cost

        if current_cost < best_cost:
            best_cost = current_cost
            best_path = current_path

    
    cost_buffer.append(current_cost)
    if len(cost_buffer) > buffer_size:
        cost_buffer.pop(0)

  # Aggiorna la temperatura solo se il trend è in discesa
    if iteration % buffer_size == 0 and len(cost_buffer) == buffer_size:
        avg_cost = np.mean(cost_buffer)
        if avg_cost < current_cost: # Se la media è in discesa ... 
            temperature *= cooling_rate


    if iteration % 100_000 == 0:
        logging.info(f"Iteration {iteration}, Best Cost: {best_cost:.2f} km, Temperature: {temperature:.2f}")

    if temperature < 1e-5:
        break

logging.info(f"Best path found with length {best_cost:.2f} km,  after {iteration} iterations")




# SIMULATED ANNEALING with swapping mutation

In [None]:

initial_temp = 1000  
cooling_rate = 0.995
#num_iterations = 700_000  # iterazioni
num_iterations = 1_000_000  # iterazioni # china


#buffer_size = 4                         # russia - italy  
buffer_size = 5                          # US 
buffer_size = 8                          # china


current_path = list(range(len(CITIES))) + [0]

best_path = current_path
current_cost = fitness(current_path)
best_cost = current_cost

temperature = initial_temp
cost_buffer = []


 # num_swaps in base alla temperatura
def calcola_num_swaps(temperature, max_swaps):
    return max(1, int(max_swaps * temperature / initial_temp))



for iteration in tqdm(range(num_iterations)):

    #max_swaps = 4  # russia- italy  ( 4 )
    #max_swaps = 5  # us- china  ( 5 )
    max_swaps = 8   # china   ( 8 )


    num_swaps = calcola_num_swaps(temperature, max_swaps)
    new_path = insert_mutation_swap(current_path, num_swaps)

    new_cost = fitness(new_path)

    delta_cost = new_cost - current_cost

    if delta_cost < 0 or np.random.random() < np.exp(-delta_cost / temperature):
        current_path = new_path
        current_cost = new_cost

        if current_cost < best_cost:
            best_cost = current_cost
            best_path = current_path

    cost_buffer.append(current_cost)
    if len(cost_buffer) > buffer_size:
        cost_buffer.pop(0)

    if iteration % buffer_size == 0 and len(cost_buffer) == buffer_size:
        avg_cost = np.mean(cost_buffer)
        if avg_cost < current_cost:
            temperature *= cooling_rate

    if iteration % 100_000 == 0:
        logging.info(f"Iteration {iteration}, Best Cost: {best_cost:.2f} km, Temperature: {temperature:.2f}")

    if temperature < 1e-5:
        break

logging.info(f"Best path found with length {best_cost:.2f} km, after {iteration} iterations")