In [6]:
import requests
import os
import csv
from ortools.constraint_solver import routing_enums_pb2
from ortools.constraint_solver import pywrapcp

In [2]:

import random
import pandas as pd
from secrets_manager import get_secret_key

# 카카오 REST API 키 (유준형)
API_KEY = get_secret_key()

# 거리 행렬 파일 이름
DISTANCE_MATRIX_FILE = 'distance_matrix.csv'

# 주소 리스트: 엑셀 파일 읽기
file_path = 'guessed_trash.xlsx'
locations = pd.read_excel(f"{file_path}", engine='openpyxl').to_dict("records")
# locations = [
#     {'address': '0', 'Latitude': 36.420924, 'Longitude': 127.423353},
#     {'address': '1', 'Latitude': 36.315569, 'Longitude': 127.346243},
#     {'address': '2', 'Latitude': 36.310511, 'Longitude': 127.347090},
#     {'address': '3', 'Latitude': 36.313307, 'Longitude': 127.350245},
# ]

# 노드 수에 맞는 무작위 demands 리스트 생성
demands = [random.randrange(1,11) for i in range(len(locations))]

# 거리 행렬 생성 함수
def create_distance_matrix(locations):
    """카카오 지도 API를 사용하여 거리 행렬을 생성합니다."""
    distance_matrix = []

    headers = {
        'Authorization': f'KakaoAK {API_KEY}',
    }

    for i, origin in enumerate(locations):
        distance_row = []
        origin_coords = f"{origin['Longitude']},{origin['Latitude']}"
        for j, destination in enumerate(locations):
            if i == j:
                # 동일한 노드의 거리는 0으로 설정
                distance_row.append(0)
            else:
                destination_coords = f"{destination['Longitude']},{destination['Latitude']}"
                url = f"https://apis-navi.kakaomobility.com/v1/directions?origin={origin_coords}&destination={destination_coords}&priority=RECOMMEND"
                response = requests.get(url, headers=headers)
                result = response.json()

                # 응답에서 거리 값 추출
                if response.status_code == 200:
                    try:
                        distance = result['routes'][0]['summary']['distance']  # 거리(meter)
                        distance_row.append(distance)
                    except (KeyError, IndexError):
                        # 경로를 찾을 수 없는 경우 큰 값으로 설정
                        distance_row.append(float('inf'))
                else:
                    print(f"API 요청 오류: {response.status_code}")
                    return None
        distance_matrix.append(distance_row)

    return distance_matrix

# 거리 행렬 저장 함수
def save_distance_matrix(distance_matrix, filename):
    """거리 행렬을 CSV 파일로 저장합니다."""
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        for row in distance_matrix:
            writer.writerow(row)

# 거리 행렬 로드 함수
def load_distance_matrix(filename):
    """CSV 파일에서 거리 행렬을 로드합니다."""
    distance_matrix = []
    with open(filename, 'r') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            # 문자열로 읽혀진 값을 숫자로 변환
            distance_row = []
            for value in row:
                if value == 'inf':
                    distance_row.append(float('inf'))
                else:
                    distance_row.append(float(value))
            distance_matrix.append(distance_row)
    return distance_matrix

# 데이터 모델 생성 함수 수정
import os

def create_data_model():
    """문제에 필요한 데이터를 저장"""
    data = {}
    
    # 거리 행렬 파일이 존재하면 로드하고, 크기를 확인
    if os.path.exists(DISTANCE_MATRIX_FILE):
        print("저장된 거리 행렬을 로드합니다...")
        saved_distance_matrix = load_distance_matrix(DISTANCE_MATRIX_FILE)
        if len(saved_distance_matrix) == len(locations) and all(len(row) == len(locations) for row in saved_distance_matrix):
            print("저장된 거리 행렬을 사용합니다...")
            data["distance_matrix"] = saved_distance_matrix
        else:
            print("거리 행렬의 크기가 현재의 위치 목록과 일치하지 않습니다. 거리 행렬을 다시 생성합니다...")
            data["distance_matrix"] = create_distance_matrix(locations)
            if data["distance_matrix"] is None:
                print("거리 행렬 생성에 실패했습니다.")
                exit(1)
            else:
                # 거리 행렬을 파일로 저장
                save_distance_matrix(data["distance_matrix"], DISTANCE_MATRIX_FILE)
                print(f"거리 행렬을 '{DISTANCE_MATRIX_FILE}' 파일로 저장했습니다.")
    else:
        print("거리 행렬을 생성합니다...")
        data["distance_matrix"] = create_distance_matrix(locations)
        if data["distance_matrix"] is None:
            print("거리 행렬 생성에 실패했습니다.")
            exit(1)
        else:
            # 거리 행렬을 파일로 저장
            save_distance_matrix(data["distance_matrix"], DISTANCE_MATRIX_FILE)
            print(f"거리 행렬을 '{DISTANCE_MATRIX_FILE}' 파일로 저장했습니다.")
    
    # 각 노드(고객)의 수요를 demands 리스트로 설정
    data["demands"] = demands
    
    # 각 차량의 용량
    data["vehicle_capacities"] = [500]
    
    # 차량 수
    data["num_vehicles"] = 1
    
    # 출발점 (디포)
    data["depot"] = 0
    return data


