##  Установка библиотек

In [1]:
!pip install pyspark



## Инициализация

Датасет: https://www.kaggle.com/competitions/house-prices-advanced-regression-techniques/overview

In [2]:
TRAIN_CSV_PATH = "train.csv"

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("House Prices").master("local").getOrCreate()

In [4]:
train_sdf = spark.read.format("csv").option("header",True).load(TRAIN_CSV_PATH)

In [5]:
print("Количество записей в датасете:", train_sdf.count())

Количество записей в датасете: 1460


## Работа с признаками

### Извлекаемые признаки




На основе описания признаков датасета были выбраны следующие:
- Цена продажи, таргет (SalePrice, целое)
- Площадь территории (LotArea, целое)
- Доступные подключения (Utilities, категориальный, следует скоратить)
- Район города (Neighborhood, категориальный, не сокращать)
- Тип жилья (BldgType, категориальный, следует скоратить)
- Оценка материала и отделки дома (OverallQual, целое)
- Оценка общего состояния дома (OverallCond, целое)
- Дата постройки дома (YearBuilt, дата, следуте привести к возрасту)
- Дата реконструкции (YearRemodAdd, дата, следует привести к логическому)
- Оценка качества материала внешней отделки дома (ExterQual, категориальный, привести к целому)
- Оценка общего состояния подвала (BsmtCond, категориальный, привести к целому)
- Оценка качества и состояния отопления (HeatingQC, категориальный, привести к целому)
- Наличие центрального кондиционирования (CentralAir, категориальный, привести к логическому)
- Оценка качества кухни (KitchenQual, категориальный, привести к целому)
- Количество каминов (Fireplaces, целый)
- Площадь гаража (GarageArea, целый)
- Площадь бассейна (PoolArea, целый)
- Площадь первого этажа (1stFlrSF, целый)
- Площадь второго этажа (2ndFlrSF, целый)

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DataType

new_train_sdf = train_sdf.select(
    F.col("SalePrice").cast(IntegerType()),
    F.col("LotArea").cast(IntegerType()),
    F.col("Utilities"),
    F.col("Neighborhood"),
    F.col("BldgType"),
    F.col("OverallQual").cast(IntegerType()),
    F.col("OverallCond").cast(IntegerType()),
    F.col("YearBuilt").cast(IntegerType()),
    F.col("YearRemodAdd").cast(IntegerType()),
    F.col("ExterQual"),
    F.col("BsmtCond"),
    F.col("HeatingQC"),
    F.col("CentralAir"),
    F.col("KitchenQual"),
    F.col("Fireplaces").cast(IntegerType()),
    F.col("GarageArea").cast(IntegerType()),
    F.col("PoolArea").cast(IntegerType()),
    F.col("1stFlrSF").cast(IntegerType()),
    F.col("2ndFlrSF").cast(IntegerType()),
)

### Поработаем с категориальными фичами

#### Посмотрим на распределение доступных подключений у дома

In [7]:
(
    new_train_sdf
    .select("Utilities")
    .groupBy("Utilities")
    .agg(F.count("Utilities").alias("Count of houses"))
).show()

+---------+---------------+
|Utilities|Count of houses|
+---------+---------------+
|   NoSeWa|              1|
|   AllPub|           1459|
+---------+---------------+



Как видно только у одного дома нет воды. Возможно стоит его преобразовать в логический тип

In [8]:
new_train_sdf = (
    new_train_sdf
    .withColumn("HasAllUtils", F.col("Utilities") == "AllPub")
    .drop("Utilities")
)

#### Посмотрим на распределение районов

In [9]:
(
    new_train_sdf
    .select("Neighborhood", "SalePrice")
    .groupBy("Neighborhood")
    .agg(
        F.count("Neighborhood").alias("Count of houses"),
        F.median("SalePrice").alias("Average sale price")
    )
).show(30)

+------------+---------------+------------------+
|Neighborhood|Count of houses|Average sale price|
+------------+---------------+------------------+
|     Veenker|             11|          218000.0|
|     BrkSide|             58|          124300.0|
|     NPkVill|              9|          146000.0|
|     NridgHt|             77|          315000.0|
|     NoRidge|             41|          301500.0|
|      NWAmes|             73|          182900.0|
|     OldTown|            113|          119000.0|
|     Gilbert|             79|          181000.0|
|     Somerst|             86|          225500.0|
|     Crawfor|             51|          200624.0|
|       NAmes|            225|          140000.0|
|      IDOTRR|             37|          103000.0|
|     Edwards|            100|          121750.0|
|      Sawyer|             74|          135000.0|
|     StoneBr|             25|          278000.0|
|     CollgCr|            150|          197200.0|
|       SWISU|             25|          139500.0|


