# Предсказание стоимости жилья

Необходимо обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году с использованием библиотеки **PySpark**.  

В колонках датасета содержатся следующие данные:  
- `longitude` — широта;
- `latitude` — долгота;
- `housing_median_age` — медианный возраст жителей жилого массива;
- `total_rooms` — общее количество комнат в домах жилого массива;
- `total_bedrooms` — общее количество спален в домах жилого массива;
- `population` — количество человек, которые проживают в жилом массиве;
- `households` — количество домовладений в жилом массиве;
- `median_income` — медианный доход жителей жилого массива;
- `median_house_value` — медианная стоимость дома в жилом массиве;
- `ocean_proximity` — близость к океану.  

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — `median_house_value`. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

## Импорт библиотек. Подготовка окружения

In [1]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Включим широкоформатный Юпитер
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
# Инициализируем Spark-сессию
spark = SparkSession.builder.master('local').appName('Houses Price Predictions').getOrCreate()

22/04/03 15:05:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Подготовка данных

### Откроем данные и исследуем их

In [3]:
# Сохраним датасет в переменную "data"
data = spark.read.csv('/datasets/housing.csv', inferSchema=True, header=True)

# Выведем на экран типы данных колонок датасета
data.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

Посмотрим на количество пропусков

In [4]:
# Создадим функцию, которая будет показывать количество пропущенных значений в каждом столбце
def show_nans(df):
    result = (
        df
        .select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns])
        .toPandas()
        .T
    )
    result.columns = ['Number of NaNs']
    return result

In [5]:
# Посмотрим на количество пропущенных значений в таблице "data"
show_nans(data)

                                                                                

Unnamed: 0,Number of NaNs
longitude,0
latitude,0
housing_median_age,0
total_rooms,0
total_bedrooms,207
population,0
households,0
median_income,0
median_house_value,0
ocean_proximity,0


Столбец `total_bedrooms` содержит 207 пропусков.  
Для того, чтобы заполнить пропуски, посмотрим с каким показателем данный столбец коррелирует сильнее всего.  

In [6]:
for col in data.columns:
    try:
        print(f'{col}: {data.stat.corr("total_bedrooms", col)}')
    except:
        print(f'{col}: column type is not numeric')

longitude: 0.06808179725677305
latitude: -0.06531831669569808
housing_median_age: -0.31706334936263136
total_rooms: 0.9201961721166215
total_bedrooms: 1.0
population: 0.8662661985860806
households: 0.966507240042043
median_income: -0.0072945417081858544
median_house_value: 0.04914821959942551
ocean_proximity: column type is not numeric


Наибольшая линейная корреляция, среди числовых признаков наблюдается со столбцом `households`.
Сгруппируем по столбцу `households` и для каждой группы найдем среднее `total_bedrooms`. Этим значением заполним столбец `total_bedrooms`.

In [7]:
# Создадим оконную фукнцию
window = Window().partitionBy("households")

# Заполним пропущенные значения средним по группам ""
data_filled = data.withColumn("mean", F.mean("total_bedrooms").over(window)) \
              .withColumn("total_bedrooms", F.when(F.col("total_bedrooms").isNull(), F.col("mean")) \
              .otherwise(F.col("total_bedrooms"))) \
              .drop("mean")

# Посмотрим на количество пропусков после заполнения
show_nans(data_filled)

                                                                                

Unnamed: 0,Number of NaNs
longitude,0
latitude,0
housing_median_age,0
total_rooms,0
total_bedrooms,6
population,0
households,0
median_income,0
median_house_value,0
ocean_proximity,0


В данных осталось 6 пропущенных значений. Удалим эти строки.

In [8]:
data_filled = data_filled.dropna()

# Выведем на экран количество пропущенных значений
show_nans(data_filled)

                                                                                

Unnamed: 0,Number of NaNs
longitude,0
latitude,0
housing_median_age,0
total_rooms,0
total_bedrooms,0
population,0
households,0
median_income,0
median_house_value,0
ocean_proximity,0


Теперь, пропуски в датасете отсутствуют.  
Проверим, есть ли дубликаты.

In [9]:
if data_filled.count() > data_filled.dropDuplicates().count():
    raise ValueError('Data has duplicates')
else:
    print('Data does not contain duplicates')



Data does not contain duplicates


                                                                                

Дубликаты отсутствуют. Можно двигаться дальше.

### Подготовка данных для модели машинного обучения

Сохраним название столбцов с числовми и категориальными признаки, а также таргет в переменные.

