# VRPTW Solver: Hybrid Genetic Algorithm + Tabu Search

This notebook implements a hybrid metaheuristic combining:
- **Genetic Algorithm (GA)**: Global exploration through evolution
- **Tabu Search (TS)**: Local exploitation for refinement

## Algorithm Flow:
1. Initialize population of solutions
2. For each generation:
   - Selection: Choose parent solutions
   - Crossover: Create offspring
   - Mutation: Add diversity
   - Tabu Search: Local optimization of offspring
   - Population update: Select next generation

## Goal:
Find feasible VRPTW solutions within **7% gap** from optimal.

In [14]:
# Import required libraries
import numpy as np
import vrplib
import yaml
import random
import math
import time
import os
from collections import deque
from copy import deepcopy
from typing import List, Dict, Tuple, Optional
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

print("Libraries imported successfully!")

Libraries imported successfully!


In [15]:
# Load configuration from HGA config file
with open('config_hga.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Set random seed
random.seed(config['general']['random_seed'])
np.random.seed(config['general']['random_seed'])

print("HGA+TS configuration loaded and seed set!")
print(yaml.dump(config, default_flow_style=False))

HGA+TS configuration loaded and seed set!
general:
  random_seed: 42
  time_limit_seconds: 600
  verbose: true
genetic_algorithm:
  crossover_rate: 0.8
  elitism_count: 5
  max_generations: 100
  mutation_rate: 0.2
  population_size: 50
  tournament_size: 3
quality:
  feasibility_required: true
  target_gap_percentage: 7.0
tabu_search:
  aspiration_enabled: true
  max_iterations: 50
  tabu_tenure: 15



## GA+TS Configuration Parameters

In [16]:
# Extract GA and TS parameters from config file
GA_CONFIG = config['genetic_algorithm']
TS_CONFIG = config['tabu_search']

print("GA+TS parameters loaded from config_hga.yaml!")
print(f"Population size: {GA_CONFIG['population_size']}")
print(f"Generations: {GA_CONFIG['max_generations']}")
print(f"Crossover rate: {GA_CONFIG['crossover_rate']}")
print(f"Mutation rate: {GA_CONFIG['mutation_rate']}")
print(f"TS iterations per offspring: {TS_CONFIG['max_iterations']}")
print(f"Tabu tenure: {TS_CONFIG['tabu_tenure']}")

GA+TS parameters loaded from config_hga.yaml!
Population size: 50
Generations: 100
Crossover rate: 0.8
Mutation rate: 0.2
TS iterations per offspring: 50
Tabu tenure: 15


## Solution Representation

In [17]:
class VRPTWSolution:
    """VRPTW solution with fitness evaluation."""
    
    def __init__(self, routes: List[List[int]], distance_matrix: np.ndarray,
                 time_windows: np.ndarray, service_times: np.ndarray,
                 demands: np.ndarray, capacity: int):
        self.routes = routes
        self.distance_matrix = distance_matrix
        self.time_windows = time_windows
        self.service_times = service_times
        self.demands = demands
        self.capacity = capacity
        self.cost = self.calculate_cost()
        self.fitness = self.calculate_fitness()
    
    def calculate_cost(self) -> float:
        """Calculate total distance."""
        total_cost = 0
        for route in self.routes:
            if len(route) == 0:
                continue
            total_cost += self.distance_matrix[0, route[0]]
            for i in range(len(route) - 1):
                total_cost += self.distance_matrix[route[i], route[i+1]]
            total_cost += self.distance_matrix[route[-1], 0]
        return total_cost
    
    def calculate_fitness(self) -> float:
        """Calculate fitness with penalty for infeasibility."""
        penalty = 0
        
        # Check capacity constraints
        for route in self.routes:
            route_demand = sum(self.demands[c] for c in route)
            if route_demand > self.capacity:
                penalty += 10000 * (route_demand - self.capacity)
        
        # Check time window constraints
        for route in self.routes:
            current_time = 0.0
            current_loc = 0
            for customer in route:
                travel_time = self.distance_matrix[current_loc, customer]
                arrival_time = current_time + travel_time
                due_date = self.time_windows[customer, 1]
                
                if arrival_time > due_date:
                    penalty += 1000 * (arrival_time - due_date)
                
                ready_time = self.time_windows[customer, 0]
                start_service = max(arrival_time, ready_time)
                current_time = start_service + self.service_times[customer]
                current_loc = customer
        
        return self.cost + penalty
    
    def is_feasible(self) -> bool:
        """Check complete feasibility."""
        for route in self.routes:
            # Capacity
            if sum(self.demands[c] for c in route) > self.capacity:
                return False
            # Time windows
            current_time = 0.0
            current_loc = 0
            for customer in route:
                arrival_time = current_time + self.distance_matrix[current_loc, customer]
                if arrival_time > self.time_windows[customer, 1]:
                    return False
                start_service = max(arrival_time, self.time_windows[customer, 0])
                current_time = start_service + self.service_times[customer]
                current_loc = customer
        return True
    
    def copy(self):
        """Deep copy."""
        return VRPTWSolution([r.copy() for r in self.routes], self.distance_matrix,
                            self.time_windows, self.service_times, self.demands, self.capacity)

print("VRPTWSolution class defined!")

VRPTWSolution class defined!


## Helper Functions

In [18]:
def calculate_distance_matrix(instance: Dict) -> np.ndarray:
    """Calculate distance matrix."""
    if 'edge_weight' in instance:
        return instance['edge_weight']
    coords = np.array(instance['node_coord'])
    n = len(coords)
    dist_matrix = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            if i != j:
                dist = np.sqrt(np.sum((coords[i] - coords[j])**2))
                dist_matrix[i, j] = dist
    return dist_matrix

def create_random_solution(instance: Dict, distance_matrix: np.ndarray,
                          time_windows: np.ndarray, service_times: np.ndarray) -> VRPTWSolution:
    """Create a random feasible solution."""
    demands = np.array(instance['demand'])
    capacity = instance['capacity']
    n_customers = len(demands) - 1
    
    customers = list(range(1, n_customers + 1))
    random.shuffle(customers)
    
    routes = []
    current_route = []
    current_load = 0
    current_time = 0.0
    current_loc = 0
    
    for customer in customers:
        # Try to add to current route
        can_add = True
        
        # Check capacity
        if current_load + demands[customer] > capacity:
            can_add = False
        else:
            # Check time window
            arrival = current_time + distance_matrix[current_loc, customer]
            if arrival > time_windows[customer, 1]:
                can_add = False
        
        if not can_add and current_route:
            routes.append(current_route)
            current_route = []
            current_load = 0
            current_time = 0.0
            current_loc = 0
        
        current_route.append(customer)
        current_load += demands[customer]
        arrival = current_time + distance_matrix[current_loc, customer]
        start = max(arrival, time_windows[customer, 0])
        current_time = start + service_times[customer]
        current_loc = customer
    
    if current_route:
        routes.append(current_route)
    
    return VRPTWSolution(routes, distance_matrix, time_windows, service_times, demands, capacity)

print("Helper functions defined!")

Helper functions defined!


## Population Initialization

In [19]:
def initialize_population(instance: Dict, distance_matrix: np.ndarray,
                         time_windows: np.ndarray, service_times: np.ndarray,
                         pop_size: int) -> List[VRPTWSolution]:
    """Generate initial population."""
    population = []
    for _ in range(pop_size):
        solution = create_random_solution(instance, distance_matrix, time_windows, service_times)
        population.append(solution)
    return population

print("Population initialization defined!")

Population initialization defined!


## Genetic Operators

In [20]:
def tournament_selection(population: List[VRPTWSolution], tournament_size: int) -> VRPTWSolution:
    """Select parent using tournament selection."""
    tournament = random.sample(population, tournament_size)
    return min(tournament, key=lambda x: x.fitness)

def order_crossover(parent1: VRPTWSolution, parent2: VRPTWSolution) -> VRPTWSolution:
    """Order crossover (OX) for route-based representation."""
    # Flatten routes to customer sequences
    seq1 = [c for route in parent1.routes for c in route]
    seq2 = [c for route in parent2.routes for c in route]
    
    if len(seq1) < 2:
        return parent1.copy()
    
    # Select crossover points
    point1 = random.randint(0, len(seq1) - 1)
    point2 = random.randint(point1, len(seq1))
    
    # Copy segment from parent1
    offspring_seq = [None] * len(seq1)
    offspring_seq[point1:point2] = seq1[point1:point2]
    
    # Fill remaining from parent2
    remaining = [c for c in seq2 if c not in offspring_seq[point1:point2]]
    idx = 0
    for i in range(len(offspring_seq)):
        if offspring_seq[i] is None:
            offspring_seq[i] = remaining[idx]
            idx += 1
    
    # Reconstruct routes
    routes = []
    current_route = []
    current_load = 0
    current_time = 0.0
    current_loc = 0
    
    for customer in offspring_seq:
        can_add = True
        if current_load + parent1.demands[customer] > parent1.capacity:
            can_add = False
        else:
            arrival = current_time + parent1.distance_matrix[current_loc, customer]
            if arrival > parent1.time_windows[customer, 1]:
                can_add = False
        
        if not can_add and current_route:
            routes.append(current_route)
            current_route = []
            current_load = 0
            current_time = 0.0
            current_loc = 0
        
        current_route.append(customer)
        current_load += parent1.demands[customer]
        arrival = current_time + parent1.distance_matrix[current_loc, customer]
        start = max(arrival, parent1.time_windows[customer, 0])
        current_time = start + parent1.service_times[customer]
        current_loc = customer
    
    if current_route:
        routes.append(current_route)
    
    return VRPTWSolution(routes, parent1.distance_matrix, parent1.time_windows,
                        parent1.service_times, parent1.demands, parent1.capacity)

def swap_mutation(solution: VRPTWSolution) -> VRPTWSolution:
    """Swap two random customers."""
    new_solution = solution.copy()
    all_customers = [(i, j) for i, route in enumerate(new_solution.routes) 
                     for j in range(len(route))]
    
    if len(all_customers) < 2:
        return new_solution
    
    pos1, pos2 = random.sample(all_customers, 2)
    route1, idx1 = pos1
    route2, idx2 = pos2
    
    new_solution.routes[route1][idx1], new_solution.routes[route2][idx2] = \
        new_solution.routes[route2][idx2], new_solution.routes[route1][idx1]
    
    new_solution.cost = new_solution.calculate_cost()
    new_solution.fitness = new_solution.calculate_fitness()
    
    return new_solution

print("Genetic operators defined!")

Genetic operators defined!


## Tabu Search for Local Optimization

In [21]:
class TabuList:
    """Tabu list implementation."""
    def __init__(self, tenure: int):
        self.tenure = tenure
        self.tabu_dict = {}
        self.iteration = 0
    
    def add(self, move: Tuple):
        self.tabu_dict[move] = self.iteration + self.tenure
    
    def is_tabu(self, move: Tuple) -> bool:
        return move in self.tabu_dict and self.tabu_dict[move] > self.iteration
    
    def increment(self):
        self.iteration += 1

def generate_neighbors(solution: VRPTWSolution) -> List[Tuple[VRPTWSolution, Tuple]]:
    """Generate neighboring solutions with move identifiers."""
    neighbors = []
    
    # Relocate moves
    for i, route_i in enumerate(solution.routes):
        if len(route_i) == 0:
            continue
        for pos_i in range(len(route_i)):
            for j in range(len(solution.routes)):
                if i == j:
                    continue
                for pos_j in range(len(solution.routes[j]) + 1):
                    new_sol = solution.copy()
                    customer = new_sol.routes[i].pop(pos_i)
                    new_sol.routes[j].insert(pos_j, customer)
                    new_sol.cost = new_sol.calculate_cost()
                    new_sol.fitness = new_sol.calculate_fitness()
                    move = ('relocate', i, pos_i, j, pos_j)
                    neighbors.append((new_sol, move))
    
    return neighbors

def tabu_search_local_optimization(solution: VRPTWSolution, config: Dict) -> VRPTWSolution:
    """Apply Tabu Search for local optimization."""
    tabu_list = TabuList(config['tabu_tenure'])
    current = solution
    best = solution.copy()
    
    for iteration in range(config['max_iterations']):
        # Generate neighbors
        neighbors = generate_neighbors(current)
        
        if not neighbors:
            break
        
        # Filter tabu neighbors
        valid_neighbors = []
        for neighbor, move in neighbors:
            if not tabu_list.is_tabu(move):
                valid_neighbors.append((neighbor, move))
            elif config['aspiration_enabled'] and neighbor.fitness < best.fitness:
                # Aspiration criterion
                valid_neighbors.append((neighbor, move))
        
        if not valid_neighbors:
            break
        
        # Select best neighbor
        best_neighbor, best_move = min(valid_neighbors, key=lambda x: x[0].fitness)
        
        # Update
        current = best_neighbor
        tabu_list.add(best_move)
        tabu_list.increment()
        
        if current.fitness < best.fitness:
            best = current.copy()
    
    return best

print("Tabu Search local optimization defined!")

Tabu Search local optimization defined!


## Hybrid GA+TS Main Algorithm

In [22]:
def hybrid_ga_ts(instance: Dict, distance_matrix: np.ndarray,
                 time_windows: np.ndarray, service_times: np.ndarray,
                 ga_config: Dict, ts_config: Dict, time_limit: float = None) -> VRPTWSolution:
    """Hybrid Genetic Algorithm with Tabu Search."""
    
    start_time = time.time()
    
    # 1. Initialize Population
    print("Initializing population...")
    population = initialize_population(instance, distance_matrix, time_windows,
                                      service_times, ga_config['population_size'])
    
    best_overall = min(population, key=lambda x: x.fitness)
    print(f"Initial best fitness: {best_overall.fitness:.2f}")
    
    # 2. Evolution Loop
    for generation in range(ga_config['max_generations']):
        if time_limit and (time.time() - start_time) > time_limit:
            print(f"Time limit reached at generation {generation}")
            break
        
        offspring_population = []
        
        # a. Selection, Crossover, Mutation
        while len(offspring_population) < ga_config['population_size'] - ga_config['elitism_count']:
            # Selection
            parent1 = tournament_selection(population, ga_config['tournament_size'])
            parent2 = tournament_selection(population, ga_config['tournament_size'])
            
            # Crossover
            if random.random() < ga_config['crossover_rate']:
                offspring = order_crossover(parent1, parent2)
            else:
                offspring = parent1.copy()
            
            # Mutation
            if random.random() < ga_config['mutation_rate']:
                offspring = swap_mutation(offspring)
            
            offspring_population.append(offspring)
        
        # d. Local Search with Tabu Search (on subset)
        print(f"Generation {generation + 1}: Applying TS to offspring...")
        for i in range(min(10, len(offspring_population))):  # Optimize subset for speed
            offspring_population[i] = tabu_search_local_optimization(offspring_population[i], ts_config)
        
        # f. Population Update with Elitism
        population.sort(key=lambda x: x.fitness)
        elite = population[:ga_config['elitism_count']]
        
        combined = elite + offspring_population
        combined.sort(key=lambda x: x.fitness)
        population = combined[:ga_config['population_size']]
        
        # Track best
        current_best = population[0]
        if current_best.fitness < best_overall.fitness:
            best_overall = current_best.copy()
            print(f"  New best: Cost={best_overall.cost:.2f}, Fitness={best_overall.fitness:.2f}, Feasible={best_overall.is_feasible()}")
    
    print(f"\nFinal best: Cost={best_overall.cost:.2f}, Feasible={best_overall.is_feasible()}")
    return best_overall

print("Hybrid GA+TS algorithm defined!")

Hybrid GA+TS algorithm defined!


## Main Solver Function

In [23]:
def solve_vrptw_ga_ts(instance_path: str) -> Dict:
    """Solve VRPTW using GA+TS."""
    
    print(f"\n{'='*60}")
    print(f"Solving: {os.path.basename(instance_path)}")
    print(f"{'='*60}")
    
    # Read instance
    instance = vrplib.read_instance(instance_path, instance_format='solomon')
    
    # Extract data
    distance_matrix = calculate_distance_matrix(instance)
    demands = np.array(instance['demand'])
    capacity = instance['capacity']
    time_windows = np.array(instance['time_window'])
    service_times = np.array(instance['service_time'])
    
    print(f"Dimension: {len(demands)} nodes")
    print(f"Capacity: {capacity}")
    print(f"Time horizon: [0, {time_windows[0, 1]:.0f}]")
    
    # Solve
    start_time = time.time()
    time_limit = config['general']['time_limit_seconds']
    
    solution = hybrid_ga_ts(instance, distance_matrix, time_windows, service_times,
                           GA_CONFIG, TS_CONFIG, time_limit)
    
    elapsed_time = time.time() - start_time
    
    print(f"\nFinal cost: {solution.cost:.2f}")
    print(f"Final routes: {len(solution.routes)}")
    print(f"Time elapsed: {elapsed_time:.2f} seconds")
    print(f"Feasible: {solution.is_feasible()}")
    
    # Compare with optimal
    optimal_cost = None
    gap_percentage = None
    
    sol_path = instance_path.replace('.txt', '.sol')
    if os.path.exists(sol_path):
        try:
            optimal_solution = vrplib.read_solution(sol_path)
            optimal_cost = optimal_solution['cost']
            gap_percentage = ((solution.cost - optimal_cost) / optimal_cost) * 100
            
            print(f"\nOptimal cost: {optimal_cost:.2f}")
            print(f"Gap: {gap_percentage:.2f}%")
            
            if gap_percentage <= config['quality']['target_gap_percentage']:
                print(f"✓ Within target gap of {config['quality']['target_gap_percentage']}%")
            else:
                print(f"✗ Exceeds target gap of {config['quality']['target_gap_percentage']}%")
        except Exception as e:
            print(f"Could not read optimal solution: {e}")
    
    return {
        'instance_name': os.path.basename(instance_path),
        'solution': solution,
        'cost': solution.cost,
        'optimal_cost': optimal_cost,
        'gap_percentage': gap_percentage,
        'n_routes': len(solution.routes),
        'time_seconds': elapsed_time,
        'feasible': solution.is_feasible()
    }

print("Main solver function defined!")

Main solver function defined!


## Save Solution

In [24]:
def save_solution_ga_ts(result: Dict, output_dir: str = 'solutions_TW_GA'):
    """Save solution to file."""
    os.makedirs(output_dir, exist_ok=True)
    
    instance_name = result['instance_name'].replace('.txt', '')
    output_path = os.path.join(output_dir, f"{instance_name}_GA_TS.sol")
    
    routes = result['solution'].routes
    
    with open(output_path, 'w') as f:
        for idx, route in enumerate(routes, 1):
            route_str = ' '.join(map(str, route))
            f.write(f"Route #{idx}: {route_str}\n")
        f.write(f"Cost {result['cost']:.1f}\n")
    
    print(f"Solution saved to: {output_path}")
    return output_path

print("Save solution function defined!")

Save solution function defined!


## Test Instances

In [25]:
# Select test instances
solomon_dir = 'data/cvrplib/Vrp-Set-Solomon'
if os.path.exists(solomon_dir):
    all_instances = [os.path.join(solomon_dir, f) for f in os.listdir(solomon_dir) if f.endswith('.txt')]
else:
    all_instances = [os.path.join('data', f) for f in os.listdir('data') if f.endswith('.txt') and not f.startswith('empty')]

# Test on same instances as before
selected_instances = [inst for inst in all_instances if any(name in inst for name in ['C101', 'R101', 'RC101'])][:3]

print(f"Selected {len(selected_instances)} instances:")
for inst in selected_instances:
    print(f"  - {os.path.basename(inst)}")

Selected 3 instances:
  - C101.txt
  - R101.txt
  - RC101.txt


## Run Solver

In [None]:
# Solve with GA+TS
results_ga_ts = []

for instance_path in selected_instances:
    try:
        result = solve_vrptw_ga_ts(instance_path)
        save_solution_ga_ts(result)
        results_ga_ts.append(result)
    except Exception as e:
        print(f"Error solving {instance_path}: {e}")
        import traceback
        traceback.print_exc()
        continue

print(f"\n\nCompleted solving {len(results_ga_ts)} instances with GA+TS!")


Solving: C101.txt
Dimension: 101 nodes
Capacity: 200
Time horizon: [0, 1236]
Initializing population...
Initial best fitness: 4619.91
Generation 1: Applying TS to offspring...
  New best: Cost=1849.15, Fitness=1849.15, Feasible=True
Time limit reached at generation 1

Final best: Cost=1849.15, Feasible=True

Final cost: 1849.15
Final routes: 55
Time elapsed: 1817.66 seconds
Feasible: True

Optimal cost: 827.30
Gap: 123.52%
✗ Exceeds target gap of 7.0%
Solution saved to: solutions_TW_GA/C101_GA_TS.sol

Solving: R101.txt
Dimension: 101 nodes
Capacity: 200
Time horizon: [0, 230]
Initializing population...
Initial best fitness: 4214.15
Generation 1: Applying TS to offspring...


## Results Summary

In [None]:
# Create summary
summary_data_ga = []
for result in results_ga_ts:
    summary_data_ga.append({
        'Instance': result['instance_name'],
        'Computed Cost': f"{result['cost']:.2f}",
        'Optimal Cost': f"{result['optimal_cost']:.2f}" if result['optimal_cost'] else 'N/A',
        'Gap (%)': f"{result['gap_percentage']:.2f}" if result['gap_percentage'] is not None else 'N/A',
        'Routes': result['n_routes'],
        'Time (s)': f"{result['time_seconds']:.2f}",
        'Feasible': result['feasible']
    })

df_ga_summary = pd.DataFrame(summary_data_ga)
print("\nSummary of GA+TS Results:")
print(df_ga_summary.to_string(index=False))

# Statistics
gaps_ga = [r['gap_percentage'] for r in results_ga_ts if r['gap_percentage'] is not None]
if gaps_ga:
    print(f"\n\nGap Statistics:")
    print(f"  Average gap: {np.mean(gaps_ga):.2f}%")
    print(f"  Median gap: {np.median(gaps_ga):.2f}%")
    print(f"  Min gap: {np.min(gaps_ga):.2f}%")
    print(f"  Max gap: {np.max(gaps_ga):.2f}%")
    print(f"  Instances within {config['quality']['target_gap_percentage']}%: {sum(g <= config['quality']['target_gap_percentage'] for g in gaps_ga)}/{len(gaps_ga)}")

In [None]:
# Save summary
os.makedirs('solutions_TW_GA', exist_ok=True)
summary_path = 'solutions_TW_GA/summary_results_GA_TS.csv'
df_ga_summary.to_csv(summary_path, index=False)
print(f"Summary saved to: {summary_path}")