## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [2]:
RANDOM_SEED=5632

Инициализируем spark-сессию:

In [3]:
spark =  SparkSession \
         .builder \
         .master('local') \
         .appName('California project') \
         .getOrCreate()

log4j:ERROR Could not read configuration file from URL [file:/opt/conda/lib/python3.9/site-packages/pyspark/conf/log4j.properties].
java.io.FileNotFoundException: /opt/conda/lib/python3.9/site-packages/pyspark/conf/log4j.properties (Permission denied)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
	at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
	at org.slf4j.impl.Log4jLoggerFactory.<init>(Log4jLoggerFactory.java:66)
	at org.slf4j.impl.StaticLoggerBind

# Подготовка данных

Загрузим и изучим имеющиеся данные:

In [4]:
df = spark.read.load('/datasets/housing.csv', format='csv', inferSchema=True, header='true')
df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

In [5]:
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [6]:
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


In [7]:
for col in df.columns:
    counter = 0
    counter = df.filter(f'{col} is NULL').count()
    print(f'{col}: {counter}')

longitude: 0
latitude: 0
housing_median_age: 0
total_rooms: 0
total_bedrooms: 207
population: 0
households: 0
median_income: 0
median_house_value: 0
ocean_proximity: 0


Избавимся от пропусков:

In [8]:
df = df.na.drop()
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433
1,mean,-119.57068859198068,35.63322125972706,28.633093525179856,2636.5042333480155,537.8705525375618,1424.9469485635982,499.43346547252,3.8711616013312273,206864.4131551901,
2,stddev,2.003577890751096,2.1363476663779872,12.591805202182837,2185.269566977601,421.3850700740312,1133.2084897449597,382.2992258828481,1.899291249306247,115435.66709858322,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Закодируем категориальные данные и создадим из них вектор:

In [9]:
indexer = StringIndexer(inputCols=['ocean_proximity'],
                        outputCols=['ocean_proximity_i'])
df = indexer.fit(df).transform(df)
encoder = OneHotEncoder(inputCols=['ocean_proximity_i'],
                        outputCols=['categorical_features'])
df = encoder.fit(df).transform(df)
df.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_i
0,count,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433.0,20433,20433.0
1,mean,-119.57068859198068,35.63322125972706,28.633093525179856,2636.5042333480155,537.8705525375618,1424.9469485635982,499.43346547252,3.8711616013312273,206864.4131551901,,0.9094112465129937
2,stddev,2.003577890751096,2.1363476663779872,12.591805202182837,2185.269566977601,421.3850700740312,1133.2084897449597,382.2992258828481,1.899291249306247,115435.66709858322,,1.0045735326216698
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN,0.0
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN,4.0


In [10]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_i|categorical_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|              3.0|       (4,[3],[1.0])|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|              3.0|       (4,[3],[1.0])|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352

Соберем все численные признаки в вектор и проскалируем их:

In [11]:
numerical_assembler = VectorAssembler(inputCols=['housing_median_age', 
                                                 'total_rooms', 
                                                 'total_bedrooms', 
                                                 'population', 
                                                 'households', 
                                                 'median_income'], 
                                      outputCol='numerical_features')

df = numerical_assembler.transform(df)

In [12]:
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="latitude", outputCol="latitude_cat")
df = discretizer.fit(df).transform(df)

In [13]:
encoder = OneHotEncoder(inputCols=['latitude_cat'],
                        outputCols=['categorical_features_v2'])
