In [6]:
import random
import math
import copy


In [7]:
def read_top_instance(file_path):
    with open(file_path, 'r') as file:
        # Read the first three lines
        n_line = file.readline().strip()
        m_line = file.readline().strip()
        tmax_line = file.readline().strip()

        # Parse n N
        _, N = n_line.split()
        N = int(N)

        # Parse m P
        _, P = m_line.split()
        P = int(P)

        # Parse tmax Tmax
        _, Tmax = tmax_line.split()
        Tmax = float(Tmax)

        # Read the remaining lines for each node
        nodes = []
        for i in range(N):
            line = file.readline().strip()
            if line == '':
                continue  # Skip empty lines
            x_str, y_str, s_str = line.split()
            x = float(x_str)
            y = float(y_str)
            s = float(s_str)
            nodes.append({'id': i, 'x': x, 'y': y, 'score': s})

        return N, P, Tmax, nodes


In [8]:
# define problem instance. 
class ProblemInstance:
    def __init__(self, file_path):
        """
        Initialize the problem instance by reading data from a file.

        :param file_path: Path to the instance data file
        """
        self.N, self.P, self.Tmax, self.nodes = read_top_instance(file_path)
        self.distance_matrix = self.compute_distance_matrix()

    def compute_distance_matrix(self):
        """
        Compute the Euclidean distance matrix keyed by node IDs.

        :return: A dictionary of dictionaries representing the distance matrix, 
                where distance_matrix[id1][id2] = distance between node id1 and id2.
        """
        # Initialize the distance matrix as a nested dictionary
        distance_matrix = {}
        
        # Extract all node IDs and coordinates for convenience
        nodes_info = {node['id']: (node['x'], node['y']) for node in self.nodes}

        for id1, (x1, y1) in nodes_info.items():
            distance_matrix[id1] = {}
            for id2, (x2, y2) in nodes_info.items():
                if id1 == id2:
                    distance_matrix[id1][id2] = 0.0
                else:
                    distance = math.hypot(x1 - x2, y1 - y2)
                    distance_matrix[id1][id2] = distance

        return distance_matrix


