# Вступление

В проекте нам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучить модель и сделать предсказания на тестовой выборке. Для оценки качества модели нужно использовать метрики RMSE, MAE и R2.

Нам предоставлен датасет содержащий данные о жилой недвижимости в Калифорнии за 1990г. Используя Spark DataFrame API прочитаем данные, поищем пропущенные значения, обработаем их и построим несколько моделей машинного обучения. Тестовую выборку сделаем из имеющегося датасета. Для оценки качества моделей будем использовать следующие метрики: RMSE, R2, MAE.

### Подготовка данных

**Импортирование необходимых библиотек**

In [1]:
import pandas as pd 
import numpy as np
import pyspark
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, LinearRegressionSummary
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql import SparkSession
from pyspark.sql.types import *

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

**Инициализация Spark сессии**

Инициализируем Spark сессию в локальном режиме для входа в Spark приложение.

In [2]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("California Housing") \
                    .getOrCreate()

**Чтение исходного файла**

Теперь когда сессия запущена, прочитаем файл и запием его в переменную df_housing.

In [3]:
df_housing = spark.read.load('/datasets/housing.csv', format="csv", sep=",", inferSchema=True, header="true")

                                                                                

**Основная информация**

Посмотрим какие колонки находятся в датасете и их типы.

In [4]:
df_housing.printSchema()
df_housing.show()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR B

Видим что есть только одна колонка со строковыми значениями, остальные с численными.

Выведем основую информацию по численным колонкам.

In [5]:
df_housing.summary().show()

                                                                                

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           null|
| stddev|  2.0035317

**Пропуски**

Проверим есть ли в данных пропуски.

In [6]:
df_housing.select(* \
                  (F.sum(F.col(c).isNull().cast("int")).alias(c) \
                   for c in df_housing.columns)) \
                 .show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Нашли пропуски в одной колонке `total_bedrooms`, 207 пропусков. Можем заполнить их средним значенем.

In [7]:
median_bedrooms = int(df_housing.approxQuantile('total_bedrooms', [0.5], 0)[0])
median_bedrooms

435

Выделили среднее значение колонки `total_bedrooms` в отдельную переменную.

In [9]:
df_housing = df_housing.na.fill(median_bedrooms, ['total_bedrooms'])

In [10]:
df_housing.select(* \
                  (F.sum(F.col(c).isNull().cast("int")).alias(c) \
                   for c in df_housing.columns)) \
                 .show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Заполнили пропуски и проверили их отсутствие после этого. Пропусков нет.

**Выделение признаков**

Перед тем как создавать модели нам нужно подготовить данные, определить целевой признак и признаки для обучения. Разобьём все колонки на три типа:

- Целевой признак: колонка `median_house_value`.
- Категориальные признаки: только одна колонка `ocean_proximity`.
- Численные признаки: все остальные колонки.


Создадим три переменные в которые и положим колонки в соответсвии с их типом.

In [11]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms",
                   "population", "households", "median_income",]
target = "median_house_value" 

Разделим датасет на две выборки:

- Тренировачная, нужная для обучения модели.
- Тестовая, для проверки модели.

Разбитие будем производить в соотношении 80%/20% (Тренировочная/Тестовая).

In [12]:
RANDOM_SEED = 1234

In [13]:
train_data, test_data = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

                                                                                

16525 4115


Будем использовать Pipeline. Преобразуем категориальные признаки техникой OHE, а для численных сделаем масштабирование с помощью StandartScaler. Всё это пропишем с план действия для pipeline.

In [14]:
stages = []

categoricalColumns = categorical_cols
for categoricalCol in categoricalColumns:

    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                      outputCol="numerical_features")
standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")

stages += [numerical_assembler, standardScaler]    

numericCols = ['numerical_features_scaled']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

Так как мы хотим построить ещё и модель обученную тольно на численных признаках, создадим ещё один план действия pipeline.

In [15]:
stages_num = []

numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                      outputCol="numerical_features")
standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")

stages_num += [numerical_assembler, standardScaler]    

Теперь у нас есть все необходимые данные для обучения и проверки модели.

### Обучение моделей

Построим две модели линейной регрессии. В одной для обучения будем использовать все признаки, в другой только численные.

In [16]:
lr_all = LinearRegression(labelCol=target, featuresCol='features', maxIter=1000, regParam=0.3, elasticNetParam=0.8)

stages += [lr_all]

In [17]:
lr_num = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled', maxIter=1000, regParam=0.3, elasticNetParam=0.8)

stages_num += [lr_num]

In [18]:
pipeline_all = Pipeline(stages=stages)
pipeline_num = Pipeline(stages=stages_num)