In [10]:
numeric_cols = ["housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"]
categorical_cols = ["ocean_proximity"]
target = "median_house_value"

In [11]:
# Создадим объект "StringIndexer", чтобы преобразовать значения катег. признаков в индексы
indexer = StringIndexer(inputCols=categorical_cols,
                        outputCols=[c+'_idx' for c in categorical_cols])

# Создадим объект "OneHotEncoder", чтобы преобразовать индексные значения катег. признаков в бинарные вектора
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols], 
                        outputCols=[c+'_ohe' for c in categorical_cols])

# Создадим объект "VectorAssembler", чтобы объеденить значения числовых признаков в единый вектор
numerical_assembler = VectorAssembler(inputCols=numeric_cols, 
                                      outputCol='numeric_features')

# Создадим объект "StandardScaler", чтобы отмасштабировать вектор числовых признаков.
scaler = StandardScaler(inputCol='numeric_features', 
                        outputCol='numeric_features_scaled')

# Создадим объект "VectorAssembler", чтобы объеденить значения отмасштабированных числовых признаков 
# и ohe-категориальных признаков в единый вектор
all_features_assembler = VectorAssembler(inputCols=['numeric_features_scaled']+[c+'_ohe' for c in categorical_cols], 
                                         outputCol='all_features')

# Создадим объект "Pipeline", который объеденить все шаги преобразования признаков
pipeline = Pipeline(stages=[indexer, encoder, numerical_assembler, scaler, all_features_assembler])

# Создадим новые столбцы с преобразованными признаками и сохраним в переменную "data_filled"
data_filled = pipeline.fit(data_filled).transform(data_filled)

                                                                                

