# Предсказание стоимости жилья

В проекте нам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году.

Каждая строка содержит агрегированную статистику о жилом массиве. Жилой массив — минимальная географическая единица с населением от 600 до 3000 человек в зависимости от штата. Одна строка в данных содержит статистику в среднем о 1425.5 обитателях жилого массива.

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — median_house_value. Обучим модель и сделаем предсказания на тестовой выборке. Для оценки качества модели используем метрики RMSE, MAE и R2.

###Описание данных
В колонках датасета содержатся следующие данные:

- longitude — широта;
- latitude — долгота;
- housing_median_age — медианный возраст жителей жилого массива;
- total_rooms — общее количество комнат в домах жилого массива;
- total_bedrooms — общее количество спален в домах жилого массива;
- population — количество человек, которые проживают в жилом массиве;
- households — количество домовладений в жилом массиве;
- median_income — медианный доход жителей жилого массива;
- median_house_value — медианная стоимость дома в жилом массиве;
- ocean_proximity — близость к океану.

###План выполнения проекта
- Инициализируем локальную Spark-сессию.
- Прочитайтем содержимое файла housing.csv.
- Выведем типы данных колонок датасета. Используем методы pySpark.
- Выполним предобработку данных:
    - Исследуем данные на наличие пропусков и заполним их, выбрав значения по своему усмотрению.
    - Преобразуем колонку с категориальными значениями техникой One hot encoding.
- Построим две модели линейной регрессии на разных наборах данных:
    - используя все данные из файла;
    - используя только числовые переменные, исключив категориальные.
    - Для построения модели используем оценщик LinearRegression из библиотеки MLlib.
- Сравним результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2.

####Импорт необходимых библиотек.

In [1]:
import pandas as pd # Импортируем библиотеку pandas для работы с данными в формате таблиц.
import numpy as np # Импортируем библиотеку numpy для работы с массивами и числовыми операциями.

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=8913683487c6379d888b93ac98d202a3602fc57cdf49a00f3a9ebb0a807f3e3a
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


Импорт необходимых библиотек и запуск spark сессии.

In [3]:
import pyspark # Импортируем библиотеку pyspark для работы с большими данными и распределенными вычислениями.
from pyspark.sql import SparkSession # Импортируем класс SparkSession для создания сессии Spark.
from pyspark.sql.types import * # Импортируем все типы данных из pyspark.sql.types для определения схемы данных.
import pyspark.sql.functions as F # Импортируем функции из pyspark.sql для работы с данными.

from pyspark.ml.feature import Imputer # Импортируем класс Imputer для обработки пропущенных значений.
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler # Импортируем классы для обработки признаков.
from pyspark.ml.regression import LinearRegression # Импортируем класс LinearRegression для построения линейной регрессии.

pyspark_version = pyspark.__version__ # Получаем версию установленного PySpark.
if int(pyspark_version[:1]) == 3: # Проверяем, если версия PySpark 3.x.
  from pyspark.ml.feature import OneHotEncoder # Импортируем OneHotEncoder для кодирования категориальных признаков.
elif int(pyspark_version[:1]) == 2: # Если версия PySpark 2.x.
  from pyspark.ml.feature import OneHotEncodeEstimator # Импортируем OneHotEncodeEstimator для
                                                       # кодирования категориальных признаков.

SEED = 31416 # Устанавливаем фиксированное значение для генератора случайных чисел для воспроизводимости.
# Начинаем создание сессии Spark.
spark = SparkSession.builder \
                    .master('local') \
                    .appName('California Housing ML') \
                    .getOrCreate()
# Указываем, что будем использовать локальный режим.
# Задаем имя приложения.
# Создаем сессию или получаем существующую.

Прочитайтем содержимое файла housing.csv.

In [4]:
df = spark.read.load('housing.csv', format='csv', sep=',', inferSchema=True, header='true') # Загружаем данные из CSV файла
                                                                                            # в DataFrame.
df.printSchema() # Выводим схему DataFrame, чтобы увидеть типы данных в каждом столбце.

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Выведем типы данных колонок датасета. Используем методы pySpark.

