In [None]:
# 0. Colab 파일 업로드 (한 번만 필요)
from google.colab import files
print("필요 파일 업로드: metro JSON(data-metro-station-1.0.0.json,data-metro-line-1.0.0.json), bus Excel(bus_stops_seoul.xlsx,bus_route.xlsx) 파일을 올려주세요.")
uploaded = files.upload()

필요 파일 업로드: metro JSON(data-metro-station-1.0.0.json,data-metro-line-1.0.0.json), bus Excel(bus_stops_seoul.xlsx,bus_route.xlsx) 파일을 올려주세요.


Saving bus_route.xlsx to bus_route.xlsx
Saving bus_stops_seoul.xlsx to bus_stops_seoul.xlsx
Saving data-metro-line-1.0.0.json to data-metro-line-1.0.0.json
Saving data-metro-station-1.0.0.json to data-metro-station-1.0.0.json


In [None]:
!pip install folium pandas networkx osmnx scipy geopy openpyxl cch

Collecting folium
  Downloading folium-0.19.6-py2.py3-none-any.whl.metadata (4.1 kB)
Collecting osmnx
  Downloading osmnx-2.0.3-py3-none-any.whl.metadata (4.9 kB)
Collecting geopy
  Downloading geopy-2.4.1-py3-none-any.whl.metadata (6.8 kB)
Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting cch
  Downloading cch-0.1.19-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting branca>=0.6.0 (from folium)
  Downloading branca-0.8.1-py3-none-any.whl.metadata (1.5 kB)
Collecting xyzservices (from folium)
  Downloading xyzservices-2025.4.0-py3-none-any.whl.metadata (4.3 kB)
Collecting geopandas>=1.0 (from osmnx)
  Downloading geopandas-1.1.0-py3-none-any.whl.metadata (2.3 kB)
Collecting shapely>=2.0 (from osmnx)
  Downloading shapely-2.1.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting geographiclib<3,>=1.52 (from geopy)
  Downloading geographiclib-2.0-py3-none-any.whl.metadata (1.4 kB)
