# Предсказание стоимости жилья

# Описание проекта

Необходимо обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году.

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — `median_house_value`. Обучить модель и сделайть предсказания на тестовой выборке. Для оценки качества модели будут использованы метрики `RMSE`, `MAE` и `R2`.

**Описание датасета**

В колонках датасета содержатся следующие данные:

- `longitude` — широта;
- `latitude` — долгота;
- `housing_median_age` — медианный возраст жителей жилого массива;
- `total_rooms` — общее количество комнат в домах жилого массива;
- `total_bedrooms` — общее количество спален в домах жилого массива;
- `population` — количество человек, которые проживают в жилом массиве;
- `households` — количество домовладений в жилом массиве;
- `median_income` — медианный доход жителей жилого массива;
- `median_house_value` — медианная стоимость дома в жилом массиве;
- `ocean_proximity` — близость к океану.

**План проекта**

1. Подготовка данных.
   
   - Импорты необходимых библиотек;
   - Инициализация локальной `Spark`-сессии;
   - Чтение содержимого файла `/datasets/housing.csv`;
   - Проверка типов данных колонок датасета(использую методы pySpark);
   - Предобработка данных:
       
       - Исследование данные на наличие пропусков и заполнение их по возможности;
       - Преобразование колонки с категориальными значениями техникой One hot encoding;
       
2. Обучение моделей.
   
   - Построение двух моделей линейной регрессии на разных наборах данных(используйя оценщик LinearRegression из библиотеки MLlib):
       
       - используя все данные из файла;
       - используя только числовые переменные, исключив категориальные.
       
3. Анализ результатов.
   
   - Сравнение результатов работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2. 

4. Выводы.

## Подготовка данных

### Импорт необходимых библиотек для работы

In [1]:
import pandas as pd
import numpy as np
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml import Pipeline

RANDOM = 12345

import warnings

warnings.filterwarnings("ignore")

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['SPARK_LOCAL_IP'] = '127.0.1.1'

### Инициализация Spark-сессии и загрузка датасета

In [2]:
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("Housing Forecast") \
                    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/11 10:33:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/11 10:33:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
data = spark.read.option('header', 'true').csv('../Data/housing.csv',
                                               inferSchema=True)
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



### Типы данных

In [4]:
print(pd.DataFrame(data.dtypes, columns=['column', 'type']))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


### Проверка на наличие пропусков

