In [1]:
from pyspark.sql import SparkSession

# Session shared by every cell below, obtained through builder.getOrCreate().
spark = (SparkSession.builder
         .appName('test')
         .getOrCreate())

# Train(T+7)

In [6]:
#/bin/python2
# set spark.driver.maxResultSize=10g
# set spark.driver.memory=20g
# set spark.driver.memoryOverhead=12g
# set spark.executor.memory=25g
# set spark.executor.memoryOverhead=12g
# set spark.sql.execution.arrow.enabled=true

import pandas as pd
import numpy as np
import math
import sys
import json
import time
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, ArrayType, LongType, FloatType
import pyspark.sql.functions as F 
from sklearn import svm
from sklearn.externals import joblib
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

In [7]:
def forward_for_pr(spark, sdf, window=420):
    """Pair each record with the next record of the same ID number, for PageRank.

    Looks forward with the largest window so that PageRank can derive the weights
    of every outgoing edge of a platform node.

    Args:
        spark: active SparkSession.
        sdf: narrowed table selected from the wide dimension table. Rows are read
            positionally as (sequence_id, eventoccurtime, rqst_idnumber,
            partnercode, ...) and through the ``rqst_idnumber`` attribute.
        window: window size in seconds.

    Returns:
        DataFrame [partnercode, delta_time, fromplat]:
        - ``partnercode`` is the earlier record's platform.
        - ``delta_time`` is the gap in seconds to the following record.
        - ``fromplat`` is the following record's platform.
        A gap larger than ``window`` yields ``(platform, -1.0, '')``. A following
        record on the same platform inside the window yields no row.
        NOTE(review): the last record of every ID number never produces a row.
    """
    window_bc = spark.sparkContext.broadcast(window)

    def pair_with_successor(group):
        # Sort by time, then by sequence_id, so records sharing a timestamp keep a stable order.
        ordered = sorted(group[1], key=lambda rec: (rec[1], rec[0]))
        pairs = []
        for prev, cur in zip(ordered, ordered[1:]):
            gap = cur[1] - prev[1]
            if gap > window_bc.value:
                pairs.append((prev[3], -1.0, ''))
            elif cur[3] != prev[3]:
                pairs.append((prev[3], gap, cur[3]))
        return pairs

    schema = StructType([
        StructField('partnercode', StringType(), True),
        StructField('delta_time', FloatType(), True),
        StructField('fromplat', StringType(), True),
    ])
    # Group by ID number; the records of one person are then paired in time order.
    keyed = sdf.rdd.map(lambda rec: (rec.rqst_idnumber, rec))
    return keyed.groupByKey().flatMap(pair_with_successor).toDF(schema)

In [8]:
def backward_for_fp(spark, sdf, window=420):
    """Attach to each record the platform of the previous record of the same ID number.

    Looks backward with the largest window. The output feeds FP-growth style
    association statistics: the aim is to identify B in A->B, and B may occur
    several times.

    Args:
        spark: active SparkSession.
        sdf: narrowed table selected from the wide dimension table, with columns
            (sequence_id, eventoccurtime, rqst_idnumber, partnercode, deviceid,
            rqst_customerchannel).
        window: window size in seconds.

    Returns:
        DataFrame [sequence_id, eventoccurtime, rqst_idnumber, partnercode, deviceid,
        rqst_customerchannel, delta_time, items]. ``items`` is the previous record's
        platform.
        - A gap larger than ``window`` gives ``delta_time=-1`` and ``items=''``.
        - A previous record on the same platform inside the window gives no row.
        - The first record of each ID number never produces a row.
    """
    window_bc = spark.sparkContext.broadcast(window)

    def key_by_idnumber(partition):
        # Key each record by its ID number (column 2) and turn the Row into a plain list.
        return [[rec[2], list(rec)] for rec in partition]

    def attach_predecessor(partition):
        out = []
        for _, records in partition:
            # Sort by time, then by sequence_id, for a stable order on equal timestamps.
            ordered = sorted(list(records), key=lambda rec: (rec[1], rec[0]))
            for prev, cur in zip(ordered, ordered[1:]):
                gap = cur[1] - prev[1]
                if gap > window_bc.value:
                    out.append(cur + [-1, ''])
                elif cur[3] != prev[3]:
                    out.append(cur + [int(gap), prev[3]])
        return out

    schema = StructType([
        StructField("sequence_id", StringType(), True),
        StructField("eventoccurtime", FloatType(), True),
        StructField("rqst_idnumber", StringType(), True),
        StructField("partnercode", StringType(), True),
        StructField("deviceid", StringType(), True),
        StructField("rqst_customerchannel", StringType(), True),
        StructField("delta_time", IntegerType(), True),
        StructField("items", StringType(), True)])
    grouped = sdf.repartition(100).rdd.mapPartitions(key_by_idnumber).groupByKey()
    return grouped.mapPartitions(attach_predecessor).toDF(schema)

