# Assignment 3 #

We are given three columns of integers with a row for each node. The first two columns contain x and y coordinates of the node positions in a plane. The third column contains node costs. The goal is to select exactly 50% of the nodes (if the number of nodes is odd we round the number of nodes to be selected up) and form a Hamiltonian cycle (closed path) through this set of nodes such that the sum of the total length of the path plus the total cost of the selected nodes is minimized. The distances between nodes are calculated as Euclidean distances rounded mathematically to integer values. The distance matrix should be calculated just after reading an instance and then only the distance matrix (no nodes coordinates) should be accessed by optimization methods to allow instances defined only by distance matrices. 

## Read the data ##

In [1]:
import pandas as pd
import numpy as np
from numpy.typing import ArrayLike, NDArray
import matplotlib.pyplot as plt
import random
from tqdm import tqdm
from math import sqrt

In [2]:
# read data into dataframes
instances = {
    "A": pd.read_csv("data/TSPA.csv", sep=';', header=None, names=["x", "y", "cost"]),
    "B": pd.read_csv("data/TSPB.csv", sep=';', header=None, names=["x", "y", "cost"]),
    "C": pd.read_csv("data/TSPC.csv", sep=';', header=None, names=["x", "y", "cost"]),
    "D": pd.read_csv("data/TSPD.csv", sep=';', header=None, names=["x", "y", "cost"]),
}

In [3]:
def calculate_distance_matrix(df: pd.DataFrame) -> NDArray[np.int32]:
    """
    Calculate the distance matrix from the dataframe.
    The dataframe contains 'x' and 'y' columns for the coordinates.
    The distances are Euclidean, rounded to the nearest integer + the cost of the destination node.
    """
    coordinates = df[['x', 'y']].to_numpy()
    dist_matrix = np.zeros(shape=(len(df), len(df)))
    for i in range(len(coordinates)):
        for j in range(len(coordinates)):
            dist_matrix[i, j] = round(sqrt((coordinates[i, 0] - coordinates[j, 0])**2 + (coordinates[i, 1] - coordinates[j, 1])**2))
    return dist_matrix

In [4]:
distances_matrices = {
    "A": calculate_distance_matrix(instances["A"]),
    "B": calculate_distance_matrix(instances["B"]),
    "C": calculate_distance_matrix(instances["C"]),
    "D": calculate_distance_matrix(instances["D"])
}

costs = {
    "A": instances["A"]["cost"].to_numpy(),
    "B": instances["B"]["cost"].to_numpy(),
    "C": instances["C"]["cost"].to_numpy(),
    "D": instances["D"]["cost"].to_numpy()
}

In [5]:
def visualize_selected_route(
    selected_nodes_indices: ArrayLike, 
    dataframe: pd.DataFrame,
    title: str) -> None:
    """
    Visualize the selected route returned by the algorithm, including the cost of each node represented by a colormap.

    Parameters:
    selected_nodes_indices (list): Indices of the selected nodes in the route.
    dataframe (DataFrame): DataFrame containing 'x', 'y', and 'cost' columns for each node.
    """
    x = dataframe["x"].to_numpy()
    y = dataframe["y"].to_numpy()
    costs = dataframe["cost"].to_numpy()

    cmap = plt.cm.get_cmap('viridis')
    norm = plt.Normalize(vmin=min(costs), vmax=max(costs))

    plt.figure(figsize=(15, 10))
    scatter = plt.scatter(x, y, c=costs, cmap=cmap, norm=norm, s=100)
    plt.colorbar(scatter, label='Node Cost')

    for i, node in enumerate(selected_nodes_indices):
        start_node = selected_nodes_indices[i]
        end_node = selected_nodes_indices[(i + 1) % len(selected_nodes_indices)]
        plt.plot([x[start_node], x[end_node]], [y[start_node], y[end_node]], 'k-', lw=1)

    plt.title(title, fontsize=18)
    plt.xlabel('X Coordinate', fontsize=14)
    plt.ylabel('Y Coordinate', fontsize=14)
    plt.grid(True)
    plt.show()

In [6]:
def objective_function(solution: list[int], dist_matrix: list[list[int]], costs: list[int]) -> int:
    total_score = 0
    n = len(solution)
    for x in range(n):
        total_score += dist_matrix[solution[x - 1]][solution[x]]
        total_score += costs[solution[x]]
    return total_score

