In [1]:
import sys
# Display the interpreter version string so the run environment is recorded in the output.
sys.version

'3.6.8 (default, Jan 14 2019, 11:02:34) \n[GCC 8.0.1 20180414 (experimental) [trunk revision 259383]]'

In [2]:
import os
# Make the project directory the working directory; the relative 'model_mr/...'
# paths used by later cells are resolved against it.
# NOTE(review): hard-coded absolute path — this cell only works on a machine
# that has this exact directory layout.
os.chdir("/project/work/Passenger_Demand")

## 패키지 설치
!pip install seaborn
!pip install haversine
!pip install pyarrow
!pip install multiprocessing_on_dill
!pip install statsmodels

In [3]:
import pandas as pd
import numpy as np
import math
import numbers

from haversine import haversine
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
import datetime

In [5]:
from multiprocessing_on_dill import Pool, cpu_count
from functools import partial
from tqdm import tqdm
import pandas as pd
import numpy as np


def parallelize_dataframe(df, 
                          func, 
                          group_keys = None, 
                          num_cores = None,
                          **params):
    """
        Apply ``func`` to every row of a pandas DataFrame using a process pool.

        The frame is cut into chunks: one chunk per group when ``group_keys``
        is given, otherwise ``num_cores`` positional slices of (almost) equal
        length. Each worker runs
        ``chunk.apply(func, axis=1, group_data=<copy of chunk>, **params)``
        and the per-chunk results are concatenated in chunk order.

        Args: 
            df: DataFrame to process (pandas.DataFrame)
            func: callable invoked as ``func(row, group_data=chunk, **params)``
                for each row; ``group_data`` is a copy of the chunk the row
                belongs to
            group_keys: key(s) passed to ``df.groupby`` to build the chunks;
                when None the frame is split by row position instead
            num_cores: number of worker processes; defaults to ``cpu_count()``
            **params: extra keyword arguments forwarded to ``func``

        Returns:
            The concatenated results of the per-chunk ``apply`` calls. When
            there is no row to process, an empty copy of ``df`` (same columns)
            is returned without starting any worker.
    """

    if num_cores is None:
        num_cores = cpu_count()

    if group_keys is None:
        # Split row positions rather than the DataFrame itself: the chunk
        # sizes are the ones np.array_split(df, num_cores) would produce, but
        # NumPy only ever sees a plain integer array.
        position_chunks = np.array_split(np.arange(len(df)), num_cores)
        df_list = [df.iloc[positions] for positions in position_chunks]
    else:
        df_list = [group for _, group in df.groupby(group_keys)]

    # Empty chunks add nothing to the result; with no chunk left,
    # pd.concat([]) would raise, so return an empty frame instead.
    df_list = [chunk for chunk in df_list if len(chunk) > 0]
    if not df_list:
        return df.iloc[0:0].copy()

    def map_func(data):
        # The whole chunk is handed to func as group_data so that a row can
        # look up its sibling rows.
        return data.apply(func, axis = 1, group_data= data.copy() ,**params)

    # Never start more workers than there are chunks to process.
    with Pool(min(num_cores, len(df_list))) as p:
        pd_result = pd.concat(p.map(map_func, df_list))

    return pd_result

In [6]:
# Do not truncate the column list when a DataFrame is displayed.
pd.set_option('display.max_columns', None)

# Select a Nanum Gothic font for plots (presumably so Korean labels render — confirm).
# NOTE(review): the two lines spell the family differently ('Nanum Gothic' vs
# "NanumGothic") and sns.set() runs after the rcParams assignment — confirm
# which spelling matches the installed font and whether the first line still
# has any effect.
plt.rcParams['font.family'] = 'Nanum Gothic'
sns.set(font="NanumGothic")

In [7]:
%%time
# Load the imputed aggregate from parquet.
# NOTE(review): this path is written by to_parquet cells further down the
# notebook, so on a clean top-to-bottom run the file must already exist on disk.
bus_demand_401 = pd.read_parquet('model_mr/mybicard_401_agg_imputation.parquet', engine='pyarrow')

CPU times: user 162 ms, sys: 123 ms, total: 285 ms
Wall time: 568 ms


In [14]:
%%time
# Load the aggregate without the "_imputation" suffix.
# NOTE(review): this rebinds bus_demand_401, replacing the frame loaded by the
# previous cell, and its execution count (14) is out of sequence — confirm
# which of the two files the cells below are meant to run against.
bus_demand_401 = pd.read_parquet('model_mr/mybicard_401_agg.parquet', engine='pyarrow')

CPU times: user 40.9 ms, sys: 54 ms, total: 94.9 ms
Wall time: 26 ms


In [8]:
# Row / column count of the loaded frame.
bus_demand_401.shape

(620940, 6)

