<h1>Содержание<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Описание-данных" data-toc-modified-id="Описание-данных-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Описание данных</a></span></li><li><span><a href="#Загрузка-библиотек-и-данных" data-toc-modified-id="Загрузка-библиотек-и-данных-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Загрузка библиотек и данных</a></span></li><li><span><a href="#Предобработка" data-toc-modified-id="Предобработка-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Предобработка</a></span></li><li><span><a href="#Обучение-модели" data-toc-modified-id="Обучение-модели-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Обучение модели</a></span><ul class="toc-item"><li><span><a href="#LogisticRegression-все-данные" data-toc-modified-id="LogisticRegression-все-данные-4.1"><span class="toc-item-num">4.1&nbsp;&nbsp;</span>LogisticRegression все данные</a></span></li></ul></li></ul></div>

## Описание данных

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году

В колонках датасета содержатся следующие данные:
- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — median_house_value. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

## Загрузка библиотек и данных

In [1]:
import pandas as pd 
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit


pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

In [2]:
RANDOM_SEED = 15347

In [3]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("EDA California Housing") \
                    .getOrCreate()

In [4]:
df_housing = spark.read.load('./housing.csv', format="csv", sep=",", inferSchema=True, header="true")
df_housing.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

## Предобработка  

Проверим данные на пропуски.

In [5]:
df_housing.describe().toPandas()

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


В столбце `total_bedrooms` есть пропуски, заполнять их чем попало не получится, нужно просто удалить, учитывая что это всего 200 строк, примерно 1%

In [6]:
df_housing = df_housing.dropna()

In [7]:
df_housing.describe().toPandas()

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433
1,mean,-119.57068859198068,35.63322125972706,28.633093525179856,2636.5042333480155,537.8705525375618,1424.9469485635982,499.43346547252,3.8711616013312273,206864.4131551901,
2,stddev,2.003577890751096,2.1363476663779872,12.591805202182837,2185.269566977601,421.3850700740312,1133.2084897449597,382.2992258828481,1.899291249306247,115435.66709858322,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Теперь можно перевести категориальные значения в понятные для модели цифры через OHE.

In [8]:
indexer = StringIndexer(inputCols=['ocean_proximity'], 
                        outputCols=['ocean_proximity_idx']) 
df_housing = indexer.fit(df_housing).transform(df_housing)

encoder = OneHotEncoder(inputCols=['ocean_proximity_idx'],
                        outputCols=['ocean_proximity_ohe'])
df_housing = encoder.fit(df_housing).transform(df_housing)

categorical_assembler =VectorAssembler(inputCols=['ocean_proximity_ohe'],
                                        outputCol="categorical_features")
df_housing = categorical_assembler.transform(df_housing)

In [9]:
df_housing.toPandas().sample(5)

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,ocean_proximity_ohe,categorical_features
2972,-119.05,35.32,11.0,7035.0,1455.0,3525.0,1387.0,3.4827,93600.0,INLAND,1.0,"(0.0, 1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)"
19969,-119.3,34.39,35.0,3079.0,579.0,1807.0,589.0,4.69,199300.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
4151,-118.21,34.12,35.0,1937.0,439.0,1523.0,412.0,3.5638,170500.0,<1H OCEAN,0.0,"(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)"
15571,-122.44,37.78,37.0,1235.0,314.0,481.0,297.0,3.6875,492300.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0, 0.0, 1.0)"
12190,-116.8,33.52,3.0,830.0,145.0,272.0,104.0,3.8281,163500.0,INLAND,1.0,"(0.0, 1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)"


Теперь можно и числовые столбцы привести к одному размеру.

In [10]:
numerical_assembler = VectorAssembler(inputCols=['longitude',
                                                 'latitude',
                                                 'housing_median_age',
                                                 'total_rooms',
                                                 'total_bedrooms',
                                                 'population',
                                                 'households',
                                                 'median_income'],
                                      outputCol="numerical_features")
df_housing = numerical_assembler.transform(df_housing) 
standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")

df_housing = standardScaler.fit(df_housing).transform(df_housing) 

In [11]:
df_housing.toPandas().sample(5)

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,ocean_proximity_ohe,categorical_features,numerical_features,numerical_features_scaled
15453,-116.77,32.82,16.0,3688.0,817.0,1969.0,767.0,2.875,222900.0,<1H OCEAN,0.0,"(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","[-116.77, 32.82, 16.0, 3688.0, 817.0, 1969.0, ...","[-58.28073894158694, 15.362668032232689, 1.270..."
11124,-117.96,33.81,35.0,1153.0,192.0,884.0,208.0,5.2384,177400.0,<1H OCEAN,0.0,"(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","[-117.96, 33.81, 35.0, 1153.0, 192.0, 884.0, 2...","[-58.87467641988178, 15.826075751669324, 2.779..."
11472,-118.01,33.78,19.0,2648.0,478.0,1160.0,452.0,5.9357,207400.0,<1H OCEAN,0.0,"(1.0, 0.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","[-118.01, 33.78, 19.0, 2648.0, 478.0, 1160.0, ...","[-58.899631776112656, 15.812033093504576, 1.50..."
19804,-119.19,36.06,29.0,1815.0,376.0,1421.0,339.0,1.9091,71300.0,INLAND,1.0,"(0.0, 1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","[-119.19, 36.06, 29.0, 1815.0, 376.0, 1421.0, ...","[-59.48857818316132, 16.87927511402531, 2.3030..."
19870,-120.38,37.99,36.0,2864.0,603.0,1155.0,565.0,2.3571,113400.0,INLAND,1.0,"(0.0, 1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","[-120.38, 37.99, 36.0, 2864.0, 603.0, 1155.0, ...","[-60.082515661456156, 17.782686122624003, 2.85..."


