In [1]:
##################################################################
#           《Python人工智能编程实践（2024年度版）》开源代码
#-----------------------------------------------------------------
#            @章节号：7.4.2.3（分布式随机森林回归器）                                 
#            @作者：范淼、徐晟桐 
#            @购书链接：暂无
#            @电子邮箱：fm12@tsinghua.org.cn             
#            @官方交流QQ群号：561500762                        
##################################################################

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as func


#创建SparkSession。
spark = SparkSession.builder.getOrCreate()

#读取文件并存储到DataFrame中。
df = spark.read.csv('./datasets/bike_rental/bike_rental.csv', header=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/31 17:26:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/31 17:26:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/08/31 17:26:41 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
#选择数据列。
df = df.select([c for c in df.columns if c not in ['instant','dteday']])

In [4]:
from pyspark.sql.functions import col


cate_cols = ['season', 'yr', 'mnth', 'hr', 'holiday', 'weekday', 'workingday', 'weathersit']
num_cols = ['temp', 'atemp', 'hum', 'windspeed', 'registered']

#类别型数据采用整数编码。
for column in cate_cols:
    df = df.withColumn(column, col(column).cast('int'))
    
#数值型数据采用浮点数编码。
for column in num_cols:
    df = df.withColumn(column, col(column).cast('float'))

df = df.withColumn('cnt', col('cnt').cast('float'))

In [5]:
#分割出训练和测试集。
(train_df, test_df) = df.randomSplit([0.8, 0.2], seed=911120)

In [6]:
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline


enc_cols = [c+'_c' for c in cate_cols]

ohe = OneHotEncoder(inputCols= cate_cols, outputCols=enc_cols)

va = VectorAssembler(inputCols = num_cols+enc_cols, outputCol= 'features')

scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

regressor = RandomForestRegressor(featuresCol='scaled_features', labelCol='cnt')

pipeline = Pipeline(stages=[ohe, va, scaler, regressor])

model = pipeline.fit(train_df)

predictions = model.transform(test_df)

23/08/31 17:26:44 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
from pyspark.ml.evaluation import RegressionEvaluator


evaluator = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

#评估回归器的均方根误差。
print ('Spark-ML的分布式随机森林回归器在bike_rental测试集上的均方根误差为：%.2f。' %(rmse))

Spark-ML的分布式随机森林回归器在bike_rental测试集上的均方根误差为：55.66。