In [12]:
# Выведем на экран первые 5 строк нового датафрейма.
pd.DataFrame(data_filled.head(5), columns=data_filled.columns)

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,ocean_proximity_ohe,numeric_features,numeric_features_scaled,all_features
0,-122.26,37.77,52.0,1670.0,350.0,793.0,299.0,2.9732,282100.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)","[52.0, 1670.0, 350.0, 793.0, 299.0, 2.9732]","[4.131577965924315, 0.7658678113742143, 0.8324...","[4.131577965924315, 0.7658678113742143, 0.8324..."
1,-122.11,37.66,36.0,1755.0,316.0,913.0,299.0,4.1302,172700.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)","[36.0, 1755.0, 316.0, 913.0, 299.0, 4.1302]","[2.860323207178372, 0.8048491071627223, 0.7515...","[2.860323207178372, 0.8048491071627223, 0.7515..."
2,-121.91,37.69,23.0,2179.0,308.0,926.0,299.0,5.9345,259600.0,<1H OCEAN,0.0,"(1.0, 0.0, 0.0, 0.0)","[23.0, 2179.0, 308.0, 926.0, 299.0, 5.9345]","[1.8274287156972933, 0.999296982625397, 0.7325...","[1.8274287156972933, 0.999296982625397, 0.7325..."
3,-118.97,35.38,42.0,1185.0,358.0,1038.0,299.0,0.9951,48000.0,INLAND,1.0,"(0.0, 1.0, 0.0, 0.0)","[42.0, 1185.0, 358.0, 1038.0, 299.0, 0.9951]","[3.3370437417081007, 0.5434451236397868, 0.851...","[3.3370437417081007, 0.5434451236397868, 0.851..."
4,-118.21,34.08,39.0,986.0,361.0,1347.0,299.0,2.2907,133900.0,<1H OCEAN,0.0,"(1.0, 0.0, 0.0, 0.0)","[39.0, 986.0, 361.0, 1347.0, 299.0, 2.2907]","[3.098683474443236, 0.45218303114669184, 0.858...","[3.098683474443236, 0.45218303114669184, 0.858..."


## Обучение моделей

Разделим наш датасет на две части — выборку для обучения и выборку для тестирования качества модели.

In [13]:
train_data, test_data = data_filled.randomSplit([.8,.2], seed=52)

# Посмотрим на размер получившихся выборок
print('Train shape:', train_data.count(), '\n', 'Test shape:', test_data.count()) 



Train shape: 16397 
 Test shape: 4237


                                                                                

Добавим в тестовый датасет средние и медианные предсказания для проверки моделей на адекватность.

In [14]:
# Рассчитаем среднее и медиану нашего таргета на тренировочной выборке
mean = train_data.select(F.mean('median_house_value')).collect()[0].__getitem__('avg(median_house_value)')
median = train_data.approxQuantile('median_house_value', [0.5], 0)[0]

# Добавим в тестовые данные столбцы "mean_prediction" и "median_prediction" 
# и заполним их средним и медианным значениями соответсвенно.
test_data = test_data.withColumn('mean_prediction', F.lit(mean)).withColumn('median_prediction', F.lit(median))

                                                                                

### Обучим модель на всех признаках

При обучении модели используем регуляризацию ElasticNet (L1/L2 = 0.5/0.5). Силу регуляризации зададим равную 1.

In [15]:
linReg = LinearRegression(featuresCol='all_features',
                          regParam=1,
                          elasticNetParam=0.5,
                          labelCol=target,
                          predictionCol='prediction',
                          standardization=False
                         )

LinRegFitted = linReg.fit(train_data)

predictionsAllFeatures = LinRegFitted.transform(test_data)

print('Coefficients: \n', LinRegFitted.coefficients)

22/04/03 15:07:19 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/03 15:07:19 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

Coefficients: 
 [14244.540083231796,-14253.640250777864,31746.09959777052,-46796.65904796323,33544.479440083334,76138.32238893038,5030.0567013015025,-64691.988223068125,17614.080411051946,8686.207557226984]


### Обучим модель только на числовых признаках

При обучении модели также используем регуляризацию ElasticNet (L1/L2 = 0.5/0.5). Силу регуляризации зададим равную 1.

In [16]:
linReg = LinearRegression(featuresCol='numeric_features_scaled',
                          regParam=1,
                          elasticNetParam=0.5,
                          labelCol=target,
                          predictionCol='prediction',
                          standardization=False
                          )

LinRegFitted = linReg.fit(train_data)

predictionsNumFeatures = LinRegFitted.transform(test_data)

print('Coefficients: \n', LinRegFitted.coefficients)



Coefficients: 
 [23083.541866944306,-42266.89941424237,42656.57128986577,-43243.01003375272,48765.12324233819,90427.48578248474]


                                                                                

У обоих моделей получились высокие коэффициенты. Это можно объяснить тем, что значения признаков маленькие (так как они были отмасштабированы), а предсказываем мы медианную стоимость (большой показатель).

## Анализ результатов

Сравним результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2

In [17]:
# Инициализируем "RegressionEvaluator" для оценки предсказаний моделей.
evaluator = RegressionEvaluator(labelCol=target)

# Создадим таблицу, в которую сохраним результаты оценки моделей
metrics_table = pd.DataFrame()

# В цикле вычислим необходимые метрики для обоих моделей и сохраним результаты в таблицу "metrics_table"
for metric in ['rmse', 'mae', 'r2']:
    for model_name, features in zip(['allFeaturesModel', 'numFeaturesModel'], [predictionsAllFeatures, predictionsNumFeatures]):
        evaluator.setPredictionCol("prediction")
        res = evaluator.evaluate(features, {evaluator.metricName: metric})
        metrics_table.loc[metric, model_name] = res
    # Также добавим блок, в котором рассчитаем метрики для константных предсказаний
    if metric == 'rmse':
        evaluator.setPredictionCol("mean_prediction")
        res = evaluator.evaluate(test_data, {evaluator.metricName: metric})
        metrics_table.loc[metric, 'dummyModel'] = res
    elif metric == 'mae':
        evaluator.setPredictionCol("median_prediction")
        res = evaluator.evaluate(test_data, {evaluator.metricName: metric})
        metrics_table.loc[metric, 'dummyModel'] = res

# Выведем результаты на экран
metrics_table

                                                                                

Unnamed: 0,allFeaturesModel,numFeaturesModel,dummyModel
rmse,69458.563317,74837.544256,114594.693013
mae,50573.873279,55120.338368,87867.73283
r2,0.632439,0.573306,


## Выводы

В первую очередь, стоит сказать, что обе модели прошли проверку на адекватность. Их RMSE и MAE ниже, чем у модели, которая дают константные предсказания.  
Модель линейной регрессии, обученная на всех признаках показала более высокий результат по всем метрикам, нежели модель линейной регрессии, обученная только на числовых признаках.  
Оценки предсказаний моделей представленны в таблице ниже:
  

|      |   allFeaturesModel |   numFeaturesModel |   dummyModel |
|:-----|-------------------:|-------------------:|--------------:|
| rmse |       69458.6      |       74837.5      |      114595   |
| mae  |       50573.9      |       55120.3      |       87867.7 |
| r2   |           0.632439 |           0.573306 |         nan   |
