In [5]:
import csv
import math
import random

### Problem Parameters

In [6]:
CAPACITY = 25
POP_SIZE = 200
NUM_GENERATIONS = 300
TOURNAMENT_SIZE = 5
MUTATION_RATE = 0.2
SEED = 42

random.seed(SEED)

### Data Loading

In [7]:
towns = []
with open('./data/data.csv', 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        if row['point_id'] == 'DEPOT':
            depot = (float(row['x']), float(row['y']), int(row['demand']))
        else:
            towns.append((row['point_id'], float(row['x']), float(row['y']), int(row['demand'])))

# 좌표 정보 추출
points = [('DEPOT', depot[0], depot[1], depot[2])] + towns

N = len(points)  # 디포(depot)를 포함한 전체 지점의 개수
depot_index = 0

### Distance Computation

In [8]:
# 거리 행렬(dist_matrix) 생성
dist_matrix = [[0]*N for _ in range(N)]
for i in range(N):
    for j in range(N):
        if i != j:
            dx = points[i][1] - points[j][1]
            dy = points[i][2] - points[j][2]
            dist_matrix[i][j] = math.sqrt(dx*dx + dy*dy)

### Helper Functions

In [9]:
def is_feasible(route, demands, capacity=CAPACITY):
    # 모든 지점이 정확히 한 번씩 나타나는지 확인
    # 일반적으로는 경로 생성 시 이를 보장하지만, 혹시 모르니 검사
    # route는 디포를 제외한 지점 인덱스들의 리스트
    # 용량 제약(capacity)에 따라 여러 서브투어(sub-tour)로 나누어 확인
    load = 0
    for t in route:
        d = demands[t]
        if d > capacity:
            return False  # 단일 지점 수요가 용량을 초과하는 경우
        load += d
        if load > capacity:
            # 디포로 돌아가고(하차), 새로운 용량으로 다시 시작
            load = d
            if load > capacity:
                return False
    return True

def route_to_subtours(route, demands, capacity=CAPACITY):
    # 전체 경로(순열)를 용량 제한에 맞춰 서브투어 목록으로 변환
    # 예시: route = [1,2,3,4,...]
    # 용량이 초과되면, 디포로 돌아가는 지점을 삽입하여 서브투어로 나눔
    subtours = []
    current_subtour = []
    current_load = 0
    for t in route:
        d = demands[t]
        if current_load + d > capacity:
            # 현재 서브투어를 마치고 디포로 복귀 후, 새 서브투어 시작
            subtours.append(current_subtour)
            current_subtour = [t]
            current_load = d
        else:
            current_subtour.append(t)
            current_load += d
    if current_subtour:
        subtours.append(current_subtour)
    return subtours

def compute_total_distance(route, dist_matrix, demands):
    # 총 거리를 계산(디포로 돌아오는 거리도 포함)
    # route는 지점 인덱스들의 순열 [1..N-1]
    # 용량제한을 고려하여 여러 서브투어로 나눈 뒤 거리 계산
    subtours = route_to_subtours(route, demands)
    total_dist = 0.0
    for subtour in subtours:
        # 디포에서 시작
        current = 0
        for town in subtour:
            total_dist += dist_matrix[current][town]
            current = town
        # 서브투어 종료 후 디포로 복귀
        total_dist += dist_matrix[current][0]
    return total_dist

def initial_population(pop_size, n_towns):
    # 초기 개체군 생성: 무작위로 섞은 순열(permutation)을 만듦
    # 각 염색체(Chromosome)는 [1..N-1] 범위의 지점 인덱스(디포 제외)
    chromosomes = []
    town_indices = list(range(1, n_towns))
    demands = {i: points[i][3] for i in range(n_towns)}

    # 유효한 염색체(용량 제약을 만족하는 순열)을 pop_size만큼 생성
    while len(chromosomes) < pop_size:
        candidate = town_indices[:]
        random.shuffle(candidate)
        if is_feasible(candidate, demands):
            chromosomes.append(candidate)
    return chromosomes

def tournament_selection(pop, fitness, k=TOURNAMENT_SIZE):
    # 토너먼트 선택
    selected = random.sample(list(zip(pop, fitness)), k)
    selected.sort(key=lambda x: x[1])
    return selected[0][0]

def crossover(parent1, parent2):
    # 순서 교차(Ordered Crossover, OX)
    size = len(parent1)
    a, b = sorted(random.sample(range(size), 2))
    child = [None]*size

    # 부모1의 a~b 구간을 자식에 복사
    for i in range(a, b+1):
        child[i] = parent1[i]

    # 나머지 위치를 부모2의 순서를 기준으로 채움
    p2_pointer = 0
    for i in range(size):
        if child[i] is None:
            while parent2[p2_pointer] in child:
                p2_pointer += 1
            child[i] = parent2[p2_pointer]
    return child

def mutate(route):
    # 단순 스왑(swap) 변이
    if random.random() < MUTATION_RATE:
        i, j = random.sample(range(len(route)), 2)
        route[i], route[j] = route[j], route[i]

# 메인 demands 딕셔너리
demands = {i: points[i][3] for i in range(N)}

In [10]:
def three_opt(route, dist_matrix, demands):
    """
    3-opt 지역 탐색을 적용하여 경로를 최적화.
    경로를 세 구간으로 분할한 뒤, 재연결하는 여러 방식을 시도하여
    더 짧은 경로가 있는지 탐색.
    """
    def calculate_cost(route):
        return compute_total_distance(route, dist_matrix, demands)
    
    def swap_segments(route, i, j, k):
        # 3개 구간으로 분할: [0:i], [i:j], [j:k], [k:end]
        A, B, C, D = route[:i], route[i:j], route[j:k], route[k:]
        # 3-opt로 가능한 모든 재연결 방식
        return [
            A + B + C + D,        # 원본 그대로
            A + B[::-1] + C + D,  # B 구간 뒤집기
            A + B + C[::-1] + D,  # C 구간 뒤집기
            A + C + B + D,        # B와 C 교체
            A + C[::-1] + B + D,  # B와 C 교체 후 C 뒤집기
            A + B[::-1] + C[::-1] + D,  # B와 C 모두 뒤집기
            A + C + B[::-1] + D,  # B와 C 교체 후 B 뒤집기
            A + C[::-1] + B[::-1] + D   # B와 C 교체 후 모두 뒤집기
        ]

    best_route = route[:]
    best_cost = calculate_cost(best_route)
    improved = True

    while improved:
        improved = False
        for i in range(1, len(route) - 2):
            for j in range(i + 1, len(route) - 1):
                for k in range(j + 1, len(route)):
                    # 가능한 모든 3-opt 후보 생성
                    new_routes = swap_segments(best_route, i, j, k)
                    for new_route in new_routes:
                        new_cost = calculate_cost(new_route)
                        if new_cost < best_cost and is_feasible(new_route, demands):
                            best_route = new_route
                            best_cost = new_cost
                            improved = True
                            break  # 첫 번째 개선점 발견 시 즉시 종료
                if improved:
                    break
            if improved:
                break
    return best_route

### multi-thread

In [15]:
from joblib import Parallel, delayed
from tqdm import tqdm

def create_offspring(population, fitness_values, dist_matrix, demands):
    # 선택
    p1 = tournament_selection(population, fitness_values, TOURNAMENT_SIZE)
    p2 = tournament_selection(population, fitness_values, TOURNAMENT_SIZE)
    # 교차
    child = crossover(p1, p2)
    # 변이
    mutate(child)
    # 지역 탐색(3-opt)
    child = three_opt(child, dist_matrix, demands)
    # 유효성(용량 제약) 검사
    if is_feasible(child, demands):
        return child
    return None  # 유효하지 않으면 None 반환

In [None]:
# =====================
# Genetic Algorithm
# =====================
population = initial_population(POP_SIZE, N)
fitness_values = [compute_total_distance(ind, dist_matrix, demands) for ind in population]

best_solution = population[0]
best_cost = fitness_values[0]
best_iter = 0

pbar = tqdm(range(NUM_GENERATIONS))
for g in pbar:
    new_pop = []
    # 엘리트 보존(Elitism) - 현재 세대에서 최적 해를 그대로 유지
    elite_idx = fitness_values.index(min(fitness_values))
    new_pop.append(population[elite_idx][:])

    # 병렬적으로 새로운 자손 생성
    remaining_size = POP_SIZE - len(new_pop)
    offspring = Parallel(n_jobs=-1)(
        delayed(create_offspring)(population, fitness_values, dist_matrix, demands) 
        for _ in range(remaining_size)
    )

    # 유효한 자손만 추가
    new_pop.extend(child for child in offspring if child is not None)

    population = new_pop
    fitness_values = [compute_total_distance(ind, dist_matrix, demands) for ind in population]

    current_best = min(fitness_values)
    if current_best < best_cost:
        best_cost = current_best
        best_solution = population[fitness_values.index(current_best)]
        best_iter = g

    pbar.set_description(f"Generation {g+1}: Best Iter = {best_iter} Best Distance = {best_cost:.4f}")

### Results

In [None]:
# in AMD Ryzen Threadripper PRO 7995WX (X 2)
"""
POP_SIZE = 200
NUM_GENERATIONS = 300
MUTATION_RATE = 0.2
"""
# 2172.2818512535173, Best Iter = 224
# answer = [
# [47, 6, 8, 72, 66, 71, 53, 32], 
# [34, 48, 18, 17, 61, 42, 9, 73], 
# [35, 46, 2, 4, 3, 74, 5, 1], 
# [15, 14, 21, 24, 16], 
# [13, 41, 31, 12], 
# [44, 64, 43, 23, 20, 22], 
# [40, 25, 75, 52, 59, 70, 51, 7], 
# [33, 60, 67, 55, 69, 57, 19], 
# [58, 63, 28, 27, 26, 49, 45], 
# [68, 37, 62, 54, 30], 
# [29, 36, 65, 39, 38, 50, 56], 
# [11, 10]]

In [None]:
answer = route_to_subtours(best_solution, demands)

In [None]:
output = []

for i in range(len(answer)):
    output.append("DEPOT")
    for j in range(len(answer[i])):
        output.append(f"TOWN_{answer[i][j]:02d}")

output.append("DEPOT")

In [None]:
import pandas as pd

ss = pd.read_csv("sample_submission.csv")

ss["point_id"] = output

ss.to_csv("genetic_7995WXx2.csv", index=False)