In [22]:
# A function that generates a random solution
def generate_random_solution(n: int) -> list[int]:
    """
    Generate a random solution for a given number of nodes.

    :param n: The number of nodes.
    :return: A list of nodes representing the solution.
    """
    return random.sample(range(0, n * 2 + 1), n)

In [7]:
def two_edges_exchange(current_solution: list[int], 
                       current_distance: float, 
                       distance_matrix: list[list[int]]):
    """
    Generate new solutions by exchanging two edges in the current solution.

    :param current_solution: List of nodes in the current solution.
    :param current_score: The score of the current solution.
    :param distance_matrix: 2D list representing the distances between nodes.
    :return: A list of tuples where each tuple contains a new solution and its score.
    """
    n = len(current_solution)
    new_solutions = []

    for i in range(n - 2):
        for j in range(i + 2, n):
            # Create a new solution by reversing the order of nodes between i and j
            new_solution = current_solution[:i + 1] + current_solution[i + 1:j + 1][::-1] + current_solution[j + 1:]

            score_delta = (
                -distance_matrix[current_solution[i]][current_solution[i + 1]]
                -distance_matrix[current_solution[j]][current_solution[(j + 1) % n]]
                +distance_matrix[current_solution[i]][current_solution[j]]
                +distance_matrix[current_solution[i + 1]][current_solution[(j + 1) % n]]
            )
            new_score = current_distance + score_delta

            new_solutions.append((new_solution, 
                                  new_score))
    
    return new_solutions

In [108]:
def two_edges_exchange_advanced(current_solution: list[int], 
                                current_distance: float, 
                                distance_matrix: list[list[int]],
                                start_index: int = 0,
                                direction: str = "right"):
    """
    Generate a new solution by exchanging two edges in the current solution,
    starting from a given index and moving in the specified direction.

    :param current_solution: List of nodes in the current solution.
    :param current_distance: The score of the current solution.
    :param distance_matrix: 2D list representing the distances between nodes.
    :param start_index: The index from which to start searching for a better solution.
    :param direction: The direction in which to search for a better solution ("right" or "left").
    :return: A tuple containing the new solution and its score if it's better,
             otherwise the original solution and score.
    """
    n = len(current_solution)

    # Define the order of iteration based on the direction
    if direction == "right":
        range_i = range(n - 2)
        range_j = lambda i: range(i + 2, n)
    else:  # direction == "left"
        range_i = range(n - 3, -1, -1)
        range_j = lambda i: range(n - 1, i + 1, -1)

    # Convert the linear start index to a pair of indices (i, j)
    count = 0
    for i in range_i:
        for j in range_j(i):
            if count >= start_index:
                # Perform the two-edges exchange from this point
                new_solution = (current_solution[:i + 1] 
                                + current_solution[i + 1:j + 1][::-1] 
                                + current_solution[j + 1:])

                score_delta = (
                    -distance_matrix[current_solution[i]][current_solution[i + 1]]
                    -distance_matrix[current_solution[j]][current_solution[(j + 1) % n]]
                    +distance_matrix[current_solution[i]][current_solution[j]]
                    +distance_matrix[current_solution[i + 1]][current_solution[(j + 1) % n]]
                )
                new_score = current_distance + score_delta

                if new_score < current_distance:
                    return new_solution, new_score

            count += 1  # Increment the counter after checking the condition

    return None


In [8]:
def two_nodes_exchange(selected, score, dist_matrix):
    solutions=[]
    for x in range(len(selected)): 
        for y in range(x+1,len(selected)):
            temp=selected[:]
            temp_score=score                  
            #usunięcie i zmienienie edgy poprzedzających node
            temp_score-=dist_matrix[temp[x-1]][temp[x]]
            temp_score+=dist_matrix[temp[x-1]][temp[y]]
            temp_score-=dist_matrix[temp[y-1]][temp[y]]
            temp_score+=dist_matrix[temp[y-1]][temp[x]]


            #kiedy byłby index x+1 out of range
            if temp[-1]==temp[x]:
                idx=temp[0]
            else: 
                idx=temp[x+1]

            #kiedy byłby index y+1 out of range 
            if temp[-1]==temp[y]:
                idy=temp[0]
            else:
                idy=temp[y+1]
            
            #usunięcie i zmienienie edgy następujących po node
            temp_score-=dist_matrix[temp[x]][idx]
            temp_score-=dist_matrix[temp[y]][idy]
            temp_score+=dist_matrix[temp[x]][idy]
            temp_score+=dist_matrix[temp[y]][idx]
    
            #podmiana nodów
            temp[x],temp[y]=temp[y],temp[x]


            solutions.append((temp,temp_score))
    return solutions

