# Подготовка данных

**1. Импортируем необходимые библиотеки и функции**

In [None]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.sql.functions import col,isnan, when, count, avg, mean

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression

from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator,RegressionEvaluator

**Инициализируем локальную Spark-сессию**

In [None]:
pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

RANDOM_SEED = 2022

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing") \
                    .getOrCreate()

**2. Прочитаем содержимое файла `/datasets/housing.csv` в DataFrame `df`**

In [None]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True)

                                                                                

**3. Выведем типы данных колонок датасета и первые пять строк. Используя методы `pySpark`**

In [None]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [None]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [None]:
df.select('ocean_proximity').distinct().collect()


                                                                                

[Row(ocean_proximity='ISLAND'),
 Row(ocean_proximity='NEAR OCEAN'),
 Row(ocean_proximity='NEAR BAY'),
 Row(ocean_proximity='<1H OCEAN'),
 Row(ocean_proximity='INLAND')]

Датафрейм содержит одну колонку `ocean_proximity`(которая принимает значения: `ISLAND`,`NEAR OCEAN`, `NEAR BAY`, `<1H OCEAN`, `INLAND`) c категориальными признаками, остальные колонки содержат числовые признаки.

**4. Выполним предобработку данных: исследуем данные на наличие пропусков**

In [None]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



                                                                                

Датафрейм имеет пропуски только в колонке `total_bedrooms`. Заполним их средним значением.

In [None]:
df = df.na.fill(df.select(F.mean(df['total_bedrooms'])).collect()[0][0])

Проверим датафрейм после заполнения на наличие пропусков

In [None]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Пропусков в датафрейме нет

**Преобразуем колонку `ocean_proximity` c категориальными значениями техникой `One Hot Encoding`**

Создадим списки из числовых признаков, категориальных признаков и целевого признака.

In [None]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude','latitude','housing_median_age','total_rooms',
                   'total_bedrooms','population','households','median_income']
target = 'median_house_value'

Трансформируем категориальные признаки с помощью трансформера `StringIndexer`. Он переводит текстовые категории в числовые значения , т.е. для значения `<1H OCEAN` будет 1, для значения `INLAND` будет 2 и так далее.

In [None]:
indexer = StringIndexer(inputCols=categorical_cols,
                        outputCols=[c+'_idx' for c in categorical_cols])
df = indexer.fit(df).transform(df)

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
df.select(cols).distinct().orderBy('ocean_proximity_idx').show()



+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|      <1H OCEAN|                0.0|
|         INLAND|                1.0|
|     NEAR OCEAN|                2.0|
|       NEAR BAY|                3.0|
|         ISLAND|                4.0|
+---------------+-------------------+



                                                                                

Далее применим  `OneHotEncoder`

In [None]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
df = encoder.fit(df).transform(df)

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))]
df.select(cols).distinct().orderBy('ocean_proximity_idx').show()



+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|      <1H OCEAN|                0.0|      (4,[0],[1.0])|
|         INLAND|                1.0|      (4,[1],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|         ISLAND|                4.0|          (4,[],[])|
+---------------+-------------------+-------------------+



                                                                                

Финальный шаг преобразований — это объединение признаков в один вектор, с которым ML-алгоритм умеет работать.

In [None]:
categorical_assembler = \
        VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols],
                                        outputCol="categorical_features")
df = categorical_assembler.transform(df)

Отмасштабируем числовые признаки.

In [None]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                     outputCol="numerical_features")
df = numerical_assembler.transform(df)

In [None]:
standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df)

Финальный шаг — собрать трансформированные категорийные и числовые признаки с помощью VectorAssembler.

In [None]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features,
                                  outputCol="features")
df = final_assembler.transform(df)

df.select(all_features).show(3)

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



# Обучение моделей

**5. Построим две модели линейной регрессии на двух наборах данных**


Разделяем наш датасет на две части — выборку для обучения и выборку для тестирования качества модели c помощью метода randomSplit().

In [None]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

                                                                                

16418 4222


                                                                                

При обучении модели алгоритм ждёт указания:

какая колонка содержит вектор признаков для обучения;