Видно что для каких-то районов медианная зарплата выше или ниже, значит стоит учесть районность дома

#### Посмотрим на распределение типов жилья

In [10]:
(
    new_train_sdf
    .select("BldgType", "SalePrice")
    .groupBy("BldgType")
    .agg(
        F.count("BldgType").alias("Count of houses"),
        F.median("SalePrice").alias("Average sale price")
    )
).show(10)

+--------+---------------+------------------+
|BldgType|Count of houses|Average sale price|
+--------+---------------+------------------+
|   Twnhs|             43|          137500.0|
|    1Fam|           1220|          167900.0|
|  Duplex|             52|          135980.0|
|  2fmCon|             31|          127500.0|
|  TwnhsE|            114|          172200.0|
+--------+---------------+------------------+



Возможно имеет смысл выделить признак того, что дом ориентирован на 1 семью

In [11]:
new_train_sdf = (
    new_train_sdf
    .withColumn(
        "For1Fam",
        (
            F.when(F.col("BldgType") == "1Fam", True)
            .when(F.col("BldgType") == "TwnhsE", True)
            .when(F.col("BldgType") == "TwnhsI", True)
            .otherwise(False)
        )
    )
    .drop("BldgType")
)

#### Посмотрим на распределение оценки качества материала внешней отделки дома

In [12]:
(
    new_train_sdf
    .select("ExterQual", "SalePrice")
    .groupBy("ExterQual")
    .agg(
        F.count("ExterQual").alias("Count of houses"),
        F.median("SalePrice").alias("Average sale price")
    )
).show(10)

+---------+---------------+------------------+
|ExterQual|Count of houses|Average sale price|
+---------+---------------+------------------+
|       Gd|            488|          220000.0|
|       Ex|             52|          364606.5|
|       Fa|             14|           82250.0|
|       TA|            906|          139450.0|
+---------+---------------+------------------+



Смапим значения
- Ex  -> 5	Excellent        
- Gd	-> 4  Good
- TA	-> 3  Average/Typical
- Fa	-> 2  Fair
- Po	-> 1 Poor

In [13]:
new_train_sdf = (
    new_train_sdf
    .withColumn(
        "ExterQual",
        (
            F.when(F.col("ExterQual") == "Ex", 5)
            .when(F.col("ExterQual") == "Gd", 4)
            .when(F.col("ExterQual") == "TA", 3)
            .when(F.col("ExterQual") == "Fa", 2)
            .otherwise(1)
        )
    )
)

#### Посмотрим на распределение оценки общего состояния подвала

In [14]:
(
    new_train_sdf
    .select("BsmtCond", "SalePrice")
    .groupBy("BsmtCond")
    .agg(
        F.count("BsmtCond").alias("Count of houses"),
        F.median("SalePrice").alias("Average sale price")
    )
).show(10)

+--------+---------------+------------------+
|BsmtCond|Count of houses|Average sale price|
+--------+---------------+------------------+
|      Gd|             65|          193879.0|
|      NA|             37|          101800.0|
|      Po|              2|           64000.0|
|      Fa|             45|          118500.0|
|      TA|           1311|          165000.0|
+--------+---------------+------------------+



In [15]:
new_train_sdf = (
    new_train_sdf
    .withColumn(
        "BsmtCond",
        (
            F.when(F.col("BsmtCond") == "Ex", 5)
            .when(F.col("BsmtCond") == "Gd", 4)
            .when(F.col("BsmtCond") == "TA", 3)
            .when(F.col("BsmtCond") == "Fa", 2)
            .otherwise(1)
        )
    )
)

#### Посмотрим на распределение оценки качества и состояния отопления

In [16]:
(
    new_train_sdf
    .select("HeatingQC", "SalePrice")
    .groupBy("HeatingQC")
    .agg(
        F.count("HeatingQC").alias("Count of houses"),
        F.median("SalePrice").alias("Average sale price")
    )
).show(10)

+---------+---------------+------------------+
|HeatingQC|Count of houses|Average sale price|
+---------+---------------+------------------+
|       Gd|            241|          152000.0|
|       Po|              1|           87000.0|
|       Ex|            741|          194700.0|
|       Fa|             49|          123500.0|
|       TA|            428|          135000.0|
+---------+---------------+------------------+



