このファイルについて
- about
    - 検索履歴データから館山道・関越道の各区間の5分単位の検索数を計算する
- author: 松永
- Input
    - ../Input_processed_data/search_records/csv202xxx/*
- Output
    - 時間指定あり検索
        - ../Input_processed_data/search_count/search-count_{tateyama or kannetsu}.csv
    - 時間指定なし検索
        - ../Input_processed_data/search_count/search-count_{tateyama or kannetsu}_unspecified.csv"

In [1]:
import datetime as dt
import os
import pickle
import time
from typing import Dict, List, Optional, Set, Tuple

import networkx as nx
import numpy as np
import pandas as pd
import cudf
# import warnings
# warnings.simplefilter('ignore')

In [2]:
# data directory
DATA_DIR = '../../Input_processed_data'

# IC, 道路情報 csv
IC_CSV = f'{DATA_DIR}/road_master/ic_preprocessed.csv'
IC_NET_CSV = f'{DATA_DIR}/road_master/220303-doronet_ic.csv'
IC_SUBNET_CSV = f'{DATA_DIR}/road_master/icnet_sub.csv'

# 検索ログ csv
SEARCH_LOG_DIR = lambda month: f'{DATA_DIR}/simple_search_records/csv{month}'
SEARCH_LOG_CSV = lambda date: f'{SEARCH_LOG_DIR(date[:6])}/record_{date}.csv'

# 準備

## 道路構造データの読み込み

In [3]:
# モジュール内で前処理済み
df_ic = pd.read_csv(IC_CSV, dtype={'ic_code': str})
df_icnet = pd.read_csv(IC_NET_CSV, dtype={'start_code': str, 'end_code': str, 'road_code': str})
sub_icnet = pd.read_csv(IC_SUBNET_CSV, dtype={'start_code': str, 'end_code': str, 'road_code': str})

code2name = dict(zip(df_ic['ic_code'], df_ic['ic_name']))
name2code = {v: k for k, v in code2name.items()}

ic_graph = nx.from_pandas_edgelist(
    df_icnet, source='start_code', target='end_code',
    edge_attr=['distance', 'road_code', 'direction'], create_using=nx.DiGraph())

In [4]:
# 区間ごとの制限速度を格納したテーブル, Map を作成
df_limits = sub_icnet.loc[:, ['start_code', 'end_code', 'start_name', 'end_name', 'road_code', 'limit']]
limit_dict = {
    (s_code, e_code): lim for s_code, e_code, lim in df_limits.loc[:, ['start_code', 'end_code', 'limit']].values
}

In [5]:
ic_nodes_set: set = set(code2name.keys())

In [6]:
# トラカンデータが持つ区間のdataframe
tc_segments = pd.read_pickle(f'./traffic_counter_segments.pkl')
tc_segments_set = set(
    [tuple(segment) for segment in tc_segments.loc[:, ['start_code', 'end_code']].values]
)

## 最短経路オブジェクト`route_dict`の読み込み・書き出し

In [7]:
fname = './route_dict.pkl'

if os.path.exists(fname): # 経路マップがすでに存在しているとき、それを使う
    with open(fname, 'rb') as f:
        print('Loading IC Routes...')
        route_dict = pickle.load(f)
else: # 存在していなければ計算してバイナリで保存
    print('Calculating IC Routes...')
    route_dict = dict(nx.all_pairs_dijkstra_path(ic_graph, weight='distance'))
    
    with open(fname, 'wb') as f:
        pickle.dump(route_dict, f)
print('Finished.')

Loading IC Routes...
Finished.


In [8]:
! du -h ./route_dict.pkl

3.4G	./route_dict.pkl


## 省略するICの集合を読み込む

In [9]:
with open('small_ic.pkl', 'rb') as f:
    df_small_ic = pickle.load(f)

small_ic_set = set(df_small_ic.ic_code)

# Functions

## 経路計算

In [10]:
def __get_route(
    src: str, dest: str, route_dict: Dict[str, Dict[str, List[str]]]
) -> Optional[List[str]]:
    if not (src in ic_nodes_set and dest in ic_nodes_set):
        return []
    try:
        path = route_dict[src][dest]
        return path
    except: # 経路が存在しない, もしくはノードがグラフ上に存在しない場合
        return []

In [11]:
def get_route(
    src: str, 
    dest: str, 
    route_dict: Dict[str, Dict[str, List[str]]],
    excluded_ic_set: Set[str] = set(),
) -> List[str]:
    '''
    ic_graph上で出発地から目的地までの経路を得る関数

    Parameters
    --------------
    src: 出発ICコード
    dest: 目的ICコード
    '''
    path = __get_route(src, dest, route_dict=route_dict)
    
    if len(excluded_ic_set) > 0:
        path = [ic for ic in path if ic not in excluded_ic_set]
    return path

In [12]:
def get_route_with_time(
    src: str,
    dest: str,
    spec_datetime: dt.datetime,
    spec_type: int,
    route_dict: Dict[str, Dict[str, List[str]]],
    excluded_ic_set: Set[str] = set(),
) -> List[Tuple[str, dt.datetime]]:
    '''
    ic_graph上で出発地（src_name）から目的地（target_name）までの予想通過時刻付き経路を得る関数
    '''
    # 関越道・館山道 以外の道路の移動速度は80km/hと仮定する
    DEFAULT_SPEED = 80
    
    path = __get_route(src, dest, route_dict=route_dict)

    elapsed = dt.timedelta()
    elapsed_time_list = [elapsed]
    for i, (start, end) in enumerate(zip(path, path[1:])):
        dist = ic_graph[start][end]['distance']
        limit_speed = limit_dict.get((start, end), DEFAULT_SPEED)
        
        # 所要時間を積算
        td = dt.timedelta(hours = dist / limit_speed)
        elapsed += td
        elapsed_time_list.append(elapsed)
    
    if spec_type == 1:
        time_list = [spec_datetime + td for td in elapsed_time_list]
    elif spec_type == 2:
        time_list = [spec_datetime - td for td in elapsed_time_list[::-1]]
    else:
        time_list = [spec_datetime + td for td in elapsed_time_list]
    
    result = list(zip(path, time_list))
    if len(excluded_ic_set) > 0:
        result = [(ic, time) for (ic, time) in result if ic not in excluded_ic_set]
    return result

## 検索ログの読み込み

In [13]:
def str2date(date: str, format: str = '%Y%m%d') -> dt.date:
    return dt.datetime.strptime(date, '%Y%m%d').date()

In [14]:
def get_log(date: str) -> cudf.DataFrame:
    type_map = {
        'datetime': np.datetime64,
        'start_code': 'category',
        'end_code': 'category',
        'spec_datetime': np.datetime64,
        'spec_type': 'category',
        'car_type': 'category'
    }
    if not os.path.exists(SEARCH_LOG_CSV(date)):
        return cudf.DataFrame(columns=type_map.keys())

    df = pd.read_csv(SEARCH_LOG_CSV(date)).astype(type_map)
    df = df.loc[
        (df.start_code.isin(ic_nodes_set)) &
        (df.end_code.isin(ic_nodes_set))
    ]
    return cudf.from_pandas(df)

In [15]:
def get_past_logs(date: str, periods: int, include_target: bool = False) -> cudf.DataFrame:
    '''
    指定日(target_date)から過去数日分の検索履歴データを取得する関数

    Parameters
    --------------
    date: 混雑度算出の対象となる日付（文字列 or リスト)
    periods: 過去何日分の履歴を参照するか
    include_target: dateをデータに含めるかどうか
    '''
    # 参照すべき全日付のiterableを生成
    if include_target:
        dt_range = pd.date_range(end=date, periods=periods)
    else:
        dt_range = pd.date_range(end=date, periods=periods+1, closed='left')

    DAYS = [d.strftime('%Y%m%d') for d in dt_range]

    df_specified = cudf.DataFrame()
    for d in DAYS:
        df = get_log(d)
        if len(df) == 0:
            continue
        
        series_spec_date: pd.Series = df.spec_datetime.to_pandas().map(lambda dt: dt.date())
        _df_specified = df.loc[series_spec_date == str2date(date)]
        df_specified = cudf.concat([df_specified, _df_specified], ignore_index=True)
    df_specified.reset_index(drop=True, inplace=True)
    
    if len(df_specified) == 0:
        df_specified = cudf.DataFrame(columns=df.columns)
    return df_specified

In [34]:
def get_unspecified_log(date: str) -> cudf.DataFrame:
    '''
    検索履歴全体から時間指定なしの検索のみを得る関数
    '''
    ALLOWED_MINUTE_LAG = 15
    
    df = get_log(date)
    # 検索日時と指定日時の差を秒単位で算出
    t_diff = abs((df.datetime - df.spec_datetime) / np.timedelta64(1, 's'))
    
    # 事前に定義した時間差（=15分）より検索日時と指定日時の差が少ないレコードのみを抽出
    df_unspecified = df.loc[t_diff < ALLOWED_MINUTE_LAG * 60].reset_index(drop=True)
    return df_unspecified

## 検索ログのマッピング

In [16]:
def map_search_to_road_network(
    df: cudf.DataFrame,
    route_dict: Dict[str, Dict[str, List[str]]],
    excluded_ic_set: Set[str] = set()
) -> cudf.DataFrame:
    '''
    検索ログから各道路区間の予想通過時刻を計算する
    '''
    start_code_list = []
    end_code_list = []
    passing_time_list = []
    
    columns = ['start_code', 'end_code', 'spec_datetime', 'spec_type']
    for (src, dest, spec_datetime, spec_type) in df.loc[:, columns].to_pandas().values:
        path = get_route_with_time(
            src, dest, spec_datetime, spec_type, 
            route_dict=route_dict, 
            excluded_ic_set=excluded_ic_set
        )
        for i, (start, end) in enumerate(zip(path, path[1:])):
            s_code, e_code = start[0], end[0]
            s_ptime = start[-1]
            
            # if ic_graph[s_code][e_code]['road_code'] not in target_road_code_set:
            #     break
            
            start_code_list.append(s_code)
            end_code_list.append(e_code)
            passing_time_list.append(s_ptime)
            
        if spec_type not in {1, 2} and len(path) > 0:
            with open('./error_spec_type_records.csv', 'a') as f:
                print(f'{s_ptime},{src},{dest},{spec_datetime},{spec_type}', file=f)
                


    return cudf.DataFrame({
        'start_code': start_code_list,
        'end_code': end_code_list,
        'passing_time': passing_time_list,
    }).astype({
        'start_code': 'category',
        'end_code': 'category'
    })

In [17]:
def aggregate_search_count(df: cudf.DataFrame, timeslice: str = '5min') -> cudf.DataFrame:
    # 検索量として積算される対象となるカラムを付与
    result = df.assign(search=1)
    
    # timesliceでサンプリングし、検索量の和を取る
    result = (result
              .set_index('passing_time')
              .to_pandas()
              .groupby(['start_code', 'end_code'])
              .apply(lambda g: g['search'].resample(timeslice).sum())
              .reset_index())
    return cudf.from_pandas(result)

# 検索数の作成

## 時間指定あり

In [18]:
period_blocks = [
    # ('20210402', '20210630'),
    # ('20210701', '20210930'),
    # ('20211001', '20211231'),
    # ('20220101', '20220331'),
    # ('20220401', '20220630'),
    # ('20220701', '20220930'),
    ('20221001', '20221231'),
    ('20230101', '20230331'),
    ('20230401', '20230630'),
    ('20230701', '20230731'),
    ('20230801', '20230816'),
    ('20230817', '20230930'),
]

In [19]:
past_periods = 7
timeslice = '5min'

In [None]:
for start_date, end_date in period_blocks:
    print('='*40, f'{start_date} -> {end_date}', '='*40)
    
    DAYS: List[str] = [
        d.strftime('%Y%m%d') for d in pd.date_range(start_date, end_date, freq='1D')
    ]
    
    df_search = cudf.DataFrame()
    for i, date in enumerate(DAYS):
        s = time.time()
        
        df = get_past_logs(date, periods=past_periods)
        
        df_mapped = map_search_to_road_network(
            df, route_dict, excluded_ic_set=small_ic_set
        )
        df_mapped = df_mapped.loc[
            df_mapped.loc[:, ['start_code', 'end_code']].to_pandas().apply(
                lambda segment: tuple(segment) in tc_segments_set, axis=1
            )
        ].reset_index(drop=True)
        
        _df_search = aggregate_search_count(df_mapped, timeslice)
        _df_search = cudf.from_pandas(
            _df_search.set_index('passing_time').to_pandas().loc[date]
        ).reset_index()

        print(f'{date} | {len(_df_search)} records ({time.time() - s:.3f} sec)')

        df_search = cudf.concat([df_search, _df_search], ignore_index=True)

    # df_count.reset_index(inplace=True)
    OUTPUT_FILE = f'./search_count/search-count_specified_{start_date}-{end_date}.csv'
    df_search.to_pandas().to_csv(OUTPUT_FILE, index=False)

20221001 | 23040 records (49.901 sec)
20221002 | 23039 records (32.963 sec)
20221003 | 23034 records (24.963 sec)
20221004 | 23029 records (15.668 sec)
20221005 | 23031 records (20.169 sec)
20221006 | 23040 records (21.342 sec)
20221007 | 23040 records (34.183 sec)
20221008 | 23040 records (65.459 sec)
20221009 | 23038 records (47.246 sec)
20221010 | 23039 records (24.435 sec)
20221011 | 23022 records (28.638 sec)
20221012 | 23031 records (24.290 sec)
20221013 | 23040 records (23.523 sec)
20221014 | 23040 records (31.208 sec)
20221015 | 23040 records (49.438 sec)
20221016 | 23001 records (34.991 sec)
20221017 | 23032 records (26.069 sec)
20221018 | 23040 records (22.782 sec)
20221019 | 23026 records (17.836 sec)
20221020 | 22951 records (25.446 sec)
20221021 | 23040 records (31.437 sec)
20221022 | 23040 records (59.247 sec)
20221023 | 23040 records (30.093 sec)
20221024 | 23040 records (25.657 sec)
20221025 | 22992 records (22.319 sec)
20221026 | 23027 records (22.641 sec)
20221027 | 2

In [21]:
!head -n5 "$OUTPUT_FILE"

passing_time,start_code,end_code,search
2023-08-17 00:00:00,1040013,1040016,11
2023-08-17 00:05:00,1040013,1040016,4
2023-08-17 00:10:00,1040013,1040016,6
2023-08-17 00:15:00,1040013,1040016,16


In [22]:
!tail -n5 "$OUTPUT_FILE"

2023-09-30 23:35:00,1800106,1800111,2
2023-09-30 23:40:00,1800106,1800111,0
2023-09-30 23:45:00,1800106,1800111,0
2023-09-30 23:50:00,1800106,1800111,0
2023-09-30 23:55:00,1800106,1800111,4


## 時間指定なし

In [62]:
period_blocks = [
    ('20210401', '20210630'),
    ('20210701', '20210930'),
    ('20211001', '20211231'),
    ('20220101', '20220331'),
    ('20220401', '20220630'),
    ('20220701', '20220930'),
    ('20221001', '20221231'),
    ('20230101', '20230331'),
    ('20230401', '20230630'),
    ('20230701', '20230731'),
    ('20230801', '20230816'),
    ('20230817', '20230930'),
]

In [63]:
def aggregate_search_count(df: cudf.DataFrame) -> cudf.DataFrame:
    # 検索量として積算される対象となるカラムを付与
    result = df.assign(search=1)
    
    # timesliceでサンプリングし、検索量の和を取る
    result = (result
              .set_index('passing_time')
              .to_pandas()
              .groupby(['start_code', 'end_code'])
              .apply(lambda g: g['search'].sum())
              .reset_index()
              .rename(columns={0: 'search'}))
    return cudf.from_pandas(result)

In [None]:
for start_date, end_date in period_blocks:
    print('='*40, f'{start_date} -> {end_date}', '='*40)
    
    DAYS: List[str] = [
        d.strftime('%Y%m%d') for d in pd.date_range(start_date, end_date, freq='1D')
    ]
    
    df_search = cudf.DataFrame()
    for i, date in enumerate(DAYS):
        s = time.time()
        
        df = get_unspecified_log(date)
        
        df_mapped = map_search_to_road_network(
            df, route_dict, excluded_ic_set=small_ic_set
        )
        df_mapped = df_mapped.loc[
            df_mapped.loc[:, ['start_code', 'end_code']].to_pandas().apply(
                lambda segment: tuple(segment) in tc_segments_set, axis=1
            )
        ].reset_index(drop=True)
        
        _df_search = aggregate_search_count(df_mapped)
        _df_search = (_df_search
                      .assign(search_date=str2date(date).strftime('%Y-%m-%d'))
                      .loc[:, ['search_date', 'start_code', 'end_code', 'search']])

        print(f'{date} [{time.time() - s:.3f} sec]')

        df_search = cudf.concat([df_search, _df_search], ignore_index=True)

    OUTPUT_FILE = f'./search_count/search-count_unspecified_{start_date}-{end_date}.csv'
    df_search.to_pandas().to_csv(OUTPUT_FILE, index=False)

20210401 [51.539 sec]
20210402 [52.568 sec]
20210403 [49.076 sec]
20210404 [73.560 sec]
20210405 [84.778 sec]
20210406 [78.978 sec]
20210407 [82.405 sec]
20210408 [25.712 sec]
20210409 [32.248 sec]
20210410 [31.325 sec]
20210411 [25.358 sec]
20210412 [33.609 sec]
20210413 [33.417 sec]
20210414 [25.201 sec]
20210415 [32.002 sec]
20210416 [33.906 sec]
20210417 [41.943 sec]
20210418 [44.776 sec]
20210419 [44.039 sec]
20210420 [34.789 sec]
20210421 [39.254 sec]
20210422 [41.728 sec]
20210423 [42.148 sec]
20210424 [64.635 sec]
20210425 [51.375 sec]
20210426 [50.683 sec]
20210427 [50.055 sec]
20210428 [47.648 sec]
20210429 [46.799 sec]
20210430 [49.963 sec]
20210501 [52.471 sec]
20210502 [48.047 sec]
20210503 [36.284 sec]
20210504 [37.087 sec]
20210505 [39.723 sec]
20210506 [33.031 sec]
20210507 [28.769 sec]
20210508 [19.068 sec]
20210509 [24.110 sec]
20210510 [19.783 sec]
20210511 [25.542 sec]
20210512 [18.797 sec]
20210513 [26.133 sec]
20210514 [19.506 sec]
20210515 [24.920 sec]
20210516 [

In [65]:
!head -n5 "$OUTPUT_FILE"

search_date,start_code,end_code,search
2023-08-17,1040013,1040016,5363
2023-08-17,1040016,1040013,4760
2023-08-17,1040016,1040020,6152
2023-08-17,1040020,1040016,4521


In [66]:
!tail -n5 "$OUTPUT_FILE"

2023-09-30,1800091,1800096,4219
2023-09-30,1800096,1800091,3744
2023-09-30,1800096,1800106,4169
2023-09-30,1800106,1800096,3312
2023-09-30,1800106,1800111,3918


### 時間指定なし検索を1日後ろにシフトする
日付Dのレコードに対して、昨日(D-1)の時間指定なし検索数を示すようにする

In [6]:
def shift_unspecified_search(df):
    res = (df
           .groupby(['start_code', 'end_code'])
           .apply(lambda g: g.search.shift(1))
           # 関越道の場合はshift後にpivot_tableっぽくなってしまう（通常はpd.Series）ため、特別に処理
           .pipe(lambda _df: _df.stack().rename('search') if not isinstance(_df, pd.Series) else _df)
           .reset_index()
           .sort_values(['search_date', 'start_code', 'end_code'])
           .reset_index(drop=True)
           .loc[:, ['search_date', 'start_code', 'end_code', 'search']])
    return res

In [7]:
df_unspecified = shift_unspecified_search(df_unspecified)