In [1]:
from pyspark.sql.session import SparkSession
import findspark
findspark.init()

from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,MinMaxScaler,VectorSlicer
from pyspark.ml.feature import OneHotEncoder,StringIndexer

In [2]:
spark=SparkSession.Builder().master("local[2]").getOrCreate()    # 创建spark实例对象
sc = spark.sparkContext

In [4]:
# 定义数据结构
myschema=StructType([  
            StructField("标题",StringType(),True),
            StructField("室数", IntegerType(), True),
            StructField("厅数", IntegerType(), True),
            StructField("卫数", IntegerType(), True),
            StructField("面积", DoubleType(), True),
            StructField("朝向",StringType(),True),
            StructField("建造时间",IntegerType(),True),
            StructField("小区", StringType(), True),
            StructField("行政区", StringType(), True),
            StructField("街道", StringType(), True),
            StructField("门牌号", StringType(), True),
            StructField("单价", DoubleType(),True),
            StructField("总价", DoubleType(),True),
            StructField("楼层所属区域",StringType(),True),
])

In [5]:
data=spark.read.csv('./data/data.csv',schema=myschema,header=True)
data.show(3)                                 # 展示前三行的数据
data.printSchema()                           # 打印数据结构信息

+-----------------------------------+----+----+----+------+----+--------+----------------+------+--------------+------------+-------+-----+------------+
|                               标题|室数|厅数|卫数|  面积|朝向|建造时间|            小区|行政区|          街道|      门牌号|   单价| 总价|楼层所属区域|
+-----------------------------------+----+----+----+------+----+--------+----------------+------+--------------+------------+-------+-----+------------+
| 新上 武汉天地优质笋房 多层 通透...|   3|   2|   2|133.94|南北|    2010|  武汉天地御江苑|  江岸|        永清街|   永安路6号|49372.0|661.0|        高层|
|                 正南 2室2厅 精装修|   2|   2|   1|  85.3|  南|    2016|  福星华府(东区)|  江汉|        杨汊湖|  新湾路23号|14536.0|124.0|        高层|
|急售，急售，降价20万毛坯朝南三房...|   3|   2|   1| 98.92|南北|    2022|中建星光城(二期)|  洪山|九峰山森林公园|光谷三路59号|14456.0|143.0|        中层|
+-----------------------------------+----+----+----+------+----+--------+----------------+------+--------------+------------+-------+-----+------------+
only showing top 3 rows

root
 |-- 标题: string (nullable = true)


In [6]:
df=data.select("室数","厅数","卫数","面积","建造时间","朝向","行政区","楼层所属区域","单价")
df.show(3)
df.printSchema()

+----+----+----+------+--------+----+------+------------+-------+
|室数|厅数|卫数|  面积|建造时间|朝向|行政区|楼层所属区域|   单价|
+----+----+----+------+--------+----+------+------------+-------+
|   3|   2|   2|133.94|    2010|南北|  江岸|        高层|49372.0|
|   2|   2|   1|  85.3|    2016|  南|  江汉|        高层|14536.0|
|   3|   2|   1| 98.92|    2022|南北|  洪山|        中层|14456.0|
+----+----+----+------+--------+----+------+------------+-------+
only showing top 3 rows

root
 |-- 室数: integer (nullable = true)
 |-- 厅数: integer (nullable = true)
 |-- 卫数: integer (nullable = true)
 |-- 面积: double (nullable = true)
 |-- 建造时间: integer (nullable = true)
 |-- 朝向: string (nullable = true)
 |-- 行政区: string (nullable = true)
 |-- 楼层所属区域: string (nullable = true)
 |-- 单价: double (nullable = true)



In [7]:
# 对朝向进行编码
stringindexer = StringIndexer(inputCol='朝向', outputCol="朝向_features")
encoder = OneHotEncoder(dropLast=False, inputCol="朝向_features", outputCol="朝向_onehot")
pipeline = Pipeline(stages=[stringindexer, encoder])
pipeline_fit = pipeline.fit(df)
df = pipeline_fit.transform(df)
df.show(3)

