In [165]:
import logging
from itertools import combinations

import numpy as np
import matplotlib.pyplot as plt
import networkx as nx

from icecream import ic

import random
import copy
from tqdm import tqdm 

Cost: $d + (d \cdot \alpha \cdot w)^\beta$ with $\alpha \ge 0$ and $\beta \ge 0$

In [166]:
class Problem:
    _graph: nx.Graph
    _alpha: float
    _beta: float

    def __init__(
        self,
        num_cities: int,
        *,
        alpha: float = 1.0,
        beta: float = 1.0,
        density: float = 0.5,
        seed: int = 42,
    ):
        rng = np.random.default_rng(seed)
        self._alpha = alpha
        self._beta = beta
        cities = rng.random(size=(num_cities, 2))
        cities[0, 0] = cities[0, 1] = 0.5

        self._graph = nx.Graph()
        self._graph.add_node(0, pos=(cities[0, 0], cities[0, 1]), gold=0)
        for c in range(1, num_cities):
            self._graph.add_node(c, pos=(cities[c, 0], cities[c, 1]), gold=(1 + 999 * rng.random()))

        tmp = cities[:, np.newaxis, :] - cities[np.newaxis, :, :]
        d = np.sqrt(np.sum(np.square(tmp), axis=-1))
        for c1, c2 in combinations(range(num_cities), 2):
            if rng.random() < density or c2 == c1 + 1:
                self._graph.add_edge(c1, c2, dist=d[c1, c2])

        assert nx.is_connected(self._graph)

    @property
    def graph(self) -> nx.Graph:
        return nx.Graph(self._graph)

    @property
    def alpha(self):
        return self._alpha

    @property
    def beta(self):
        return self._beta

    def cost(self, path, weight):
        dist = nx.path_weight(self._graph, path, weight='dist')
        return dist + (self._alpha * dist * weight) ** self._beta

    def baseline(self):
        total_cost = 0
        for dest, path in nx.single_source_dijkstra_path(
            self._graph, source=0, weight='dist'
        ).items():
            cost = 0
            for c1, c2 in zip(path, path[1:]):
                cost += self.cost([c1, c2], 0)
                cost += self.cost([c1, c2], self._graph.nodes[dest]['gold'])
            logging.debug(
                f"dummy_solution: go to {dest} ({' > '.join(str(n) for n in path)} ({cost})"
            )
            total_cost += cost
        return total_cost

    def plot(self):
        plt.figure(figsize=(10, 10))
        pos = nx.get_node_attributes(self._graph, 'pos')
        size = [100] + [self._graph.nodes[n]['gold'] for n in range(1, len(self._graph))]
        color = ['red'] + ['lightblue'] * (len(self._graph) - 1)
        return nx.draw(self._graph, pos, with_labels=True, node_color=color, node_size=size)

## $\beta$ Variants

Glossary:
1. non_random_sol_ratio-> percentuale di soluzioni pre-calcolate (non randomiche) sul totale della popolazione iniziale

2. add_zero_prob -> probabilità di aggiungere un ritorno al nodo zero nel percorso

In [167]:
def get_betaVariant(beta:float, num_cities:int):
    add_zero_prob = 0.0
    if beta < 1:
        non_random_sol_ratio = 0.2
    elif beta == 1:
        non_random_sol_ratio = 0.3
    else:
        non_random_sol_ratio = 0.5
        add_zero_prob = 0.6

    if num_cities >= 100:
        non_random_sol_ratio += 0.1
        

    return non_random_sol_ratio, add_zero_prob


## Population

In [None]:
def get_random_individual(num_cities:int):
    cities = list(range(1, num_cities))  
    random.shuffle(cities)
    individual = [0] + cities + [0]  
    return individual

In [169]:
def get_distance_matrix(problem: Problem):
    length_dict = dict(nx.all_pairs_dijkstra_path_length(problem._graph, weight='dist'))
    
    num_cities = len(problem._graph.nodes)
    matrix = np.zeros((num_cities, num_cities))
    
    for i in range(num_cities):
        for j in range(num_cities):
            matrix[i][j] = length_dict[i][j]
            
    return matrix

