### Basic Feature Generation (run on remote Spark)

In [2]:
import pandas as pd
import numpy as np
import seaborn as sns
from matplotlib import pyplot as plt 
import datetime

In [3]:
class Config:
    # Empty placeholder class.
    # NOTE(review): presumably defined so that unpickling config.pkl (next line of
    # this cell) can resolve a class named Config in this module — confirm.
    pass
# Load the pickled run configuration and show which attributes it carries.
config = pd.read_pickle('config.pkl')
print(dir(config))

# Input / output directories used by the rest of the notebook.
# NOTE(review): config also exposes data_path / feature_path (see the dir() output);
# confirm whether these hard-coded values are meant to override them.
data_path = '../../kaggleData/JD_logging/'
feature_path = '../../kaggleData/JD_logging/features/'

['__doc__', '__module__', 'data_path', 'feature_dict', 'feature_path', 'result_path', 'single_module_validation_indice_set', 'trade_train_size', 'train_2_6_index']


In [4]:
# Load the raw login / trade logs and parse their 'time' strings into datetimes.
# pd.to_datetime with an explicit format is vectorized (instead of calling
# datetime.strptime once per row) and raises ValueError on strings that do not
# match the format, just like strptime does.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

login_tt = pd.read_csv(data_path + 'login_tt.csv')
trade_tt = pd.read_csv(data_path + 'trade_tt.csv')

login_tt['time'] = pd.to_datetime(login_tt['time'], format=TIME_FORMAT)
trade_tt['time'] = pd.to_datetime(trade_tt['time'], format=TIME_FORMAT)

In [5]:
# Kernel smoke test: prints the literal string 'test'.
print('test')

test


In [37]:
def find_related_logins_before(row,login_table,*args,**kw):
    """Return the rows of ``login_table`` for ``row.id`` whose ``time`` is strictly earlier than ``row.time``.

    ``row`` needs ``id`` and ``time`` attributes (e.g. a row of trade_tt); any extra
    positional/keyword arguments are ignored. The result keeps the original index
    and row order of ``login_table``.
    """
    is_same_id = login_table['id'] == row.id
    is_earlier = login_table['time'] < row.time
    return login_table[is_same_id & is_earlier]

def find_related_recent_logins_within_days(row,login_table,days,*args,**kw):
    """
    Return the logins by ``row.id`` before ``row.time`` that are less than ``days`` days old.

    Parameters
    ----------
    row : object with ``id`` and ``time`` attributes (e.g. a row of trade_tt)
    login_table : DataFrame with at least 'id' and 'time' columns
    days : window length in days; a login exactly ``days`` days earlier is excluded
    *args, **kw : ignored

    Returns
    -------
    A new DataFrame with an extra 'from_now' column (row.time - login time).
    When there is no earlier login the result is empty and 'from_now' is a
    NaN (float) column.
    """
    # Work on an explicit copy: adding a column to a filtered slice of
    # login_table would trigger pandas' SettingWithCopyWarning.
    recent_logins = find_related_logins_before(row, login_table).copy()
    if len(recent_logins) == 0:
        # Nothing to measure; return the empty frame directly instead of
        # comparing a NaN column against a timedelta.
        recent_logins['from_now'] = np.nan
        return recent_logins
    recent_logins['from_now'] = row.time - recent_logins['time']
    return recent_logins[recent_logins['from_now'] < datetime.timedelta(days=days)]
        

def find_related_trades_before(row,trade_table,*args,**kw):
    """Return the rows of ``trade_table`` for ``row.id`` whose ``time`` is strictly earlier than ``row.time``.

    ``row`` needs ``id`` and ``time`` attributes (e.g. a row of trade_tt); any extra
    positional/keyword arguments are ignored. The result keeps the original index
    and row order of ``trade_table``.
    """
    is_same_id = trade_table['id'] == row.id
    is_earlier = trade_table['time'] < row.time
    return trade_table[is_same_id & is_earlier]