In [113]:
random_solution = generate_random_solution(7)
score = objective_function(random_solution, distances_matrices["A"], costs["A"])
print(f"Random solution: {random_solution}")
print(f"Score: {score}")

Random solution: [9, 8, 2, 11, 12, 10, 3]
Score: 19686.0


In [127]:
def inter_route_exchange_simple(selected, unselected, score, distance_matrix, costs):
    new_solutions = []

    # Assume node indices are 0-based for distance_matrix
    for selected_node in selected:
        for new_node in unselected:
            new_solution = selected.copy()
            replaced_node_index = selected.index(selected_node)
            
            # Assuming 'selected' and 'unselected' contain 0-based indices already
            new_solution[replaced_node_index] = new_node

            prev_node_index = (replaced_node_index - 1) % len(selected)
            next_node_index = (replaced_node_index + 1) % len(selected)

            # Calculate score_delta considering 0-based indices for distance_matrix
            score_delta = (
                -distance_matrix[selected[prev_node_index]][selected_node]
                -distance_matrix[selected_node][selected[next_node_index]]
                +distance_matrix[selected[prev_node_index]][new_node]
                +distance_matrix[new_node][selected[next_node_index]]
                -costs[selected_node]
                +costs[new_node]
            )
            new_score = score + score_delta

            new_solutions.append((new_solution, new_score))

    return new_solutions


In [138]:
def inter_route_exchange(selected, unselected, score, distance_matrix, costs, start_index=0, direction="right"):
    n_selected = len(selected)
    n_unselected = len(unselected)
    
    # Create all possible combinations of selected and unselected nodes
    all_combinations = [(i, j) for i in range(n_selected) for j in range(n_unselected)]

    # Determine the order of combinations based on the direction
    if direction == "left":
        all_combinations = all_combinations[::-1]  # Reverse the list for 'left' direction

    # Start from the given start_index
    for i, j in all_combinations[start_index:]:
        selected_node = selected[i]
        new_node = unselected[j]

        # Replace the selected node with the new node
        new_solution = selected.copy()
        new_solution[i] = new_node

        prev_node_index = (i - 1) % n_selected
        next_node_index = (i + 1) % n_selected

        # Calculate score_delta considering 0-based indices for distance_matrix
        score_delta = (
            -distance_matrix[selected[prev_node_index]][selected_node]
            -distance_matrix[selected_node][selected[next_node_index]]
            +distance_matrix[selected[prev_node_index]][new_node]
            +distance_matrix[new_node][selected[next_node_index]]
            -costs[selected_node]
            +costs[new_node]
        )
        new_score = score + score_delta

        # If a better solution is found, return immediately
        if new_score < score:
            return new_solution, new_score

    # If no better solution is found, return None
    return None


In [130]:
print(f"Random solution: {random_solution}")
print(f"Score: {score}")

Random solution: [9, 8, 2, 11, 12, 10, 3]
Score: 19686.0


In [139]:
inter_route_exchange_simple(random_solution, [1,5,6], score, distances_matrices["A"], costs["A"]) 

[([1, 8, 2, 11, 12, 10, 3], 19006.0),
 ([5, 8, 2, 11, 12, 10, 3], 19425.0),
 ([6, 8, 2, 11, 12, 10, 3], 19083.0),
 ([9, 1, 2, 11, 12, 10, 3], 18609.0),
 ([9, 5, 2, 11, 12, 10, 3], 19773.0),
 ([9, 6, 2, 11, 12, 10, 3], 19464.0),
 ([9, 8, 1, 11, 12, 10, 3], 17242.0),
 ([9, 8, 5, 11, 12, 10, 3], 18188.0),
 ([9, 8, 6, 11, 12, 10, 3], 18682.0),
 ([9, 8, 2, 1, 12, 10, 3], 19174.0),
 ([9, 8, 2, 5, 12, 10, 3], 19017.0),
 ([9, 8, 2, 6, 12, 10, 3], 18572.0),
 ([9, 8, 2, 11, 1, 10, 3], 17532.0),
 ([9, 8, 2, 11, 5, 10, 3], 19100.0),
 ([9, 8, 2, 11, 6, 10, 3], 18577.0),
 ([9, 8, 2, 11, 12, 1, 3], 19017.0),
 ([9, 8, 2, 11, 12, 5, 3], 18683.0),
 ([9, 8, 2, 11, 12, 6, 3], 17907.0),
 ([9, 8, 2, 11, 12, 10, 1], 19031.0),
 ([9, 8, 2, 11, 12, 10, 5], 19765.0),
 ([9, 8, 2, 11, 12, 10, 6], 18968.0)]