In [9]:
def pagerank(spark, sdf, version_number, error_thresh = 1e-2, trans_thresh = 0.85, thresh1=0, thresh2=0.001, thresh3=100, window = 420):
    """Run PageRank over the platform transition graph built by ``forward_for_pr``.

    Args:
        spark: active SparkSession.
        sdf: output of ``forward_for_pr`` (columns partnercode, delta_time, fromplat).
        version_number: model version, written as the ``ds`` partition value.
        error_thresh: iteration stops once one step lowers the rank error by no more
            than this amount. At least one step is always performed.
        trans_thresh: damping factor (beta) applied to the rank passed along edges.
        thresh1: minimum absolute count for an edge to be kept.
        thresh2: minimum edge count as a fraction of the source platform's count.
        thresh3: minimum support count for a platform to become a graph node.
        window: window in seconds. Transitions with a negative or larger delta_time
            count as "no transition" (self loop). Also selects the output table.

    Writes partition ``ds=version_number`` of
    ``ml.mlp_cxw_more_<int(window)>_pgrk_result_dt`` with columns
    [partnercode, rank, count, infos]. ``infos`` is the number of outgoing edges
    whose weight exceeds 0.01.

    Raises:
        ValueError: if every platform is removed by the thresholds.
    """
    # DoubleType is not part of the notebook's top-level pyspark imports.
    from pyspark.sql.types import DoubleType

    sc = spark.sparkContext
    bc_window = sc.broadcast(window)
    bc_thresh1 = sc.broadcast(thresh1)
    bc_thresh2 = sc.broadcast(thresh2)

    def to_transition(part):
        # Transitions outside the window (forward_for_pr marks them with -1) keep an
        # empty target, i.e. "no outgoing platform".
        for row in part:
            if row.delta_time < 0 or row.delta_time > bc_window.value:
                yield (row.partnercode, '')
            else:
                yield (row.partnercode, row.fromplat)

    # repartition() returns a new DataFrame; a bare call has no effect.
    trans_rdd = sdf.repartition(100).rdd.mapPartitions(to_transition).cache()

    def map_to_plat_pair(part):
        # Edge (source, target) with weight 1; a record without a target is a self loop.
        for row in part:
            yield ((row[0], row[1] if row[1] else row[0]), 1)

    # (source, [((source, target), count), ...]) grouped per source platform.
    edges_by_source = (trans_rdd.mapPartitions(map_to_plat_pair)
                       .reduceByKey(lambda x1, x2: x1 + x2)
                       .groupBy(lambda x: x[0][0])
                       .cache())

    def cnt_func(row):
        # Support count of a platform. cnt1 sums the edges passing thresh1; cnt2 sums
        # the edges reaching thresh2 * cnt1.
        edges = list(row[1])
        cnt1 = sum(c for _, c in edges if c >= bc_thresh1.value)
        cnt2 = sum(c for _, c in edges if c >= cnt1 * bc_thresh2.value)
        return (row[0], cnt2)

    plat_support_cnt_dict = sc.broadcast(dict(edges_by_source.map(cnt_func).collect()))

    # Graph nodes are the platforms whose support count reaches thresh3. Every
    # platform of trans_rdd is a key of plat_support_cnt_dict.
    nodes = [plat for plat, cnt in plat_support_cnt_dict.value.items() if cnt >= thresh3]
    bc_plat_idx = sc.broadcast(dict(zip(nodes, range(len(nodes)))))
    bc_length = sc.broadcast(len(nodes))

    def calc_transfer_matrix(row):
        # Outgoing transition probabilities of one node, as a dense vector over all nodes.
        plat = row[0]
        plat_idx = bc_plat_idx.value
        if plat not in plat_idx:
            return []
        min_cnt = plat_support_cnt_dict.value[plat] * bc_thresh2.value
        kept = [(e[0][1], e[1]) for e in row[1]
                if e[1] >= min_cnt and e[1] >= bc_thresh1.value and e[0][1] in plat_idx]
        result = [0.0] * bc_length.value
        total = float(sum(c for _, c in kept))
        for target, c in kept:
            result[plat_idx[target]] = c / total
        # A node without any kept outgoing edge keeps all of its rank.
        if sum(result) == 0:
            result[plat_idx[plat]] = 1.0
        return [(plat, 1, result)]

    # Rows are [platform, rank, transfer vector]; every node starts with rank 1.
    initial_ranks = edges_by_source.flatMap(calc_transfer_matrix).cache()
    if initial_ranks.isEmpty():
        raise ValueError('all data is filtered!!!')

    bc_idx_plat = sc.broadcast(dict((v, k) for k, v in bc_plat_idx.value.items()))
    bc_beta = sc.broadcast(trans_thresh)

    def pg_transfer(row):
        # Each target receives beta * rank * weight. The source also emits (1 - beta)
        # and carries its transfer vector forward; the list parts are concatenated
        # by the reduce, so exactly one vector survives per node.
        return [[bc_idx_plat.value[i], [bc_beta.value * (row[1] * row[2][i]), []]]
                for i in range(len(row[2]))] + [[row[0], [1 - bc_beta.value, row[2]]]]

    rank_rdd = initial_ranks
    prev_error = float('inf')
    prev_cached = None
    while True:
        new_result = (rank_rdd.flatMap(pg_transfer)
                      .reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]])
                      .cache())
        # Per-platform rank change. Old ranks enter negated, so the reduce yields
        # new - old regardless of shuffle order or of a key present on one side only.
        delta = (new_result.map(lambda x: (x[0], x[1][0]))
                 .union(rank_rdd.map(lambda x: (x[0], -x[1])))
                 .reduceByKey(lambda x1, x2: x1 + x2))
        new_error = math.sqrt(delta.map(lambda x: x[1] ** 2).sum())
        print('error before: %.6f, error after: %.6f' % (prev_error, new_error))
        # The previous iteration's cache is no longer needed once new_result exists.
        if prev_cached is not None:
            prev_cached.unpersist()
        prev_cached = new_result
        rank_rdd = new_result.map(lambda x: [x[0], x[1][0], x[1][1]])
        if prev_error - new_error <= error_thresh:
            break
        prev_error = new_error
    print(new_error)

    # Append the support count: [platform, rank, transfer vector, support count].
    final_rdd = rank_rdd.map(lambda x: [x[0], x[1], x[2], plat_support_cnt_dict.value[x[0]]])

    def mapping_sort(row):
        # (platform, rank, support count, number of outgoing edges with weight > 0.01)
        return (row[0], row[1], row[3], sum(1 for w in row[2] if w > 0.01))

    schema = StructType([
        StructField("partnercode", StringType(), True),
        StructField("rank", DoubleType(), True),
        StructField("count", LongType(), True),
        StructField("infos", IntegerType(), True)])

    newdf = final_rdd.map(mapping_sort).toDF(schema)
    tmp_table_name = 'tdl_tmp_algo_%s' % int(time.time() * 1000)
    newdf.createOrReplaceTempView(tmp_table_name)
    spark.sql('insert overwrite table ml.mlp_cxw_more_%s_pgrk_result_dt partition (ds) select *, %s from %s'
              % (str(int(window)), version_number, tmp_table_name))

    for rdd in (trans_rdd, edges_by_source, initial_ranks, prev_cached):
        rdd.unpersist()


