In [1]:
"""
共享单车数据集：
    http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset
"""
print()




In [3]:
"""
	1- instant: record index
	2- dteday : date
	3- season : season (1:springer, 2:summer, 3:fall, 4:winter)
	4- yr : year (0: 2011, 1:2012)
	5- mnth : month ( 1 to 12)
	6- hr : hour (0 to 23)
	7- holiday : weather day is holiday or not (extracted from http://dchr.dc.gov/page/holiday-schedule)
	8- weekday : day of the week
	9- workingday : if day is neither weekend nor holiday is 1, otherwise is 0.
	10+ weathersit : 
		- 1: Clear, Few clouds, Partly cloudy, Partly cloudy
		- 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
		- 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
		- 4: Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog
	11- temp : Normalized temperature in Celsius. The values are divided to 41 (max)
	12- atemp: Normalized feeling temperature in Celsius. The values are divided to 50 (max)
	13- hum: Normalized humidity. The values are divided to 100 (max)
	14- windspeed: Normalized wind speed. The values are divided to 67 (max)
	15- casual: count of casual users
	16- registered: count of registered users
	17- cnt: count of total rental bikes including both casual and registered
"""
print()




In [4]:
# 从spark.sql中导入SparkSession类
from pyspark.sql import SparkSession
# 导入系统模块
import os
import time

In [5]:
# 构建SparkSession实例对象
spark = SparkSession.builder\
    .appName("SparkSessionExample")\
    .master("local[*]")\
    .getOrCreate()

In [6]:
# 获取SparkContext实例对象
sc = spark.sparkContext

In [7]:
sc

### 读取数据

In [9]:
# 采用spark.read.csv方式读取数据
raw_hour_df = spark.read.csv('./hour.csv', header='true', inferSchema='true')

In [10]:
# 查看schema信息
raw_hour_df.printSchema()

root
 |-- instant: integer (nullable = true)
 |-- dteday: timestamp (nullable = true)
 |-- season: integer (nullable = true)
 |-- yr: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- cnt: integer (nullable = true)



In [11]:
# 显示10条数据
raw_hour_df.show(10, False)

+-------+-------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|dteday             |season|yr |mnth|hr |holiday|weekday|workingday|weathersit|temp|atemp |hum |windspeed|casual|registered|cnt|
+-------+-------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|1      |2011-01-01 00:00:00|1     |0  |1   |0  |0      |6      |0         |1         |0.24|0.2879|0.81|0.0      |3     |13        |16 |
|2      |2011-01-01 00:00:00|1     |0  |1   |1  |0      |6      |0         |1         |0.22|0.2727|0.8 |0.0      |8     |32        |40 |
|3      |2011-01-01 00:00:00|1     |0  |1   |2  |0      |6      |0         |1         |0.22|0.2727|0.8 |0.0      |5     |27        |32 |
|4      |2011-01-01 00:00:00|1     |0  |1   |3  |0      |6      |0         |1         |0.24|0.2879|0.75|0.0      |3     |10        |13 |
|5      |2011-01-01 00:00:00|1     |0  |1

In [12]:
print(raw_hour_df.count())

17379


In [14]:
# 查看列的名称
print(raw_hour_df.columns)

