## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:

`longitude` — широта;

`latitude` — долгота;

`housing_median_age` — медианный возраст жителей жилого массива;

`total_rooms` — общее количество комнат в домах жилого массива;

`total_bedrooms` — общее количество спален в домах жилого массива;

`population` — количество человек, которые проживают в жилом массиве;

`households` — количество домовладений в жилом массиве;

`median_income` — медианный доход жителей жилого массива;

`median_house_value` — медианная стоимость дома в жилом массиве;

`ocean_proximity` — близость к океану.

In [1]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import seaborn as sns
from pyspark.ml.feature import OneHotEncoder 
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
RANDOM_SEED = 2022

In [3]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("EDA California Housing") \
                    .getOrCreate()

In [4]:
df_housing = spark.read.load('/datasets/housing.csv',
                                        format='csv',
                                             sep=',',
                                    inferSchema=True,
                                         header=True)



                                                                                

Прочитаем содеримое файла.

In [5]:
df_housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Выведем типы данных используя `dtypes`.

In [6]:
print(pd.DataFrame(df_housing.dtypes, columns=['column', 'type']).head(10))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


In [7]:
df_housing.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


## Подготовка данных

Проыерим данные на наличие пропусков.

In [8]:
columns = df_housing.columns

In [9]:
for column in columns:
    print(column, df_housing.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [10]:
#df_housing = df_housing.na.drop(how='any')

In [11]:
total_bedrooms_median = df_housing.approxQuantile('total_bedrooms', [0.5], 0)[0]

In [12]:
df_housing = df_housing.fillna(total_bedrooms_median, subset=['total_bedrooms'])

Проверим удаление данные после удаления пропусков.

In [13]:
for column in columns:
    print(column, df_housing.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Разделим колонки на два типа: текстовые и числовые, которые представляют категориальные данные.

In [14]:
categorical_cols = ['ocean_proximity']
numerical_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value'

Приступим к трансформации признаков. Преобразуем колонку с категориальными значениями с помощью OneHotEncoder и  StringIndexer.
Начнём с трансформации категориальных признаков в числовые с помощью StringIndexer

In [15]:
#indexer = StringIndexer(inputCols=categorical_cols,
#                       outputCols=[c+'_idx' for c in categorical_cols])

#df_housing = indexer.fit(df_housing).transform(df_housing)

Далее выполним OneHotEncoder.

In [16]:
#ohe = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
#                        outputCols=[c+'_ohe' for c in categorical_cols])

#df_housing = ohe.fit(df_housing).transform(df_housing)
#df_housing.toPandas().head()

Объединим признаки с помощью верктора VectorAssembler

In [17]:
#categorical_assembler = \
#VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
#               outputCol='categorical_features')
#df_housing=categorical_assembler.transform(df_housing)

## Обучение моделей

Трансформируем числовые признаки, чтобы выбросы не влияли на работу модели.

In [18]:
#numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
#df_housing = numerical_assembler.transform(df_housing)
#standardScaler = StandardScaler(inputCol='numerical_features',outputCol="numerical_features_scaled")
#df_housing = standardScaler.fit(df_housing).transform(df_housing) 

Соберём полученные категориальные и числовые признаки с помощью VectorAssembler

In [19]:
#all_features = ['categorical_features', 'numerical_features_scaled']

#final_assembler = VectorAssembler(inputCols=all_features, outputCol='features')
#df_housing = final_assembler.transform(df_housing)

In [20]:
#df_housing.select(all_features).show(3)

Необходимо построить две модели линейной регрессии на разных наборах данных. Разделим наш датасет на две части - обучающую и тестовую, чтобы протестировать качество модели. Для каждого метода будем использовать метод `RandomSplit()`. Выведем количество записей выборок с помощью метода `count()`.

In [21]:
train_data, test_data = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

16418 4222


In [22]:
indexer_train = StringIndexer(inputCols=categorical_cols, 
                              outputCols=[c+'_idx' for c in categorical_cols], handleInvalid="skip")
indexer_model = indexer_train.fit(train_data)
train_data = indexer_model.transform(train_data)
test_data = indexer_model.transform(test_data)

                                                                                

In [23]:
#indexer_train = StringIndexer(inputCols=categorical_cols,
#                       outputCols=[c+'_idx' for c in categorical_cols])
#model = indexer_train.fit(train_data)
#train_data = model.transform(train_data)
#test_data = indexer_train.transform(test_data)

In [24]:
#indexer_test = StringIndexer(inputCols=categorical_cols,
#                       outputCols=[c for c in categorical_cols])

#test_data = indexer_test.fit(test_data).transform(test_data)
#indexer_test.fit(train_data)
#test_data = indexer_test.transform(test_data)

In [25]:
ohe_train = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols], 
                        outputCols=[c+'_ohe' for c in categorical_cols])