Collecting et-xmlfile (f

In [None]:
# 1. 라이브러리 설치 및 import
import json
import os
import pandas as pd
import networkx as nx
import osmnx as ox
import numpy as np
from osmnx.distance import nearest_nodes
from scipy.spatial import cKDTree
from geopy.geocoders import Nominatim
from itertools import combinations, islice
from math import radians, sin, cos, sqrt, asin
from multiprocessing import Pool
import cch
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.pool import ThreadPool
from functools import lru_cache
from collections import defaultdict

In [None]:
# 전역 변수 (버스 병렬 계산 및 좌표 캐시)
STOP_TO_ROAD = {}
STOP_COORD   = {}
ROAD_COORD   = {}
AVG_SPEED_KMH_GLOBAL = 18.0

In [None]:
# 2 유틸리티 함수 정의
def haversine(lat1, lon1, lat2, lon2):
    """두 지점 간 대원거리 계산 (미터)"""
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = sin(dlat/2)**2 + cos(lat1)*cos(lat2)*sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    return 6371000 * c

# -------------------- 1. 빠른 도로 최단거리 함수 (lru_cache) --------------------
@lru_cache(maxsize=200_000)
def fast_shortest_path(u_rg, v_rg):
    try:
        return nx.shortest_path_length(road_graph, u_rg, v_rg, weight="length")
    except Exception:
        lat1, lon1 = ROAD_COORD[u_rg]
        lat2, lon2 = ROAD_COORD[v_rg]
        return haversine(lat1, lon1, lat2, lon2)

In [None]:
# ----------------------------------------------
# 1. 경계(버스정류장,지하철,추가 POI 모두 포함) 계산
# ----------------------------------------------
def get_combined_bounding_box(bus_excel, metro_json, margin=0.02):
    # 버스 좌표 읽기
    bus_df = pd.read_excel(bus_excel)
    lats = list(bus_df['Y좌표'].astype(float))
    lons = list(bus_df['X좌표'].astype(float))

    # 지하철 JSON 읽기
    with open(metro_json, encoding='utf-8') as f:
        metro_data = json.load(f)
    # lat/lon 컬럼명 확인: 'lat'/'lng' 혹은 'latitude'/'longitude'
    for s in metro_data:
        # 'lat', 'lng' 모두 있는 경우만 추가
        if 'lat' in s and 'lng' in s:
            lats.append(float(s['lat']))
            lons.append(float(s['lng']))
        # (필요시 추가: 'latitude', 'longitude' 등)

    lat_min, lat_max = min(lats) - margin, max(lats) + margin
    lon_min, lon_max = min(lons) - margin, max(lons) + margin
    return (lat_min, lat_max, lon_min, lon_max)

# 사용 예시
bbox = get_combined_bounding_box('bus_stops_seoul.xlsx', 'data-metro-station-1.0.0.json', margin=0.02)
print("경계 BBOX:", bbox)



경계 BBOX: (37.4105199435, 37.710177, 126.43723, 127.201785)


In [None]:
def get_road_graphs(bbox):
    print(f"BBOX: {bbox}")
    # 1. 차량용(버스 등)
    print("[1/2] OSM drive 네트워크 다운로드...")
    road_graph_drive = ox.graph_from_bbox(
        bbox,
        network_type="drive",
        simplify=True
    )
    print("  (drive) 노드:", road_graph_drive.number_of_nodes(),
          "엣지:", road_graph_drive.number_of_edges())

    # 2. 보행자용(환승 등)
    print("[2/2] OSM walk 네트워크 다운로드...")
    road_graph_walk = ox.graph_from_bbox(
        bbox,
        network_type="walk",
        simplify=True
    )
    print("  (walk) 노드:", road_graph_walk.number_of_nodes(),
          "엣지:", road_graph_walk.number_of_edges())

    return road_graph_drive, road_graph_walk


In [None]:
# ---- 좌표별 OSM 노드 매핑 함수 (drive + walk 동시 반환) ----
def map_stops_to_osm_nodes(stops_list, road_graph_drive, road_graph_walk):
    """
    stops_list: [(id, lat, lon), ...]
    반환: {id: {'drive': osm_node, 'walk': osm_node, 'lat':..., 'lon':...}, ...}
    """
    mapping = {}
    for stop_id, lat, lon in stops_list:
        try:
            node_drive = nearest_nodes(road_graph_drive, lon, lat)
        except Exception:
            node_drive = None
        try:
            node_walk = nearest_nodes(road_graph_walk, lon, lat)
        except Exception:
            node_walk = None
        mapping[stop_id] = {
            'drive': node_drive,
            'walk': node_walk,
            'lat': lat,
            'lon': lon
        }
    return mapping

In [None]:
# ----------------- 4. 빠른 도로 최단거리 함수 -----------------
def fast_shortest_path_maker(G, coord_dict):
    @lru_cache(maxsize=200_000)
    def f(u, v):
        try:
            return nx.shortest_path_length(G, u, v, weight="length")
        except Exception:
            lat1, lon1 = coord_dict[u]
            lat2, lon2 = coord_dict[v]
            return haversine(lat1, lon1, lat2, lon2)
    return f

In [None]:
def make_stops_list(bus_excel, metro_json):
    bus_df = pd.read_excel(bus_excel)
    stops = []
    for _, r in bus_df.iterrows():
        stops.append( (f"bus_{r['정류장ID']}", float(r['Y좌표']), float(r['X좌표'])) )
    with open(metro_json, encoding='utf-8') as f:
        metro = json.load(f)
    for s in metro:
        stops.append( (f"metro_{s['station_cd']}", float(s['lat']), float(s['lng'])) )
    return stops

In [None]:
# 사용 예시
# 1. BBOX, 도로그래프 다운로드
bbox = get_combined_bounding_box('bus_stops_seoul.xlsx', 'data-metro-station-1.0.0.json', margin=0.02)
road_graph_drive, road_graph_walk = get_road_graphs(bbox)

# 2. 정류장/역 좌표 리스트화 & OSM 노드 매핑
stops_list = make_stops_list('bus_stops_seoul.xlsx', 'data-metro-station-1.0.0.json')
STOP_OSM_NODE = map_stops_to_osm_nodes(stops_list, road_graph_drive, road_graph_walk)

# --- 예시 출력
for sid, info in list(STOP_OSM_NODE.items())[:5]:
    print(f"{sid}: drive={info['drive']} walk={info['walk']} 좌표=({info['lat']:.5f},{info['lon']:.5f})")

BBOX: (37.4105199435, 37.710177, 126.43723, 127.201785)
[1/2] OSM drive 네트워크 다운로드...
  (drive) 노드: 1 엣지: 0
[2/2] OSM walk 네트워크 다운로드...
  (walk) 노드: 1 엣지: 0


KeyError: '정류장ID'

지하철 그래프

In [None]:
def build_metro_graph(station_json, line_json, speed_by_line):
    global station_to_road  # 전역 선언을 함수 맨 위에 둡니다.

    # 3-1) JSON 로드
    stations = json.load(open(station_json, 'r', encoding='utf-8'))['DATA']
    lines    = json.load(open(line_json,    'r', encoding='utf-8'))['DATA']

    # 3-2) station_map 생성
    station_map = {
        s['station_cd']: {
            'name': s.get('name', s.get('station_nm')),
            'lat': float(s['lat']),
            'lng': float(s['lng'])
        }
        for s in stations
    }

    # 3-3) 역 ↔ 도로 노드 매핑 캐시 (여기서 station_to_road에 할당)
    ids   = list(station_map.keys())
    lats  = [station_map[s]['lat'] for s in ids]
    lons  = [station_map[s]['lng'] for s in ids]
    road_nodes = nearest_nodes(road_graph, lons, lats)
    station_to_road = dict(zip(ids, road_nodes))  # 전역 변수에 값 할당

    Gm = nx.Graph()

    # 3-4) 실제 구간 가중치(도로망 기준 시간)으로 노드·간선 추가
    for info in lines:
        raw_line = info['line']
        line = f"{raw_line}호선"                       # 예: "2" → "2호선"
        spd = speed_by_line.get(raw_line, 30.0)


        for seg in info.get('node', []):
            a, b = seg['station']
            na, nb = station_map[a['station_cd']], station_map[b['station_cd']]
            # 역 이름을 가져와서 라벨 생성
            u, v = f"{line}_{na['name']}", f"{line}_{nb['name']}"
            Gm.add_node(u, station=na['name'], line=line, lat=na['lat'], lng=na['lng'])
            Gm.add_node(v, station=nb['name'], line=line, lat=nb['lat'], lng=nb['lng'])

            u_rg = station_to_road[a['station_cd']]
            v_rg = station_to_road[b['station_cd']]
            # 여기! lru_cache 적용
            try:
                dist_m = fast_shortest_path(u_rg, v_rg)
            except Exception:
                dist_m = haversine(na['lat'], na['lng'], nb['lat'], nb['lng'])
            t_min = dist_m / 1000 / spd * 60
            Gm.add_edge(u, v, weight=t_min)


    # 3-5) 같은 역 이름끼리 환승 간선 추가
    from collections import defaultdict
    st2nodes = defaultdict(list)
    for n, d in Gm.nodes(data=True):
        st2nodes[d['station']].append(n)

    for nodes in st2nodes.values():
        if len(nodes) < 2:
            continue
        for u, v in combinations(nodes, 2):
            walk_m = haversine(Gm.nodes[u]['lat'], Gm.nodes[u]['lng'],
                               Gm.nodes[v]['lat'], Gm.nodes[v]['lng'])
            wt = walk_m / 1000 / 5 * 60 + 2
            Gm.add_edge(u, v, weight=wt, lines={'transfer'})
            Gm.add_edge(v, u, weight=wt, lines={'transfer'})

    return Gm


In [None]:
# =============================================================================
# (1) compute_bus_edge: 병렬 처리용 엣지 가중치 계산 함수
#
#   task: (u_lbl, v_lbl, lines, ars_id_u, ars_id_v)
#     - u_lbl, v_lbl: "노선명_정류소명" 형태의 문자열 노드 라벨
#     - lines: 해당 구간을 지나는 버스 노선명들의 set
#     - ars_id_u, ars_id_v: 각각의 u, v 노드가 속한 정류소 ARS_ID (정수)
#
#   반환: (u_lbl, v_lbl, t_min, lines)
#     - t_min: u_lbl → v_lbl 구간의 버스 이동 시간(분)
# =============================================================================
def compute_bus_edge(task):
    u_lbl, v_lbl, lines, ars_id_u, ars_id_v = task
    key_u = f"bus_{ars_id_u}"
    key_v = f"bus_{ars_id_v}"
    u_rg = STOP_TO_ROAD[key_u]
    v_rg = STOP_TO_ROAD[key_v]
    dist_m = fast_shortest_path(u_rg, v_rg)
    t_min = dist_m / 1000 / AVG_SPEED_KMH_GLOBAL * 60
    return (u_lbl, v_lbl, t_min, lines)

In [None]:
# =============================================================================
# (2) build_bus_graph_optimized: 노드를 "<노선명>_<정류소명>" 문자열로 생성
#
#   - stops_excel: 버스 정류장 정보 엑셀
#         (NODE_ID, ARS_ID, 정류소명, X좌표, Y좌표, 정류소타입 컬럼 포함)
#   - routes_excel: 버스 노선 정보 엑셀
#         (ROUTE_ID, 노선명, 순번, NODE_ID, ARS_ID, 정류소명, X좌표, Y좌표 컬럼 포함)
#
#   반환: Gb (NetworkX Graph)
#     - 노드 라벨: "<노선명>_<정류소명>"
#     - 엣지 'weight': 버스 이동 시간(분) 또는 보행환승 시간(5분)
#     - 엣지 'lines': 해당 구간을 지나는 버스 노선명 세트 또는 {'walk'}
#   ※ stops_df에 없는 ARS_ID 구간은 전부 건너뜁니다.
# =============================================================================
# 5. 메인 그래프 빌드
def build_bus_graph_optimized(stops_excel, routes_excel, avg_speed_kmh=18.0):
    global STOP_TO_ROAD, STOP_COORD, AVG_SPEED_KMH_GLOBAL
    AVG_SPEED_KMH_GLOBAL = avg_speed_kmh

    stops_df  = pd.read_excel(stops_excel)
    routes_df = pd.read_excel(routes_excel)

    ars_to_coord = {}
    for _, row in stops_df.iterrows():
        ars = int(row["ARS_ID"])
        ars_to_coord[ars] = (row['Y좌표'], row['X좌표'])

    all_ars = list(ars_to_coord.keys())
    all_lons = [ars_to_coord[ars][1] for ars in all_ars]
    all_lats = [ars_to_coord[ars][0] for ars in all_ars]
    road_nodes = ox.nearest_nodes(road_graph, all_lons, all_lats)

    STOP_COORD = {}
    STOP_TO_ROAD = {}
    for idx, ars in enumerate(all_ars):
        key = f"bus_{ars}"
        STOP_COORD[key] = ars_to_coord[ars]
        STOP_TO_ROAD[key] = road_nodes[idx]

    Gb = nx.Graph()
    for _, row in routes_df.iterrows():
        if pd.isna(row["ARS_ID"]):
            continue
        ars = int(row["ARS_ID"])
        key_base = f"bus_{ars}"
        if key_base not in STOP_TO_ROAD:
            continue
        route_name = str(row["노선명"]).strip()
        stop_name  = str(row["정류소명"]).strip()
        lbl = f"{route_name}_{stop_name}"
        if lbl not in Gb.nodes:
            Gb.add_node(lbl,
                        ars_id = ars,
                        route  = route_name,
                        name   = stop_name,
                        coord  = (row['Y좌표'], row['X좌표'])
                       )

    edge_lines = defaultdict(set)
    for route_name, grp in routes_df.sort_values(['노선명', '순번']).groupby('노선명'):
        prev_lbl = None
        for _, row in grp.iterrows():
            if pd.isna(row["ARS_ID"]):
                continue
            curr_ars = int(row["ARS_ID"])
            key_base = f"bus_{curr_ars}"
            if key_base not in STOP_TO_ROAD:
                continue
            curr_stop_name = str(row["정류소명"]).strip()
            curr_lbl = f"{route_name}_{curr_stop_name}"
            if prev_lbl is not None and prev_lbl in Gb.nodes and curr_lbl in Gb.nodes:
                edge_lines[(prev_lbl, curr_lbl)].add(route_name)
            prev_lbl = curr_lbl

    tasks = []
    for (u_lbl, v_lbl), lines in edge_lines.items():
        ars_u = Gb.nodes[u_lbl]['ars_id']
        ars_v = Gb.nodes[v_lbl]['ars_id']
        tasks.append((u_lbl, v_lbl, lines, ars_u, ars_v))

    with ThreadPool() as pool:
        for u_lbl, v_lbl, t_min, lines in pool.imap_unordered(compute_bus_edge, tasks, chunksize=200):
            Gb.add_edge(u_lbl, v_lbl, weight=t_min, lines=set(lines))

    # 보행 환승 엣지 추가
    TRANSFER_RADIUS_M = 500
    WALK_PENALTY     = 5.0
    coord_list = [ars_to_coord[ars] for ars in all_ars]
    tree       = cKDTree(coord_list)
    pairs = tree.query_ball_tree(tree, r=TRANSFER_RADIUS_M)

    arsid_to_nodes = defaultdict(list)
    for lbl, d in Gb.nodes(data=True):
        arsid_to_nodes[d['ars_id']].append(lbl)

    for i, nbrs in enumerate(pairs):
        ars_i = all_ars[i]
        for j in nbrs:
            if j <= i:
                continue
            ars_j = all_ars[j]
            for u_node in arsid_to_nodes[ars_i]:
                for v_node in arsid_to_nodes[ars_j]:
                    if Gb.has_edge(u_node, v_node):
                        Gb.edges[u_node, v_node]['lines'].add('walk')
                        Gb.edges[u_node, v_node]['weight'] = min(
                            Gb.edges[u_node, v_node]['weight'], WALK_PENALTY
                        )
                    else:
                        Gb.add_edge(u_node, v_node, weight=WALK_PENALTY, lines={'walk'})

    return Gb


In [None]:
def main():
    stops_excel  = 'bus_stops_seoul.xlsx'
    routes_excel = 'bus_route.xlsx'

    print("정류장 범위에서 bbox 산출...")
    lat_min, lat_max, lon_min, lon_max = get_bounding_box(stops_excel, margin=0.02)
    print(f"BBOX: ({lat_min:.5f}, {lat_max:.5f}, {lon_min:.5f}, {lon_max:.5f})")

    print("작은 bbox로 도로망 추출 중...")
    global road_graph, ROAD_COORD

    bbox = (lat_max, lat_min, lon_max, lon_min)
    road_graph = osmnx.graph.graph_from_bbox(
        bbox,
        network_type='drive'
    )

    print(f"도로망 노드: {road_graph.number_of_nodes():,}, 엣지: {road_graph.number_of_edges():,}")

    ROAD_COORD = {n:(data['y'], data['x']) for n, data in road_graph.nodes(data=True)}

    print("버스 그래프 생성중...")
    Gb = build_bus_graph_optimized(stops_excel, routes_excel)
    print(f"버스 그래프 노드: {Gb.number_of_nodes():,}, 엣지: {Gb.number_of_edges():,}")

    return Gb

Gb = main()


정류장 범위에서 bbox 산출...
BBOX: (37.41052, 37.71018, 126.43723, 127.20179)
작은 bbox로 도로망 추출 중...
도로망 노드: 21, 엣지: 37
버스 그래프 생성중...


NameError: name 'defaultdict' is not defined

In [None]:
def get_top_k_routes(G, origin, dest, k=30, max_transfers=3):
    paths = nx.shortest_simple_paths(G, origin, dest, weight='weight')
    routes = []
    for path in paths:
        used_lines = []
        prev_line = None
        transfer_count = 0
        valid = True

        for node in path:
            ln = G.nodes[node].get('line')
            # 환승이 발생한 순간
            if prev_line is not None and ln != prev_line:
                transfer_count += 1
                # 1) 최대 환승 초과
                if transfer_count > max_transfers:
                    valid = False
                    break
                # 2) 이미 탄 호선으로 재진입 금지
                if ln in used_lines:
                    valid = False
                    break
            used_lines.append(ln)
            prev_line = ln

        if not valid:
            continue

        total_time = sum(G[u][v]['weight'] for u, v in zip(path, path[1:]))
        routes.append({
            'path': path,
            'time_min': total_time,
            'transfers': transfer_count
        })

        if len(routes) >= k:
            break

    return routes


In [None]:
# 5. 그래프 빌드에 앞서 미리 도로망을 로드해 두고 CH를 만듭니다.
print("도로망 그래프 로드 중...")
road_graph = ox.graph_from_place('Seoul, South Korea', network_type='drive')

try:
    ch_road = cch.ContractionHierarchy(road_graph)
except AttributeError:
    # cch가 없거나 CH 생성 실패 시 fallback
    class DummyCH:
        def __init__(self, G): self.G = G
        def distance(self, u, v):
            try:
                return nx.shortest_path_length(self.G, u, v, weight='length')
            except:
                # 거리 계산이 불가능하면 haversine fallback
                return haversine(*ROAD_COORD[u], *ROAD_COORD[v])
    ch_road = DummyCH(road_graph)

# 도로 노드 좌표 캐시 (haversine fallback 시 사용)
ROAD_COORD = {n:(data['y'], data['x']) for n, data in road_graph.nodes(data=True)}


도로망 그래프 로드 중...


In [None]:
# 1~9호선 및 기타 전철·경전철 노선별로 직접 지정
speed_by_line = {
    '1호선': 28.6, '2호선': 36.5, '3호선': 34.9, '4호선': 30.1,
    '5호선': 32.8, '6호선': 30.0, '7호선': 32.3, '8호선': 32.2,
    '9호선': 26.2,
    '인천1호선': 32.5, '인천2호선': 32.3,
    '경강선': 68.6, '경의중앙선': 41.5, '경춘선': 60.0,
    '공항철도': 66.7, '서해선': 41.0, '수인분당선': 41.1,
    '신분당선': 64.9, '신림선': 26.5, '우이신설선': 28.7,
    '김포골드라인': 45.0, '용인에버라인': 41.5,
    '의정부경전철': 28.2, 'GTXA': 31.1
}

In [None]:
Gm = build_metro_graph('data-metro-station-1.0.0.json','data-metro-line-1.0.0.json',speed_by_line)

In [None]:
Gb = build_bus_graph_optimized('bus_stops_seoul.xlsx','bus_route.xlsx')

In [None]:
print(f"지하철 Gm: 노드 {Gm.number_of_nodes()}개, 간선 {Gm.number_of_edges()}개")
print(f"버스 Gb: 노드 {Gb.number_of_nodes()}개, 간선 {Gb.number_of_edges()}개")
print("Gm 샘플:", list(Gm.nodes(data=True))[:5])
print("Gb 샘플:", list(Gb.nodes(data=True))[:5])

In [None]:
G_full = nx.Graph()
G_full = nx.compose(G_full, Gm)
G_full = nx.compose(G_full, Gb)
print(f"통합 G_full: 노드 {G_full.number_of_nodes()}개, 간선 {G_full.number_of_edges()}개")
print("G_full 샘플 노드:", list(G_full.nodes(data=True))[:5])

In [None]:
# (3-3) (선택) 실제로 “지하철 이동 엣지”와 “버스 이동 엣지”가 남아 있는지 샘플 출력
# ----------------------------------------------------------------------------
print("\n=== 지하철 이동 엣지 일부 예시 ===")
count = 0
for u, v, d in G_full.edges(data=True):
    if (
        u.startswith(tuple(f"{i}호선" for i in range(1,10))) and
        v.startswith(tuple(f"{i}호선" for i in range(1,10)))
    ):
        print(f"{u} → {v} : weight={d['weight']:.1f}분, lines={d.get('lines',{})}")
        count += 1
        if count >= 5:
            break

print("\n=== 버스 이동 엣지 일부 예시 ===")
count = 0
for u, v, d in G_full.edges(data=True):
    if ("호선" not in u.split("_")[0] and "호선" not in v.split("_")[0]):
        # “실제 버스 이동 시간”이 weight로 들어온 엣지 예시
        if 'walk' not in d.get('lines', set()):
            print(f"{u} → {v} : weight={d['weight']:.1f}분, lines={d.get('lines',{})}")
            count += 1
            if count >= 5:
                break

print("\n=== 기존 보행환승 엣지 일부 예시 ===")
count = 0
for u, v, d in G_full.edges(data=True):
    if 'walk' in d.get('lines', {}):
        print(f"{u} ↔ {v} : weight={d['weight']:.1f}분, lines={d['lines']}")
        count += 1
        if count >= 5:
            break

2) 메트로↔버스 환승(보행) 엣지 전부 연결 코드