как называется колонка с целевой переменной.

In [None]:
lr = LinearRegression(featuresCol = 'features', labelCol='median_house_value', solver="normal")

model = lr.fit(train_data)

22/05/03 05:40:01 WARN Instrumentation: [ffd4e90b] regParam is zero, which might cause numerical instability and overfitting.
22/05/03 05:40:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/05/03 05:40:02 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/05/03 05:40:02 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/05/03 05:40:02 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
22/05/03 05:40:02 WARN Instrumentation: [ffd4e90b] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [None]:
lr1 = LinearRegression(featuresCol = 'numerical_features_scaled', labelCol='median_house_value', solver="normal")

model1 = lr1.fit(train_data)

22/05/03 05:40:05 WARN Instrumentation: [46873e1b] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [None]:
predictions = model.transform(test_data)

predictedLabes = predictions.select("median_house_value", "prediction")
predictedLabes.show()

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|152864.83787063695|
|           50800.0|214875.15745500242|
|           58100.0|142617.23499632766|
|           68400.0|132348.03377544833|
|           72200.0|163938.85895167245|
|           67000.0|154333.74063058477|
|           81300.0|152707.89295244776|
|           70500.0|164405.68221020233|
|           60000.0|142658.99211929925|
|          109400.0|171159.74399856105|
|           74100.0|150520.55504797166|
|           74700.0|167726.48518295074|
|           90000.0|209691.65750443935|
|          104200.0| 200348.3492369824|
|           74100.0| 156997.7090818272|
|           67500.0|148165.17924111616|
|          103100.0| 47312.01717952825|
|           92500.0|166598.51765157236|
|          128100.0|221959.78148622438|
|           99600.0|187128.65126372967|
+------------------+------------------+
only showing top 20 rows



                                                                                

In [None]:
predictions1 = model1.transform(test_data)

predictedLabes1 = predictions1.select("median_house_value", "prediction")
predictedLabes1.show(3)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101360.98099971423|
|           50800.0|183294.14571925811|
|           58100.0|109574.76993060019|
+------------------+------------------+
only showing top 3 rows



# Анализ результатов

**Сравним результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2**

Посчитаем метрики качества `R2`, `MAE`, `RMSE` для модели, содержащей все признаки. Результаты сохраним в Series.

In [None]:
list_metrics = ['r2', 'mae', 'rmse']
list_results=[]
for metric in list_metrics:
    evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="median_house_value", metricName=metric)
    list_results.append(evaluator.evaluate(predictions))
s1 = pd.Series(list_results, index=list_metrics, name ='все признаки')
s1

                                                                                

r2          0.653622
mae     49849.341084
rmse    68480.413421
Name: все признаки, dtype: float64

Посчитаем метрики качества `R2`, `MAE`, `RMSE` для модели, с исключенным одним категориальным признаком. Результаты сохраним в Series.

In [None]:
list_metrics = ['r2', 'mae', 'rmse']
list_results=[]
for metric in list_metrics:
    evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="median_house_value", metricName=metric)
    list_results.append(evaluator.evaluate(predictions1))
s2 = pd.Series(list_results, index=list_metrics, name = 'без одного признака')
s2

r2          0.646226
mae     50848.475110
rmse    69207.629961
Name: без одного признака, dtype: float64

Метрики качества для двух моделей для наглядности объединим в одну таблицу.

In [None]:
result = pd.DataFrame([s1,s2])

In [None]:
result.T

Unnamed: 0,все признаки,без одного признака
r2,0.653622,0.646226
mae,49849.341084,50848.47511
rmse,68480.413421,69207.629961


**Вывод**


**Категориальный признак `ocean_proximity` (перевод с англ. близость к океану) должен иметь весьма жесткую корреляцию со стоимостью жилья. Чем ближе дом к океану, тем как правило он имеет более высокую стоимость. Исключение этого признака из набора данных должно будет привести к ухудшению качества модели. Что мы и наблюдаем на таблице изображенной выше. Метрики MAE и RMSE характеризуют разброс предсказания от истинного значения и они стали больше. Метрика R2 уменьшилась, так как для идеальной модели эта метрика равна 1.**