In [17]:
new_train_sdf = (
    new_train_sdf
    .withColumn(
        "HeatingQC",
        (
            F.when(F.col("HeatingQC") == "Ex", 5)
            .when(F.col("HeatingQC") == "Gd", 4)
            .when(F.col("HeatingQC") == "TA", 3)
            .when(F.col("HeatingQC") == "Fa", 2)
            .otherwise(1)
        )
    )
)

#### Посмотрим на распределение наличие центрального кондиционирования

In [18]:
(
    new_train_sdf
    .select("CentralAir", "SalePrice")
    .groupBy("CentralAir")
    .agg(
        F.count("CentralAir").alias("Count of houses"),
        F.median("SalePrice").alias("Average sale price")
    )
).show(10)

+----------+---------------+------------------+
|CentralAir|Count of houses|Average sale price|
+----------+---------------+------------------+
|         Y|           1365|          168000.0|
|         N|             95|           98000.0|
+----------+---------------+------------------+



In [19]:
new_train_sdf = (
    new_train_sdf
    .withColumn(
        "HasCentralAir",
        (
            F.when(F.col("CentralAir") == "Y", True)
            .otherwise(False)
        )
    )
    .drop("CentralAir")
)

#### Посмотрим на распределение оценки качества кухни

In [20]:
(
    new_train_sdf
    .select("KitchenQual", "SalePrice")
    .groupBy("KitchenQual")
    .agg(
        F.count("KitchenQual").alias("Count of houses"),
        F.median("SalePrice").alias("Average sale price")
    )
).show(10)

+-----------+---------------+------------------+
|KitchenQual|Count of houses|Average sale price|
+-----------+---------------+------------------+
|         Gd|            586|          201400.0|
|         Ex|            100|          316750.0|
|         Fa|             39|          115000.0|
|         TA|            735|          137000.0|
+-----------+---------------+------------------+



In [21]:
new_train_sdf = (
    new_train_sdf
    .withColumn(
        "KitchenQual",
        (
            F.when(F.col("KitchenQual") == "Ex", 5)
            .when(F.col("KitchenQual") == "Gd", 4)
            .when(F.col("KitchenQual") == "TA", 3)
            .when(F.col("KitchenQual") == "Fa", 2)
            .otherwise(1)
        )
    )
)

### Поработаем с числовыми фичами

#### Получим факт реконструкции дома

In [22]:
new_train_sdf = (
    new_train_sdf
    .withColumn(
        "HasReconstruction", F.col("YearRemodAdd") != F.col("YearBuilt")
    )
    .drop("YearRemodAdd")
)

#### Получим возраст дома

In [23]:
CURRENT_YEAR = 2024

new_train_sdf = (
    new_train_sdf
    .withColumn(
        "Age", CURRENT_YEAR - F.col("YearBuilt")
    )
    .drop("YearBuilt")
)

### Посмотрим на полученную схему

In [24]:
new_train_sdf.printSchema()

root
 |-- SalePrice: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- ExterQual: integer (nullable = false)
 |-- BsmtCond: integer (nullable = false)
 |-- HeatingQC: integer (nullable = false)
 |-- KitchenQual: integer (nullable = false)
 |-- Fireplaces: integer (nullable = true)
 |-- GarageArea: integer (nullable = true)
 |-- PoolArea: integer (nullable = true)
 |-- 1stFlrSF: integer (nullable = true)
 |-- 2ndFlrSF: integer (nullable = true)
 |-- HasAllUtils: boolean (nullable = true)
 |-- For1Fam: boolean (nullable = false)
 |-- HasCentralAir: boolean (nullable = false)
 |-- HasReconstruction: boolean (nullable = true)
 |-- Age: integer (nullable = true)



## Обучение модели

### Подготовка вектора фичей для модели

#### Масштабирование некоторых числовых признаков

LotArea, GarageArea, PoolArea, 1stFlrSF, 2ndFlrSF, Age

In [25]:
TO_SCALE_FEATURES = [
    "LotArea", "GarageArea", "PoolArea",
    "1stFlrSF", "2ndFlrSF", "Age"
]

In [26]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

to_scale_assembler = VectorAssembler(
    inputCols=TO_SCALE_FEATURES, outputCol="to_scale_features"
)
to_scale_sdf = to_scale_assembler.transform(new_train_sdf)

scaler = StandardScaler(
    inputCol="to_scale_features", outputCol="scaled_features",
    withMean=True, withStd=True
)
scaler_model = scaler.fit(to_scale_sdf)
scaled_sdf = scaler_model.transform(to_scale_sdf)

In [27]:
scaled_sdf.select(*TO_SCALE_FEATURES, "to_scale_features", "scaled_features").show(truncate=False)