In [None]:
from scipy.spatial import cKDTree

# 1) 메트로 노드와 버스 노드 각각 (라벨, (lat, lon)) 리스트 만들기
metro_nodes = [
    (n, (d['lat'], d['lng']))
    for n, d in Gm.nodes(data=True)
    # Gm 안에 'lat','lng'이 있는 메트로 노드만
]

bus_nodes = [
    (n, d['coord'])
    for n, d in Gb.nodes(data=True)
    # Gb 안에 'coord' = (lat, lon)으로 저장된 버스 노드만
]

# 2) KDTree 구축 (위도,경도를 그대로 사용하되, 반경을 대략 도 단위로 변환)
metro_coords = [coord for _, coord in metro_nodes]
bus_coords   = [coord for _, coord in bus_nodes]

# KDTree는 (위도, 경도) 배열을 받아서 반경 검색 가능
tree_bus = cKDTree(bus_coords)

# 3) 반경(500m)을 도(degree) 단위로 환산
#    위도·경도 1도 ≒ 111300m 이므로, 500m ≒ 500/111300 ≈ 0.00449도
TRANSFER_RADIUS_M = 500
radius_deg = TRANSFER_RADIUS_M / 111300.0

# 4) 메트로↔버스 간 환승 엣지 추가
for i, (metro_name, (mlat, mlon)) in enumerate(metro_nodes):
    # KDTree에 metro_coord로 쿼리: bus 쪽 인덱스 목록
    candidates = tree_bus.query_ball_point((mlat, mlon), r=radius_deg)
    for j in candidates:
        bus_name, (blat, blon) = bus_nodes[j]
        # 실제 거리(m)를 계산해서 500m 이하만 처리
        walk_m = haversine(mlat, mlon, blat, blon)
        if walk_m <= TRANSFER_RADIUS_M:
            # 보행 시간(분) 계산 (5km/h 기준)
            walk_t = walk_m / 1000 / 5 * 60
            # G_full에 양방향으로 추가
            G_full.add_edge(metro_name, bus_name, weight=walk_t, lines={'walk'})
            G_full.add_edge(bus_name, metro_name, weight=walk_t, lines={'walk'})


