## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

# Подготовка данных

Создадим spark-сессию, загрузим данные.

In [1]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, Imputer

RANDOM_SEED = 2022

In [2]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing California - Linear regression") \
                    .getOrCreate()

df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) 

Посмотрим на данные

In [3]:
df.printSchema() 

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [4]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [5]:
df.summary().show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           null|
| stddev|  2.0035317

Проверим, есть ли дубликаты

In [6]:
def print_na(df):
    for i in range(len(df.columns)):
        col = df.columns[i]
        print(col, ':', df.filter(df[col].isNull()).count(), 'duplicates')
        
print_na(df)

longitude : 0 duplicates
latitude : 0 duplicates
housing_median_age : 0 duplicates
total_rooms : 0 duplicates
total_bedrooms : 207 duplicates
population : 0 duplicates
households : 0 duplicates
median_income : 0 duplicates
median_house_value : 0 duplicates
ocean_proximity : 0 duplicates


Так как признак числовой и пропусков немного относительно общего количества измерений, заменим пропуски чувствительной к выбросам медианой.

In [7]:
imputer = Imputer(
                    inputCols=['total_bedrooms'], 
                    outputCols=['total_bedrooms']
                  ).setStrategy("median")

# Add imputation cols to df
df = imputer.fit(df).transform(df)
print_na(df)

longitude : 0 duplicates
latitude : 0 duplicates
housing_median_age : 0 duplicates
total_rooms : 0 duplicates
total_bedrooms : 0 duplicates
population : 0 duplicates
households : 0 duplicates
median_income : 0 duplicates
median_house_value : 0 duplicates
ocean_proximity : 0 duplicates


Мы решаем задачу линейной регрессии, для которой нормализация признаков является полезным шагом в предобработке данных. Давайте сделаем ее.

Сперва определим массивы категориальных и числовых признаков, а также отведем под таргет отдельную переменную.

In [8]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude',
                     'latitude',
                     'housing_median_age',
                     'total_rooms',
                     'total_bedrooms',
                     'population',
                     'households',
                     'median_income']
target = "median_house_value" 

Теперь скомпонуем численные признаки в один и проведем нормализацию.

In [9]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
df = numerical_assembler.transform(df) 

standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df) 

In [10]:
df.select('numerical_features', 'numerical_features_scaled').show(5) 