In [141]:
inter_route_exchange(random_solution, [1, 5, 6], score, distances_matrices["A"], costs["A"], start_index=2, direction="left")

([9, 8, 2, 11, 12, 10, 1], 19031.0)

# Repository for Local Search solution finder #

In [148]:
from abc import ABC, abstractmethod

class LocalSearch(ABC):
    """
    Abstract class for local search algorithms.
    """
    def __init__(self, 
                 initial_solution: list[int], 
                 distance_matrix: list[list[int]], 
                 costs: list[int]) -> None:
        """
        :param initial_solution: The initial solution.
        :param initial_score: The score of the initial solution.
        :param distance_matrix: 2D list representing the distances between nodes.
        """
        self.initial_solution = initial_solution
        self.initial_score = objective_function(initial_solution, distance_matrix, costs)
        self.distance_matrix = distance_matrix
        self.costs = costs
        
        self.current_solution: list[int] = initial_solution
        self.current_score = self.initial_score

    # two method for intra moves
    @abstractmethod
    def two_edges_exchange(self, index: int = 0, direction: str = "right") -> tuple[list[int], int] | None:
        """This method should return a new solution and its score if a better solution is found, otherwise None.
        For a greedy algorithm, the first better solution found should be returned. For a steepest local search,
        the best solution among all possible solutions should be returned, if there is no better solution, just
        return None. For a steepest local search, both neighborhood moves will return one solution, then we will
        take the better one among these two solutions.

        Parameters
        ----------
        current_solution : list[int]
        current_distance : int
        distance_matrix : list[list[int]]

        Returns
        -------
        tuple[list[int], int] | None
            A single better solution founded by the algorithm, or None if no better solution is found.
        """
        raise NotImplementedError
    
    @abstractmethod
    def two_nodes_exchange(self, start_index=0, direction='right') -> tuple[list[int], int] | None:
        raise NotImplementedError
    
    # one method for inter move
    @abstractmethod
    def inter_route_exchange(self, unselected: list[int], start_index=0, direction="right") -> tuple[list[int], int] | None:
        raise NotImplementedError
    
    @abstractmethod
    def run(self, *args, **kwargs) -> None:
        """
        Run the algorithm.
        """
        pass

# Greedy Local Search #

