In [4]:
## import

# 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive')

# 필요한 라이브러리 불러오기
import pandas as pd
import numpy as np
import random
import os
from scipy.spatial.distance import euclidean

# 경고 무시
import warnings
warnings.filterwarnings('ignore')

# Fixing the random seed for reproducibility
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)

# Set a fixed seed
fixed_seed = 42
set_seed(fixed_seed)

## 데이터 및 환경준비

# Load the user's data
data_df = pd.read_csv('/content/drive/MyDrive/산타/data/data.csv')

# Load coordinates and demands from the data
towns = data_df[data_df['point_id'] != 'DEPOT']
depot = data_df[data_df['point_id'] == 'DEPOT']
depot_coords = (depot['x'].values[0], depot['y'].values[0])

# Maximum capacity of the sleigh
max_capacity = 25

# Prepare town data
town_coords = list(zip(towns['x'], towns['y']))
town_demands = towns['demand'].values
town_names = towns['point_id'].values

## 모델링

def calculate_cvrp_cost(routes, coords, demands, depot_coords, max_capacity):
    """
    routes: [[town_idx1, town_idx2, ...], [다른 차량 경로], ...]
    각 vehicle route는 depot -> towns -> depot 형태로 이동한다고 가정.
    """
    total_distance = 0
    for route in routes:
        current_pos = depot_coords
        for t_idx in route:
            total_distance += euclidean(current_pos, coords[t_idx])
            current_pos = coords[t_idx]
        total_distance += euclidean(current_pos, depot_coords)
    return total_distance

def decode_solution(route, demands, max_capacity):
    """
    주어진 순열을 용량 제약을 고려하여 여러 개의 서브 루트로 분할합니다.
    route: 도시 인덱스 리스트
    """
    routes = []
    current_route = []
    current_capacity = max_capacity
    for town_idx in route:
        d = demands[town_idx]
        if d <= current_capacity:
            current_route.append(town_idx)
            current_capacity -= d
        else:
            routes.append(current_route)
            current_route = [town_idx]
            current_capacity = max_capacity - d
    if current_route:
        routes.append(current_route)
    return routes

def encode_solution(routes):
    """
    여러 차량 경로 리스트를 하나의 순열로 합칩니다.
    """
    return [t for r in routes for t in r]

def route_cost(route, coords, demands, depot_coords, max_capacity):
    dist = 0
    pos = depot_coords
    for t_idx in route:
        dist += euclidean(pos, coords[t_idx])
        pos = coords[t_idx]
    dist += euclidean(pos, depot_coords)
    return dist

def two_opt_route(route, coords, demands, depot_coords, max_capacity, max_iterations=50):
    """
    단일 route(한 차량 경로)에 2-opt 적용.
    route: 고객 인덱스 리스트
    """
    if len(route) < 4:
        return route[:]  # 너무 짧으면 2-opt 불필요

    best = route[:]
    best_distance = route_cost(best, coords, demands, depot_coords, max_capacity)
    improved = True
    iteration = 0

    while improved and iteration < max_iterations:
        improved = False
        for i in range(len(best)-1):
            for j in range(i+2, len(best)):
                if j - i == 1:
                    continue
                new_route = best[:i+1] + best[j:i:-1] + best[j+1:]
                # 수요합은 변하지 않으므로 용량 초과는 걱정할 필요가 없음 (단일 route 내에서 고객 순서만 바꿈)
                new_distance = route_cost(new_route, coords, demands, depot_coords, max_capacity)
                if new_distance < best_distance:
                    best = new_route
                    best_distance = new_distance
                    improved = True
                    break
            if improved:
                break
        iteration += 1

    return best

def local_search_cvrp(routes, coords, demands, depot_coords, max_capacity):
    """
    CVRP 경로 집합에 대해 로컬 서치.
    여기서는 각 route에 2-opt만 적용.
    """
    improved_routes = []
    for r in routes:
        improved_routes.append(two_opt_route(r, coords, demands, depot_coords, max_capacity))
    return improved_routes

# GA 파트
def initialize_population(size, num_towns):
    return [random.sample(range(num_towns), num_towns) for _ in range(size)]

def evaluate_population(population, coords, demands, depot_coords, max_capacity):
    # population의 각 해(순열)에 대해 decode -> cost 계산
    fits = []
    for ind in population:
        rts = decode_solution(ind, demands, max_capacity)
        cost = calculate_cvrp_cost(rts, coords, demands, depot_coords, max_capacity)
        fits.append(cost)
    return fits

