## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

In [1]:
# Загрузим необходимые библиотеки
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

RANDOM_SEED = 44

# Подготовка данных

In [2]:
# Инициализируем Spark сессию
spark = SparkSession.builder \
                    .master("local") \
                    .appName("EDA California Housing") \
                    .getOrCreate()

In [3]:
# Прочитаем данные
df_housing = spark.read.csv('/datasets/housing.csv', inferSchema=True, header=True)
df_housing.show(10)

                                                                                

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [4]:
# Выведем типы данных и названия колонок
print(pd.DataFrame(df_housing.dtypes, columns=['column', 'type']).head(10))


               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


In [5]:
# Выведем основные метрики распределений данных 
df_housing.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [6]:
# выведем пропущенные значения в каждой колонке
columns = df_housing.columns

for column in columns:
    check_col = F.col(column).cast('float')
    print(column, df_housing.filter(check_col.isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 20640


Только в столбце 'total_bedrooms' у нас выявлено 207 пропусков, что составляет около 1%. Заменим пропуски на среднее значение 538 для столбца 'total_bedrooms', полученное нами ранее при выводе describe().

In [7]:
#df_housing = df_housing.na.drop(how='any')
df_housing = df_housing.fillna(538, subset=['total_bedrooms'])

In [8]:
# Проверим результат после удаления пропусков
columns = df_housing.columns

for column in columns:
    check_col = F.col(column).cast('float')
    print(column, df_housing.filter(check_col.isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 20640


Пропуски не найдены.

In [9]:
# Разделим колонки на два типа: числовые и категориальные
categorical_cols = ['ocean_proximity']
numerical_cols  = ["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"]
target = "median_house_value" 

В первую очередь трансформируем категориальные признаки с помощью трансформера StringIndexer. Он переводит текстовые категории в числовое представление. Затем создадим OHE-кодирование.

In [10]:
# Трансформируем категориальные признаки
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols]) 
df_housing = indexer.fit(df_housing).transform(df_housing)

                                                                                

In [11]:
# Преобразуем колонку с категориальными значениями техникой One hot encoding
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
df_housing = encoder.fit(df_housing).transform(df_housing)

df_housing.toPandas().head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,ocean_proximity_ohe
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"


Объединим признаки в один вектор с помощью VectorAssembler

In [12]:
categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols], outputCol="categorical_features")
df_housing = categorical_assembler.transform(df_housing) 

Числовые признаки трансформируем путем шкалирования значений, чтобы сильные выбросы не смещали предсказания модели.

In [13]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
df_housing = numerical_assembler.transform(df_housing) 

standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled", withMean=True)
df_housing = standardScaler.fit(df_housing).transform(df_housing) 

                                                                                

Далее необходимо собрать трансформированные категорийные и числовые признаки с помощью VectorAssembler.

In [14]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
df_housing = final_assembler.transform(df_housing)

df_housing.select(all_features).show(3) 

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-1.3278030546902...|
|       (4,[3],[1.0])|     [-1.3228118684350...|
|       (4,[3],[1.0])|     [-1.3327942409452...|
+--------------------+-------------------------+
only showing top 3 rows



# Обучение моделей

Перед обучением модели разобьем наш датасет на выборки

In [15]:
train_data, test_data = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 

                                                                                

16490 4150


                                                                                

По условиям задачи нам необходимо подготовить две модели линейной регрессии. Одна с использованием всех данных из файла, другая - используя только числовые признаки, исключив категориальные.

In [16]:
# обучим первую модель
lr = LinearRegression(labelCol=target, featuresCol='features')
model = lr.fit(train_data) 

24/11/23 22:53:28 WARN Instrumentation: [d8dfe2ca] regParam is zero, which might cause numerical instability and overfitting.
24/11/23 22:53:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/11/23 22:53:29 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/11/23 22:53:30 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/11/23 22:53:30 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [17]:
# запишем предсказания на тестовой выборке
predictions = model.transform(test_data)
# Сохраним предсказания и ответы в отдельную переменную
results = predictions.select(['prediction', target])
# Создадим RDD для расчета метрик
results_collect = results.collect()
results_list = [ (float(i[0]), float(i[1])) for i in results_collect]
scoreAndLabels = spark.sparkContext.parallelize(results_list)
 
metrics = RegressionMetrics(scoreAndLabels)

print('RMSE', metrics.rootMeanSquaredError) 
print('MAE', metrics.meanAbsoluteError) 
print('R2', metrics.r2) 

                                                                                

RMSE 68180.8238732294
MAE 49698.78254468224
R2 0.659289160949233


In [18]:
# Повторим аналогичные операции для модели на количественных признаках
lr = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
model = lr.fit(train_data)
# запишем предсказания на тестовой выборке
predictions = model.transform(test_data)
# Сохраним предсказания и ответы в отдельную переменную
results = predictions.select(['prediction', target])
# Создадим RDD для расчета метрик
results_collect = results.collect()
results_list = [ (float(i[0]), float(i[1])) for i in results_collect]
scoreAndLabels = spark.sparkContext.parallelize(results_list)
 
metrics = RegressionMetrics(scoreAndLabels)

print('RMSE', metrics.rootMeanSquaredError) 
print('MAE', metrics.meanAbsoluteError) 
print('R2', metrics.r2)

24/11/23 22:53:35 WARN Instrumentation: [316df805] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

RMSE 69162.12825063201
MAE 50871.4361149453
R2 0.6494111028837695


# Анализ результатов

В итоге мы получили следующие результаты согласно наших метрик:
<br>
RMSE (меньше-лучше) - модель на полном наборе данных лучше чем модель на количественных признаках
<br>
MAE (меньше-лучше) - модель на полном наборе данных лучше чем модель на количественных признаках
<br>
R2 (больше-лучше) - модель на полном наборе данных лучше чем модель на количественных признаках
<br>

Таким образом, по всем трем метрикам модель на полном наборе данных превосходит модель на количественных признаках.