# このファイルについて
- 高速道路上のIC間経路を計算するためのプログラム

## 動作環境
- Ubuntu 20.04.4 LTS (Focal Fossa)
- Python 3.8.13
- networkx 2.6.3
- pandas 1.3.5

In [1]:
import os
import pickle
from typing import Dict, List, Optional, Set

import networkx as nx
import pandas as pd

In [2]:
# data directory
DATA_DIR = '../../Input_processed_data'

# 道路構造データ
IC_CSV = f'{DATA_DIR}/road_master/ic_preprocessed.csv'
IC_NET_CSV = f'{DATA_DIR}/road_master/icnet_all.csv'

# 道路構造データの読み込み・ICグラフの構築

In [3]:
# モジュール内で前処理済み
df_ic = pd.read_csv(IC_CSV, dtype={'ic_code': str})
df_icnet = pd.read_csv(IC_NET_CSV, dtype={'start_code': str, 'end_code': str, 'road_code': str})

In [4]:
code2name = dict(zip(df_ic['ic_code'], df_ic['ic_name']))
name2code = {v: k for k, v in code2name.items()}

In [5]:
ic_graph = nx.from_pandas_edgelist(
    df_icnet, source='start_code', target='end_code',
    edge_attr=['distance', 'road_code', 'direction'], create_using=nx.DiGraph())

In [6]:
ic_nodes_set: set = set(ic_graph.nodes)

# 経路検索用プログラム

## 最短経路オブジェクト`route_dict`の読み込み・書き出し

In [7]:
fname = './route_dict.pkl'

if os.path.exists(fname): # 経路マップがすでに存在しているとき、それを使う
    with open(fname, 'rb') as f:
        print('Loading IC Routes...')
        route_dict = pickle.load(f)
else: # 存在していなければ計算してバイナリで保存
    print('Calculating IC Routes...')
    route_dict = dict(nx.all_pairs_dijkstra_path(ic_graph, weight='distance'))
    
    with open(fname, 'wb') as f:
        pickle.dump(route_dict, f)
print('Finished.')

Loading IC Routes...
Finished.


In [8]:
! du -h ./route_dict.pkl

3.4G	./route_dict.pkl


## トラカンデータが省略しているICを列挙する

In [9]:
# 省略するICをハードコーディング

small_ic_set = {
    '1040014',
    '1040019',
    '1040021',
    '1040041',
    '1040071',
    '1040081',
    '1040111',
    '1800004',
    '1800031',
    '1800046',
    '1800093',
    '1800101'
}

### old

In [9]:
# トラカンデータが持つ区間のdataframe
tc_segments = pd.read_pickle(f'./traffic_counter_segments.pkl')

print(tc_segments.shape)
tc_segments.head(3)

(80, 4)


Unnamed: 0,start_code,start_name,end_code,end_name
0,1800006,所沢,1110210,大泉ＪＣＴ
1,1800011,三芳ＰＡ,1800006,所沢
2,1800016,川越,1800011,三芳ＰＡ


In [10]:
def get_small_ic_set(df: pd.DataFrame) -> Set[str]:
    '''
    トラカン固有のIC区間に本来含まれるICを列挙する
    '''
    unmatch_segments = [
        segment for segment in df.loc[:, ['start_code', 'end_code']].values
        if not ic_graph.has_edge(segment[0], segment[1])
    ]
    
    small_ic_set = set()
    for segment in unmatch_segments:
        path = nx.shortest_path(ic_graph, *segment)
        # 両端（start ICとend ICを除く）
        small_ic_subset = set(path[1:-1])
        small_ic_set |= small_ic_subset
    return small_ic_set

In [7]:
fname = './small_ic.pkl'

if os.path.exists(fname):
    df_small_ic = pd.read_pickle(fname)
    small_ic_set = set(df_small_ic.ic_code)
else:
    small_ic_set = get_small_ic_set(tc_segments)
    small_ic_list = list(small_ic_set)
    
    df_small_ic = pd.DataFrame({
        'ic_code': small_ic_list,
        'ic_name': [code2name[ic] for ic in small_ic_list]
    }).sort_values('ic_code')
    df_small_ic.to_pickle('./small_ic.pkl')
    
for ic in small_ic_set:
    print(code2name[ic])

高坂ＳＡ
都賀西方ＰＡ
大谷ＰＡ
蓮田ＳＡ
蓮田ＳＡ
赤城ＰＡ
黒磯ＰＡ
嵐山ＰＡ
赤城高原ＳＡ
浦和（仙台方面）
羽生ＰＡ
新座本線


## main

In [10]:
def __get_route(
    src: str, dest: str, route_dict: Dict[str, Dict[str, List[str]]]
) -> Optional[List[str]]:
    if not (src in ic_nodes_set and dest in ic_nodes_set):
        return []
    try:
        path = route_dict[src][dest]
        return path
    except: # 経路が存在しない, もしくはノードがグラフ上に存在しない場合
        return []

In [11]:
def get_route(
    src: str, 
    dest: str, 
    route_dict: Dict[str, Dict[str, List[str]]],
    excluded_ic_set: Set[str] = set(),
) -> List[str]:
    '''
    ic_graph上で出発地から目的地までの経路を得る関数

    Parameters
    --------------
    src: 出発ICコード
    dest: 目的ICコード
    '''
    path = __get_route(src, dest, route_dict=route_dict)
    
    if len(excluded_ic_set) > 0:
        path = [ic for ic in path if ic not in excluded_ic_set]
    return path

