In [1]:
import sys
import re
import datetime
import numpy as np
import pandas as pd
from numpy import dot
from numpy.linalg import norm
from pyspark import *
from pyspark.sql import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, ArrayType, StructType, StructField, FloatType, DoubleType
from pyspark.ml.feature import VectorAssembler, StringIndexer,  OneHotEncoder,  StandardScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import LinearRegression,  RandomForestRegressor, GBTRegressor
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import SparseVector, DenseVector

In [2]:
sc.stop()
spark = SparkSession.builder.appName('regression').getOrCreate()

In [3]:
data = spark.read.format('csv').option('header', 'true').option('inferSchema','true').option('numPartitions',100).load('data_cleaned.csv')
data = data.sample(fraction=0.005)
data = data.repartition(200)                                                     

                                                                                

In [4]:
##数据预处理 status类型转换为string， 将signal 改成 label（float）， 去掉provider

data_processed = data.withColumn('status', data.status.cast(StringType())).withColumn(
    'label', data.signal.cast(FloatType())).drop('signal','provider')
data_processed.show()

[Stage 2:>                                                        (0 + 10) / 10]

+--------+---------+---------+------+---+-----+----------+----------+----------+-------+-----+
|     lat|     long| operator|status|net|speed|satellites|precission|  activity|hour_24|label|
+--------+---------+---------+------+---+-----+----------+----------+----------+-------+-----+
|41.69279|   1.7891| movistar|     2| 4G|  0.1|       6.0|       9.0|     STILL|     12|  4.0|
|41.39408|  2.16954|   orange|     2| 3G|  1.1|       5.0|      29.0|   ON_FOOT|     14|  7.0|
|41.39213|  2.16522|   others|     2| 4G|  7.3|       7.0|      21.0|IN_VEHICLE|     13| 10.0|
|41.64385|  2.73346|pepephone|     2| 4G| 43.9|       1.0|       7.0|IN_VEHICLE|     12| 29.0|
|41.44764|  1.97855| movistar|     2| 4G|117.8|       8.0|       3.0|IN_VEHICLE|     14| 22.0|
|41.46967|  2.08247| movistar|     2| 4G|  0.6|       4.0|       9.0|     STILL|     12| 14.0|
|41.17764|   1.5134|   orange|     2| 4G|  0.0|       8.0|      12.0|IN_VEHICLE|     11| 13.0|
|41.62855|  0.83421|pepephone|     2| 2G| 47.3|   



In [5]:
#数据集分割 
train_df, val_df, test_df =data_processed.randomSplit([0.7,0.2, 0.1], seed=777)

In [6]:
## one hot 编码
categoricalCols = [field for (field, dataType) in data_processed.dtypes if dataType == "string"]
inputOutputCols = [x+"index" for x in categoricalCols]
oheOutputCols = [x+"OHE" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols,
                              outputCols=inputOutputCols,
                              handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=inputOutputCols, outputCols=oheOutputCols)
numeric_cols = [field for (field, dataType) in data_processed.dtypes if ((dataType != "string") & (field !='label'))]
assembled_numeric = VectorAssembler(inputCols=numeric_cols, outputCol="features_numeric")
scaler = StandardScaler(inputCol="features_numeric", outputCol="features_scaled", withStd=True, withMean=True)
assembled_inputs = oheOutputCols+["features_scaled"]


##将数据集组合成 feature向量
vecAssembler = VectorAssembler(inputCols=assembled_inputs, outputCol='features')

##选择loss func
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction')

In [7]:
##validationsplit manually
numTrees = [10]
maxDepth = [15]


for i in numTrees:
    for j in maxDepth:
            rf = RandomForestRegressor(featuresCol='features', labelCol='label',numTrees=i, maxDepth=j)
            pipeline_onetime = Pipeline(stages=[stringIndexer, oheEncoder, assembled_numeric,scaler, vecAssembler, rf])
            onetime_model = pipeline_onetime.fit(train_df)
            rmse_train = evaluator.evaluate(onetime_model.transform(train_df), {evaluator.metricName: "rmse"})
            rmse_test = evaluator.evaluate(onetime_model.transform(test_df), {evaluator.metricName: "rmse"})
            print("numtrees={}, maxdepth={}, rmse_train={}, rmse_test={}".format(i, j, rmse_train, rmse_test)) 

                                                                                

23/04/27 01:33:33 WARN DAGScheduler: Broadcasting large task binary with size 1454.1 KiB


                                                                                

23/04/27 01:33:35 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB


                                                                                

23/04/27 01:33:38 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB


                                                                                

23/04/27 01:33:43 WARN DAGScheduler: Broadcasting large task binary with size 5.2 MiB


                                                                                

