## Описание проекта

В проекте нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Нужно обучить модель и сделать предсказания на тестовой выборке. Для оценки качества модели нужно использовать метрики RMSE, MAE и R2.

### Описание дасета.

В колонках датасета содержатся следующие данные:

`longitude` — широта;  
`latitude` — долгота;  
`housing_median_age` — медианный возраст жителей жилого массива;  
`total_rooms` — общее количество комнат в домах жилого массива;  
`total_bedrooms` — общее количество спален в домах жилого массива;  
`population` — количество человек, которые проживают в жилом массиве;  
`households` — количество домовладений в жилом массиве;  
`median_income` — медианный доход жителей жилого массива;  
`median_house_value` — медианная стоимость дома в жилом массиве;  
`ocean_proximity` — близость к океану.

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — `median_house_value`. 

### План проекта

1. Инициализировать локальную Spark-сессию.
2. Вывести типы данных колонок датасета, используя методы pySpark.
3. Вывести типы данных колонок датасета, используйте методы pySpark
4. Выполнить предобработку данных:
    * Исследовать данные на наличие пропусков и заполнить их, выбрав значения по своему усмотрению.
    * Преобразовать колонку с категориальными значениями техникой `One hot encoding`.
5. Построить две модели линейной регрессии на разных наборах данных:
    * используя все данные из файла;
    * используя только числовые переменные, исключив категориальные.   
    Для построения модели использовать оценщик `LinearRegression` из библиотеки MLlib. 
6. Сравнить результаты работы линейной регрессии на двух наборах данных по метрикам `RMSE`, `MAE` и `R2`. Сделать выводы.



## Подготовка данных

### Подключаем библиотеки

In [250]:
import pandas as pd 
import numpy as np
from joblib import dump
import warnings
warnings.filterwarnings('ignore')

import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
pd.set_option('display.float_format', lambda x: '%.3f' % x)

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

RANDOM_SEED = 42

### Инициализируем Spark-сессию

In [251]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("California Housing") \
                    .getOrCreate()

### Загружаем датасет

In [252]:
data_calif = spark.read.load('/datasets/housing.csv',
                                        format='csv',
                                             sep=',',
                                    inferSchema=True,
                                         header=True)

#вывод схемы датафрейма
data_calif.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



### Выводим типы данных колонок датасета

In [253]:
pd.DataFrame(data_calif.dtypes, columns=['column', 'type'])

Unnamed: 0,column,type
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


Имеется всего один качественный признак `ocean_proximity`, имеющий тип `string`

### Первичный анализ данных

