In [1]:
##################################################################
#《Python机器学习及实践：从零开始通往Kaggle竞赛之路（2024年度版）》开源代码
#-----------------------------------------------------------------
#                 @章节号：7.4.2.1（分布式线性回归模型）                           
#                 @作者：范淼、徐晟桐 
#                 @购书链接：https://item.jd.com/13482761.html
#                 @电子邮箱：fanmiao.cslt.thu@hotmail.com               
#                 @官方交流QQ群号：561500762                        
##################################################################

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func


#创建SparkSession。
spark = SparkSession.builder.getOrCreate()

#读取文件并存储到DataFrame中。
df = spark.read.csv('../Datasets/bike_rental/bike_rental.csv', header=True)

21/12/15 11:24:47 WARN Utils: Your hostname, michael-fandeMacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.24.206.69 instead (on interface en0)
21/12/15 11:24:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/15 11:24:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [3]:
df = df.select([c for c in df.columns if c not in ['instant','dteday']])

df

DataFrame[season: string, yr: string, mnth: string, hr: string, holiday: string, weekday: string, workingday: string, weathersit: string, temp: string, atemp: string, hum: string, windspeed: string, casual: string, registered: string, cnt: string]

In [4]:
df.show(5)

+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|        0|     3|        13| 16|
|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|     8|        32| 40|
|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|     5|        27| 32|
|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|        0|     3|        10| 13|
|     1|  0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|        0|     0|         1|  1|
+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
only showing top 5 

In [5]:
from pyspark.sql.functions import col


cate_cols = ['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit']
num_cols = ['temp', 'atemp', 'hum', 'windspeed', 'registered']

for column in cate_cols:
    df = df.withColumn(column, col(column).cast('int'))

for column in num_cols:
    df = df.withColumn(column, col(column).cast('float'))

df = df.withColumn('cnt', col('cnt').cast('float'))

df

DataFrame[season: int, yr: int, mnth: int, hr: int, holiday: int, weekday: int, workingday: int, weathersit: int, temp: float, atemp: float, hum: float, windspeed: float, casual: string, registered: float, cnt: float]

In [6]:
df.show(5)

#分割出训练和测试集。
(train_df, test_df) = df.randomSplit([0.8, 0.2], seed=911120)

+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+----+
|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered| cnt|
+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+----+
|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|      0.0|     3|      13.0|16.0|
|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0|     8|      32.0|40.0|
|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0|     5|      27.0|32.0|
|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0|     3|      10.0|13.0|
|     1|  0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0|     0|       1.0| 1.0|
+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+----+
only showi

In [7]:
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml import Pipeline


enc_cols = [c+'_c' for c in cate_cols]

ohe = OneHotEncoder(inputCols= cate_cols, outputCols=enc_cols)

va = VectorAssembler(inputCols = num_cols+enc_cols, outputCol= 'features')

scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

regressor = DecisionTreeRegressor(featuresCol='scaled_features', labelCol='cnt')

pipeline = Pipeline(stages=[ohe, va, scaler, regressor])

model = pipeline.fit(train_df)

predictions = model.transform(test_df)

21/12/15 11:25:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [8]:
from pyspark.ml.evaluation import RegressionEvaluator


evaluator = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

#评估回归器的均方根误差。
print ('Spark-ML的分布式线性回归模型在bike_rental测试集上的均方根误差为：%.2f。' %(rmse))

Spark-ML的分布式线性回归模型在bike_rental测试集上的均方根误差为：34.73。
