# 读入函数 

In [None]:
import pandas as pd
import numpy as np
import os

import warnings
warnings.filterwarnings('ignore')

# 分组统计

In [None]:
def GroupStatCalc(data,list_hightype,list_midtype,list_lowtype,list_sensitype,list_risktype,list_accounttype,list_paytype,list_transtype,list_fail):
    '''
    input:
    data: dataframe, 原始数据
    list_hightype: list, 高频交易代码列表
    list_midtype: list, 中频交易代码列表
    list_lowtype: list, 低频交易代码列表
    list_sensitype: list, 敏感交易代码列表
    list_risktype: list, 风险交易代码列表
    list_accounttype: list, 账户操作交易代码列表
    list_paytype: list, 支付交易代码列表
    list_transtype: list, 转账交易代码列表
    list_fail: list, 失败交易代码列表
    return:
    result: dataframe, 变量大宽表, 其index为User_Id
    '''
    list_hightype = [xx.lower() for xx in list_hightype]
    list_midtype = [xx.lower() for xx in list_midtype]
    list_lowtype = [xx.lower() for xx in list_lowtype]
    list_sensitype = [xx.lower() for xx in list_sensitype]
    list_risktype = [xx.lower() for xx in list_risktype]
    list_accounttype = [xx.lower() for xx in list_accounttype]
    list_paytype = [xx.lower() for xx in list_paytype]
    list_transtype = [xx.lower() for xx in list_transtype]
    list_fail = [xx.lower() for xx in list_fail]
    data['Txn_Tm'] = pd.to_datetime(data['Txn_Tm'],errors='coerce')

    result = []
    #总交易笔数
    tmp = data.groupby(by='User_Id',sort=False)['User_Id'].count()
    tmp.name = 'TransNum'
    result.append(tmp.copy()) 
    print('总交易笔数')
    
    #周末、周中交易数量以及交易数量标准差
    weekday_map = {0:'Weekday',1:'Weekday',2:'Weekday',3:'Weekday',4:'Weekday',5:'Weekend',6:'Weekend'}
    weekday_names = ['Weekday','Weekend']
    data['tmp1_'] = data['Txn_Tm'].dt.weekday
    data['tmp1_'] = data['tmp1_'].map(lambda xx:weekday_map.get(xx,np.nan))
    data['tmp2_'] = data['Txn_Tm']
    data = data.set_index('tmp2_')
    data = data.to_period('W-SUN')
    data = data.reset_index()
    
    tmp = data.groupby(by=['User_Id','tmp1_'],sort=False)['tmp1_'].count()
    tmp = tmp.unstack('tmp1_')
    tmp.columns = tmp.columns+'Num'
    tmp = tmp.reindex(columns=[col+'Num' for col in weekday_names])
    tmp = tmp.fillna(0)
    result.append(tmp.copy())    
    
    tmp = data.groupby(by=['User_Id','tmp1_','tmp2_'],sort=False)['tmp1_'].count()
    tmp = tmp.groupby(level=['User_Id','tmp1_'],sort=False).std()
    tmp = tmp.unstack('tmp1_')
    tmp.columns = tmp.columns+'Std'
    tmp = tmp.reindex(columns=[col+'Std' for col in weekday_names])
    result.append(tmp.copy())
    
    data = data.drop(['tmp1_','tmp2_'],axis=1,errors='ignore')
    print('周末、周中交易数量以及交易数量标准差')
    
    #一天四个时段交易数量以及交易数量标准差+午夜账户操作笔数+午夜支付笔数+午夜敏感交易笔数+午夜风险交易笔数
    day_names=['Midnight','Morning','Afternoon','Evening']
    
    data['tmp2_'] = data['Txn_Tm']
    data = data.set_index('tmp2_')
    data = data.to_period('D')
    data = data.reset_index()
    
    data['tmp3_'] = data['Txn_Tm'].dt.hour
    data['tmp1_'] = np.nan
    data.loc[(data['tmp3_']>=0)&(data['tmp3_']<=7), 'tmp1_'] = 'Midnight'
    data.loc[(data['tmp3_']>=8)&(data['tmp3_']<=11), 'tmp1_'] = 'Morning'
    data.loc[(data['tmp3_']>=12)&(data['tmp3_']<=17), 'tmp1_'] = 'Afternoon'
    data.loc[(data['tmp3_']>=18)&(data['tmp3_']<=23), 'tmp1_'] = 'Evening'
    
    tmp = data.groupby(by=['User_Id','tmp1_'],sort=False)['tmp1_'].count()
    tmp = tmp.unstack('tmp1_')
    tmp.columns = tmp.columns+'Num'
    tmp = tmp.reindex(columns=[col+'Num' for col in day_names])
    tmp = tmp.fillna(0)
    result.append(tmp.copy())
    
    tmp = data.groupby(by=['User_Id','tmp1_','tmp2_'],sort=False)['tmp1_'].count()
    tmp = tmp.groupby(level=['User_Id','tmp1_'],sort=False).std()
    tmp = tmp.unstack('tmp1_')
    tmp.columns = tmp.columns+'Std'
    tmp = tmp.reindex(columns=[col+'Std' for col in day_names])
    result.append(tmp.copy())    
    
    data['tmp2_'] = data['Txn_Cd'].isin(list_accounttype).astype(int)
    tmp = data.loc[data['tmp1_']=='Midnight'].groupby(by='User_Id',sort=False)['tmp2_'].sum()
    tmp.name = 'MidnightAccountNum'
    result.append(tmp.copy())
    
    data['tmp2_'] = data['Txn_Cd'].isin(list_paytype).astype(int)
    tmp = data.loc[data['tmp1_']=='Midnight'].groupby(by='User_Id',sort=False)['tmp2_'].sum()
    tmp.name = 'MidnightPayNum'
    result.append(tmp.copy())    
    
    data['tmp2_'] = data['Txn_Cd'].isin(list_sensitype).astype(int)
    tmp = data.loc[data['tmp1_']=='Midnight'].groupby(by='User_Id',sort=False)['tmp2_'].sum()
    tmp.name = 'MidnightSensiNum'
    result.append(tmp.copy())
    
    data['tmp2_'] = data['Txn_Cd'].isin(list_risktype).astype(int)
    tmp = data.loc[data['tmp1_']=='Midnight'].groupby(by='User_Id',sort=False)['tmp2_'].sum()
    tmp.name = 'MidnightRiskNum'
    result.append(tmp.copy())
    
    data = data.drop(['tmp1_','tmp2_','tmp3_'],axis=1,errors='ignore')
    print('一天四个时段交易数量以及交易数量标准差+午夜账户操作笔数+午夜支付笔数+午夜敏感交易笔数+午夜风险交易笔数')
    
    #高中低频交易数量
    data['tmp_'] = data['Txn_Cd'].isin(list_hightype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'HighTypeNum'
    result.append(tmp.copy())    
    
    data['tmp_'] = data['Txn_Cd'].isin(list_midtype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'MidTypeNum'
    result.append(tmp.copy()) 

    data['tmp_'] = data['Txn_Cd'].isin(list_lowtype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'LowTypeNum'
    result.append(tmp.copy())     
    
    print('高中低频交易数量')
    
    #敏感交易数量
    data['tmp_'] = data['Txn_Cd'].isin(list_sensitype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'SensiTypeNum'
    result.append(tmp.copy()) 
    
    #风险交易数量
    data['tmp_'] = data['Txn_Cd'].isin(list_risktype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'RiskTypeNum'
    result.append(tmp.copy()) 
    
    #账户操作交易数量
    data['tmp_'] = data['Txn_Cd'].isin(list_accounttype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'AccountTypeNum'
    result.append(tmp.copy()) 
    
    #支付交易数量
    data['tmp_'] = data['Txn_Cd'].isin(list_paytype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'PayTypeNum'
    result.append(tmp.copy()) 
    
    #转账交易数量
    data['tmp_'] = data['Txn_Cd'].isin(list_transtype).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'TransTypeNum'
    result.append(tmp.copy()) 
    
    #失败交易数量
    data['tmp_'] = data['Txn_Stat_Cd'].isin(list_fail).astype(int)
    tmp = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    tmp.name = 'FailNum'
    result.append(tmp.copy()) 
    
    print('敏感、风险、账户操作、支付、转账、失败交易数量')

    #IP地址数量
    tmp = data.drop_duplicates(subset=['User_Id','Source_IP']).groupby(by='User_Id',sort=False)['Source_IP'].count()
    tmp.name = 'IPNum'
    result.append(tmp.copy())
    print('IP地址数量')
    
    #合并数据
    result = pd.concat(result,axis=1,join='outer')

    #午夜特殊交易类型从数量转化为比率
    result['MidnightAccountNum'] = result['MidnightAccountNum']/result['MidnightNum']
    result['MidnightPayNum'] = result['MidnightPayNum']/result['MidnightNum']
    result['MidnightSensiNum'] = result['MidnightSensiNum']/result['MidnightNum']
    result['MidnightRiskNum'] = result['MidnightRiskNum']/result['MidnightNum']   
    
    result=result.rename(columns={'MidnightAccountNum','MidnightAccountPercent','MidnightPayNum','MidnightPayPercent',
                                  'MidnightSensiNum','MidnightSensiPercent','MidnightRiskNum','MidnightRiskPercent'})

    #将其他数量转化为比率    
    col_num2percent=['Weekend','Weekday','Midnight','Morning','Afternoon','Evening',
                     'HighType','MidType','LowType','SensiType','RiskType','AccountType',
                     'PayType','Fail']
    for col in col_num2percent:
        result[col+'Num'] = result[col+'Num']/result['TransNum']
    result = result.rename(columns={col+'Num':col+'Percent' for col in col_num2percent})
    
    result.index.name='User_Id'
    return result

# 前后变动次数

In [None]:
def IpFreqCalc(data):
    '''
    input:
    data: dataframe, 原始数据
    return:
    result: dataframe, 包含User_Id和IpFreq
    '''
    #根据User_Id和Txn_Tm对数据排序
    data['Txn_Tm'] = pd.to_datetime(data['Txn_Tm'],errors='coerce')
    data = data.sort_values(['User_Id','Txn_Tm'],ascending=True)
    #比较每条记录与前一条记录的IP地址
    data['tmp_'] = data['Source_IP'].shift(1)
    data['tmp_'] = (data['Source_IP']!=data['tmp_']).astype(int)
    #用tag_标记每组User_Id的第一条记录
    data['tag_'] = data['User_Id'].shift(1)
    data['tag_'] = (data['User_Id']!=data['tag_'])
    data.loc[data['tag_'],'tmp_'] = 0
    #计算每组User_Id内IP地址变动次数
    result = data.groupby(by='User_Id',sort=False)['tmp_'].sum()
    result.name = 'IpFreq'
    result.index.name='User_Id'
    print('IP地址变动次数完成')
    return result

# 连续最多次数

In [None]:
def ConFailNumCalc(data,list_fail):
    '''
    input:
    data: dataframe, 原始数据
    list_fail: list, 失败交易代码列表
    return:
    result: dataframe, 包含User_Id和ConFailNum
    '''
    #失败交易代码大写转小写
    list_fail = [xx.lower() for xx in list_fail]
    #根据User_Id和Txn_Tm对数据排序
    data['Txn_Tm'] = pd.to_datetime(data['Txn_Tm'],errors='coerce')
    data = data.sort_values(['User_Id','Txn_Tm'],ascending=True)
    #计算每组User_Id内连续失败交易笔数
    data['tmp1_'] = (data['Txn_Stat_Cd'].isin(list_fail)==False).astype(int)
    data['tmp2_'] = data['tmp1_'].cumsum()
    data = data.set_index('User_Id')
    data_last = data.groupby(level=['User_Id'],sort=False)['tmp2_'].last()
    data_last = data_last.shift(1).fillna(0)
    data['tmp2_'] = data['tmp2_']-data_last
    data = data.reset_index()
    #计算每组User_Id内连续失败最多交易笔数
    result = data.groupby(by=['User_Id','tmp2_'],sort=False)['tmp2_'].count()-1
    result = result.groupby(level=['User_Id'],sort=False).max()
    result.name = 'ConFailNum'
    result.index.name='User_Id'
    print('连续失败最多交易笔数完成')
    return result    

# 是否存在子列

In [None]:
def RiskPatCountCalcGroupby(xx, risk_pat):
    '''
    辅助函数，对于每组User_Id的数据，判断是否存在试探性交易规律
    input:
    xx: 二元组(dataframe.groupby对象中的一个元素), 第一项为User_Id, 第二项为dataframe交易数据
    risk_pat: list, 指定试探性交易规律
    return:
    tmp_result: dataframe, 包含User_Id和RiskPatCount
    '''
    df = xx[1].copy()
    tmp = sum(list(df['tmp_'])[i:i+len(risk_pat)] == risk_pat for i in range(len(df)))
    tmp_result = pd.DataFrame()
    tmp_result['RiskPatCount'] = [tmp]
    tmp_result['User_Id'] = xx[0]
    return tmp_result

def RiskPatCountCalc(data, list_type, list_fail, risk_pat):
    '''
    主体函数，判断是否存在试探性交易规律
    input:
    data: dataframe, 原始数据
    list_type: list, 风险交易类型
    list_fail: list, 失败交易代码
    risk_pat: list, 指定试探性交易规律
    return:
    result: dataframe, 包含User_Id和RiskPatCount
    '''
    #交易类型和交易代码大写转小写
    list_type = [xx.lower() for xx in list_type]
    list_fail = [xx.lower() for xx in list_fail]
    #根据User_Id和Txn_Tm对数据排序
    data['Txn_Tm'] = pd.to_datetime(data['Txn_Tm'],errors='coerce')
    data = data.sort_values(['User_Id','Txn_Tm'],ascending=True)
    #判断每组User_Id内是否存在试探性交易
    data['tmp_'] = (data['Txn_Stat_Cd'].isin(list_fail))&(data['Txn_Cd'].isin(list_type)).astype(int)
    data_by = data.groupby(by='User_Id',sort=False)
    data_list = list(map(lambda xx:RiskPatCountCalcGroupby(xx,risk_pat), data_by))
    result = pd.concat(data_list)
    print('是否存在试探性交易规律完成')
    return result

# 滑动窗口

## 计算每笔交易最近某段时间内的交易笔数 

In [None]:
def RecentTransNumCalcGroupby(xx, col_time, col_time_before, col_out):
    '''
    辅助函数，对于每组User_Id的数据，计算每笔交易最近某段时间内（比如最近10分钟）的交易笔数
    input:
    xx: 二元组(dataframe, groupby对象中的一个元素), 第一项为User_Id, 第二项为dataframe交易数据
    col_time: list, 交易时间对应的列名, 一般为Txn_Tm
    col_time_before: list, 元素为交易时间减去某个时间段后的时间对应的列名, 如recent10m_time
    col_out: list, 元素为计算得到的列名, 如Recent10mTransNum, 需要与col_time_before对应
    return:
    result: dataframe, 包含User_Id, Event_Id以及col_out中的字段
    '''
    df =xx[1].copy()
    for i,col in enumerate(col_time_before):
        df[col+'_start_index'] = df[col_time].values.searchsorted(df[col],side='right')
        df[col+'_end_index'] = np.arange(df.shape[0])
        result_tmp = df[col+'_end_index'] - df[col+'_start_index'] + 1
        result_tmp.name = col_out[i]
        result_tmp = pd.DataFrame(result_tmp)
        result_tmp['User_Id'] = xx[0]
        result_tmp['Event_Id'] = df['Event_Id']
        if i==0:
            result = result_tmp.copy()
        else:
            result = pd.merge(result,result_tmp,on=['Event_Id','User_Id'])       
    return result
            
def RecentTransNumCalc(rawdata):
    '''
    主体函数，计算每笔交易最近某段时间内（比如最近10分钟）的交易笔数
    input:
    rawdata: dataframe, 原始数据
    return:
    result: dataframe, 包含User_Id, Event_Id以及统计字段
    '''        
    data = rawdata.copy()
    #根据User_Id和Txn_Tm对数据排序
    data['Txn_Tm'] = pd.to_datetime(data['Txn_Tm'],errors='coerce')
    data = data.sort_values(by=['User_Id','Txn_Tm'])
    #确定滑动窗口区间
    data['recent10m_time'] = data['Txn_Tm'] - pd.DateOffset(minutes=10)
    data['recent30m_time'] = data['Txn_Tm'] - pd.DateOffset(minutes=30)         
    data['recent60m_time'] = data['Txn_Tm'] - pd.DateOffset(minutes=60)           
    data['recent1_time'] = data['Txn_Tm'] - pd.DateOffset(days=1)            
    data['recent3_time'] = data['Txn_Tm'] - pd.DateOffset(days=3)  
    data['recent7_time'] = data['Txn_Tm'] - pd.DateOffset(days=7)  
    data['recent30_time'] = data['Txn_Tm'] - pd.DateOffset(days=30)    
    data_by = data.groupby(by='User_Id',sort=False)
    #计算每组User_Id内每笔交易最近某段时间内的交易笔数
    col_time_before_out_maps = {'recent10m_time':'Recent10mTransNum','recent30m_time':'Recent30mTransNum',
                                'recent60m_time':'Recent60mTransNum','recent1_time':'Recent1TransNum',
                                'recent3_time':'Recent3TransNum','recent7_time':'Recent7TransNum',
                                'recent30_time':'Recent30TransNum'}
    col_time = 'Txn_Tm'
    col_time_before = {'recent10m_time','recent30m_time','recent60m_time','recent1_time',
                       'recent3_time','recent7_time','recent30_time'}
    col_out = [col_time_before_out_maps[col] for col in col_time_before]
    data_list = list(map(lambda xx: RecentTransNumCalcGroupby(xx,col_time,col_time_before,col_out), data_by))
    result = pd.concat(data_list)
    print('每笔交易最近某段时间内的交易笔数完成')
    return result

## 计算每笔交易最近某段时间内某种交易比例 

In [None]:
def RecentXXXTransNumCalcGroupby(xx, col_time, col_time_before, col_num, col_out):
    '''
    辅助函数，对于每组User_Id的数据，计算每笔交易前某段时间内（比如最近十分钟）某种交易（比如敏感交易）比例
    input:
    xx: 二元组(dataframe, groupby对象中的一个元素), 第一项为User_Id, 第二项为dataframe交易数据
    col_time: list, 交易时间对应的列名, 一般为Txn_Tm
    col_time_before: list, 元素为交易时间减去某个时间段后的时间对应的列名, 如recent10m_time
    col_num: list, 元素为是否符合某种交易对应的列名, 如sensi, 需要与col_time_before对应; 
             一般来源形如data['Txn_Cd'],isin(list_sensitype).astype(int)
    col_out: list, 元素为计算得到的列名, 如Recent10mSensiTypeNum, 需要与col_time_before对应
    return:
    result: dataframe, 包含User_Id, Event_Id以及col_out中的字段
    '''
    df = xx[1].copy()
    for i,col in enumerate(col_time_before):
        df[col+'_start_index'] = df[col_time].values.searchsorted(df[col],side='right')
        df.loc[df[col+'_start_index']>0,col+'_start_index'] = df.loc[df[col+'_start_index']>0,col+'_start_index']-1
        df[col+'_end_index'] = np.arange(df.shape[0])
        df[col+'_cumsum'] = df[col_num[i]].cumsum()
        df[col+'_start_sum'] = df[col+'_cumsum'].iloc[df[col+'_start_index']].values
        df[col+'_end_sum'] = df[col+'_cumsum'].iloc[df[col+'_end_index']].values
        result_tmp = df[col+'_end_sum'] - df[col+'_start_sum']
        result_tmp.name = col_out[i]
        result_tmp = pd.DataFrame(result_tmp)
        result_tmp['User_Id'] = xx[0]
        result_tmp['Event_Id'] = df['Event_Id']
        if i==0:
            result = result_tmp.copy()
        else:
            result = pd.merge(result,result_tmp,on=['Event_Id','User_Id'])
    return result

def RecentXXXTransNumCalc(rawdata,list_sensitype,list_risktype,list_fail):
    '''
    主体函数, 计算每笔交易前某段时间内（比如最近十分钟）某种交易（比如敏感类型）比例
    input:
    rawdata: dataframe, 原始数据
    list_sensitype: list, 敏感交易类型
    list_risktype: list, 风险交易类型
    list_fail: list, 失败交易代码列表
    return:
    result: dataframe, 包含User_Id, Event_Id以及统计字段
    '''
    data = rawdata.copy()

    data['Txn_Tm'] = pd.to_datetime(data['Txn_Tm'],errors='coerce')
    data = data.sort_values(by=['User_Id','Txn_Tm'])
    #Recent60mSensiTypeNum
    data['recent60m_time_sensi'] = data['Txn_Tm'] - pd.DateOffset(minutes=60)
    data['recent60m_sensi_num'] = data['Txn_Cd'].isin(list_sensitype).astype(int)
    #Recent60mRiskTypeNum
    data['recent60m_time_risk'] = data['Txn_Tm'] - pd.DateOffset(minutes=60)
    data['recent60m_risk_num'] = data['Txn_Cd'].isin(list_risktype).astype(int)
    #Recent7FailNum
    data['recent7_time_fail'] = data['Txn_Tm'] - pd.DateOffset(days=7)
    data['recent7_fail_num'] = data['Txn_Cd'].isin(list_fail).astype(int)  
    data_by = data.groupby(by='User_Id',sort=False)
    
    col_time_before_num_maps = {'recent60m_time_sensi':'recent60m_sensi_num',
                                'recent60m_time_risk':'recent60m_risk_num',
                                'recent7_time_fail':'recent7_fail_num'}
    col_time_before_out_maps = {'recent60m_time_sensi':'Recent60mSensiTypeNum',
                                'recent60m_time_risk':'Recent60mRiskTypeNum',
                                'recent7_time_fail':'Recent7FailNum'} 
    col_time = 'Txn_Tm'
    col_time_before = ['recent60m_time_sensi','recent60m_time_risk','recent7_time_fail']
    col_num = [col_time_before_num_maps[col] for col in col_time_before]
    col_out = [col_time_before_out_maps[col] for col in col_time_before]
    data_list = list(map(lambda xx: RecentXXXTransNumCalcGroupby(xx,col_time,col_time_before,col_num,col_out),data_by))
    result = pd.concat(data_list)
    
    print('每笔交易最近某段时间内某种交易比例')
    return result

## 计算每笔交易与前X笔交易时间间隔

In [None]:
def RecentXTimeGapCalc(data, X=1):
    '''
    计算每笔交易与前X笔交易时间间隔
    input:
    data: dataframe, 原始数据
    X: int, 前X笔
    return:
    result: dataframe, 包含Event_Id, User_Id, RecentXTimeGap, 其中会根据X的不同替换对应的列名
    '''
    data = data.fillna('')
    data['Txn_Tm'] = pd.to_datetime(data['Txn_Tm'],errors='coerce')
    data = data.sort_values(['User_Id','Txn_Tm'],ascending=True)
    data['tmp_'] = data['Txn_Tm'].shift(X)
    data['tmp_'] = (data['Txn_Tm']-data['tmp_']).astype('timedelta64[s]')/60
    
    #用tag_标记每组的前X条记录
    data['tag_'] = data['User_Id'].shift(X)
    data.loc[data['tag_']!=data['User_Id'],'tmp_'] = np.nan
    
    result = data[['Event_Id','User_Id','tmp_']].copy()
    data = data.drop(['tmp_','tag_'],axis=1,errors='ignore')
    result = result.rename(columns={'tmp_':'Recent%dTimeGap' %X})
    
    print('每笔交易与前X笔交易时间间隔')    
    return result

# 滑动窗口内的统计

In [None]:
def RecentStatCalc(data):
    '''
    滑动窗口内的统计
    input:
    data: dataframe, 滑动窗口中间表, 应该有User_Id, Event_Id和其他数据列
    return:
    result: dataframe, 滑动窗口相关变量的大宽表, 其index为User_Id
    '''
    result = []       
    
    #每笔交易前60分钟内交易笔数中位数、标准差
    tmp = data.groupby('User_Id',sort=False)['Recent60mTransNum'].median()
    tmp.name = 'Recent60mTransNumMed'
    result.append(tmp.copy())

    tmp = data.groupby('User_Id',sort=False)['Recent60mTransNum'].std()
    tmp.name = 'Recent60mTransNumStd'
    result.append(tmp.copy())        
        
    #每笔交易前30天内交易笔数中位数、标准差
    tmp = data.groupby('User_Id',sort=False)['Recent30TransNum'].median()
    tmp.name = 'Recent30TransNumMed'
    result.append(tmp.copy())

    tmp = data.groupby('User_Id',sort=False)['Recent30TransNum'].std()
    tmp.name = 'Recent30TransNumStd'
    result.append(tmp.copy())         
            
    #每笔交易与前3、10笔交易时间间隔中位数、标准差
    tmp = data.groupby('User_Id',sort=False)['Recent3TimeGap'].median()
    tmp.name = 'Recent3TimeGapMed'
    result.append(tmp.copy())

    tmp = data.groupby('User_Id',sort=False)['Recent3TimeGap'].std()
    tmp.name = 'Recent3TimeGapStd'
    result.append(tmp.copy())            
            
    #每笔交易前60分钟内敏感交易类型比例中位数
    data['Recent60mSensiTypePercent'] = data['Recent60mSensiTypeNum']/data['Recent60mTransNum']   
    data['Recent7FailPercent'] = data['Recent7FailNum']/data['Recent7TransNum']

    tmp = data.groupby('User_Id',sort=False)['Recent60mSensiTypePercent'].median()
    tmp.name = 'Recent60mSensiTypePercentMed'
    result.append(tmp.copy()) 

    #每笔交易前7天内失败交易比率中位数
    tmp = data.groupby('User_Id',sort=False)['Recent7FailPercent'].median()
    tmp.name = 'Recent7FailPercentMed'
    result.append(tmp.copy())    

    result = pd.concat(result,axis=1)
    result.index.name = 'User_Id'
    
    print('滑动窗口统计量')  
    return result

# 计算大宽表

In [None]:
##################################main##################################
path = '/usr/local/workspace/'
os.chdir(path)

#读入各类交易类型划分标准
maps = pd.read_excel('resource/电子银行交易类型汇总.xlsx',sheetname=None)

list_hightype = list(maps['高频交易']['Txn_Cd'])
list_midtype = list(maps['中频交易']['Txn_Cd'])
list_lowtype = list(maps['低频交易']['Txn_Cd'])
list_sensitype = list(maps['敏感交易']['Txn_Cd'])
list_risktype = list(maps['风险交易']['Txn_Cd'])
list_accounttype = list(maps['账户操作交易']['Txn_Cd'])
list_paytype = list(maps['支付交易']['Txn_Cd'])
list_transtype = list(maps['转账交易']['Txn_Cd'])
list_querytype = list(maps['查询交易']['Txn_Cd'])
list_fail = list(maps['失败交易']['Txn_Stat_Cd'])

for i in [0,1,2,3,4]:
    #读取原始数据
    data_select = pd.read_csv('/data1/sample_bigtable/data_select_%d.csv'%i,dtype=str)
    data = data_select.drop(['Txn_Amt','Currency_Cd','Channel_Type','Login_Type_Cd','Cust_Acct_Type_Cd','Event_Type_Cd'],axis=1,errors='ignore')
    data = data.fillna('')
    data = data.applymap(lambda xx:xx.strip())
    print('原始数据读取成功')
    
    #计算分组统计量
    result = GroupStatCalc(data=data,list_hightype=list_hightype,list_midtype=list_midtype,list_lowtype=list_lowtype,
                             list_sensitype=list_sensitype,list_risktype=list_risktype,list_accounttype=list_accounttype,
                             list_paytype=list_paytype,list_transtype=list_transtype,list_querytype=list_querytype,list_fail=list_fail)
    result.to_csv('/data1/sample_bigtable/bigtable_groupstat_%d.csv'%i,index=True,header=True,encoding='utf-8')
    print('分组统计变量计算成功')
    
    #计算前后变动次数
    result = IpFreqCalc(data)
    result.to_csv('/data1/sample_bigtable/bigtable_ipfreq_%d.csv'%i,index=True,header=True,encoding='utf-8')
    print('IpFreq计算成功')
    
    #计算连续最多次数
    result = ConFailNumCalc(data,list_fail=list_fail)
    result.to_csv('/data1/sample_bigtable/bigtable_confailnum_%d.csv'%i,index=True,header=True,encoding='utf-8')
    print('ConFailNum计算成功')
    
    #计算是否存在子列
    risk_pat = [True,False,True,False,True,False]
    result = RiskPatCountCalc(data=data,list_type=list_risktype,list_fail=list_fail,risk_pat=risk_pat)
    result_risk.columns = ['RiskPatCount','User_Id']
    result_risk.index = range(len(result_risk))
    result.to_csv('/data1/sample_bigtable/bigtable_riskpatcount_%d.csv'%i,index=True,header=True,encoding='utf-8')
    print('RiskPatCount计算成功')
    
    #计算滑动窗口
    result = RecentTransNumCalc(data)
    result_tmp = RecentXXXTransNumCalc(data,list_sensitype,list_risktype,list_fail)
    result = pd.merge(result,result_tmp,on=['User_Id','Event_Id'],how='outer')
    
    tmp = RecentXTimeGapCalc(data,X=3)
    del tmp['User_Id']
    result = pd.merge(result,tmp,on='Event_Id')

    result.to_csv('/data1/sample_bigtable/bigtable_recentstatmiddle_%d.csv'%i,index=None,header=True,encoding='utf-8')
    print('滑动窗口中间表计算成功')
    
    #计算滑动窗口统计量
    result = RecentStatCalc(result)
    result.to_csv('/data1/sample_bigtable/bigtable_recentstat_%d.csv'%i,index=None,header=True,encoding='utf-8')
    print('滑动窗口统计量计算成功')
    
    #合并所有变量
    result_groupstat = pd.read_csv('/data1/sample_bigtable/bigtable_groupstat_%d.csv'%i,converters={'User_Id':str})
    result_ipfreq = pd.read_csv('/data1/sample_bigtable/bigtable_ipfreq_%d.csv'%i,converters={'User_Id':str})
    result_confailnum = pd.read_csv('/data1/sample_bigtable/bigtable_confailnum_%d.csv'%i,converters={'User_Id':str})
    result_riskpatcount = pd.read_csv('/data1/sample_bigtable/bigtable_riskpatcount_%d.csv'%i,converters={'User_Id':str})
    result_recentstat = pd.read_csv('/data1/sample_bigtable/bigtable_recentstat_%d.csv'%i,converters={'User_Id':str})
    
    result = pd.merge(result_groupstat,result_ipfreq,on='User_Id')
    result = pd.merge(result,result_confailnum,on='User_Id')
    result = pd.merge(result,result_riskpatcount,on='User_Id')
    result = pd.merge(result,result_recentstat,on='User_Id')
    
    #填补缺失
    fill_values={}
    for col in result.columns.tolist():
        if col == 'User_Id':
            continue
        if col.endswith('Std'):
            fill_values[col] = 0
        else:
            fill_values[col] = result[col].median()
    result = result.fillna(fill_values)
    
    result.to_csv('/data1/sample_bigtable/bigtable_final_%d.csv'%i,index=None,header=True,encoding='utf-8')
    print('第%d张大宽表计算成功'%i)