In [1]:
import findspark
findspark.init()

import pyspark
import os

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml.feature import *
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.context import SparkContext


# Export the Python interpreter path through the PYSPARK_PYTHON environment variable.
# The value is the same relative path that spark_init sets for 'spark.pyspark.python'.
os.environ['PYSPARK_PYTHON']='./xxx/xxx/bin/python'

In [None]:
%%time
def spark_init(app_name, queue_name='xxx'):
    """Create (or reuse) a Hive-enabled SparkSession configured for YARN client mode.

    All launch parameters are collected in a SparkConf and handed to the
    session builder in one go.

    Args:
        app_name: value stored under 'spark.app.name'.
        queue_name: value stored under 'spark.yarn.queue'.

    Returns:
        A tuple ``(spark, sc)``: the SparkSession returned by ``getOrCreate()``
        and its ``sparkContext``.
    """
    settings = [
        # --- Spark application parameters ---
        # Cluster manager: choose here between running on yarn (client mode) or local mode.
        ('spark.master', 'yarn'),
        ('spark.submit.deployMode', 'client'),
        # Spark application name.
        ('spark.app.name', app_name),
        # Disabled option: ('spark.driver.cores', '10')  -- cores used by the driver.
        # Driver memory. It cannot be changed once the application has started; it is
        # needed to keep the process from being killed when the container runs out of memory.
        ('spark.driver.memory', '12g'),
        # Executor memory.
        ('spark.executor.memory', '12g'),
        # Disabled option: ('spark.executor.cores', '3')  -- cores used by each executor.

        # --- Python environment on the cluster ---
        ('spark.yarn.dist.archives', 'hdfs://xxx.tar.gz#xxx'),
        # Local Python path on the driver side.
        ('spark.pyspark.driver.python', 'xxx/bin/python'),
        # Python path on the executor side.
        ('spark.pyspark.python', './xxx/xxx/bin/python'),
        ('spark.driver.host', 'xxx.xxx.xxx.xxx'),

        # --- Dynamic resource allocation ---
        # Disabled option: ('spark.shuffle.service.enabled', 'true')
        # NOTE(review): dynamic allocation is enabled while the shuffle-service option is
        # not set here — presumably it is configured cluster-side; confirm.
        ('spark.dynamicAllocation.enabled', 'true'),
        ('spark.dynamicAllocation.maxExecutors', '500'),
        # Keep this value small; a large value affects the team's shared Hadoop resources.
        ('spark.dynamicAllocation.minExecutors', '100'),
        # NOTE(review): 6 CPUs per task while 'spark.executor.cores' is not set in this
        # function — confirm the cluster default provides at least 6 cores per executor.
        ('spark.task.cpus', '6'),

        # Original note: the ORC format is not needed when reading files.
        ('spark.sql.hive.convertMetastoreOrc', 'false'),

        # --- Queue and Hive ---
        ('spark.yarn.queue', queue_name),
        # Disabled option: ('mapreduce.input.fileinputformat.input.dir.recursive', 'true')
    ]

    conf = SparkConf()
    for key, value in settings:
        conf.set(key, value)

    # Initialise the SparkSession with Hive support.
    builder = SparkSession.builder.config(conf=conf).enableHiveSupport()
    session = builder.getOrCreate()

    return session, session.sparkContext

# Start the session for this notebook under the app name 'uplift_score';
# the queue falls back to spark_init's default queue_name.
spark, sc = spark_init('uplift_score')

In [None]:
# Model
import joblib
import causalml


# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code —
# only load model files from a trusted location.
# presumably the stored object is a causalml learner (causalml is imported above but
# not referenced by name in this cell) — TODO confirm
learner_x_lgb = joblib.load('xxx.model')
learner_x_lgb

In [None]:
# Broadcast the model; the partition scoring function reads it back through `.value`.
bc_learner_x_lgb = sc.broadcast(learner_x_lgb)
bc_learner_x_lgb

In [None]:
# Read the feature data: all columns of table xxx for the date 2022-02-06.
# The query ends with `limit 20`, so only a 20-row sample is loaded here.
str_sql = '''
select 
    * 
from 
    xxx 
where 
    concat_ws('-', year, month, day) = '2022-02-06' 
limit 20
'''

