In [None]:
%load_ext autoreload
%autoreload 2

In [None]:

from pathlib import Path

import pandas as pd
from joblib import Parallel, delayed

from hamiltonian_cycle.algorithms.lab1 import init_random_solution
from hamiltonian_cycle.algorithms.lab7 import LargeNeighborhoodSearch
from hamiltonian_cycle.algorithms.lab2 import init_greedy_2regret_weighted_cycle
from hamiltonian_cycle.algorithms.lab3_4 import LocalSearch
from hamiltonian_cycle.costs import dm, function_cost
from hamiltonian_cycle.plots import plot_solution
import matplotlib.pyplot as plt
import random
from joblib import Parallel, delayed
from tqdm.auto import tqdm
import numpy as np
import time


In [None]:
def read_dataset_csv(csv_path: Path) -> pd.DataFrame:
    """Load one semicolon-separated instance file into a DataFrame.

    Args:
        csv_path: Location of the CSV file to read.

    Returns:
        A frame whose columns are labelled ``x``, ``y`` and ``cost``. Because
        the labels are supplied explicitly, the first line of the file is
        read as data rather than as a header.
    """
    column_labels = ["x", "y", "cost"]
    frame = pd.read_csv(csv_path, names=column_labels, sep=";")
    return frame


# Absolute path of the directory that holds the instance files.
DATA_DIR = Path("../data").resolve()

# Read both instances first, then build one matrix per instance with `dm`
# (presumably the pairwise distance matrix; see hamiltonian_cycle.costs).
ds_a, ds_b = [
    read_dataset_csv(DATA_DIR / file_name) for file_name in ("TSPA.csv", "TSPB.csv")
]
dm_a, dm_b = dm(ds_a), dm(ds_b)

In [None]:
def find_common_edges(parent1: list[int], parent2: list[int]):
    """Return the undirected cycle edges that occur in both parent tours.

    Each tour is treated as a closed cycle, so the last node is linked back
    to the first. An edge is represented as a tuple with the smaller node
    first, which makes ``(a, b)`` and ``(b, a)`` compare equal.

    Args:
        parent1: First tour, as a sequence of node ids.
        parent2: Second tour, as a sequence of node ids.

    Returns:
        A set of ``(low, high)`` tuples present in both tours.

    Note:
        The length of ``parent1`` drives the iteration over both tours.
    """
    n = len(parent1)
    # Position of the successor of every position, wrapping around at the end.
    successor = [(pos + 1) % n for pos in range(n)]

    def undirected_edges(tour):
        # Collect every (node, next node) pair with its endpoints in sorted order.
        edges = set()
        for pos, nxt in enumerate(successor):
            a, b = tour[pos], tour[nxt]
            edges.add((b, a) if b < a else (a, b))
        return edges

    return undirected_edges(parent1) & undirected_edges(parent2)


def combine_common_edges(
    parent1: list[int], parent2: list[int], common_edges: set
) -> list[int]:
    """Build a partial child tour from the nodes touched by common edges.

    One of the two parents is picked at random, and every node of that parent
    that is an endpoint of at least one edge in ``common_edges`` is kept, in
    the order it appears in the chosen parent.

    Args:
        parent1: First parent tour (node ids).
        parent2: Second parent tour (node ids).
        common_edges: Edges shared by both parents, each an iterable of node
            ids (e.g. the tuples produced by ``find_common_edges``).

    Returns:
        The ordered list of retained nodes; empty when ``common_edges`` is
        empty.
    """
    # Randomly select either parent1 or parent2 as the ordering reference.
    starting_parent = random.choice([parent1, parent2])

    # Flatten the edges once so each membership test is a set lookup instead
    # of a scan over every edge.
    nodes_on_common_edges = {node for edge in common_edges for node in edge}

    return [node for node in starting_parent if node in nodes_on_common_edges]


