# Проект по предсказанию стоимости жилья



## Цели исследования.

- Построить модель МО для для предсказания медианной стоимости дома в жилом массиве , используя методы *MLlib и PySpark*

## Ход исследования.

- Инициализировать локальную Spark-сессию для работы с данными.


- Загрузить данные из файла и изучить их структуру методами PySpark.


- Проверить данные на наличие пропусков и заполнить их выбранными значениями.


- Преобразовать категориальные переменные с помощью техники One Hot Encoding.


- Обучить две модели:на полном наборе данных (включая категориальные признаки),только на числовых переменных (исключив категориальные). Использовать оценщик LinearRegression из библиотеки MLlib.

- Проанализировать качество моделей по метрикам RMSE, MAE и R².

## Технический модуль.

### Импорт библиотек.

In [1]:
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import pandas as pd
import numpy as np
import math
import time
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder   
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
SparkSession.stop;

### Импорт функций.

In [3]:
def evaluate_metrics(predictions):
    """Вычисляет и выводит метрики качества регрессионной модели.
    
    Функция вычисляет три основные метрики для оценки регрессионных моделей:
    RMSE (Root Mean Square Error), MAE (Mean Absolute Error) и R² (коэффициент детерминации).
    Результаты выводятся в удобочитаемом формате.
    
    Args:
        predictions (DataFrame): DataFrame с предсказанными и фактическими значениями,
                                должен содержать колонки 'prediction' и 'label'.
                                
    Returns:
        tuple: Кортеж с вычисленными метриками в формате (rmse, mae, r2)
    
    Examples:
        >>> predictions = model.transform(test_data)
        >>> rmse, mae, r2 = evaluate_metrics(predictions)
        === Оценка модели на тестовых данных ===
        RMSE: 1.23
        MAE: 1.05
        R²: 0.9567
    """
    rmse = evaluator_rmse.evaluate(predictions)
    mae = evaluator_mae.evaluate(predictions)
    r2 = evaluator_r2.evaluate(predictions)
    
    print("=== Оценка модели на тестовых данных ===")
    print(f"RMSE: {rmse:.2f}")
    print(f"MAE: {mae:.2f}")
    print(f"R²: {r2:.4f}")
    
    return rmse, mae, r2

## Загрузка и проверка данных.

### Инициализируем локальную Spark-сессию.

In [4]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("EDA California Housing") \
                    .getOrCreate()

### Загрузка и ознакомление с типом данных. 

Загрузим наш датасет и ознакомиися с типом данных при помощи методов PySpark.

In [5]:
df = spark.read.csv('/datasets/housing.csv', header=True, inferSchema=True)
df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

Приведем полученную таблицу к более привычному нам виду при помощи преобразования в *pandas*.

In [6]:
pd.DataFrame(df.dtypes, columns=['columns', 'dtype'])

Unnamed: 0,columns,dtype
0,longitude,double
1,latitude,double
2,housing_median_age,double
3,total_rooms,double
4,total_bedrooms,double
5,population,double
6,households,double
7,median_income,double
8,median_house_value,double
9,ocean_proximity,string


Видим что все переменные за исключением **ocean_proximity** представленны в числовом виде.

In [7]:
df.toPandas().head(10)

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY
5,-122.25,37.85,52.0,919.0,213.0,413.0,193.0,4.0368,269700.0,NEAR BAY
6,-122.25,37.84,52.0,2535.0,489.0,1094.0,514.0,3.6591,299200.0,NEAR BAY
7,-122.25,37.84,52.0,3104.0,687.0,1157.0,647.0,3.12,241400.0,NEAR BAY
8,-122.26,37.84,42.0,2555.0,665.0,1206.0,595.0,2.0804,226700.0,NEAR BAY
9,-122.25,37.84,52.0,3549.0,707.0,1551.0,714.0,3.6912,261100.0,NEAR BAY


In [8]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Видим небольшые аномалии в данных.

### Проверка на пропущенные значения и дубликаты.

In [9]:
columns = df.columns