23/04/27 01:33:49 WARN DAGScheduler: Broadcasting large task binary with size 7.3 MiB


                                                                                

23/04/27 01:33:56 WARN DAGScheduler: Broadcasting large task binary with size 9.9 MiB












23/04/27 01:34:04 WARN DAGScheduler: Broadcasting large task binary with size 1103.8 KiB




numtrees=10, maxdepth=15, rmse_train=5.472850625117252, rmse_test=7.12934530119325


                                                                                

In [8]:
gbt = GBTRegressor(featuresCol="features",labelCol='label')
##选择loss func
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction')

##定义超参数的范围
param_grid_2 = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10, 20]) \
    .build()

##利用验证集调参
tvs_2 = TrainValidationSplit(estimator=gbt,
                           estimatorParamMaps=param_grid_2,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)
pipeline = Pipeline(stages=[stringIndexer, oheEncoder, assembled_numeric, scaler, vecAssembler, tvs_2])
pipelinemodel = pipeline.fit(train_df)



23/04/27 01:34:28 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/04/27 01:34:28 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/04/27 01:34:28 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


                                                                                

In [9]:
##预测
pipelinemodel.transform(test_df).select('prediction','label').show()
rmse = evaluator.evaluate(pipelinemodel.transform(train_df), {evaluator.metricName: "rmse"})
rmse

                                                                                

+------------------+-----+
|        prediction|label|
+------------------+-----+
|20.471043158654336| 11.0|
|13.296157091955775| 23.0|
|16.107365659520738| 18.0|
|14.074648207214432| 31.0|
|16.690338789337943| 13.0|
|14.060406284522578| 14.0|
| 17.05004079715291| 11.0|
|12.773843104368236| 15.0|
|14.081243551577533|  6.0|
|12.794045136475807|  7.0|
|14.062280461717844| 14.0|
|13.719220776091511| 29.0|
|12.325897662578592| 18.0|
|   12.304454135598| 22.0|
|12.708322698054028| 31.0|
| 11.37528746041952|  6.0|
|11.736420617619002| 14.0|
|13.200346369429164|  3.0|
|10.578434527889755| 14.0|
|12.493982918852206| 15.0|
+------------------+-----+
only showing top 20 rows



                                                                                

6.967196907747071

In [10]:
## evaluation
rmse = evaluator.evaluate(pipelinemodel.transform(test_df), {evaluator.metricName: "rmse"})
rmse

                                                                                

7.053013952526691

In [11]:
pipelinemodel.stages[5].bestModel.explainParams()

"cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)

In [12]:
##lr
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
param_grid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.3, 0.4]) \
    .addGrid(lr.elasticNetParam, [0.7, 0.8]) \
    .build()
tvs_lr = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=param_grid_lr,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)
pipeline_lr = Pipeline(stages=[stringIndexer, oheEncoder, assembled_numeric, scaler, vecAssembler, tvs_lr])
lr_model = pipeline_lr.fit(train_df)

                                                                                

In [13]:
evaluator.evaluate(lr_model.transform(train_df), {evaluator.metricName: "rmse"})

                                                                                

7.272473256051158

In [14]:
evaluator.evaluate(lr_model.transform(test_df), {evaluator.metricName: "rmse"})

                                                                                

7.237576035795086

In [15]:
###combing three columns to do the stacking
lr = LinearRegression(maxIter=100, regParam=0.002, elasticNetParam=0.0, predictionCol='predict_lr')
rf = RandomForestRegressor(featuresCol='features', labelCol='label', numTrees=13, maxDepth=12, predictionCol='predict_rf')
gbt = GBTRegressor(featuresCol="features",labelCol='label', predictionCol='predict_gbt')
pipeline_final = Pipeline(stages=[stringIndexer, oheEncoder, assembled_numeric, scaler, vecAssembler, lr, rf, gbt])

In [16]:
pipeline_model = pipeline_final.fit(train_df)

                                                                                

23/04/27 01:35:42 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

23/04/27 01:35:53 WARN DAGScheduler: Broadcasting large task binary with size 1101.2 KiB


                                                                                

23/04/27 01:35:55 WARN DAGScheduler: Broadcasting large task binary with size 1796.6 KiB


                                                                                

23/04/27 01:35:58 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB


                                                                                

23/04/27 01:36:02 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB


                                                                                

In [17]:
data_new = pipeline_model.transform(train_df).select('label', 'predict_lr','predict_rf','predict_gbt') \
                        .withColumn('predict',(F.col('predict_lr')+F.col('predict_rf')+F.col('predict_gbt'))/3)