In [None]:
print("\n=== 메트로↔버스 보행 환승 엣지 확인 ===")
count = 0
for u, v, d in G_full.edges(data=True):
    if 'walk' in d.get('lines', {}):
        print(f"{u} <-> {v} : weight={d['weight']:.1f}분, lines={d['lines']}")
        count += 1
        if count >= 5:
            break


In [None]:
# 이후에 G_full를 위한 CH 생성
try:
    ch_full = cch.ContractionHierarchy(G_full)
except AttributeError:
    class DummyFullCH:
        def __init__(self, G): self.G = G
        def distance(self, u, v):
            try: return nx.shortest_path_length(self.G, u, v, weight='weight')
            except: return float('inf')
    ch_full = DummyFullCH(G_full)

In [None]:
# 8. User Input & Geocoding ===
geolocator = Nominatim(user_agent="route_planner")
def get_location(name):
    loc = geolocator.geocode(name, timeout=10)
    if not loc: loc = geolocator.geocode(f"{name}, Seoul", timeout=10)
    if not loc: raise ValueError(f"'{name}' not found.")
    return loc.latitude, loc.longitude
start_lat, start_lon = get_location(input("출발지 (예: 고려대학교): "))
end_lat,   end_lon   = get_location(input("도착지 (예: 한남힐스테이트): "))