In [151]:
class GreedyLocalSearch(LocalSearch):
    def __init__(self, 
                 initial_solution: list[int], 
                 distance_matrix: list[list[int]], 
                 costs: list[int]) -> None:
        super().__init__(initial_solution, distance_matrix, costs)
        
    def two_edges_exchange(self,
                            start_index: int = 0,
                            direction: str = "right"):
        """
        Generate a new solution by exchanging two edges in the current solution,
        starting from a given index and moving in the specified direction.

        :param current_solution: List of nodes in the current solution.
        :param current_distance: The score of the current solution.
        :param distance_matrix: 2D list representing the distances between nodes.
        :param start_index: The index from which to start searching for a better solution.
        :param direction: The direction in which to search for a better solution ("right" or "left").
        :return: A tuple containing the new solution and its score if it's better,
                otherwise the original solution and score.
        """
        n = len(self.current_solution)

        if direction == "right":
            range_i = range(n - 2)
            range_j = lambda i: range(i + 2, n)
        else:  # direction == "left"
            range_i = range(n - 3, -1, -1)
            range_j = lambda i: range(n - 1, i + 1, -1)

        count = 0
        for i in range_i:
            for j in range_j(i):
                if count >= start_index:
                    new_solution = (self.current_solution[:i + 1] 
                                    + self.current_solution[i + 1:j + 1][::-1] 
                                    + self.current_solution[j + 1:])

                    score_delta = (
                        -self.distance_matrix[self.current_solution[i]][self.current_solution[i + 1]]
                        -self.distance_matrix[self.current_solution[j]][self.current_solution[(j + 1) % n]]
                        +self.distance_matrix[self.current_solution[i]][self.current_solution[j]]
                        +self.distance_matrix[self.current_solution[i + 1]][self.current_solution[(j + 1) % n]]
                    )
                    new_score = self.current_score + score_delta

                    if new_score < self.current_score:
                        return new_solution, new_score

                count += 1  # Increment the counter after checking the condition

        return None
    
    def two_nodes_exchange(self, start_index=0, direction='right'):
        n = len(self.current_solution)
        total_moves = n * (n - 1) // 2  # Total number of possible swaps

        index_pairs = [(x, y) for x in range(n) for y in range(x+1, n)]
        
        # Adjust the indices list based on the direction
        if direction == 'left':
            index_pairs = index_pairs[::-1]
            start_index = total_moves - start_index - 1 

        for count, (x, y) in enumerate(index_pairs[start_index:], start=start_index):
            temp = self.current_solution[:]
            temp_score = self.current_score

            idx_next = (x + 1) % n
            idy_next = (y + 1) % n

            temp_score -= self.distance_matrix[temp[x-1]][temp[x]] + self.distance_matrix[temp[x]][temp[idx_next]]
            temp_score -= self.distance_matrix[temp[y-1]][temp[y]] + self.distance_matrix[temp[y]][temp[idy_next]]

            temp_score += self.distance_matrix[temp[x-1]][temp[y]] + self.distance_matrix[temp[y]][temp[idx_next]]
            temp_score += self.distance_matrix[temp[y-1]][temp[x]] + self.distance_matrix[temp[x]][temp[idy_next]]

            temp[x], temp[y] = temp[y], temp[x]

            # If the new score is better, return the new solution immediately
            if temp_score < score:
                return temp, temp_score

        # If no improvement is found, return None
        return None
    
    
    def inter_route_exchange(self, unselected, start_index=0, direction="right"):
        n_selected = len(self.current_solution)
        n_unselected = len(unselected)
        
        # Create all possible combinations of selected and unselected nodes
        all_combinations = [(i, j) for i in range(n_selected) for j in range(n_unselected)]

        if direction == "left":
            all_combinations = all_combinations[::-1]

        for i, j in all_combinations[start_index:]:
            selected_node = self.current_solution[i]
            new_node = unselected[j]

            new_solution = self.current_solution.copy()
            new_solution[i] = new_node

            prev_node_index = (i - 1) % n_selected
            next_node_index = (i + 1) % n_selected

            score_delta = (
                -self.distance_matrix[self.current_solution[prev_node_index]][selected_node]
                -self.distance_matrix[selected_node][self.current_solution[next_node_index]]
                +self.distance_matrix[self.current_solution[prev_node_index]][new_node]
                +self.distance_matrix[new_node][self.current_solution[next_node_index]]
                -self.costs[selected_node]
                +self.costs[new_node]
            )
            new_score = score + score_delta

            if new_score < score:
                return new_solution, new_score

        # If no better solution is found, return None
        return None
    
    def run(self, *args, **kwargs) -> None:
        return super().run(*args, **kwargs)


In [154]:
g = GreedyLocalSearch(initial_solution=generate_random_solution(10), 
                      distance_matrix=distances_matrices["A"], 
                      costs=costs["A"])

g.two_edges_exchange()
g.two_nodes_exchange()
g.inter_route_exchange([1, 5, 6])

([1, 15, 14, 7, 5, 19, 17, 9, 11, 20], 19041.0)

# Steepest Local Search #

