In [341]:
from util import *
from time import gmtime, strftime
from pytz import timezone
from datetime import datetime
from sqlalchemy import ForeignKey, Table, Column, String, Integer, Float, Boolean, MetaData, select
from joblib import Parallel, delayed
import random
from impala.dbapi import connect
from impala.util import as_pandas

In [33]:
def get_filtered_fg_df(feature_engineered_df):
    static_item_ids = feature_engineered_df.item_id[(feature_engineered_df.std_in_cluster == 0.0)].values
    data_df_cleaned = feature_engineered_df[feature_engineered_df.mean_in_cluster.notnull()]
    purified_df = data_df_cleaned[(data_df_cleaned.ratio_drop < 0.3)
#                           & (data_df_cleaned.ratio_same_value < 0.3)
                          & (data_df_cleaned.n_jumps <= 3)
                          & (data_df_cleaned.n_days >= 3)
#                           & (data_df_cleaned.std_in_cluster > 0.2)
                          & (data_df_cleaned.std_in_cluster < 4)
                          & (data_df_cleaned.ratio_of_na < 0.5)
#                           & (data_df_cleaned.n_unique_stock_id < 50)
                                 ]
    return purified_df, static_item_ids

def get_feature_engineered_bundle(df):
    
    def get_arr_in_cluster(df):

        empty_lst = []
        for name, group in df.groupby('label')['STOCK_AMOUNT']:
            result_lst = np.sort(np.diff(group))[1:-1]
            empty_lst = np.append(empty_lst, result_lst)
        arr_in_cluster = -empty_lst[empty_lst < 0]
        return arr_in_cluster
    
    # The number of unique stock_id

    df = df.set_index("REG_DT")
    unique_stock_ids = df.STOCK_ID.unique()
    n_unique_stock_id = len(unique_stock_ids) 
    
    # select a single stock_id
    tmp2 = list(df.groupby('STOCK_ID'))[0][1]
    
    
    
    # The ratio of NA
    tmp3 = tmp2.resample('1D').first()
    
    # The number of days
    n_days = len(tmp3.ID)
    
    if n_days <=1:
        return
    
    null_arr = pd.isnull(tmp3.ID).values
    ratio_of_na = sum(null_arr) / float(n_days)
    
    
    consecutive_lst = [ sum( 1 for _ in group ) for key, group in itertools.groupby( null_arr ) if key ]
    
    
    # The max value of consecutive NAs 
    max_consecutive_na = max([0] + consecutive_lst)
    

    # The instances of consecutive NAs
    n_consecutive_na = len(consecutive_lst)
    
    # Define a stock array
    stock_arr = tmp3.STOCK_AMOUNT.values
    
    # The medain
    median_v = np.nanmedian(stock_arr)
    
    # Std
    std_v = np.nanstd(stock_arr)
    
    # max, min
    max_v = np.nanmax(stock_arr)
    min_v = np.nanmin(stock_arr)
    
    # The range between max and min
    range_v = max_v - min_v
    
    stock_na_removed = stock_arr[~np.isnan(stock_arr)]
    
    consecutive_same_lst = [ sum( 1 for _ in group ) for key, group in itertools.groupby( stock_na_removed ) if key ]
    
    if len(consecutive_same_lst) == 0:
        ratio_same_value = 0
    else:
        ratio_same_value = max(consecutive_same_lst) / float(n_days)
    
    
    n_jumps = sum(np.diff(stock_na_removed) > 0)
    max_drop = -min(np.diff(stock_na_removed))
    
    tmp3['STOCK_AMOUNT'] = tmp3.STOCK_AMOUNT.replace(np.nan, -1)
    n_cluster, label = get_label_from_dbscan(tmp3)
    
    tmp3['label'] = label
    arr_in_cluster = get_arr_in_cluster(tmp3)

    if len(arr_in_cluster) > 0:

        mean_in_cluster = np.nanmean(arr_in_cluster)
        std_in_cluster = np.nanstd(arr_in_cluster)
    else:
        mean_in_cluster = 0
        std_in_cluster = 0

    
    bundle = {
        'item_id': df.ITEM_ID.values[0],
        'stock_id': tmp3.STOCK_ID.values[0],
        'n_unique_stock_id': n_unique_stock_id,
        'n_days': n_days,
        'ratio_of_na': ratio_of_na,
        'max_consecutive_na': max_consecutive_na,
        'n_consecutive_na': n_consecutive_na,
        'median_v': median_v,
        'std_v': std_v,
        'max_v': max_v,
        'ratio_drop': max_drop / float(max_v),
        'min_v': min_v,
        'range_v': range_v,
        'ratio_same_value': ratio_same_value,
        'n_jumps': n_jumps,
        'max_drop': max_drop,
        'n_cluster': n_cluster,
        'mean_in_cluster': mean_in_cluster,
        'std_in_cluster': std_in_cluster

    }
    
    return bundle