+--------------------+-------------------------+
|  numerical_features|numerical_features_scaled|
+--------------------+-------------------------+
|[-122.23,37.88,41...|     [-61.007269596069...|
|[-122.22,37.86,21...|     [-61.002278409814...|
|[-122.24,37.85,52...|     [-61.012260782324...|
|[-122.25,37.85,52...|     [-61.017251968579...|
|[-122.25,37.85,52...|     [-61.017251968579...|
+--------------------+-------------------------+
only showing top 5 rows



Вектор нормализованных числовых величин получен. Теперь приведем единственный нечисловой признак в числовой нормализованный.

Для начала переведем его просто в числовой

In [11]:
indexer = StringIndexer(inputCol=categorical_cols[0], outputCol=categorical_cols[0]+'_idx') 
df = indexer.fit(df).transform(df)

In [12]:
cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
df.select(cols).show(5) 

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
+---------------+-------------------+
only showing top 5 rows



А теперь проведем one-hot кодирование.

In [13]:
encoder = OneHotEncoder(inputCol=categorical_cols[0]+'_idx', outputCol=categorical_cols[0]+'_ohe') 
df = encoder.transform(df)

In [14]:
cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
df.select(cols).show(5) 

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
+---------------+-------------------+-------------------+
only showing top 5 rows



In [15]:
categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
                                        outputCol="categorical_features")
df = categorical_assembler.transform(df) 

Теперь объединим наши нормализованные признаки в один

In [16]:
all_features = ['categorical_features', 'numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
df = final_assembler.transform(df)

df.select(all_features).show(3) 

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



Предобработка данных завершена.

# Обучение моделей

Разделим наш датасет на обучающую и тестовую выборки

In [17]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 

16437 4203


Обучим модели на двух наборах признаков:

* на всем наборе признаков
* только на числовых признаках

In [18]:
lr = LinearRegression(labelCol=target, featuresCol='features')
model_all_feat = lr.fit(train_data) 

In [19]:
lr = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
model_num = lr.fit(train_data)

In [20]:
predictions_num = model_num.transform(test_data)

predictedLabes_num = predictions_num.select(target, "prediction")
predictedLabes_num.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0| 64335.38324976526|
|          111400.0| 164099.3713788488|
|           58100.0| 109437.6758287712|
|           64600.0| 99163.18136291625|
|           82800.0| 138039.6829711632|
|           85100.0|149032.18297593342|
|           85600.0| 155144.2789014033|
|           76900.0|130175.94708645158|
|           92700.0|151077.77760828426|
|           74100.0|117696.14210933307|
|           96000.0|133306.03774422454|
|           74100.0|122696.04789372301|
|          128100.0| 191197.0836423263|
|          100500.0| 131254.7389358543|
|           99600.0|152051.76741494937|
|           81000.0|130351.66840665368|
|           81100.0|117054.72467243299|
|           55000.0|191942.16273756092|
|           87500.0| 90024.59904904803|
|          105800.0|132973.44725472527|
+------------------+------------------+
only showing top 20 rows



In [21]:
predictions_all_feat = model_all_feat.transform(test_data)

predictedLabes_all_feat = predictions_all_feat.select(target, "prediction")
predictedLabes_all_feat.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0| 113670.8475707327|
|          111400.0|190777.60007122857|
|           58100.0|140720.86547184316|
|           64600.0| 131828.1151816314|
|           82800.0|170537.15062440932|
|           85100.0|178015.10192344664|
|           85600.0|187074.03952357592|
|           76900.0|161951.21830386063|
|           92700.0|182728.51984754857|
|           74100.0|148809.99381590448|
|           96000.0|169338.50511733722|
|           74100.0|155104.09917140426|
|          128100.0|220375.56650389126|
|          100500.0|164388.00971795898|
|           99600.0|184968.98043534718|
|           81000.0|163428.54040131997|
|           81100.0|150709.90114873275|
|           55000.0| 213133.6186054917|
|           87500.0|125118.97478940058|
|          105800.0| 167362.7770756455|
+------------------+------------------+
only showing top 20 rows



По 20 строкам есть ощущение, что модели у нас ужаснейшие:) Посмотрим на метрики качества.

# Анализ результатов

Вычислим значение следующих метрик:

* корень из среднеквадратического отклонения
* среднее абсолютное отклонение 
* коэффициент детерминации


Посмотрим на их значение на каждом из двух наборов признаков

In [22]:
rmse = RegressionEvaluator(labelCol=target, metricName='rmse').evaluate(predictions_all_feat)
mae = RegressionEvaluator(labelCol=target, metricName='mae').evaluate(predictions_all_feat)
r2 = RegressionEvaluator(labelCol=target, metricName='r2').evaluate(predictions_all_feat)

print('rmse: ', rmse)
print('mae: ', mae)
print('r2: ', r2)

rmse:  68515.56613000772
mae:  49397.559157762924
r2:  0.6452125451023688


In [23]:
rmse = RegressionEvaluator(labelCol=target, metricName='rmse').evaluate(predictions_num)
mae = RegressionEvaluator(labelCol=target, metricName='mae').evaluate(predictions_num)
r2 = RegressionEvaluator(labelCol=target, metricName='r2').evaluate(predictions_num)

print('rmse: ', rmse)
print('mae: ', mae)
print('r2: ', r2)

rmse:  69532.16421088148
mae:  50567.73967674745
r2:  0.6346061375691843


Как видно по коэффициенту детерминации, наши модели предсказывают на весьма неплохом уровне. Также мы видим, что модель, в которой есть категориальный признак, не особо лучше второй модели, что говорит о невысокой значимости категориального признака.

# Вывод

В этом спринте была проведена некоторая работа по предобработке данных и построению моделей с помощью библиотеки pysparq. Эта библиотека рассчитана на то, чтобы работать с огромными объемами данных, которые не помещаются в оперативную память, а их обработка в случае обыкновенного последовательного исполнения кода обработки занимала бы слишком долгое время, которое на бизнес-задачи просто не выдается. Поэтому эта библиотека не предназначена для красивых визуализаций - здесь нужно быстро и оптимально выполнять простые вещи по обработке данных и построению моделей.

Очень жаль, что поработать на сервере с большими данными в рамках этого спринта не получилось. Было бы забавно поработать со штуками, где от одной криво написанной строки время исполнения твоей программы увеличивается на несколько часов (или больше) :) А конкретно в этой работе мы просто закрепили знание синтаксиса.