# Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

Предсказание стоимости жилья в Калифорнии в 1990 году. 
**Цель проекта** - обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Для оценки качества модели используйте метрики RMSE, MAE и R2.

**В колонках датасета содержатся следующие данные:**

- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

**План работ:**

- Подготовка данных
- Обучение моделей
- Анализ результатов

# Подготовка данных

In [49]:
# загрузка основных библиотек
import pandas as pd 
import numpy as np
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from functools import reduce

from pyspark.ml.feature import (StringIndexer,
                                VectorAssembler,
                                StandardScaler,
                                Imputer)

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

from pyspark.ml.regression import (DecisionTreeRegressor, 
                                   RandomForestRegressor,
                                     LinearRegression,
                                      GBTRegressor)

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        

In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:85% !important; }</style>"))

In [3]:
# запуск спарк сессии
spark = SparkSession.builder \
                    .master("local") \
                    .appName('California housing predict') \
                    .getOrCreate()

In [4]:
# читаем датасет 
df = spark.read.load('/datasets/housing.csv',format="csv", 
                     sep=",", inferSchema=True, 
                    header="true")

                                                                                

In [5]:
# просмотр датасета
df.printSchema()
df.show()
df.count()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR B

20640

In [6]:
# изучение пустых значений

df.where(reduce(lambda x, y: x | y, (F.col(x).isNull() for x in df.columns))).show()

df.where(reduce(lambda x, y: x | y, (F.col(x).isNull() for x in df.columns))).count()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.16|   37.77|              47.0|     1256.0|          null|     570.0|     218.0|        4.375|          161900.0|       NEAR BAY|
|  -122.17|   37.75|              38.0|      992.0|          null|     732.0|     259.0|       1.6196|           85100.0|       NEAR BAY|
|  -122.28|   37.78|              29.0|     5154.0|          null|    3741.0|    1273.0|       2.5762|          173400.0|       NEAR BAY|
|  -122.24|   37.75|              45.0|      891.0|          null|     384.0|     146.0|       4.9489|          247100.0|       NEAR BAY|
|   -122.1|   37.69|              

207

In [9]:
# замена пустых значений на медианные
imputer = Imputer(inputCol="total_bedrooms", outputCol="total_bedrooms_out")

In [10]:
model_inp = imputer.setStrategy("median").setMissingValue(1.0).fit(df)

In [11]:
model_inp.surrogateDF.show()

+--------------+
|total_bedrooms|
+--------------+
|         435.0|
+--------------+



In [12]:
# трансформирую пустые строчки
df = model_inp.transform(df)

In [13]:
# удаляю старый столбец и переименовываю новый
df = df.drop('total_bedrooms')
df = df.withColumnRenamed('total_bedrooms_out', 'total_bedrooms')

In [22]:
# проверка выводимых значений
df.printSchema()
df.show()
df.count()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- total_bedrooms: double (nullable = true)

+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+--------------+
|longitude|latitude|housing_median_age|total_rooms|population|households|median_income|median_house_value|ocean_proximity|total_bedrooms|
+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+--------------+
|  -122.23|   37.88|              41.0|      880.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|         129

20640

In [23]:
# статистически показатели 
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,population,households,median_income,median_house_value,ocean_proximity,total_bedrooms
0,count,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640,20640.0
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,,536.8598837209303
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,,419.37589062980027
3,min,-124.35,32.54,1.0,2.0,3.0,1.0,0.4999,14999.0,<1H OCEAN,2.0
4,max,-114.31,41.95,52.0,39320.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN,6445.0


In [24]:
print(df.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'total_bedrooms']


In [25]:
 # разделим колонки 
cat_cols = ['ocean_proximity']
num_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value'

In [26]:
# переведем значения категориальных переменных
indexer = StringIndexer(inputCols = cat_cols,
                       outputCols = [c+'_idx' for c in cat_cols])

 # перекодирование категориальных переменных
encoder = OneHotEncoder(inputCols = [c+'_idx' for c in cat_cols],
                       outputCols = [c+'_ohe' for c in cat_cols])

# сборка категориальных переменных в один вектор
categorical_assembler = \
        VectorAssembler(inputCols=[c+'_ohe' for c in cat_cols],
                        outputCol="cat_f")