In [415]:
def get_arr_in_cluster(df):

    empty_lst = []
    for name, group in df.groupby('label')['STOCK_AMOUNT']:
        result_lst = np.sort(np.diff(group))[1:-1]
        empty_lst = np.append(empty_lst, result_lst)
    arr_in_cluster = -empty_lst[empty_lst < 0]
    return arr_in_cluster

In [16]:
epopcon_db = Epopcon_db()
wspider_engine = epopcon_db.get_engine(production=True)
wspider_temp_engine = epopcon_db.get_engine(production=False)

In [17]:
conn = connect(host='133.186.168.6', port=21050)
cursor = conn.cursor()

In [38]:
ids_df = pd.read_sql_query("SELECT ID FROM MWS_COLT_ITEM WHERE RELEASE_DT > '2018-01-01'", wspider_engine)

OperationalError: (_mysql_exceptions.OperationalError) (2006, 'MySQL server has gone away') [SQL: "SELECT ID FROM MWS_COLT_ITEM WHERE RELEASE_DT > '2018-01-01'"]

In [453]:
DENOM = 100
item_ids = ids_df.ID.values.astype(str)
n_batches = math.ceil( len(item_ids) / float(DENOM))
batch_ls = [str(tuple(batch)) for batch in np.array_split(item_ids, n_batches)]
batch_lst = [(idx, row) for idx, row in enumerate(batch_ls)]

In [454]:
idx, query = batch_lst[0]
stmt = "SELECT item_id, stock_id, reg_dt, stock_amount FROM MWS_COLT_ITEM_IVT WHERE item_id IN %s" % query
cursor.execute(stmt)
batch = as_pandas(cursor)
batch.columns = [item.upper() for item in batch.columns]
batch['REG_DT'] = pd.to_datetime(batch['REG_DT'])
batch[['STOCK_AMOUNT']] = batch[['STOCK_AMOUNT']].astype(np.int)

In [455]:
# batch = batch.set_index('REG_DT')
# batch = batch.resample('1D').first()

In [456]:
# batch

In [457]:
# batch.loc[batch.ITEM_ID == '6739668']

In [458]:
batch_cov = batch.set_index('REG_DT').groupby(['ITEM_ID', 'STOCK_ID']).resample('D').first()

In [463]:
import time

In [None]:
start = time.time()

In [466]:
start = time.time()
result_lst = [wrapper(item_id, batch_cov.loc[item_id])for item_id in batch_cov.ITEM_ID.unique() if item_id is not np.nan]
extracted_feature_df = pd.DataFrame([result for result in result_lst if result != None])
time.time() - start

17.80727791786194

In [467]:
extracted_feature_df

Unnamed: 0,item_id,max_consecutive_na,max_drop,max_v,mean_in_cluster,median_v,min_v,n_cluster,n_consecutive_na,n_days,n_jumps,n_unique_stock_id,range_v,ratio_drop,ratio_of_na,ratio_same_value,std_in_cluster,std_v
0,6739668,11,2.0,1.0,0.0,-1.0,-1.0,0,1,13,1,5,2.0,2.000000e+00,0.846154,0.846154,0.0,0.721602
1,6739669,5,1.0,0.0,1.0,-1.0,-1.0,1,7,31,7,5,1.0,1.000000e+08,0.741935,0.161290,0.0,0.437570
2,6739670,0,0.0,2.0,0.0,2.0,2.0,1,0,1,0,6,0.0,0.000000e+00,0.000000,1.000000,0.0,0.000000
3,6739671,4,1.0,2.0,3.0,2.0,1.0,1,6,28,0,5,1.0,5.000000e-01,0.750000,0.214286,0.0,0.349927
4,6739672,7,3.0,2.0,3.0,-1.0,-1.0,1,9,57,9,5,3.0,1.500000e+00,0.824561,0.122807,0.0,1.141025
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,6739763,6,1000.0,999.0,0.0,-1.0,-1.0,1,13,63,13,5,1000.0,1.001001e+00,0.777778,0.095238,0.0,415.739710
96,6739764,6,-0.0,999.0,0.0,999.0,999.0,1,13,63,0,4,0.0,-0.000000e+00,0.777778,0.222222,0.0,0.000000
97,6739765,6,1000.0,999.0,0.0,-1.0,-1.0,1,13,63,13,6,1000.0,1.001001e+00,0.777778,0.095238,0.0,415.739710
98,6739766,6,1000.0,999.0,0.0,-1.0,-1.0,3,13,62,13,4,1000.0,1.001001e+00,0.741935,0.096774,0.0,437.515012