for column in columns:
    check_col = F.isnan(column) | F.isnull(column)
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Обнаружились незначительные пропуски в переменной *total_bedrooms* , заполним их использовав среднее значение.

In [10]:
mean = df.select(F.ceil(F.mean('total_bedrooms'))).first()[0]
mean

538

In [11]:
df = df.fillna(mean)

In [12]:
for column in columns:
    check_col = F.isnan(column) | F.isnull(column)
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


In [13]:
df.groupBy("total_bedrooms").count().orderBy(F.desc("count")).show(10)




+--------------+-----+
|total_bedrooms|count|
+--------------+-----+
|         538.0|  230|
|         280.0|   55|
|         331.0|   51|
|         345.0|   50|
|         393.0|   49|
|         343.0|   49|
|         348.0|   48|
|         328.0|   48|
|         394.0|   48|
|         272.0|   47|
+--------------+-----+
only showing top 10 rows



                                                                                

Видим что замена пропусков прошла успешно.

In [13]:
duplicate_count = df.count() - df.distinct().count()
print(f"Количество дубликатов: {duplicate_count}")



Количество дубликатов: 0


                                                                                

Явных дубликатов устнановить не удалось.

### Вывод

- Мы загрузили полученные данные при помощи средств PySpark 


- Проведя первичный анализ мы заметили что признак **ocean_proximity** носит категориальный характер , остальные же переменные представлены в числовом виде.


- Мы выполнили обработку пропусков средним значением , явных дубликатов в данных обнаружить не удалось.

## Подготовка данных и предсказания модели.

### Определение базовых элементов пайплайна.

Разделим данные на категории.

In [14]:
cat_cols = ['ocean_proximity']
num_cols = ['housing_median_age',
            'total_rooms',
            'total_bedrooms',
            'population',
            'households',
            'median_income']
target = 'median_house_value'

Определим кодировщики.

In [15]:
indexer = StringIndexer(inputCols=cat_cols, outputCols=[c+'_idx' for c in cat_cols])

encoder = OneHotEncoder(inputCols=[c+'_idx' for c in cat_cols], outputCols=[c+'_ohe' for c in cat_cols])

categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in cat_cols], 
                                        outputCol="categorical_features")

numerical_assembler = VectorAssembler(inputCols=num_cols, outputCol="numerical_features")

standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled", 
                                withStd=True, withMean=True)

final_assembler = VectorAssembler(inputCols=['categorical_features', 'numerical_features_scaled'], 
                                  outputCol="features")


Создание метрик оценки.

In [16]:
evaluator_rmse = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

### Предсказания на всех данных.

Соберем пайалайн для получения предсказания на всех данных.

In [17]:
lr = LinearRegression(labelCol=target, featuresCol='features')

pipeline = Pipeline(stages=[
    indexer,
    encoder,
    categorical_assembler,
    numerical_assembler,
    standardScaler,
    final_assembler,
    lr
])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 100]) \
    .build()



crossval = CrossValidator(estimator=pipeline,
                         estimatorParamMaps=paramGrid,
                         evaluator=evaluator_rmse,
                         numFolds=5,
                         seed=42)



Разделение данных и получение результатов.

In [22]:
%%time
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
cv_model = crossval.fit(train_data)

best_model = cv_model.bestModel

predictions = best_model.transform(test_data)

rmse_all, mae_all, r2_all = evaluate_metrics(predictions)

best_lr_model = best_model.stages[-1]
print("\n=== Лучшие параметры модели all_cat ===")
print(f"regParam: {best_lr_model.getRegParam()}")
print(f"elasticNetParam: {best_lr_model.getElasticNetParam()}")
print(f"maxIter: {best_lr_model.getMaxIter()}")


=== Оценка модели на тестовых данных ===
RMSE: 69825.72
MAE: 50597.34
R²: 0.6308

=== Лучшие параметры модели all_cat ===
regParam: 0.01
elasticNetParam: 0.0
maxIter: 10
CPU times: user 6.7 s, sys: 2.79 s, total: 9.49 s
Wall time: 1min 29s