def find_related_recent_trades_within_days(row,trade_table,days,*args,**kw):
    """
    Return the trades by ``row.id`` before ``row.time`` that are less than ``days`` days old.

    Parameters
    ----------
    row : object with ``id`` and ``time`` attributes (e.g. a row of trade_tt)
    trade_table : DataFrame with at least 'id' and 'time' columns
    days : window length in days; a trade exactly ``days`` days earlier is excluded
    *args, **kw : ignored

    Returns
    -------
    A new DataFrame with an extra 'from_now' column (row.time - trade time).
    When there is no earlier trade the result is empty and 'from_now' is a
    NaN (float) column.
    """
    # Work on an explicit copy: adding a column to a filtered slice of
    # trade_table would trigger pandas' SettingWithCopyWarning.
    recent_trades = find_related_trades_before(row, trade_table).copy()
    if len(recent_trades) == 0:
        # Nothing to measure; return the empty frame directly instead of
        # comparing a NaN column against a timedelta.
        recent_trades['from_now'] = np.nan
        return recent_trades
    recent_trades['from_now'] = row.time - recent_trades['time']
    return recent_trades[recent_trades['from_now'] < datetime.timedelta(days=days)]

In [38]:
def get_multiple_feature_dicts_wihtin_days(row,login_table,trade_table,date_range=(360,30,15,7,3,1)):
    """
    Compute history-statistics feature dicts for one trade over several look-back windows.

    Windows are processed from the widest to the narrowest. Each narrower window
    therefore only re-filters the rows already selected for the previous, wider
    window. This ordering is enforced here rather than relying on the first entry
    being 360.

    Parameters
    ----------
    row : object with ``id`` and ``time`` attributes (e.g. a row of trade_tt)
    login_table, trade_table : DataFrames with at least 'id' and 'time' columns
    date_range : iterable of window lengths in days; the order does not matter

    Returns
    -------
    dict mapping each window length (in days) to the dict produced by
    build_statistical_feature_dict for that window.
    """
    recent_trade_table = trade_table
    recent_login_table = login_table
    result_dict = {}
    for days in sorted(date_range, reverse=True):
        # Narrow the previously selected rows down to the current window.
        recent_trade_table = find_related_recent_trades_within_days(row, recent_trade_table, days)
        recent_login_table = find_related_recent_logins_within_days(row, recent_login_table, days)
        result_dict[days] = build_statistical_feature_dict(recent_login_table, recent_trade_table)
    return result_dict

