<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#数据获取" data-toc-modified-id="数据获取-1">数据获取</a></span></li><li><span><a href="#数据处理" data-toc-modified-id="数据处理-2">数据处理</a></span></li><li><span><a href="#训练测试划分" data-toc-modified-id="训练测试划分-3">训练测试划分</a></span></li><li><span><a href="#模型训练-LinearRegression" data-toc-modified-id="模型训练-LinearRegression-4">模型训练-LinearRegression</a></span><ul class="toc-item"><li><span><a href="#模型预测" data-toc-modified-id="模型预测-4.1">模型预测</a></span><ul class="toc-item"><li><span><a href="#lr预测值" data-toc-modified-id="lr预测值-4.1.1">lr预测值</a></span></li><li><span><a href="#baseline均值" data-toc-modified-id="baseline均值-4.1.2">baseline均值</a></span></li></ul></li><li><span><a href="#模型评估" data-toc-modified-id="模型评估-4.2">模型评估</a></span><ul class="toc-item"><li><span><a href="#RMSE" data-toc-modified-id="RMSE-4.2.1">RMSE</a></span></li><li><span><a href="#MAPE" data-toc-modified-id="MAPE-4.2.2">MAPE</a></span></li></ul></li></ul></li><li><span><a href="#测试函数功能" data-toc-modified-id="测试函数功能-5">测试函数功能</a></span></li></ul></div>

In [49]:
import re
import os
import sys
sys.path.append('../')
import datetime
import pandas as pd
import numpy as np
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel
# from sparkxgb import XGBoostRegressor, XGBoostClassifier
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import Row, LongType, IntegerType, Row, StructType, StructField, StringType, TimestampType, \
    FloatType
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.functions import broadcast, udf, pandas_udf, PandasUDFType, monotonically_increasing_id 


In [2]:
def get_spark_config(master="local[3]", app_name="ai-train"):
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    conf = (SparkConf().setMaster(master).setAppName(app_name))
    return conf

def open_spark_session(master="local[3]",app_name="ai-train"):
    conf = get_spark_config(master=master, app_name=app_name)
    conf = conf.set("spark.driver.maxResultSize", "20g").set("spark.sql.execution.arrow.enabled", "true").set('spark.executor.memory', '10g').set('spark.driver.memory', '10g').set(
        'spark.port.maxRetries', 50)
    
    #set（key,value） 设置属性 
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    return spark
spark = open_spark_session(app_name='xxx')


### 数据获取

In [3]:
user_pdf1 = pd.read_csv('UserFeature-1.csv')
user_pdf0 = pd.read_csv('UserFeature-0.csv')

In [4]:
user_pdf = pd.concat([user_pdf1,user_pdf0],axis=0)
user_pdf.shape

(719946, 17)

In [5]:
user_pdf.columns

Index(['Unnamed: 0', 'Unnamed: 0.1', 'label', 'user_id', 'event_day', 'gender',
       'age', 'city_id', 'order_cnt_90d', 'order_cnt_180d',
       'total_order_cnt_td', 'total_gmv_amt_td', 'total_order_days_td',
       'total_buy_card_amt_td', 'register_days', 'unuse_bike_days',
       'city_comp_type'],
      dtype='object')

### 数据处理

In [6]:
user_pdf.city_comp_type.unique()

array(['未知', '有限竞对', '松果独占', '空白市场', '两者都有', '无限竞对', '竞对独占', '竞对非独占'],
      dtype=object)

In [7]:
user_pdf.register_days.min()

-1

In [8]:
user_pdf.isnull().any()

Unnamed: 0               False
Unnamed: 0.1             False
label                    False
user_id                  False
event_day                False
gender                   False
age                      False
city_id                  False
order_cnt_90d            False
order_cnt_180d           False
total_order_cnt_td       False
total_gmv_amt_td         False
total_order_days_td      False
total_buy_card_amt_td    False
register_days            False
unuse_bike_days           True
city_comp_type           False
dtype: bool

In [10]:
user_pdf.groupby('gender').gender.count()

