In [50]:
import six
import sys
sys.modules['sklearn.externals.six'] = six
import mlrose
import pandas as pd
import random
from tqdm import tqdm
from threading import Thread, Lock, Semaphore
import time
from sklearn.model_selection import ParameterGrid
import numpy as np
import functools
import glob
import ast
import re

pd.set_option('display.max_colwidth', 1000)

NUM_SEEDS = 50
RANDOM_SEED = 42
NUM_TUNING_REPEATS = 10
random.seed(RANDOM_SEED)

In [None]:
output = []

genetic_param_grid = {
    'max_attempts': [10, 20, 50],
    'pop_size': [100, 200, 300, 400],
    'mutation_prob': [0.1, 0.2, 0.3, 0.4],
}

random_hill_param_grid = {
    'max_attempts': [10, 20, 50],
}

simulated_annealing_param_grid = {
    'max_attempts': [10, 20, 50],
    'schedule': [mlrose.ExpDecay(init_temp,exp_cost,min_temp) for init_temp in np.arange(0.5,1.1,.2) for exp_cost in np.arange(0.001, 0.011, 0.001) for min_temp in np.arange(0.001, 0.011, 0.02)]
    + [mlrose.GeomDecay(init_temp,decay,min_temp) for init_temp in np.arange(0.5,1.1,.2) for decay in [0.90, 0.92, 0.95, 0.97, 0.99, 0.999] for min_temp in np.arange(0.001, 0.011, 0.02)]
    + [mlrose.ArithDecay(init_temp,decay,min_temp) for init_temp in np.arange(0.5,1.1,.2) for decay in np.arange(0.001, 0.011, 0.002) for min_temp in np.arange(0.001, 0.011, 0.02)]
}

# Function to calculate Euclidean distance
def euclidean_distance(coord1, coord2):
    return np.sqrt((coord1[0] - coord2[0]) ** 2 + (coord1[1] - coord2[1]) ** 2)

# Define the fitness function using the inverted distances
def max_tsp_fitness(state,coords):
    total_distance = 0
    for i in range(len(state) - 1):
        dist = euclidean_distance(coords[state[i]], coords[state[i + 1]])
        total_distance += dist
    total_distance += euclidean_distance(coords[state[-1]], coords[state[0]])  # Complete the loop

    # Invert the distance to convert it into a maximization problem
    max_possible_distance = len(state) * max([euclidean_distance(c1, c2) for c1 in coords for c2 in coords])
    return max_possible_distance - total_distance


def tune_ts_algorithm(ts_length, alg_name, alg_ptr, param_grid):
    for params, _ in tqdm([(params, repeat) for params in ParameterGrid(param_grid) for repeat in range(0, NUM_TUNING_REPEATS)], desc=f"length={ts_length}, alg={alg_name}"):
        coord_list = [(random.randint(0, 1000),random.randint(0,1000)) for _ in range(0, ts_length)]
        fitness = mlrose.CustomFitness(max_tsp_fitness, problem_type="tsp",coords=coord_list)
        problem_fit = mlrose.TSPOpt(length=len(coord_list), fitness_fn=fitness, maximize=True)
        _, fitness = alg_ptr(problem_fit, **params)
        if alg_name != "simulated_annealing":
            output.append([alg_name, ts_length, params, fitness])
            continue

        if isinstance(params["schedule"], mlrose.ExpDecay):
            output.append([alg_name, ts_length, params | {"init_temp": params["schedule"].init_temp, "exp_cost": params["schedule"].exp_const, "min_temp": params["schedule"].min_temp}, fitness])
        elif isinstance(params["schedule"], mlrose.GeomDecay):
            output.append([alg_name, ts_length, params | {"init_temp": params["schedule"].init_temp, "decay": params["schedule"].decay, "min_temp": params["schedule"].min_temp}, fitness])
        elif isinstance(params["schedule"], mlrose.ArithDecay):
            output.append([alg_name, ts_length, params | {"init_temp": params["schedule"].init_temp, "decay": params["schedule"].decay, "min_temp": params["schedule"].min_temp}, fitness])
        