+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+
|室数|厅数|卫数|  面积|建造时间|朝向|行政区|楼层所属区域|   单价|朝向_features|  朝向_onehot|
+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+
|   3|   2|   2|133.94|    2010|南北|  江岸|        高层|49372.0|          0.0|(9,[0],[1.0])|
|   2|   2|   1|  85.3|    2016|  南|  江汉|        高层|14536.0|          1.0|(9,[1],[1.0])|
|   3|   2|   1| 98.92|    2022|南北|  洪山|        中层|14456.0|          0.0|(9,[0],[1.0])|
+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+
only showing top 3 rows



In [8]:
# 对行政区进行编码
stringindexer = StringIndexer(inputCol='行政区', outputCol="行政区_features")
encoder = OneHotEncoder(dropLast=False, inputCol="行政区_features", outputCol="行政区_onehot")
pipeline = Pipeline(stages=[stringindexer, encoder])
pipeline_fit = pipeline.fit(df)
df = pipeline_fit.transform(df)
df.show(3)

+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+---------------+--------------+
|室数|厅数|卫数|  面积|建造时间|朝向|行政区|楼层所属区域|   单价|朝向_features|  朝向_onehot|行政区_features| 行政区_onehot|
+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+---------------+--------------+
|   3|   2|   2|133.94|    2010|南北|  江岸|        高层|49372.0|          0.0|(9,[0],[1.0])|            3.0|(13,[3],[1.0])|
|   2|   2|   1|  85.3|    2016|  南|  江汉|        高层|14536.0|          1.0|(9,[1],[1.0])|            6.0|(13,[6],[1.0])|
|   3|   2|   1| 98.92|    2022|南北|  洪山|        中层|14456.0|          0.0|(9,[0],[1.0])|            0.0|(13,[0],[1.0])|
+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+---------------+--------------+
only showing top 3 rows



In [9]:
# 对楼层所属区域进行编码
stringindexer = StringIndexer(inputCol='楼层所属区域', outputCol="楼层所属区域_features")
encoder = OneHotEncoder(dropLast=False, inputCol="楼层所属区域_features", outputCol="楼层所属区域_onehot")
pipeline = Pipeline(stages=[stringindexer, encoder])
pipeline_fit = pipeline.fit(df)
df = pipeline_fit.transform(df)
df.show(3)

+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+---------------+--------------+---------------------+-------------------+
|室数|厅数|卫数|  面积|建造时间|朝向|行政区|楼层所属区域|   单价|朝向_features|  朝向_onehot|行政区_features| 行政区_onehot|楼层所属区域_features|楼层所属区域_onehot|
+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+---------------+--------------+---------------------+-------------------+
|   3|   2|   2|133.94|    2010|南北|  江岸|        高层|49372.0|          0.0|(9,[0],[1.0])|            3.0|(13,[3],[1.0])|                  0.0|      (5,[0],[1.0])|
|   2|   2|   1|  85.3|    2016|  南|  江汉|        高层|14536.0|          1.0|(9,[1],[1.0])|            6.0|(13,[6],[1.0])|                  0.0|      (5,[0],[1.0])|
|   3|   2|   1| 98.92|    2022|南北|  洪山|        中层|14456.0|          0.0|(9,[0],[1.0])|            0.0|(13,[0],[1.0])|                  2.0|      (5,[2],[1.0])|
+----+----+----+------+--------+----+------+------------+-----

In [10]:
# 用当前年份减去建造时间构建新的“建造时间”字段
df=df.withColumn("建造时间", 2023-data["建造时间"])
df.show(3)

+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+---------------+--------------+---------------------+-------------------+
|室数|厅数|卫数|  面积|建造时间|朝向|行政区|楼层所属区域|   单价|朝向_features|  朝向_onehot|行政区_features| 行政区_onehot|楼层所属区域_features|楼层所属区域_onehot|
+----+----+----+------+--------+----+------+------------+-------+-------------+-------------+---------------+--------------+---------------------+-------------------+
|   3|   2|   2|133.94|      13|南北|  江岸|        高层|49372.0|          0.0|(9,[0],[1.0])|            3.0|(13,[3],[1.0])|                  0.0|      (5,[0],[1.0])|
|   2|   2|   1|  85.3|       7|  南|  江汉|        高层|14536.0|          1.0|(9,[1],[1.0])|            6.0|(13,[6],[1.0])|                  0.0|      (5,[0],[1.0])|
|   3|   2|   1| 98.92|       1|南北|  洪山|        中层|14456.0|          0.0|(9,[0],[1.0])|            0.0|(13,[0],[1.0])|                  2.0|      (5,[2],[1.0])|
+----+----+----+------+--------+----+------+------------+-----

