## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:


longitude — широта;

latitude — долгота;

housing_median_age — медианный возраст жителей жилого массива;

total_rooms — общее количество комнат в домах жилого массива;

total_bedrooms — общее количество спален в домах жилого массива;

population — количество человек, которые проживают в жилом массиве;

households — количество домовладений в жилом массиве;

median_income — медианный доход жителей жилого массива;
median_house_value — медианная стоимость дома в жилом массиве;

ocean_proximity — близость к океану.



# Подготовка данных

**Импорт всех библиотек**

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
#from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder

        
from pyspark.ml import Pipeline

train_ratio = 0.8
test_ratio = 0.2
seed = 42000

spark = SparkSession.builder \
                    .master("local") \
                    .appName("California Housing ML") \
                    .getOrCreate()

**Загрузка  данных**

In [2]:
df = spark.read.load('/datasets/housing.csv', format='csv', sep=',', inferSchema=True, header='true')
df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

In [3]:
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Вывод типов данных столбцов и их названий

In [4]:
print(pd.DataFrame(df.dtypes, columns=['column', 'type']))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


Изучаем данные с помощью describe()

In [5]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


**Смотрим пропуски ии заполняем их.**

In [6]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).toPandas()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,0,0,0,0,207,0,0,0,0,0


Пропуски есть в total_bedrooms. Заполняем их

In [7]:
filled_df = df.fillna({"total_bedrooms": df.select("total_bedrooms").agg({"total_bedrooms": "mean"}).first()[0]})

In [8]:
columns = filled_df.columns


for column in columns:
    print(column, filled_df.where(F.isnan(column) | F.col(column).isNull()).count())
    

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


## Преобразование

**Преобразование категориальных признаков**

Разделим колонки на два типа: числовые и категориальные. Так же запишем целевой признак.

Разделим наш датасет на две части -  train и test. Для каждого разбиения будем использовать метод randomSplit()

In [9]:
train_data, test_data = filled_df.randomSplit([train_ratio, test_ratio], seed=seed)

In [10]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                   'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value' 

In [11]:
indexer = StringIndexer(inputCols=categorical_cols,
                        outputCols=[col + '_indexed' for col in categorical_cols])

pipeline = Pipeline(stages=[indexer])

pipeline_model = pipeline.fit(train_data)
train_data = pipeline_model.transform(train_data)

test_data = pipeline_model.transform(test_data)

                                                                                

In [12]:
test_data.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_indexed|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|                    2.0|
|  -124.23|   40.54|              52.0|     2694.0|         453.0|    1152.0|     435.0|       3.0806|          106700.0|     NEAR OCEAN|                    2.0|
|  -124.18|   40.79|              40.0|     1398.0|         311.0|     788.0|     279.0|       1.4668|           64600.0|     NEAR OCEAN|                    2.0|
|  -124.17|   40.77|        

Трансформируем категориальные признаки с помощью StringIndexer

Далее выполним OHE-кодирование для категорий:

In [13]:
ohe = OneHotEncoder(inputCols=[c + '_indexed' for c in categorical_cols],
                    outputCols=[c + '_ohe' for c in categorical_cols])

pipeline_ohe = Pipeline(stages=[ohe])

pipeline_model = pipeline_ohe.fit(train_data)
train_data= pipeline_model.transform(train_data)

test_data = pipeline_model.transform(test_data)

**Преобразование числовых значений**

In [14]:
assembler = VectorAssembler(inputCols=numerical_cols + [c+'_ohe' for c in categorical_cols],
                           outputCol="features")

train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

Объединим признаки в один вектор с помощью VectorAssembler:

In [15]:
std_scal = StandardScaler(inputCol='features', outputCol='features_scaled')
scaler_model = std_scal.fit(train_data)

train_data = scaler_model.transform(train_data)

test_data = scaler_model.transform(test_data)

                                                                                

In [16]:
assembler_all = VectorAssembler(inputCols=['ocean_proximity_ohe'] + ['features_scaled'],
                                outputCol="all_features")
train_data = assembler_all.transform(train_data)
test_data = assembler_all.transform(test_data)

In [17]:
train_data.select('all_features').show(3)
train_data.select('features_scaled').show(3)