gender
0     21295
1    413961
2    284690
Name: gender, dtype: int64

In [13]:
# 类别特征数值化
user_pdf['re_city_comp_type'] = user_pdf.city_comp_type.map(lambda x:1 if re.search('松果',x) else 0)
# 数值截断修复
user_pdf['register_days'] = user_pdf.register_days.map(lambda x:0 if x<0 else x)
user_pdf['re_age']=user_pdf.age.map(lambda x:30 if x>80 or x<18 else x)
# 缺失值填充
user_pdf['unuse_bike_days'] = user_pdf.apply(lambda x:x['register_days'] if pd.isna(x['unuse_bike_days']) else x['unuse_bike_days'], axis=1)

In [17]:
# one-hot编码
dummy_df = pd.get_dummies(user_pdf.gender)
dummy_df.rename(columns={0:'r0',1:'r1',2:'r2'},inplace=True)
dummy_df.head()

Unnamed: 0,r0,r1,r2
0,1,0,0
1,1,0,0
2,1,0,0
3,0,0,1
4,1,0,0


In [18]:
# 特征拼接
df_with_dummies = pd.concat([user_pdf, dummy_df], axis=1)
df_with_dummies.head()

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,label,user_id,event_day,gender,age,city_id,order_cnt_90d,order_cnt_180d,...,total_order_days_td,total_buy_card_amt_td,register_days,unuse_bike_days,city_comp_type,re_city_comp_type,re_age,r0,r1,r2
0,0,0,2,55048233,20230405,0,2024,1,0,0,...,0,0,659,659.0,未知,0,30,1,0,0
1,5,5,29,88289283,20230405,0,0,1,0,0,...,0,0,16,16.0,未知,0,30,1,0,0
2,9,9,68,88567969,20230405,0,0,1,0,0,...,0,0,6,6.0,未知,0,30,1,0,0
3,10,10,2,23162427,20230405,2,36,1,0,0,...,33,0,1077,232.0,未知,0,36,0,0,1
4,11,11,19,88031257,20230405,0,0,1,0,0,...,0,0,24,24.0,未知,0,30,1,0,0


In [19]:
# 数据min-max归一化
cols = ['re_age',  'order_cnt_90d',
              'order_cnt_180d',    'total_order_cnt_td',
            'total_gmv_amt_td',   'total_order_days_td',
       'total_buy_card_amt_td',         'register_days',
             'unuse_bike_days']

for col in cols:
    minn = df_with_dummies[col].quantile(0.1)
    maxx = df_with_dummies[col].quantile(0.9)
    print(col,'\t',minn,'\t',maxx,'\t')
    df_with_dummies[col] = (df_with_dummies[col]-minn)/(maxx-minn)
    df_with_dummies[col] = np.clip(df_with_dummies[col], 0, 1)

re_age 	 20.0 	 48.0 	
order_cnt_90d 	 0.0 	 8.0 	
order_cnt_180d 	 0.0 	 22.0 	
total_order_cnt_td 	 2.0 	 201.0 	
total_gmv_amt_td 	 460.0 	 51660.0 	
total_order_days_td 	 1.0 	 129.0 	
total_buy_card_amt_td 	 0.0 	 6740.0 	
register_days 	 185.0 	 1437.0 	
unuse_bike_days 	 21.0 	 264.0 	


In [20]:
# 训练特征筛选
main = ['user_id','label','re_city_comp_type','r0','r1','r2']
end_df = df_with_dummies[cols + main]
end_df.head()

Unnamed: 0,re_age,order_cnt_90d,order_cnt_180d,total_order_cnt_td,total_gmv_amt_td,total_order_days_td,total_buy_card_amt_td,register_days,unuse_bike_days,user_id,label,re_city_comp_type,r0,r1,r2
0,0.357143,0.0,0.0,0.0,0.0,0.0,0.0,0.378594,1.0,55048233,2,0,1,0,0
1,0.357143,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,88289283,29,0,1,0,0
2,0.357143,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,88567969,68,0,1,0,0
3,0.571429,0.0,0.0,0.221106,0.315625,0.25,0.0,0.71246,0.868313,23162427,2,0,0,0,1
4,0.357143,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.012346,88031257,19,0,1,0,0