['instant', 'dteday', 'season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'casual', 'registered', 'cnt']


### 特征提取


In [15]:
# 选取要是字段特征
"""
综合分析：instant(序号)、dteday（日期）、yr（年份）、casual（非注册用户租车树）和registered（注册用户租车数）
"""
hour_df = raw_hour_df.drop('instant').drop('dteday').drop('yr').drop('casual').drop('registered')

In [16]:
hour_df.printSchema()

hour_df.show(5, False)

root
 |-- season: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: integer (nullable = true)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+---+
|season|mnth|hr |holiday|weekday|workingday|weathersit|temp|atemp |hum |windspeed|cnt|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+---+
|1     |1   |0  |0      |6      |0         |1         |0.24|0.2879|0.81|0.0      |16 |
|1     |1   |1  |0      |6      |0         |1         |0.22|0.2727|0.8 |0.0      |40 |
|1     |1   |2  |0      |6      |0         |1         |0.22|0.2727|0.8 |0.0      |32 |
|1     |1 

In [17]:
"""
将上述所有字段的数据类型数值类型，double类型, 针对SparkMLlib中分类和回归算法数据类型Double数值类型
"""
from pyspark.sql.functions import col
## 转换 alias 回去
bike_hour_df = hour_df.select([ col(column).cast('double').alias(column) for column in hour_df.columns ])

In [18]:
bike_hour_df.printSchema()
bike_hour_df.show(5, False)

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|season|mnth|hr |holiday|weekday|workingday|weathersit|temp|atemp |hum |windspeed|cnt |
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+
|1.0   |1.0 |0.0|0.0    |6.0    |0.0       |1.0       |0.24|0.2879|0.81|0.0      |16.0|
|1.0   |1.0 |1.0|0.0    |6.0    |0.0       |1.0       |0.22|0.2727|0.8 |0.0      |40.0|
|1.0   |1.0 |2.0|0.0    |6.0    |0.0       |1.0       |0.22|0.2727|0.8 |0.0      |32.0|
|1.0   |1.0 

### 组合特征字段为Vector向量

In [19]:
features_cols = bike_hour_df.columns[:-1]

print(features_cols)

['season', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed']


In [20]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [21]:
# 组合特征到向量
assembler = VectorAssembler(
    inputCols=features_cols,
    outputCol="raw_features")

In [22]:
# 针对DataFrame中数据进行转换
bike_hour_df2 = assembler.transform(bike_hour_df)

In [23]:
bike_hour_df2.printSchema()
bike_hour_df2.show(5, False)

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)
 |-- raw_features: vector (nullable = true)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+--------------------------------------------------+
|season|mnth|hr |holiday|weekday|workingday|weathersit|temp|atemp |hum |windspeed|cnt |raw_features                                      |
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+--------------------------------------------------+
|1.0   |1.0 |0.0|0.0    |6.0    |0.0       |1.0       |0.24|0.2879|0.81|0.0    

### 使用决策树回归算法模型（模型学习器）


In [24]:
from pyspark.ml.regression import DecisionTreeRegressor

In [25]:
"""
决策树回归算法默认超参数的值：
featuresCol="features", labelCol="label", predictionCol="prediction",
                 maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
     maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="variance",
                 seed=None, varianceCol=None
"""
dtr = DecisionTreeRegressor(maxDepth=10, maxBins=32, 
                            featuresCol="raw_features", labelCol="cnt")

In [27]:
# 使用数据集训练模型
dtr_model = dtr.fit(bike_hour_df2)

In [28]:
dtr_model

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_bea18ee11e9a) of depth 10 with 1785 nodes

In [29]:
dtr_model.featureImportances

SparseVector(11, {0: 0.024, 1: 0.0054, 2: 0.6885, 3: 0.0029, 4: 0.0072, 5: 0.0742, 6: 0.021, 7: 0.1357, 8: 0.0107, 9: 0.0256, 10: 0.0048})

In [30]:
dtr_model.numFeatures

11

In [31]:
dtr_model.numNodes

1785

### 使用模型预测

In [32]:
predict_hour_df = dtr_model.transform(bike_hour_df2)

In [33]:
predict_hour_df.printSchema()

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)
 |-- raw_features: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [34]:
predict_hour_df.select('raw_features', 'cnt', 'prediction').show(5, False)

+--------------------------------------------------+----+------------------+
|raw_features                                      |cnt |prediction        |
+--------------------------------------------------+----+------------------+
|[1.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]|16.0|37.22222222222222 |
|[1.0,1.0,1.0,0.0,6.0,0.0,1.0,0.22,0.2727,0.8,0.0] |40.0|37.22222222222222 |
|[1.0,1.0,2.0,0.0,6.0,0.0,1.0,0.22,0.2727,0.8,0.0] |32.0|23.88095238095238 |
|[1.0,1.0,3.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.75,0.0]|13.0|12.030303030303031|
|[1.0,1.0,4.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.75,0.0]|1.0 |4.055555555555555 |
+--------------------------------------------------+----+------------------+
only showing top 5 rows



### 模型评估

In [35]:
from pyspark.ml.evaluation import RegressionEvaluator

# 构建评估器实例对象
evaluator = RegressionEvaluator(labelCol='cnt', predictionCol='prediction')

In [36]:
evaluator.evaluate(predict_hour_df, {evaluator.metricName: 'rmse'})

70.41731834779505

## 对类别特征处理

In [37]:
bike_hour_df2.printSchema()
bike_hour_df2.show(5, False)

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)
 |-- raw_features: vector (nullable = true)

+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+--------------------------------------------------+
|season|mnth|hr |holiday|weekday|workingday|weathersit|temp|atemp |hum |windspeed|cnt |raw_features                                      |
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+--------------------------------------------------+
|1.0   |1.0 |0.0|0.0    |6.0    |0.0       |1.0       |0.24|0.2879|0.81|0.0    

In [38]:
"""
VectorIndexer:
    对类别特征数据进行处理，使用类别特征的类别数目的下标作为次类别特征的值
"""
from pyspark.ml.feature import VectorIndexer