In [170]:
def randomized_greedy_nearest_neighbor(dist_matrix, k=3):
    num_cities = dist_matrix.shape[0]
    unvisited = set(range(1, num_cities))
    current_node = 0
    path = []
    
    while unvisited:
        candidates = list(unvisited)
        dists = [(city, dist_matrix[current_node][city]) for city in candidates]
        dists.sort(key=lambda x: x[1])
        top_k_candidates = dists[:k]
        chosen_city, _ = random.choice(top_k_candidates)
        path.append(chosen_city)
        unvisited.remove(chosen_city)
        current_node = chosen_city

    return [0] + path + [0]

In [171]:
def cluster_individual(problem, dist_matrix):
    num_cities = len(problem._graph.nodes)
    unvisited = set(range(1, num_cities)) 
    path = [0]
    if problem._beta > 2.0:
        max_cluster_size = 1
    elif problem._beta > 1.5:
        max_cluster_size = 2 
    else:
        max_cluster_size = 4 
        
    while unvisited:
        pivot = random.choice(list(unvisited))
        cluster = [pivot]
        unvisited.remove(pivot)
        current_cluster_limit = random.randint(1, max_cluster_size)
        while len(cluster) < current_cluster_limit and unvisited:
            current_node = cluster[-1]
            candidates = list(unvisited)
            nearest = min(candidates, key=lambda city: dist_matrix[current_node][city])
            cluster.append(nearest)
            unvisited.remove(nearest)
        path.extend(cluster)
        path.append(0)
    return path

In [172]:
def get_non_random_individual(problem:Problem, mod: str, distance_matrix):
    if mod == 'Greedy Nearest Neighbor':
        individual = randomized_greedy_nearest_neighbor(distance_matrix, k=3)
    else:
        individual = cluster_individual(problem, distance_matrix)

    return individual


In [173]:
def init_population(problem: Problem, population_size: int, dist_matrix, non_random_sol_ratio) -> list[list[int]]:
    num_non_random = int(population_size * non_random_sol_ratio)
    population = []
    if problem._beta < 1:
        for _ in range(num_non_random):
            individual = get_non_random_individual(problem, 'Greedy Nearest Neighbor', dist_matrix) # TODO: use precomputed distance matrix
            population.append(individual)
    elif problem._beta == 1:
        for _ in range(num_non_random):
            individual = get_non_random_individual(problem, 'Greedy Nearest Neighbor', dist_matrix) # TODO: use precomputed distance matrix
            population.append(individual)
    else:
        for _ in range(num_non_random):
            individual = get_non_random_individual(problem, 'Cluster', dist_matrix) # TODO: use precomputed distance matrix
            population.append(individual)


    for _ in range(population_size - num_non_random):
        individual = get_random_individual(len(problem._graph.nodes))
        population.append(individual)
        
    return population

In [None]:
def get_baseline_individual(problem:Problem):
    """
    Genera manualmente l'individuo Baseline (Dijkstra):
    0 -> Nodo1 -> 0 -> Nodo2 -> 0 ...
    """
    genome = []

    nodes = list(problem._graph.nodes)
    if 0 in nodes:
        nodes.remove(0)
        
    for node in nodes:
        genome.append(node)
        genome.append(0)
        
    return [0] + genome

In [175]:
problem = Problem(100, density=0.2, alpha=1, beta=0.5)
nodes = list(problem._graph.nodes)
individual = []
if 0 in nodes:
    nodes.remove(0)
for node in nodes:
    individual.append(node)
    individual.append(0)
individual.pop()
individual


[1,
 0,
 2,
 0,
 3,
 0,
 4,
 0,
 5,
 0,
 6,
 0,
 7,
 0,
 8,
 0,
 9,
 0,
 10,
 0,
 11,
 0,
 12,
 0,
 13,
 0,
 14,
 0,
 15,
 0,
 16,
 0,
 17,
 0,
 18,
 0,
 19,
 0,
 20,
 0,
 21,
 0,
 22,
 0,
 23,
 0,
 24,
 0,
 25,
 0,
 26,
 0,
 27,
 0,
 28,
 0,
 29,
 0,
 30,
 0,
 31,
 0,
 32,
 0,
 33,
 0,
 34,
 0,
 35,
 0,
 36,
 0,
 37,
 0,
 38,
 0,
 39,
 0,
 40,
 0,
 41,
 0,
 42,
 0,
 43,
 0,
 44,
 0,
 45,
 0,
 46,
 0,
 47,
 0,
 48,
 0,
 49,
 0,
 50,
 0,
 51,
 0,
 52,
 0,
 53,
 0,
 54,
 0,
 55,
 0,
 56,
 0,
 57,
 0,
 58,
 0,
 59,
 0,
 60,
 0,
 61,
 0,
 62,
 0,
 63,
 0,
 64,
 0,
 65,
 0,
 66,
 0,
 67,
 0,
 68,
 0,
 69,
 0,
 70,
 0,
 71,
 0,
 72,
 0,
 73,
 0,
 74,
 0,
 75,
 0,
 76,
 0,
 77,
 0,
 78,
 0,
 79,
 0,
 80,
 0,
 81,
 0,
 82,
 0,
 83,
 0,
 84,
 0,
 85,
 0,
 86,
 0,
 87,
 0,
 88,
 0,
 89,
 0,
 90,
 0,
 91,
 0,
 92,
 0,
 93,
 0,
 94,
 0,
 95,
 0,
 96,
 0,
 97,
 0,
 98,
 0,
 99]

