# Проект: Предсказание стоимости жилья в Калифорнии с использованием Apache Spark

## Введение

Проект посвящён задаче предсказания медианной стоимости жилья в жилых массивах Калифорнии. В рамках работы на практике исследуются возможности фреймворка **Apache Spark** и библиотеки **MLlib** для обработки и анализа данных, а также для обучения и оценки качества моделей машинного обучения в распределённой среде.

## Цель проекта
Целью проекта является построение модели линейной регрессии, способной предсказывать медианную стоимость жилья в жилом массиве на основе данных о жилье в Калифорнии в 1990 году. Также будет проведено сравнение качества моделей, построенных на полном и сокращённом наборах признаков, с использованием метрик RMSE, MAE и R².

## Описание данных

В проекте используется датасет с данными о жилых массивах Калифорнии за 1990 год. В таблице содержатся следующие признаки:
* longitude — долгота;
* latitude — широта;
* housing_median_age — медианный возраст жителей жилого массива;
* total_rooms — общее количество комнат в домах жилого массива;
* total_bedrooms — общее количество спален в домах жилого массива;
* population — количество человек, которые проживают в жилом массиве;
* households — количество домовладений в жилом массиве;
* median_income — медианный доход жителей жилого массива;
* ocean_proximity — близость к океану (категориальный признак);
* median_house_value — медианная стоимость дома в жилом массиве (целевой признак).

## План исследования

