# Предсказание стоимости жилья в Калифорнии в 1990 году

На основе предоставленных данных нужно предсказать медианную стоимость дома в жилом массиве — `median_house_value`. Для этого обучим модель линейной регрессии и сделаем предсказания на тестовой выборке. Для оценки качества модели используем метрики RMSE, MAE и R2.

## Подготовка данных

Загрузим необходимые библиотеки:

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
    
from IPython.core.display import display, HTML

display(HTML("<style>.container { width:90% !important; }</style>"))

In [2]:
RANDOM_STATE = 42

Инициализируем локальную Spark-сессию:

In [3]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("forecasting_the_cost_of_housing") \
                    .getOrCreate()

Прочитаем содержимое файла `/datasets/housing.csv`:

In [4]:
df = spark.read.option('header', 'true'). \
            csv('/datasets/housing.csv', inferSchema = True)

                                                                                

Выведем тип данных колонок датасета:

In [5]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



В колонках датасета содержатся следующие данные:

- `longitude` — широта;
- `latitude` — долгота;
- `housing_median_age` — медианный возраст жителей жилого массива;
- `total_rooms` — общее количество комнат в домах жилого массива;
- `total_bedrooms` — общее количество спален в домах жилого массива;
- `population` — количество человек, которые проживают в жилом массиве;
- `households` — количество домовладений в жилом массиве;
- `median_income` — медианный доход жителей жилого массива;
- `median_house_value` — медианная стоимость дома в жилом массиве, <b>целевой признак</b>;
- `ocean_proximity` — близость к океану.

Вывод по разделу:
- загружены необходимые библиотеки, прочитано содержимое датасета. Все признаки, за исключением целевого `ocean_proximity`(категориальный) - числовые.

## Предобработка данных

Исследуем данные на наличие пропусков:

In [6]:
missing_counts = df.select([F.count(F.when(F.isnull(c), 1)) \
                            .alias(c) for c in df.columns])
missing_counts.show()

[Stage 2:>                                                          (0 + 1) / 1]

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



                                                                                

Посчитаем общее количество пропущенных значений в датасете:

In [7]:
total_missing = missing_counts.agg(*[F.sum(c) \
                    .alias(c) for c in missing_counts.columns]) \
                    .collect()
total_sum = sum(total_missing[0])
print(f"Общее количество пропусков: {total_sum}")

Общее количество пропусков: 207


Общее количество пропусков совпадает с количеством в колонке `total_bedrooms` (207). Посмотрим по описанию, сколько вообще данных содержится в датасете: 

In [8]:
description = df.describe()
description.show()

[Stage 6:>                                                          (0 + 1) / 1]

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           null|
| stddev|  2.0035317

                                                                                

Количество строк - 20640, количество пропущенных значений - 207, это всего 1%. Заполним пропуски медианным значением по колонке. Для этого сначала вычислим медиану:

In [10]:
total_bedrooms_median = df.approxQuantile('total_bedrooms', [0.5], 0)[0]
total_bedrooms_median

                                                                                

435.0

Теперь заполним пропуски:

In [11]:
df = df.fillna(total_bedrooms_median, subset=['total_bedrooms'])

И проверим, что пропуски заполнены:

In [12]:
missing_counts = df.select([F.count(F.when(F.isnull(c), 1)) \
                            .alias(c) for c in df.columns])
missing_counts.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Кодируем все признаки и соберём их в массив, чтобы использовать далее в пайплайне:

In [14]:
str_indexer = StringIndexer(inputCols = ['ocean_proximity'], outputCols = ['ocean_proximity_indx']) 
encoder = OneHotEncoder(inputCols = ['ocean_proximity_indx'], outputCols = ['ocean_proximity_ohe'])
cat_assembler = VectorAssembler(inputCols = ['ocean_proximity_ohe'], outputCol = 'cat_features')
num_assembler = VectorAssembler(inputCols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                                            'total_bedrooms', 'population', 'households', 'median_income'],
                                              outputCol = 'num_features')