## test

In [17]:
name2code['大泉ＪＣＴ'], name2code['花園']

('1110210', '1800051')

In [21]:
src, dest = name2code['大泉ＪＣＴ'], name2code['花園']
route_dict[src][dest]

['1110210',
 '1800004',
 '1800006',
 '1800011',
 '1800016',
 '1800021',
 '1800026',
 '1800028',
 '1800031',
 '1800036',
 '1800041',
 '1800046',
 '1800051']

In [22]:
src, dest = name2code['大泉ＪＣＴ'], name2code['花園']

path = get_route(src, dest, route_dict=route_dict)
print([code2name[ic] for ic in path])

['大泉ＪＣＴ', '新座本線', '所沢', '三芳ＰＡ', '川越', '鶴ヶ島ＪＣＴ', '鶴ヶ島', '坂戸西スマート', '高坂ＳＡ', '東松山', '嵐山小川', '嵐山ＰＡ', '花園']


In [23]:
src, dest = name2code['大泉ＪＣＴ'], name2code['花園']

path = get_route(src, dest, route_dict=route_dict, excluded_ic_set=small_ic_set)
print([code2name[ic] for ic in path])

['大泉ＪＣＴ', '所沢', '三芳ＰＡ', '川越', '鶴ヶ島ＪＣＴ', '鶴ヶ島', '坂戸西スマート', '東松山', '嵐山小川', '花園']


In [24]:
src, dest = name2code['大泉ＪＣＴ'], name2code['大泉ＪＣＴ']

path = get_route(src, dest, route_dict=route_dict, excluded_ic_set=small_ic_set)
print([code2name[ic] for ic in path])

['大泉ＪＣＴ']


In [25]:
src, dest = name2code['新座本線'], name2code['花園']

path = get_route(src, dest, route_dict=route_dict, excluded_ic_set=small_ic_set)
print([code2name[ic] for ic in path])

['所沢', '三芳ＰＡ', '川越', '鶴ヶ島ＪＣＴ', '鶴ヶ島', '坂戸西スマート', '東松山', '嵐山小川', '花園']


In [26]:
src, dest = name2code['新座本線'], name2code['所沢']

path = get_route(src, dest, route_dict=route_dict, excluded_ic_set=small_ic_set)
print([code2name[ic] for ic in path])

['所沢']


In [27]:
src, dest = name2code['新座本線'], name2code['新座本線']

path = get_route(src, dest, route_dict=route_dict, excluded_ic_set=small_ic_set)
print([code2name[ic] for ic in path])

[]


In [28]:
src, dest = name2code['新座本線'], 'error code'

path = get_route(src, dest, route_dict=route_dict, excluded_ic_set=small_ic_set)
print([code2name[ic] for ic in path])

[]


## test module

In [7]:
from expressway_router import ExpresswayRouter

In [8]:
icnet_file = f'../../Input_processed_data/road_master/icnet_all.csv'

router = ExpresswayRouter(icnet_file)

Loading IC Network...
Finished.


In [9]:
import pandas as pd

df_ic = pd.read_csv(f'../../Input_processed_data/road_master/ic_preprocessed.csv', dtype={'ic_code': str})
code2name = dict(zip(df_ic['ic_code'], df_ic['ic_name']))
name2code = {v: k for k, v in code2name.items()}

In [10]:
# 大泉JCT --> 花園
# src, dest = name2code['大泉ＪＣＴ'], name2code['花園ＪＣＴ']
src, dest = ('1110210', '1800051')

In [11]:
path = router.get_route(src, dest)
print(path)
print([code2name[ic] for ic in path])

['1110210', '1800006', '1800011', '1800016', '1800021', '1800026', '1800028', '1800036', '1800041', '1800051']
['大泉ＪＣＴ', '所沢', '三芳ＰＡ', '川越', '鶴ヶ島ＪＣＴ', '鶴ヶ島', '坂戸西スマート', '東松山', '嵐山小川', '花園']


In [12]:
# 大泉JCT --> 大泉JCT
# src, dest = name2code['大泉ＪＣＴ'], name2code['大泉ＪＣＴ']
src, dest = ('1110210', '1110210')

path = router.get_route(src, dest)
print(path)
print([code2name[ic] for ic in path])

['1110210']
['大泉ＪＣＴ']


In [13]:
src, dest = name2code['新座本線'], name2code['花園']

path = router.get_route(src, dest)
print([code2name[ic] for ic in path])

['所沢', '三芳ＰＡ', '川越', '鶴ヶ島ＪＣＴ', '鶴ヶ島', '坂戸西スマート', '東松山', '嵐山小川', '花園']


In [14]:
src, dest = name2code['新座本線'], name2code['所沢']

path = router.get_route(src, dest)
print([code2name[ic] for ic in path])

['所沢']


In [15]:
src, dest = name2code['新座本線'], name2code['新座本線']

path = router.get_route(src, dest)
print([code2name[ic] for ic in path])

[]


In [16]:
src, dest = name2code['新座本線'], 'error code'

path = router.get_route(src, dest)
print([code2name[ic] for ic in path])

[]
