In [1]:
#注意事项:
#当运行本Notebook的程序后，如果要关闭Notebook，请选择菜单: File > Close and Halt 才能确实停止当前正在运行的程序，并且释放资源
#如果没有使用以上方法，只关闭此分页，程序仍在运行，未释放资源，当您打开并运行其他的Notebook，可能会发生错误

# 22	Spark ML Pipeline 回归分析

# 22.1	数据准备

In [2]:
sc.master

u'spark://master:7077'

In [3]:
global Path    
if sc.master[0:5]=="local" :
   Path="file:/home/hduser/pythonsparkexample/PythonProject/"
else:   
   Path="hdfs://master:9000/user/hduser/"
#如果要在cluster模式运行(hadoop yarn 或Spark Stand alone)，请按照书上的说明，先把文件上传到HDFS目录

In [4]:
hour_df= spark.read.format('csv') \
                  .option("header", 'true').load(Path+"data/hour.csv")
hour_df.count()

17379

In [5]:
print hour_df.columns

['instant', 'dteday', 'season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'casual', 'registered', 'cnt']


In [6]:
hour_df=hour_df.drop("instant").drop("dteday") \
                            .drop('yr').drop("casual").drop("registered")

In [7]:
print hour_df.printSchema()

root
 |-- season: string (nullable = true)
 |-- mnth: string (nullable = true)
 |-- hr: string (nullable = true)
 |-- holiday: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- workingday: string (nullable = true)
 |-- weathersit: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- atemp: string (nullable = true)
 |-- hum: string (nullable = true)
 |-- windspeed: string (nullable = true)
 |-- cnt: string (nullable = true)

None


In [8]:
from pyspark.sql.functions import col  

In [9]:
hour_df= hour_df.select([ col(column).cast("double").alias(column) 
                                          for column in hour_df.columns])

In [10]:
hour_df.printSchema()

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)



In [11]:
hour_df.show(5)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|   1.0| 1.0|0.0|    0.0|    6.0|       0.0|       1.0|0.24|0.2879|0.81|      0.0|16.0|
|   1.0| 1.0|1.0|    0.0|    6.0|       0.0|       1.0|0.22|0.2727| 0.8|      0.0|40.0|
|   1.0| 1.0|2.0|    0.0|    6.0|       0.0|       1.0|0.22|0.2727| 0.8|      0.0|32.0|
|   1.0| 1.0|3.0|    0.0|    6.0|       0.0|       1.0|0.24|0.2879|0.75|      0.0|13.0|
|   1.0| 1.0|4.0|    0.0|    6.0|       0.0|       1.0|0.24|0.2879|0.75|      0.0| 1.0|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
only showing top 5 rows



In [12]:
train_df, test_df = hour_df.randomSplit([0.7, 0.3])
train_df.cache()
test_df.cache()

DataFrame[season: double, mnth: double, hr: double, holiday: double, weekday: double, workingday: double, weathersit: double, temp: double, atemp: double, hum: double, windspeed: double, cnt: double]

# 22.2	建立机器学习pipeline管线

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import  StringIndexer,  VectorIndexer,VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

In [14]:
featuresCols = hour_df.columns[:-1]
print featuresCols 

['season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed']


In [15]:
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="aFeatures")
vectorIndexer = VectorIndexer(inputCol="aFeatures", outputCol="features", maxCategories=24)
dt = DecisionTreeRegressor(labelCol="cnt",featuresCol= 'features')
dt_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,dt])

In [16]:
dt_pipeline .getStages()

[VectorAssembler_4225b2b97a05f44d7d54,
 VectorIndexer_4fb6ab0279e5d3190fbc,
 DecisionTreeRegressor_4c06a847661a1160d5a6]

# 22.3	使用pipeline进行数据处理与训练

In [17]:
dt_pipelineModel = dt_pipeline.fit(train_df)