In [39]:
indexer = VectorIndexer(inputCol="raw_features", outputCol="features", maxCategories=24)
# 使用数据得到的 模型学习器
indexer_model = indexer.fit(bike_hour_df2)

In [40]:
# 进行数据转换
bike_hour_df3 = indexer_model.transform(bike_hour_df2)

In [41]:
bike_hour_df3.printSchema()

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)
 |-- raw_features: vector (nullable = true)
 |-- features: vector (nullable = true)



In [42]:
bike_hour_df3.select('cnt', 'raw_features', 'features').show(5, False)

+----+--------------------------------------------------+--------------------------------------------------+
|cnt |raw_features                                      |features                                          |
+----+--------------------------------------------------+--------------------------------------------------+
|16.0|[1.0,1.0,0.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.81,0.0]|[0.0,0.0,0.0,0.0,6.0,0.0,0.0,0.24,0.2879,0.81,0.0]|
|40.0|[1.0,1.0,1.0,0.0,6.0,0.0,1.0,0.22,0.2727,0.8,0.0] |[0.0,0.0,1.0,0.0,6.0,0.0,0.0,0.22,0.2727,0.8,0.0] |
|32.0|[1.0,1.0,2.0,0.0,6.0,0.0,1.0,0.22,0.2727,0.8,0.0] |[0.0,0.0,2.0,0.0,6.0,0.0,0.0,0.22,0.2727,0.8,0.0] |
|13.0|[1.0,1.0,3.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.75,0.0]|[0.0,0.0,3.0,0.0,6.0,0.0,0.0,0.24,0.2879,0.75,0.0]|
|1.0 |[1.0,1.0,4.0,0.0,6.0,0.0,1.0,0.24,0.2879,0.75,0.0]|[0.0,0.0,4.0,0.0,6.0,0.0,0.0,0.24,0.2879,0.75,0.0]|
+----+--------------------------------------------------+--------------------------------------------------+
only showing top 5 

In [43]:
# 训练
dtr2 = DecisionTreeRegressor(maxDepth=10, maxBins=32, 
                            featuresCol="features", labelCol="cnt")
dtr_model2 = dtr2.fit(bike_hour_df3)
# 预测
predict_hour_df2 = dtr_model2.transform(bike_hour_df3)

# 评估
evaluator.evaluate(predict_hour_df2, {evaluator.metricName: 'rmse'})

66.10943240563866

In [45]:
predict_hour_df2.select('cnt', 'prediction').show(5, False)

+----+------------------+
|cnt |prediction        |
+----+------------------+
|16.0|47.15384615384615 |
|40.0|47.15384615384615 |
|32.0|47.5              |
|13.0|10.608695652173912|
|1.0 |5.25              |
+----+------------------+
only showing top 5 rows



### 如何找出最佳模型（调整算法超参数）

In [46]:
"""
在Spark ML机器学习库中，提供如何找到最佳模型：
    http://spark.apache.org/docs/2.2.0/ml-tuning.html#ml-tuning-model-selection-and-hyperparameter-tuning
    -a. 交叉验证
        Cross-Validation
    -b. 训练验证分隔
        Train-Validation Split
"""
print()




#### 使用Train-Validation 找出最佳模型

In [47]:
# 导入模块
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [48]:
dt = DecisionTreeRegressor(featuresCol="features", labelCol="cnt")

In [49]:
# 构建算法超参数
paramGrid = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .addGrid(dt.maxBins, [32, 64, 128])\
    .build()

In [50]:
# 创建评估器
regeression_evaluator = RegressionEvaluator(labelCol='cnt', predictionCol='prediction')

In [51]:
# 训练验证分隔实例对象
dt_tvs = TrainValidationSplit(estimator=dt,estimatorParamMaps=paramGrid,
                           evaluator=regeression_evaluator, trainRatio=0.8)

In [52]:
# 使用数据训练和验证模型，使用不同超参数组合
dt_best_model = dt_tvs.fit(bike_hour_df3)

In [53]:
dt_best_model.bestModel

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_c81bb2e8010d) of depth 10 with 1831 nodes

In [54]:
# 评估
regeression_evaluator.evaluate(
    dt_best_model.bestModel.transform(bike_hour_df3), {evaluator.metricName: 'rmse'})

65.68407783551991

In [55]:
dt_best_model.bestModel.save('./best_dt_model')

#### 使用Cross-Validation找出最佳模型

In [56]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid2 = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .addGrid(dt.maxBins, [32, 64, 128])\
    .build()

In [57]:
dt2 = DecisionTreeRegressor(featuresCol="features", labelCol="cnt")

In [58]:
# 创建评估器
regeression_evaluator2 = RegressionEvaluator(labelCol='cnt', predictionCol='prediction')

