## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

# Подготовка данных

Загружаем библиотеки

In [165]:
import pandas as pd

from typing import List

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, udf, collect_list# approxQuantile
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.window import Window

In [166]:
SSEED = 3434

Инициализируем spark сессию

In [167]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("asdasd") \
                    .getOrCreate()

Загрузим данные

In [168]:
data = spark.read.csv('/datasets/housing.csv', header=True, inferSchema=True)

Посмотрим на общее количество строк

In [169]:
row_count = data.count()
print("Всего строк:", row_count)

Всего строк: 20640


Посмотрим на атрибуты и их типы

In [170]:
print(data.dtypes)

[('longitude', 'double'), ('latitude', 'double'), ('housing_median_age', 'double'), ('total_rooms', 'double'), ('total_bedrooms', 'double'), ('population', 'double'), ('households', 'double'), ('median_income', 'double'), ('median_house_value', 'double'), ('ocean_proximity', 'string')]


In [171]:
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [172]:
print(pd.DataFrame(data.dtypes, columns=['column', 'type']))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


Посмотрим на сами данные

In [173]:
display(data.limit(5).toPandas())

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


Проверим на наличие пустых значений

In [174]:
null_counts = data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).collect()[0]

percent_empty = (sum(null_counts) / (row_count * len(data.columns))) * 100

print("Доля пустых значений: {:.2f}%".format(percent_empty))
print('')
print("{:<25} {}".format("Наименование столбца", "Количество пустых значений"))
for col_name, null_count in zip(data.columns, null_counts):
    print("{:<25} {}".format(col_name, null_count))

Доля пустых значений: 0.10%

Наименование столбца      Количество пустых значений
longitude                 0
latitude                  0
housing_median_age        0
total_rooms               0
total_bedrooms            207
population                0
households                0
median_income             0
median_house_value        0
ocean_proximity           0


Вывод: пустые значения есть только в столбце <b>total_bedrooms</b>
<br>
С одной стороны можно заполнить все пропуски просто медианой, но с другой стороны по данным видно, что есть категориальный признак <b>ocean_proximity</b>, поэтому можно для начала проверить этот признак и посмотреть % пустых значений в каждой из категории. если будет прослеживаться корреляция, то можно будет заполнить столбец <b>total_bedrooms</b> медианой каждой из категорий, а не просто общей медианой. Конечно количество пустот (<b>0.10%</b>) здесь особо и не делает погоду

Посмотрим на пустоты при группировке категориального признака

In [175]:
missing_values_count = data.groupBy('ocean_proximity').agg(count(when(isnan('total_bedrooms') | col('total_bedrooms').isNull(), 'total_bedrooms')).alias('Количество пустых значений'))
total_count = data.groupBy('ocean_proximity').count()
missing_values_percentage = missing_values_count.join(total_count, 'ocean_proximity') \
    .withColumn('Доля', (col('Количество пустых значений') / col('count')) * 100)

display(missing_values_percentage.toPandas())

                                                                                

Unnamed: 0,ocean_proximity,Количество пустых значений,count,Доля
0,ISLAND,0,5,0.0
1,NEAR OCEAN,30,2658,1.128668
2,NEAR BAY,20,2290,0.873362
3,<1H OCEAN,102,9136,1.116462
4,INLAND,55,6551,0.839566


Посчитаем медиану для каждой из категорий
<br>
update
<br>
судя по всему ни median, ни approxQuantile у нас здесь нет. делать кастомную функцию под пустые значения, которые составляют .10% от общего количества данных это конечно сомнительное удовольствие, но что поделать