In [5]:
for column in data.columns:
    print(column, data.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Имеется 207 пропусков в колонке `total_bedrooms` — общее количество спален в домах жилого массива.

Т. к. пропусков меньше 1% их можно просто удалить, но как вариант можно попробовать точечно заполнить.
Можно былобы просто заменить средним значением всей колонки, но это будет не корректно, т.к. в некоторых строках общее количество спален может превысить общее количество комнат.

Да замены буду использовать след. вариант.

Рассчитаю среднее отношение количества комнат к количичетсву спален.

Сгруппируем датасет по колонке близости к океану и рассчитаем среднее отношение количества комнат к количеству спален.

In [6]:
temp = data.groupBy(['ocean_proximity'
                     ]).agg(F.avg('total_rooms'),
                            F.avg('total_bedrooms')).toPandas()

In [7]:
temp

Unnamed: 0,ocean_proximity,avg(total_rooms),avg(total_bedrooms)
0,ISLAND,1574.6,420.4
1,NEAR OCEAN,2583.700903,538.615677
2,NEAR BAY,2493.58952,514.182819
3,<1H OCEAN,2628.343586,546.539185
4,INLAND,2717.742787,533.881619


In [8]:
print(
    'Среднее значение:',
    round((temp['avg(total_rooms)'] / temp['avg(total_bedrooms)']).mean(), 0))

Среднее значение: 5.0


Данные с пропусками

In [9]:
print(data.filter(data['total_bedrooms'].isNull()).toPandas())

     longitude  latitude  housing_median_age  total_rooms  total_bedrooms  \
0      -122.16     37.77                47.0       1256.0             NaN   
1      -122.17     37.75                38.0        992.0             NaN   
2      -122.28     37.78                29.0       5154.0             NaN   
3      -122.24     37.75                45.0        891.0             NaN   
4      -122.10     37.69                41.0        746.0             NaN   
..         ...       ...                 ...          ...             ...   
202    -119.19     34.20                18.0       3620.0             NaN   
203    -119.18     34.19                19.0       2393.0             NaN   
204    -118.88     34.17                15.0       4260.0             NaN   
205    -118.75     34.29                17.0       5512.0             NaN   
206    -118.72     34.28                17.0       3051.0             NaN   

     population  households  median_income  median_house_value ocean_proxim

Замена пропусков

In [10]:
data = data.withColumn(
    'total_bedrooms',
    F.when(data['total_bedrooms'].isNull(),
           data['total_rooms'] / 5).otherwise(data['total_bedrooms']))

Проверка замены пропусков

In [11]:
for column in data.columns:
    print(column, data.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Пропуски заменены нужными значениями

Проверим на наличие дубликатов

In [12]:
display(data.describe().toPandas().T)

23/07/11 10:33:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
longitude,20640,-119.56970445736148,2.003531723502584,-124.35,-114.31
latitude,20640,35.6318614341087,2.135952397457101,32.54,41.95
housing_median_age,20640,28.639486434108527,12.58555761211163,1.0,52.0
total_rooms,20640,2635.7630813953488,2181.6152515827944,2.0,39320.0
total_bedrooms,20640,537.6163178294578,420.7923178822386,1.0,6445.0
population,20640,1425.4767441860465,1132.46212176534,3.0,35682.0
households,20640,499.5396802325581,382.3297528316098,1.0,6082.0
median_income,20640,3.8706710029070246,1.899821717945263,0.4999,15.0001
median_house_value,20640,206855.81690891474,115395.61587441359,14999.0,500001.0


In [13]:
print(data.distinct().count())

20640


Всего в датасете 20640 строк, уникальных тоже 20640. Дубликатов нету.

### Разделение на выборки

In [14]:
train_data, test_data = data.randomSplit([.75, .25], seed=RANDOM)

Выделение категориального, количественных и целевого признаков

In [15]:
print(train_data.count(), test_data.count()) 

15406 5234


### Преобразование признаков

In [16]:
categorical_features = ['ocean_proximity']
numerical_features = [
    column for column in data.columns
    if (column != 'median_house_value') and (column != 'ocean_proximity')]
target = 'median_house_value'

Трансформация категориальных признаков с помощью трансформера `StringIndexer`

In [17]:
indexer = StringIndexer(inputCols=categorical_features,
                        outputCols=[c + '_idx' for c in categorical_features])

indexer = indexer.fit(train_data)
train_data = indexer.transform(train_data)
test_data = indexer.transform(test_data)

In [18]:
cols = [
    column for column in train_data.columns for i in categorical_features
    if (column.startswith(i))]

train_data.select(cols).show(5)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 5 rows



In [19]:
test_data.select(cols).show(5)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 5 rows



Создание `OHE`-кодирование для категорий.

In [20]:
encoder = OneHotEncoder(
    inputCols=[column + '_idx' for column in categorical_features],
    outputCols=[column + '_ohe' for column in categorical_features])

encoder = encoder.fit(train_data)

train_data = encoder.transform(train_data)
test_data = encoder.transform(test_data)

cols = [
    column for column in train_data.columns for i in categorical_features
    if (column.startswith(i))]

In [21]:
train_data.select(cols).show(5)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 5 rows



In [22]:
test_data.select(cols).show(5)

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 5 rows



Объединение признаков в один вектор

In [23]:
categorical_assembler = \
        VectorAssembler(inputCols=[column+'_ohe' for column in categorical_features],
                                        outputCol="categorical_features")

train_data = categorical_assembler.transform(train_data)
test_data = categorical_assembler.transform(test_data)

In [24]:
display(pd.DataFrame(train_data.take(5), columns=train_data.columns))

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,ocean_proximity_ohe,categorical_features
0,-124.35,40.54,52.0,1820.0,300.0,806.0,270.0,3.0147,94600.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
1,-124.3,41.8,19.0,2672.0,552.0,1298.0,478.0,1.9797,85800.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
2,-124.3,41.84,17.0,2677.0,531.0,1244.0,456.0,3.0313,103600.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
3,-124.27,40.69,36.0,2349.0,528.0,1194.0,465.0,2.5179,79000.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
4,-124.26,40.58,52.0,2217.0,394.0,907.0,369.0,2.3571,111400.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


In [25]:
display(pd.DataFrame(test_data.take(5), columns=test_data.columns))

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,ocean_proximity_ohe,categorical_features
0,-124.23,40.54,52.0,2694.0,453.0,1152.0,435.0,3.0806,106700.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
1,-124.17,40.74,17.0,2026.0,338.0,873.0,313.0,4.0357,128900.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
2,-124.17,40.75,13.0,2171.0,339.0,951.0,353.0,4.8516,116100.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
3,-124.17,40.79,43.0,2285.0,479.0,1169.0,482.0,1.9688,70500.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"
4,-124.16,40.77,35.0,2141.0,438.0,1053.0,434.0,2.8529,85600.0,NEAR OCEAN,2.0,"(0.0, 0.0, 1.0, 0.0)","(0.0, 0.0, 1.0, 0.0)"


**Трансформация числовых признаков**

Шкалируем значения с помощью `VectorAssembler` и `StandardScaler`

In [26]:
numerical_assembler = VectorAssembler(inputCols=numerical_features,
                                      outputCol="numerical_features")

train_data = numerical_assembler.transform(train_data)
test_data = numerical_assembler.transform(test_data)

In [27]:
standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")
standardScaler = standardScaler.fit(train_data)
train_data = standardScaler.transform(train_data)
test_data = standardScaler.transform(test_data)

In [28]:
train_data.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'categorical_features',
 'numerical_features',
 'numerical_features_scaled']

In [29]:
test_data.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'categorical_features',
 'numerical_features',
 'numerical_features_scaled']