### Предсказания только на числовых признаках.

Пайплайн только для числовых признаков.

In [23]:
final_assembler = VectorAssembler(inputCols=['numerical_features_scaled'], 
                                  outputCol="features")

In [24]:
pipeline = Pipeline(stages=[
    numerical_assembler,
    standardScaler,
    final_assembler,
    lr
])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 100]) \
    .build()



crossval = CrossValidator(estimator=pipeline,
                         estimatorParamMaps=paramGrid,
                         evaluator=evaluator_rmse,
                         numFolds=5,
                         seed=42)



Получение результатов.

In [25]:
%%time
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

cv_model = crossval.fit(train_data)

best_model_nocat = cv_model.bestModel

predictions = best_model_nocat.transform(test_data)


rmse_nocat, mae_nocat, r2_nocat = evaluate_metrics(predictions)

best_lr_nocat = best_model_nocat.stages[-1]
print("\n=== Лучшие параметры модели no_cat ===")
print(f"regParam: {best_lr_nocat.getRegParam()}")
print(f"elasticNetParam: {best_lr_nocat.getElasticNetParam()}")
print(f"maxIter: {best_lr_nocat.getMaxIter()}")

=== Оценка модели на тестовых данных ===
RMSE: 76574.41
MAE: 56292.84
R²: 0.5560

=== Лучшие параметры модели no_cat ===
regParam: 1.0
elasticNetParam: 0.5
maxIter: 100
CPU times: user 3.92 s, sys: 1.75 s, total: 5.67 s
Wall time: 56.9 s


Для наглядной оценки эффективноти полученных значений метрик , посмотрим на то какой процент относительно среднего значения нашей целевой переменной , составляет метрика MAE.

In [26]:
print(f'Ошибка предсказаний по всем категориям в среднем : \
{((mae_all / df.select(F.mean("median_house_value")).collect()[0][0])*100):.2f}%')

Ошибка предсказаний по всем категориям в среднем : 24.46%


In [27]:
print(f'Ошибка предсказаний только по числовым категориям в среднем: \
{((mae_nocat / df.select(F.mean("median_house_value")).collect()[0][0])*100):.2f}%')

Ошибка предсказаний только по числовым категориям в среднем: 27.21%


### Вывод

- Мы построили две модели **LinearRegression** с использованием всех входящих признаков и исключив категориальные признаки. Для этого мы использовали средства библиотеки MlLib.


- Для оценки качества моделей мы использовали следующие метрики **rmse, mae, r2**.


- При обучении модели на всех признаках нам удалось добиться следующих результатов : *RMSE*: **69825.72** *MAE*: **50597.34** *R²*: **0.6308**.


- При обучении модели только на числовых признаках метрики показали следующие результаты : *RMSE*: **76574.41** *MAE*: **56292.84** *R²*: **0.5560**.


- Мы видим что модель обученная на всех признаках более эффективна относительно модели с исключенными категориальными признаками.


## Общий вывод

**В этом проекте мы решали классическую задачу линейной регрессии при помощи средств MlLib и PySpark. Вот чего нам удалось достичь:**

- Мы загрузили полученный датасет при помощи методов *PySpark* содержащий информацию о жилье в Калифорнии в 1990 году.


- Выполнили проверку типов данных. Выяснив что переменная **ocean_proximity** содержит категориальные данные , а остальные признаки представлены числовым типом данных *(double)*



- Выполнили обработку пропусков средним значением , дубликатов найдено не было.



- При помощи *пайплайна* мы обучили две модели линейной регрессии и получили следующие результаты  **RMSE: 69825.72 MAE: 50597.34 R²: 0.6308** для модели обученной на всех признаках и **RMSE: 76574.41 MAE: 56292.84 R²: 0.5560** для модели обученной исключительно на числовых данных.



- Более лучший результат показала модель обученная на всем датасете с средним отклонением от среднего значения **median_house_value** в **24%** против **27%** в модели и исключенным категориальным признаком.


