## Предсказание стоимости жилья

В проекте нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Необходимо обучить модель на всех признаках и отдельно на численных
и сделайте предсказания на тестовой выборке. Для оценки качества модели будут использованы метрики RMSE, MAE и R2.

Сначала инициируем Spark-сессию и прочитаем датасет.

### Импорт библиотек

In [1]:
import pandas as pd
import numpy as np


In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.window import Window 
import pyspark.sql.functions as F
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.feature import Imputer


In [3]:
pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator


In [4]:
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

### Запуск spark сессии

In [5]:
RANDOM_SEED = 2022

In [6]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("California - median_house_value") \
                    .getOrCreate()
# Запускаем спарк сессию

22/08/05 14:43:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/05 14:43:08 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


### Чтение фаила и первичное знакомство с ним

Теперь прочитаем фаил.

In [7]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) 

                                                                                

In [8]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [9]:
df.toPandas() 

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY
...,...,...,...,...,...,...,...,...,...,...
20635,-121.09,39.48,25.0,1665.0,374.0,845.0,330.0,1.5603,78100.0,INLAND
20636,-121.21,39.49,18.0,697.0,150.0,356.0,114.0,2.5568,77100.0,INLAND
20637,-121.22,39.43,17.0,2254.0,485.0,1007.0,433.0,1.7000,92300.0,INLAND
20638,-121.32,39.43,18.0,1860.0,409.0,741.0,349.0,1.8672,84700.0,INLAND


### Вывод

В рамках данного раздела импортировали библиотеки, запустили spark сессию, прочитали фаил и провели первичное знакомство с данными.

В колонках датасета содержатся следующие данные:

- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

## Подготовка данных

In [10]:
df.toPandas().isna().sum() 

longitude               0
latitude                0
housing_median_age      0
total_rooms             0
total_bedrooms        207
population              0
households              0
median_income           0
median_house_value      0
ocean_proximity         0
dtype: int64

### Заполнение пропусков

In [13]:
avg_bedrooms = df.select(F.mean('total_bedrooms')).collect()[0][0]
df = df.fillna(avg_bedrooms)

In [15]:
df.printSchema()

root
 |-- longitude: double (nullable = false)
 |-- latitude: double (nullable = false)
 |-- housing_median_age: double (nullable = false)
 |-- total_rooms: double (nullable = false)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = false)
 |-- households: double (nullable = false)
 |-- median_income: double (nullable = false)
 |-- median_house_value: double (nullable = false)
 |-- ocean_proximity: string (nullable = true)



In [16]:
df.toPandas().isna().sum() 

longitude             0
latitude              0
housing_median_age    0
total_rooms           0
total_bedrooms        0
population            0
households            0
median_income         0
median_house_value    0
ocean_proximity       0
dtype: int64

### Преобразование категорильной калонки

Преобразуем единственную категорильную калонку "ocean_proximity" методом One hot encoding

In [17]:
categorical_cols = ['ocean_proximity']

In [18]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols]) 
df = indexer.fit(df).transform(df)

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
df.select(cols).show(3)

                                                                                

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
+---------------+-------------------+
only showing top 3 rows



In [19]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
df = encoder.fit(df).transform(df)

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
df.select(cols).show(3)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



In [20]:
categorical_assembler = \
        VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
                                        outputCol="categorical_features")
df = categorical_assembler.transform(df)

### Преобразование числовых колонок

In [21]:
numerical_cols  = ["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"]

In [22]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
df = numerical_assembler.transform(df)

In [23]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df)

                                                                                

Посмотрим все получившиеся колонки.

In [24]:
print(df.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'ocean_proximity_idx', 'ocean_proximity_ohe', 'categorical_features', 'numerical_features', 'numerical_features_scaled']


### Разделение на "все признаки" и "только численные"

In [25]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol="features") 
df = final_assembler.transform(df)

df.select(all_features).show(3) 

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



### Вывод


Заполнили пропуски и преобразовали категорильные колонки в числовые, кроме того разделили признаки на "все" и "только числвые" для дальнейших расчетов. 

## Обучение моделей

### Разделение на выборки

#### Для всех признаков

In [26]:
train_data, test_data = df.randomSplit([.6,.4], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())
valid_data, test_data = test_data.randomSplit([.5,.5], seed=RANDOM_SEED)
print(valid_data.count(), test_data.count())


                                                                                

12336 8304


                                                                                

4179 4125


### Обучение модели для всех признаков и получение предсказаний

In [27]:
target = "median_house_value"

In [28]:
lr_all = LinearRegression(labelCol=target, featuresCol='features')
lr_all = lr_all.fit(train_data)

22/08/05 14:43:39 WARN Instrumentation: [18aaef33] regParam is zero, which might cause numerical instability and overfitting.
22/08/05 14:43:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/08/05 14:43:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/08/05 14:43:39 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/08/05 14:43:39 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [29]:
predictions_all = lr_all.transform(valid_data)

predictedLabes = predictions_all.select("median_house_value", "prediction")
predictedLabes.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           58100.0|142723.99962627888|
|           66900.0|121476.27087807655|
|           72200.0| 164083.9573687613|
|           67000.0|154777.36265781522|
|           60000.0| 141867.8965705037|
|          105900.0|136181.90761566162|
|          109400.0|172157.41575792432|
|           85400.0|190969.63880082965|
|          104200.0|200277.44609490037|
|           75100.0| 162250.0834891498|
|          128100.0|222080.56191834807|
|           72600.0|163387.86765423417|
|           94800.0|229814.06801205873|
|           86900.0| 153482.1420776546|
|           92800.0|  208770.843100518|
|           90200.0|  154838.494589746|
|           55000.0|214280.03166013956|
|           97900.0|137420.30935016274|
|           82100.0| 160589.9170615673|
|           74100.0|136744.73066720366|
+------------------+------------------+
only showing top 20 rows



                                                                                