In [None]:
def fp_growth_table(spark, sdf, version_number, min_support = 100, window=420):
    """Build and persist the association-rule features of one window.

    Measures, for platforms A and B:
        ir    = P(A|B) / P(B|A)
        kulc  = 0.5 * (P(A|B) + P(B|A))
        lift  = P(A|B) / P(A)
        ratio = min(count in window, count in global) / max(...) for a platform pair

    Args:
        spark: active SparkSession.
        sdf: output of ``backward_for_fp``.
        version_number: model version, written as the ``ds`` partition value.
        min_support: minimum count for a (to, from) pair or a single platform.
        window: window in seconds. Records whose delta_time is negative or larger
            lose their predecessor. Also selects the output tables.

    Writes partition ``ds=version_number`` of the following tables, where
    ``<w>`` is ``int(window)``:
        ml.mlp_cxw_more_<w>_bc_dict_dt           single / pair support counts
        ml.mlp_cxw_more_<w>_plat_pair_ratio_dt   window-vs-global pair ratios
        ml.mlp_cxw_more_<w>_cond_prob_dt         conditional probabilities
        ml.mlp_cxw_more_<w>_tbres_dt             merged per-record features
    It reads ml.mlp_cxw_more_<w>_pgrk_result_dt, so ``pagerank`` must have run
    for the same window and version.
    """
    sc = spark.sparkContext
    bc_window = sc.broadcast(window)
    bc_support_count = sc.broadcast(min_support)
    # Inclusive upper bounds (seconds) of delta_time bins 0..5; larger gaps go to bin 6.
    # Prediction (get_features) must bin with the same bounds.
    delta_bounds = (20, 45, 70, 120, 180, 300)

    def insert_overwrite(df, table_kind):
        # Expose df as a temp view and overwrite partition ds=version_number of the table.
        tmp_table_name = 'tdl_tmp_algo_%s' % int(time.time() * 1000)
        df.createOrReplaceTempView(tmp_table_name)
        spark.sql('insert overwrite table ml.mlp_cxw_more_%s_%s_dt partition (ds) select *, %s from %s'
                  % (str(int(window)), table_kind, version_number, tmp_table_name))

    def pair_key(a, b):
        # Order-independent key of a platform pair: smaller name first, e.g. 'A,B'.
        return a + ',' + b if a < b else b + ',' + a

    def to_bin(delta):
        for idx, bound in enumerate(delta_bounds):
            if delta <= bound:
                return idx
        return len(delta_bounds)

    def bin_delta(part):
        # Out-of-window records drop their predecessor; the others get a binned delta_time.
        for row in part:
            if row.delta_time < 0 or row.delta_time > bc_window.value:
                yield list(row[:-2]) + [-1, '']
            else:
                yield list(row[:-2]) + [to_bin(row[-2]), row[-1]]

    schema = StructType([
        StructField("sequence_id", StringType(), True),
        StructField("eventoccurtime", FloatType(), True),
        StructField("rqst_idnumber", StringType(), True),
        StructField("partnercode", StringType(), True),
        StructField("deviceid", StringType(), True),
        StructField("rqst_customerchannel", StringType(), True),
        StructField("delta_time", IntegerType(), True),
        StructField("items", StringType(), True)])
    # repartition() returns a new DataFrame; a bare call has no effect.
    records = sdf.repartition(100).rdd.mapPartitions(bin_delta).toDF(schema).cache()

    def items_frequency(part):
        # Count directed (to, from) pairs. Records with a predecessor also count both
        # platforms individually, as (platform, '').
        for row in part:
            yield ((row[3], row[-1]), 1)
            if len(row[-1]) > 0:
                yield ((row[-1], ''), 1)
                yield ((row[3], ''), 1)

    # Frequencies of single platforms and directed pairs that reach min_support.
    df_freq = (records.rdd.mapPartitions(items_frequency)
               .reduceByKey(lambda x1, x2: x1 + x2)
               .filter(lambda kv: kv[1] >= bc_support_count.value)
               .cache())

    def undirected_key(kv):
        (to_plat, from_plat), cnt = kv
        if len(from_plat) > 0:
            return (','.join(sorted((to_plat, from_plat))), cnt)
        return (to_plat, cnt)

    # (A, B) and (B, A) share the key 'A,B'. Their counts are summed; a plain dict()
    # would keep whichever arrived last from collect().
    freq_dict = dict(df_freq.map(undirected_key).reduceByKey(lambda x1, x2: x1 + x2).collect())
    candidates_list = sc.broadcast(set(freq_dict.keys()))
    freq_dict['totol_count_of_platforms'] = int(sum(freq_dict.values()))
    bc_dict = sc.broadcast(freq_dict)
    insert_overwrite(spark.createDataFrame(list(bc_dict.value.items())), 'bc_dict')

    # Window-vs-global co-occurrence ratio of candidate pairs.
    def candidate_pairs(names):
        ordered = sorted(names)
        result = []
        for i in range(len(ordered)):
            for j in range(i + 1, len(ordered)):
                key = ordered[i] + ',' + ordered[j]
                if key in candidates_list.value:
                    result.append((key, 1))
        return result

    plat_pair_global_count = (records.rdd.map(lambda x: (x.rqst_idnumber, x.partnercode)).groupByKey()
                              .flatMap(lambda row: candidate_pairs(set(row[1])))
                              .reduceByKey(lambda x1, x2: x1 + x2))
    plat_pair_window_count = (records.rdd.map(lambda x: (x.rqst_idnumber, x.items)).groupByKey()
                              .flatMap(lambda row: candidate_pairs(set(row[1]) - set([''])))
                              .reduceByKey(lambda x1, x2: x1 + x2))
    plat_pair_count = plat_pair_global_count.union(plat_pair_window_count).cache()

    def calc_ratio(row):
        # Smaller over larger of the two counts; 0.0 when a pair was counted on one side only.
        counts = list(row[1])
        if len(counts) == 2:
            return (row[0], float(min(counts)) / max(counts))
        return (row[0], 0.0)

    plat_pair_ratio = sc.broadcast(dict(plat_pair_count.groupByKey().map(calc_ratio).collect()))
    insert_overwrite(spark.createDataFrame(list(plat_pair_ratio.value.items())), 'plat_pair_ratio')

    # Conditional probabilities.
    def cond_prob_calc(row):
        if len(row[0][1]) == 0:
            return []
        left, right = row[0][0], row[0][1]
        # NOTE(review): the directed pairs (A, B) and (B, A) both emit 'A|B' and 'B|A'.
        # dict() below keeps whichever arrives last, so the stored value depends on
        # collect() order. Left as-is because the key format is shared with prediction.
        return [(left + '|' + right, float(row[1]) / bc_dict.value[right]),
                (right + '|' + left, float(row[1]) / bc_dict.value[left])]

    bc_cond_prob = sc.broadcast(dict(df_freq.flatMap(cond_prob_calc).collect()))
    insert_overwrite(spark.createDataFrame(list(bc_cond_prob.value.items())), 'cond_prob')

    # Merge all features per record.
    tmp_sdf = spark.sql('select * from ml.mlp_cxw_more_%s_pgrk_result_dt where ds=%s' % (str(int(window)), version_number))
    bc_pr_dict = sc.broadcast(dict(tmp_sdf.rdd.map(lambda x: (x[0], [x[1], x[2], x[3]])).collect()))

    def merge_info(part):
        # Row layout: sequence_id 0, eventoccurtime 1, rqst_idnumber 2, partnercode 3,
        # deviceid 4, rqst_customerchannel 5, delta_time 6, items 7.
        result = []
        for line in part:
            row = list(line)
            if len(row[7]) < 1:
                continue
            forward_key = row[7] + '|' + row[3]
            backward_key = row[3] + '|' + row[7]
            if forward_key not in bc_cond_prob.value:
                continue  # pair did not reach min_support
            best_prob_val = bc_cond_prob.value[forward_key]
            # Missing window/global counts fall back to 0.0, the same value calc_ratio uses.
            best_ratio_val = plat_pair_ratio.value.get(pair_key(row[3], row[7]), 0.0)
            best_lift_val = float(best_prob_val) / bc_dict.value[row[7]] * bc_dict.value['totol_count_of_platforms']
            best_kulc_val = 0.5 * (best_prob_val + bc_cond_prob.value[backward_key])
            best_ir_val = float(best_prob_val) / bc_cond_prob.value[backward_key]
            support_cnt = bc_dict.value[pair_key(row[3], row[7])]
            # Label rules as coded: 9 by default. A non-empty deviceid with
            # rqst_customerchannel != '1' gives 0. No deviceid with
            # rqst_customerchannel == '1' gives 1.
            label = 9
            if row[4] is not None and str(row[4]) != '':
                if row[5] != '1':
                    label = 0
            elif row[5] == '1':
                label = 1

            if row[7] in bc_pr_dict.value and row[3] in bc_pr_dict.value:
                result.append([row[0], row[2], row[6], row[7], float(best_prob_val), support_cnt,
                               bc_pr_dict.value[row[7]][0], bc_pr_dict.value[row[7]][1],
                               float(best_lift_val), float(best_kulc_val), float(best_ir_val),
                               bc_pr_dict.value[row[3]][0], bc_pr_dict.value[row[3]][1], float(best_ratio_val),
                               bc_pr_dict.value[row[7]][2], label])
        return result

    schema = StructType([
        StructField("sequence_id", StringType(), True),
        StructField("rqst_idnumber", StringType(), True),
        StructField("delta_time", IntegerType(), True),
        StructField("_from", StringType(), True),
        StructField("_prob", FloatType(), True),
        StructField("support_count", IntegerType(), True),
        StructField("from_rank", FloatType(), True),
        StructField("from_cnt", StringType(), True),
        StructField("_lift_val", FloatType(), True),
        StructField("_kulc_val", FloatType(), True),
        StructField("_ir_val", FloatType(), True),
        StructField("to_rank", FloatType(), True),
        StructField("to_cnt", StringType(), True),
        StructField("ratio_val", FloatType(), True),
        StructField("from_out_degree", IntegerType(), True),
        StructField("label", IntegerType(), True)])
    merged = records.repartition(100).rdd.mapPartitions(merge_info).toDF(schema)
    insert_overwrite(merged, 'tbres')

    plat_pair_count.unpersist()
    df_freq.unpersist()
    records.unpersist()