In [448]:
def wrapper(item_id, selected):
    unique_stock_ids = [item for item in selected.STOCK_ID.unique() if item is not np.nan]
    n_unique_stock_id = len(unique_stock_ids) 

    random_n = random.randint(0, n_unique_stock_id-1)
    tmp = selected.loc[unique_stock_ids[random_n]]

    # The number of days
    n_days = len(tmp.STOCK_ID)

    null_arr = pd.isnull(tmp.STOCK_ID).values

    ratio_of_na = sum(null_arr) / float(n_days)

    consecutive_lst = [ sum( 1 for _ in group ) for key, group in itertools.groupby( null_arr ) if key ]

    # The max value of consecutive NAs 
    max_consecutive_na = max([0] + consecutive_lst)

    # The instances of consecutive NAs
    n_consecutive_na = len(consecutive_lst)

    # Define a stock array
    stock_arr = tmp.STOCK_AMOUNT.values

    # The medain
    median_v = np.nanmedian(stock_arr)

    # Std
    std_v = np.nanstd(stock_arr)

    # max, min
    max_v = np.nanmax(stock_arr)
    min_v = np.nanmin(stock_arr)

    # The range between max and min
    range_v = max_v - min_v

    stock_na_removed = stock_arr[~np.isnan(stock_arr)]

    consecutive_same_lst = [ sum( 1 for _ in group ) for key, group in itertools.groupby( stock_na_removed ) if key ]

    if len(consecutive_same_lst) == 0:
        ratio_same_value = 0
    else:
        ratio_same_value = max(consecutive_same_lst) / float(n_days)

    diff_stock_na_removed = np.diff(stock_na_removed)
        
    if len(diff_stock_na_removed) == 0:
        n_jumps = 0
        max_drop = 0
    else:
        n_jumps = sum(diff_stock_na_removed > 0)
        max_drop = -min(diff_stock_na_removed)

    tmp['STOCK_AMOUNT'] = tmp.STOCK_AMOUNT.replace(np.nan, -1)

    mean_in_cluster = 0
    std_in_cluster = 0
    n_cluster = 1
    if std_v != 0:

        n_cluster, label = get_label_from_dbscan(tmp)

        tmp['label'] = label
        arr_in_cluster = get_arr_in_cluster(tmp)

        if len(arr_in_cluster) > 0:

            mean_in_cluster = np.nanmean(arr_in_cluster)
            std_in_cluster = np.nanstd(arr_in_cluster)
            

            
    bundle = {
        'item_id': item_id,
#         'stock_id': tmp3.STOCK_ID.values[0],
        'n_unique_stock_id': n_unique_stock_id,
        'n_days': n_days,
        'ratio_of_na': ratio_of_na,
        'max_consecutive_na': max_consecutive_na,
        'n_consecutive_na': n_consecutive_na,
        'median_v': median_v,
        'std_v': std_v,
        'max_v': max_v,
        'ratio_drop': max_drop / float(max(max_v, 0.00000001)),
        'min_v': min_v,
        'range_v': range_v,
        'ratio_same_value': ratio_same_value,
        'n_jumps': n_jumps,
        'max_drop': max_drop,
        'n_cluster': n_cluster,
        'mean_in_cluster': mean_in_cluster,
        'std_in_cluster': std_in_cluster

    }

    return bundle

In [42]:
result_lst = []
for idx, group in batch.groupby('ITEM_ID'):
#     group
#     tmp = list(group_by_item_id.groupby('STOCK_ID'))[0][1]    
    result_lst.append(get_feature_engineered_bundle(group))
extracted_feature_df = pd.DataFrame([result for result in result_lst if result != None])

In [60]:
filtered_df, static_item_ids = get_filtered_fg_df(extracted_feature_df)

# filtered df
cleaned_item_ids = filtered_df.item_id.values
cleaned_df = batch[batch['ITEM_ID'].isin(cleaned_item_ids)]

# label extracted feature df
extracted_feature_df['condition_clean'] = 0
extracted_feature_df.loc[extracted_feature_df.item_id.isin(cleaned_item_ids), 'condition_clean'] = 1
extracted_feature_df.loc[extracted_feature_df.item_id.isin(static_item_ids), 'condition_clean'] = 2


In [61]:
# extracted_feature_df

In [62]:
# cleaned_df