In [18]:
dt_pipelineModel.stages[2]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_4c06a847661a1160d5a6) of depth 5 with 63 nodes

In [19]:
print dt_pipelineModel.stages[2].toDebugString[:500]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_4c06a847661a1160d5a6) of depth 5 with 63 nodes
  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})
    If (feature 2 in {2.0,3.0,4.0,5.0})
     If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
      If (feature 2 in {2.0,3.0,4.0})
       Predict: 6.786743515850144
      Else (feature 2 not in {2.0,3.0,4.0})
       Predict: 23.983425414364643
     Else (feature 4 not in {1.0,2.0,3.0,4.0,5.0})
    


# 22.4	使用pipelineModel 进行预测

In [20]:
predicted_df=dt_pipelineModel.transform(test_df)

In [21]:
print predicted_df.columns

['season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'cnt', 'aFeatures', 'features', 'prediction']


In [22]:
predicted_df.select('season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', \
                     'weathersit', 'temp', 'atemp', 'hum', 'windspeed','cnt','prediction').show(10)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|        prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0| 0.1|0.0758|0.42|   0.3881|25.0| 47.21978021978022|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1818| 0.8|   0.1045|33.0| 47.21978021978022|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.26| 0.303|0.56|      0.0|39.0| 47.21978021978022|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0| 0.3|0.2879|0.33|   0.2239|96.0| 47.21978021978022|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       2.0|0.46|0.4545|0.88|   0.2985|17.0|           100.035|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.22| 0.197|0.44|   0.3582| 5.0|37.598290598290596|
|   1.0| 1.0|0.0|    0.0|    2.0|    

# 22.5	评估模型的准确率

In [23]:
from pyspark.ml.evaluation import RegressionEvaluator

In [24]:
evaluator = RegressionEvaluator(labelCol='cnt',
                                                        predictionCol='prediction',
                                                        metricName="rmse")

In [25]:
predicted_df=dt_pipelineModel.transform(test_df)
rmse = evaluator.evaluate(predicted_df)
rmse

96.03600517062088

# 22.6	使用TrainValidation进行训练评估找出最佳模型

In [26]:
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

In [27]:
paramGrid = ParamGridBuilder()\
  .addGrid(dt.maxDepth, [ 5,10,15,25])\
  .addGrid(dt.maxBins, [25,35,45,50])\
  .build()

In [28]:
tvs = TrainValidationSplit(estimator=dt,evaluator=evaluator,
                  estimatorParamMaps=paramGrid,trainRatio=0.8)

In [29]:
tvs_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,tvs])

In [30]:
tvs_pipelineModel =tvs_pipeline.fit(train_df)

In [31]:
bestModel=tvs_pipelineModel.stages[2].bestModel
print bestModel.toDebugString[:500]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_4c06a847661a1160d5a6) of depth 10 with 1809 nodes
  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})
    If (feature 2 in {2.0,3.0,4.0,5.0})
     If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
      If (feature 2 in {2.0,3.0,4.0})
       If (feature 3 in {0.0})
        If (feature 2 in {3.0,4.0})
         If (feature 0 in {0.0})
          If (feature 7 <= 0.4)
           If (feature 8 <= 0.197


In [32]:
predictions = tvs_pipelineModel.transform(test_df)
rmse= evaluator.evaluate(predictions)
rmse

81.13273892536569

# 22.7	使用crossValidation进行训练评估找出最佳模型

In [33]:
from pyspark.ml.tuning import CrossValidator

In [34]:
cv = CrossValidator(estimator=dt, evaluator=evaluator, 
                    estimatorParamMaps=paramGrid, numFolds=3)

In [35]:
cv_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,cv])

In [36]:
cv_pipelineModel = cv_pipeline.fit(train_df)

In [37]:
predictions = cv_pipelineModel.transform(test_df)
rmse= evaluator.evaluate(predictions)
rmse

81.13273892536569

# 22.8	使用随机森林RandomForestClassifier分类器