+-------+----------+--------+--------+--------+---+--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
|LotArea|GarageArea|PoolArea|1stFlrSF|2ndFlrSF|Age|to_scale_features                     |scaled_features                                                                                                              |
+-------+----------+--------+--------+--------+---+--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
|8450   |548       |0       |856     |854     |21 |[8450.0,548.0,0.0,856.0,854.0,21.0]   |[-0.20707075668205258,0.35088009468698733,-0.06866821893757226,-0.7931620228890606,1.1614536249333418,-1.050633797867364]    |
|9600   |460       |0       |1262    |0       |48 |[9600.0,460.0,0.0,1262.0,0.0,48.0]    |[-0.0918548989935738,-0.06071021076074101,

#### One Hot кодирование района

In [28]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

district_indexer = StringIndexer(
    inputCol="Neighborhood", outputCol="NeighborhoodIndex"
)
district_indexer_model = district_indexer.fit(scaled_sdf)
indexed_sdf = district_indexer_model.transform(scaled_sdf)

district_ohe = OneHotEncoder(
    inputCol="NeighborhoodIndex", outputCol="NeighborhoodOHE"
)
district_ohe_model = district_ohe.fit(indexed_sdf)
encoded_sdf = district_ohe_model.transform(indexed_sdf)

In [29]:
encoded_sdf.select("Neighborhood", "NeighborhoodIndex", "NeighborhoodOHE").show(truncate=False)

+------------+-----------------+---------------+
|Neighborhood|NeighborhoodIndex|NeighborhoodOHE|
+------------+-----------------+---------------+
|CollgCr     |1.0              |(24,[1],[1.0]) |
|Veenker     |22.0             |(24,[22],[1.0])|
|CollgCr     |1.0              |(24,[1],[1.0]) |
|Crawfor     |11.0             |(24,[11],[1.0])|
|NoRidge     |13.0             |(24,[13],[1.0])|
|Mitchel     |12.0             |(24,[12],[1.0])|
|Somerst     |4.0              |(24,[4],[1.0]) |
|NWAmes      |8.0              |(24,[8],[1.0]) |
|OldTown     |2.0              |(24,[2],[1.0]) |
|BrkSide     |10.0             |(24,[10],[1.0])|
|Sawyer      |7.0              |(24,[7],[1.0]) |
|NridgHt     |6.0              |(24,[6],[1.0]) |
|Sawyer      |7.0              |(24,[7],[1.0]) |
|CollgCr     |1.0              |(24,[1],[1.0]) |
|NAmes       |0.0              |(24,[0],[1.0]) |
|BrkSide     |10.0             |(24,[10],[1.0])|
|NAmes       |0.0              |(24,[0],[1.0]) |
|Sawyer      |7.0   

#### Объединяем несколько векторов в один

In [30]:
OTHER_FEATURES = [
    "OverallQual", "OverallCond", "ExterQual", "BsmtCond",
    "HeatingQC", "KitchenQual", "HasAllUtils", "For1Fam",
    "HasCentralAir", "HasReconstruction"
]

In [31]:
from pyspark.ml.feature import VectorAssembler

union_assembler = VectorAssembler(
    inputCols=["scaled_features", "NeighborhoodOHE"] + OTHER_FEATURES, outputCol="features"
)
union_sdf = union_assembler.transform(encoded_sdf)

In [32]:
union_sdf.select("features").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(40,[0,1,2,3,4,5,7,30,31,32,33,34,35,36,37,38],[-0.20707075668205258,0.35088009468698733,-0.06866821893757226,-0.7931620228890606,1.1614536249333418,-1.050633797867364,1.0,7.0,5.0,4.0,3.0,5.0,4.0,1.0,1.0,1.0])         |
|(40,[0,1,2,3,4,5,28,30,31,32,33,34,35,36,37,38],[-0.0918548989935738,-0.06071021076074101,-0.06866821893757226,0.25

### Обучение модели

#### Линейная регрессия

In [33]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="SalePrice")
lr_model = lr.fit(union_sdf)

In [34]:
results = lr_model.evaluate(union_sdf)

In [35]:
results.rootMeanSquaredError

32787.5260122065

#### Случайный лес

In [36]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="features", labelCol="SalePrice")
rf_model = rf.fit(union_sdf)

In [37]:
from pyspark.ml.evaluation import RegressionEvaluator

rf_sdf = rf_model.transform(union_sdf)
err = RegressionEvaluator(labelCol="SalePrice").evaluate(rf_sdf)
err

29758.67184236945