In [10]:
def merge_train_data(spark, version_number, tablenames):
    """Join the per-window training tables on sequence_id; the first (leftmost) window drives the join.

    Args:
        spark: active SparkSession.
        version_number: model version, i.e. the ``ds`` partition to read.
        tablenames: window labels such as ``'7min'``. Each label maps to
            ``ml.mlp_cxw_more_<seconds>_tbres_dt``.

    Returns:
        DataFrame with the first window's columns (without ``ds``). For the i-th
        additional window it adds that window's columns suffixed with ``i``, except
        deviceid, rqst_idnumber, _from, ds, sequence_id and label.
    """
    from pyspark.sql.functions import col

    tables = ['ml.mlp_cxw_more_%s_tbres_dt' % str(int(float(label[:-3]) * 60)) for label in tablenames]

    def load(table):
        return spark.sql('select * from %s where ds=%s' % (table, version_number))

    merged = load(tables[0]).drop('ds')
    for idx, table in enumerate(tables[1:], 1):
        suffix = str(idx)
        right = load(table)
        for dropped in ('deviceid', 'rqst_idnumber', '_from', 'ds'):
            right = right.drop(dropped)
        for name in right.columns:
            right = right.withColumnRenamed(name, name + suffix)
        merged = merged.join(right, col('sequence_id') == col('sequence_id' + suffix), 'left')
        merged = merged.drop('sequence_id' + suffix).drop('label' + suffix)
    return merged