In [9]:
def period_before_cnt(data, group_data, i):
    """
        Add lag features (i days earlier and i weeks earlier) to a single row.

        Args:
            data: row (pandas.Series) holding 'transdate' and the four count
                columns
            group_data: DataFrame searched for the lagged rows; needs a
                'transdate' column and the four count columns
            i: lag size, used both as a number of days and a number of weeks

        Returns:
            Copy of ``data`` with '<col>_D-<i>' and '<col>_W-<i>' entries
            appended for each count column. A lagged row is one whose
            'transdate' equals the row's 'transdate' minus the lag; the first
            such row in ``group_data`` order is used. When there is none, the
            row's own current counts are used instead.
    """
    col_names = ['normalcnt','studentcnt','childcnt','totalcnt']
    data_copy = data.copy()

    # Captured before any lag entry is added: fallback for a missing lagged row.
    current_counts = data_copy[col_names]
    transdates = group_data['transdate']

    # The lookup is a plain equality filter, so the group does not need to be
    # sorted, re-indexed or copied for every row.
    lagged = []
    for delta in (datetime.timedelta(days=i), datetime.timedelta(weeks=i)):
        matches = group_data.loc[transdates == data_copy['transdate'] - delta, col_names]
        lagged.append(matches.iloc[0] if len(matches) > 0 else current_counts)
    before_day, before_week = lagged

    # Day and week entries are interleaved per column, which fixes the order
    # of the new entries in the returned row.
    for name in col_names:
        data_copy[name + '_D-' + str(i)] = before_day[name]
        data_copy[name + '_W-' + str(i)] = before_week[name]
    return data_copy

In [10]:
%%time
# Add 2-day / 2-week lag columns, one group per stop, on 40 worker processes.
# NOTE(review): the recorded output of this cell ends in KeyboardInterrupt, so
# bus_demand_401_i2 was not produced by the saved run.
bus_demand_401_i2 = parallelize_dataframe(df = bus_demand_401, func = period_before_cnt, group_keys = 'mybi_stop_id', num_cores = 40, i= 2)

Process ForkPoolWorker-14:
Process ForkPoolWorker-36:
Process ForkPoolWorker-35:
Process ForkPoolWorker-37:
Process ForkPoolWorker-19:
Process ForkPoolWorker-10:
Process ForkPoolWorker-22:
Process ForkPoolWorker-16:
Process ForkPoolWorker-29:
Process ForkPoolWorker-30:
Process ForkPoolWorker-25:
Process ForkPoolWorker-39:
Process ForkPoolWorker-11:
Process ForkPoolWorker-4:
Process ForkPoolWorker-40:
Process ForkPoolWorker-9:
Process ForkPoolWorker-24:
Exception ignored in: <function _get_module_lock.<locals>.cb at 0x7fd1bffd8b70>
Process ForkPoolWorker-34:


KeyboardInterrupt: 

In [281]:
def fill_blank_data(data, group_data):
    """
        Fill the four count columns of one row whose 'totalcnt' is NaN.

        A row with a non-NaN 'totalcnt' is returned as an unchanged copy.
        Otherwise the first rule that applies is used:
          1. ``group_data`` has any row on the same calendar day
             -> counts are set to 0.
          2. Fully observed rows exist at the same timestamp 1-4 weeks
             earlier -> ceiling of their per-column mean.
          3. Walking back one week at a time from 5 weeks earlier (down to
             the earliest observed timestamp) finds a fully observed row
             -> that row's counts.
          4. Nothing found -> counts are set to 0.

        NOTE(review): rule 1 counts every row of ``group_data``, including
        rows that are themselves NaN. When ``group_data`` is the group the row
        was taken from (as parallelize_dataframe passes it), the row matches
        itself and rules 2-4 are never reached. Confirm whether the same-day
        check should only count observed rows.

        Args:
            data: row (pandas.Series) with 'transdate' and the count columns
            group_data: DataFrame with 'transdate' and the count columns that
                is searched for same-day and earlier rows

        Returns:
            Copy of ``data`` (pandas.Series) with the count columns filled;
            all other entries are kept as they are.
    """
    col_names = ['normalcnt','studentcnt','childcnt','totalcnt']
    data_copy = data.copy()

    # Data present: nothing to fill.
    if not math.isnan(data_copy['totalcnt']):
        return data_copy

    target = pd.Timestamp(data_copy['transdate'])
    all_dates = pd.to_datetime(group_data['transdate'])
    counts = None

    # Rule 1: is there any row on the same calendar day?
    has_same_day_row = (all_dates.dt.normalize() == target.normalize()).any()

    if not has_same_day_row:
        # Only rows whose four counts are all present can serve as a source.
        is_observed = group_data[col_names].notna().all(axis=1)
        observed = group_data.loc[is_observed, ['transdate'] + col_names]
        observed_dates = all_dates[is_observed]

        # Rule 2: same timestamp 1-4 weeks earlier.
        date_4W_list = [target - datetime.timedelta(weeks=w) for w in range(1, 5)]
        recent = observed[observed_dates.isin(date_4W_list)]
        if len(recent) > 0:
            # Average per timestamp first so each week weighs the same.
            weekly_mean = recent.groupby('transdate')[col_names].mean().mean()
            counts = {name: math.ceil(weekly_mean[name]) for name in col_names}
        elif len(observed) > 0:
            # Rule 3: keep stepping back one week; weeks 1-4 were checked above.
            earliest = observed_dates.min()
            candidate = target - datetime.timedelta(weeks=5)
            while candidate >= earliest:
                hit = observed[observed_dates == candidate]
                if len(hit) > 0:
                    counts = hit.iloc[0]
                    break
                candidate -= datetime.timedelta(weeks=1)

    # Rule 1 matched, or rule 4: nothing usable was found.
    if counts is None:
        counts = dict.fromkeys(col_names, 0)

    # Assign by column name so the result does not depend on column order.
    for name in col_names:
        data_copy[name] = counts[name]
    return data_copy

