创建Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pyspark.sql.functions as func
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import GBTRegressor, LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np
import os

spark = (
    SparkSession.builder
    .appName("HousePrices")
    .master("local[*]")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")


In [10]:
from pathlib import Path

DATA_DIR = Path.cwd().parent /'files'

# 测试数据
train_path = str(DATA_DIR/"train_simple.csv")
test_path  = str(DATA_DIR/"test_simple.csv")

train_spark = spark.read.option("header", "true").csv(train_path, inferSchema=True)
test_spark  = spark.read.option("header", "true").csv(test_path,  inferSchema=True)

print(f"Train: {train_spark.count()} rows, Test: {test_spark.count()} rows")

Train: 20 rows, Test: 20 rows


缺失值处理

In [11]:
# ---------- 类别型 NA → "None" ----------
na_to_none_cols = [
    'Alley','BsmtQual','BsmtCond','BsmtExposure','BsmtFinType1','BsmtFinType2',
    'FireplaceQu','GarageType','GarageFinish','GarageQual','GarageCond',
    'PoolQC','Fence','MiscFeature'
]

for col in na_to_none_cols:
    train_spark = train_spark.withColumn(col, F.when(F.col(col).isNull(), "None").otherwise(F.col(col)))
    test_spark  = test_spark.withColumn(col,  F.when(F.col(col).isNull(), "None").otherwise(F.col(col)))

# ---------- LotFrontage：按 Neighborhood 中位数填充 ----------
lotfront_median = (
    train_spark.groupBy("Neighborhood")
    .agg(F.percentile_approx("LotFrontage", 0.5).alias("median_lot"))
)

train_spark = train_spark.join(lotfront_median, on="Neighborhood", how="left")
test_spark  = test_spark.join(lotfront_median,  on="Neighborhood", how="left")

train_spark = train_spark.withColumn(
    "LotFrontage",
    F.when(F.col("LotFrontage").isNull(), F.col("median_lot")).otherwise(F.col("LotFrontage"))
)
test_spark = test_spark.withColumn(
    "LotFrontage",
    F.when(F.col("LotFrontage").isNull(), F.col("median_lot")).otherwise(F.col("LotFrontage"))
)

train_spark = train_spark.drop("median_lot")
test_spark  = test_spark.drop("median_lot")

# ---------- 其余数值型缺失 → 中位数 ----------
num_cols = [f.name for f in train_spark.schema.fields if isinstance(f.dataType, (IntegerType, DoubleType, LongType))]
num_cols = [c for c in num_cols if c not in ['Id', 'SalePrice']]

for col in num_cols:
    median_val = train_spark.approxQuantile(col, [0.5], 0.01)[0]
    train_spark = train_spark.withColumn(col, F.when(F.col(col).isNull(), median_val).otherwise(F.col(col)))
    test_spark  = test_spark.withColumn(col,  F.when(F.col(col).isNull(), median_val).otherwise(F.col(col)))

# GarageYrBlt → 无车库设为 0
train_spark = train_spark.withColumn("GarageYrBlt", F.when(F.col("GarageYrBlt").isNull(), 0).otherwise(F.col("GarageYrBlt")))
test_spark  = test_spark.withColumn("GarageYrBlt",  F.when(F.col("GarageYrBlt").isNull(), 0).otherwise(F.col("GarageYrBlt")))

特征工程

In [12]:
from pyspark.sql.functions import udf, col, year, lit

current_year = 2025

def feature_engineering(df):
    df = df.withColumn("HouseAge",   lit(current_year) - col("YearBuilt"))
    df = df.withColumn("RemodAge",   lit(current_year) - col("YearRemodAdd"))
    df = df.withColumn("GarageAge",  lit(current_year) - col("GarageYrBlt"))

    df = df.withColumn("TotalSF",    col("TotalBsmtSF") + col("1stFlrSF") + col("2ndFlrSF"))
    df = df.withColumn("TotalBath",  col("FullBath") + 0.5*col("HalfBath") +
                                    col("BsmtFullBath") + 0.5*col("BsmtHalfBath"))

    df = df.withColumn("HasPool",     (col("PoolArea") > 0).cast("int"))
    df = df.withColumn("Has2ndFloor", (col("2ndFlrSF") > 0).cast("int"))
    df = df.withColumn("HasGarage",   (col("GarageArea") > 0).cast("int"))
    df = df.withColumn("HasBsmt",     (col("TotalBsmtSF") > 0).cast("int"))

    df = df.withColumn("OverallGrade", col("OverallQual") * col("OverallCond"))

    return df

train_spark = feature_engineering(train_spark)
test_spark  = feature_engineering(test_spark)

目标变量处理+变量拆分

In [13]:
# 对 SalePrice 取 log1p
train_spark = train_spark.withColumn("log_SalePrice", F.log1p(col("SalePrice")))

# 特征列
cat_cols = [f.name for f in train_spark.schema.fields if isinstance(f.dataType, StringType)]
num_cols = [f.name for f in train_spark.schema.fields 
            if isinstance(f.dataType, (IntegerType, DoubleType, LongType))
            and f.name not in ['Id', 'SalePrice', 'log_SalePrice']]

bool_cols = ['HasPool','Has2ndFloor','HasGarage','HasBsmt']
num_cols = [c for c in num_cols if c not in bool_cols]

print(f"Cat: {len(cat_cols)}, Num: {len(num_cols)}, Bool: {len(bool_cols)}")

Cat: 44, Num: 41, Bool: 4


PySpark ML Pipeline（StringIndexer + OneHot + StandardScaler）

In [14]:
# 1. StringIndexer
indexers = [
    StringIndexer(inputCol=col, outputCol=col+"_idx", handleInvalid="keep")
    for col in cat_cols
]

# 2. OneHotEncoder
encoders = [
    OneHotEncoder(inputCol=col+"_idx", outputCol=col+"_ohe", handleInvalid="keep")
    for col in cat_cols
]

# 3. VectorAssembler
ohe_cols = [col+"_ohe" for col in cat_cols]
assembler_inputs = ohe_cols + num_cols + bool_cols

assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features_raw",
    handleInvalid="skip"
)

# 4. 标准化
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

# 5. 模型（GBT 回归，MLlib 自带）
gbt = GBTRegressor(
    featuresCol="features",
    labelCol="log_SalePrice",
    maxDepth=6,
    maxIter=200,
    subsamplingRate=0.8,
    seed=42
)

# 完整 Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, gbt])

交叉验证 & 超参数搜索

In [None]:
evaluator = RegressionEvaluator(
    labelCol="log_SalePrice",
    predictionCol="prediction",
    metricName="rmse"
)

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [5, 6])
             .addGrid(gbt.maxIter, [150, 250])
             .addGrid(gbt.subsamplingRate, [0.7, 0.85])
             .build())

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
    parallelism=4
)

# 训练（完整数据约 5-8 分钟）
cv_model = cv.fit(train_spark)
best_model = cv_model.bestModel

print("Best CV RMSE (log):", evaluator.evaluate(best_model.transform(train_spark)))

预测test，生成结果

In [None]:
# 预测
pred_spark = best_model.transform(test_spark)

# 逆变换
pred_spark = pred_spark.withColumn("SalePrice", F.expm1(col("prediction")))

# 转为 Pandas 并保存
submission = pred_spark.select("Id", "SalePrice").toPandas()
submission['Id'] = submission['Id'].astype(int)

result_path = str(DATA_DIR / "submission_spark.csv")
submission.to_csv(result_path, index=False)

print(submission.head())