scaler = StandardScaler(inputCol = 'num_features', outputCol = 'num_features_scaled', withMean=True)
features_assembler = VectorAssembler(inputCols = ['cat_features','num_features_scaled'], 
                                     outputCol = 'features')

transformations = [str_indexer, encoder, cat_assembler, num_assembler, scaler, features_assembler]

Выводы по разделу:

- датасет проверен на пропущенные значения, все они оказались в колонке `total_bedrooms`;
- пропущенные значения были удалены, т.к. составляют всего 1% от общего количества данных;
- аномальные значения в первом приближении обнаружены не были;
- признаки кодированы и собраны в массив.

## Построение моделей линейной регрессии

Разделим данные в датасете на обучающую и тестовую выборки:

In [15]:
df_train, df_test = df.randomSplit([.75,.25], seed = RANDOM_STATE)

Построим модель линейной регрессии, используя все данные из датасета. Подберём также гиперпараметры для модели:

In [16]:
lin_reg = LinearRegression(featuresCol = 'features', labelCol = 'median_house_value')
paramGrid = ParamGridBuilder() \
    .addGrid(lin_reg.regParam, [0.1, 0.01]) \
    .addGrid(lin_reg.elasticNetParam, [1, 0.1, 0.01]) \
    .addGrid(lin_reg.maxIter, [10, 100, 1000]) \
    .addGrid(lin_reg.solver, ['normal', 'l-bfgs']) \
    .build()
crossval = CrossValidator(estimator = lin_reg,
                          estimatorParamMaps = paramGrid,
                          evaluator = RegressionEvaluator(labelCol = 'median_house_value'),
                          numFolds = 5,
                          parallelism = 2)

Соберём пайплайн для модели:

In [17]:
model = Pipeline(stages = transformations + [crossval]).fit(df_train)

24/11/17 10:46:44 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/11/17 10:46:44 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/11/17 10:46:56 WARN BlockManager: Asked to remove block broadcast_188, which does not exist
24/11/17 10:47:02 WARN BlockManager: Asked to remove block broadcast_424, which does not exist
24/11/17 10:47:09 WARN BlockManager: Asked to remove block broadcast_776_piece0, which does not exist
24/11/17 10:47:09 WARN BlockManager: Asked to remove block broadcast_776, which does not exist
24/11/17 10:47:20 WARN BlockManager: Asked to remove block broadcast_1204, which does not exist
24/11/17 10:47:20 WARN BlockManager: Asked to remove block broadcast_1222, which does not exist
24/11/17 10:47:27 WARN BlockManager: Asked to remove block broadcast_1552, which does not exist
24/11/17 10:47:29 WARN BlockManager: Asked to remove block broadcast_1683_piece0, which does not exist
24/11/17 10

Теперь построим модель линейной регрессии, используя только числовые данные:

In [18]:
lin_reg = LinearRegression(featuresCol = 'num_features_scaled', labelCol = 'median_house_value')
paramGrid = ParamGridBuilder() \
    .addGrid(lin_reg.regParam, [0.1, 0.01]) \
    .addGrid(lin_reg.elasticNetParam, [1, 0.1, 0.01]) \
    .addGrid(lin_reg.maxIter, [10, 100, 1000]) \
    .addGrid(lin_reg.solver, ['normal', 'l-bfgs']) \
    .build()
crossval = CrossValidator(estimator = lin_reg,
                          estimatorParamMaps = paramGrid,
                          evaluator=RegressionEvaluator(labelCol = 'median_house_value'),
                          numFolds = 5,
                          parallelism = 2)

Для этой модели также соберём пайплайн:

In [19]:
num_model = Pipeline(stages = transformations + [crossval]).fit(df_train)