def recombination_operator_1(
    parent1: list[int], parent2: list[int], all_nodes: list[int]
) -> list[int]:
    """Recombine two parents: keep shared-edge nodes, fill the rest randomly.

    The child starts with the nodes that lie on edges common to both parents
    (ordered as in a randomly chosen parent). It is then padded with randomly
    ordered nodes from ``all_nodes`` that are not yet in the child, until it
    has as many nodes as ``parent1``.

    Args:
        parent1: First parent tour (node ids).
        parent2: Second parent tour (node ids).
        all_nodes: Every node id that may appear in a solution.

    Returns:
        The child tour as a list of node ids.
    """
    common_edges = find_common_edges(parent1, parent2)

    child = combine_common_edges(parent1, parent2, common_edges)

    # Set lookup avoids rescanning the child list for every candidate node.
    already_used = set(child)
    remaining_nodes = [node for node in all_nodes if node not in already_used]
    random.shuffle(remaining_nodes)

    # Complete the solution to the required size.
    child.extend(remaining_nodes[: len(parent1) - len(child)])
    return child


def recombination_operator_2(
    parent1: list[int], parent2: list[int], ds: pd.DataFrame, dm: pd.DataFrame
) -> list[int]:
    """Recombine two parents and complete the child with greedy 2-regret.

    The nodes lying on edges shared by both parents form a partial tour, which
    is handed to ``init_greedy_2regret_weighted_cycle`` as its
    ``initial_solution`` (equal weights of 0.5 for cost and regret, start 0).

    Args:
        parent1: First parent tour (node ids).
        parent2: Second parent tour (node ids).
        ds: Dataset frame passed through to the greedy heuristic.
        dm: Matrix passed through to the greedy heuristic.

    Returns:
        The index of the frame returned by the heuristic, as a list.
    """
    shared_edges = find_common_edges(parent1, parent2)
    partial_tour = combine_common_edges(parent1, parent2, shared_edges)

    repair_settings = dict(
        start=0,
        w_cost=0.5,
        w_regret=0.5,
        initial_solution=partial_tour,
    )
    completed = init_greedy_2regret_weighted_cycle(ds, dm, **repair_settings)
    return completed.index.tolist()


def hea(
    ds: pd.DataFrame,
    dm: pd.DataFrame,
    population_size: int = 20,
    time_budget: float = 555,
    use_operator_1_prob: float = 0.5,
    with_local_search_after_recombination: bool = True,
):
    """Hybrid evolutionary algorithm with a steady-state population.

    The initial population is built by running steepest local search (edge
    intra-route moves) from random solutions. Until the time budget expires,
    two distinct parents are drawn uniformly, recombined with one of the two
    operators, optionally improved by local search, and the child replaces the
    worst individual when it is strictly better than it and its cost is not
    already present in the population.

    Args:
        ds: Dataset frame; solutions are lists of its index labels.
        dm: Matrix forwarded to the construction and local-search routines.
        population_size: Number of individuals kept in the population. Must be
            at least 2 for parent selection to work.
        time_budget: Wall-clock seconds allowed for the main loop. Building
            the initial population is not counted against it.
        use_operator_1_prob: Probability of using ``recombination_operator_1``;
            otherwise ``recombination_operator_2`` is used.
        with_local_search_after_recombination: Whether to run local search on
            each child before evaluating it.

    Returns:
        A ``(best_solution, num_iterations)`` tuple: the lowest-cost individual
        of the final population and the number of main-loop iterations run.
    """
    ls = LocalSearch(strategy="steepest", intra_search="edge")

    def create_initial_solution(ds, dm):
        return ls(ds, dm, init_random_solution(ds, dm, 0).index.tolist()).index.tolist()

    population = [create_initial_solution(ds, dm) for _ in range(population_size)]
    # Costs are kept in lock-step with `population` so they are evaluated once
    # per individual instead of once per individual per iteration.
    population_costs = [function_cost(ds.loc[s]) for s in population]

    num_iterations = 0
    start_time = time.time()
    while time.time() - start_time < time_budget:
        num_iterations += 1

        parent_ids = np.random.choice(range(population_size), 2, replace=False)
        parent1, parent2 = population[parent_ids[0]], population[parent_ids[1]]

        if random.random() < use_operator_1_prob:
            child_solution = recombination_operator_1(parent1, parent2, list(ds.index))
        else:
            child_solution = recombination_operator_2(parent1, parent2, ds, dm)

        if with_local_search_after_recombination:
            child_solution = ls(ds, dm, child_solution).index.tolist()

        child_cost = function_cost(ds.loc[child_solution])
        # Replace the worst individual when the child beats it. Rejecting a
        # cost that is already present keeps the population free of duplicates
        # (uniqueness is judged by cost, not by the tour itself).
        if child_cost < max(population_costs) and child_cost not in population_costs:
            worst_idx = int(np.argmax(population_costs))
            population.pop(worst_idx)
            population_costs.pop(worst_idx)
            population.append(child_solution)
            population_costs.append(child_cost)

    # np.argmin needs a real sequence: given a generator it wraps it in a 0-d
    # object array and always answers 0.
    best_idx = int(np.argmin(population_costs))
    return population[best_idx], num_iterations