In [None]:
def OneClassSvmModel(spark, sdf, version_number, nu_para=0.0001, gamma_para=0.01):
    """Train the StandardScaler and the One-Class SVM, save both, and print recall / F-measure.

    Args:
        spark: active SparkSession (not used in the body).
        sdf: merged training features (see ``merge_train_data``), including ``label``.
            Label 0 is "not drainage", 1 is "drainage", 9 is "unlabeled".
        version_number: suffix of the saved model files.
        nu_para: OneClassSVM ``nu``.
        gamma_para: RBF kernel ``gamma``.

    Returns:
        [scaler model path, SVM model path] under /user/datacompute/users/xingwei.chen/.
    """
    features = sdf.drop('sequence_id').drop('rqst_idnumber').drop('_from')
    df = features.withColumn('PR_propotion', F.col('from_rank') / F.col('to_rank')).toPandas()
    df = df.fillna(0)
    # Non-drainage data (label 0) is split into train and test.
    normal = df[df['label'] == 0]
    train_data = normal.drop(['label'], axis=1)
    train_label = pd.Series(np.ones(normal.label.count()))
    train_data, test_data, y_train, y_test = train_test_split(train_data, train_label, test_size=0.3, random_state=2018)
    # Drainage (label 1) and unlabeled (label 9) data.
    valid_data = df[df['label'] == 1].drop(['label'], axis=1)
    unlabel_data = df[df['label'] == 9].drop(['label'], axis=1)

    def save_model(model, path):
        with open(path, 'wb') as f:
            joblib.dump(model, f)
            # Flush so the serialized model is on disk before the platform helper
            # reads the file (NOTE(review): assumed savePersonalFile behavior).
            f.flush()
            savePersonalFile(f, overwrite=True)

    # Train and save the scaler.
    scaler = preprocessing.StandardScaler().fit(train_data)
    path1 = 'drainage_models/scaler_model_%s' % (version_number)
    save_model(scaler, path1)
    train_data = scaler.transform(train_data)
    test_data = scaler.transform(test_data)
    valid_data = scaler.transform(valid_data)
    unlabel_data = scaler.transform(unlabel_data)

    # Train and save the One-Class SVM.
    clf = svm.OneClassSVM(nu=nu_para, kernel="rbf", gamma=gamma_para)
    clf.fit(train_data)
    path2 = 'drainage_models/train_model_%s' % (version_number)
    save_model(clf, path2)

    pred_train = clf.predict(train_data)
    pred_test = clf.predict(test_data)
    pred_valid = clf.predict(valid_data)
    pred_unlabel = clf.predict(unlabel_data)

    def inlier_rate(pred):
        # float() keeps this a true division under Python 2.
        return float(pred[pred == 1].size) / pred.shape[0]

    # ------------- Evaluation --------------------
    recall_train = inlier_rate(pred_train)
    recall_test = inlier_rate(pred_test)
    recall_valid = 1 - inlier_rate(pred_valid)
    print(pred_train[pred_train == -1].size + pred_test[pred_test == -1].size + pred_valid[pred_valid == -1].size + pred_unlabel[pred_unlabel == -1].size)
    print('Recall\ntrain:%s, test:%s, vaild:%s' % (recall_train, recall_test, recall_valid))
    print('F-meansure\ntrain:%s, test:%s, vaild:%s' % (2*recall_train/(recall_train+1.0), 2*recall_test/(recall_test+1.0), 2*recall_valid/(recall_valid+1.0)) )
    return [ '/user/datacompute/users/xingwei.chen/' + path1,
             '/user/datacompute/users/xingwei.chen/' + path2 ]