def build_statistical_feature_dict(recent_login_table,recent_trade_table,*args,**kw):
    """
    Build login/trade history statistics for a single look-back window.

    -10 is used throughout as a "not computable" sentinel. It covers a zero
    denominator, no data, or too few values for a spread statistic.

    Features
    --------
    trade_times              : number of trades by the ID
    (not computed here: time of the most recent previous trade - used at the top level)
    login_times              : number of logins by the ID
    trade_login_rate         : trades / logins
    login_success_times      : logins whose result is > 0
    login_fail_times         : logins whose result is < 0
    login_success_rate       : successful logins / logins
    trade_login_success_rate : trades / successful logins
    trade_login_fail_rate    : trades / failed logins
    multiple_fails           : whether two consecutive logins (in time order) both failed
    after_fail_{mean,max,min,med,std} : statistics of log(seconds + 1) from a failed
                               login to the next login attempt
    timelong_{mean,max,min,med,std}   : statistics of log(timelong + 1); std is -10
                               when fewer than two values are available

    Parameters
    ----------
    recent_login_table : DataFrame with at least 'result', 'time' and 'timelong' columns
    recent_trade_table : DataFrame of trades; only its length is used
    *args, **kw : accepted for call compatibility and ignored

    Returns
    -------
    dict mapping feature name to value
    """
    missing = -10

    def _ratio(numerator, denominator):
        # Fall back to the sentinel instead of dividing by zero.
        if denominator == 0:
            return missing
        return numerator * 1.0 / denominator

    result_dict = {}

    trade_times = len(recent_trade_table)
    login_times = len(recent_login_table)

    login_success_times = np.sum(recent_login_table['result'] > 0)
    login_fail_times = np.sum(recent_login_table['result'] < 0)

    result_dict['trade_times'] = trade_times
    result_dict['login_times'] = login_times
    result_dict['login_success_times'] = login_success_times
    result_dict['login_fail_times'] = login_fail_times

    result_dict['trade_login_rate'] = _ratio(trade_times, login_times)
    result_dict['login_success_rate'] = _ratio(login_success_times, login_times)
    result_dict['trade_login_success_rate'] = _ratio(trade_times, login_success_times)
    result_dict['trade_login_fail_rate'] = _ratio(trade_times, login_fail_times)

    # "Consecutive" failures only make sense in chronological order, and the incoming
    # table is not guaranteed to be sorted by time. A stable sort keeps ties in place.
    results_in_time_order = recent_login_table.sort_values(by='time', kind='mergesort')['result']
    result_dict['multiple_fails'] = lower_than_zero_more_than_once(results_in_time_order)
    (result_dict['after_fail_mean'], result_dict['after_fail_max'], result_dict['after_fail_min'],
     result_dict['after_fail_med'], result_dict['after_fail_std']) = get_averge_fail_to_success_time(recent_login_table)

    # Values that become NaN (missing timelong, or log of a negative number) are dropped.
    timelong_series = np.log(recent_login_table['timelong'] + 1).dropna()
    if len(timelong_series) == 0:
        for stat in ('mean', 'max', 'min', 'med', 'std'):
            result_dict['timelong_' + stat] = missing
    else:
        result_dict['timelong_mean'] = np.mean(timelong_series)
        result_dict['timelong_med'] = np.median(timelong_series)
        result_dict['timelong_min'] = np.min(timelong_series)
        result_dict['timelong_max'] = np.max(timelong_series)
        # A spread of a single value is not informative; use the sentinel.
        result_dict['timelong_std'] = np.std(timelong_series) if len(timelong_series) > 1 else missing

    return result_dict

def lower_than_zero_more_than_once(sequence):
    """
    Return True if ``sequence`` contains two adjacent values that are both negative.

    Used to flag consecutive failed logins (result < 0). Adjacency is the iteration
    order of ``sequence``, so callers should pass values already in time order.
    Sequences of length 2 are checked as well. Empty or single-element sequences
    return False.

    Parameters
    ----------
    sequence : iterable of numbers (e.g. a pandas Series of login results)

    Returns
    -------
    bool
    """
    values = list(sequence)
    return any(prev < 0 and nxt < 0 for prev, nxt in zip(values, values[1:]))

def get_averge_fail_to_success_time(recent_login_table):
    """
    Summarize the time from each failed login to the next login attempt.

    Logins are ordered by time. For every login with result < 0 that is followed by
    another login, the gap to that next login is taken in seconds and transformed
    with log(seconds + 1).

    Parameters
    ----------
    recent_login_table : DataFrame with 'result' and 'time' columns ('time' must be
        convertible with ``pd.to_datetime``).

    Returns
    -------
    tuple (mean, max, min, median, std) of the transformed gaps. Every entry is -10
    when no failed login is followed by another login. std alone is -10 when only
    one gap exists.
    """
    missing = (-10, -10, -10, -10, -10)

    ordered = recent_login_table[['result', 'time']].sort_values(by='time', kind='mergesort')
    if len(ordered) < 2:
        return missing

    results = ordered['result'].values
    times = pd.to_datetime(ordered['time']).values
    # gap_seconds[i] = time[i + 1] - time[i], in (fractional) seconds
    gap_seconds = np.diff(times) / np.timedelta64(1, 's')
    # Keep only the gaps that start at a failed login (result < 0). This counts
    # failures directly rather than summing the result codes.
    fail_gaps = gap_seconds[results[:-1] < 0]
    if len(fail_gaps) == 0:
        return missing

    log_gaps = np.log(fail_gaps + 1)
    std_return = np.std(log_gaps) if len(log_gaps) > 1 else -10
    return np.mean(log_gaps), np.max(log_gaps), np.min(log_gaps), np.median(log_gaps), std_return

