# Предсказание цены недвижимости в Калифорнии

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии
в 1990 году. 

## Данные

В колонках датасета содержатся следующие данные:
- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

## Цель

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — median_house_value. Для оценки качества модели следует использовать метрики RMSE, MAE и R2.

In [1]:
import warnings

import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import Imputer, OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

In [2]:
spark = (
    SparkSession.builder.master("local")
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [3]:
sc = spark.sparkContext

In [4]:
try:
    df = spark.read.load(
        "/datasets/housing.csv", format="csv", inferSchema=True, header=True
    )
except AnalysisException:
    df = spark.read.load(
        "./data/housing.csv", format="csv", inferSchema=True, header=True
    )

In [5]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [6]:
# код ревьюера для проверки
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



# Подготовка данных

Посмотрим типы колонок датафрейма

In [7]:
df.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double'),
 ('ocean_proximity', 'string')]

Все колонки кроме первой (целочисленный индекс) и последней (строковый категориальный признак) имеют тип `double`. В принципе, для количества комнат (`total_rooms`, `total_bedrooms`) и количества людей (`population`, `households`) следовало использовать целый тип, но это не критично.

Первым делом разобъём датасет на обучающую и тестовую части. Для этого используем функцию Spark randomSplit

In [8]:
df_train, df_test = df.randomSplit(weights = [0.80, 0.20], seed = 42)

Проверим датафреймы на пропуски:

In [9]:
df_train.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_train.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           163|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [10]:
df_test.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_test.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|            44|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Пропуски обнаружились в колонке "Количество спален". Разумно заменить пропуски медианным значением

In [11]:
imputer=Imputer(strategy="median", inputCols=["total_bedrooms"], outputCols=["total_bedrooms_filled"])

Преобразуем строковый категориальный признак "рассояние до океана" в числовой техникой One hot encoding:

In [12]:
indexer = StringIndexer(
    inputCol="ocean_proximity", outputCol="ocean_proximity_idx"
).fit(df_train)
ohe = OneHotEncoder(inputCol="ocean_proximity_idx", outputCol="ocean_proximity_vec")

Осталось подготовить вектора данных для моделей. Поскольку мы планируем обучать две модели и векторов нужно построить два.

In [13]:
vectorAssembler_num = VectorAssembler(
    inputCols=[
        "longitude",
        "latitude",
        "housing_median_age",
        "total_rooms",
        "total_bedrooms_filled",
        "population",
        "households",
        "median_income",
    ],
    outputCol="features",
)

In [14]:
vectorAssembler_all = VectorAssembler(
    inputCols=[
        "longitude",
        "latitude",
        "housing_median_age",
        "total_rooms",
        "total_bedrooms_filled",
        "population",
        "households",
        "median_income",
        "ocean_proximity_vec",
    ],
    outputCol="features",
)

Последнее, что стоит сделать с данными перед обучением модели - номализовать их. Воспользуемся для этого функцией `StandardScaler`

In [15]:
scaler = StandardScaler(inputCol="features", outputCol="features_scaled", withStd=True, withMean=True)

# Обучение моделей

Обучим две мадели линейной регрессии: одну только на числовых данных, а вторую на числовых и категориальных. В качестве метрик в обоих случаях используем `RMSE`, `MAE` и `R2`.

Начнём с модели, которую обучим только на числовых данных. Создадим модель и подберём её гиперпараметры на кросс-валидации. Для этого создадим evaluator, estimator (наша модель) и карту параметров, на которой будет работать алгоритм:  

In [16]:
lr = LinearRegression(
    featuresCol="features_scaled", labelCol="median_house_value", solver="l-bfgs", maxIter=30
)
evaluator = RegressionEvaluator(
    predictionCol="prediction", labelCol="median_house_value"
)
gs = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.05, 0.1])
    .addGrid(lr.elasticNetParam, [0.5, 0.75, 1.0])
    .build()
)

cv = CrossValidator(estimator=lr, estimatorParamMaps=gs, evaluator=evaluator)

Соберём pipline для построения моделей:

In [17]:
%%time
model_num = Pipeline(stages=[imputer, indexer, ohe, vectorAssembler_num, scaler, cv]).fit(df_train)
model_all = Pipeline(stages=[imputer, indexer, ohe, vectorAssembler_all, scaler, cv]).fit(df_train)

CPU times: user 2.05 s, sys: 663 ms, total: 2.71 s
Wall time: 2min 52s


In [18]:
predict_num = model_num.transform(df_test)
predict_all = model_all.transform(df_test)

Посмотрим метрики полученных моделей:

In [19]:
eval = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")

In [20]:
rmse_num = eval.evaluate(predict_num)
mae_num = eval.evaluate(predict_num, {eval.metricName: "mae"})
r2_num = eval.evaluate(predict_num, {eval.metricName: "r2"})

print(f"Numeric features model RMSE on test: {rmse_num:.2f}")
print(f"Numeric features model MAE  on test: {mae_num:.2f}")
print(f"Numeric features model R2   on test: {r2_num:.4f}")

Numeric features model RMSE on test: 70410.94
Numeric features model MAE  on test: 50951.74
Numeric features model R2   on test: 0.6306


In [21]:
rmse_all = eval.evaluate(predict_all)
mae_all = eval.evaluate(predict_all, {eval.metricName: "mae"})
r2_all = eval.evaluate(predict_all, {eval.metricName: "r2"})

print(f"Numeric features model RMSE on test: {rmse_all:.2f}")
print(f"Numeric features model MAE  on test: {mae_all:.2f}")
print(f"Numeric features model R2   on test: {r2_all:.4f}")

Numeric features model RMSE on test: 69572.90
Numeric features model MAE  on test: 49872.67
Numeric features model R2   on test: 0.6394


# Анализ результатов

Видно, что метрики обеих моделий весьма близки. При этом по среднеквадратичной ошибке и по средней абсолютной ошибке чуть лучше была первая модель, тогда как по метрике R2 - вторая. Прирост метрик второй модели над первой оказался следующим:

In [22]:
print(f'Прирост RMSE: {(rmse_all-rmse_num)/rmse_num:.02%}')
print(f'Прирост MAE: {(mae_all-mae_num)/mae_num:.02%}')
print(f'Прирост R2: {(r2_all-r2_num)/r2_num:.02%}')

Прирост RMSE: -1.19%
Прирост MAE: -2.12%
Прирост R2: 1.39%


Из полученных результатов можно сделать вывод, что модель линейной регрессии на всех признаках показала себя лучше константной модели, в сравнении с линейной регрессией только на числовых признаках. Но это улучшение сопровождалось бОльшим разбросом значений, что вызвало прирост среднеквадратичной и средней абсолютной ошибок. Очевидно, виной всему - зашумлённые входные данные.

In [23]:
from tqdm import tqdm

In [24]:
# код ревьюера для объяснения
for _ in tqdm(range(100)):
    df_train_new, df_test_new = df.randomSplit(weights = [0.80, 0.20], seed = 42)
    assert df_train.count() == df_train_new.count(), f'Train size: {df_train.count()} != {df_train_new.count()}'
    assert df_test.count() == df_test_new.count(), f'Test size: {df_test.count()} != {df_test_new.count()}'

100%|██████████| 100/100 [01:08<00:00,  1.47it/s]


In [25]:
print(f'Train size: {df_train.count()} = {df_train_new.count()}')
print(f'Test size:  {df_test.count()} = {df_test_new.count()}')

Train size: 16549 = 16549
Test size:  4091 = 4091


In [26]:
# код ревьюера
spark.stop()