In [11]:
df.printSchema()

root
 |-- 室数: integer (nullable = true)
 |-- 厅数: integer (nullable = true)
 |-- 卫数: integer (nullable = true)
 |-- 面积: double (nullable = true)
 |-- 建造时间: integer (nullable = true)
 |-- 朝向: string (nullable = true)
 |-- 行政区: string (nullable = true)
 |-- 楼层所属区域: string (nullable = true)
 |-- 单价: double (nullable = true)
 |-- 朝向_features: double (nullable = false)
 |-- 朝向_onehot: vector (nullable = true)
 |-- 行政区_features: double (nullable = false)
 |-- 行政区_onehot: vector (nullable = true)
 |-- 楼层所属区域_features: double (nullable = false)
 |-- 楼层所属区域_onehot: vector (nullable = true)



In [12]:
df=df.select("室数","厅数","卫数","面积","建造时间","朝向_onehot","行政区_onehot","楼层所属区域_onehot","单价")
df.show(3)
df.printSchema()

+----+----+----+------+--------+-------------+--------------+-------------------+-------+
|室数|厅数|卫数|  面积|建造时间|  朝向_onehot| 行政区_onehot|楼层所属区域_onehot|   单价|
+----+----+----+------+--------+-------------+--------------+-------------------+-------+
|   3|   2|   2|133.94|      13|(9,[0],[1.0])|(13,[3],[1.0])|      (5,[0],[1.0])|49372.0|
|   2|   2|   1|  85.3|       7|(9,[1],[1.0])|(13,[6],[1.0])|      (5,[0],[1.0])|14536.0|
|   3|   2|   1| 98.92|       1|(9,[0],[1.0])|(13,[0],[1.0])|      (5,[2],[1.0])|14456.0|
+----+----+----+------+--------+-------------+--------------+-------------------+-------+
only showing top 3 rows

root
 |-- 室数: integer (nullable = true)
 |-- 厅数: integer (nullable = true)
 |-- 卫数: integer (nullable = true)
 |-- 面积: double (nullable = true)
 |-- 建造时间: integer (nullable = true)
 |-- 朝向_onehot: vector (nullable = true)
 |-- 行政区_onehot: vector (nullable = true)
 |-- 楼层所属区域_onehot: vector (nullable = true)
 |-- 单价: double (nullable = true)



## 连续变量归一化

In [13]:
# 对数值型变量进行变量归一化处理
vectorizer = VectorAssembler(inputCols=['室数','厅数','卫数','面积','建造时间',"单价"],outputCol= '连续变量_vec')
mmScaler = MinMaxScaler(inputCol=vectorizer.getOutputCol(),outputCol='连续变量_scaled')
pipeline = Pipeline(stages=[vectorizer, mmScaler])
df= pipeline.fit(df).transform(df)
df.show(3)