class HEAMetrics:
    """Summary of a single HEA run.

    Attributes are filled in by ``__init__``:
        cost: Value of ``function_cost`` for the solution.
        solution: Index labels of the solution frame, as a list.
        num_iterations: Number of main-loop iterations the run performed.
    """

    def __init__(self, solution: pd.DataFrame, num_iterations: int):
        """Store the cost, node order and iteration count of a run.

        Args:
            solution: Rows of the dataset in tour order (e.g. ``ds.loc[tour]``);
                its index supplies the node ids.
            num_iterations: Iteration count reported by the algorithm.
        """
        self.cost = function_cost(solution)
        self.solution = list(solution.index)
        self.num_iterations = num_iterations


def _run_hea_on(ds: pd.DataFrame, distance_matrix: pd.DataFrame) -> HEAMetrics:
    """Run ``hea`` with its default settings on one instance and wrap the result."""
    best_solution, num_iterations = hea(ds, distance_matrix)
    return HEAMetrics(ds.loc[best_solution], num_iterations)


def run_hea_a() -> HEAMetrics:
    """Run HEA once on dataset A (``ds_a`` / ``dm_a``) and return its metrics."""
    return _run_hea_on(ds_a, dm_a)


def run_hea_b() -> HEAMetrics:
    """Run HEA once on dataset B (``ds_b`` / ``dm_b``) and return its metrics."""
    return _run_hea_on(ds_b, dm_b)


# Number of independent HEA runs executed per dataset.
HEA_RUNS = 20

## Results on dataset A

In [None]:
# Queue HEA_RUNS independent runs on dataset A and execute them in parallel
# (n_jobs=-1 asks joblib to use every available CPU).
jobs_a = [delayed(run_hea_a)() for _ in range(HEA_RUNS)]
metrics_a: list[HEAMetrics] = Parallel(n_jobs=-1)(jobs_a)

In [None]:
# Gather the per-run figures once, then derive the summary statistics.
costs_a = [run.cost for run in metrics_a]
iteration_counts_a = [run.num_iterations for run in metrics_a]

minimum_a = min(metrics_a, key=lambda run: run.cost)
maximum_a = max(metrics_a, key=lambda run: run.cost)
mean_a = sum(costs_a) / len(metrics_a)
mean_n_iterations_a = sum(iteration_counts_a) / len(metrics_a)

# The `=` specifier prints the expression text, so these names must not change.
print(f"Best solution: {minimum_a.solution}")
print("Objective function statistics:")
print(f"{minimum_a.cost = }\n{mean_a = }\n{maximum_a.cost= }")
print(f"Mean Number of iterations: {mean_n_iterations_a}")
plot_solution(ds_a, minimum_a.solution, title="HEA on dataset A")

In [None]:
# Queue HEA_RUNS independent runs on dataset B and execute them in parallel
# (n_jobs=-1 asks joblib to use every available CPU).
jobs_b = [delayed(run_hea_b)() for _ in range(HEA_RUNS)]
metrics_b: list[HEAMetrics] = Parallel(n_jobs=-1)(jobs_b)

In [None]:
# Gather the per-run figures once, then derive the summary statistics.
costs_b = [run.cost for run in metrics_b]
iteration_counts_b = [run.num_iterations for run in metrics_b]

minimum_b = min(metrics_b, key=lambda run: run.cost)
maximum_b = max(metrics_b, key=lambda run: run.cost)
mean_b = sum(costs_b) / len(metrics_b)
mean_n_iterations_b = sum(iteration_counts_b) / len(metrics_b)

# The `=` specifier prints the expression text, so these names must not change.
print(f"Best solution: {minimum_b.solution}")
print("Objective function statistics:")
print(f"{minimum_b.cost = }\n{mean_b = }\n{maximum_b.cost= }")
print(f"Mean Number of iterations: {mean_n_iterations_b}")
plot_solution(ds_b, minimum_b.solution, title="HEA on dataset B")