## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

В колонках датасета содержатся следующие данные:

*longitude* — широта;

*latitude* — долгота;

*housing_median_age* — медианный возраст жителей жилого массива;

*total_rooms* — общее количество комнат в домах жилого массива;

*total_bedrooms* — общее количество спален в домах жилого массива;

*population* — количество человек, которые проживают в жилом массиве;

*households* — количество домовладений в жилом массиве;

*median_income* — медианный доход жителей жилого массива;

*median_house_value* — медианная стоимость дома в жилом массиве;

*ocean_proximity* — близость к океану.

# Подготовка данных

In [1]:
import pandas as pd 
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import pyspark
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml.feature import Bucketizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import (StringIndexer, OneHotEncoder,
                            VectorAssembler, StandardScaler)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [3]:
RANDOM_SEED = 49

In [4]:
logger = logging.getLogger('py4j')
logger.setLevel(logging.ERROR)

In [5]:
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("House - Linear regression") \
                    .getOrCreate()

In [6]:
spark.sparkContext.setLogLevel("ERROR")

In [7]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema=True)

                                                                                

In [8]:
df.show(5)


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [9]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [10]:
columns = df.columns
for column in columns:
    check_col = F.isnan(column) | F.isnull(column)
    print(column, df.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


У нас пропуски только в одном столбце total_bedrooms (общее количество спален в домах жилого массива). Чтобы заполнить пропуски сгруппируем по количеству человек в жилом массиве, возьмем медиану total_bedrooms и заполним по группам. Но т.к. количество человек в жилом массиве скорее всего будет кучей чисел, то разобьем на несколько суперкатегорий

Еще вариант сгруппировать по housing_median_age: в районах с похожим возрастом домов обычно схожая планировка. Соответветвенно одинаковое количество спален

In [11]:
population_counts = (df.groupBy('population').count()
      .orderBy(F.desc('count')))
population_counts.show()
df.groupBy('population').count().count()

                                                                                

+----------+-----+
|population|count|
+----------+-----+
|     891.0|   25|
|     761.0|   24|
|    1052.0|   24|
|    1227.0|   24|
|     850.0|   24|
|     825.0|   23|
|     999.0|   22|
|     782.0|   22|
|    1005.0|   22|
|     753.0|   21|
|     781.0|   21|
|     872.0|   21|
|    1098.0|   21|
|    1203.0|   20|
|     837.0|   20|
|     899.0|   20|
|     735.0|   20|
|    1056.0|   20|
|     804.0|   20|
|    1301.0|   20|
+----------+-----+
only showing top 20 rows



                                                                                

3888

Вариантов действительно очень много. нужно укрупнить категории. Построим график и посмотрим распределение

категорий очень много. Попробуем просто разбить на 100 корзин, потому что каких-то четко выделенных бинов мы не видим

In [12]:
min_pop, max_pop = df.select(F.min('population'), F.max('population')).first()

splits = np.linspace(min_pop, max_pop, num=101)

bucketizer = Bucketizer(splits=splits, inputCol='population', outputCol='population_bucket')

df = bucketizer.transform(df)

In [13]:
median_bedrooms = (df.groupBy('population_bucket','housing_median_age')
                   .agg(F.expr('percentile_approx(total_bedrooms, 0.5)').alias('median_bedrooms')))
df_with_medians = df.join(median_bedrooms, on=['population_bucket', 'housing_median_age'], how='left')

In [14]:
df = df_with_medians.withColumn(
    'total_bedrooms',F.coalesce(df_with_medians['total_bedrooms'], df_with_medians['median_bedrooms']))

df = df.drop('median_bedrooms','population_bucket')

df.show()

                                                                                

+------------------+---------+--------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|housing_median_age|longitude|latitude|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+------------------+---------+--------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|              41.0|  -122.23|   37.88|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|              21.0|  -122.22|   37.86|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|              52.0|  -122.24|   37.85|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|              52.0|  -122.25|   37.85|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|              52.0|  -122.25|   3

In [15]:
df.filter(df['total_bedrooms'].isNull()).count()

                                                                                

3

осталось три пропуска, их можно заполнить общим средним или дропнуть. т.к. у нас много данных просто удалим эти строки

In [16]:
df = df.na.drop()

In [17]:
categorical_cols = ['ocean_proximity']
numerical_cols = ['longitude','latitude','housing_median_age','total_rooms', 'total_bedrooms',
                  'population', 'households','median_income']
target = 'median_house_value'

In [18]:
df.describe().show()

[Stage 39:>                                                         (0 + 1) / 1]

+-------+------------------+-------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+---------------+
|summary|housing_median_age|          longitude|         latitude|       total_rooms|   total_bedrooms|        population|        households|     median_income|median_house_value|ocean_proximity|
+-------+------------------+-------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+---------------+
|  count|             20637|              20637|            20637|             20637|            20637|             20637|             20637|             20637|             20637|          20637|
|   mean| 28.63890100305277|-119.56970877549743|35.63193051315615| 2635.256577990987|537.5854048553568|1424.9095798807966|499.32330280564037|3.8709846828512378|206851.50268934437|           null|
| stddev|12.58588077

                                                                                

# Обучение моделей

### **Трансформация категориальных признаков**

У нас всего один категориальный признак: ocean_proximity. Используем два метода в связке: сначала переведем с помощью StringIndexer категориальные значения в числовые индексы. Т.к. OneHotEncoder принимает на вход числовые значения, затем onehot переведем эти индексы в векторы. 

In [19]:
df.groupBy('ocean_proximity').count().orderBy(F.desc('count')).show()




+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9134|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2289|
|         ISLAND|    5|
+---------------+-----+



                                                                                

Но меня смущает, что stringIndexer назначает числа по частоте встречаемости, по идее этот признак этот признак должен отражать последовательность удаленности от океана

0 ISLAND — на острове, максимально близко к воде

1 NEAR BAY — возле залива, почти на побережье

2 <1H OCEAN — менее часа до океана

3 NEAR OCEAN — рядом с океаном, но чуть дальше, чем <1H OCEAN

4 INLAND — вглубь континента, далеко от океана

Можно было бы закодировать их явно (например через when), чтобы модель распознавала порядковость. Но у нас проекте стоит требование сделать OneHot.Если применить OneHotEncoder, то модель не будет видеть в них порядковости, т.к. он убирает информацию о порядке и делает категории независимыми друг от друга. Мне кажется это несколько нправильным ну да ладно

Разделяем датасет на обучающую и тестовую выборки (для всех данных)

In [20]:
train_data, test_data = df.randomSplit([0.8,0.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 

                                                                                

16591 4046


Сделаем пайплайн для обработки данных

In [21]:
indexer = StringIndexer(inputCol='ocean_proximity', 
                        outputCol='ocean_proximity_indx')

encoder = OneHotEncoder(inputCol='ocean_proximity_indx',
                        outputCol='ocean_proximity_enc')

# Объединяем категориальные признаков в вектор
categorical_assembler = VectorAssembler(inputCols=[c+'_enc' for c in categorical_cols],
                                        outputCol="categorical_features")

# и числовые в вектор
numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                      outputCol="numerical_features")

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled", withMean=True)

#объединяем все признаки в один вектор
all_features = ['categorical_features', 'numerical_features_scaled']
final_assembler = VectorAssembler(inputCols=all_features,
                                  outputCol='features')


pipeline = Pipeline(stages=[
    indexer, 
    encoder, 
    categorical_assembler, 
    numerical_assembler, 
    standardScaler, 
    final_assembler
])


pipeline_model = pipeline.fit(train_data)
train_data = pipeline_model.transform(train_data)
test_data = pipeline_model.transform(test_data)


                                                                                

проверяем

In [22]:
train_data.select('ocean_proximity', 'ocean_proximity_indx', 'ocean_proximity_enc', 'features').show(3)

                                                                                

+---------------+--------------------+-------------------+--------------------+
|ocean_proximity|ocean_proximity_indx|ocean_proximity_enc|            features|
+---------------+--------------------+-------------------+--------------------+
|         INLAND|                 1.0|      (4,[1],[1.0])|[0.0,1.0,0.0,0.0,...|
|         INLAND|                 1.0|      (4,[1],[1.0])|[0.0,1.0,0.0,0.0,...|
|         INLAND|                 1.0|      (4,[1],[1.0])|[0.0,1.0,0.0,0.0,...|
+---------------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [23]:
test_data.select('ocean_proximity', 'ocean_proximity_indx', 'ocean_proximity_enc', 'features').show(3)

                                                                                

+---------------+--------------------+-------------------+--------------------+
|ocean_proximity|ocean_proximity_indx|ocean_proximity_enc|            features|
+---------------+--------------------+-------------------+--------------------+
|         INLAND|                 1.0|      (4,[1],[1.0])|[0.0,1.0,0.0,0.0,...|
|      <1H OCEAN|                 0.0|      (4,[0],[1.0])|[1.0,0.0,0.0,0.0,...|
|      <1H OCEAN|                 0.0|      (4,[0],[1.0])|[1.0,0.0,0.0,0.0,...|
+---------------+--------------------+-------------------+--------------------+
only showing top 3 rows



Обучаем модель на всех данных

In [24]:
lr = LinearRegression(labelCol=target, featuresCol='features', regParam=0.1)
model = lr.fit(train_data)

                                                                                

In [25]:
predictions = model.transform(test_data)

predictions.select('features', target, 'prediction').show(5)

                                                                                

+--------------------+------------------+------------------+
|            features|median_house_value|        prediction|
+--------------------+------------------+------------------+
|[0.0,1.0,0.0,0.0,...|          141700.0|154065.61141373776|
|[1.0,0.0,0.0,0.0,...|          434700.0|167181.52189024218|
|[1.0,0.0,0.0,0.0,...|          425000.0|  243109.627979976|
|[0.0,1.0,0.0,0.0,...|          240200.0|214508.99841539894|
|[0.0,1.0,0.0,0.0,...|          137900.0|142704.79380542156|
+--------------------+------------------+------------------+
only showing top 5 rows



Оценка качества

In [26]:
evaluator = RegressionEvaluator(labelCol=target, predictionCol='prediction')

In [27]:
rmse = evaluator.setMetricName('rmse').evaluate(predictions)
mae = evaluator.setMetricName('mae').evaluate(predictions)
r2 = evaluator.setMetricName('r2').evaluate(predictions)

print('Точность модели линейной регрессии на всех данных')
print(f"RMSE: {rmse}")
print(f"MAE: {mae}")
print(f"R²: {r2}")

                                                                                

Точность модели линейной регрессии на всех данных
RMSE: 70030.935085654
MAE: 49849.5532637351
R²: 0.6217029387524774


Метрики не очень. Посмотрим как можно улучшить.
Попробуем перебрать гиперпараметры: коэффициент L2-регуляризации (штраф за большие веса), параметр ElasticNet, регулирующий баланс между L1 и L2 регуляризацией, максимальное количество итераций

In [28]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.5, 1.0, 2.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .addGrid(lr.maxIter, [50, 100, 200])\
    .build()


evaluator = RegressionEvaluator(labelCol=target, 
                                predictionCol='prediction', 
                                metricName='r2')

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid, 
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossval.fit(train_data)


bestModel = cvModel.bestModel
print("Best regParam:", bestModel._java_obj.getRegParam())
print("Best elasticNetParam:", bestModel._java_obj.getElasticNetParam())
print("Best maxIter:", bestModel._java_obj.getMaxIter())
print("Best tol:", bestModel._java_obj.getTol())

predictions = bestModel.transform(test_data)
r2 = evaluator.evaluate(predictions)
print("R² на тестовой выборке:", r2)

                                                                                

Best regParam: 2.0
Best elasticNetParam: 0.0
Best maxIter: 50
Best tol: 1e-06


                                                                                

R² на тестовой выборке: 0.6217093826989335


В итоге мы получили такую же R2. Оставим простую модель

In [29]:
print("Средние метрики кросс-валидации (R²):", cvModel.avgMetrics)

Средние метрики кросс-валидации (R²): [0.6500935363292741, 0.6500935363292741, 0.6500935363292741, 0.6496222096894226, 0.649750250170051, 0.6498880572705992, 0.6496096474997928, 0.6497410912956592, 0.6499504913021388, 0.6500975299595085, 0.6500975299595085, 0.6500975299595085, 0.6496097671140462, 0.6497238294262722, 0.6497574979301255, 0.649519294812598, 0.6498072549474623, 0.6498055240366565, 0.650102217672499, 0.650102217672499, 0.650102217672499, 0.6495190786565287, 0.6498113719217595, 0.649785145372602, 0.649521655889309, 0.649793657965853, 0.6498079651872403, 0.6501106505547161, 0.6501106505547161, 0.6501106505547161, 0.6495051353366694, 0.6497523917860932, 0.6497466715490338, 0.649597576326238, 0.6497474865085603, 0.6498957835620964]


При кросс-валидации на обучающей выборке метрика r2 немного выше, чем на тестовой. Но не сильно

### Построим вторую модель на неполном наборе данных (используем только числовые фичи, исключив категориальные)

In [30]:
lr2 = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled', regParam=0.1)
model2 = lr2.fit(train_data)

                                                                                

In [31]:
predictions2 = model2.transform(test_data)

predictions2.select('features', target, 'prediction').show(5)

                                                                                

+--------------------+------------------+------------------+
|            features|median_house_value|        prediction|
+--------------------+------------------+------------------+
|[0.0,1.0,0.0,0.0,...|          141700.0| 163182.7121927816|
|[1.0,0.0,0.0,0.0,...|          434700.0| 152002.7075919719|
|[1.0,0.0,0.0,0.0,...|          425000.0| 236974.8755806374|
|[0.0,1.0,0.0,0.0,...|          240200.0|233971.27147062236|
|[0.0,1.0,0.0,0.0,...|          137900.0|153162.41352593014|
+--------------------+------------------+------------------+
only showing top 5 rows



In [32]:
rmse2 = evaluator.setMetricName('rmse').evaluate(predictions2)
mae2 = evaluator.setMetricName('mae').evaluate(predictions2)
r2_2 = evaluator.setMetricName('r2').evaluate(predictions2)

print('Точность модели линейной регрессии на числовых данных')
print(f"RMSE: {rmse2}")
print(f"MAE: {mae2}")
print(f"R²: {r2_2}")

                                                                                

Точность модели линейной регрессии на числовых данных
RMSE: 70891.57168046109
MAE: 50792.332754549025
R²: 0.6123477342502256


In [33]:
spark.stop()

# Анализ результатов

В этой работе мы обучили две модели линейной регрессии на данных о жилье в Калифорнии в 1990 году.
В процессе предподготовки данных были восстановлены пропуски в признаке total_bedrooms с помощью заполнения медианным значениям по группам, агрегированным по количеству человек в жилом массиве и возрасту жилого массива. 
Различие созданных моделей было только в наличии одного категориального признака: ocean_proximity (в модели lr полный набор данных, а в модели lr2 только числовые). По результатам метрик RMSE,MAE и R² точность обеих моделей очень близка, значит признак близости к океану не оказывал очень большого влияния на предсказательную способность модели линейной регрессии (однако модель lr показала себя все же немного лучше по метрикам RMSE и MAE). В обоих случаях мы получили модели с низкой точностью (RMSE: 70030, MAE: 49849, R²: 0.6217 для модели с полным набром данных)

In [34]:
!pip freeze > requirements.txt