In [5]:
pd.DataFrame(df.dtypes, columns=['column', 'type']) # Создаем DataFrame pandas для отображения типов данных столбцов.

Unnamed: 0,column,type
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


Ознакомимся с первыми 10 строками данных.

In [6]:
df.show(10) # Показываем первые 10 строк DataFrame для предварительного просмотра данных.

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Посмотрим статистику по данным.

In [7]:
df.describe().toPandas() # Получаем статистическое описание данных и преобразуем его в DataFrame pandas.

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


####Выводы
Мы инициализировали сессию Spark, загрузили библиотеки и данные, провели осмотр данных.

## Подготовка данных


Исследуем данные на наличие пропусков.

In [8]:
# Считаем количество пропущенных значений в каждом столбце и выводим результат.
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



Заполним их медианными значениями.

In [9]:
imputer = Imputer(strategy='median', inputCols=['total_bedrooms'], outputCols=['total_bedrooms']) # Создаем объект Imputer
# для заполнения пропусков медианой в столбце 'total_bedrooms'.
df = imputer.fit(df).transform(df) # Обучаем Imputer на данных и применяем его для заполнения пропусков.

Проверим статистику.

In [10]:
df.describe().toPandas() # Снова получаем статистическое описание данных после обработки пропусков.

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,536.8388565891473,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,419.3918779216887,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Среднее и стандартное отклонение почти не изменились.

####Выводы
Проверили пропуски в данных и заполнили их медианными значениями.

###Преобразование категориальных признаков
Преобразуем колонку с категориальными значениями техникой One hot encoding.

Разделим колонки на два типа: числовые и текстовые, которые представляют категориальные данные.

In [11]:
categorical_cols = ['ocean_proximity'] # Определяем список категориальных признаков.
numerical_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms',
                   'total_bedrooms', 'population', 'households', 'median_income'] # Определяем список числовых признаков.
target = 'median_house_value' # Определяем целевой признак для предсказания.

Применим технику сначала StringIndexer для перевода категориальных признаков в численные.

In [12]:
indexer = StringIndexer(inputCols=categorical_cols, outputCols=[c+'_idx' for c in categorical_cols]) # Создаем объект
# StringIndexer для преобразования категориальных признаков в числовые индексы.
df = indexer.fit(df).transform(df) # Обучаем и применяем StringIndexer к DataFrame.

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))] # Получаем список столбцов, которые были
                                                                              # преобразованы в индексы.
df.select(cols).show(3) # Показываем первые 3 строки преобразованных категориальных признаков.

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
|       NEAR BAY|                3.0|
+---------------+-------------------+
only showing top 3 rows



Теперь можно применить технику OHE.

In [13]:
# Создаем объект OneHotEncoder для кодирования индексов в one-hot представление.
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols], outputCols=[c+'_ohe' for c in categorical_cols])
df = encoder.fit(df).transform(df) # Обучаем и применяем OneHotEncoder к DataFrame.

cols = [c for c in df.columns for i in categorical_cols if (c.startswith(i))] # Получаем список столбцов, которые были
                                                                              # закодированы в one-hot представление.
df.select(cols).show(3) # Показываем первые 3 строки закодированных категориальных признаков.

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
|       NEAR BAY|                3.0|      (4,[3],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



####Выводы
Перевели в векторный вид категориальную колонку.

###Преобразование числовых признаков
Для числовых признаков тоже нужна трансформация — шкалирование значений — чтобы сильные выбросы не смещали предсказания модели. Применим StandardScaler.

Соберём вектор числовых признаков в отдельный столбец.

In [14]:
# Создаем объект VectorAssembler для объединения числовых признаков в один векторный столбец 'numerical_features'.
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol='numerical_features')
df = numerical_assembler.transform(df) # Применяем VectorAssembler к DataFrame для создания нового столбца
                                       # с векторными признаками.

Применим к столбцу с вектором числовых признаков StandardScaler.