+--------------------+
|        all_features|
+--------------------+
|[0.0,0.0,1.0,0.0,...|
|[0.0,0.0,1.0,0.0,...|
|[0.0,0.0,1.0,0.0,...|
+--------------------+
only showing top 3 rows

+--------------------+
|     features_scaled|
+--------------------+
|[-62.130322900243...|
|[-62.130322900243...|
|[-62.115327649342...|
+--------------------+
only showing top 3 rows



In [18]:
train_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_indexed: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_scaled: vector (nullable = true)
 |-- all_features: vector (nullable = true)



# Обучение моделей

Выведем количество записей:

In [19]:
print(train_data.count(), test_data.count())

16488 4152


In [20]:
#param_grid = ParamGridBuilder() \
 #   .addGrid(lr_all.maxIter, [5, 10, 15]) \
#    .addGrid(lr_all.regParam, [0.1, 0.3, 0.5]) \
#    .addGrid(lr_all.elasticNetParam, [0.0, 0.5, 1.0]) \
#    .build()

In [21]:
#evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")


In [22]:
#crossval = CrossValidator(estimator=lr_all,
#                          estimatorParamMaps=param_grid,
#                          evaluator=evaluator,
#                         numFolds=3)

In [23]:
#cv_model = crossval.fit(train_data)


Первая модель линейной регрессии будет использовать все данные:

In [39]:
lr_all = LinearRegression(labelCol=target, featuresCol='all_features',
                          maxIter=5, regParam=0.5, elasticNetParam=0.2)
model_all = lr_all.fit(train_data)



Теперь  построим вторую модель, используя только числовые переменные, исключив категориальные:

In [40]:
lr_num = LinearRegression(labelCol=target, featuresCol='features_scaled',
                          maxIter=5, regParam=0.5, elasticNetParam=0.2)
model_num = lr_num.fit(train_data)

In [41]:
predictions_all = model_all.transform(test_data)
predictions_all.select("median_house_value", "prediction").show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           94600.0|234207.51764302014|
|          106700.0|239446.49646264437|
|           64600.0| 173065.6032408022|
|           81300.0|192700.03058803076|
|           68300.0|180151.51675253105|
|           70500.0| 195122.7207519093|
|           60000.0|164652.30008784495|
|          105900.0|195433.47105998662|
|           81800.0| 256819.1655093949|
|           75000.0| 179362.4168978295|
|           90000.0| 237780.4203266349|
|           74100.0|188318.24127857573|
|           57500.0| 179975.1039461252|
|           99600.0|218527.32707777433|
|           92800.0|238810.22582517733|
|           92800.0|240037.82603403123|
|           96100.0|196891.17773419898|
|           55000.0| 232448.8177119519|
|           70500.0|193080.24654561217|
|           75000.0|154485.37099691824|
+------------------+------------------+
only showing top 20 rows



In [42]:
predictions_num = model_num.transform(test_data)
predictions_num.select("median_house_value", "prediction").show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           94600.0|236307.85373572377|
|          106700.0| 242839.4484150107|
|           64600.0|163197.95597847214|
|           81300.0|183973.34889848623|
|           68300.0|171640.70250554022|
|           70500.0| 190096.8035226739|
|           60000.0|157088.78885085986|
|          105900.0|184895.23373889923|
|           81800.0|261496.76523847517|
|           75000.0|164843.77640673157|
|           90000.0| 237646.9290617518|
|           74100.0| 182620.3722906137|
|           57500.0| 170491.5839396366|
|           99600.0| 214339.4624919569|
|           92800.0| 238757.1980112684|
|           92800.0|239187.61035602645|
|           96100.0|187225.78062247892|
|           55000.0|233180.62397900864|
|           70500.0|186308.16009927355|
|           75000.0|139711.29061584175|
+------------------+------------------+
only showing top 20 rows



# Анализ результатов

In [43]:
evaluator_rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
rmse_num = evaluator_rmse.evaluate(predictions_num)
rmse_all = evaluator_rmse.evaluate(predictions_all)
print(f"RMSE: {rmse_num}")
print(f"RMSE_all: {rmse_all}")


RMSE: 71728.72621594628
RMSE_all: 72809.54270996408


RMSE — это мера среднего отклонения прогнозируемых значений от фактических значений. Чем ниже RMSE, тем лучше производительность модели.

Показатели  модели с числовыми даннымаи показывает лучшие результаты 

In [44]:
evaluator_mae = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="mae")
mae_all = evaluator_mae.evaluate(predictions_all)
mae_num = evaluator_mae.evaluate(predictions_num)
print(f"MAE: {mae_num}")
print(f"MAE_all: {mae_all}")


MAE: 51907.0410571168
MAE_all: 52923.32361287323


Он представляет собой среднюю абсолютную разницу между прогнозируемыми и фактическими значениями. 

Как видим, согласно значениям метрики MAE модель с числовыми значениями   работает точнее.

In [45]:
evaluator_r2 = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")
r2_all = evaluator_r2.evaluate(predictions_all)
r2_num = evaluator_r2.evaluate(predictions_num)
print(f"R2: {r2_num}")
print(f"R2_all: {r2_all}")


R2: 0.6190369669760517
R2_all: 0.6074696826341224


Как видим, согласно значениям метрики R2 первая модель работает чуть точнее

**Вывод**

В рамках данной работы мы:

- Инициализировали локальную Spark-сессию
- Провели предобработку данных, используя методы pySpark
- Исследовали данные на наличие пропусков и избавились от них
- Преобразовали колонку с категориальными значениями техникой One hot encoding.
- Провели стандартизацию количественных признаков
- Построив две модели линейной регрессии на разных наборах данных:

* используя все данные из файла;

* используя только числовые переменные, исключив категориальные.

Мы выяснили, что согласно метрикам RMSE, MAE и R2 

точнее работает  модель, использующая только числовые данные из файла.