Сибираем все вместе для будущего обучения.

In [12]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol='features') 
df_all_features = final_assembler.transform(df_housing)

In [13]:
df_all_features = df_all_features.select('features','median_house_value')

In [14]:
df_all_features.show(3)

+--------------------+------------------+
|            features|median_house_value|
+--------------------+------------------+
|[0.0,0.0,0.0,1.0,...|          452600.0|
|[0.0,0.0,0.0,1.0,...|          358500.0|
|[0.0,0.0,0.0,1.0,...|          352100.0|
+--------------------+------------------+
only showing top 3 rows



In [15]:
all_features = ['numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol='features') 
df_numerical = final_assembler.transform(df_housing)

In [16]:
df_numerical = df_numerical.select('features','median_house_value')

In [17]:
df_numerical.show(3)

+--------------------+------------------+
|            features|median_house_value|
+--------------------+------------------+
|[-61.005863841998...|          452600.0|
|[-61.000872770752...|          358500.0|
|[-61.010854913244...|          352100.0|
+--------------------+------------------+
only showing top 3 rows



Теперь делим на обучающую и проверочную выборки.

In [18]:
train_data_all, test_data_all = df_all_features.randomSplit([.8,.2], seed=RANDOM_SEED)
train_data_num, test_data_num = df_numerical.randomSplit([.8,.2], seed=RANDOM_SEED)

Отлично, все готово для будущего обучения.

Для обучения модели будем использовать несолько подходов:
1. Обучать нразные модели
2. Использовать разное количество данных.

## Обучение модели

### LogisticRegression все данные

In [19]:
lr = LinearRegression(featuresCol = 'features', labelCol='median_house_value')

In [20]:
regParam = []
for i in range(1,10):
    regParam.append(i/10)

In [21]:
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, range(1,51,10))\
                              .addGrid(lr.regParam, regParam)\
                              .addGrid(lr.elasticNetParam, regParam)\
                              .build()

In [22]:
evaluator = RegressionEvaluator(labelCol='median_house_value',metricName='r2')
valid = TrainValidationSplit(estimator=lr,
                             estimatorParamMaps=paramGrid,
                             evaluator=evaluator,
                             trainRatio=0.8,
                             seed=RANDOM_SEED)

In [23]:
model = valid.fit(train_data_all) 

In [24]:
predictions_all = model.transform(test_data_all)

In [25]:
predictions_all.show(2)

+--------------------+------------------+------------------+
|            features|median_house_value|        prediction|
+--------------------+------------------+------------------+
|[0.0,0.0,0.0,0.0,...|          300000.0| 257342.7080652197|
|[0.0,0.0,0.0,1.0,...|          285600.0|301754.82220735215|
+--------------------+------------------+------------------+
only showing top 2 rows



In [26]:
r2_all = RegressionEvaluator(labelCol='median_house_value',metricName='r2').evaluate(predictions_all.select("median_house_value", "prediction"))
mae_all = RegressionEvaluator(labelCol='median_house_value',metricName='mae').evaluate(predictions_all.select("median_house_value", "prediction"))
rmse_all = RegressionEvaluator(labelCol='median_house_value',metricName='rmse').evaluate(predictions_all.select("median_house_value", "prediction"))
print(f'метрика R2: {r2_all}')
print(f'метрика MAE: {mae_all}')
print(f'метрика RMSE: {rmse_all}')

метрика R2: 0.6474558513841693
метрика MAE: 49115.02886728671
метрика RMSE: 67568.09816102922


In [27]:
model = lr.fit(train_data_num)
predictions_num = model.transform(test_data_num)
predictions_num.show(2)

+--------------------+------------------+------------------+
|            features|median_house_value|        prediction|
+--------------------+------------------+------------------+
|[-62.039015589956...|          103600.0|102631.68488194514|
|[-61.994095948741...|           58100.0|110260.94553724024|
+--------------------+------------------+------------------+
only showing top 2 rows



In [28]:
r2_num = RegressionEvaluator(labelCol='median_house_value',metricName='r2').evaluate(predictions_num.select("median_house_value", "prediction"))
mae_num = RegressionEvaluator(labelCol='median_house_value',metricName='mae').evaluate(predictions_num.select("median_house_value", "prediction"))
rmse_num = RegressionEvaluator(labelCol='median_house_value',metricName='rmse').evaluate(predictions_num.select("median_house_value", "prediction"))
print(f'метрика R2: {r2_num}')
print(f'метрика MAE: {mae_num}')
print(f'метрика RMSE: {rmse_num}')

метрика R2: 0.6368685856991194
метрика MAE: 51410.59167405428
метрика RMSE: 70226.61891562853


In [29]:
print('test')

test