In [17]:
# Создаем объект StandardScaler для стандартизации векторных признаков.
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')
df = standardScaler.fit(df).transform(df) # Обучаем StandardScaler на данных и применяем его для стандартизации.
df.printSchema() # Выводим схему DataFrame, чтобы увидеть обновленные типы данных после добавления новых столбцов.

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- numerical_features: vector (nullable = true)
 |-- numerical_features_scled: vector (nullable = true)
 |-- numerical_features_scaled: vector (nullable = true)



Финальный шаг — собрать трансформированные категорийные и числовые признаки с помощью VectorAssembler.

In [18]:
all_features = ['ocean_proximity_ohe','numerical_features_scaled'] # Определяем список всех признаков, которые будем
                                                                   # использовать для обучения модели.
final_assembler = VectorAssembler(inputCols=all_features, outputCol='features') # Создаем объект VectorAssembler для
                                                          # объединения всех признаков в один векторный столбец 'features'.
df = final_assembler.transform(df) # Применяем финальный VectorAssembler к DataFrame для создания нового столбца с
                                   # объединенными признаками.

In [19]:
df.select('features').show(3) # Показываем первые 3 строки столбца 'features' для проверки результата объединения признаков.
df.select('numerical_features_scaled').show(3) # Показываем первые 3 строки столбца 'numerical_features_scaled' для
                                               # проверки стандартизации.