df_test = spark.sql(str_sql)
df_test.show()

In [None]:
# Column names of the loaded DataFrame, in column order, plus how many there are.
features = list(df_test.columns)
print(len(features))
features

In [None]:
# Preview the slice that is broadcast as the model feature list: it drops the first
# column and the last three columns.
# presumably those are `uid` and the year/month/day columns — TODO confirm against the table schema
print(len(features[1:-3]))
features[1:-3]

In [None]:
# Broadcast the feature names (all columns except the first one and the last three).
bc_feats = sc.broadcast(features[1:-3])
bc_feats

In [None]:
# Broadcast the per-treatment dictionary that the scoring function passes to the
# model's predict() as `p`. Each entry is a one-element array keyed by treatment name.
# NOTE(review): 'treatment_1' and 'treatment_3' carry the same value (0.0653) —
# confirm this is intended and not a copy-paste slip.
import numpy as np
dict_p = {
    'treatment_1': np.array([0.0653]), 
    'treatment_2': np.array([0.2908]), 
    'treatment_3': np.array([0.0653]), 
    'treatment_4': np.array([0.0476]), 
    'treatment_5': np.array([0.0367])
}
bc_dict_p = sc.broadcast(dict_p)
bc_dict_p

In [None]:
# Prediction UDF
import pyspark.sql.functions as F
import pandas as pd
import numpy as np


# Single-row input, single-value output
# @F.udf(returnType=FloatType())
# def predictor(*list_feature):
#     dict_p = {
#         'treatment_1': np.array([0.0653]), 
#         'treatment_2': np.array([0.2908]), 
#         'treatment_3': np.array([0.0653]), 
#         'treatment_4': np.array([0.0476]), 
#         'treatment_5': np.array([0.0367])
#     }
    
#     return float(bc_learner_x_lgb.value.predict(np.array(list_feature).reshape(1, -1), p=dict_p)[:, 1])


# Single-row input, multiple values returned as an Array
# @F.udf(returnType=ArrayType(FloatType()))
# def predictor(*list_feature):
#     dict_p = {
#         'treatment_1': np.array([0.0653]), 
#         'treatment_2': np.array([0.2908]), 
#         'treatment_3': np.array([0.0653]), 
#         'treatment_4': np.array([0.0476]), 
#         'treatment_5': np.array([0.0367])
#     }
    
#     score = bc_learner_x_lgb.value.predict(np.array(list_feature).reshape(1, -1), p=dict_p)
#     score_coupon_20 = float(score[:, 0])
#     score_coupon_50 = float(score[:, 1])
#     score_coupon_100 = float(score[:, 2])
#     score_coupon_150 = float(score[:, 3])
#     score_coupon_180 = float(score[:, 4])
    
#     return (score_coupon_20, score_coupon_50, score_coupon_100, score_coupon_150, score_coupon_180)


def prdeictBatch(datas, feats, model, p_score):
    """Score every row of one partition with the broadcast model.

    Written for ``rdd.mapPartitions``: it consumes an iterator of rows and
    lazily yields one result tuple per row. Each row is scored with its own
    ``predict`` call on a ``(1, n_features)`` matrix.

    Args:
        datas: iterable of rows. A row must be iterable in column order and
            support ``row['uid']``. The first column and the last three
            columns are not fed to the model; everything in between is.
        feats: broadcast-like object whose ``.value`` is the list of feature
            names; only its length is used, to validate the row width.
        model: broadcast-like object whose ``.value`` has
            ``predict(X, p=...)`` returning a 2-D array with one row per input
            row and at least five columns.
        p_score: broadcast-like object whose ``.value`` is passed to
            ``predict`` as ``p``.

    Yields:
        ``(uid, score_coupon_20, score_coupon_50, score_coupon_100,
        score_coupon_150, score_coupon_180)`` with ``uid`` as ``int`` and the
        scores as Python ``float``.

    Raises:
        ValueError: if a row does not have ``1 + len(feats.value) + 3`` columns.
    """
    # Resolve the broadcast values once per partition instead of once per row.
    feature_names = feats.value
    learner = model.value
    propensity = p_score.value
    expected_width = 1 + len(feature_names) + 3

    for data in datas:
        values = list(data)
        if len(values) != expected_width:
            raise ValueError(
                'row has %d columns, expected %d' % (len(values), expected_width)
            )

        # Slice before converting so that only the feature columns are cast to float.
        x = np.array(values[1:-3], dtype=float).reshape(1, -1)
        score = learner.predict(x, p=propensity)

        # Index the single result row explicitly: float() on a 1-D array is
        # deprecated since NumPy 1.25.
        yield (
            int(data['uid']),
            float(score[0, 0]),
            float(score[0, 1]),
            float(score[0, 2]),
            float(score[0, 3]),
            float(score[0, 4]),
        )