### Обучение модели для численных признаков и получение предсказаний

In [30]:
lr_num = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
lr_num = lr_num.fit(train_data)
predictions_num = lr_num.transform(valid_data)

predictedLabes_num = predictions_num.select("median_house_value", "prediction")
predictedLabes_num.show()

22/08/05 14:43:43 WARN Instrumentation: [f89d0458] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           58100.0|109120.29268488381|
|           66900.0|  69174.9659396438|
|           72200.0|129475.19255082961|
|           67000.0|120384.96173414215|
|           60000.0|108808.63557971409|
|          105900.0| 83167.03531967942|
|          109400.0|118631.36746329069|
|           85400.0|157278.98781010322|
|          104200.0|166026.52563168434|
|           75100.0|135866.01440439466|
|          128100.0|190763.57758687576|
|           72600.0|128029.67120921286|
|           94800.0|209622.86844796827|
|           86900.0| 117767.4650675077|
|           92800.0|173870.55173100485|
|           90200.0|115230.45061197039|
|           55000.0|191735.34472666914|
|           97900.0|  96353.1072069509|
|           82100.0|122449.57607704261|
|           74100.0| 98906.71486711456|
+------------------+------------------+
only showing top 20 rows



### Вывод

Обучили две модели линейной регрессии

## Анализ результатов

### Расчет RMSE, R2 и MAE.

Модель для всех признаков.

In [31]:
trainingSummary = lr_all.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("MAE: %f" % trainingSummary.meanAbsoluteError)

RMSE: 69188.942637
r2: 0.640472
MAE: 49902.578610


Модель для всех числовых признаков.

In [32]:
trainingSummary = lr_num.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("MAE: %f" % trainingSummary.meanAbsoluteError)

RMSE: 70146.730697
r2: 0.630449
MAE: 51055.167802


### Вывод

Рассчитали RMSE,R2 и МАЕ для моделей обученых на "всех" признаках и на моделях обученых только на числовых признаках. Как мы видим показатели идентичные, но модель для всех признаков показала результат чуть лучше.

## Тестирование моделей

### Модель для всех признаков

In [33]:
lr_all = LinearRegression(labelCol=target, featuresCol='features')
lr_all = lr_all.fit(train_data)
predictions_all = lr_all.transform(test_data)

predictedLabes_all = predictions_all.select("median_house_value", "prediction")
predictedLabes_all.show()

22/08/05 14:43:46 WARN Instrumentation: [d723b0d7] regParam is zero, which might cause numerical instability and overfitting.


+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0|117359.57445809245|
|          103600.0| 153757.9799311757|
|          106700.0|217409.22498613596|
|           50800.0| 214777.3330861032|
|           68400.0|133638.21631896496|
|           69000.0|178217.14557304978|
|           70500.0| 145056.2179094851|
|           81300.0|153020.32594528794|
|           70500.0|164014.49734935164|
|           62500.0|165826.48828193545|
|           76900.0|162894.61694589257|
|           74100.0| 149831.1982563436|
|           74700.0| 166799.0329067707|
|           69500.0| 114976.4316585958|
|           79600.0|161462.03105822206|
|           90000.0| 209460.9872572422|
|           74100.0|156522.28617674112|
|           67500.0| 147836.4513669014|
|          103100.0| 52238.53210002184|
|           92500.0|167574.46961587667|
+------------------+------------------+
only showing top 20 rows



                                                                                

In [34]:
trainingSummary = lr_all.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("MAE: %f" % trainingSummary.meanAbsoluteError)

RMSE: 69188.942637
r2: 0.640472
MAE: 49902.578610


### Модель для численных признаков

In [35]:
lr_num = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
lr_num = lr_num.fit(train_data)
predictions_num = lr_num.transform(test_data)

predictedLabes_num = predictions_num.select("median_house_value", "prediction")
predictedLabes_num.show()

22/08/04 15:29:56 WARN Instrumentation: [afe1d3d6] regParam is zero, which might cause numerical instability and overfitting.


+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0|65315.973193401005|
|          103600.0|101694.52138374606|
|          106700.0|189510.28086764552|
|           50800.0|182870.42707846267|
|           68400.0| 81242.01378327422|
|           69000.0| 144368.9662286099|
|           70500.0|109950.83582656318|
|           81300.0|117785.30803561024|
|           70500.0|129821.69004094554|
|           62500.0| 132167.2546706386|
|           76900.0|129047.64332940197|
|           74100.0|116672.57423091121|
|           74700.0|133053.90313441912|
|           69500.0| 61562.48707794584|
|           79600.0|129837.99241570896|
|           90000.0|175242.85064450232|
|           74100.0| 122014.7358749723|
|           67500.0| 112853.5739462534|
|          103100.0|-3596.581487144809|
|           92500.0| 140491.7545566922|
+------------------+------------------+
only showing top 20 rows



In [36]:
trainingSummary = lr_num.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print("MAE: %f" % trainingSummary.meanAbsoluteError)

RMSE: 70146.730697
r2: 0.630449
MAE: 51055.167802


In [46]:
testSummary = lr_num.evaluate(test_data)
print("RMSE: %f" % testSummary.rootMeanSquaredError)
print("r2: %f" % testSummary.r2)
print("MAE: %f" % testSummary.meanAbsoluteError)

RMSE: 69412.101679
r2: 0.642115
MAE: 50761.367072


### Вывод

Провели тестирование на тестовой выборке.

## Общий вывод

В рамках проделанной работы обучили 2 модели: на "всех" признаках и только на численных.
Модель, обученая для всех признаков показала результат чуть лучше:

- RMSE: 69188.94
- R2: 0.6404
- MAE: 49902.57.