# **Предсказание стоимости жилья в Spark**

В проекте нам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. 

На основе данных нужно предсказать медианную стоимость дома в жилом массиве. 

Обучим модель и сделаем предсказания на тестовой выборке. 

Для оценки качества модели используем метрики RMSE, MAE и R2.

## Подготовка рабочего пространства: импорт библиотек

In [None]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import when, expr, col, isnan, isnull

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, Imputer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# One Hot Encoding для категориальной колонки ocean_proximity
pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder as Encoder
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator as Encoder

### Инициализация Spark-сессии

Инициализируем локальную Spark-сессию.

In [2]:
spark = SparkSession.builder \
                    .master('local') \
                    .appName('California') \
                    .getOrCreate()

### Знакомство с данными

Прочитаем содержимое файла с данными.

In [3]:
df = spark.read.csv('/datasets/housing.csv', header=True, inferSchema=True)

                                                                                

In [4]:
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Выведем типы данных колонок датасета. Используем методы pySpark.

In [5]:
print(pd.DataFrame(df.dtypes, columns=['column', 'type']))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


# Подготовка и предобработка данных

Выполним предобработку данных

### Разделение данных на обучающую и тестовую выборки

Разделим данные на обучающую и тестовую выборки ПЕРЕД любыми преобразованиями для предотвращения утечки данных.

In [6]:
# Разделение данных на обучающую и тестовую выборки
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

print(f"Размер обучающей выборки: {train_df.count()}")
print(f"Размер тестовой выборки: {test_df.count()}")

                                                                                

Размер обучающей выборки: 16560
Размер тестовой выборки: 4080


### Пропуски: поиск и обработка

Исследуем данные на наличие пропусков.

In [7]:
# Проверка пропусков
columns = df.columns
for column in columns:
    null_count = df.filter(df[column].isNull()).count()
    print(f'{column}: {null_count} пропущенных значений')

longitude: 0 пропущенных значений
latitude: 0 пропущенных значений
housing_median_age: 0 пропущенных значений
total_rooms: 0 пропущенных значений
total_bedrooms: 207 пропущенных значений
population: 0 пропущенных значений
households: 0 пропущенных значений
median_income: 0 пропущенных значений
median_house_value: 0 пропущенных значений
ocean_proximity: 0 пропущенных значений


## Создание Pipeline для предобработки и обучения моделей

Создадим Pipeline, который включает все этапы предобработки данных и обучения модели. Это гарантирует правильный порядок операций и предотвращает утечку данных.

In [8]:
# Определяем числовые колонки
numerical_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 
                  'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value'

print(f"Числовые признаки: {numerical_cols}")
print(f"Целевая переменная: {target}")

Числовые признаки: ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
Целевая переменная: median_house_value


### Модель 1: Линейная регрессия с использованием всех признаков

In [None]:
# МОДЕЛЬ 1: Все признаки (числовые + категориальные)
# Этап 1: Обработка пропусков
imputer1 = Imputer(
    inputCols=["total_bedrooms"], 
    outputCols=["total_bedrooms"],
    strategy="median"
)

# Этап 2: Преобразование категориальных признаков
indexer1 = StringIndexer(handleInvalid='skip', inputCol='ocean_proximity', outputCol='ocean_proximity_index')
encoder1 = Encoder(inputCol='ocean_proximity_index', outputCol='ocean_proximity_encoded')

# Этап 3: Объединение признаков
assembler1 = VectorAssembler(
    inputCols=numerical_cols + ['ocean_proximity_encoded'], 
    outputCol='features'
)

# Этап 4: Модель
lr1 = LinearRegression(featuresCol='features', labelCol=target)

# Создание и обучение Pipeline
pipeline1 = Pipeline(stages=[imputer1, indexer1, encoder1, assembler1, lr1])
model1 = pipeline1.fit(train_df)  # Обучение ТОЛЬКО на train данных

print("Модель 1 (все признаки) обучена успешно")