+--------------------+
|            features|
+--------------------+
|[0.0,0.0,0.0,1.0,...|
|[0.0,0.0,0.0,1.0,...|
|[0.0,0.0,0.0,1.0,...|
+--------------------+
only showing top 3 rows

+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.007269596069...|
|     [-61.002278409814...|
|     [-61.012260782324...|
+-------------------------+
only showing top 3 rows



####Выводы
Мы преобразовали числовые признаки в вектора, объединили с категориальным и создали два набора данных. Один включающий все признаки, второй только количественные.

## Обучение моделей
Разделим наши наборы данных на тренировочные и тестовые.

In [20]:
train, test = df.randomSplit([.8,.2], seed=SEED) # Разделяем данные на обучающую (80%) и тестовую (20%) выборки с
                                                 # фиксированным значением генератора случайных чисел.
print(train.count(), test.count()) # Выводим количество строк в обучающей и тестовой выборках для проверки.

16573 4067


Создадим и обучим оценщик LinearRegression.

In [23]:
# Создаем объект линейной регрессии с заданными параметрами для модели, использующей все признаки.
lr_all = LinearRegression(labelCol=target, featuresCol='features', maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_all = lr_all.fit(train) # Обучаем модель линейной регрессии на обучающей выборке.

lr_num = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled',
                          maxIter=10, regParam=0.3, elasticNetParam=0.8) # Создаем объект линейной регрессии для модели,
                                                                # использующей только стандартизированные числовые признаки.
model_num = lr_num.fit(train) # Обучаем модель линейной регрессии на обучающей выборке.

Посмотрим на предсказания моделей, со всеми признаками:

In [24]:
predictions_all = model_all.transform(test) # Применяем обученную модель ко всем признакам на тестовой выборке для
                                            # получения предсказаний.
predicted_all = predictions_all.select('median_house_value', 'prediction') # Извлекаем реальные значения и предсказания из
                                                                           # тестовой выборки.
predicted_all.show(10) # Показываем первые 10 строк с реальными значениями и предсказаниями для проверки.

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0| 231885.9358476114|
|           78300.0|155799.70800574054|
|           70000.0| 166422.5769561075|
|           70500.0| 161990.8711288469|
|           86400.0|200405.75004111463|
|           60000.0| 159650.6852665979|
|          105900.0|164879.29050004017|
|          109400.0| 200361.7137626144|
|           85100.0|193512.05173163512|
|           92700.0|202673.46662855404|
+------------------+------------------+
only showing top 10 rows



И только с числовыми признаками:

In [25]:
predictions_num = model_num.transform(test) # Применяем обученную модель с числовыми признаками на тестовой выборке для
                                            # получения предсказаний.
predicted_num = predictions_num.select('median_house_value', 'prediction') # Извлекаем реальные значения и предсказания из
                                                                           # тестовой выборки.
predicted_num.show(10) # Показываем первые 10 строк с реальными значениями и предсказаниями для проверки.

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0|192166.85561598092|
|           78300.0| 85574.56148353405|
|           70000.0|114982.99945464544|
|           70500.0|110024.14066426875|
|           86400.0|153491.36768210493|
|           60000.0| 110568.8321413449|
|          105900.0|  88779.0739096175|
|          109400.0|122012.70780284284|
|           85100.0|148191.54561184114|
|           92700.0| 154945.2598633254|
+------------------+------------------+
only showing top 10 rows



####Выводы
Мы разделили данные на обучающие и тестовые выборки, обучили модели линейной регрессии и посмотрели на их предсказания.

## Анализ результатов
Сравним результаты работы линейной регрессии на двух наборах данных по метрикам RMSE, MAE и R2.

In [26]:
training_summary_all = model_all.summary # Получаем сводку о модели, обученной на всех признаках.
print('RMSE: %f' % training_summary_all.rootMeanSquaredError) # Выводим корень среднеквадратичной ошибки (RMSE) для
                                                              # модели с использованием всех признаков.
print('MAE: %f' % training_summary_all.meanAbsoluteError) # Выводим среднюю абсолютную ошибку (MAE) для модели
                                                          # с использованием всех признаков.
print('R2: %f' % training_summary_all.r2) #Выводим коэффициент детерминации (R²) для модели с использованием всех признаков.

RMSE: 69003.554642
MAE: 50038.513122
R2: 0.640441


In [27]:
training_summary_num = model_num.summary # Получаем сводку о модели, обученной только на числовых признаках.
print('RMSE: %f' % training_summary_num.rootMeanSquaredError) # Выводим RMSE для модели с использованием числовых признаков.
print('MAE: %f' % training_summary_num.meanAbsoluteError) # Выводим MAE для модели с использованием числовых признаков.
print('R2: %f' % training_summary_num.r2) # Выводим R² для модели с использованием числовых признаков.

RMSE: 69806.260737
MAE: 50949.855170
R2: 0.632027


RMSE и MAE измеряют разницу между предсказанным значением медианной цены дома и его реальной ценой. Реальная цена по набору лежит в диапазоне от 15000 до 500000. Ошибки в случае с использованием всего набора данных чуть меньше. Но не существенно.

Коэффициент детерминации, показывающий долю корректных предсказаний нашей модели выше у модели учитывающей все признаки.

Проверим наши модели на тестовых данных.

In [28]:
test_summary_all = model_all.evaluate(test) # Оцениваем модель с использованием всех признаков на тестовой выборке.
print('RMSE: %f' % test_summary_all.rootMeanSquaredError) # Выводим RMSE для тестовой выборки модели с использованием
                                                          # всех признаков.
print('MAE: %f' % test_summary_all.meanAbsoluteError) # Выводим MAE для тестовой выборки модели с использованием
                                                      # всех признаков
print('R2: %f' % test_summary_all.r2) # Выводим R² для тестовой выборки модели с использованием всех признаков.

RMSE: 70940.383666
MAE: 50393.890118
R2: 0.630216


In [30]:
test_summary_num = model_num.evaluate(test) # Оцениваем модель с использованием числовых признаков на тестовой выборке.
print('RMSE: %f' % test_summary_num.rootMeanSquaredError) # Выводим RMSE для тестовой выборки модели с использованием
                                                          # числовых признаков.
print('MSE: %f' % test_summary_num.meanAbsoluteError) # Выводим MAE для тестовой выборки модели с использованием
                                                      # числовых признаков.
print('R2: %f' % test_summary_num.r2) # Выводим R² для тестовой выборки модели с использованием числовых признаков.

RMSE: 71560.050901
MSE: 51063.192068
R2: 0.623728


####Выводы
Мы проверили результаты работы линейной регрессии на обучающих и тестовых наборах данных.

Вычислили по три метрики для всех результатов: RMSE, MAE, R2.

В целом модели показывают неплохой результат. На тестовом наборе данных лучший результат показала модель обученная как на количественных так и на категориальном признаках.

На тестовом наборе все метрики ухудшились, но незначительно. Можем считать что модели не переобучились.

In [31]:
spark.stop() # Завершаем сессию Spark, освобождая ресурсы.