출발지 (예: 고려대학교): 고려대학교
도착지 (예: 한남힐스테이트): 강남역


In [None]:
# --- (B) G_full 노드별 'road-node 인덱스' 미리 매핑해 두기 ---
metro_rg_map = {node: station_to_road[node.split('_')[0]]
                for node, data in Gm.nodes(data=True)}

bus_rg_map = {node: STOP_TO_ROAD[int(node)]
              for node, data in Gb.nodes(data=True)}

In [None]:
# --- (C) 출발지 / 도착지 각각 가장 가까운 도로망 노드 찾기 ---
start_rg = nearest_nodes(road_graph, start_lon, start_lat)
end_rg   = nearest_nodes(road_graph, end_lon,   end_lat)


In [None]:
# --- (D) 도로망 거리 기준으로 500m 이내인 노드 인덱스 뽑기 ---
# === 최적화: 한 번의 Dijkstra로 “500m 이내 도로망 거리”를 계산한 뒤, 노드별로 필터 ===

# (1) 도로망 그래프에 대해 단일 소스 Dijkstra 시행 (start_rg 기준, 컷오프 500m)
dist_from_start = nx.single_source_dijkstra_path_length(
    road_graph, start_rg, cutoff=500, weight='length'
)
# dist_from_start: {road_node: 거리(m), ...} 500m 이내만 남음