train_data = ohe_train.fit(train_data).transform(train_data)
# train_data.toPandas().head()
test_data.show(5) 
test_data.schema

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|                2.0|
|  -124.23|   40.81|              52.0|     1112.0|         209.0|     544.0|     172.0|       3.3462|           50800.0|     NEAR OCEAN|                2.0|
|  -124.21|   40.75|              32.0|     1218.0|         331.0|     620.0|     268.0|       1.6528|           58100.0|     NEAR OCEAN|                2.0|
|  -124.21|   41.77|              17.0|     3461.0| 

StructType(List(StructField(longitude,DoubleType,true),StructField(latitude,DoubleType,true),StructField(housing_median_age,DoubleType,true),StructField(total_rooms,DoubleType,true),StructField(total_bedrooms,DoubleType,false),StructField(population,DoubleType,true),StructField(households,DoubleType,true),StructField(median_income,DoubleType,true),StructField(median_house_value,DoubleType,true),StructField(ocean_proximity,StringType,true),StructField(ocean_proximity_idx,DoubleType,false)))

In [26]:
ohe_test = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])

test_data = ohe_test.fit(test_data).transform(test_data)
#test_data.toPandas().head()
test_data.show(5)
test_data.schema

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|  -124.23|   40.81|              52.0|     1112.0|         209.0|     544.0|     172.0|       3.3462|           50800.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|  -124.21|   40.75|              32.0|     1218.0|         331.0|     620.0|     268.0|       1.6528|        

StructType(List(StructField(longitude,DoubleType,true),StructField(latitude,DoubleType,true),StructField(housing_median_age,DoubleType,true),StructField(total_rooms,DoubleType,true),StructField(total_bedrooms,DoubleType,false),StructField(population,DoubleType,true),StructField(households,DoubleType,true),StructField(median_income,DoubleType,true),StructField(median_house_value,DoubleType,true),StructField(ocean_proximity,StringType,true),StructField(ocean_proximity_idx,DoubleType,false),StructField(ocean_proximity_ohe,VectorUDT,true)))

In [27]:
categorical_assembler_train = \
VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
               outputCol='categorical_features')
train_data=categorical_assembler_train.transform(train_data)

In [28]:
categorical_assembler_test = \
VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
               outputCol='categorical_features')
test_data=categorical_assembler_test.transform(test_data)

In [29]:
numerical_assembler_train = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
train_data = numerical_assembler_train.transform(train_data)
standardScaler_train = StandardScaler(inputCol='numerical_features',outputCol="numerical_features_scaled")
train_data = standardScaler_train.fit(train_data).transform(train_data) 

                                                                                

In [30]:
numerical_assembler_test = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
test_data = numerical_assembler_test.transform(test_data)
standardScaler_test = StandardScaler(inputCol='numerical_features',outputCol="numerical_features_scaled")
test_data = standardScaler_test.fit(test_data).transform(test_data) 

In [31]:
all_features_train = ['categorical_features', 'numerical_features_scaled']

final_assembler_train = VectorAssembler(inputCols=all_features_train, outputCol='features')
train_data = final_assembler_train.transform(train_data)

In [32]:
all_features_test = ['categorical_features', 'numerical_features_scaled']

final_assembler_test = VectorAssembler(inputCols=all_features_test, outputCol='features')
test_data = final_assembler_test.transform(test_data)