# 솔루션을 출력하는 함수 (변경 없음)
def print_solution(data, manager, routing, solution):
    """해결된 경로를 콘솔에 출력"""
    print(f"Objective: {solution.ObjectiveValue()}")
    total_distance = 0
    total_load = 0

    # 각 차량에 대해 경로 출력
    for vehicle_id in range(data["num_vehicles"]):
        index = routing.Start(vehicle_id)
        plan_output = f"Vehicle {vehicle_id}의 경로:\n"
        route_distance = 0
        route_load = 0

        # 경로의 각 노드(고객)에 대해 반복
        while not routing.IsEnd(index):
            node_index = manager.IndexToNode(index)
            route_load += data["demands"][node_index]
            plan_output += f" {node_index} Load({route_load}) -> "
            previous_index = index
            index = solution.Value(routing.NextVar(index))
            if routing.GetArcCostForVehicle(previous_index, index, vehicle_id) is not None:
                route_distance += routing.GetArcCostForVehicle(previous_index, index, vehicle_id)

        plan_output += f" {manager.IndexToNode(index)} Load({route_load})\n"
        plan_output += f"경로 거리: {route_distance}m\n"
        plan_output += f"경로 적재량: {route_load}\n"
        print(plan_output)
        total_distance += route_distance
        total_load += route_load

    # 전체 경로 거리와 적재량 출력
    print(f"모든 경로의 총 거리: {total_distance}m")
    print(f"모든 경로의 총 적재량: {total_load}")

# 메인 함수
def main():
    """CVRP 문제 해결"""
    # 데이터 모델 초기화
    data = create_data_model()

    # 거리 행렬이 제대로 생성되었는지 확인
    if data["distance_matrix"] is None:
        print("거리 행렬 생성에 실패했습니다.")
        return

    # 라우팅 인덱스 매니저 생성
    manager = pywrapcp.RoutingIndexManager(len(data["distance_matrix"]), data["num_vehicles"], data["depot"])

    # 라우팅 모델 생성
    routing = pywrapcp.RoutingModel(manager)

    # 거리 계산 콜백 함수 정의 및 등록
    def distance_callback(from_index, to_index):
        """두 노드 간의 거리를 반환"""
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        return int(data["distance_matrix"][from_node][to_node])

    transit_callback_index = routing.RegisterTransitCallback(distance_callback)

    # 각 호(arc)의 비용 설정
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

    # 용량 제약 추가
    def demand_callback(from_index):
        """각 노드의 수요를 반환"""
        from_node = manager.IndexToNode(from_index)
        return data["demands"][from_node]

    demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
    routing.AddDimensionWithVehicleCapacity(
        demand_callback_index,
        0,  # 용량 여유 없음
        data["vehicle_capacities"],  # 차량 최대 용량
        True,  # 누적값을 0에서 시작
        "Capacity",
    )

    # 첫 솔루션 탐색 전략 설정
    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC
    search_parameters.local_search_metaheuristic = routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH
    search_parameters.time_limit.FromSeconds(30)  # 시간을 늘려 더 나은 솔루션 탐색

    # 문제 해결
    solution = routing.SolveWithParameters(search_parameters)

    # 솔루션이 있으면 출력
    if solution:
        print_solution(data, manager, routing, solution)
    else:
        print('솔루션 없음')

# 스크립트가 실행되면 main 함수를 호출
if __name__ == "__main__":
    main()


FileNotFoundError: [Errno 2] No such file or directory: 'guessed_trash.xlsx'

In [10]:
import requests
import folium

# 카카오 API 키 설정 (발급받은 API 키를 입력하세요)
API_KEY = get_secret_key()

# 시점과 종점, 경유지의 위도와 경도 설정 : 오성알씨
start = (36.420924, 127.423353)
end = (36.420924, 127.423353)

# 엑셀 파일 읽기
file_path = 'guessed_trash.xlsx'
address = pd.read_excel(f"../docs/LatLon/{file_path}", engine='openpyxl')
waypoints = [(lat, lon) for lat, lon in zip(address['latitude'], address['longitude'])]
print(waypoints)
# waypoints = [
#     (36.315569, 127.346243),     # 경유지 1
#     (36.310511, 127.347090),     # 경유지 2
#     (36.312989, 127.350407),     # 경유지 3
# ]

# 카카오 경로 탐색 API 요청 URL
url = "https://apis-navi.kakaomobility.com/v1/directions"

# 경로 탐색 API 요청
params = {
    'origin': f"{start[1]},{start[0]}",  # 시점 (경도, 위도)
    'destination': f"{end[1]},{end[0]}",  # 종점 (경도, 위도)
    'waypoints': '|'.join([f"{wp[1]},{wp[0]}" for wp in waypoints]),  # 경유지 (경도, 위도)
    'priority': 'RECOMMEND'  # 최적 경로
}
headers = {
    "Authorization": f"KakaoAK {API_KEY}"
}

response = requests.get(url, headers=headers, params=params)
data = response.json()

# folium 지도 초기화 (시작점을 중심으로)
m = folium.Map(location=start, zoom_start=14)

# 시점, 경유지, 종점에 마커 추가
folium.Marker(location=start, popup="Start", icon=folium.Icon(color="green")).add_to(m)
folium.Marker(location=end, popup="End", icon=folium.Icon(color="red")).add_to(m)
for idx, waypoint in enumerate(waypoints, start=1):
    folium.Marker(location=waypoint, popup=f"Waypoint {idx}", icon=folium.Icon(color="blue")).add_to(m)

# 경로 그리기
if data.get('routes'):
    for route in data['routes']:
        for section in route['sections']:
            for road in section['roads']:
                # 경로 좌표 가져오기
                polyline = road['vertexes']
                # 좌표 리스트를 folium에 추가
                coordinates = [(polyline[i+1], polyline[i]) for i in range(0, len(polyline), 2)]
                folium.PolyLine(coordinates, color="blue", weight=5, opacity=0.7).add_to(m)

# 지도를 HTML 파일로 저장
m.save('vehicle_route_kakao.html')

# 결과 출력
print("지도가 'vehicle_route_kakao.html' 파일로 저장되었습니다. 파일을 열어 경로를 확인하세요.")


KeyError: 'latitude'