Пайплайны готовы, можем отправлять им тренировочные данные.

In [19]:
model_all = pipeline_all.fit(train_data)
model_num = pipeline_num.fit(train_data)

23/02/22 14:02:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/02/22 14:02:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

Модели обучены, теперь сделаем предсказания.

In [20]:
predictions_all = model_all.transform(test_data)
predictions_num = model_num.transform(test_data)

In [21]:
predictedLabes = predictions_all.select("median_house_value", "prediction")
predictedLabes.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0|116678.53068166226|
|          111400.0| 191475.7288470976|
|           70500.0| 144584.9568145182|
|          128900.0| 208334.3518197541|
|          116100.0| 235026.5104953777|
|           82800.0|172973.75397757487|
|           81300.0|152439.22813346703|
|           85600.0|189040.63798059896|
|           80500.0|180690.00186081696|
|           75500.0|139192.64227761608|
|           90000.0|209654.53985772282|
|           67500.0|147605.67249723244|
|           57500.0|141044.24547791667|
|          103100.0| 47803.07622436015|
|          128100.0|222018.93631890416|
|           99600.0|187131.60005696164|
|          122400.0|128764.82224631542|
|           94800.0|219672.69692408107|
|           92800.0| 208703.4660310666|
|           72300.0|155120.83782315627|
+------------------+------------------+
only showing top 20 rows



In [22]:
predictedLabes = predictions_num.select("median_house_value", "prediction")
predictedLabes.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|           85800.0| 65460.03770375531|
|          111400.0| 164139.8048401568|
|           70500.0|110604.48890756443|
|          128900.0|174701.44169566687|
|          116100.0|201364.00518673938|
|           82800.0|139320.25653447816|
|           81300.0|118247.25597418752|
|           85600.0| 156074.8918116265|
|           80500.0|149105.69694786659|
|           75500.0| 100246.7957484019|
|           90000.0|176447.10333629092|
|           67500.0|113829.32170521747|
|           57500.0|105136.01540501928|
|          103100.0|-7608.166964618489|
|          128100.0|191669.65294918325|
|           99600.0|153121.25924076186|
|          122400.0| 72969.22992756078|
|           94800.0| 199094.3622708586|
|           92800.0|174763.94455447793|
|           72300.0|130161.65584604396|
+------------------+------------------+
only showing top 20 rows



Предсказания сделаны, посмотрим на результаты метрик.

In [23]:
evaluator_r2 = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="median_house_value", metricName="r2")
evaluator_rmse = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="median_house_value", metricName="rmse")
evaluator_mae = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="median_house_value", metricName="mae")

print("R Squared (R2) on test data (all) = %g" % evaluator_r2.evaluate(predictions_all))
print("R Squared (R2) on test data (numerical) = %g" % evaluator_r2.evaluate(predictions_num))
print("RMSE on test data (all) = %g" % evaluator_rmse.evaluate(predictions_all))
print("RMSE on test data (numerical) = %g" % evaluator_rmse.evaluate(predictions_num))
print("MAE on test data (all) = %g" % evaluator_mae.evaluate(predictions_all))
print("MAE on test data (numerical) = %g" % evaluator_mae.evaluate(predictions_num))

R Squared (R2) on test data (all) = 0.654292
R Squared (R2) on test data (numerical) = 0.64585
RMSE on test data (all) = 67360
RMSE on test data (numerical) = 68177.5
MAE on test data (all) = 49218.9
MAE on test data (numerical) = 50399.2


Обе модели показали примерно одинаковые результаты. R2 - 0.654 и 0.646, модель обученная на всех признаках показала немного лучший результат, но незначительно. Тоже самое можно увидель по метрикам RSME и MAE.

# Анализ результатов

Перед нами стояла задача построить две модели линейной регресии используя Spark DataFrame API. Изучили предоставленные данные, проверили их на пропуски. В данных нашли пропуски, обработали их.

Определили целевой признак, а так-же признаки для обучения моделей. Признаки разбили на два типа: категориальные и численные. Категориальные трансформировали техникой прямого кодирования (OHE). Для численных сделали масштабирования для уравнивания их веса.

Разбили датасет на две выборки: тренировочную и тестовую в соотношении 80%/20% соответсвенно.

Построили две модели, для одной использовали в процессе обучения все признаки, для другой только численные. Протестировали их на тестовой выборке. Модель обученная на всех признаках показала немного лучшие результаты. В целом обе модели показали довольно посредственные результаты. Лучшие на тестовой: **R2: 0.654**, **RMSE: 67360**, **MAE: 49218**