26/01/26 08:18:55 WARN Instrumentation: [f65bd6b7] regParam is zero, which might cause numerical instability and overfitting.
26/01/26 08:18:56 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
26/01/26 08:18:56 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
26/01/26 08:18:56 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
26/01/26 08:18:56 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Модель 1 (все признаки) обучена успешно


In [10]:
# Применение модели к данным
train_predictions1 = model1.transform(train_df)
test_predictions1 = model1.transform(test_df)

print("Предсказания модели 1 получены")

Предсказания модели 1 получены


### Модель 2: Линейная регрессия только с числовыми признаками

In [11]:
# МОДЕЛЬ 2: Только числовые признаки
# Этап 1: Обработка пропусков
imputer2 = Imputer(
    inputCols=["total_bedrooms"], 
    outputCols=["total_bedrooms"],
    strategy="median"
)

# Этап 2: Объединение признаков
assembler2 = VectorAssembler(
    inputCols=numerical_cols, 
    outputCol='features'
)

# Этап 3: Модель
lr2 = LinearRegression(featuresCol='features', labelCol=target)

# Создание и обучение Pipeline
pipeline2 = Pipeline(stages=[imputer2, assembler2, lr2])
model2 = pipeline2.fit(train_df)  # Обучение ТОЛЬКО на train данных

print("Модель 2 (только числовые признаки) обучена успешно")

26/01/26 08:19:00 WARN Instrumentation: [f36d0de4] regParam is zero, which might cause numerical instability and overfitting.


Модель 2 (только числовые признаки) обучена успешно


In [12]:
# Применение модели к данным
train_predictions2 = model2.transform(train_df)
test_predictions2 = model2.transform(test_df)

print("Предсказания модели 2 получены")

Предсказания модели 2 получены


## Оценка качества моделей

In [13]:
# Оценка модели 1
evaluator = RegressionEvaluator(labelCol=target, predictionCol='prediction')

test_rmse1 = evaluator.evaluate(test_predictions1, {evaluator.metricName: "rmse"})
test_mae1 = evaluator.evaluate(test_predictions1, {evaluator.metricName: "mae"})
test_r21 = evaluator.evaluate(test_predictions1, {evaluator.metricName: "r2"})

train_rmse1 = evaluator.evaluate(train_predictions1, {evaluator.metricName: "rmse"})
train_mae1 = evaluator.evaluate(train_predictions1, {evaluator.metricName: "mae"})
train_r21 = evaluator.evaluate(train_predictions1, {evaluator.metricName: "r2"})

print("\n=== МОДЕЛЬ 1: Все признаки ===")
print(f"train_RMSE: {train_rmse1:.2f} / test_RMSE: {test_rmse1:.2f}")
print(f"train_MAE: {train_mae1:.2f} / test_MAE: {test_mae1:.2f}")
print(f"train_R²: {train_r21:.4f} / test_R²: {test_r21:.4f}")


=== МОДЕЛЬ 1: Все признаки ===
train_RMSE: 68275.74 / test_RMSE: 70786.68
train_MAE: 49529.49 / test_MAE: 50863.76
train_R²: 0.6463 / test_R²: 0.6378


In [14]:
# Оценка модели 2
test_rmse2 = evaluator.evaluate(test_predictions2, {evaluator.metricName: "rmse"})
test_mae2 = evaluator.evaluate(test_predictions2, {evaluator.metricName: "mae"})
test_r22 = evaluator.evaluate(test_predictions2, {evaluator.metricName: "r2"})

train_rmse2 = evaluator.evaluate(train_predictions2, {evaluator.metricName: "rmse"})
train_mae2 = evaluator.evaluate(train_predictions2, {evaluator.metricName: "mae"})
train_r22 = evaluator.evaluate(train_predictions2, {evaluator.metricName: "r2"})