df = encoder.fit(df).transform(df)
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----------------+--------------------+--------------------+------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_i|categorical_features|  numerical_features|latitude_cat|categorical_features_v2|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----------------+--------------------+--------------------+------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|              3.0|       (4,[3],[1.0])|[41.0,880.0,129.0...|         4.0|              (4,[],[])|
|  -122.22|   37.86|              21.0|     7099.0|        1

In [14]:
cat_assembler = VectorAssembler(inputCols=['categorical_features', 'categorical_features_v2'], 
                                      outputCol='cat_features')

df = cat_assembler.transform(df)

In [15]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')
df = standardScaler.fit(df).transform(df)

                                                                                

In [16]:
df.select(['cat_features', 'numerical_features_scaled']).show(5)

+-------------+-------------------------+
| cat_features|numerical_features_scaled|
+-------------+-------------------------+
|(8,[3],[1.0])|     [3.25608594968515...|
|(8,[3],[1.0])|     [1.66775134008264...|
|(8,[3],[1.0])|     [4.12966998496654...|
|(8,[3],[1.0])|     [4.12966998496654...|
|(8,[3],[1.0])|     [4.12966998496654...|
+-------------+-------------------------+
only showing top 5 rows



Объеденяем признаки, на основе которых будет обучаться модель:

In [17]:
all_features = ['cat_features','numerical_features_scaled']
final_assembler = VectorAssembler(inputCols=all_features, outputCol='features') 
df = final_assembler.transform(df)

In [18]:
df.select(all_features).show(5)

+-------------+-------------------------+
| cat_features|numerical_features_scaled|
+-------------+-------------------------+
|(8,[3],[1.0])|     [3.25608594968515...|
|(8,[3],[1.0])|     [1.66775134008264...|
|(8,[3],[1.0])|     [4.12966998496654...|
|(8,[3],[1.0])|     [4.12966998496654...|
|(8,[3],[1.0])|     [4.12966998496654...|
+-------------+-------------------------+
only showing top 5 rows



# Обучение моделей

Разделим датасет на обучающую и тестовую выборки в соотошении 4:1 соответственно (предварительно задав RANDOM_SEED, что бы не скакали результаты):

In [19]:
target='median_house_value'
#RANDOM_SEED=5632

train_df, test_df = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print('Обучающая выборка:', train_df.count(), '\nТестовая выборка:', test_df.count()) 

[Stage 40:>                                                         (0 + 1) / 1]

Обучающая выборка: 16318 
Тестовая выборка: 4115


                                                                                

Ну вот и пришло время обучить модель. Я попытался реализовать имеющуюся в pyspark кросс-валидацию:

In [20]:
lr = LinearRegression(labelCol=target, featuresCol='features')
grid = ParamGridBuilder().addGrid(lr.maxIter, [1, 5, 10, 20, 50]).addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0]).addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]).build()
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=target)
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2, numFolds = 4)
cvModel = cv.fit(train_df)
best = cvModel.bestModel

22/06/03 16:02:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/06/03 16:02:43 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/06/03 16:02:43 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/06/03 16:02:43 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

In [21]:
predictions = best.transform(test_df)
print('На обучающей выборке R2:', best.summary.r2)
print('На обучающей выборке RMSE:', best.summary.rootMeanSquaredError)
print('На обучающей выборке MSE:', best.summary.meanSquaredError)
print('На тестовой выборке R2:', evaluator.setParams(metricName="r2").evaluate(predictions))
print('На тестовой выборке RMSE:', evaluator.setParams(metricName="rmse").evaluate(predictions))
print('На тестовой выборке MSE:', evaluator.setParams(metricName="mse").evaluate(predictions))

На обучающей выборке R2: 0.6434870416786485
На обучающей выборке RMSE: 68749.39551060466
На обучающей выборке MSE: 4726479383.073549


                                                                                

На тестовой выборке R2: 0.6349861672629691
На тестовой выборке RMSE: 70412.44392173366
На тестовой выборке MSE: 4957912259.031287


Вроде получается. А теперь те же данные, но обучаем только основываясь на численных проскаллированных данных:

In [22]:
lr_wo = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
grid_wo = ParamGridBuilder().addGrid(lr_wo.maxIter, [1, 5, 10, 20, 50]).addGrid(lr_wo.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0]).addGrid(lr_wo.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0]).build()
cv_wo = CrossValidator(estimator=lr_wo, estimatorParamMaps=grid_wo, evaluator=evaluator, parallelism=2, numFolds = 4)
cvModel_wo = cv_wo.fit(train_df)
best_wo = cvModel_wo.bestModel

                                                                                

In [23]:
predictions_wo = best_wo.transform(test_df)
print('На обучающей выборке R2:', best_wo.summary.r2)
print('На обучающей выборке RMSE:', best_wo.summary.rootMeanSquaredError)
print('На обучающей выборке MSE:', best_wo.summary.meanSquaredError)
print('На обучающей выборке R2:', evaluator.setParams(metricName="r2").evaluate(predictions_wo))
print('На обучающей выборке RMSE:', evaluator.setParams(metricName="rmse").evaluate(predictions_wo))
print('На обучающей выборке MSE:', evaluator.setParams(metricName="mse").evaluate(predictions_wo))

На обучающей выборке R2: 0.5688193061268736
На обучающей выборке RMSE: 75606.80351794005
На обучающей выборке MSE: 5716388738.200392
На обучающей выборке R2: 0.5580721214435987
На обучающей выборке RMSE: 77476.58265790308
На обучающей выборке MSE: 6002620860.3468895


In [24]:
spark.stop()

# Анализ результатов

По результатам обучения моделей можно сделать вывод, что лучший результат нам дает модель с использованием категориальных признаков в обучающих данных.