# (2) 도착지 기준도 마찬가지
dist_to_end = nx.single_source_dijkstra_path_length(
    road_graph, end_rg, cutoff=500, weight='length'
)

# (3) G_full 노드별 “도로망 노드”를 미리 매핑해두었으므로,
#     dist_from_start/ dist_to_end 딕셔너리의 키(road_node)에 속하는지로 바로 판별
nodes, coords = [], []
for node, data in G_full.nodes(data=True):
    if 'lat' in data and 'lng' in data:
        nodes.append(node)
        coords.append((data['lat'], data['lng']))
    elif 'coord' in data:
        nodes.append(node)
        coords.append((data['coord'][0], data['coord'][1]))

node_coords = np.array(coords)  # shape = (N,2)
N = len(nodes)

start_idxs, end_idxs = [], []

for i, node in enumerate(nodes):
    # (A) 도로망 노드 인덱스 찾기 (메트로 vs 버스)
    if node in metro_rg_map:
        node_rg = metro_rg_map[node]
    else:
        node_rg = bus_rg_map[node]

    # (B) dist_from_start 딕셔너리에 node_rg 키가 있으면 500m 이내
    if node_rg in dist_from_start:
        start_idxs.append(i)

    # (C) dist_to_end 딕셔너리에 node_rg 키가 있으면 500m 이내
    if node_rg in dist_to_end:
        end_idxs.append(i)

print("500m 내 탑승 후보 개수:", len(start_idxs))
print("500m 내 하차 후보 개수:", len(end_idxs))



500m 내 탑승 후보 개수: 18
500m 내 하차 후보 개수: 21


In [None]:
# 노드 이름(지하철 역명 또는 버스 정류소명) 출력 함수
def label(i):
    attrs = G_full.nodes[nodes[i]]
    return attrs.get('station', attrs.get('name', str(nodes[i])))

print("500m 내 탑승 샘플:", [label(i) for i in start_idxs[:5]])
print("500m 내 하차 샘플:", [label(i) for i in end_idxs[:5]])

500m 내 탑승 샘플: ['안암', '고려대', '제기동한신아파트앞', '고려대역.고대앞삼거리', '고려대역.고대앞삼거리']
500m 내 하차 샘플: ['강남', '강남', '신분당선강남역', '롯데칠성', '롯데칠성']


In [None]:
import concurrent.futures

# --- (E) 상위 K개 후보만 골라서 교차 비교 준비 ---
K = 10

# (E-1) 출발지→노드 도보 시간(분) 계산, 상위 K개 추출
walk_start_map = {}
for i in start_idxs:
    node_rg = metro_rg_map.get(nodes[i], bus_rg_map.get(nodes[i]))
    try:
        dist_m = ch_road.distance(start_rg, node_rg)
    except:
        lat_i, lon_i = node_coords[i]
        dist_m = haversine(start_lat, start_lon, lat_i, lon_i)
    walk_start_map[i] = dist_m / 5000 * 60  # 분 단위

top_start = sorted(walk_start_map.keys(), key=lambda i: walk_start_map[i])[:K]