1. [Инициализация Spark-сессии](#link1)
2. [Загрузка и просмотр данных](#link2)
3. [Подготовка данных](#link3)
4. [Обучение моделей](#link4)
5. [Оценка качества моделей](#link5)
6. [Вывод](#link6)

## Подготовка рабочей среды

In [1]:
# импортируем библиотеки
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import avg, round, mean, stddev
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline

# зададим константу
RANDOM_SEED = 42

In [2]:
# настройка широкоформатного вывода
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

<a class='anchor' id="link1"></a>
## 1 Инициализация Spark-сессии

Инициализируем локальную Spark-сессию.

In [3]:
spark = SparkSession.builder \
    .master("local")\
    .appName("California Housing Linear Regression") \
    .getOrCreate()

In [4]:
# уберем лишние предупреждения
spark.sparkContext.setLogLevel("ERROR")

<a class='anchor' id="link2"></a>
## 2 Загрузка и просмотр данных

Загрузим данные, посмотрим первые строки датафрейма.

In [5]:
df = spark.read.csv("/datasets/housing.csv", header=True, inferSchema=True)

                                                                                

In [6]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Посмотрим структуру датафрейма.

In [7]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Выведем общую статистику по всем признакам.

In [8]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [9]:
df.describe().show(truncate=False, vertical=True)

[Stage 5:>                                                          (0 + 1) / 1]

-RECORD 0---------------------------------
 summary            | count               
 longitude          | 20640               
 latitude           | 20640               
 housing_median_age | 20640               
 total_rooms        | 20640               
 total_bedrooms     | 20433               
 population         | 20640               
 households         | 20640               
 median_income      | 20640               
 median_house_value | 20640               
 ocean_proximity    | 20640               
-RECORD 1---------------------------------
 summary            | mean                
 longitude          | -119.56970445736148 
 latitude           | 35.6318614341087    
 housing_median_age | 28.639486434108527  
 total_rooms        | 2635.7630813953488  
 total_bedrooms     | 537.8705525375618   
 population         | 1425.4767441860465  
 households         | 499.5396802325581   
 median_income      | 3.8706710029070246  
 median_house_value | 206855.81690891474  
 ocean_prox

                                                                                

In [10]:
# сохраним среднюю стоимость домов и стандартное отклонение для оценки качества моделей
result = df.select(
    mean("median_house_value").alias("mean_mhv"),
    stddev("median_house_value").alias("stddev_mhv")
).collect()[0]

mean_median_house_value = result["mean_mhv"]
stddev_median_house_value = result["stddev_mhv"]

print(f"mean = {mean_median_house_value}")
print(f"stddev = {stddev_median_house_value}")

mean = 206855.81690891474
stddev = 115395.61587441359


In [11]:
na = df.count() - 20433
print(f"Количество пропусков:       {na}")
print(f"Процент строк с пропусками: {na/df.count()*100:.2f}%")

Количество пропусков:       207
Процент строк с пропусками: 1.00%


**Промежуточный итог:**

Датасет содержит 20 640 строк и 10 признаков. Типы данных соответствуют содержимому столбцов: числовые признаки представлены вещественными значениями, категориальный признак ocean_proximity - строковым типом.

Один процент значений в столбце total_bedrooms пропущен. Строки с пропусками предлагается заполнить средними значениями на этапе предобработки.

Рассчитаны среднее значение и стандартное отклонение целевого признака median_house_value, которые позволят интерпретировать значения ошибок моделей в относительном масштабе.

<a class='anchor' id="link3"></a>
## 3 Подготовка данных

Построим пайплайн для предобработки данных.

In [12]:
# Разделим признаки на группы:
# категориальные признаки
categorical_cols = ['ocean_proximity']
# количественные признаки
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 
                   'total_bedrooms', 'population', 'households', 'median_income']
# целевой признак
target = 'median_house_value' 

In [13]:
# Заполнение пропусков средними значениями
imputer = Imputer(
    inputCols=["total_bedrooms"],
    outputCols=["total_bedrooms"],
    strategy="mean"
)

# Индексация категориальных признаков
indexers = [StringIndexer(inputCol=col, outputCol=col + "_idx") for col in categorical_cols]

# OHE
encoders = [OneHotEncoder(inputCol=col + "_idx", outputCol=col + "_ohe") for col in categorical_cols]

# Вектор категориальных признаков
categorical_assembler = VectorAssembler(
    inputCols=[col + "_ohe" for col in categorical_cols],
    outputCol="categorical_features"
)

# Вектор числовых признаков
numerical_assembler = VectorAssembler(
    inputCols=numerical_cols,
    outputCol="numerical_features"
)

# Масштабирование
scaler = StandardScaler(
    inputCol="numerical_features",
    outputCol="numerical_features_scaled",
    withMean=True,         # Центрируем (отнимаем среднее)
    withStd=True           # Масштабируем (делим на стандартное отклонение)
)

# Вектор всех признаков
assembler_all = VectorAssembler(
    inputCols=["categorical_features", "numerical_features_scaled"],
    outputCol="features"
)

# Вектор только числовых признаков
assembler_num = VectorAssembler(
    inputCols=["numerical_features_scaled"],
    outputCol="features_num"
)

# Объединяем все шаги в Pipeline
pipeline = Pipeline(stages=[
    imputer,
    *indexers,
    *encoders,
    categorical_assembler,
    numerical_assembler,
    scaler,
    assembler_all,
    assembler_num
])

Разобъем датафрейм на обучающую и тестовую выборки и предобработаем данные.

In [14]:
# Разделяем датафрейм на train, test
train_unprepared, test_unprepared = df.randomSplit([0.75, 0.25], seed=RANDOM_SEED)

# Обучаем пайплайн на train
pipeline_model = pipeline.fit(train_unprepared)

# Применяем к train и test
train_data = pipeline_model.transform(train_unprepared)
test_data = pipeline_model.transform(test_unprepared)

                                                                                

Проверим, нет ли неучтенных категорий в тестовой выборке.

In [15]:
# Уникальные значения в train и test
train_cats = train_unprepared.select("ocean_proximity").distinct().rdd.flatMap(lambda x: x).collect()
test_cats = test_unprepared.select("ocean_proximity").distinct().rdd.flatMap(lambda x: x).collect()

unexpected = set(test_cats) - set(train_cats)
if not unexpected:
    print("Неучтенных категорий в test нет")
else:
    print("Неучтенные категории в test:", unexpected)



Неучтенных категорий в test нет


                                                                                

**Прочежуточный итог:**<br>
Данные были успешно подготовлены для обучения моделей с использованием LinearRegression из MLlib. <br>
Для предобработки данных был реализован единый Pipeline, включающий все необходимые этапы трансформации. Пропущенные значения в числовом признаке total_bedrooms были заполнены средними значениями с использованием трансформера Imputer.<br>
Категориальный признак ocean_proximity был преобразован в числовой формат с помощью StringIndexer и OneHotEncoder. Числовые признаки были собраны в вектор и масштабированы с использованием StandardScaler с центрированием и нормализацией.<br>
Были сформированы два набора признаков: полный набор и сокращённый набор, содержащий только числовые признаки.


<a class='anchor' id="link4"></a>
## 4 Обучение моделей

Проведем подбор гиперпараметров моделей. Оценивать модели будем по метрике RMSE.

In [16]:
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")

Обучение модели на полном наборе данных.

In [17]:
lr_all = LinearRegression(labelCol=target, featuresCol="features")

paramGrid_all = ParamGridBuilder() \
    .addGrid(lr_all.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr_all.fitIntercept, [False, True])\
    .addGrid(lr_all.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

tvs_all = TrainValidationSplit(estimator=lr_all,
                               estimatorParamMaps=paramGrid_all,
                               evaluator=evaluator,
                               trainRatio=0.8,
                               seed=RANDOM_SEED)

model_all = tvs_all.fit(train_data)
pred_all = model_all.transform(test_data)

best_model = model_all.bestModel
print("Гиперпараметры модели со всеми признаками:")
print("    regParam:", best_model._java_obj.getRegParam())
print("    elasticNetParam:", best_model._java_obj.getElasticNetParam())
print("    fitIntercept:", best_model._java_obj.getFitIntercept())

                                                                                

Гиперпараметры модели со всеми признаками:
    regParam: 1.0
    elasticNetParam: 1.0
    fitIntercept: False


Обучение модели на числовых признаках.

In [18]:
lr_num = LinearRegression(labelCol=target, featuresCol="features_num")

paramGrid_num = ParamGridBuilder() \
    .addGrid(lr_num.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr_num.fitIntercept, [False, True])\
    .addGrid(lr_num.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

tvs_num = TrainValidationSplit(estimator=lr_num,
                               estimatorParamMaps=paramGrid_num,
                               evaluator=evaluator,
                               trainRatio=0.8,
                               seed=RANDOM_SEED)

model_num = tvs_num.fit(train_data)
pred_num = model_num.transform(test_data)

best_model = model_num.bestModel
print("Гиперпараметры модели только с числовыми признаками:")
print("    regParam:", best_model._java_obj.getRegParam())
print("    elasticNetParam:", best_model._java_obj.getElasticNetParam())
print("    fitIntercept:", best_model._java_obj.getFitIntercept())

Гиперпараметры модели только с числовыми признаками:
    regParam: 0.1
    elasticNetParam: 0.5
    fitIntercept: True


**Промежуточный итог:**

Обучены две модели линейной регрессии с использованием библиотеки MLlib:
* модель на полном наборе признаков
* модель только на числовых признаках

Для обеих моделей был выполнен подбор гиперпараметров (regParam, elasticNetParam, fitIntercept) с помощью TrainValidationSplit.

<a class='anchor' id="link5"></a>
## 5 Оценка качества моделей

In [19]:
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction")

def evaluate(predictions, name=""):
    rmse = evaluator.setMetricName("rmse").evaluate(predictions)
    mae = evaluator.setMetricName("mae").evaluate(predictions)
    r2 = evaluator.setMetricName("r2").evaluate(predictions)
    print(f"""{name}: \n    RMSE: {rmse:.2f} \n    MAE:  {mae:.2f} \n    R²:   {r2:.2f}
    RMSE/stddev: {rmse/stddev_median_house_value:.2f} (Среднеквадратичная ошибка / Стандартный разброс цен)
    MAE/mean:    {mae/mean_median_house_value:.2f} (Средняя абсолютная ошибка / Средняя цена)\n""")

evaluate(pred_all, "Модель со всеми признаками")
evaluate(pred_num, "Модель только с числовыми признаками")

Модель со всеми признаками: 
    RMSE: 69937.27 
    MAE:  50436.73 
    R²:   0.64
    RMSE/stddev: 0.61 (Среднеквадратичная ошибка / Стандартный разброс цен)
    MAE/mean:    0.24 (Средняя абсолютная ошибка / Средняя цена)

Модель только с числовыми признаками: 
    RMSE: 70640.40 
    MAE:  51224.95 
    R²:   0.63
    RMSE/stddev: 0.61 (Среднеквадратичная ошибка / Стандартный разброс цен)
    MAE/mean:    0.25 (Средняя абсолютная ошибка / Средняя цена)



<a class='anchor' id="link6"></a>
## Вывод

**Целью проекта** было построить модели линейной регрессии для предсказания медианной стоимости жилья в Калифорнии на основе данных переписи 1990 года с использованием Apache Spark и MLlib.

**Результаты**:
* Чтение данных, предобработка данных, обучение и оценка качества моделей проводилась с использованием Apache Spark и MLlib.
* Обучены две модели линейной регрессии: на полном наборе признаков и только на числовых признаках.
* Обе модели продемонстрировали приемлемую способность предсказывать медианную стоимость жилья.
* Модель с полным набором признаков показала лучшие метрики RMSE, MAE и R² по сравнению с моделью на числовых данных.

Проект продемонстрировал возможности **Apache Spark** для обработки и анализа больших данных с помощью библиотеки **MLlib**.

**Рекомендации:**
* Провести исследовательский анализ данных для выявления зависимостей между признаками и целевой переменной.
* Провести преобразование нелинейных признаков для улучшения работы линейной модели.
* Использовать нелинейные модели, способные учитывать сложные зависимости между признаками.