In [9]:
class HALNS:
    def __init__(self, file_path, parameters):
        """
        Initialize the HALNS heuristic with the given problem instance file and parameters.

        :param file_path: Path to the TOP instance data file
        :param parameters: A dictionary containing algorithm parameters
        """
        self.problem = ProblemInstance(file_path)
        self.params = parameters

        # Initialize variables
        self.s_best = None
        self.s_adm = None
        self.T = self.params['T0']
        self.seg = 0
        self.iteration_best = 0

        # Initialize scores for strategies and operators
        self.selection_strategies = ['strategy_1', 'strategy_2'] # update here!!!
        self.removal_ops = ['removal_1', 'removal_2'] # update here!!!
        self.insertion_ops = ['insertion_1', 'insertion_2'] # update here!!!

        self.selection_scores = {}
        self.selection_observed_scores = {}
        self.selection_counts = {}

        self.removal_scores = {}
        self.removal_observed_scores = {}
        self.removal_counts = {}

        self.insertion_scores = {}
        self.insertion_observed_scores = {}
        self.insertion_counts = {}

    
    def node_elimination_procedure(self):
        """
        Apply a node elimination procedure.
        This procedure removes nodes that cannot be serviced within the allowed time budget.
        A node i is eliminated if even the direct route s -> i -> e exceeds Tmax.
        """
        print("Applying node elimination procedure...")

        # Extract relevant information from the problem instance
        N = self.problem.N
        distance_matrix = self.problem.distance_matrix
        Tmax = self.problem.Tmax

        start_node = 0
        end_node = N - 1

        # Mandatory nodes (start and end) cannot be eliminated
        # We'll create a new list of nodes that remain feasible after elimination
        feasible_nodes = []

        # For each node, check feasibility:
        for node in self.problem.nodes:
            i = node['id']
            
            # Skip start and end nodes as they must remain
            if i == start_node or i == end_node:
                feasible_nodes.append(node)
                continue

            # Compute minimal route time s -> i -> e
            T_sie = distance_matrix[start_node][i] + distance_matrix[i][end_node]

            if T_sie <= Tmax:
                # This node could potentially be serviced in some feasible route
                feasible_nodes.append(node)
            else:
                # Node cannot be serviced within Tmax, so it is eliminated
                print(f"Eliminating node {i} due to infeasibility (s->i->e exceeds Tmax).")

        # Update the problem instance with the reduced set of nodes
        self.problem.nodes = feasible_nodes
        # Update N to reflect the new number of nodes
        self.problem.N = len(feasible_nodes)

        # Create a dictionary keyed by node id for direct access:
        self.problem.node_dict = {node['id']: node for node in self.problem.nodes}

        print("Node elimination procedure completed.")
    
    def construct_initial_solution(self):
        print("Constructing initial solution using the nearest neighbor algorithm...")

        N = self.problem.N
        start_node = 0
        end_node = N - 1

        unvisited = set(node['id'] for node in self.problem.nodes if node['id'] not in {start_node, end_node})
        paths = [[start_node, end_node] for _ in range(self.problem.P)]
        current_times = []
        for p in range(self.problem.P):
            initial_time = self.problem.distance_matrix[start_node][end_node]
            current_times.append(initial_time)

        current_positions = [start_node for _ in range(self.problem.P)]

        while unvisited:
            improvement = False
            for p in range(self.problem.P):
                last_node = current_positions[p]
                best_node = None
                best_score = -1
                best_distance = float('inf')

                for node_id in unvisited:
                    distance = self.problem.distance_matrix[last_node][node_id]
                    distance_to_end = self.problem.distance_matrix[node_id][end_node]
                    new_total_time = current_times[p] - self.problem.distance_matrix[last_node][end_node] + distance + distance_to_end

                    if new_total_time <= self.problem.Tmax:
                        node_score = self.problem.node_dict[node_id]['score']  # Use node_dict now
                        heuristic = node_score / (distance + 1)
                        if heuristic > best_score:
                            best_score = heuristic
                            best_node = node_id
                            best_distance = distance

                if best_node is not None:
                    paths[p].insert(-1, best_node)
                    unvisited.remove(best_node)
                    current_times[p] = current_times[p] - self.problem.distance_matrix[last_node][end_node] + best_distance + self.problem.distance_matrix[best_node][end_node]
                    current_positions[p] = best_node
                    improvement = True
                    print(f"Path {p + 1}: Added node {best_node} (Score: {self.problem.node_dict[best_node]['score']}, Distance: {best_distance:.2f})")

            if not improvement:
                print("No further nodes can be added without exceeding Tmax.")
                break

        for p in range(self.problem.P):
            if paths[p][-1] != end_node:
                paths[p].append(end_node)
                print(f"Path {p + 1}: Ensured ending at node {end_node}. Total time: {current_times[p]:.2f}")

        self.s_best = paths
        self.s_adm = copy.deepcopy(paths)
        print("Initial solution constructed with time budget consideration.")
        return paths
        
    def initialize_scores(self):
        """
        Initialize the scores of the node selection strategy and the removal and insertion operators.
        """
        print("Initializing scores for strategies and operators...")

        # Each strategy/operator starts with a score of 1.0 as per the described approach
        for strat in self.selection_strategies:
            self.selection_scores[strat] = 1.0
            self.selection_observed_scores[strat] = 0.0
            self.selection_counts[strat] = 0

        for rem in self.removal_ops:
            self.removal_scores[rem] = 1.0
            self.removal_observed_scores[rem] = 0.0
            self.removal_counts[rem] = 0

        for ins in self.insertion_ops:
            self.insertion_scores[ins] = 1.0
            self.insertion_observed_scores[ins] = 0.0
            self.insertion_counts[ins] = 0
    
    def select_node_selection_strategy(self):
        """
        Select a node selection strategy based on current scores.
        A roulette wheel selection is performed:
        P_k = p_k / sum_of_all_p
        """
        print("Selecting node selection strategy...")
        total_score = sum(self.selection_scores.values())
        r = random.uniform(0, total_score)
        cumulative = 0.0
        for strat, score in self.selection_scores.items():
            cumulative += score
            if r <= cumulative:
                return strat
        # Fallback (should not happen if everything is correct)
        return self.selection_strategies[0]
    
    def select_removal_insertion_operators(self):
        """
        Select removal and insertion operators based on current scores via roulette wheel selection.
        """
        print("Selecting removal and insertion operators...")

        # Removal operator selection
        total_score_rem = sum(self.removal_scores.values())
        r_rem = random.uniform(0, total_score_rem)
        cumulative_rem = 0.0
        selected_removal = None
        for rem, score in self.removal_scores.items():
            cumulative_rem += score
            if r_rem <= cumulative_rem:
                selected_removal = rem
                break

        # Insertion operator selection
        total_score_ins = sum(self.insertion_scores.values())
        r_ins = random.uniform(0, total_score_ins)
        cumulative_ins = 0.0
        selected_insertion = None
        for ins, score in self.insertion_scores.items():
            cumulative_ins += score
            if r_ins <= cumulative_ins:
                selected_insertion = ins
                break

        return selected_removal, selected_insertion
    
    def remove_nodes(self, solution, b, removal_operator):
        """
        Remove b nodes from the solution using the specified removal operator.
        Placeholder for the actual implementation.
        
        :param solution: Current solution
        :param b: Number of nodes to remove
        :param removal_operator: Removal operator identifier
        :return: Modified solution
        """
        print(f"Removing {b} nodes using {removal_operator}...")
        # TODO: Implement node removal logic
        modified_solution = copy.deepcopy(solution)  # Replace with actual modification
        return modified_solution
    
    def insert_nodes(self, solution, insertion_operator, strategy):
        """
        Insert nodes into the solution using the specified insertion operator and strategy.
        Placeholder for the actual implementation.
        
        :param solution: Current solution
        :param insertion_operator: Insertion operator identifier
        :param strategy: Node selection strategy identifier
        :return: Modified solution
        """
        print(f"Inserting nodes using {insertion_operator} with strategy {strategy}...")
        # TODO: Implement node insertion logic
        modified_solution = copy.deepcopy(solution)  # Replace with actual modification
        return modified_solution
    
    def evaluate_solution(self, solution):
        """
        Evaluate the objective function of the solution.
        Placeholder for the actual implementation.
        
        :param solution: Solution to evaluate
        :return: Objective function value
        """
        print("Evaluating solution...")
        # TODO: Implement solution evaluation
        objective_value = 0  # Replace with actual evaluation
        return objective_value
    
    def apply_local_search(self, solution):
        """
        Apply local search procedures on the solution.
        Placeholder for the actual implementation.
        
        :param solution: Solution to improve
        :return: Improved solution
        """
        print("Applying local search procedures...")
        # TODO: Implement local search
        improved_solution = copy.deepcopy(solution)  # Replace with actual improvement
        return improved_solution
    
    def generate_and_solve_SROP(self, solution):
        """
        Generate and solve a SROP (Sub-Route Optimization Problem).
        Placeholder for the actual implementation.
        
        :param solution: Current solution
        :return: Optimized solution
        """
        print("Generating and solving SROP...")
        # TODO: Implement SROP generation and solving
        optimized_solution = copy.deepcopy(solution)  # Replace with actual optimization
        return optimized_solution
    
    def generate_and_solve_SPP(self, solution):
        """
        Generate and solve a SPP (Single Path Problem).
        Placeholder for the actual implementation.
        
        :param solution: Current solution
        :return: Optimized solution
        """
        print("Generating and solving SPP...")
        # TODO: Implement SPP generation and solving
        optimized_solution = copy.deepcopy(solution)  # Replace with actual optimization
        return optimized_solution
    
    def update_scores(self):
        """
        Update the scores of the ALNS operators and insertion strategies at the end of a segment.
        
        p_{k,q+1} = kappa * (observed_score_k / n_k) + (1 - kappa)*p_{k,q}, if n_k > 0
        If n_k = 0, score remains unchanged.
        After updating, observed scores and counts reset.
        """
        print("Updating scores of the ALNS operators and insertion strategies...")

        kappa = self.params.get('kappa', 0.5)

        def update_category_scores(scores, observed_scores, counts):
            for k in scores.keys():
                n_k = counts[k]
                if n_k > 0:
                    old_score = scores[k]
                    new_score = kappa * (observed_scores[k] / n_k) + (1 - kappa) * old_score
                    scores[k] = new_score
                # Reset for next segment
                observed_scores[k] = 0.0
                counts[k] = 0

        update_category_scores(self.selection_scores, self.selection_observed_scores, self.selection_counts)
        update_category_scores(self.removal_scores, self.removal_observed_scores, self.removal_counts)
        update_category_scores(self.insertion_scores, self.insertion_observed_scores, self.insertion_counts)

        print("Score update completed.")
        
    def run(self):
            # Step 1: Apply node elimination procedure
            self.node_elimination_procedure()
            
            # Step 2: Construct initial solution using the nearest neighbor algorithm
            initial_solution = self.construct_initial_solution()
            self.s_best = initial_solution
            self.s_adm = initial_solution
            
            # Initialize parameters
            self.T = self.params['T0']
            self.seg = 0
            self.iteration_best = 0
            
            # Step 4: Initialize scores
            self.initialize_scores()

            # Extract q parameters
            q1 = self.params.get('q1', 10.0)
            q2 = self.params.get('q2', 5.0)
            q3 = self.params.get('q3', 1.0)

            # Step 5: Main loop
            while self.seg < self.params['Nseg'] and self.iteration_best < self.params['iteration_best_max']:
                print(f"\n--- Segment {self.seg + 1} ---")
                iteration = 0
                
                while iteration < self.params['iteration_max']:
                    print(f"\nIteration {iteration + 1} within Segment {self.seg + 1}")
                    s = copy.deepcopy(self.s_adm)
                    
                    # Generate b (number of nodes to remove)
                    # For safety, ensure b does not exceed solution complexity; using a simple logic:
                    b = random.randint(0, max(0, sum(len(p) for p in s)-2*len(s)))
                    print(f"Number of nodes to remove: {b}")
                    
                    # Select node selection strategy
                    c = self.select_node_selection_strategy()
                    
                    # Select removal and insertion operators
                    R, I = self.select_removal_insertion_operators()
                    
                    # Count usage
                    self.selection_counts[c] += 1
                    self.removal_counts[R] += 1
                    self.insertion_counts[I] += 1

                    # Remove b nodes using R
                    s = self.remove_nodes(s, b, R)
                    
                    # Insert nodes using I following c
                    s = self.insert_nodes(s, I, c)
                    
                    # Generate a random number d in [0,1]
                    d = random.uniform(0, 1)
                    print(f"Random number d: {d}")
                    
                    # Evaluate solutions
                    f_s = self.evaluate_solution(s)
                    f_sadm = self.evaluate_solution(self.s_adm)
                    f_sbest = self.evaluate_solution(self.s_best)
                    
                    if f_s >= f_sadm or d <= math.exp((f_s - f_sadm) / self.T):
                        print("Solution accepted based on acceptance criteria.")
                        
                        # Determine q increment based on improvement
                        if f_s > f_sbest:
                            q_value = q1
                        elif f_s > f_sadm:
                            q_value = q2
                        else:
                            q_value = q3

                        # Increment observed scores
                        self.selection_observed_scores[c] += q_value
                        self.removal_observed_scores[R] += q_value
                        self.insertion_observed_scores[I] += q_value

                        # If f(s) > f(sadm), apply local search
                        if f_s > f_sadm:
                            s = self.apply_local_search(s)
                            f_s = self.evaluate_solution(s)
                            print("Applied local search.")

                        # If f(s) > f(s_best), update sbest and reset iteration_best
                        if f_s > f_sbest:
                            print("New best solution found. Generating and solving SROP...")
                            s = self.generate_and_solve_SROP(s)
                            self.s_best = copy.deepcopy(s)
                            self.iteration_best = 0
                            print("s_best updated.")
                        else:
                            self.iteration_best += 1
                            print(f"Iteration best incremented to {self.iteration_best}.")
                        
                        self.s_adm = copy.deepcopy(s)
                    else:
                        # Not accepted
                        self.iteration_best += 1
                        print(f"Solution not accepted. Iteration best incremented to {self.iteration_best}.")
                    
                    # Temperature check
                    if self.T <= self.params['T_min']:
                        print("Temperature below minimum. Resetting temperature and generating SPP...")
                        self.T = self.params['T0']
                        s = self.generate_and_solve_SPP(s)
                        
                        if self.evaluate_solution(s) > self.evaluate_solution(self.s_best):
                            self.iteration_best = 0
                            print("Improved solution found after SPP.")
                        
                        self.s_best = copy.deepcopy(s)
                        self.s_adm = copy.deepcopy(s)
                    
                    # Update temperature and iteration
                    self.T *= self.params['cooling_rate']
                    iteration += 1
                    print(f"Temperature updated to {self.T}.")
                
                # Update scores at the end of the segment
                self.update_scores()
                
                # Increment segment
                self.seg += 1
                print(f"Segment {self.seg} completed.")
            
            # Generate and solve a final SPP
            print("\nGenerating and solving final SPP...")
            self.s_best = self.generate_and_solve_SPP(self.s_best)
            
            # Final solution
            print("Final solution obtained.")
            return self.s_best


In [10]:
if __name__ == "__main__":
    # Define the path to your TOP instance data file
    file_path = 'Set_100_234/p4.2.a.txt'

    # Example parameters (these should be tuned based on the specific problem)
    parameters = {
        'T0': 1000,                # Initial temperature
        'T_min': 1,                # Minimum temperature
        'cooling_rate': 0.95,      # Cooling rate for temperature
        'Nseg': 10,                # Number of segments
        'iteration_max': 100,      # Maximum iterations per segment
        'iteration_best_max': 50,  # Maximum iterations without improvement
        # 'b': 2                     # Number of nodes to remove in each iteration -- randomly generated
    }

    # Initialize HALNS heuristic with the problem instance file and parameters
    halns = HALNS(file_path, parameters)

    # Run HALNS to obtain the best solution
    best_solution = halns.run()

    # Output the best solution
    print("Best Solution:", best_solution)


Applying node elimination procedure...
Eliminating node 1 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 2 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 3 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 5 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 8 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 11 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 12 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 13 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 15 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 16 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 17 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 18 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 19 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 20 due to infeasibility (s->i->e exceeds Tmax).
Eliminating node 21 due to infeasibility (s->i->e exceed