In [176]:
def median(values: List[float]) -> float:

    sorted_values = sorted(values)
    n = len(sorted_values)
    if n % 2 == 0:
        return (sorted_values[n // 2 - 1] + sorted_values[n // 2]) / 2
    else:
        return sorted_values[n // 2]

median_udf = udf(median, DoubleType())

Заполним пустоты в <b>total_bedrooms</b> медианным значением

In [178]:
data = data.withColumn('total_bedrooms', when(col('total_bedrooms').isNull(),
                                              median_udf(collect_list('total_bedrooms').over(Window.partitionBy('ocean_proximity')))).otherwise(col('total_bedrooms')))

Посмотрим на пустые значения еще раз

In [179]:
null_counts = data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).collect()[0]

percent_empty = (sum(null_counts) / (row_count * len(data.columns))) * 100

print("Доля пустых значений: {:.2f}%".format(percent_empty))
print('')
print("{:<25} {}".format("Наименование столбца", "Количество пустых значений"))
for col_name, null_count in zip(data.columns, null_counts):
    print("{:<25} {}".format(col_name, null_count))



Доля пустых значений: 0.00%

Наименование столбца      Количество пустых значений
longitude                 0
latitude                  0
housing_median_age        0
total_rooms               0
total_bedrooms            0
population                0
households                0
median_income             0
median_house_value        0
ocean_proximity           0


                                                                                

Преобразуйте колонку с категориальными значениями техникой One hot encoding.
<br>
StringIndexer для перевода в числовой формат
<br>
<font color='purple'><b>update 1</b> Сначала разделим на выборки, а потом уже кодирование</font>

In [180]:
train_all, test_all = data.randomSplit([0.8, 0.2], seed=SSEED)

In [181]:
indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_idx")
#data_ohe = indexer.fit(train_all).transform(train_all)

собственно сам ohe

In [182]:
encoder = OneHotEncoder(inputCols=["ocean_proximity_idx"], outputCols=["ocean_proximity_vec"])

теперь числовые признаки

In [183]:
assembler_all = VectorAssembler(inputCols=["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income", "ocean_proximity_vec"], outputCol='features_all')

In [184]:
num_cols = ["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"]

In [185]:
num_assembler = VectorAssembler(inputCols=num_cols, outputCol="num_features")

In [186]:
scaler = StandardScaler(inputCol="num_features", outputCol="scaled_num_features")

Разделение на выборки 80\20

# Обучение моделей

<b>Построение модели на всех данных</b>

Инициализируем модель

In [188]:
lr_all = LinearRegression(featuresCol='features_all', labelCol='median_house_value')

In [189]:
pipeline_all = Pipeline(stages=[indexer, encoder, num_assembler, scaler, assembler_all, lr_all])

Обучим

In [None]:
lr_model_all = pipeline_all.fit(train_all)

23/05/08 21:50:12 WARN Instrumentation: [e92c20b0] regParam is zero, which might cause numerical instability and overfitting.

Получим предсказания

In [None]:
predictions_all = lr_model_all.transform(test_all)

Оценим по метрикам RMSE, MAE и R2

In [192]:
evaluator = RegressionEvaluator(labelCol='median_house_value', predictionCol='prediction', metricName='rmse')
rmse_all = evaluator.evaluate(predictions_all)
mae_all = evaluator.setMetricName('mae').evaluate(predictions_all)
r2_all = evaluator.setMetricName('r2').evaluate(predictions_all)

print("Модель на всех данных")
print("RMSE: {:.2f}".format(rmse_all))
print("MAE: {:.2f}".format(mae_all))
print("R2: {:.2f}".format(r2_all))



Модель на всех данных
RMSE: 70131.70
MAE: 50447.24
R2: 0.64


                                                                                

<b>Построение модели на числовых данных</b>

Инициализируем модель

In [196]:
lr_numeric = LinearRegression(featuresCol='scaled_num_features', labelCol='median_house_value')
pipeline_num = Pipeline(stages=[num_assembler, scaler, lr_numeric])

Обучим

In [197]:
lr_model_numeric = pipeline_num.fit(train_all)

23/05/08 21:57:32 WARN Instrumentation: [9330ac4c] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

Получим предсказания

In [198]:
predictions_numeric = lr_model_numeric.transform(test_all)

Оценим по метрикам RMSE, MAE и R2

In [199]:
rmse_numeric = evaluator.evaluate(predictions_numeric)
mae_numeric = evaluator.setMetricName('mae').evaluate(predictions_numeric)
r2_numeric = evaluator.setMetricName('r2').evaluate(predictions_numeric)

print("Модель на числовых данных")
print("RMSE: {:.2f}".format(rmse_numeric))
print("MAE: {:.2f}".format(mae_numeric))
print("R2: {:.2f}".format(r2_numeric))



Модель на числовых данных
RMSE: 0.63
MAE: 51550.12
R2: 0.63


                                                                                

In [200]:
spark.stop()

# Анализ результатов

Нами было проведено сравнение двух моделей линейной регрессии для предсказания стоимости жилья.
<br>
Первая модель использовала все признаки, включая категориальный признак "ocean_proximity", который был закодирован методом ohe, а вторая модель использовала только числовые признаки.
<br>
Результаты показали, что модель на всех признаках имеет лучшие показатели, чем модель на числовых признаках. Значение метрики RMSE для первой модели составило 70131.70, а для второй - 0.63. Однако модель на числовых признаках также имеет хорошие показатели с точки зрения метрик MAE и R2, что говорит о том, что числовые признаки имеют высокую корреляцию с целевой переменной.
<br>
Таким образом, можно заключить, что включение категориального признака "ocean_proximity" в модель улучшает ее предсказательную способность