In [None]:
# Training entry point
def main(spark):
    """Train every window's statistics and the One-Class SVM for a new model version.

    The version number is the current local time as YYYYmmddHHMMSS. Training data
    is the event='loan' / eventtype='entPreLoan' records of roughly the last 30 days
    before today's local midnight.
    NOTE(review): the partition filter covers two months only: the current month up
    to today, and the month of (today - 30 days) from that day on. If those months
    are not adjacent (e.g. early March), the month in between is skipped.
    """
    import datetime  # not imported by the notebook's import cells

    now = time.localtime()
    version_number = int(time.strftime('%Y%m%d%H%M%S', now))  # e.g. 20180901000000
    # Local midnight of the run day, in epoch milliseconds.
    midnight = datetime.datetime(now.tm_year, now.tm_mon, now.tm_mday)
    sql_time = int(time.mktime(midnight.timetuple())) * 1000
    # sql_time is in milliseconds, so 30 days is subtracted as 86400000 ms * 30.
    sdf = spark.sql('''
        select sequence_id, round(cast(eventoccurtime as bigint)/1000, 0) eventoccurtime, rqst_idnumber, partnercode,
        deviceid, rqst_customerchannel from bigdata.raw_activity_flat
        where
            ( ( year=from_unixtime(cast(%(t)s as bigint)/1000, 'yyyy')
                and month=from_unixtime(cast(%(t)s as bigint)/1000, 'MM')
                and day<=from_unixtime(cast(%(t)s as bigint)/1000, 'dd') )
                or
              ( year=from_unixtime(cast(%(t)s-86400000L*30 as bigint)/1000, 'yyyy')
                and month=from_unixtime(cast(%(t)s-86400000L*30 as bigint)/1000, 'MM')
                and day>=from_unixtime(cast(%(t)s-86400000L*30 as bigint)/1000, 'dd') ) )
            and
            (event = 'loan' and eventtype='entPreLoan') and rqst_idnumber is not null
    ''' % {'t': sql_time})
    windows = ['7min', '5min', '3min', '2min']

    def to_seconds(label):
        # '7min' -> 420.0
        return float(label[:-3]) * 60

    # Pair records once with the largest window; each window narrows it later.
    max_window = int(to_seconds(windows[0]))
    sdf_pr = forward_for_pr(spark, sdf, window=max_window).cache()
    sdf_fp = backward_for_fp(spark, sdf, window=max_window).cache()
    for wind in windows:
        pagerank(spark, sdf_pr, version_number, error_thresh=1e-2, trans_thresh=0.85,
                 thresh1=0, thresh2=0.001, thresh3=100, window=to_seconds(wind))
        fp_growth_table(spark, sdf_fp, version_number, min_support=100, window=to_seconds(wind))
    train_sdf = merge_train_data(spark, version_number, windows)
    OneClassSvmModel(spark, train_sdf, version_number)
    print('The version number for next 7 days is %s' % version_number)

# Predict(T+1)

In [None]:
#/bin/python2
# set spark.sql.execution.arrow.enabled=true 
# set spark.driver.maxResultSize=5g

from pyspark.sql.functions import split
import numpy as np
import math
import json
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, ArrayType, LongType, FloatType
from sklearn.externals import joblib
import pyspark.sql.functions as F
import pandas as pd

In [None]:
def backward_for_fp(spark, sdf, window=420):
    """Attach to each record the nearest earlier record on a different platform (prediction variant).

    Looks backward with the largest window. Unlike the training variant, a run of
    same-platform records is skipped over, so "A B B B" yields three A->B records.

    Args:
        spark: active SparkSession.
        sdf: narrowed table selected from the wide dimension table, with columns
            (sequence_id, eventoccurtime, rqst_idnumber, partnercode, deviceid,
            rqst_customerchannel).
        window: window size in seconds.

    Returns:
        DataFrame [sequence_id, eventoccurtime, rqst_idnumber, partnercode, deviceid,
        rqst_customerchannel, delta_time, items], where ``items`` is the found
        platform. Records without such a predecessor inside the window are omitted.
    """
    bc_window = spark.sparkContext.broadcast(window)

    def func1(part):
        # Key each record by its ID number (column 2).
        result = []
        for row in part:
            result.append([row[2], list(row)])
        return result

    def func2(part):
        result = []
        for row in part:
            # Sort by time, then by sequence_id, for a stable order on equal timestamps.
            items = sorted(list(row[1]), key=lambda x: (x[1], x[0]))
            for i in range(1, len(items)):
                # Walk back over same-platform records inside the window. The bounds
                # check comes first so a negative index never wraps to the list's end.
                j = i - 1
                while j >= 0 and items[i][1] - items[j][1] <= bc_window.value and items[i][3] == items[j][3]:
                    j -= 1
                if j >= 0 and items[i][3] != items[j][3] and items[i][1] - items[j][1] <= bc_window.value:
                    result.append(items[i] + [int(items[i][1] - items[j][1]), items[j][3]])
        return result

    schema = StructType([
        StructField("sequence_id", StringType(), True),
        StructField("eventoccurtime", FloatType(), True),
        StructField("rqst_idnumber", StringType(), True),
        StructField("partnercode", StringType(), True),
        StructField("deviceid", StringType(), True),
        StructField("rqst_customerchannel", StringType(), True),
        StructField("delta_time", IntegerType(), True),
        StructField("items", StringType(), True)])
    sdf = sdf.repartition(100)
    sdf = sdf.rdd.mapPartitions(func1).groupByKey().mapPartitions(func2).toDF(schema)
    return sdf