In [254]:
# выводим первые пять строк датасета
pd.DataFrame(data_calif.take(5), columns=data_calif.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-122.230,-122.220,-122.240,-122.250,-122.250
latitude,37.880,37.860,37.850,37.850,37.850
housing_median_age,41.000,21.000,52.000,52.000,52.000
total_rooms,880.000,7099.000,1467.000,1274.000,1627.000
total_bedrooms,129.000,1106.000,190.000,235.000,280.000
population,322.000,2401.000,496.000,558.000,565.000
households,126.000,1138.000,177.000,219.000,259.000
median_income,8.325,8.301,7.257,5.643,3.846
median_house_value,452600.000,358500.000,352100.000,341300.000,342200.000
ocean_proximity,NEAR BAY,NEAR BAY,NEAR BAY,NEAR BAY,NEAR BAY


In [255]:
# выводим размеры датасета
print('Количество строк данных', data_calif.count())
print('Количество столбцов данных', len(data_calif.columns))

Количество строк данных 20640
Количество столбцов данных 10


In [256]:
# проверим данные, содержащиеся в единственном категориальном признаке
data_calif.groupBy('ocean_proximity').count().toPandas()

                                                                                

Unnamed: 0,ocean_proximity,count
0,ISLAND,5
1,NEAR OCEAN,2658
2,NEAR BAY,2290
3,<1H OCEAN,9136
4,INLAND,6551


Большинство недвижимости расположено в пределах часа езды от океана или на берегу океана. Тем не менее, вдалеке от океана расположено более 6500 домовладений. Обращает на себя очень маленькое количество домов на островах. С этими данными нужно быть острожнее.

In [257]:
# отбор числовых колонок датасета
small_tab = pd.DataFrame(data_calif.take(5), columns=data_calif.columns)
numeric_cols = small_tab.select_dtypes(include=[np.number])
data_calif_numeric_cols = numeric_cols.columns.values.tolist()
print('Столбцы с количественными признаками: \n', data_calif_numeric_cols)

Столбцы с количественными признаками: 
 ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value']


In [258]:
# описание числовых колонок датасета
data_calif.select(data_calif_numeric_cols).summary().toPandas().T

                                                                                

Unnamed: 0,0,1,2,3,4,5,6,7
summary,count,mean,stddev,min,25%,50%,75%,max
longitude,20640,-119.56970445736148,2.003531723502584,-124.35,-121.8,-118.49,-118.01,-114.31
latitude,20640,35.6318614341087,2.135952397457101,32.54,33.93,34.26,37.71,41.95
housing_median_age,20640,28.639486434108527,12.58555761211163,1.0,18.0,29.0,37.0,52.0
total_rooms,20640,2635.7630813953488,2181.6152515827944,2.0,1447.0,2127.0,3146.0,39320.0
total_bedrooms,20433,537.8705525375618,421.38507007403115,1.0,296.0,435.0,647.0,6445.0
population,20640,1425.4767441860465,1132.46212176534,3.0,787.0,1166.0,1724.0,35682.0
households,20640,499.5396802325581,382.3297528316098,1.0,280.0,409.0,605.0,6082.0
median_income,20640,3.8706710029070246,1.899821717945263,0.4999,2.5625,3.5347,4.7426,15.0001
median_house_value,20640,206855.81690891474,115395.61587441359,14999.0,119600.0,179700.0,264700.0,500001.0


В столбце `total_bedrooms` выявлены пропуски.

### Устраняем пропуски данных

In [259]:
# посмотрим внимательнее на пропуски
data_calif.filter(data_calif.total_bedrooms.isNull()).toPandas().head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.16,37.77,47.0,1256.0,,570.0,218.0,4.375,161900.0,NEAR BAY
1,-122.17,37.75,38.0,992.0,,732.0,259.0,1.62,85100.0,NEAR BAY
2,-122.28,37.78,29.0,5154.0,,3741.0,1273.0,2.576,173400.0,NEAR BAY
3,-122.24,37.75,45.0,891.0,,384.0,146.0,4.949,247100.0,NEAR BAY
4,-122.1,37.69,41.0,746.0,,387.0,161.0,3.906,178400.0,NEAR BAY


In [260]:
print('Количество пропусков', data_calif.filter(data_calif.total_bedrooms.isNull()).count())

Количество пропусков 207


In [261]:
# заменим пропуски средним значением
mean = data_calif.select(F.mean('total_bedrooms')).collect()[0][0]
data_calif = data_calif.na.fill({'total_bedrooms': mean})
print('Количество пропусков', data_calif.filter(data_calif.total_bedrooms.isNull()).count())

Количество пропусков 0


### Разделяем датасет на выборки

In [262]:
train, test = data_calif.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train.count(), test.count())

16560 4080