for ts_length in [5,10,25,50,75,100]:
    tune_ts_algorithm(ts_length, "genetic_alg", mlrose.genetic_alg, genetic_param_grid)
    tune_ts_algorithm(ts_length, "random_hill_climbing", mlrose.random_hill_climb, random_hill_param_grid)
    tune_ts_algorithm(ts_length, "simulated_annealing", mlrose.simulated_annealing, simulated_annealing_param_grid)

output_df = pd.DataFrame(output,columns=["alg_name", "ts_length", "params", "fitness"])
output_df.to_csv(f"travelling_salesman_parameter_tuning_{time.time()}.csv")

In [41]:
def find_best_parameters(problem_name):
    # Get the list of files
    files = glob.glob(f'{problem_name}_parameter_tuning_*.csv')
    
    # Find the file with the highest number
    latest_file = max(files, key=lambda x: int(x.split('_')[-1].split('.')[0]))
    
    # Load the latest file into a pandas dataframe
    df = pd.read_csv(latest_file)
    
    # Group by the specified columns and calculate the mean of the fitness column
    grouped = df.groupby(['alg_name', 'ts_length', 'params']).fitness.mean().reset_index()
    
    # Find the params with the highest mean fitness score for each alg_name and ts_length
    return grouped.loc[grouped.groupby(['alg_name', 'ts_length']).fitness.idxmax()].reset_index(drop=True)


In [60]:
output = []

for problem_name, (_, row) in [(problem_name, (_, row)) for problem_name in {"queens", "flip_flop"} for (_, row) in find_best_parameters(problem_name).iterrows()]:
    if problem_name == "flip_flop":
        problem = mlrose.FlipFlop()
    elif problem_name=="queens":
        problem = mlrose.Queens()
    else: raise ValueError()
    
    problem_fit = mlrose.DiscreteOpt(length=row["ts_length"], fitness_fn=problem, maximize=True)
    if row["alg_name"]=="genetic_alg":
        _, fitness, curve =  mlrose.genetic_alg(problem_fit, **ast.literal_eval(row["params"]),curve=True)
    elif row["alg_name"]=="random_hill_climbing":
        _, fitness, curve =  mlrose.random_hill_climb(problem_fit, **ast.literal_eval(row["params"]),curve=True)
    elif row["alg_name"]=="simulated_annealing":
        params = ast.literal_eval(row["params"].replace("<","'<").replace(">", ">'"))
        if "ExpDecay" in params["schedule"]:
            schedule = mlrose.ExpDecay(params["init_temp"], params["exp_cost"], params["min_temp"])
        elif "ArithDecay" in params["schedule"]:
            schedule = mlrose.ArithDecay(params["init_temp"], params["decay"], params["min_temp"])
        elif "GeomDecay" in params["schedule"]:
            schedule = mlrose.GeomDecay(params["init_temp"], params["decay"], params["min_temp"])
        else: raise ValueError()

        _, fitness, curve =  mlrose.simulated_annealing(problem_fit, max_attempts=params["max_attempts"], schedule=schedule, curve=True)
    else: raise ValueError()

In [None]:
output = []

def benchmark_and_gather_results(input_length):
    global output
    coord_list = [(random.randint(0, 1000),random.randint(0,1000)) for _ in range(0, length)]

    start_time = time.time()
    fitness_cords= mlrose.TravellingSales(coords=coord_list)
    problem_fit = mlrose.TSPOpt(length=len(coord_list), fitness_fn=fitness_cords, maximize=False)
    _, best_fitness, curve = mlrose.genetic_alg(problem_fit, curve=True)
    execution_time = time.time() - start_time

    output.append([input_length, best_fitness, execution_time, curve])


threads: list[Thread] = []
for length, _ in tqdm([(length, iteration) for length in range(10,101,10) for iteration in range(0,NUM_SEEDS)]):
    benchmark_and_gather_results(length)

output_df = pd.DataFrame(output)
output_df.to_csv(f"travelling_salesman_parameter_performance{time.time()}.csv")