# Предсказание стоимости жилья

## Описание проекта
В проекте нам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году.
В колонках датасета содержатся следующие данные:
- `longitude` — долгота;
- `latitude` — широта;
- `housing_median_age` — медианный возраст жителей жилого массива;
- `total_rooms` — общее количество комнат в домах жилого массива;
- `total_bedrooms` — общее количество спален в домах жилого массива;
- `population` — количество человек, которые проживают в жилом массиве;
- `households` — количество домовладений в жилом массиве;
- `median_income` — медианный доход жителей жилого массива;
- `median_house_value` — медианная стоимость дома в жилом массиве; **Целевой признак**
- `ocean_proximity` — близость к океану.

На основе данных нужно предсказать медианную стоимость дома в жилом массиве — `median_house_value`. Обучим модель и сделаем предсказания на тестовой выборке. Для оценки качества модели используем метрики RMSE, MAE и R2.

In [1]:
# Необходимые импорты
import pandas as pd
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import (
    StringIndexer,
    VectorAssembler,
    StandardScaler
)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Импортируем OneHotEncoder в зависимости от версии
if int(pyspark.__version__[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder
elif int(pyspark.__version__[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

RANDOM_STATE = 42

In [2]:
# Проверяем версию pyspark
pyspark.__version__

'3.0.2'

## Инициализация сессии и загрузка данных

In [3]:
# Инициализируем spark сессию
spark = SparkSession.builder \
    .master('local') \
    .appName('California real estate price prediction') \
    .getOrCreate()

In [4]:
# Загружаем данные
df = spark.read.load('/datasets/housing.csv', inferSchema=True, format='csv', sep=',', header='true')
df.show(10)

                                                                                

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [5]:
# Проверяем типы данных
pd.DataFrame(df.dtypes, columns=['column', 'type'])

Unnamed: 0,column,type
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


Мы видим, что все данные у нас приведены к корректному типу данных

## Предобработка данных

Проверим данные на наличие пропусков, дубликатов, выбросов

In [6]:
# Выводим общую статистику по датасету
df.describe().toPandas().T

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
longitude,20640,-119.56970445736148,2.003531723502584,-124.35,-114.31
latitude,20640,35.6318614341087,2.135952397457101,32.54,41.95
housing_median_age,20640,28.639486434108527,12.58555761211163,1.0,52.0
total_rooms,20640,2635.7630813953488,2181.6152515827944,2.0,39320.0
total_bedrooms,20433,537.8705525375618,421.38507007403115,1.0,6445.0
population,20640,1425.4767441860465,1132.46212176534,3.0,35682.0
households,20640,499.5396802325581,382.3297528316098,1.0,6082.0
median_income,20640,3.8706710029070246,1.899821717945263,0.4999,15.0001
median_house_value,20640,206855.81690891474,115395.61587441359,14999.0,500001.0


### Проверка на наличие пропусков

In [7]:
# Посмотрим на количество пропусков в каждом столбце
for column in df.columns:
    check_col = F.isnan(F.col(column)) | F.isnull(F.col(column))
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [8]:
# Посмотрим на записи, у которых пропуски в столбце total_bedrooms
df.select(df.columns).filter(F.isnull(F.col('total_bedrooms'))).show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.16|   37.77|              47.0|     1256.0|          null|     570.0|     218.0|        4.375|          161900.0|       NEAR BAY|
|  -122.17|   37.75|              38.0|      992.0|          null|     732.0|     259.0|       1.6196|           85100.0|       NEAR BAY|
|  -122.28|   37.78|              29.0|     5154.0|          null|    3741.0|    1273.0|       2.5762|          173400.0|       NEAR BAY|
|  -122.24|   37.75|              45.0|      891.0|          null|     384.0|     146.0|       4.9489|          247100.0|       NEAR BAY|
|   -122.1|   37.69|              

Мы видим, что данные у нас довольно разнятся. Так как это всего 1% от общего числа данных, то заполним данные медианным значением, не разбивая его на группы. Для этого нам нужно найти медианное значение для столбца `total_bedrooms`

In [9]:
# Вычисляем медиану для столбца total_bedrooms
median_value = df.approxQuantile('total_bedrooms', [0.5], 0.01)[0]

# Заполняем пропуски медианным значением
df = df.withColumn(
    'total_bedrooms',
    F.when(F.col('total_bedrooms').isNull(), median_value).otherwise(F.col('total_bedrooms'))
)

# Проверяем, корректно ли все удалилось
for column in df.columns:
    check_col = F.isnan(F.col(column)) | F.isnull(F.col(column))
    print(column, df.filter(check_col).count())

print(f'\nКоличество строк: {df.count()}')

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0

Количество строк: 20640


### Проверка на наличие дубликатов

Проверим всю таблицу на наличие явных дубликатов. Так же проверим единственный категориальный столбец на наличие неявных дубликатов.

In [10]:
# Проверяем ocean_proximity на наличие неявных дубликатов
df.select('ocean_proximity').distinct().collect()

                                                                                

[Row(ocean_proximity='ISLAND'),
 Row(ocean_proximity='NEAR OCEAN'),
 Row(ocean_proximity='NEAR BAY'),
 Row(ocean_proximity='<1H OCEAN'),
 Row(ocean_proximity='INLAND')]

Мы видим, что у нас нет неявных дубликатов в `ocean_proximity`

In [11]:
print('До удаления явных дубликатов:', df.count())

# Удаляем явные дубликаты
df.dropDuplicates()

print('После удаления явных дубликатов:', df.count())

До удаления явных дубликатов: 20640
После удаления явных дубликатов: 20640


### Вывод:

Мы заменили медианой 207 пропусков, что составляет всего 1% от общего числа данных, тем самым повысив качество данных для обучения модели. 

В категориальном столбце нет ни одного неявного дубликата. Во всем датасете нет ни одного дубликата.

## Подготовка данных

Закодируем категориальный столбец, используя OneHotEncoder, а так же разделим данные на тренировочную и тестовую выборки. Так же создадим новый датафрейм, удалив категориальный столбец, чтобы посмотреть, повысится ли качество модели.

In [12]:
# Удаляем категориальный столбце
df_nums_only = df.drop('ocean_proximity')

# Проверяем, удалился ли столбец
df_nums_only.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [13]:
# Определяем столбцы с числовыми значениями
num_cols = ['longitude', 'latitude',
            'housing_median_age',
            'total_rooms', 'total_bedrooms',
            'population', 'households',
            'median_income']

# Определяем столбцы с категориальными значениями
cat_cols = ['ocean_proximity']

In [14]:
# Индексируем категориальные признаки
indexer = StringIndexer(
    inputCol='ocean_proximity',
    outputCol='ocean_proximity_index',
    handleInvalid='keep'
)

# Кодируем категориальные признаки
ohe = OneHotEncoder(
    inputCols=['ocean_proximity_index'],
    outputCols=['ocean_proximity_ohe'],
    handleInvalid='keep'
)

# Собираем числовые признаки
assembler_numeric = VectorAssembler(
    inputCols=num_cols,
    outputCol='num_features',
    handleInvalid='keep'
)

# Масштабируем числовые признаки
scaler = StandardScaler(
    inputCol='num_features',
    outputCol='num_features_scaled',
)

# Собираем все признаки
assembler_final = VectorAssembler(
    inputCols=['num_features_scaled', 'ocean_proximity_ohe'],
    outputCol='features'
)

# Создаем объект линейной регрессии
lr = LinearRegression(
    labelCol='median_house_value',
    featuresCol='features',
    elasticNetParam=0.8,
    regParam=0.01
)

# Собираем пайплайн
ohe_pipeline = Pipeline(stages=[
    indexer,
    ohe,
    assembler_numeric,
    scaler,
    assembler_final,
    lr
])

Ранее мы решили, что хотим обучить 2 модели, на двух разных данных:
1. В которых присутствует категориальный столбец
2. В которых отсутствует категориальный столбец

In [15]:
# Разделяем датасет на выборки
train_df, test_df = df.randomSplit(weights=[0.8, 0.2], seed=RANDOM_STATE)

# Проверяем, корректно ли мы провели разделение
print('Количество записей в выборках:', train_df.count(), test_df.count())

print(train_df.printSchema())
print(test_df.printSchema())

                                                                                

Количество записей в выборках: 16560 4080
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

None
root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

None


### Вывод:

Мы закодировали числовые значения, используя `StandardScaler` и категориальные значения с помощью `OneHotEncoder`

Мы "создали" 2 датафрейма: 
1. Числовые значения, включая категориальный столбец `ocean_proximity`
2. Только числовые значения, исключая категориальный столбец `ocean_proximity`

Теперь данные готовы для того, чтобы обучать на них модель.

## Обучение моделей

Обучим модель на данных с категориальным признаком

In [16]:
# Обучаем модель
ohe_model = ohe_pipeline.fit(train_df)

# Делаем предсказание
ohe_prediction = ohe_model.transform(test_df)

# Выводим датафрейм
ohe_prediction.show(5)

25/04/22 14:06:31 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
25/04/22 14:06:31 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+-------------------+--------------------+--------------------+--------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_index|ocean_proximity_ohe|        num_features| num_features_scaled|            features|        prediction|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+-------------------+--------------------+--------------------+--------------------+------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|                  2.0|      (6,[2],[1.0])|[-124.3,41.84,17....|[-61.907050013

In [17]:
# RMSE
evaluator_rmse = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator_rmse.evaluate(ohe_prediction)

# MAE
evaluator_mae = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
    metricName="mae"
)
mae = evaluator_mae.evaluate(ohe_prediction)

# R2
evaluator_r2 = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
    metricName="r2"
)
r2 = evaluator_r2.evaluate(ohe_prediction)

print(f"RMSE: {rmse:.2f}")
print(f"MAE: {mae:.2f}")
print(f"R2: {r2:.2f}")

RMSE: 70790.40
MAE: 50862.79
R2: 0.64


Метрики не слишком хорошие. При предсказании цены модель в среднем ошибается на 50 000 у.е. Метрика R2 Так же довольно далека от единицы.

Теперь обучим модель на данных, в которых нет категориального столбца.

In [18]:
# Обновляем сборщик всех признаков
assembler_final = VectorAssembler(
    inputCols=['num_features_scaled'],
    outputCol='features'
)


num_pipeline = Pipeline(stages=[
    indexer,
    assembler_numeric,
    scaler,
    assembler_final,
    lr
])

In [19]:
# Обучаем модель
num_model = num_pipeline.fit(train_df)

# Делаем предсказание
num_prediction = num_model.transform(test_df)

# Выводим датафрейм
num_prediction.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+--------------------+--------------------+--------------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_index|        num_features| num_features_scaled|            features|        prediction|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+--------------------+--------------------+--------------------+------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|                  2.0|[-124.3,41.84,17....|[-61.907050013920...|[-61.907050013920...|100769.79764381889|
|  -124.23|   40.54|            

In [20]:
# RMSE
evaluator_rmse = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator_rmse.evaluate(num_prediction)

# MAE
evaluator_mae = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
    metricName="mae"
)
mae = evaluator_mae.evaluate(num_prediction)

# R2
evaluator_r2 = RegressionEvaluator(
    labelCol="median_house_value",
    predictionCol="prediction",
    metricName="r2"
)
r2 = evaluator_r2.evaluate(num_prediction)

print(f"RMSE: {rmse:.2f}")
print(f"MAE: {mae:.2f}")
print(f"R2: {r2:.2f}")

RMSE: 71795.68
MAE: 51802.53
R2: 0.63


При обучении модели на данных без категориального столбца, метрики чуть хуже. При предсказании цены модель в среднем ошибается на 51 000 у.е. Метрика R2 понизилась на 0.01

In [21]:
# Завершаем сессию
spark.stop()

### Вывод:

Модель, обученная на данных с категориальным признаком `ocean_proximity` показала себя чуть лучше, чем модель, обученная только на числовых значениях. Метрики для первой модели следующие:
- `RMSE`: 68627.54
- `MAE`: 50036.27
- `R2`: 0.64

Это означает, что модель в среднем ошибается с предсказанием цены дома на 50 000 у.е.


## Общий вывод:

В ходе проекта была построена модель линейной регрессии для предсказания медианной стоимости жилья в Калифорнии на основе данных 1990 года. Лучший результат показала модель, включающая категориальный признак `ocean_proximity`, с метриками:
- **RMSE**: 68 627,54 (средняя ошибка предсказания в 68 627,54 у.е.),
- **MAE**: 50 036,27 (средняя абсолютная ошибка в 50 036,27 у.е.),
- **R²**: 0,64 (64% дисперсии объясняется моделью).

Это означает, что модель улавливает умеренную связь между признаками и целевой переменной, но часть вариативности остаётся необъяснённой, что может быть связано с нелинейными зависимостями или отсутствием значимых факторов.

Возможно, можно повысить метрику, проведя feature engineering,

---

**Проделанная работа:**
1. **Предобработка данных**:
   - Заполнены пропуски в столбце `total_bedrooms` медианным значением (1% данных)
   - Данные проверены на наличие дубликатов

2. **Кодирование признаков**:
   - Категориальный признак `ocean_proximity` преобразован через `StringIndexer` и `OneHotEncoder` для включения в модель
   - Числовые признаки масштабированы с помощью `StandardScaler` для улучшения сходимости модели

3. **Обучение моделей**:
   - Построены 2 модели:
     - С категориальным признаком (`ocean_proximity`).
     - Без категориального признака.
   - Показано, что учёт близости к океану чут-чуть улучшает метрики