In [33]:
train_data.select(all_features_train).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       [0.0,0.0,1.0]|     [-61.952887791441...|
|       [0.0,0.0,1.0]|     [-61.927977100733...|
|       [0.0,0.0,1.0]|     [-61.913030686308...|
+--------------------+-------------------------+
only showing top 3 rows



In [34]:
test_data.select(all_features_test).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       [0.0,0.0,1.0]|     [-62.454275773333...|
|       [0.0,0.0,1.0]|     [-62.419104419318...|
|       [0.0,0.0,1.0]|     [-62.409055461027...|
+--------------------+-------------------------+
only showing top 3 rows



Первая модель линейной регрессии будет использовать все данные.

In [35]:
lr_1 = LinearRegression(labelCol=target, featuresCol = 'features', regParam=0.0)

In [36]:
model = lr_1.fit(train_data)

22/12/27 20:12:28 WARN Instrumentation: [c094c34d] regParam is zero, which might cause numerical instability and overfitting.
22/12/27 20:12:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/27 20:12:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/12/27 20:12:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/12/27 20:12:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Сохраним трансформационную таблицу с колонкой предсказания первой модели в переменной `predictions`.

In [37]:
predictions = model.transform(test_data)

In [38]:
predictions_col = 'prediction'

Теперь построим вторую модель исключив категориальные данные и используя только чичловые данные.

In [39]:
lr_2 = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled', regParam=0.0)

In [40]:
model_2 = lr_2.fit(train_data)

22/12/27 20:12:32 WARN Instrumentation: [3826657a] regParam is zero, which might cause numerical instability and overfitting.


In [41]:
predictions_2 = model_2.transform(test_data)

## Анализ результатов

Создадим объект класса `RegressionEvaluator`.

In [42]:
evaluator = RegressionEvaluator(predictionCol=predictions_col, labelCol = target)

Сравним результы работы линейной регрессии на двух наборах данных по нижеперечисленным метрикам:

**RMSE** 

В результате вычисления MSE (среднеквадратического отклоаения), мы получим число, единица измерения которого — квадрат исходной единицы измерения (например, «квадратные рубли»). Чтобы получить метрику качества в исходных единицах измерения, берут корень от среднеквадратичной ошибки — RMSE (root mean squared error). Чем меньше среднеквадратичное отклонение, тем лучше модель
регрессии.

Для первой модели учитываются все признаки

In [43]:
evaluator.evaluate(predictions, {evaluator.metricName: 'rmse'})

70817.73155473004

Для второй модели учитывается только количественные признаки

In [44]:
evaluator.evaluate(predictions_2, {evaluator.metricName: 'rmse'})

76682.78889978507

Согласно RMSE первая модель работает точнее.

**MAE**

Метрика, которая сообщает нам среднюю абсолютную разницу между прогнозируемыми значениями и фактическими значениями в наборе данных. Чем ниже MAE, тем лучше модель соответствует набору данных.

Для первой модели учитываются все признаки

In [45]:
evaluator.evaluate(predictions, {evaluator.metricName: 'mae'})

54983.47532843328

Для второй модели учитывается только количественные признаки

In [46]:
evaluator.evaluate(predictions_2, {evaluator.metricName: 'mae'})

62392.8157185057

Согласно MAE - первая модель работает точнее.

**R2**

Коэффициент детерминации, или метрика R2, вычисляет долю среднеквадратичной ошибки модели от MSE среднего, а затем вычитает эту величину из единицы. Увеличение метрики означает прирост качества модели.

Для первой модели учитываются все признаки

In [47]:
evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})

0.6288949168810403

Для второй модели учитывается только количественные признаки

In [48]:
evaluator.evaluate(predictions_2, {evaluator.metricName: 'r2'})

0.5648803670145031

In [49]:
spark.stop()

Согласно R2 - первая модель работает точнее.

## **Выводы:**

В рамках работы над проектом сделаны следующие действия:

* Инициализирована локальная Spark-сессия;
* Исследованы данные на наличие пропусков. Пропуски были удалены в связи с их небольшой численностью;
* Преобразована колонка с категориальными значениями техникой One hot encoding;
* Построены две модели линейной регрессии на разных наборах данных;
* Были сравнены результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2. 

Построив две модели линейной регрессии на разных наборах данных (используя все данные и только числовые, исключив при этом категориальные), мы выяснили, что все три метрики, а именно RMSE, MAE и R2 указывают на то, что лучше работает первая модель, которая содерджит в себе все признаки.