In [None]:
# df_test_pred = df_test.withColumn('prediction', predictor(*features[1:-3]))
# Score partition by partition: repartition into 10 partitions and run prdeictBatch
# on each one with the broadcast feature list, model and `p` dictionary.
rdd_pred = df_test.repartition(10).rdd.mapPartitions(lambda x: prdeictBatch(x, bc_feats, bc_learner_x_lgb, bc_dict_p))

In [None]:
# Bring up to 10 scored tuples back to the driver as a quick check.
rdd_pred.take(10)

In [None]:
# Inspect the Python type of the scoring result.
type(rdd_pred)

In [None]:
# Display the repr of the scoring result.
rdd_pred

In [None]:
# df_test_pred_res = df_test_pred.select('uid', 'prediction')
# Turn the scored tuples into a DataFrame, naming the columns in tuple order.
df_pred = spark.createDataFrame(rdd_pred, ['uid', 'score_coupon_20', 'score_coupon_50', 'score_coupon_100', 'score_coupon_150', 'score_coupon_180'])
# df_pred = rdd_pred.toDF(['uid', 'score_coupon_20', 'score_coupon_50', 'score_coupon_100', 'score_coupon_150', 'score_coupon_180'])
df_pred.show()

In [None]:
# Print the schema of the prediction DataFrame.
df_pred.printSchema()

In [None]:
# Flattened per-user scores, consumed by the cache, temp-view and insert cells.
# The UDF path that produced `df_test_pred_res` (an array column named 'prediction')
# is commented out, so that name is never defined on a fresh kernel. `df_pred`
# already holds one column per score, so select those columns directly.
df_test_pred_res_flatten = df_pred.select(
    'uid',
    'score_coupon_20',
    'score_coupon_50',
    'score_coupon_100',
    'score_coupon_150',
    'score_coupon_180',
)
df_test_pred_res_flatten.show()

In [None]:
%%time
# Mark the scores for caching, then run count() as an action over them
# (the cell is timed by the %%time magic above).
df_test_pred_res_flatten.cache()
df_test_pred_res_flatten.count()

In [None]:
# Register the scores as temp view 'table_tmp', which the insert statement selects from.
df_test_pred_res_flatten.createOrReplaceTempView('table_tmp')

In [None]:
# Create the table
# Only needs to be run once
# The column comments inside the SQL are in Chinese: uid is the user id, and each
# score_coupon_* column is the uplift score for the value shown in parentheses.
str_sql_create_table = '''
create table if not exists xxx 
(
    uid bigint comment '用户id', 
    score_coupon_20 double comment '增益打分（0.2）', 
    score_coupon_50 double comment '增益打分（0.5）', 
    score_coupon_100 double comment '增益打分（1.0）', 
    score_coupon_150 double comment '增益打分（1.5）', 
    score_coupon_180 double comment '增益打分（1.8）'
)
comment 'xxx' 
partitioned by
(
    year string comment '年', 
    month string comment '月', 
    day string comment '日'
)
'''

spark.sql(str_sql_create_table)

In [None]:
# Write to the table
# Overwrites the partition year='2022', month='02', day='06' with the rows of temp view
# 'table_tmp'. The partition values are hard-coded and match the date in the feature query;
# both must be changed together when scoring another day.
str_sql_write = '''
insert 
    overwrite table 
        xxx 
    partition 
    (
        year='2022', 
        month ='02', 
        day='06'
    ) 
select 
    uid, 
    score_coupon_20, 
    score_coupon_50, 
    score_coupon_100, 
    score_coupon_150, 
    score_coupon_180 
from 
    table_tmp 
'''

spark.sql(str_sql_write)

In [None]:
# Close the connection: stop the SparkContext created by spark_init.
sc.stop()