+----+----+----+------+--------+-------------+--------------+-------------------+-------+--------------------+--------------------+
|室数|厅数|卫数|  面积|建造时间|  朝向_onehot| 行政区_onehot|楼层所属区域_onehot|   单价|        连续变量_vec|     连续变量_scaled|
+----+----+----+------+--------+-------------+--------------+-------------------+-------+--------------------+--------------------+
|   3|   2|   2|133.94|      13|(9,[0],[1.0])|(13,[3],[1.0])|      (5,[0],[1.0])|49372.0|[3.0,2.0,2.0,133....|[0.4,0.5,0.333333...|
|   2|   2|   1|  85.3|       7|(9,[1],[1.0])|(13,[6],[1.0])|      (5,[0],[1.0])|14536.0|[2.0,2.0,1.0,85.3...|[0.2,0.5,0.0,0.19...|
|   3|   2|   1| 98.92|       1|(9,[0],[1.0])|(13,[0],[1.0])|      (5,[2],[1.0])|14456.0|[3.0,2.0,1.0,98.9...|[0.4,0.5,0.0,0.25...|
+----+----+----+------+--------+-------------+--------------+-------------------+-------+--------------------+--------------------+
only showing top 3 rows



In [14]:
# 归一化后的数值型自变量
vs = VectorSlicer(inputCol="连续变量_scaled", outputCol="数值型自变量_scaled",indices=[0,1,2,3,4])
df=vs.transform(df)
# 归一化后的数值型因变量
vs = VectorSlicer(inputCol="连续变量_scaled", outputCol="数值型因变量_scaled",indices=[5])
df=vs.transform(df)

In [15]:
df.show(3)
df.printSchema()

+----+----+----+------+--------+-------------+--------------+-------------------+-------+--------------------+--------------------+--------------------+--------------------+
|室数|厅数|卫数|  面积|建造时间|  朝向_onehot| 行政区_onehot|楼层所属区域_onehot|   单价|        连续变量_vec|     连续变量_scaled| 数值型自变量_scaled| 数值型因变量_scaled|
+----+----+----+------+--------+-------------+--------------+-------------------+-------+--------------------+--------------------+--------------------+--------------------+
|   3|   2|   2|133.94|      13|(9,[0],[1.0])|(13,[3],[1.0])|      (5,[0],[1.0])|49372.0|[3.0,2.0,2.0,133....|[0.4,0.5,0.333333...|[0.4,0.5,0.333333...|[0.9644192422731804]|
|   2|   2|   1|  85.3|       7|(9,[1],[1.0])|(13,[6],[1.0])|      (5,[0],[1.0])|14536.0|[2.0,2.0,1.0,85.3...|[0.2,0.5,0.0,0.19...|[0.2,0.5,0.0,0.19...|[0.24083998005982...|
|   3|   2|   1| 98.92|       1|(9,[0],[1.0])|(13,[0],[1.0])|      (5,[2],[1.0])|14456.0|[3.0,2.0,1.0,98.9...|[0.4,0.5,0.0,0.25...|[0.4,0.5,0.0,0.25...|[0.23917829843801...|
+

In [16]:
import pandas
from pyspark.sql.functions import monotonically_increasing_id

In [17]:
a=df.select("数值型因变量_scaled").collect()
b=[a[i][0][0] for i in range(df.count())]                             # 由数值型因变量_scaled的取值所构成的一个列表
new_df = spark.createDataFrame(pandas.DataFrame(b))                   # 创建spark dataframe
new_df = new_df.withColumn("id", monotonically_increasing_id())       # 添加索引
new_df.show(3)

+-------------------+---+
|                  0| id|
+-------------------+---+
| 0.9644192422731804|  0|
|0.24083998005982052|  1|
|0.23917829843801924|  2|
+-------------------+---+
only showing top 3 rows



In [18]:
from pyspark.sql.functions import monotonically_increasing_id
df1 = df.withColumn("pid", monotonically_increasing_id())
# df1.show(2,vertical=True)

In [19]:
join_df = df1.join(new_df, df1.pid==new_df.id)
join_df.show(1,vertical=True)

-RECORD 0-----------------------------------
 室数                | 3                    
 厅数                | 2                    
 卫数                | 2                    
 面积                | 133.94               
 建造时间            | 13                   
 朝向_onehot         | (9,[0],[1.0])        
 行政区_onehot       | (13,[3],[1.0])       
 楼层所属区域_onehot | (5,[0],[1.0])        
 单价                | 49372.0              
 连续变量_vec        | [3.0,2.0,2.0,133.... 
 连续变量_scaled     | [0.4,0.5,0.333333... 
 数值型自变量_scaled | [0.4,0.5,0.333333... 
 数值型因变量_scaled | [0.9644192422731804] 
 pid                 | 0                    
 0                   | 0.9644192422731804   
 id                  | 0                    
only showing top 1 row



In [20]:
# 修改列数据(数值型因变量_scaled)
join_df=join_df.withColumn("数值型因变量_scaled",join_df["0"])
join_df.show(1,vertical=True)
join_df.printSchema()

-RECORD 0-----------------------------------
 室数                | 3                    
 厅数                | 2                    
 卫数                | 2                    
 面积                | 133.94               
 建造时间            | 13                   
 朝向_onehot         | (9,[0],[1.0])        
 行政区_onehot       | (13,[3],[1.0])       
 楼层所属区域_onehot | (5,[0],[1.0])        
 单价                | 49372.0              
 连续变量_vec        | [3.0,2.0,2.0,133.... 
 连续变量_scaled     | [0.4,0.5,0.333333... 
 数值型自变量_scaled | [0.4,0.5,0.333333... 
 数值型因变量_scaled | 0.9644192422731804   
 pid                 | 0                    
 0                   | 0.9644192422731804   
 id                  | 0                    
only showing top 1 row

root
 |-- 室数: integer (nullable = true)
 |-- 厅数: integer (nullable = true)
 |-- 卫数: integer (nullable = true)
 |-- 面积: double (nullable = true)
 |-- 建造时间: integer (nullable = true)
 |-- 朝向_onehot: vector (nullable = true)
 |-- 行政区_onehot: vector (nullable 

## 数据整合

In [21]:
inputCols=['朝向_onehot','行政区_onehot','楼层所属区域_onehot','数值型自变量_scaled']
assembler = VectorAssembler(inputCols=inputCols,outputCol='features')
join_df=assembler.transform(join_df)
join_df.show(3)

+----+----+----+------+--------+-------------+--------------+-------------------+-------+--------------------+--------------------+--------------------+-------------------+---+-------------------+---+--------------------+
|室数|厅数|卫数|  面积|建造时间|  朝向_onehot| 行政区_onehot|楼层所属区域_onehot|   单价|        连续变量_vec|     连续变量_scaled| 数值型自变量_scaled|数值型因变量_scaled|pid|                  0| id|            features|
+----+----+----+------+--------+-------------+--------------+-------------------+-------+--------------------+--------------------+--------------------+-------------------+---+-------------------+---+--------------------+
|   3|   2|   2|133.94|      13|(9,[0],[1.0])|(13,[3],[1.0])|      (5,[0],[1.0])|49372.0|[3.0,2.0,2.0,133....|[0.4,0.5,0.333333...|[0.4,0.5,0.333333...| 0.9644192422731804|  0| 0.9644192422731804|  0|(32,[0,12,22,27,2...|
|   2|   2|   1|  85.3|       7|(9,[1],[1.0])|(13,[6],[1.0])|      (5,[0],[1.0])|14536.0|[2.0,2.0,1.0,85.3...|[0.2,0.5,0.0,0.19...|[0.2,0.5,0.0,0.19...|0.240

In [22]:
data=join_df.select("features","数值型因变量_scaled")
data.show(3,truncate=False)
data.printSchema()

+------------------------------------------------------------------------------------------------------------+-------------------+
|features                                                                                                    |数值型因变量_scaled|
+------------------------------------------------------------------------------------------------------------+-------------------+
|(32,[0,12,22,27,28,29,30,31],[1.0,1.0,1.0,0.4,0.5,0.3333333333333333,0.3800883149653048,0.3939393939393939])|0.9644192422731804 |
|(32,[1,15,22,27,28,30,31],[1.0,1.0,1.0,0.2,0.5,0.19959924301458307,0.21212121212121213])                    |0.24083998005982052|
|(32,[0,9,24,27,28,30,31],[1.0,1.0,1.0,0.4,0.5,0.2501391517310475,0.030303030303030304])                     |0.23917829843801924|
+------------------------------------------------------------------------------------------------------------+-------------------+
only showing top 3 rows

root
 |-- features: vector (nullable = true)
 |-- 数值型因变量_scaled:

## 数据集拆分
70%训练集，30%测试集

In [23]:
train_data,test_data =data.randomSplit([0.7, 0.3], seed=2023)
train_data.show(3)
train_data.count()

+--------------------+-------------------+
|            features|数值型因变量_scaled|
+--------------------+-------------------+
|(32,[0,9,22,27,28...|0.23113991359255565|
|(32,[0,9,22,27,28...|0.23768278497839812|
|(32,[0,9,22,27,28...|0.22519940179461614|
+--------------------+-------------------+
only showing top 3 rows



257

In [24]:
test_data.show(3)
test_data.count()

+--------------------+-------------------+
|            features|数值型因变量_scaled|
+--------------------+-------------------+
|(32,[0,9,22,27,28...| 0.2703555998670654|
|(32,[0,9,22,27,28...|0.22503323363243602|
|(32,[0,9,22,27,28...|0.40403788634097704|
+--------------------+-------------------+
only showing top 3 rows



119

## 模型拟合+模型评估
模型分回归决策树、GBDT、随机森林

评估结果以RMSE（均方根误差）、MAE（平均绝对误差）和R²（决定系数）为标准

In [25]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

### CART 回归决策树

In [26]:
# 回归树调参
dt= DecisionTreeRegressor(maxDepth=5, maxBins=30, labelCol="数值型因变量_scaled", seed=123)
grid= (ParamGridBuilder().addGrid(dt.maxDepth, [5, 10, 15, 20])
                          .addGrid(dt.maxBins, [20, 30, 40])
                          .build())
dt_evaluator = RegressionEvaluator(labelCol='数值型因变量_scaled', metricName="rmse", predictionCol='prediction')
cv = CrossValidator(estimator=dt,
                    evaluator=dt_evaluator,
                    estimatorParamMaps=grid,
                    numFolds=3)
cvModel_dt = cv.fit(train_data)

In [27]:
dt_results = [
    ([
        {key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())
    ], metric) for params, metric in zip(cvModel_dt.getEstimatorParamMaps(), cvModel_dt.avgMetrics)
]
sorted(dt_results, key=lambda el:el[1], reverse=False)[0]              # 按rmse升序排列，取出rmse最小的

([{'maxDepth': 5}, {'maxBins': 30}], 0.10861565044272971)

In [28]:
# 回归树-模型评估
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(maxDepth=5,maxBins=20,varianceCol="variance", labelCol='数值型因变量_scaled')
dt_model = dt.fit(train_data)
dt_model.featureImportances

SparseVector(32, {0: 0.0261, 1: 0.0001, 11: 0.5136, 12: 0.1594, 20: 0.0719, 21: 0.0678, 22: 0.0009, 23: 0.0001, 29: 0.1563, 30: 0.0034, 31: 0.0004})

In [29]:
dt_t_result = dt_model.transform(test_data)
dt_t_result.show(3)

+--------------------+-------------------+-------------------+--------------------+
|            features|数值型因变量_scaled|         prediction|            variance|
+--------------------+-------------------+-------------------+--------------------+
|(32,[0,9,22,27,28...| 0.2703555998670654|0.23794095356207795|0.004883734407695688|
|(32,[0,9,22,27,28...|0.22503323363243602|0.23794095356207795|0.004883734407695688|
|(32,[0,9,22,27,28...|0.40403788634097704|0.23794095356207795|0.004883734407695688|
+--------------------+-------------------+-------------------+--------------------+
only showing top 3 rows



In [30]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_rmse = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='rmse')
evaluator_mae  = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='mae')
evaluator_r2   = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='r2')

dt_rmse = evaluator_rmse.evaluate(dt_t_result)
dt_mae  = evaluator_mae.evaluate(dt_t_result)
dt_r2   = evaluator_r2.evaluate(dt_t_result)

print("决策树回归评估结果:")
print(f"RMSE: {dt_rmse:.4f}")
print(f"MAE : {dt_mae:.4f}")
print(f"R²  : {dt_r2:.4f}")

决策树回归评估结果:
RMSE: 0.0983
MAE : 0.0636
R²  : 0.2576


### Bagging集成——随机森林

In [31]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [32]:
# 随机森林调参
rf = RandomForestRegressor(numTrees=3, maxDepth=10, maxBins=30, labelCol="数值型因变量_scaled", seed=123)
grid = (ParamGridBuilder().addGrid(rf.numTrees, [20, 40,60,80])
                          .addGrid(rf.maxDepth, [5, 10,20, 30])
                          .addGrid(rf.maxBins, [20, 30, 40])
                          .build())
rf_evaluator = RegressionEvaluator(labelCol='数值型因变量_scaled', metricName="rmse", predictionCol='prediction')
cv = CrossValidator(estimator=rf,
                    evaluator=rf_evaluator,
                    estimatorParamMaps=grid,
                    numFolds=3)
cvModel_rf = cv.fit(train_data)
rf_results = [
    ([
        {key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())
    ], metric) for params, metric in zip(cvModel_rf.getEstimatorParamMaps(), cvModel_rf.avgMetrics)
]
sorted(rf_results, key=lambda el:el[1], reverse=False)[0]

([{'numTrees': 80}, {'maxDepth': 5}, {'maxBins': 30}], 0.09144978840774276)

In [33]:
# 随机森林-模型评估
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=40, maxDepth=10, maxBins=20,seed=2023, labelCol='数值型因变量_scaled')
rf_model = rf.fit(train_data)
rf_model.featureImportances

SparseVector(32, {0: 0.0241, 1: 0.0201, 2: 0.0, 3: 0.0027, 4: 0.0021, 5: 0.003, 7: 0.0044, 8: 0.0, 9: 0.0067, 10: 0.0042, 11: 0.2289, 12: 0.0691, 13: 0.0034, 14: 0.0212, 15: 0.0183, 16: 0.0067, 17: 0.0107, 18: 0.0331, 19: 0.0, 20: 0.0302, 21: 0.0302, 22: 0.0267, 23: 0.0459, 24: 0.0111, 27: 0.0592, 28: 0.0172, 29: 0.094, 30: 0.1036, 31: 0.1232})

In [34]:
rf_t_result = rf_model.transform(test_data)
rf_t_result.show(3)

+--------------------+-------------------+-------------------+
|            features|数值型因变量_scaled|         prediction|
+--------------------+-------------------+-------------------+
|(32,[0,9,22,27,28...| 0.2703555998670654|0.24865871323351357|
|(32,[0,9,22,27,28...|0.22503323363243602|0.23392733890728787|
|(32,[0,9,22,27,28...|0.40403788634097704|0.24997741301561396|
+--------------------+-------------------+-------------------+
only showing top 3 rows



In [35]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_rmse = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='rmse')
evaluator_mae  = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='mae')
evaluator_r2   = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='r2')

rf_rmse = evaluator_rmse.evaluate(rf_t_result)
rf_mae  = evaluator_mae.evaluate(rf_t_result)
rf_r2   = evaluator_r2.evaluate(rf_t_result)

print("随机森林回归评估结果:")
print(f"RMSE: {rf_rmse:.4f}")
print(f"MAE : {rf_mae:.4f}")
print(f"R²  : {rf_r2:.4f}")

随机森林回归评估结果:
RMSE: 0.0953
MAE : 0.0603
R²  : 0.3030


### Boosting集成——梯度提升树GDBT

In [36]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [37]:
#GBDT调参
gbt= GBTRegressor(maxDepth=5, maxBins=30, labelCol="数值型因变量_scaled", seed=123)
grid= (ParamGridBuilder().addGrid(gbt.maxDepth, [5, 10, 15, 20])
                          .addGrid(gbt.maxBins, [20, 30, 40])
                          .build())
gbt_evaluator = RegressionEvaluator(labelCol='数值型因变量_scaled', metricName="rmse", predictionCol='prediction')
cv = CrossValidator(estimator=gbt,
                    evaluator=gbt_evaluator,
                    estimatorParamMaps=grid,
                    numFolds=3)
cvModel_gbt = cv.fit(train_data)
gbt_results = [
    ([
        {key.name: paramValue} for key, paramValue in zip(params.keys(), params.values())
    ], metric) for params, metric in zip(cvModel_gbt.getEstimatorParamMaps(), cvModel_gbt.avgMetrics)
]
sorted(gbt_results, key=lambda el:el[1], reverse=False)[0]

([{'maxDepth': 5}, {'maxBins': 20}], 0.11013356139113517)

In [38]:
#GBDT-测试集模型评估
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(maxDepth=5, maxBins=20,labelCol='数值型因变量_scaled')
gbt_model = gbt.fit(train_data)
gdt_t_result = gbt_model.transform(test_data)
gdt_t_result.show(3)

+--------------------+-------------------+-------------------+
|            features|数值型因变量_scaled|         prediction|
+--------------------+-------------------+-------------------+
|(32,[0,9,22,27,28...| 0.2703555998670654|0.29836238643220586|
|(32,[0,9,22,27,28...|0.22503323363243602| 0.2335099506378417|
|(32,[0,9,22,27,28...|0.40403788634097704|0.28881096369393777|
+--------------------+-------------------+-------------------+
only showing top 3 rows



In [39]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_rmse = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='rmse')
evaluator_mae  = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='mae')
evaluator_r2   = RegressionEvaluator(labelCol='数值型因变量_scaled', predictionCol='prediction', metricName='r2')

gbt_rmse = evaluator_rmse.evaluate(gdt_t_result)
gbt_mae  = evaluator_mae.evaluate(gdt_t_result)
gbt_r2   = evaluator_r2.evaluate(gdt_t_result)

print("梯度提升树回归评估结果:")
print(f"RMSE: {gbt_rmse:.4f}")
print(f"MAE : {gbt_mae:.4f}")
print(f"R²  : {gbt_r2:.4f}")

梯度提升树回归评估结果:
RMSE: 0.0934
MAE : 0.0598
R²  : 0.3302