In [38]:
from pyspark.ml.regression import RandomForestRegressor
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="oFeatures")
vectorIndexer = VectorIndexer(inputCol="oFeatures", outputCol="features", maxCategories=24)
rf= RandomForestRegressor(labelCol="cnt",featuresCol= 'features', numTrees=20)
rf_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer ,rf])
rf_pipelineModel = rf_pipeline.fit(train_df)
predicted_df=rf_pipelineModel.transform(test_df)
evaluator = RegressionEvaluator(metricName="rmse", labelCol='cnt', predictionCol='prediction')
rmse = evaluator.evaluate(predicted_df)
rmse

98.46412057609375

In [39]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
paramGrid = ParamGridBuilder()\
  .addGrid(rf.maxDepth, [ 5,10,15])\
  .addGrid(rf.maxBins, [25,35,50])\
  .addGrid(rf.numTrees, [10, 20,30])\
  .build()

rftvs = TrainValidationSplit(estimator=rf, evaluator=evaluator,
                                 estimatorParamMaps=paramGrid, trainRatio=0.8)

rftvs_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer, rftvs])
rftvs_pipelineModel =rftvs_pipeline.fit(train_df)
rftvspredictions = rftvs_pipelineModel.transform(test_df)
rmse= evaluator.evaluate(rftvspredictions)
rmse

71.12886396206777

In [40]:
# 22.8	使用GBT

In [41]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="cnt",featuresCol= 'features')
gbt_pipeline = Pipeline(stages=[vectorAssembler,vectorIndexer,gbt])

In [42]:
gbt_pipelineModel = gbt_pipeline.fit(train_df)
predicted_df=gbt_pipelineModel.transform(test_df)
rmse = evaluator.evaluate(predicted_df)
rmse

74.818219528214

In [43]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

paramGrid = ParamGridBuilder() \
  .addGrid(gbt.maxDepth, [ 5,10])\
  .addGrid(gbt.maxBins, [25,40])\
  .addGrid(gbt.maxIter, [10, 50])\
  .build()

cv = CrossValidator(estimator=gbt, evaluator=evaluator, 
                                  estimatorParamMaps=paramGrid, numFolds=3)
cv_pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [44]:
cv_pipelineModel = cv_pipeline.fit(train_df)

In [45]:
cvm=cv_pipelineModel.stages[2] 
gbestModel=cvm.bestModel
print bestModel.toDebugString[:500]

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_4c06a847661a1160d5a6) of depth 10 with 1809 nodes
  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})
    If (feature 2 in {2.0,3.0,4.0,5.0})
     If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
      If (feature 2 in {2.0,3.0,4.0})
       If (feature 3 in {0.0})
        If (feature 2 in {3.0,4.0})
         If (feature 0 in {0.0})
          If (feature 7 <= 0.4)
           If (feature 8 <= 0.197


In [46]:
predicted_df=cv_pipelineModel.transform(test_df)
predicted_df.select('season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', \
                     'weathersit', 'temp', 'atemp', 'hum', 'windspeed','cnt','prediction').show(10)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|        prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0| 0.1|0.0758|0.42|   0.3881|25.0| 43.05820627263327|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1818| 0.8|   0.1045|33.0| 44.92757589533734|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.26| 0.303|0.56|      0.0|39.0| 45.32901901936476|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0| 0.3|0.2879|0.33|   0.2239|96.0| 49.88705827670886|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       2.0|0.46|0.4545|0.88|   0.2985|17.0| 86.68319925158879|
|   1.0| 1.0|0.0|    0.0|    1.0|       1.0|       1.0|0.22| 0.197|0.44|   0.3582| 5.0|20.809017324469753|
|   1.0| 1.0|0.0|    0.0|    2.0|    

In [47]:
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol='cnt', predictionCol='prediction')
rmse = evaluator.evaluate(predicted_df)
rmse

69.37578603905104