In [None]:
# (E-2) 노드→도착지 도보 시간(분) 계산, 상위 K개 추출
walk_end_map = {}
for j in end_idxs:
    node_rg = metro_rg_map.get(nodes[j], bus_rg_map.get(nodes[j]))
    try:
        dist_m = ch_road.distance(node_rg, end_rg)
    except:
        lat_j, lon_j = node_coords[j]
        dist_m = haversine(lat_j, lon_j, end_lat, end_lon)
    walk_end_map[j] = dist_m / 5000 * 60

top_end = sorted(walk_end_map.keys(), key=lambda j: walk_end_map[j])[:K]


In [None]:
# === (F) 교통수단 경로 후보 생성 (K번의 Dijkstra로 최적화) ===

candidates = []

# top_start: 출발 후보 인덱스 리스트, top_end: 도착 후보 인덱스 리스트

for i in top_start:
    node_s = nodes[i]
    w0 = walk_start_map[i]

    # (F-1) node_s를 소스로 하여 G_full에서 한 번만 Dijkstra 시행
    lengths, paths = nx.single_source_dijkstra(
        G_full,
        source=node_s,
        weight='weight'
    )
    # lengths: {node: 최단거리(분), ...}
    # paths:   {node: [node_s, ..., node], ...} 형식의 경로 딕셔너리

    for j in top_end:
        node_t = nodes[j]
        if node_t not in lengths:
            # 연결된 경로가 없으면 건너뜀
            continue

        transit_time = lengths[node_t]
        w1 = walk_end_map[j]

        # (F-2) 환승 횟수 및 보행 합계 계산 (paths[node_t] 로부터)
        path = paths[node_t]  # [node_s, ..., node_t]
        transfers = 0
        walk_sum = 0.0
        for u, v in zip(path, path[1:]):
            d = G_full[u][v]
            if 'lines' in d and 'transfer' in d['lines']:
                transfers += 1
            if 'lines' in d and 'walk' in d['lines']:
                walk_sum += d['weight']

        total_time = w0 + transit_time + w1
        candidates.append({
            'node_s': node_s,
            'node_t': node_t,
            'total_time': total_time,
            'walk_start': w0,
            'transit': transit_time,
            'walk_end': w1,
            'transfers': transfers,
            'walk_sum': walk_sum
        })


In [None]:
candidates