24/11/17 11:16:34 WARN BlockManager: Asked to remove block broadcast_131246, which does not exist
24/11/17 11:16:57 WARN BlockManager: Asked to remove block broadcast_132708, which does not exist
24/11/17 11:16:57 WARN BlockManager: Asked to remove block broadcast_132708_piece0, which does not exist
24/11/17 11:17:14 WARN BlockManager: Asked to remove block broadcast_133704, which does not exist
24/11/17 11:17:43 WARN BlockManager: Asked to remove block broadcast_135564, which does not exist
24/11/17 11:18:03 WARN BlockManager: Asked to remove block broadcast_136723, which does not exist
24/11/17 11:18:03 WARN BlockManager: Asked to remove block broadcast_136723_piece0, which does not exist
24/11/17 11:18:11 WARN BlockManager: Asked to remove block broadcast_137350, which does not exist
24/11/17 11:18:35 WARN BlockManager: Asked to remove block broadcast_138975, which does not exist
24/11/17 11:18:35 WARN BlockManager: Asked to remove block broadcast_138975_piece0, which does not exist

Выводы по разделу:

- данные в датасете разбили на обучающую и тестовую выборки в пропорции 75% и 25% соответственно;
- построили две модели линейной регрессии: первая использует все данные из датасета, вторая - только числовые данные;
- для обеих моделей собрали пайплайны, которые в дальнейшем задействуем для сравнения метрик.

## Сравнение результатов работы линейной регрессии

Создадим переменную `evaluator` (оценщик регрессии):

In [20]:
evaluator = RegressionEvaluator(labelCol = 'median_house_value')

Выведем метрики RMSE, MAE и R2 обеих моделей для сравнения:

In [21]:
print('model RMSE:', round(evaluator.setParams(metricName = "rmse").evaluate(model.transform(df_test)), 2))
print('model MAE:', round(evaluator.setParams(metricName = "mae").evaluate(model.transform(df_test)), 2))
print('model R2:', round(evaluator.setParams(metricName = "r2").evaluate(model.transform(df_test)), 2))

model RMSE: 69696.28
model MAE: 50350.81
model R2: 0.64


In [22]:
print('num_model RMSE:', round(evaluator.setParams(metricName = "rmse").evaluate(num_model.transform(df_test)), 2))
print('num_model MAE:', round(evaluator.setParams(metricName = 'mae').evaluate(num_model.transform(df_test)), 2))
print('num_model R2:', round(evaluator.setParams(metricName = 'r2').evaluate(num_model.transform(df_test)), 2))

num_model RMSE: 70626.52
num_model MAE: 51232.92
num_model R2: 0.63


Выводы по разделу:

- метрики RMSE, MAE и R2 для модели линейной регрессии, использующей все данные из датасета, равны, соответственно: 69696.3, 50350.8, 0.64;
- метрики RMSE, MAE и R2 для модели линейной регрессии, использующей только числовые данные, равны, соответственно: 70626.5, 51232.9, 0,63;

Общий вывод:

Для решения задачи были проделаны следующие этапы:

- подготовка данных:
    - загружены необходимые библиотеки, прочитано содержимое датасета. Все признаки, за исключением целевого `ocean_proximity`(категориальный) - числовые;

- предобработка данных:
    - датасет проверен на пропущенные значения, все они оказались в колонке `total_bedrooms`;
    - пропущенные значения были удалены, т.к. составляют всего 1% от общего количества данных;
    - аномальные значения в первом приближении обнаружены не были;
    - признаки кодированы и собраны в массив;

- построение моделей линейной регрессии:
    - данные в датасете разбиты на обучающую и тестовую выборки в пропорции 75% и 25% соответственно;
    - построены две модели линейной регрессии: первая использует все данные из датасета, вторая - только числовые данные;
    - для обеих моделей собраны пайплайны;

- сравнение результатов работы линейной регрессии:
    - метрики RMSE, MAE и R2 для модели линейной регрессии, использующей все данные из датасета, равны, соответственно: 69696.3, 50350.8, 0.64;
    - метрики RMSE, MAE и R2 для модели линейной регрессии, использующей только числовые данные, равны, соответственно: 70626.5, 51232.9, 0,63;

- итого, с предсказанием стоимости жилья в Калифорнии в 1990-м году примерно одинаково справляются обе модели - как та, что использует все данные из датасета, так и та, что использует только числовые данные. Метрики обеих моделей сопоставимы, при этом качество модели, обученной на всех признаках, несколько выше, чем той, что обучалась лишь на числовых данных.