def select_parents(population, fitness, k=5):
    selected = []
    for _ in range(2):  # 2명의 부모 선택
        tournament = random.sample(list(zip(population, fitness)), k)
        tournament.sort(key=lambda x: x[1])
        selected.append(tournament[0][0])
    return selected

def crossover(parent1, parent2):
    size = len(parent1)
    child = [-1] * size
    start, end = sorted(random.sample(range(size), 2))
    child[start:end] = parent1[start:end]

    pointer = 0
    for gene in parent2:
        if gene not in child:
            while child[pointer] != -1:
                pointer += 1
            child[pointer] = gene
    return child

def mutate(individual, mutation_rate):
    if random.random() < mutation_rate:
        idx1, idx2 = random.sample(range(len(individual)), 2)
        individual[idx1], individual[idx2] = individual[idx2], individual[idx1]

def genetic_algorithm_with_local_search(coords, demands, depot_coords, max_capacity,
                                        population_size, generations, mutation_rate,
                                        stop_threshold=100):
    num_towns = len(coords)
    population = initialize_population(population_size, num_towns)
    best_solution = None
    best_fitness = float('inf')

    no_improvement_count = 0  # 개선 없는 세대 수를 셀 변수

    for generation in range(generations):
        fitness = evaluate_population(population, coords, demands, depot_coords, max_capacity)

        new_population = []
        # 엘리트 보존
        elite_idx = np.argmin(fitness)
        elite = population[elite_idx]
        new_population.append(elite)

        for _ in range((population_size // 2) - 1):
            parent1, parent2 = select_parents(population, fitness)

            child1 = crossover(parent1, parent2)
            child2 = crossover(parent2, parent1)

            mutate(child1, mutation_rate)
            mutate(child2, mutation_rate)

            # 로컬서치 적용: decode -> local_search -> encode
            child1_routes = decode_solution(child1, demands, max_capacity)
            child1_routes = local_search_cvrp(child1_routes, coords, demands, depot_coords, max_capacity)
            child1 = encode_solution(child1_routes)

            child2_routes = decode_solution(child2, demands, max_capacity)
            child2_routes = local_search_cvrp(child2_routes, coords, demands, depot_coords, max_capacity)
            child2 = encode_solution(child2_routes)

            new_population.extend([child1, child2])

        if len(new_population) < population_size:
            new_population.append(random.sample(range(num_towns), num_towns))

        population = new_population
        fitness = evaluate_population(population, coords, demands, depot_coords, max_capacity)
        current_best_idx = np.argmin(fitness)
        current_best = population[current_best_idx]
        current_best_fitness = fitness[current_best_idx]

        if current_best_fitness < best_fitness:
            best_solution = current_best[:]
            best_fitness = current_best_fitness
            no_improvement_count = 0  # 개선 발생 시 카운터 리셋
        else:
            no_improvement_count += 1  # 개선이 없으니 증가

        if generation % 10 == 0:
            print(f"Generation {generation}: Best Distance = {best_fitness}")

        # 개선이 일정 세대 이상 없으면 중단
        if no_improvement_count >= stop_threshold:
            print(f"No improvement for {stop_threshold} generations. Early stopping at generation {generation}.")
            break

    return best_solution, best_fitness

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# 파라미터 설정 및 실행
population_size = 10
generations = 100
mutation_rate = 0.1
stop_threshold = 20  # 50세대 동안 진전이 없으면 멈춤

best_route, best_distance = genetic_algorithm_with_local_search(
    coords=town_coords,
    demands=town_demands,
    depot_coords=depot_coords,
    max_capacity=max_capacity,
    population_size=population_size,
    generations=generations,
    mutation_rate=mutation_rate,
    stop_threshold=stop_threshold
)

final_routes = decode_solution(best_route, town_demands, max_capacity)
best_distance = calculate_cvrp_cost(final_routes, town_coords, town_demands, depot_coords, max_capacity)

print("Best Distance:", best_distance)
print("Routes:", final_routes)

Generation 0: Best Distance = 3672.329157080389
Generation 10: Best Distance = 3664.9217969064466
Generation 20: Best Distance = 3557.7908250567943
Generation 30: Best Distance = 3551.737658492115
Generation 40: Best Distance = 3550.9289391761367
Generation 50: Best Distance = 3550.9289391761367
Generation 60: Best Distance = 3548.7294757875516
Generation 70: Best Distance = 3548.7294757875516
No improvement for 20 generations. Early stopping at generation 71.


In [6]:
best_distance

3548.7294757875516