### 训练测试划分

In [21]:
user_sdf = spark.createDataFrame(end_df)
user_sdf.cache()
user_sdf.collect()

  Unsupported type in conversion from Arrow: uint8
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


[Row(re_age=0.35714285714285715, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.37859424920127793, unuse_bike_days=1.0, user_id=55048233, label=2, re_city_comp_type=0, r0=1, r1=0, r2=0),
 Row(re_age=0.35714285714285715, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=88289283, label=29, re_city_comp_type=0, r0=1, r1=0, r2=0),
 Row(re_age=0.35714285714285715, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=88567969, label=68, re_city_comp_type=0, r0=1, r1=0, r2=0),
 Row(re_age=0.5714285714285714, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.22110552763819097, total_gmv_amt_td=0.315625, total_order_d

In [22]:
utrain,utest = user_sdf.randomSplit([0.8,0.2], seed = 123)

In [24]:
feature_cols = cols + ['re_city_comp_type','r0','r1','r2']

### 模型训练-LinearRegression

In [25]:
# gbt_small = GBTRegressor(featuresCol="features", labelCol='label',maxIter=10, maxDepth=3)
lr_small = LinearRegression(maxIter=30,featuresCol="features", labelCol='label',elasticNetParam=0.1, regParam=0.1)
vectorAssembler = VectorAssembler().setInputCols(feature_cols).setOutputCol("features")
lr_pipeline = Pipeline().setStages([vectorAssembler,lr_small])
lr_model = lr_pipeline.fit(utrain)

#### 模型预测
##### lr预测值

In [26]:
utest_lr = lr_model.transform(utest)

##### baseline均值

In [27]:
utest_lr.agg({'label':'mean'}).collect()

[Row(avg(label)=9.682017513426498)]

In [28]:
utest_lr = utest_lr.withColumn('mean_label',F.lit(9.68))

#### 模型评估
##### RMSE
官方实现

In [30]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                     labelCol="label", metricName="rmse")

lr_result = lr_evaluator.evaluate(utest_lr)


base_evaluator = RegressionEvaluator(predictionCol="mean_label",
                                     labelCol="label", metricName="rmse")

base_result = base_evaluator.evaluate(utest_lr)


print("RMSE on LR test data = %g " % lr_result)
print("RMSE on BaseLine test data = %g " % base_result)

RMSE on LR test data = 12.2025 
RMSE on BaseLine test data = 12.7049 


手动实现

In [45]:
@udf(returnType=FloatType())
def se(*args):
    pre , p = args
    return (p-pre)**2
utest_lr = utest_lr.withColumn('se_mean_error',se(F.col('mean_label'),F.col('label')))
utest_lr = utest_lr.withColumn('se_model_error',se(F.col('prediction'),F.col('label')))

tmp_res = utest_lr.agg({'se_mean_error':'mean','se_model_error':'mean'}).collect()
print('LR:',tmp_res[0][1]**0.5,'\n','BaseLine:',tmp_res[0][0]**0.5)

LR: 12.202522359852281 
 BaseLine: 12.704853788287059


##### MAPE
sparkml无mape实现，自己手动实现

In [35]:
@udf(returnType=FloatType())
def ape(*args):
    pre , p = args
    return abs((p-pre)/p) 
utest_lr = utest_lr.withColumn('ape_mean_error',ape(F.col('mean_label'),F.col('label')))
utest_lr = utest_lr.withColumn('ape_model_error',ape(F.col('prediction'),F.col('label')))

utest_lr.groupby(['label']).agg({'ape_mean_error':'mean','ape_model_error':'mean'}).collect()

[Row(label=29, avg(ape_mean_error)=0.6662068963050842, avg(ape_model_error)=0.5961874261591005),
 Row(label=26, avg(ape_mean_error)=0.6276922821998596, avg(ape_model_error)=0.5563758554804417),
 Row(label=65, avg(ape_mean_error)=0.8510769009590149, avg(ape_model_error)=0.8103182070875821),
 Row(label=222, avg(ape_mean_error)=0.9563964009284973, avg(ape_model_error)=0.9495146870613098),
 Row(label=243, avg(ape_mean_error)=0.9601646065711975, avg(ape_model_error)=0.9543168544769287),
 Row(label=19, avg(ape_mean_error)=0.49052631855010986, avg(ape_model_error)=0.4139879880515211),
 Row(label=54, avg(ape_mean_error)=0.8207407593727112, avg(ape_model_error)=0.763752100183003),
 Row(label=113, avg(ape_mean_error)=0.9143362641334534, avg(ape_model_error)=0.8525520052228656),
 Row(label=112, avg(ape_mean_error)=0.9135714173316956, avg(ape_model_error)=0.8872534930706024),
 Row(label=167, avg(ape_mean_error)=0.9420359134674072, avg(ape_model_error)=0.9363268613815308),
 Row(label=155, avg(ape_m

In [38]:
ulr_df = utest_lr.groupby(['label']).agg({'ape_mean_error':'mean','ape_model_error':'mean'}).toPandas()
ulr_df[ulr_df.label.between(1,30)].sort_values(['label'])

Unnamed: 0,label,avg(ape_mean_error),avg(ape_model_error)
61,1,8.68,7.548362
101,2,3.84,3.344224
87,3,2.226667,1.972682
108,4,1.42,1.300187
60,5,0.936,0.888063
35,6,0.613333,0.643116
13,7,0.382857,0.467857
95,8,0.21,0.364681
41,9,0.075556,0.29624
65,10,0.032,0.25474


### 测试函数功能

MinMaxScaler函数输入必须是vector

In [47]:
# 需要归一化的列转为vector
asembler_mms = VectorAssembler(inputCols=["label"], outputCol="label2")
output_mms = asembler_mms.transform(utest)
output_mms.collect()

[Row(re_age=0.0, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=87155335, label=4, re_city_comp_type=0, r0=0, r1=1, r2=0, label2=DenseVector([4.0])),
 Row(re_age=0.0, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=87166823, label=15, re_city_comp_type=1, r0=0, r1=1, r2=0, label2=DenseVector([15.0])),
 Row(re_age=0.0, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=87186151, label=6, re_city_comp_type=1, r0=0, r1=1, r2=0, label2=DenseVector([6.0])),
 Row(re_age=0.0, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_b

In [50]:
# 创建MinMaxScaler对象
scaler = MinMaxScaler(inputCol="label2", outputCol="scaled_label2")

# 拟合数据并进行转换
scaler_model = scaler.fit(output_mms)
scaled_df = scaler_model.transform(output_mms)
scaled_df.collect()


[Row(re_age=0.0, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=87155335, label=4, re_city_comp_type=0, r0=0, r1=1, r2=0, label2=DenseVector([4.0]), scaled_label2=DenseVector([0.0111])),
 Row(re_age=0.0, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=87166823, label=15, re_city_comp_type=1, r0=0, r1=1, r2=0, label2=DenseVector([15.0]), scaled_label2=DenseVector([0.0517])),
 Row(re_age=0.0, order_cnt_90d=0.0, order_cnt_180d=0.0, total_order_cnt_td=0.0, total_gmv_amt_td=0.0, total_order_days_td=0.0, total_buy_card_amt_td=0.0, register_days=0.0, unuse_bike_days=0.0, user_id=87186151, label=6, re_city_comp_type=1, r0=0, r1=1, r2=0, label2=DenseVector([6.0]), scaled_label2=DenseVector([0.0185])),
 Row(re_age=0.0, order_

In [52]:
gbt_small = GBTRegressor(featuresCol="features", labelCol='label',maxIter=10, maxDepth=3)
gbt_vectorAssembler = VectorAssembler().setInputCols(feature_cols+['scaled_label2']).setOutputCol("features")
gbt_pipeline = Pipeline().setStages([gbt_vectorAssembler,gbt_small])
gbt_model = gbt_pipeline.fit(scaled_df)