In [None]:
def get_features(spark, sdf, version_number, window=420):
    """Compute one window's prediction features from the persisted training statistics.

    Args:
        spark: active SparkSession.
        sdf: output of ``backward_for_fp`` (prediction variant).
        version_number: version of the training tables to read (``ds`` partition).
        window: window in seconds. Records with delta_time outside [0, window] are dropped.

    Returns:
        DataFrame with the schema below. There is one row per record whose
        (platform, predecessor) pair appears in the training statistics. delta_time
        is binned exactly as in training.
    """
    bc_window = spark.sparkContext.broadcast(window)
    # Same inclusive delta_time bin bounds (seconds) as training's fp_growth_table;
    # larger gaps go to bin 6. The scaler and SVM were fitted on these bins.
    delta_bounds = (20, 45, 70, 120, 180, 300)

    def to_bin(delta):
        for idx, bound in enumerate(delta_bounds):
            if delta <= bound:
                return idx
        return len(delta_bounds)

    def func(part):
        for row in part:
            if row.delta_time >= 0 and row.delta_time <= bc_window.value:
                values = list(row)
                values[6] = to_bin(values[6])
                yield values

    schema = StructType([
        StructField("sequence_id", StringType(), True),
        StructField("eventoccurtime", FloatType(), True),
        StructField("rqst_idnumber", StringType(), True),
        StructField("partnercode", StringType(), True),
        StructField("deviceid", StringType(), True),
        StructField("rqst_customerchannel", StringType(), True),
        StructField("delta_time", IntegerType(), True),
        StructField("items", StringType(), True)])
    # repartition() returns a new DataFrame; a bare call has no effect.
    sdf = sdf.repartition(100).rdd.mapPartitions(func).toDF(schema)

    tmp_sdf = spark.sql('select * from ml.mlp_cxw_more_%s_bc_dict_dt where ds=%s' % (str(int(window)), version_number)).drop('ds')
    bc_dict = spark.sparkContext.broadcast(dict(tmp_sdf.collect()))

    tmp_sdf = spark.sql('select * from ml.mlp_cxw_more_%s_plat_pair_ratio_dt where ds=%s' % (str(int(window)), version_number)).drop('ds')
    plat_pair_ratio = spark.sparkContext.broadcast(dict(tmp_sdf.collect()))

    tmp_sdf = spark.sql('select * from ml.mlp_cxw_more_%s_cond_prob_dt where ds=%s' % (str(int(window)), version_number)).drop('ds')
    bc_cond_prob = spark.sparkContext.broadcast(dict(tmp_sdf.collect()))

    tmp_sdf = spark.sql('select * from ml.mlp_cxw_more_%s_pgrk_result_dt where ds=%s' % (str(int(window)), version_number))  # pgrk_result_table
    bc_pr_dict = spark.sparkContext.broadcast(dict(tmp_sdf.rdd.map(lambda x: (x[0], [x[1], x[2], x[3]])).collect()))

    def pair_key(a, b):
        # Order-independent key of a platform pair: smaller name first.
        return a + ',' + b if a < b else b + ',' + a

    def map_features(part):
        # Row layout: sequence_id 0, eventoccurtime 1, rqst_idnumber 2, partnercode 3,
        # deviceid 4, rqst_customerchannel 5, delta_time 6, items 7.
        result = []
        for line in part:
            row = list(line)
            forward_key = row[7] + '|' + row[3]
            backward_key = row[3] + '|' + row[7]
            if forward_key not in bc_cond_prob.value:
                continue  # pair not frequent enough in training
            best_prob_val = bc_cond_prob.value[forward_key]
            # Missing window/global ratio falls back to 0.0, as in training.
            best_ratio_val = plat_pair_ratio.value.get(pair_key(row[3], row[7]), 0.0)
            best_lift_val = float(best_prob_val) / bc_dict.value[row[7]] * bc_dict.value['totol_count_of_platforms']
            best_kulc_val = 0.5 * (best_prob_val + bc_cond_prob.value[backward_key])
            best_ir_val = float(best_prob_val) / bc_cond_prob.value[backward_key]
            support_cnt = bc_dict.value[pair_key(row[3], row[7])]
            if row[7] in bc_pr_dict.value and row[3] in bc_pr_dict.value:
                result.append([row[0], row[3], row[4], row[2], row[6], row[7], float(best_prob_val), support_cnt,
                               bc_pr_dict.value[row[7]][0], bc_pr_dict.value[row[7]][1],
                               float(best_lift_val), float(best_kulc_val), float(best_ir_val),
                               bc_pr_dict.value[row[3]][0], bc_pr_dict.value[row[3]][1], float(best_ratio_val),
                               bc_pr_dict.value[row[7]][2]])
        return result

    schema = StructType([
        StructField("sequence_id", StringType(), True),
        StructField("partnercode", StringType(), True),
        StructField("deviceid", StringType(), True),
        StructField("rqst_idnumber", StringType(), True),
        StructField("delta_time", IntegerType(), True),
        StructField("_from", StringType(), True),
        StructField("_prob", FloatType(), True),
        StructField("support_count", IntegerType(), True),
        StructField("from_rank", FloatType(), True),
        StructField("from_cnt", StringType(), True),
        StructField("_lift_val", FloatType(), True),
        StructField("_kulc_val", FloatType(), True),
        StructField("_ir_val", FloatType(), True),
        StructField("to_rank", FloatType(), True),
        StructField("to_cnt", StringType(), True),
        StructField("ratio_val", FloatType(), True),
        StructField("from_out_degree", IntegerType(), True)])

    sdf = sdf.repartition(100).rdd.mapPartitions(map_features).toDF(schema)
    return sdf

In [None]:
def merge_predict_features(spark, dataframes):
    """Left-join the per-window feature frames on sequence_id; the first frame drives the join.

    For the i-th additional frame, every column except deviceid, rqst_idnumber,
    _from, partnercode and its sequence_id is added with the suffix ``i``.
    """
    from pyspark.sql.functions import col

    merged = dataframes[0]
    for idx, right in enumerate(dataframes[1:], 1):
        suffix = str(idx)
        for dropped in ('deviceid', 'rqst_idnumber', '_from', 'partnercode'):
            right = right.drop(dropped)
        for name in right.columns:
            right = right.withColumnRenamed(name, name + suffix)
        merged = merged.join(right, col('sequence_id') == col('sequence_id' + suffix), 'left')
        merged = merged.drop('sequence_id' + suffix)
    return merged