In [164]:
class SteepestLocalSearch(LocalSearch):
    def __init__(self, 
                 initial_solution: list[int], 
                 distance_matrix: list[list[int]], 
                 costs: list[int]) -> None:
        super().__init__(initial_solution, distance_matrix, costs)
        
    def two_edges_exchange(self, start_index: int = 0, direction: str = "right"):
        """
        Generate new solutions by exchanging two edges in the current solution.

        :param current_solution: List of nodes in the current solution.
        :param current_score: The score of the current solution.
        :param distance_matrix: 2D list representing the distances between nodes.
        :return: A list of tuples where each tuple contains a new solution and its score.
        """
        n = len(self.current_solution)
        new_solutions = []

        for i in range(n - 2):
            for j in range(i + 2, n):
                # Create a new solution by reversing the order of nodes between i and j
                new_solution = self.current_solution[:i + 1] + self.current_solution[i + 1:j + 1][::-1] + self.current_solution[j + 1:]

                score_delta = (
                    -self.distance_matrix[self.current_solution[i]][self.current_solution[i + 1]]
                    -self.distance_matrix[self.current_solution[j]][self.current_solution[(j + 1) % n]]
                    +self.distance_matrix[self.current_solution[i]][self.current_solution[j]]
                    +self.distance_matrix[self.current_solution[i + 1]][self.current_solution[(j + 1) % n]]
                )
                new_score = self.current_score + score_delta

                new_solutions.append((new_solution, 
                                    new_score))
        sorted_solutions = sorted(new_solutions, key=lambda x: x[1])
        return sorted_solutions[0]
    
    def two_nodes_exchange(self, start_index=0, direction='right'):
        new_solutions=[]
        n = len(self.current_solution)
        for i in range(n): 
            for j in range(i+1,n):
                new_solution=self.current_solution[:]
                
                score_delta = (
                    #change of edges preceeding the nodes
                    -self.distance_matrix[self.current_solution[i-1]][self.current_solution[i]]
                    -self.distance_matrix[self.current_solution[j-1]][self.current_solution[j]]
                    +self.distance_matrix[self.current_solution[i-1]][self.current_solution[j]]
                    +self.distance_matrix[self.current_solution[j-1]][self.current_solution[i]]

                    #change of edges after the nodes
                    -self.distance_matrix[self.current_solution[i]][self.current_solution[(i+1)%n]]   
                    -self.distance_matrix[self.current_solution[j]][self.current_solution[(j+1)%n]]
                    +self.distance_matrix[self.current_solution[i]][self.current_solution[(j+1)%n]]
                    +self.distance_matrix[self.current_solution[j]][self.current_solution[(i+1)%n]]                        
                )                
        
                #exchange of nodes
                new_solution[i],new_solution[j]=new_solution[j],new_solution[i]

                new_score = self.current_score + score_delta
                new_solutions.append((new_solution, 
                                    new_score))
        sorted_solutions = sorted(new_solutions, key=lambda x: x[1])
        return sorted_solutions[0]
    
    def inter_route_exchange(self, unselected, start_index=0, direction="right"):
        new_solutions = []

        # Assume node indices are 0-based for distance_matrix
        for selected_node in self.current_solution:
            for new_node in unselected:
                new_solution = self.current_solution.copy()
                replaced_node_index = self.current_solution.index(selected_node)
                
                # Assuming 'selected' and 'unselected' contain 0-based indices already
                new_solution[replaced_node_index] = new_node

                prev_node_index = (replaced_node_index - 1) % len(self.current_solution)
                next_node_index = (replaced_node_index + 1) % len(self.current_solution)

                # Calculate score_delta considering 0-based indices for distance_matrix
                score_delta = (
                    -self.distance_matrix[self.current_solution[prev_node_index]][selected_node]
                    -self.distance_matrix[selected_node][self.current_solution[next_node_index]]
                    +self.distance_matrix[self.current_solution[prev_node_index]][new_node]
                    +self.distance_matrix[new_node][self.current_solution[next_node_index]]
                    -self.costs[selected_node]
                    +self.costs[new_node]
                )
                new_score = self.current_score + score_delta

                new_solutions.append((new_solution, new_score))
        sorted_solutions = sorted(new_solutions, key=lambda x: x[1])
        return sorted_solutions[0]
    
    def run(self, *args, **kwargs) -> None:
        return super().run(*args, **kwargs)

In [170]:
s = SteepestLocalSearch(initial_solution=generate_random_solution(10),
                        distance_matrix=distances_matrices["A"],
                        costs=costs["A"])
s.two_edges_exchange()
s.two_nodes_exchange()
s.inter_route_exchange([1, 5, 6])

([7, 5, 1, 9, 1, 13, 6, 18, 8, 4], 24375.0)