In [None]:
class GAConfig:
    def __init__(self, problem_instance):
        self.problem = problem_instance
        self.POPULATION_SIZE = max(100, len(problem_instance._graph.nodes) * 2)
        self.OFFSPRING_SIZE = 50    
        self.GENERATIONS = 1000
        if problem_instance._beta > 1:
            self.MUTATION_RATE = 0.4
            self.CROSSOVER_RATE = 0.2
        else:
            self.CROSSOVER_RATE = 0.8
            self.MUTATION_RATE = 0.2
        self.TOURNAMENT_SIZE = 3     # selective pressure
        self.ELITISM_SIZE = 5       

    
# ==========================================
# 2. Genetic Operators
# ==========================================

def cost(problem:Problem, solution: list, dist_matrix=None) -> float:
    total_cost = 0.0
    current_weight = 0.0
    path = solution

    if not path or path[0] != 0:
        path = [0] + list(path)
    if path[-1] != 0:
        path = path + [0]
        
    for i in range(len(path) - 1):
        u = path[i]     
        v = path[i+1]   
        d = dist_matrix[u][v]
        
        step_cost = d + (d * problem._alpha * current_weight) ** problem._beta
        total_cost += step_cost
       
        if v == 0:
            current_weight = 0.0
        else:
            gold_at_node = problem._graph.nodes[v]['gold']
            current_weight += gold_at_node
            
    return total_cost

def tournament_selection(population, fitnesses, k=3):
    candidates_indices = random.sample(range(len(population)), k)
    best_idx = candidates_indices[0]
    best_fit = fitnesses[best_idx]
    
    for idx in candidates_indices[1:]:
        if fitnesses[idx] < best_fit: 
            best_fit = fitnesses[idx]
            best_idx = idx
            
    return population[best_idx]

def crossover(parent1, parent2):
    has_internal_zeros = 0 in parent1[1:-1]
    
    if has_internal_zeros:
        return copy.deepcopy(random.choice([parent1, parent2]))
    
    else:
        size = len(parent1)
        
        start, end = sorted(random.sample(range(1, size-1), 2))
        
        child = [None] * size
        
        child[0] = 0
        child[-1] = 0
        
        child[start:end+1] = parent1[start:end+1]
        
        genes_in_segment = set(child[start:end+1])
        
        p2_genes = [gene for gene in parent2 if gene not in genes_in_segment and gene != 0]
        
        current_p2_index = 0
        for i in range(1, size-1):
            if child[i] is None:
                child[i] = p2_genes[current_p2_index]
                current_p2_index += 1
                
        return child

def mutation(genome, mutation_rate, beta, add_zero_prob):
    if random.random() < mutation_rate:

        allow_internal_zeros = False
        if beta > 1:
            allow_internal_zeros = True
        elif 0 in genome[1:-1]:
            allow_internal_zeros = True
    
        mutation_type = random.random()

        if mutation_type < 0.6 or not allow_internal_zeros:
            idx1, idx2 = random.sample(range(1, len(genome) - 1), 2)
            start, end = sorted([idx1, idx2])
            genome[start:end] = genome[start:end][::-1]
        elif allow_internal_zeros:
            if random.random() < add_zero_prob:
                insert_idx = random.randint(1, len(genome) - 1)
                if genome[insert_idx-1] != 0 and genome[insert_idx] != 0:
                    genome.insert(insert_idx, 0)
            else:
                zero_indices = [i for i, x in enumerate(genome) if x == 0 and i != 0 and i != len(genome)-1]
                if zero_indices:
                    remove_idx = random.choice(zero_indices)
                    genome.pop(remove_idx)
                    
    return genome