In [None]:
def make_prediction(spark, sdf, version_number, partitionNum = 100,
                    model_dir='/user/datacompute/users/xingwei.chen/drainage_models'):
    """Score the merged features with the saved scaler + One-Class SVM and write the results.

    Args:
        spark: active SparkSession.
        sdf: output of ``merge_predict_features``.
        version_number: version suffix of the saved models; also the ``ds`` of the detail table.
        partitionNum: number of partitions scored in parallel.
        model_dir: directory containing ``train_model_<version>`` and
            ``scaler_model_<version>``.

    Writes:
        ml.mlp_drainage_recoginze_result_detail_dt: every scored row plus distance
        and label. The partition is version_number.
        ml.mlp_drainage_recoginze_result_final_dt: rows with label > 0. The
        partition is the yyyyMMdd derived from the first 10 characters of
        sequence_id.
    """
    import time  # the prediction notebook's import cell does not import time

    # joblib.load unpickles the files: only point model_dir at trusted model files.
    bc_model = spark.sparkContext.broadcast(joblib.load('%s/train_model_%s' % (model_dir, version_number)))
    bc_scaler_model = spark.sparkContext.broadcast(joblib.load('%s/scaler_model_%s' % (model_dir, version_number)))
    # Identity columns, then the per-window features in training order (window 0
    # unsuffixed, windows 1..3 suffixed), then PR_propotion.
    base_features = ['delta_time', '_prob', 'support_count', 'from_rank', 'from_cnt',
                     '_lift_val', '_kulc_val', '_ir_val', 'to_rank', 'to_cnt',
                     'ratio_val', 'from_out_degree']
    cols = (['sequence_id', '_from', 'partnercode', 'deviceid', 'rqst_idnumber']
            + base_features
            + [name + str(i) for i in (1, 2, 3) for name in base_features]
            + ['PR_propotion'])
    bc_cols = spark.sparkContext.broadcast(cols)
    sdf = sdf.select(bc_cols.value[:-1])
    sdf = sdf.withColumn('PR_propotion', F.col('from_rank') / F.col('to_rank'))
    sdf = sdf.repartition(partitionNum)

    def func(part_rdd):
        data = [row for row in part_rdd]
        if len(data) == 0:
            return []
        df = pd.DataFrame(data, columns=bc_cols.value)
        df = df.fillna(0)
        # Columns from index 5 on are the model features.
        scaled_data = bc_scaler_model.value.transform(df[bc_cols.value[5:]])
        prediction = bc_model.value.predict(scaled_data)
        distance = bc_model.value.decision_function(scaled_data)
        df['distance'] = distance
        devid_label = df['deviceid'].values
        # 1 only when deviceid was missing (filled with 0) and the SVM predicts an outlier.
        df['label'] = [-1 if devid_label[i] != 0 or prediction[i] > 0 else 1
                       for i in range(len(prediction))]
        return df.values.tolist()

    sdf = sdf.rdd.mapPartitions(func).toDF()
    col_names = bc_cols.value + ['distance', 'label']
    for i in range(len(col_names)):
        sdf = sdf.withColumnRenamed('_' + str(i + 1), col_names[i])
    output_table = 'ml.mlp_drainage_recoginze_result_detail_dt'
    tmp_table_name = 'tdl_tmp_algo_%s' % int(time.time() * 1000)
    sdf.createOrReplaceTempView(tmp_table_name)
    spark.sql('insert overwrite table ' + output_table +
              ' partition (ds) select *, %s from %s' % (version_number, tmp_table_name))

    sdf = sdf.where(sdf.label > 0).select(['sequence_id', 'distance', 'label', '_from', 'partnercode', 'rqst_idnumber'])
    tmp_table_name = 'tdl_tmp_algo_%s' % int(time.time() * 1000)
    sdf.createOrReplaceTempView(tmp_table_name)
    spark.sql('insert overwrite table ml.mlp_drainage_recoginze_result_final_dt' +
              ' partition (ds) select *, from_unixtime( substring(sequence_id, 0, 10), \'yyyyMMdd\') from %s' % tmp_table_name)


In [None]:
# NOTE: for data-security reasons the business network will soon be isolated from the
# big-data platform network; do not access anything outside the big-data platform from
# this job (including HTTP endpoints, MySQL, etc.).
def main(spark, version_number='20181101000000', month=8, day=1, year=2018):
    """Score one day of events with the models and statistics of ``version_number``.

    Args:
        spark: active SparkSession.
        version_number: version of the trained models / statistics tables.
        month, day, year: partition of bigdata.raw_activity_flat to score.
    """
    tablenames = ['7min', '5min', '3min', '2min']
    # Same event filter as training (event AND eventtype), so features are computed
    # on the population the statistics were learned from.
    sdf = spark.sql('''select sequence_id, round(cast(eventoccurtime as bigint)/1000, 0) eventoccurtime, rqst_idnumber, partnercode, cast(deviceid as string) deviceid, rqst_customerchannel
                        from bigdata.raw_activity_flat where year=%s and month=%s and day=%s and (event ='loan' and eventtype = 'entPreLoan') and rqst_idnumber is not null''' % (year, month, day))
    # Backward pass with the largest window; get_features narrows it per window.
    sdf = backward_for_fp(spark, sdf, window=int(float(tablenames[0][:-3]) * 60)).cache()
    dataframes = [get_features(spark, sdf, version_number, window=float(name[:-3]) * 60)
                  for name in tablenames]
    merged = merge_predict_features(spark, dataframes)
    make_prediction(spark, merged, version_number)