In [63]:
# 创建交叉验证的实例对象
crossval = CrossValidator(estimator=dt2,
                          estimatorParamMaps=paramGrid2,
                          evaluator=regeression_evaluator2,
                          numFolds=5)  # use 3+ folds in practice

In [64]:
cv_model = crossval.fit(bike_hour_df3)

In [65]:
cv_model.bestModel

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_3cf553441924) of depth 5 with 63 nodes

In [66]:
# 评估
regeression_evaluator.evaluate(
    cv_model.bestModel.transform(bike_hour_df3), {evaluator.metricName: 'rmse'})

91.61353642096543

## 使用Pipeline组合机器学习开发流程


In [67]:
"""
回顾一下，上述机器学习过程中流程步骤：
    -0. 读取数据, 适当数据处理
        bike_hour_df
    -1. 组合特征值到Vector中
        VectorAssembler
    -2. 处理类别特征数据
         VectorIndexer
    -3. 使用决策树训练模型
        DecisionTreeRegressor
"""
print()




In [68]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.regression import DecisionTreeRegressor

In [70]:
# a. 构建 特征合并VectorAssembler
vector_assembler = VectorAssembler(inputCols=features_cols, outputCol='raw_features')

In [71]:
# b. 创建类别特征VectorIndexer
vector_indexer = VectorIndexer(inputCol='raw_features', 
                               outputCol='features', maxCategories=24)

In [72]:
# c. 构建决策树回归实例对象
dt_regression = DecisionTreeRegressor(labelCol='cnt', featuresCol='features', 
                                      maxDepth=5, maxBins=64, varianceCol='variance')

In [73]:
from pyspark.ml.pipeline import Pipeline

# 构建Pipeline
dtr_pipeline = Pipeline(stages=[vector_assembler, vector_indexer, dt_regression])

In [74]:
dtr_pipeline.getStages()

[VectorAssembler_338cd010e37a,
 VectorIndexer_1dc90421dde4,
 DecisionTreeRegressor_e2b146e4472f]

In [75]:
# 将数据应用到Pipeline中，训练模型
dtr_pipeline_model = dtr_pipeline.fit(bike_hour_df)

In [77]:
print(dtr_pipeline_model.stages[2].toDebugString)

DecisionTreeRegressionModel (uid=DecisionTreeRegressor_e2b146e4472f) of depth 5 with 63 nodes
  If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0,6.0,22.0,23.0})
   If (feature 2 in {0.0,1.0,2.0,3.0,4.0,5.0})
    If (feature 2 in {2.0,3.0,4.0,5.0})
     If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
      If (feature 2 in {2.0,3.0,4.0})
       Predict: 6.705921938088829
      Else (feature 2 not in {2.0,3.0,4.0})
       Predict: 24.33139534883721
     Else (feature 4 not in {1.0,2.0,3.0,4.0,5.0})
      If (feature 2 in {3.0,4.0,5.0})
       Predict: 14.760129659643436
      Else (feature 2 not in {3.0,4.0,5.0})
       Predict: 55.98067632850242
    Else (feature 2 not in {2.0,3.0,4.0,5.0})
     If (feature 4 in {1.0,2.0,3.0,4.0,5.0})
      If (feature 2 in {1.0})
       Predict: 17.464077669902913
      Else (feature 2 not in {1.0})
       Predict: 37.62669245647969
     Else (feature 4 not in {1.0,2.0,3.0,4.0,5.0})
      If (feature 1 in {0.0,1.0,2.0,3.0,10.0,11.0})
       Predict: 57.40476190476190

In [78]:
# 使用模型预测
predict_hour_df_pl = dtr_pipeline_model.transform(bike_hour_df)

In [79]:
predict_hour_df_pl.select('cnt', 'prediction', 'variance').show(10, False)

+----+------------------+------------------+
|cnt |prediction        |variance          |
+----+------------------+------------------+
|16.0|57.404761904761905|1062.1552154195012|
|40.0|57.404761904761905|1062.1552154195012|
|32.0|55.98067632850242 |717.7290951947537 |
|13.0|14.760129659643436|171.6863844240312 |
|1.0 |14.760129659643436|171.6863844240312 |
|1.0 |14.760129659643436|171.6863844240312 |
|2.0 |36.534013605442176|680.330475727706  |
|3.0 |87.59944367176634 |3196.9605560187324|
|8.0 |85.61818181818182 |2800.91482093664  |
|14.0|147.0990990990991 |7560.885347263725 |
+----+------------------+------------------+
only showing top 10 rows



In [80]:
dtr_pipeline_model.save('./pl-dtr-model')