In [289]:
%%time
# Fill rows with missing counts, one group per stop, on 40 worker processes.
# NOTE(review): `aa` is not defined in any visible cell of this notebook —
# confirm where it comes from so the cell can run on a fresh kernel.
bb = parallelize_dataframe(df = aa, func = fill_blank_data, group_keys = 'mybi_stop_id', num_cores = 40)

CPU times: user 42.5 s, sys: 1min 2s, total: 1min 45s
Wall time: 4h 47min 50s


In [290]:
# Row / column count of the filled frame.
bb.shape

(620940, 6)

In [313]:
# Preview the first ten rows of the filled frame.
bb.head(10)

Unnamed: 0,mybi_stop_id,transdate,normalcnt,studentcnt,childcnt,totalcnt
0,3100020,2020-04-08 00:00:00,0.0,0.0,0.0,0.0
1,3100020,2020-04-08 05:00:00,0.0,0.0,0.0,0.0
2,3100020,2020-04-08 06:00:00,2.0,0.0,0.0,2.0
3,3100020,2020-04-08 07:00:00,0.0,0.0,0.0,0.0
4,3100020,2020-04-08 08:00:00,7.0,0.0,0.0,7.0
5,3100020,2020-04-08 09:00:00,0.0,0.0,0.0,0.0
6,3100020,2020-04-08 10:00:00,2.0,0.0,0.0,2.0
7,3100020,2020-04-08 11:00:00,2.0,0.0,0.0,2.0
8,3100020,2020-04-08 12:00:00,1.0,0.0,0.0,1.0
9,3100020,2020-04-08 13:00:00,2.0,1.0,0.0,3.0


In [294]:
# Count the remaining missing values per column.
bb.isnull().sum()

mybi_stop_id    0
transdate       0
normalcnt       0
studentcnt      0
childcnt        0
totalcnt        0
dtype: int64

In [292]:
# Persist the filled frame so later runs can load it from parquet.
bb.to_parquet("model_mr/mybicard_401_agg_imputation.parquet")

In [316]:
def period_before_cnt(data, group_data, i):
    """
        Add lag features (i days earlier and i weeks earlier) to a single row.

        NOTE(review): a function with this name is already defined in an
        earlier cell of this notebook; this definition replaces it once the
        cell runs. Keep a single definition.

        Args:
            data: row (pandas.Series) holding 'transdate' and the four count
                columns
            group_data: DataFrame searched for the lagged rows; needs a
                'transdate' column and the four count columns
            i: lag size, used both as a number of days and a number of weeks

        Returns:
            Copy of ``data`` with '<col>_D-<i>' and '<col>_W-<i>' entries
            appended for each count column. A lagged row is one whose
            'transdate' equals the row's 'transdate' minus the lag; the first
            such row in ``group_data`` order is used. When there is none, the
            row's own current counts are used instead.
    """
    col_names = ['normalcnt','studentcnt','childcnt','totalcnt']
    data_copy = data.copy()

    # Captured before any lag entry is added: fallback for a missing lagged row.
    current_counts = data_copy[col_names]
    transdates = group_data['transdate']

    # The lookup is a plain equality filter, so the group does not need to be
    # sorted, re-indexed or copied for every row.
    lagged = []
    for delta in (datetime.timedelta(days=i), datetime.timedelta(weeks=i)):
        matches = group_data.loc[transdates == data_copy['transdate'] - delta, col_names]
        lagged.append(matches.iloc[0] if len(matches) > 0 else current_counts)
    before_day, before_week = lagged

    # Day and week entries are interleaved per column, which fixes the order
    # of the new entries in the returned row.
    for name in col_names:
        data_copy[name + '_D-' + str(i)] = before_day[name]
        data_copy[name + '_W-' + str(i)] = before_week[name]
    return data_copy

In [None]:
%%time
# Add 1-day / 1-week lag columns to the filled frame, one group per stop, on 10 worker processes.
cc = parallelize_dataframe(df = bb, func = period_before_cnt, group_keys = 'mybi_stop_id', num_cores = 10, i= 1)

In [None]:
# Persist the lag-augmented frame.
# NOTE(review): this is the same path the `bb.to_parquet` cell writes and the
# first read_parquet cell loads, so the frame with lag columns would replace
# the imputation-only file — confirm this is intended or use a distinct name.
cc.to_parquet("model_mr/mybicard_401_agg_imputation.parquet")