In [39]:
# Per-trade history features.
# The two commented-out lines below are the single-process version: they apply
# get_multiple_feature_dicts_wihtin_days to every trade row and pickle the result.
# The triple-quoted string is NOT executed. It keeps the Spark (RDD) version of the
# same computation for reference. Because it is the cell's last expression,
# Jupyter displays it as the cell output.
# NOTE(review): the Spark code refers to `spark` (presumably pyspark imported under
# that alias) and to get_multiple_feature_dicts_wihtin_days_with_rowkey. Neither is
# defined in the visible part of this notebook — confirm where they come from
# before reusing it.
# NOTE(review): the next cell loads temp/feature_set_c.csv, presumably written from
# result_df by this Spark code — confirm.
#trade_tt['stat_result_dicts'] = trade_tt.apply(lambda row : get_multiple_feature_dicts_wihtin_days(row,login_tt,trade_tt),axis = 1)
#trade_tt.to_pickle(data_path+'trade_tt_stat_C_temp.pkl')
"""
works on spark
conf = spark.SparkConf().setAppName('jupyter_backend').setMaster('local[7]')\
        .set('spark.executor.memory','2g')\
        .set('spark.default.parallelism','112')
sc = spark.SparkContext(conf=conf)

#packing the rdd for spark
trade_tt_rdd_buffer = []
for (idx,row) in trade_tt.iterrows():
    trade_tt_rdd_buffer.append(row)
trade_tt_rdd = sc.parallelize(trade_tt_rdd_buffer)

result_rdd = trade_tt_rdd.map(lambda x : get_multiple_feature_dicts_wihtin_days_with_rowkey(x,login_tt,trade_tt))
result_rdd_buffer = result_rdd.collect()

#getting the new feature names
recent_trade_example=find_related_recent_trades_within_days(trade_tt.loc[0],trade_tt,30)
recent_login_example=find_related_recent_logins_within_days(trade_tt.loc[0],login_tt,30)

date_range_list = [1,3,7,15,30,360]
feature_list = list(build_statistical_feature_dict(recent_login_example,recent_trade_example).keys())

#unstacking the result_rdd_dict
result_rdd_to_df_buffer = []
for rowkey,result_dict in result_rdd_buffer:
    unit_dict= {}
    unit_dict['rowkey'] = rowkey
    
    for date_range in date_range_list:
        for feature in feature_list:
            unit_dict[feature+'_'+str(date_range)] = result_dict[date_range][feature]
    
    result_rdd_to_df_buffer.append(unit_dict)
result_df = pd.DataFrame(result_rdd_to_df_buffer)
"""

"\nworks on spark\nconf = spark.SparkConf().setAppName('jupyter_backend').setMaster('local[7]')        .set('spark.executor.memory','2g')        .set('spark.default.parallelism','112')\nsc = spark.SparkContext(conf=conf)\n\n#packing the rdd for spark\ntrade_tt_rdd_buffer = []\nfor (idx,row) in trade_tt.iterrows():\n    trade_tt_rdd_buffer.append(row)\ntrade_tt_rdd = sc.parallelize(trade_tt_rdd_buffer)\n\nresult_rdd = trade_tt_rdd.map(lambda x : get_multiple_feature_dicts_wihtin_days_with_rowkey(x,login_tt,trade_tt))\nresult_rdd_buffer = result_rdd.collect()\n\n#getting the new feature names\nrecent_trade_example=find_related_recent_trades_within_days(trade_tt.loc[0],trade_tt,30)\nrecent_login_example=find_related_recent_logins_within_days(trade_tt.loc[0],login_tt,30)\n\ndate_range_list = [1,3,7,15,30,360]\nfeature_list = list(build_statistical_feature_dict(recent_login_example,recent_trade_example).keys())\n\n#unstacking the result_rdd_dict\nresult_rdd_to_df_buffer = []\nfor rowkey,res

In [6]:
#load from spark generated features
# Load the features generated on Spark and attach them to trade_tt column-wise.
trade_tt_feature_c = pd.read_csv(data_path+'temp/feature_set_c.csv')