[{'node_s': 2840,
  'node_t': 9421,
  'total_time': np.float64(6.4427963751651856),
  'walk_start': np.float64(1.442796375165186),
  'transit': 5,
  'walk_end': 0.0,
  'transfers': 0,
  'walk_sum': 5.0},
 {'node_s': 2840,
  'node_t': 9555,
  'total_time': np.float64(6.4427963751651856),
  'walk_start': np.float64(1.442796375165186),
  'transit': 5,
  'walk_end': 0.0,
  'transfers': 0,
  'walk_sum': 5.0},
 {'node_s': 2840,
  'node_t': 9215,
  'total_time': np.float64(9.447818433646942),
  'walk_start': np.float64(1.442796375165186),
  'transit': 5,
  'walk_end': np.float64(3.0050220584817566),
  'transfers': 0,
  'walk_sum': 5.0},
 {'node_s': 2840,
  'node_t': 9628,
  'total_time': np.float64(11.596663496349908),
  'walk_start': np.float64(1.442796375165186),
  'transit': 5,
  'walk_end': np.float64(5.153867121184723),
  'transfers': 0,
  'walk_sum': 5.0},
 {'node_s': 2840,
  'node_t': 9216,
  'total_time': np.float64(11.678684684768125),
  'walk_start': np.float64(1.442796375165186),
 

In [None]:
# --- (G) 정렬 & 상위 5개 추출 ---
# G1: "최단 시간" 상위 5 (오름차순)
by_time = sorted(candidates, key=lambda x: x['total_time'])[:5]

# G2: "최소 환승" 상위 5 (환승 먼저, tie-breaker: total_time)
by_transfers = sorted(candidates, key=lambda x: (x['transfers'], x['total_time']))[:5]

# G3: "최소 도보 합계" 상위 5 (walk_sum 먼저, tie-breaker: total_time)
by_walk = sorted(candidates, key=lambda x: (x['walk_sum'], x['total_time']))[:5]


In [None]:
# === (H) 결과 출력 (경로 라벨링 포함 버전) ===

def format_node_label(n):
    """
    G_full 내 노드 n이 지하철인지 버스인지 판별하여:
      - 지하철: "<호선>_<역명>"
      - 버스:    "버스_<정류장명>"
    """
    data = G_full.nodes[n]
    if 'line' in data and 'station' in data:
        # 지하철 노드
        line = data['line']
        station = data['station']
        return f"{line}_{station}"
    elif 'name' in data and 'coord' in data:
        # 버스 정류장 노드 (coord 속성이 있는 노드)
        return f"버스_{data['name']}"
    else:
        # 그 외 (예상치 못한 노드)
        return str(n)


def format_full_path(path):
    """
    한 후보 경로(path: 노드 리스트)에서
    - 노드 사이가 'transfer' 간선이면 "→[환승]→"
    - 'walk' 간선이면        "→[도보]→"
    - 그 외(버스/지하철 간선)면 "→"
    이런 식으로 표시하여 문자열로 반환
    """
    parts = [format_node_label(path[0])]
    for u, v in zip(path, path[1:]):
        edge_data = G_full[u][v]
        if 'lines' in edge_data:
            if 'transfer' in edge_data['lines']:
                parts.append("→[환승]→")
            elif 'walk' in edge_data['lines']:
                parts.append("→[도보]→")
            else:
                # 버스/지하철 이동 간선
                parts.append("→")
        else:
            parts.append("→")
        parts.append(format_node_label(v))
    return "".join(parts)

In [None]:
# (G) 정렬 & 상위 5개 추출
by_time = sorted(candidates, key=lambda x: x['total_time'])[:5]
by_transfers = sorted(candidates, key=lambda x: (x['transfers'], x['total_time']))[:5]
by_walk = sorted(candidates, key=lambda x: (x['walk_sum'], x['total_time']))[:5]

# (H) 출력
print("\n== 최단 시간 1~5순위 ==")
for rank, c in enumerate(by_time, 1):
    node_s, node_t = c['node_s'], c['node_t']
    # 실제 경로 노드 리스트
    path = nx.shortest_path(G_full, source=node_s, target=node_t, weight='weight')
    route_str = format_full_path(path)
    print(f"{rank}. {route_str}")
    print(f"   총 {c['total_time']:.1f}분  (보행 {c['walk_start']:.1f} + {c['walk_end']:.1f}분, "
          f"이동 {c['transit']:.1f}분, 환승 {c['transfers']}회)\n")

print("\n== 최소 환승 1~5순위 ==")
for rank, c in enumerate(by_transfers, 1):
    node_s, node_t = c['node_s'], c['node_t']
    path = nx.shortest_path(G_full, source=node_s, target=node_t, weight='weight')
    route_str = format_full_path(path)
    print(f"{rank}. {route_str}")
    print(f"   환승 {c['transfers']}회, 총 {c['total_time']:.1f}분  "
          f"(보행 합계 {c['walk_sum']:.1f}분)\n")

print("\n== 최소 도보 1~5순위 ==")
for rank, c in enumerate(by_walk, 1):
    node_s, node_t = c['node_s'], c['node_t']
    path = nx.shortest_path(G_full, source=node_s, target=node_t, weight='weight')
    route_str = format_full_path(path)
    print(f"{rank}. {route_str}")
    print(f"   총 보행 {c['walk_sum']:.1f}분, 총 {c['total_time']:.1f}분, 환승 {c['transfers']}회\n")


== 최단 시간 1~5순위 ==
1. 버스_고대사거리→[도보]→버스_강남역8번출구
   총 6.4분  (보행 1.4 + 0.0분, 이동 5.0분, 환승 0회)

2. 버스_고대사거리→[도보]→버스_강남역9번출구
   총 6.4분  (보행 1.4 + 0.0분, 이동 5.0분, 환승 0회)

3. 버스_제기시장→[도보]→버스_강남역8번출구
   총 6.5분  (보행 1.5 + 0.0분, 이동 5.0분, 환승 0회)

4. 버스_제기시장→[도보]→버스_강남역9번출구
   총 6.5분  (보행 1.5 + 0.0분, 이동 5.0분, 환승 0회)

5. 버스_우신향병원→[도보]→버스_강남역8번출구
   총 7.1분  (보행 2.1 + 0.0분, 이동 5.0분, 환승 0회)


== 최소 환승 1~5순위 ==
1. 버스_고대사거리→[도보]→버스_강남역8번출구
   환승 0회, 총 6.4분  (보행 합계 5.0분)

2. 버스_고대사거리→[도보]→버스_강남역9번출구
   환승 0회, 총 6.4분  (보행 합계 5.0분)

3. 버스_제기시장→[도보]→버스_강남역8번출구
   환승 0회, 총 6.5분  (보행 합계 5.0분)

4. 버스_제기시장→[도보]→버스_강남역9번출구
   환승 0회, 총 6.5분  (보행 합계 5.0분)

5. 버스_우신향병원→[도보]→버스_강남역8번출구
   환승 0회, 총 7.1분  (보행 합계 5.0분)


== 최소 도보 1~5순위 ==
1. 버스_고대사거리→[도보]→버스_강남역8번출구
   총 보행 5.0분, 총 6.4분, 환승 0회

2. 버스_고대사거리→[도보]→버스_강남역9번출구
   총 보행 5.0분, 총 6.4분, 환승 0회

3. 버스_제기시장→[도보]→버스_강남역8번출구
   총 보행 5.0분, 총 6.5분, 환승 0회

4. 버스_제기시장→[도보]→버스_강남역9번출구
   총 보행 5.0분, 총 6.5분, 환승 0회

5. 버스_우신향병원→[도보]→버스_강남역8번출구
   총 보행 5.0분, 총 7.1분, 환승 0회



구체화
1) 최단 시간 기준
	•	추가 제한 조건:
	•	경로의 불확실성/편차: 시간 편차가 큰 경로는 제외 (평균은 짧지만 리스크 있음).
	•	IQR 기반 임계값 초과하는 과도하게 빠른 경로는 허위 정보/이상치로 간주.
	•	정류장 간 거리가 긴 노선은 제외 (걷는 시간 20분 이상 제외).

⸻

2) 최소 환승 기준 (with IQR-based time threshold)
	•	추가 제한 조건:
	•	환승 소요 시간이 실제 걸리는 시간과 큰 차이(2배 이상)가 나는 경로는 제외.
	•	환승 횟수는 적지만 도보 이동이 긴 경우는 IQR 기준으로 필터링.
		IQR 기반 임계값 초과하는 과도하게 빠른 경로, 느린 경로는 허위 정보/이상치로 간주.

⸻

3) 최소 도보 기준 (with IQR-based time threshold)
	•	추가 제한 조건:
	•	우회 경로로 인해 시간이 길어지면 제외.IQR 기준
	• 환승 4회 이상 제외.