# сборка численных переменных в один вектор
numerical_assembler = VectorAssembler(inputCols=num_cols,
                                    outputCol="num_f")

# скалирование вектора 
standardScaler = StandardScaler(inputCol='num_f',
                                outputCol='num_f_scaled', withMean=True)

all_features = ['cat_f', 'num_f_scaled']

# итоговое добавления векторов категориальных и численных 
# признаков через 1 единый вектор
final_assembler = VectorAssembler(inputCols=all_features, outputCol='features')

# сборка пайплайна
pipeline = Pipeline(stages=[indexer, encoder, categorical_assembler, numerical_assembler, standardScaler, final_assembler])

In [27]:
# делим выборку на 2 датасета
train_data, test_data = df.randomSplit([.8,.2], seed=12345)
print(train_data.count(), test_data.count()) 

                                                                                

16431 4209


In [28]:
# обучения пайплайна на тренировочной выборке
pipeline = pipeline.fit(train_data)

                                                                                

In [29]:
# трансформация тренировочноый выборки
train_data = pipeline.transform(train_data)

In [30]:
# трансформация тестовой выборки
test_data = pipeline.transform(test_data)

**Вывод:** Датасет был изучен. Пустые значения было решено заполнить медианным значением. Всего в датасете 20640 строчек. Даныне были разделены на обучающию и тренировочную выборку, послсе чего данные были трансформированны через пайплайн.

# Обучение моделей

In [41]:
%%time

# линейнай регрессия

lr = LinearRegression(labelCol=target, featuresCol="features")

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .addGrid(lr.maxIter, range(5, 30, 5))\
    .addGrid(lr.solver, ['auto', 'normal', 'l-bfgs'])\
    .addGrid(lr.standardization, [True, False])\
    .build()

tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(labelCol=target),
                           trainRatio=0.8)

model = tvs.fit(train_data)

predictions_lr = model.transform(test_data)

23/09/13 09:06:48 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/09/13 09:06:48 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/09/13 09:06:48 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/09/13 09:06:48 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

CPU times: user 7.14 s, sys: 3.4 s, total: 10.5 s
Wall time: 3min 54s


In [43]:
%%time

# дерево решений

dt = DecisionTreeRegressor(labelCol=target, featuresCol="features", seed = 12345)

paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, range(1, 31, 5)) \
    .addGrid(dt.minWeightFractionPerNode, np.arange(0.0, 0.05, 0.1)) \
    .build()

tvs = TrainValidationSplit(estimator=dt,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(labelCol=target),
                           trainRatio=0.8)


model = tvs.fit(train_data)

predictions_dt = model.transform(test_data)


23/09/13 09:10:49 WARN DAGScheduler: Broadcasting large task binary with size 1137.3 KiB
23/09/13 09:10:50 WARN DAGScheduler: Broadcasting large task binary with size 1422.0 KiB
23/09/13 09:10:51 WARN DAGScheduler: Broadcasting large task binary with size 1078.7 KiB
23/09/13 09:10:53 WARN DAGScheduler: Broadcasting large task binary with size 1137.3 KiB
23/09/13 09:10:53 WARN DAGScheduler: Broadcasting large task binary with size 1422.0 KiB
23/09/13 09:10:54 WARN DAGScheduler: Broadcasting large task binary with size 1722.6 KiB
23/09/13 09:10:54 WARN DAGScheduler: Broadcasting large task binary with size 2023.4 KiB
23/09/13 09:10:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/09/13 09:10:55 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/09/13 09:10:55 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
23/09/13 09:10:56 WARN DAGScheduler: Broadcasting large task binary with size 1828.1 KiB
23/09/13 09:10:58 WARN DAGSche

CPU times: user 153 ms, sys: 87.5 ms, total: 240 ms
Wall time: 24 s


In [44]:
%%time

# случайны лес

rf = RandomForestRegressor(labelCol=target, featuresCol="features", seed = 12345)

paramGrid = ParamGridBuilder() \
    .addGrid(rf.bootstrap, [True, False]) \
    .addGrid(rf.maxDepth, range(1, 31, 5)) \
    .addGrid(rf.numTrees, range(1, 31, 10)) \
    .build()

tvs = TrainValidationSplit(estimator=rf,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(labelCol=target),
                           trainRatio=0.8)


model = tvs.fit(train_data)

predictions_rf = model.transform(test_data)