rmse_lr = RegressionEvaluator(labelCol="label", predictionCol='predict_lr', metricName="rmse")
rmse_rf = RegressionEvaluator(labelCol="label", predictionCol='predict_rf', metricName="rmse")
rmse_gbt = RegressionEvaluator(labelCol="label", predictionCol='predict_gbt', metricName="rmse")
rmse = RegressionEvaluator(labelCol="label", predictionCol='predict', metricName="rmse")

In [18]:
print(rmse_lr.evaluate(data_new))
print(rmse_rf.evaluate(data_new))
print(rmse_gbt.evaluate(data_new))
print(rmse.evaluate(data_new))

                                                                                

7.250222554446609


                                                                                

6.289106691600019


                                                                                

6.967196907747071


[Stage 1333:=====>                                                 (1 + 9) / 10]

6.773012934831043


                                                                                

In [19]:
test_fitted = pipeline_model.transform(test_df).select('label', 'predict_lr','predict_rf','predict_gbt') \
                        .withColumn('predict',(F.col('predict_lr')+F.col('predict_rf')+F.col('predict_gbt'))/3)
print(rmse_lr.evaluate(test_fitted))
print(rmse_rf.evaluate(test_fitted))
print(rmse_gbt.evaluate(test_fitted))
print(rmse.evaluate(test_fitted))

                                                                                

7.219358038273961


                                                                                

7.029265703599833


                                                                                

7.053013952526691




7.041891452438378


                                                                                

In [20]:
###combing three columns to do the stacking
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, predictionCol='predict_lr')
rf = RandomForestRegressor(featuresCol='features', labelCol='label', numTrees=33, maxDepth=10, predictionCol='predict_rf')
gbt = GBTRegressor(featuresCol="features",labelCol='label', predictionCol='predict_gbt')
vecAssembler_agg = VectorAssembler(inputCols=['predict_gbt','predict_rf','predict_lr'], outputCol='features_agg')
lr_stack = LinearRegression(featuresCol='features_agg',labelCol='label' ,predictionCol='predict_stack')

In [21]:
pipeline_stack = Pipeline(stages=[stringIndexer, oheEncoder, assembled_numeric,
                                  scaler, vecAssembler,lr, rf, gbt,vecAssembler_agg, lr_stack])
stack_model = pipeline_stack.fit(train_df)

                                                                                

23/04/27 01:37:00 WARN DAGScheduler: Broadcasting large task binary with size 1443.0 KiB


                                                                                

23/04/27 01:37:03 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

23/04/27 01:37:07 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 1722:=====>                                                 (1 + 9) / 10]

23/04/27 01:37:38 WARN Instrumentation: [e0da3657] regParam is zero, which might cause numerical instability and overfitting.


                                                                                

In [22]:
rmse_stack = RegressionEvaluator(labelCol="label", predictionCol='predict_stack', metricName="rmse")
rmse_stack.evaluate(stack_model.transform(train_df))

                                                                                

6.2907663134658405

In [23]:
rmse_stack = RegressionEvaluator(labelCol="label", predictionCol='predict_stack', metricName="rmse")
rmse_stack.evaluate(stack_model.transform(test_df))

                                                                                

7.275834604985077

In [24]:
pipeline_stack_v2 =  Pipeline(stages=[vecAssembler_agg, lr_stack])
test_fitted = pipeline_model.transform(val_df).select('label', 'predict_lr','predict_rf','predict_gbt')
stack_model_v2 = pipeline_stack_v2.fit(test_fitted)

[Stage 1739:>                                                     (0 + 10) / 10]

23/04/27 01:37:48 WARN Instrumentation: [876088b7] regParam is zero, which might cause numerical instability and overfitting.


                                                                                

In [25]:
rmse_stack.evaluate(stack_model_v2.transform(pipeline_model.transform(test_df).select('label', 'predict_lr','predict_rf','predict_gbt')))

                                                                                

7.00977262961062

In [26]:
rmse_stack.evaluate(stack_model_v2.transform(test_fitted))

                                                                                

7.173006674726298

In [27]:
pipeline_final = Pipeline(stages=[stringIndexer, oheEncoder, assembled_numeric, scaler, vecAssembler, lr, rf, gbt])
pipeline_model = pipeline_final.fit(train_df)
pipeline_stack_v2 =  Pipeline(stages=[vecAssembler_agg, lr_stack])
stack_model_v2 = pipeline_stack_v2.fit(pipeline_model.transform(val_df))

                                                                                

23/04/27 01:38:13 WARN DAGScheduler: Broadcasting large task binary with size 1443.0 KiB


                                                                                

23/04/27 01:38:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

23/04/27 01:38:20 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB










[Stage 2124:=====>                                                 (1 + 9) / 10]

23/04/27 01:38:52 WARN Instrumentation: [c344f0b0] regParam is zero, which might cause numerical instability and overfitting.


                                                                                