Собираем трансформированные категориальные и числовые признаки с помощью `VectorAssembler`

In [30]:
all_features = ['categorical_features', 'numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol="features")

train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

In [31]:
train_data.select(all_features).show(5) 

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-61.982225379042...|
|       (4,[2],[1.0])|     [-61.957302891957...|
|       (4,[2],[1.0])|     [-61.957302891957...|
|       (4,[2],[1.0])|     [-61.942349399707...|
|       (4,[2],[1.0])|     [-61.937364902290...|
+--------------------+-------------------------+
only showing top 5 rows



In [32]:
test_data.select(all_features).show(5) 

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-61.922411410039...|
|       (4,[2],[1.0])|     [-61.892504425538...|
|       (4,[2],[1.0])|     [-61.892504425538...|
|       (4,[2],[1.0])|     [-61.892504425538...|
|       (4,[2],[1.0])|     [-61.887519928121...|
+--------------------+-------------------------+
only showing top 5 rows



**Данные обработаны, трансформированы и готовы к обучению моделей.**

## Обучение моделей

### Обучение модели линейной регрессии

Качество моделей будет оцениваться, используя трансформер `RegressionMetrics`.

In [33]:
final = []
for column in ['features', 'numerical_features_scaled']:

    reg = LinearRegression(featuresCol=column, labelCol=target, regParam=1)
    model = reg.fit(train_data)

    predictions = model.transform(test_data)

    results = predictions.select(['prediction', target])

    results_collect = results.collect()
    results_list = [(float(i[0]), float(i[1])) for i in results_collect]
    scoreAndLabels = spark.sparkContext.parallelize(results_list)

    metrics = RegressionMetrics(scoreAndLabels)

    final.append([
        column,
        round(metrics.rootMeanSquaredError, 4),
        round(metrics.meanAbsoluteError, 4),
        round(metrics.r2, 4)
    ])

    print('Модель:', column)
    print('RMSE:', round(metrics.rootMeanSquaredError, 4))
    print('MAE:', round(metrics.meanAbsoluteError, 4))
    print('R2:', round(metrics.r2, 4))
    print()

23/07/11 10:33:57 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

Модель: features
RMSE: 67139.3017
MAE: 48938.7983
R2: 0.6641

Модель: numerical_features_scaled
RMSE: 68175.0001
MAE: 50103.1342
R2: 0.6537



**Модели обучены**

## Анализ результатов

Для оценки качества модели использовались метрики `RMSE`, `MAE` и `R2`.

- `RMSE` - корень от средней квадратической ошибки, чем точнее модель, тем больше значение RMSE будет стремиться к нулю;
- `MAE` - среднее абсолютное значение, тем больше значение RMSE будет стремиться к нулю;
- `R2` - указывает насколько тесной является связь между факторами регрессии и зависимой переменной, чем выше R2, тем точнее модель. 

In [34]:
results = spark.createDataFrame(final, ['Признаки', 'RMSE', 'MAE', 'R2'])
results = results.replace(["features", "numerical_features_scaled"],
                          ["Все признаки", "Числовые признаки"], "Признаки")
results.show()

+-----------------+----------+----------+------+
|         Признаки|      RMSE|       MAE|    R2|
+-----------------+----------+----------+------+
|     Все признаки|67139.3017|48938.7983|0.6641|
|Числовые признаки|68175.0001|50103.1342|0.6537|
+-----------------+----------+----------+------+



**Согласно всем метрикам модель со всеми признаками работает чуть точнее**

## Альтернативное решение используя Pipeline

Создаем шаги для Pipe

In [35]:
stages = []

indexer = StringIndexer(inputCols=categorical_features,
                        outputCols=[c + '_idx' for c in categorical_features])

encoder = OneHotEncoder(
    inputCols=[column + '_idx' for column in categorical_features],
    outputCols=[column + '_ohe' for column in categorical_features])

categorical_assembler = \
        VectorAssembler(inputCols=[column+'_ohe' for column in categorical_features],
                                        outputCol="categorical_features")

numerical_assembler = VectorAssembler(inputCols=numerical_features,
                                      outputCol="numerical_features")

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")

all_features = ['categorical_features', 'numerical_features_scaled']
final_assembler = VectorAssembler(inputCols=all_features, outputCol="features")

stages += [indexer, encoder, categorical_assembler, numerical_assembler, 
          standardScaler, final_assembler]


Разделяем на train и test

In [36]:
train_data_pipe, test_data_pipe = data.randomSplit([.75, .25], seed=RANDOM)

Обучаем модель

In [37]:
final_pipe = []
for column in ['features', 'numerical_features_scaled']:

    reg = LinearRegression(featuresCol=column, labelCol=target, regParam=1)
    stages += [reg]
    pipeline = Pipeline(stages=stages)
    
    model = pipeline.fit(train_data_pipe)

    predictions = model.transform(test_data_pipe)

    results = predictions.select(['prediction', target])

    results_collect = results.collect()
    results_list = [(float(i[0]), float(i[1])) for i in results_collect]
    scoreAndLabels = spark.sparkContext.parallelize(results_list)

    metrics = RegressionMetrics(scoreAndLabels)

    final_pipe.append([
        column,
        round(metrics.rootMeanSquaredError, 4),
        round(metrics.meanAbsoluteError, 4),
        round(metrics.r2, 4)
    ])

    print('Модель:', column)
    print('RMSE:', round(metrics.rootMeanSquaredError, 4))
    print('MAE:', round(metrics.meanAbsoluteError, 4))
    print('R2:', round(metrics.r2, 4))
    print()
    stages.remove(reg)

Модель: features
RMSE: 67139.3017
MAE: 48938.7983
R2: 0.6641

Модель: numerical_features_scaled
RMSE: 68175.0001
MAE: 50103.1342
R2: 0.6537



Результаты

In [38]:
results_pipe = spark.createDataFrame(final_pipe, ['Признаки', 'RMSE', 'MAE', 'R2'])
results_pipe = results_pipe.replace(["features", "numerical_features_scaled"],
                          ["Все признаки", "Числовые признаки"], "Признаки")
results_pipe.show()

+-----------------+----------+----------+------+
|         Признаки|      RMSE|       MAE|    R2|
+-----------------+----------+----------+------+
|     Все признаки|67139.3017|48938.7983|0.6641|
|Числовые признаки|68175.0001|50103.1342|0.6537|
+-----------------+----------+----------+------+



**Результаты получились точно такие же как и без Pipeline**

Остановка Spark-сессии

In [39]:
spark.stop()

## Вывод

В результате исследования было сделано:
1. Инициализирована Spark сессия, прочитаны данные.
2. Обработаны пропуски. Пропуски заменены путем выявленного соотношения между общим количеством комнат и количеством спален.
3. Преобразованы признаки (`OHE` и `StandardScaler`).
4. Обучена модель линейной регрессии в двух вариантах:
   - со всеми признаками;
   - только с числовыми признаками.
5. Альтернативным вариантом обучены модели с использованием Pipeline. Результаты аналогичные.

Согласно полученным метрикам можно сделать вывод, что модель со всеми признаками чуть лучше.