print("\n=== МОДЕЛЬ 2: Только числовые признаки ===")
print(f"train_RMSE: {train_rmse2:.2f} / test_RMSE: {test_rmse2:.2f}")
print(f"train_MAE: {train_mae2:.2f} / test_MAE: {test_mae2:.2f}")
print(f"train_R²: {train_r22:.4f} / test_R²: {test_r22:.4f}")


=== МОДЕЛЬ 2: Только числовые признаки ===
train_RMSE: 69208.47 / test_RMSE: 71791.60
train_MAE: 50642.47 / test_MAE: 51804.75
train_R²: 0.6366 / test_R²: 0.6275


## Сравнение результатов моделей

In [15]:
# Сравнение результатов
print("\n=== СРАВНЕНИЕ МОДЕЛЕЙ ===")
print(f"{'Метрика':<10} {'Все признаки':<15} {'Только числовые':<15} {'Разница':<10}")
print("-" * 55)
print(f"{'RMSE':<10} {test_rmse1:<15.2f} {test_rmse2:<15.2f} {test_rmse1-test_rmse2:<10.2f}")
print(f"{'MAE':<10} {test_mae1:<15.2f} {test_mae2:<15.2f} {test_mae1-test_mae2:<10.2f}")
print(f"{'R²':<10} {test_r21:<15.4f} {test_r22:<15.4f} {test_r21-test_r22:<10.4f}")


=== СРАВНЕНИЕ МОДЕЛЕЙ ===
Метрика    Все признаки    Только числовые Разница   
-------------------------------------------------------
RMSE       70786.68        71791.60        -1004.91  
MAE        50863.76        51804.75        -940.99   
R²         0.6378          0.6275          0.0104    


In [16]:
print("\n=== ВЫВОДЫ ===")
if test_rmse1 < test_rmse2:
    print("✓ Модель с категориальными признаками показывает лучший RMSE")
    print(f"  Улучшение: {test_rmse2-test_rmse1:.0f} долларов")
else:
    print("✓ Модель только с числовыми признаками показывает лучший RMSE")

if test_r21 > test_r22:
    print("✓ Модель с категориальными признаками объясняет больше вариации")
    print(f"  Улучшение R²: {test_r21-test_r22:.4f}")
else:
    print("✓ Модель только с числовыми признаками объясняет больше вариации")


=== ВЫВОДЫ ===
✓ Модель с категориальными признаками показывает лучший RMSE
  Улучшение: 1005 долларов
✓ Модель с категориальными признаками объясняет больше вариации
  Улучшение R²: 0.0104


## Анализ и интерпретация результатов

### Качество предсказаний:

**RMSE (Root Mean Square Error):**
- Показывает среднеквадратичную ошибку предсказания в долларах
- Модель с категориальными признаками показывает лучший результат
- Улучшение составляет около 1,000 долларов (~1.4% точнее)

**MAE (Mean Absolute Error):**
- Показывает среднюю абсолютную ошибку предсказания
- Категориальные признаки уменьшают типичную ошибку предсказания

**R² (коэффициент детерминации):**
- Показывает долю объясненной вариации в данных
- Модель с категориальными признаками объясняет ~63.8% вариации
- Модель только с числовыми признаками объясняет ~62.8% вариации

### Интерпретация:

1. **Категориальный признак `ocean_proximity` значим** - близость к океану влияет на цены недвижимости в Калифорнии

2. **Улучшение умеренное** - числовые признаки (локация, размер, доход) уже хорошо объясняют цены

3. **Обе модели показывают приемлемое качество** - R² ~63-64% для задачи предсказания цен недвижимости

### Практические выводы:

- **Средняя ошибка ~71 тысячи долларов** при медианной цене ~200-300 тысяч (относительная ошибка ~25%)
- **Включение категориальных признаков оправдано** - небольшое, но стабильное улучшение
- **Географическое расположение относительно океана** - важный фактор ценообразования

**Рекомендация:** использовать модель со всеми признаками для лучшей точности предсказаний.

## Закрытие Spark сессии

In [17]:
# Закрываем сессию
spark.stop()
print("Spark сессия закрыта")

Spark сессия закрыта