In [63]:
df_lst =[]
cleaned_df = cleaned_df.drop(['ID', 'ADD_PRICE'], axis=1)
cleaned_df = cleaned_df.sort_values(by=['ITEM_ID', 'STOCK_ID', 'REG_DT'])

In [None]:
# for idx, group in list(cleaned_df.groupby('ITEM_ID')):
#     df_lst.append(get_sell_amount_by_item_id(group))

In [69]:
cleaned_df

Unnamed: 0,ITEM_ID,STOCK_ID,STOCK_AMOUNT,COLLECT_DAY,REG_ID,REG_DT
1483,6739697,28978093001,999,20180102,SERVER,2018-01-02 02:36:51
1773,6739697,28978093001,999,20180106,SERVER,2018-01-06 21:08:01
430,6739697,28978093001,999,20180111,SERVER,2018-01-11 11:08:29
2490,6739697,28978093001,999,20180115,SERVER,2018-01-15 19:56:44
2693,6739697,28978093001,999,20180121,SERVER,2018-01-21 15:08:15
...,...,...,...,...,...,...
2662,6739711,28978103003,999,20180303,SERVER,2018-03-03 00:21:42
1115,6739711,28978103003,999,20180304,SERVER,2018-03-04 01:49:26
1163,6739711,28978103003,999,20180305,SERVER,2018-03-05 00:48:41
2887,6739711,28978103003,999,20180305,SERVER,2018-03-05 16:52:25


In [89]:
def tmp(target):
    target = target.set_index('REG_DT')
    target = target.resample('1D').first()
    return target

In [93]:
cleaned_df

Unnamed: 0,ITEM_ID,STOCK_ID,STOCK_AMOUNT,COLLECT_DAY,REG_ID,REG_DT
1483,6739697,28978093001,999,20180102,SERVER,2018-01-02 02:36:51
1773,6739697,28978093001,999,20180106,SERVER,2018-01-06 21:08:01
430,6739697,28978093001,999,20180111,SERVER,2018-01-11 11:08:29
2490,6739697,28978093001,999,20180115,SERVER,2018-01-15 19:56:44
2693,6739697,28978093001,999,20180121,SERVER,2018-01-21 15:08:15
...,...,...,...,...,...,...
2662,6739711,28978103003,999,20180303,SERVER,2018-03-03 00:21:42
1115,6739711,28978103003,999,20180304,SERVER,2018-03-04 01:49:26
1163,6739711,28978103003,999,20180305,SERVER,2018-03-05 00:48:41
2887,6739711,28978103003,999,20180305,SERVER,2018-03-05 16:52:25


In [97]:
tmp = cleaned_df.groupby(['ITEM_ID', 'STOCK_ID', 'REG_DT']).first()

In [104]:
tmp

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,STOCK_AMOUNT,COLLECT_DAY,REG_ID
ITEM_ID,STOCK_ID,REG_DT,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
6739697,28978093001,2018-01-02 02:36:51,999,20180102,SERVER
6739697,28978093001,2018-01-06 21:08:01,999,20180106,SERVER
6739697,28978093001,2018-01-11 11:08:29,999,20180111,SERVER
6739697,28978093001,2018-01-15 19:56:44,999,20180115,SERVER
6739697,28978093001,2018-01-21 15:08:15,999,20180121,SERVER
...,...,...,...,...,...
6739711,28978103003,2018-03-03 00:21:42,999,20180303,SERVER
6739711,28978103003,2018-03-04 01:49:26,999,20180304,SERVER
6739711,28978103003,2018-03-05 00:48:41,999,20180305,SERVER
6739711,28978103003,2018-03-05 16:52:25,999,20180305,SERVER


In [110]:
tmp.loc['6739697', '28978093001']

Unnamed: 0_level_0,STOCK_AMOUNT,COLLECT_DAY,REG_ID
REG_DT,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2018-01-02 02:36:51,999,20180102,SERVER
2018-01-06 21:08:01,999,20180106,SERVER
2018-01-11 11:08:29,999,20180111,SERVER
2018-01-15 19:56:44,999,20180115,SERVER
2018-01-21 15:08:15,999,20180121,SERVER
...,...,...,...
2018-03-03 00:21:40,999,20180303,SERVER
2018-03-04 01:49:00,999,20180304,SERVER
2018-03-05 00:48:40,999,20180305,SERVER
2018-03-05 16:55:42,999,20180305,SERVER


In [101]:
tmp.loc[tmp.ITEM_ID == '6739697',:]

AttributeError: 'DataFrame' object has no attribute 'ITEM_ID'