# pd.concat(axis=1) aligns on the index, so the Spark output must line up row-for-row
# with trade_tt. Enforce that instead of only printing the mismatch count.
assert len(trade_tt_feature_c) == len(trade_tt), \
    'Spark feature file has a different number of rows than trade_tt'
rowkey_mismatches = np.sum(trade_tt_feature_c['rowkey'] != trade_tt['rowkey'])
print(rowkey_mismatches)
assert rowkey_mismatches == 0, 'Spark feature rows are not aligned with trade_tt rowkeys'

trade_tt_feature_c = trade_tt_feature_c.drop('rowkey', axis=1)
# Guard against re-running this cell, which would otherwise duplicate every feature column.
assert not set(trade_tt_feature_c.columns) & set(trade_tt.columns), \
    'feature columns already present in trade_tt (cell re-run?)'
trade_tt = pd.concat([trade_tt, trade_tt_feature_c], axis=1)
print(trade_tt.shape)

0
(150594, 119)


In [7]:
# Every column except the identifying / raw ones is a generated history-statistics feature.
login_trade_hist_stats_feature_list = list(trade_tt.columns)
for non_feature_column in ['rowkey', 'time', 'id', 'from']:
    # list.remove raises ValueError if one of these columns is absent.
    login_trade_hist_stats_feature_list.remove(non_feature_column)

In [8]:
# Replace any remaining NaN with the same -10 "missing" sentinel used by the feature builders.
trade_tt = trade_tt.fillna(value=-10)

In [9]:
# Save each feature column as a standalone pickled numpy array: <feature_path><feature>.pkl
for feature in login_trade_hist_stats_feature_list:
    feature_values = trade_tt[feature].values
    pd.to_pickle(feature_values, feature_path + feature + '.pkl')

In [1]:
# Kernel smoke test: prints the literal string 'test'.
print('test')

test


### 检测特征空值率 (check how often each feature is empty, i.e. equals the -10 sentinel)

In [32]:
# Fraction of rows in which each feature holds the -10 "missing" sentinel.
data_size = trade_tt.shape[0]
feature_emptyness = {}
feature_emptyness_list = []
for feature in login_trade_hist_stats_feature_list:
    empty_rate = np.sum(trade_tt[feature] == -10) * 1.0 / data_size
    feature_emptyness[feature] = empty_rate
    feature_emptyness_list.append((feature, empty_rate))

In [33]:
# Display features ordered from least to most often equal to -10.
sorted(feature_emptyness_list, key=lambda name_and_rate: name_and_rate[1])

[('login_fail_times_1', 0.0),
 ('login_fail_times_15', 0.0),
 ('login_fail_times_3', 0.0),
 ('login_fail_times_30', 0.0),
 ('login_fail_times_360', 0.0),
 ('login_fail_times_7', 0.0),
 ('login_success_times_1', 0.0),
 ('login_success_times_15', 0.0),
 ('login_success_times_3', 0.0),
 ('login_success_times_30', 0.0),
 ('login_success_times_360', 0.0),
 ('login_success_times_7', 0.0),
 ('login_times_1', 0.0),
 ('login_times_15', 0.0),
 ('login_times_3', 0.0),
 ('login_times_30', 0.0),
 ('login_times_360', 0.0),
 ('login_times_7', 0.0),
 ('multiple_fails_1', 0.0),
 ('multiple_fails_15', 0.0),
 ('multiple_fails_3', 0.0),
 ('multiple_fails_30', 0.0),
 ('multiple_fails_360', 0.0),
 ('multiple_fails_7', 0.0),
 ('trade_times_1', 0.0),
 ('trade_times_15', 0.0),
 ('trade_times_3', 0.0),
 ('trade_times_30', 0.0),
 ('trade_times_360', 0.0),
 ('trade_times_7', 0.0),
 ('login_success_rate_360', 0.079050723893623581),
 ('trade_login_rate_360', 0.079050723893623581),
 ('timelong_max_360', 0.0790828113