def genetic_algorithm(problem, distance_matrix, non_random_sol_ratio, add_zero_prob):
    config = GAConfig(problem)
    population = init_population(problem, config.POPULATION_SIZE, distance_matrix, non_random_sol_ratio)
    baseline_ind = get_baseline_individual(problem)
    population[0] = baseline_ind 
    
    baseline_cost_check = cost(problem, baseline_ind, distance_matrix)
    print(f"[DEBUG] Baseline inserita. Costo iniziale: {baseline_cost_check}")
    fitnesses = [cost(problem, ind, distance_matrix) for ind in population]
    
    best_idx = np.argmin(fitnesses)
    global_best_genome = copy.deepcopy(population[best_idx])
    global_best_cost = fitnesses[best_idx]
    
    history = [global_best_cost]
    
    iterator = tqdm(range(config.GENERATIONS)) 
    for gen in iterator:
        sorted_indices = np.argsort(fitnesses)
        new_population = [population[i] for i in sorted_indices[:config.ELITISM_SIZE]]
        
        while len(new_population) < config.POPULATION_SIZE:
            parent1 = tournament_selection(population, fitnesses, k=config.TOURNAMENT_SIZE)
            parent2 = tournament_selection(population, fitnesses, k=config.TOURNAMENT_SIZE)
            
            child = copy.deepcopy(parent1)
            if random.random() < config.CROSSOVER_RATE:
                child = crossover(parent1, parent2)
            
            child = mutation(child, config.MUTATION_RATE, problem._beta, add_zero_prob)
            new_population.append(child)
            
        population = new_population

        fitnesses = [cost(problem, ind, distance_matrix) for ind in population]
        
        current_best_idx = np.argmin(fitnesses)
        current_best_cost = fitnesses[current_best_idx]
        
        if current_best_cost < global_best_cost:
            global_best_cost = current_best_cost
            global_best_genome = copy.deepcopy(population[current_best_idx])
            
        history.append(global_best_cost)
        
    return global_best_genome, global_best_cost, history

In [None]:
problem_instance = Problem(100, density=0.2, alpha=1, beta=1)
dm = get_distance_matrix(problem_instance)
non_random_sol_ratio, add_zero_prob=get_betaVariant(problem_instance._beta, len(problem_instance._graph.nodes))
best_sol, best_cost, hist = genetic_algorithm(problem_instance, dm, non_random_sol_ratio, add_zero_prob)
print(f"Costo finale: {best_cost}")
print(f"Costo baseline: {problem_instance.baseline()}")

[DEBUG] Baseline inserita. Costo iniziale: 25266.405618510726


100%|██████████| 1000/1000 [00:46<00:00, 21.48it/s]

Costo finale: 25260.133344975642
Costo baseline: 25266.40561851072





: 

In [178]:
problem_instance = Problem(100, density=0.2, alpha=1, beta=2)
dm = get_distance_matrix(problem_instance)
non_random_sol_ratio, add_zero_prob=get_betaVariant(problem_instance._beta, len(problem_instance._graph.nodes))
best_sol, best_cost, hist = genetic_algorithm(problem_instance, dm, non_random_sol_ratio, add_zero_prob)
print(f"Costo finale: {best_cost}")
print(f"Costo baseline: {problem_instance.baseline()}")

[DEBUG] Baseline inserita. Costo iniziale: 9549171.574361566


100%|██████████| 1000/1000 [00:41<00:00, 24.03it/s]

Costo finale: 9057444.982573431
Costo baseline: 5334401.927002504





In [None]:
problem_instance = Problem(100, density=0.2, alpha=1, beta=2)
baseline_ind = get_baseline_individual(len(problem_instance._graph.nodes))
cost_ga = cost(problem_instance, baseline_ind, dm)
cost_real = problem_instance.baseline()

print(f"1. Costo calcolato dal GA per la Baseline: {cost_ga}")
print(f"2. Costo ufficiale (problem.baseline()):   {cost_real}")

diff = abs(cost_ga - cost_real)
if diff < 1.0:
    print("\nLe funzioni di costo coincidono.")
else:
    print(f"\nERRORE: C'è una discrepanza di {diff}!")


AttributeError: 'int' object has no attribute '_graph'