In [263]:
pd.DataFrame(train.take(5), columns=train.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.350,-124.300,-124.270,-124.260,-124.250
latitude,40.540,41.800,40.690,40.580,40.280
housing_median_age,52.000,19.000,36.000,52.000,32.000
total_rooms,1820.000,2672.000,2349.000,2217.000,1430.000
total_bedrooms,300.000,552.000,528.000,394.000,419.000
population,806.000,1298.000,1194.000,907.000,434.000
households,270.000,478.000,465.000,369.000,187.000
median_income,3.015,1.980,2.518,2.357,1.942
median_house_value,94600.000,85800.000,79000.000,111400.000,76100.000
ocean_proximity,NEAR OCEAN,NEAR OCEAN,NEAR OCEAN,NEAR OCEAN,NEAR OCEAN


In [264]:
pd.DataFrame(test.take(5), columns=test.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.300,-124.230,-124.230,-124.190,-124.180
latitude,41.840,40.540,41.750,40.730,40.780
housing_median_age,17.000,52.000,11.000,21.000,34.000
total_rooms,2677.000,2694.000,3159.000,5694.000,1592.000
total_bedrooms,531.000,453.000,616.000,1056.000,364.000
population,1244.000,1152.000,1343.000,2907.000,950.000
households,456.000,435.000,479.000,972.000,317.000
median_income,3.031,3.081,2.481,3.536,2.161
median_house_value,103600.000,106700.000,73200.000,90100.000,67000.000
ocean_proximity,NEAR OCEAN,NEAR OCEAN,NEAR OCEAN,NEAR OCEAN,NEAR OCEAN


### Преобразование категориальных значений

#### StringIndexer  - преобразуем категориальный признак ocean_proximity из строковых значений в числовые.

In [265]:
indexer = StringIndexer(inputCol = 'ocean_proximity', 
                        outputCol= 'ocean_proximity_idx')

indexerModel = indexer.fit(train)

train_idx = indexerModel.transform(train)
train_idx = train_idx.drop('ocean_proximity')

test_idx = indexerModel.transform(test)
test_idx = test_idx.drop('ocean_proximity')

In [266]:
train_idx.groupBy('ocean_proximity_idx').count().toPandas()

                                                                                

Unnamed: 0,ocean_proximity_idx,count
0,0.0,7268
1,1.0,5316
2,4.0,4
3,3.0,1855
4,2.0,2117


In [267]:
train_idx.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)



In [268]:
test_idx.groupBy('ocean_proximity_idx').count().toPandas()

                                                                                

Unnamed: 0,ocean_proximity_idx,count
0,0.0,1868
1,1.0,1235
2,4.0,1
3,3.0,435
4,2.0,541


In [269]:
test_idx.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)



#### OneHotEncoding - продолжаем преобразование категориального признака

In [270]:
encoder = OneHotEncoder(inputCol = 'ocean_proximity_idx', 
                        outputCol = 'ocean_proximity_ohe')
encoderModel = encoder.fit(train_idx)

train_ohe = encoderModel.transform(train_idx)

test_ohe = encoderModel.transform(test_idx)