23/09/13 09:11:15 WARN DAGScheduler: Broadcasting large task binary with size 1394.0 KiB
23/09/13 09:11:15 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/09/13 09:11:18 WARN DAGScheduler: Broadcasting large task binary with size 1562.1 KiB
23/09/13 09:11:19 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
23/09/13 09:11:20 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/09/13 09:11:24 WARN DAGScheduler: Broadcasting large task binary with size 1125.6 KiB
23/09/13 09:11:28 WARN DAGScheduler: Broadcasting large task binary with size 1394.0 KiB
23/09/13 09:11:28 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
23/09/13 09:11:29 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
23/09/13 09:11:30 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
23/09/13 09:11:31 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
23/09/13 09:11:33 WARN DAGScheduler: Broad

CPU times: user 1.47 s, sys: 592 ms, total: 2.06 s
Wall time: 8min


In [57]:
%%time

# Градиентный бустинг

gbt = GBTRegressor(labelCol=target, featuresCol="features", seed = 12345)

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, range(10,50,10)) \
    .addGrid(gbt.maxDepth, range(0, 30, 10)) \
    .build()

tvs = TrainValidationSplit(estimator=gbt,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(labelCol=target),
                           trainRatio=0.8)

model = tvs.fit(train_data)

predictions_gbt = model.transform(test_data)

23/09/13 09:37:50 WARN DAGScheduler: Broadcasting large task binary with size 1038.0 KiB
23/09/13 09:37:50 WARN DAGScheduler: Broadcasting large task binary with size 1037.6 KiB
23/09/13 09:37:50 WARN DAGScheduler: Broadcasting large task binary with size 1038.1 KiB
23/09/13 09:37:50 WARN DAGScheduler: Broadcasting large task binary with size 1038.6 KiB
23/09/13 09:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1039.8 KiB
23/09/13 09:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1042.1 KiB
23/09/13 09:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1046.6 KiB
23/09/13 09:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1055.8 KiB
23/09/13 09:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1072.5 KiB
23/09/13 09:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1101.8 KiB
23/09/13 09:37:51 WARN DAGScheduler: Broadcasting large task binary with size 1146.8 KiB
23/09/13 09:37:51 WAR

CPU times: user 4.24 s, sys: 1.21 s, total: 5.45 s
Wall time: 22min 34s


**Вывод:** Были проведены исследования 5 различных моделей регрессии при помощи подбора параметров и кроссвалидации. Результаты предсказаний исследуются в следюуещй главе.

# Анализ результатов

In [45]:
def metrix(predict):
    '''Функция выводит оценку результатов предзказания'''
    
    RMSE = RegressionEvaluator(labelCol=target,
            metricName='rmse').evaluate(predict)
    
    MAE  = RegressionEvaluator(labelCol=target,
                metricName='mae').evaluate(predict)
    
    R2 =  RegressionEvaluator(labelCol=target,
                metricName='r2').evaluate(predict)
    
    print('RMSE: ', RMSE)
    print('MAE: ', MAE)
    print('R2: ', R2)

In [46]:
print('LinearRegression')
metrix(predictions_lr)

LinearRegression
RMSE:  67811.62033441877
MAE:  49060.03567203643
R2:  0.657814789474914


In [47]:
print('DecisionTreeRegressor')
metrix(predictions_dt)

DecisionTreeRegressor
RMSE:  64222.09193261303
MAE:  42186.164437839674
R2:  0.6930823336975


In [48]:
print('RandomForestRegressor')
metrix(predictions_rf)

RandomForestRegressor
RMSE:  51418.58764373223
MAE:  34357.19612151204
R2:  0.8032597231498182


In [58]:
print('GBTRegressor')
metrix(predictions_gbt)

RandomForestRegressor
RMSE:  58231.547684955476
MAE:  37485.618909083685
R2:  0.7476695527520807


# Итог:
> В ходе работы были выполнены следующие операции:
>>- Изучена информация о датасете
>>- Подготовлены данные к обучению моделей
>>- Обучение различных моделей 
>>- Выбор наилучшей модели 

В ходе исследования было решено, что лучший способ оценки модели регресси в данном случая является метрика R2, т.к. MAE и RMSE не информативны. При опоре на данную метрику было выявлено, что наилучшей моделью будет "Случайный лес".