In [271]:
pd.DataFrame(train_ohe.take(5), columns=train_ohe.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.350,-124.300,-124.270,-124.260,-124.250
latitude,40.540,41.800,40.690,40.580,40.280
housing_median_age,52.000,19.000,36.000,52.000,32.000
total_rooms,1820.000,2672.000,2349.000,2217.000,1430.000
total_bedrooms,300.000,552.000,528.000,394.000,419.000
population,806.000,1298.000,1194.000,907.000,434.000
households,270.000,478.000,465.000,369.000,187.000
median_income,3.015,1.980,2.518,2.357,1.942
median_house_value,94600.000,85800.000,79000.000,111400.000,76100.000
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000


In [272]:
pd.DataFrame(test_ohe.take(5), columns=test_ohe.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.300,-124.230,-124.230,-124.190,-124.180
latitude,41.840,40.540,41.750,40.730,40.780
housing_median_age,17.000,52.000,11.000,21.000,34.000
total_rooms,2677.000,2694.000,3159.000,5694.000,1592.000
total_bedrooms,531.000,453.000,616.000,1056.000,364.000
population,1244.000,1152.000,1343.000,2907.000,950.000
households,456.000,435.000,479.000,972.000,317.000
median_income,3.031,3.081,2.481,3.536,2.161
median_house_value,103600.000,106700.000,73200.000,90100.000,67000.000
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000


#### VectorAssembler - преобразуем признак в вектор

In [273]:
categorical_assembler = \
    VectorAssembler(inputCols=['ocean_proximity_ohe'],
                    outputCol='ocean_proximity_va')

train_va = categorical_assembler.transform(train_ohe)
test_va = categorical_assembler.transform(test_ohe)

In [274]:
pd.DataFrame(train_va.take(5), columns=train_va.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.350,-124.300,-124.270,-124.260,-124.250
latitude,40.540,41.800,40.690,40.580,40.280
housing_median_age,52.000,19.000,36.000,52.000,32.000
total_rooms,1820.000,2672.000,2349.000,2217.000,1430.000
total_bedrooms,300.000,552.000,528.000,394.000,419.000
population,806.000,1298.000,1194.000,907.000,434.000
households,270.000,478.000,465.000,369.000,187.000
median_income,3.015,1.980,2.518,2.357,1.942
median_house_value,94600.000,85800.000,79000.000,111400.000,76100.000
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000


In [275]:
pd.DataFrame(test_va.take(5), columns=test_va.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.300,-124.230,-124.230,-124.190,-124.180
latitude,41.840,40.540,41.750,40.730,40.780
housing_median_age,17.000,52.000,11.000,21.000,34.000
total_rooms,2677.000,2694.000,3159.000,5694.000,1592.000
total_bedrooms,531.000,453.000,616.000,1056.000,364.000
population,1244.000,1152.000,1343.000,2907.000,950.000
households,456.000,435.000,479.000,972.000,317.000
median_income,3.031,3.081,2.481,3.536,2.161
median_house_value,103600.000,106700.000,73200.000,90100.000,67000.000
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000


### StandardScaler - масштабирование числовых признаков

Нам задано предсказать медианную стоимость дома, поэтому целевой переменой будет `median_house_value`

In [276]:
# убираем таргет из выборок
target_train = train_va.select('median_house_value')
features_train = train_va.drop('median_house_value')


target_test =  test_va.select('median_house_value')
features_test = test_va.drop('median_house_value')


In [277]:
pd.DataFrame(features_train.take(5), columns=features_train.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.350,-124.300,-124.270,-124.260,-124.250
latitude,40.540,41.800,40.690,40.580,40.280
housing_median_age,52.000,19.000,36.000,52.000,32.000
total_rooms,1820.000,2672.000,2349.000,2217.000,1430.000
total_bedrooms,300.000,552.000,528.000,394.000,419.000
population,806.000,1298.000,1194.000,907.000,434.000
households,270.000,478.000,465.000,369.000,187.000
median_income,3.015,1.980,2.518,2.357,1.942
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


In [278]:
pd.DataFrame(target_train.take(5), columns=target_train.columns).T

Unnamed: 0,0,1,2,3,4
median_house_value,94600.0,85800.0,79000.0,111400.0,76100.0


In [279]:
pd.DataFrame(features_test.take(5), columns=features_test.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.300,-124.230,-124.230,-124.190,-124.180
latitude,41.840,40.540,41.750,40.730,40.780
housing_median_age,17.000,52.000,11.000,21.000,34.000
total_rooms,2677.000,2694.000,3159.000,5694.000,1592.000
total_bedrooms,531.000,453.000,616.000,1056.000,364.000
population,1244.000,1152.000,1343.000,2907.000,950.000
households,456.000,435.000,479.000,972.000,317.000
median_income,3.031,3.081,2.481,3.536,2.161
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


In [280]:
pd.DataFrame(target_test.take(5), columns=target_test.columns).T

Unnamed: 0,0,1,2,3,4
median_house_value,103600.0,106700.0,73200.0,90100.0,67000.0


In [281]:
data_calif_numeric_cols.remove('median_house_value')
print(data_calif_numeric_cols)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']


In [282]:
numerical_assembler = VectorAssembler(inputCols=data_calif_numeric_cols,
                                      outputCol='numerical_features')


features_train = numerical_assembler.transform(features_train)
features_test = numerical_assembler.transform(features_test)

standard_scaler = StandardScaler(inputCol='numerical_features',
                                 outputCol='numerical_features_scaled')

scalerModel = standard_scaler.fit(features_train)

features_train = scalerModel.transform(features_train)
features_test = scalerModel.transform(features_test)

In [283]:
features_train.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- ocean_proximity_va: vector (nullable = true)
 |-- numerical_features: vector (nullable = true)
 |-- numerical_features_scaled: vector (nullable = true)



In [284]:
pd.DataFrame(features_train.take(5), columns=features_train.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.350,-124.300,-124.270,-124.260,-124.250
latitude,40.540,41.800,40.690,40.580,40.280
housing_median_age,52.000,19.000,36.000,52.000,32.000
total_rooms,1820.000,2672.000,2349.000,2217.000,1430.000
total_bedrooms,300.000,552.000,528.000,394.000,419.000
population,806.000,1298.000,1194.000,907.000,434.000
households,270.000,478.000,465.000,369.000,187.000
median_income,3.015,1.980,2.518,2.357,1.942
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


In [285]:
pd.DataFrame(features_test.take(5), columns=features_test.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.300,-124.230,-124.230,-124.190,-124.180
latitude,41.840,40.540,41.750,40.730,40.780
housing_median_age,17.000,52.000,11.000,21.000,34.000
total_rooms,2677.000,2694.000,3159.000,5694.000,1592.000
total_bedrooms,531.000,453.000,616.000,1056.000,364.000
population,1244.000,1152.000,1343.000,2907.000,950.000
households,456.000,435.000,479.000,972.000,317.000
median_income,3.031,3.081,2.481,3.536,2.161
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


### Преобразование категориальных и численных признаков в один вектор

In [286]:
# все признаки
all_features = ['ocean_proximity_va', 'numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features,
                                  outputCol='all_features')

features_train = final_assembler.transform(features_train)
features_test = final_assembler.transform(features_test)

pd.DataFrame(features_train.select(all_features).take(5), columns=['categorical','numerical'])

Unnamed: 0,categorical,numerical
0,"(0.0, 0.0, 1.0, 0.0)","[-61.93195228665311, 18.90166089690822, 4.13040546908436, 0.8371996643785807, 0.7178732476340831, 0.7395727660314623, 0.7094142631969466, 1.6007431444753377]"
1,"(0.0, 0.0, 1.0, 0.0)","[-61.90705001392024, 19.48913235053684, 1.5091866137039007, 1.2291195072634988, 1.3208867756467129, 1.1910241318968213, 1.2559259918820018, 1.0511796208968807]"
2,"(0.0, 0.0, 1.0, 0.0)","[-61.89210865028051, 18.97159797472115, 2.8595114785968647, 1.0805395668270803, 1.2634569158359863, 1.0955953878927618, 1.2217690088391857, 1.3369526531576783]"
3,"(0.0, 0.0, 1.0, 0.0)","[-61.88712819573394, 18.920310784325, 4.13040546908436, 1.0198195911688537, 0.9428068652260958, 0.8322487578046356, 0.9695328263691604, 1.2515711897843296]"
4,"(0.0, 0.0, 1.0, 0.0)","[-61.88214774118737, 18.78043662869914, 2.541787980974991, 0.6577997362974564, 1.0026296358622695, 0.3982314894015566, 0.49133506376973707, 1.0310024094031789]"


In [287]:
pd.DataFrame(features_test.select(all_features).take(5), columns=['categorical','numerical'])

Unnamed: 0,categorical,numerical
0,"(0.0, 0.0, 1.0, 0.0)","[-61.90705001392024, 19.507782237953624, 1.3503248648929638, 1.231419506341462, 1.2706356483123271, 1.1414745917408673, 1.198121866732621, 1.6095574000225863]"
1,"(0.0, 0.0, 1.0, 0.0)","[-61.87218683209422, 18.90166089690822, 4.13040546908436, 1.2392395032065366, 1.0839886039274655, 1.057056856660353, 1.1429452018173027, 1.6357346770394152]"
2,"(0.0, 0.0, 1.0, 0.0)","[-61.87218683209422, 19.465819991265864, 0.8737396184601531, 1.453139417457108, 1.4740330684753173, 1.2323154153601164, 1.2585534521160644, 1.317094029213877]"
3,"(0.0, 0.0, 1.0, 0.0)","[-61.85226501390792, 18.99024786213793, 1.6680483625148377, 2.619238949984417, 2.5269138316719726, 2.6674169117288593, 2.553891347509008, 1.8777019211888868]"
4,"(0.0, 0.0, 1.0, 0.0)","[-61.84728455936135, 19.01356022140891, 2.7006497297859275, 0.7323197064234619, 0.8710195404626875, 0.8717048731140064, 0.8329048941978966, 1.1472868651168813]"


In [288]:
pd.DataFrame(features_train.take(5), columns=features_test.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.350,-124.300,-124.270,-124.260,-124.250
latitude,40.540,41.800,40.690,40.580,40.280
housing_median_age,52.000,19.000,36.000,52.000,32.000
total_rooms,1820.000,2672.000,2349.000,2217.000,1430.000
total_bedrooms,300.000,552.000,528.000,394.000,419.000
population,806.000,1298.000,1194.000,907.000,434.000
households,270.000,478.000,465.000,369.000,187.000
median_income,3.015,1.980,2.518,2.357,1.942
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


In [289]:
pd.DataFrame(features_test.take(5), columns=features_test.columns).T

Unnamed: 0,0,1,2,3,4
longitude,-124.300,-124.230,-124.230,-124.190,-124.180
latitude,41.840,40.540,41.750,40.730,40.780
housing_median_age,17.000,52.000,11.000,21.000,34.000
total_rooms,2677.000,2694.000,3159.000,5694.000,1592.000
total_bedrooms,531.000,453.000,616.000,1056.000,364.000
population,1244.000,1152.000,1343.000,2907.000,950.000
households,456.000,435.000,479.000,972.000,317.000
median_income,3.031,3.081,2.481,3.536,2.161
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


In [291]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
w=Window.orderBy(lit(1))

features_train = features_train.withColumn("id",row_number().over(w))
target_train = target_train.withColumn("id",row_number().over(w))
#pd.DataFrame(target_train.take(5), columns=target_train.columns).T
train = features_train.join(target_train, features_train.id == target_train.id, 'inner').drop(target_train.id)
train = train.drop('id')
features_test = features_test.withColumn("id",row_number().over(w))
target_test = target_test.withColumn("id",row_number().over(w))
#pd.DataFrame(target_test.take(5), columns=target_test.columns).T
test = features_test.join(target_test, features_test.id == target_test.id, 'inner').drop(target_test.id)
test = test.drop('id')


In [292]:
pd.DataFrame(train.take(5), columns=train.columns).T

23/03/23 21:41:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:41:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Unnamed: 0,0,1,2,3,4
longitude,-124.350,-124.300,-124.270,-124.260,-124.250
latitude,40.540,41.800,40.690,40.580,40.280
housing_median_age,52.000,19.000,36.000,52.000,32.000
total_rooms,1820.000,2672.000,2349.000,2217.000,1430.000
total_bedrooms,300.000,552.000,528.000,394.000,419.000
population,806.000,1298.000,1194.000,907.000,434.000
households,270.000,478.000,465.000,369.000,187.000
median_income,3.015,1.980,2.518,2.357,1.942
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


In [293]:
pd.DataFrame(test.take(5), columns=test.columns).T

23/03/23 21:41:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:41:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Unnamed: 0,0,1,2,3,4
longitude,-124.300,-124.230,-124.230,-124.190,-124.180
latitude,41.840,40.540,41.750,40.730,40.780
housing_median_age,17.000,52.000,11.000,21.000,34.000
total_rooms,2677.000,2694.000,3159.000,5694.000,1592.000
total_bedrooms,531.000,453.000,616.000,1056.000,364.000
population,1244.000,1152.000,1343.000,2907.000,950.000
households,456.000,435.000,479.000,972.000,317.000
median_income,3.031,3.081,2.481,3.536,2.161
ocean_proximity_idx,2.000,2.000,2.000,2.000,2.000
ocean_proximity_ohe,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


## Обучение моделей

In [294]:
target = 'median_house_value'

### Обучение модели для всех признаков

In [295]:
lr = LinearRegression(featuresCol = "all_features", labelCol = target)

In [296]:
grid_search = ParamGridBuilder() \
.addGrid(lr.regParam, [0.0, 0.01, 0.1]) \
.addGrid(lr.fitIntercept, [False, True])\
.addGrid(lr.elasticNetParam, [0.5, 1.0]) \
.build()

In [297]:
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol=target)

In [298]:
cv = CrossValidator(estimator = lr,
                    estimatorParamMaps = grid_search,
                    numFolds = 3,
                    evaluator = evaluator)
cv_model = cv.fit(train)

23/03/23 21:41:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:41:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:41:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:41:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:41:46 WARN Instrumentation: [7249a0a5] regParam is zero, which might cause numerical instability and overfitting.
23/03/23 21:41:47 WARN Instrumentation: [2a197565] regParam is zero, which might cause numerical instability and overfitting.
23/03/23 21:41:48 WARN Instrumentation: [c229a6ce] regParam is zero, which might cause n

In [299]:
cv_model.bestModel.extractParamMap()

{Param(parent='LinearRegression_bca0155ddab6', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_bca0155ddab6', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5,
 Param(parent='LinearRegression_bca0155ddab6', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_bca0155ddab6', name='featuresCol', doc='features column name.'): 'all_features',
 Param(parent='LinearRegression_bca0155ddab6', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_bca0155ddab6', name='labelCol', doc='label column name.'): 'median_house_value',
 Param(parent='LinearRegression_bca0155ddab6', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.

In [300]:
lr_best_param_all = LinearRegression(featuresCol = "all_features",
                                 labelCol = target,
                                 elasticNetParam=0.5,
                                 aggregationDepth=2,
                                 epsilon = 1.35,
                                 fitIntercept = True,
                                 maxIter = 100,
                                 regParam = 0.0,
                                 solver = 'auto',
                                 standardization = True,
                                 tol = 1e-06)

In [301]:
model_best_param_all = lr_best_param.fit(train)

23/03/23 21:42:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:04 WARN Instrumentation: [61f4bae1] regParam is zero, which might cause numerical instability and overfitting.
23/03/23 21:42:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:04 WARN WindowExec: No Partition D

In [302]:
predictions_all = model_best_param_all.transform(test)

predictedLabes_all = predictions_all.select("median_house_value", "prediction")

In [303]:
predictedLabes_all.show(5)

23/03/23 21:42:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|150428.91117827035|
|          106700.0|217515.79788495088|
|           73200.0|125091.61647185031|
|           90100.0|194918.60577875702|
|           67000.0|152202.44785278616|
+------------------+------------------+
only showing top 5 rows



### Обучение модели для числовых признаков

In [308]:
lr_num = LinearRegression(featuresCol = "numerical_features_scaled", labelCol = target)

In [309]:
grid_search = ParamGridBuilder() \
.addGrid(lr_num.regParam, [0.0, 0.01, 0.1]) \
.addGrid(lr_num.fitIntercept, [False, True])\
.addGrid(lr_num.elasticNetParam, [0.5, 1.0]) \
.build()

In [310]:
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol=target)

In [311]:
cv = CrossValidator(estimator = lr_num,
                    estimatorParamMaps = grid_search,
                    numFolds = 3,
                    evaluator = evaluator)
cv_model = cv.fit(train)

23/03/23 21:42:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:06 WARN Instrumentation: [c44cbf5d] regParam is zero, which might cause numerical instability and overfitting.
23/03/23 21:42:07 WARN Instrumentation: [a6b43e23] regParam is zero, which might cause numerical instability and overfitting.
23/03/23 21:42:07 WARN Instrumentation: [8731ad8c] regParam is zero, which might cause n

In [312]:
cv_model.bestModel.extractParamMap()

{Param(parent='LinearRegression_c71b2b5d0cd8', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_c71b2b5d0cd8', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5,
 Param(parent='LinearRegression_c71b2b5d0cd8', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_c71b2b5d0cd8', name='featuresCol', doc='features column name.'): 'numerical_features_scaled',
 Param(parent='LinearRegression_c71b2b5d0cd8', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_c71b2b5d0cd8', name='labelCol', doc='label column name.'): 'median_house_value',
 Param(parent='LinearRegression_c71b2b5d0cd8', name='loss', doc='The loss function to be optimized. Supported options: squared

In [313]:
lr_best_param_num = LinearRegression(featuresCol = "numerical_features_scaled",
                                 labelCol = target,
                                 elasticNetParam=0.5,
                                 aggregationDepth=2,
                                 epsilon = 1.35,
                                 fitIntercept = True,
                                 loss = 'squaredError',
                                 maxIter = 100,
                                 regParam = 0.1,
                                 solver = 'auto',
                                 standardization = True,
                                 tol = 1e-06)

In [314]:
model_best_param_num = lr_best_param_num.fit(train)

23/03/23 21:42:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 2

In [315]:
predictions_numerical = model_best_param_num.transform(test)

predictedLabes_numerical = predictions_numerical.select("median_house_value", "prediction")

In [316]:
predictedLabes_numerical.show(5)

23/03/23 21:42:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|100681.19753087033|
|          106700.0|190702.41079243226|
|           73200.0| 74610.29043128574|
|           90100.0| 162196.9290324282|
|           67000.0|119374.48418606771|
+------------------+------------------+
only showing top 5 rows



## Анализ результатов

Качество алгоритмов регрессии оценим, используя RegressionEvaluator, который работает с DataFrame API и имеет набор метрик для оценки.

In [320]:
results = []

In [321]:
def eval_metric(pred_data, model_name):
    rmse = RegressionEvaluator(predictionCol="prediction", labelCol="median_house_value", metricName='rmse').evaluate(pred_data)
    mae = RegressionEvaluator(predictionCol="prediction", labelCol="median_house_value", metricName='mae').evaluate(pred_data)
    r2 = RegressionEvaluator(predictionCol="prediction", labelCol="median_house_value", metricName='r2').evaluate(pred_data)
    return model_name, rmse, mae, r2

In [322]:
results.append(eval_metric(predictedLabes_all, 'all_data'))

23/03/23 21:42:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [323]:
results.append(eval_metric(predictedLabes_numerical, 'numerical'))

23/03/23 21:42:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/23 21:42:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


<font color='blue'><b>Комментарий ревьюера :</b></font> ✔️\
<font color='green'>Тестирование проведено верно.</font>

In [324]:
results = pd.DataFrame(results, columns=['model', 'rmse', 'mae', 'r2'])
display(results)

Unnamed: 0,model,rmse,mae,r2
0,all_data,70781.23,50855.05,0.638
1,numerical,71789.086,51786.2,0.628


## Вывод

При построении модели линейной регрессии на всех данных были получены следующие результаты:

    RMSE  =  70781.230
    MAE   =  50855.050
    R2    =  0.638
        
При построении модели линейной регрессии только на числовых данных были получены следующие результаты:

    RMSE  =  71789.086
    MAE   =  51786.200
    R2    =  0.628
    
RMSE - корень от средней квадратической ошибки. В свою очередь средняя квадратическая ошибка - это величина равная сумме квадратов отклонений (разница между предсказанием и фактической величиной целевого признака), разделенной на количество объектов. Чем точнее модель, тем больше значение RMSE будет стремиться к нулю. Лучшие показатели у модели, учитывающей категориальные признаки.

Среднее абсолютное значение или MAE - это величина, равная сумме всех модулей отклонений (разница между предсказанием и фактическим значением целевого признака), разделенная на количество объектов в выборке.
Соответственно, для  идеально работающей модели, MAE будет равен 0. Поэтому, чем меньше будет значение этой метрики, тем точнее модель. Лучшие показатели у модели, учитывающей категориальные признаки.

Коэффициент детерминации R2 равен единице, когда модель идеально предсказывает результат, а 0 - когда работает также, как и среднее. Соответственно, чем выше R2, тем точнее модель. Лучшие показатели у модели, учитывающей категориальные признаки.

В целом  обе полученные модели показывают неплохой результат. Но модель, учитывающая категориальные признаки незначительно лучше.

In [325]:
spark.stop()