## Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

In [5]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        
from pyspark.sql.functions import isnan, when, count, col
RANDOM_SEED = 2022 

In [3]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing - Linear regression") \
                    .getOrCreate()

In [71]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema = True) 

# Подготовка данных

In [6]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [74]:
for column in df.columns:
    df.describe([column]).show()

+-------+-------------------+
|summary|          longitude|
+-------+-------------------+
|  count|              20640|
|   mean|-119.56970445736148|
| stddev|  2.003531723502584|
|    min|            -124.35|
|    max|            -114.31|
+-------+-------------------+

+-------+-----------------+
|summary|         latitude|
+-------+-----------------+
|  count|            20640|
|   mean| 35.6318614341087|
| stddev|2.135952397457101|
|    min|            32.54|
|    max|            41.95|
+-------+-----------------+

+-------+------------------+
|summary|housing_median_age|
+-------+------------------+
|  count|             20640|
|   mean|28.639486434108527|
| stddev| 12.58555761211163|
|    min|               1.0|
|    max|              52.0|
+-------+------------------+

+-------+------------------+
|summary|       total_rooms|
+-------+------------------+
|  count|             20640|
|   mean|2635.7630813953488|
| stddev|2181.6152515827944|
|    min|               2.0|
|    max|  

In [7]:
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()


[Stage 4:>                                                          (0 + 1) / 1]

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



                                                                                

Получается, что в данном датасете нет Nan значений 

In [8]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [9]:
df = df.withColumn('total_bedrooms', when(df['total_bedrooms'].isNull(), df['total_rooms'] * 0.2).otherwise(df['total_bedrooms']))
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [10]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ["longitude", "latitude", "housing_median_age", 
                   "total_rooms", "total_bedrooms", "population",
                  "households", "median_income"]
target = "median_house_value" 

Соберем новый датасет из численных признаков

In [44]:
numeric_df = df[numerical_cols]

Трансформируем категориальные признаки с помощью трансформера StringIndexer

In [12]:
indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
df = indexer.fit(df).transform(df)

                                                                                

In [21]:
df.groupBy('ocean_proximity_index').count().show()



+---------------------+-----+
|ocean_proximity_index|count|
+---------------------+-----+
|                  0.0| 9136|
|                  1.0| 6551|
|                  4.0|    5|
|                  3.0| 2290|
|                  2.0| 2658|
+---------------------+-----+



                                                                                

Теперь можно создать OHE-кодирование для полученных категорий.

In [22]:
encoder = OneHotEncoder(inputCols= ['ocean_proximity_index'],
                        outputCols=['ocean_proximity_index_ohe'])
df = encoder.fit(df).transform(df)

In [23]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_index: double (nullable = false)
 |-- ocean_proximity_index_ohe: vector (nullable = true)



Удалим ненужные признаки

In [24]:
columns_to_drop = ['ocean_proximity', 'ocean_proximity_index']
df = df.drop(*columns_to_drop)

Векторизуем и стандартизируем числовые признаки

In [25]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")
df = numerical_assembler.transform(df)

In [26]:
standardScaler = StandardScaler(inputCol='numerical_features',
                                                                outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df)

                                                                                

In [27]:
print(df.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity_index_ohe', 'numerical_features', 'numerical_features_scaled']


In [28]:
categorical_assembler = \
        VectorAssembler(inputCols=['ocean_proximity_index_ohe'],
                                        outputCol="categorical_features")
df = categorical_assembler.transform(df) 

Для обучения модели в будущем соберем все признаки в один и снова векторизуем

In [29]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
df = final_assembler.transform(df)

df.select(all_features).show(3) 

+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



In [56]:
numeric_df = df[['numerical_features_scaled','median_house_value']]

In [57]:
numeric_df.show()

+-------------------------+------------------+
|numerical_features_scaled|median_house_value|
+-------------------------+------------------+
|     [-61.007269596069...|          452600.0|
|     [-61.002278409814...|          358500.0|
|     [-61.012260782324...|          352100.0|
|     [-61.017251968579...|          341300.0|
|     [-61.017251968579...|          342200.0|
|     [-61.017251968579...|          269700.0|
|     [-61.017251968579...|          299200.0|
|     [-61.017251968579...|          241400.0|
|     [-61.022243154834...|          226700.0|
|     [-61.017251968579...|          261100.0|
|     [-61.022243154834...|          281500.0|
|     [-61.022243154834...|          241800.0|
|     [-61.022243154834...|          213500.0|
|     [-61.022243154834...|          191300.0|
|     [-61.022243154834...|          159200.0|
|     [-61.022243154834...|          140000.0|
|     [-61.027234341089...|          152500.0|
|     [-61.027234341089...|          155500.0|
|     [-61.02

# Обучение моделей

Разделим датасет на обучающие и тестовые выборки

In [35]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())

[Stage 29:>                                                         (0 + 1) / 1]

16418 4222


                                                                                

In [58]:
numeric_train_data, numeric_test_data = numeric_df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(numeric_train_data.count(), numeric_test_data.count())

                                                                                

16418 4222


Теперь иницируем и обучим модели для общего датасета и числового 

In [39]:
lr = LinearRegression(labelCol=target, featuresCol='features')

In [40]:
model = lr.fit(train_data)

22/06/06 21:32:50 WARN Instrumentation: [37873356] regParam is zero, which might cause numerical instability and overfitting.
22/06/06 21:32:51 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/06/06 21:32:51 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/06/06 21:32:52 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/06/06 21:32:52 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
22/06/06 21:32:52 WARN Instrumentation: [37873356] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
22/06/06 21:32:54 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search zoom failed
                                                                                

In [59]:
numeric_lr = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')

In [60]:
numeric_model = numeric_lr.fit(numeric_train_data)

22/06/06 21:43:55 WARN Instrumentation: [3cfdc765] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

# Анализ результатов

In [61]:
predictions = model.transform(test_data)

predictedLabes = predictions.select("median_house_value", "prediction")
predictedLabes.show() 

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0| 152791.2402468701|
|           50800.0|215668.98234039592|
|           58100.0|143288.28411708027|
|           68400.0| 132092.2624493139|
|           72200.0|163042.00581792276|
|           67000.0|154688.45326637244|
|           81300.0|151650.22712867148|
|           70500.0| 163065.2669858574|
|           60000.0| 144079.0329603944|
|          109400.0| 170363.0811676453|
|           74100.0|151317.53441630676|
|           74700.0|167066.84335186938|
|           90000.0| 208699.3061046577|
|          104200.0|199438.52662651055|
|           74100.0|156257.78715729248|
|           67500.0|147619.24190479936|
|          103100.0| 47775.59587908862|
|           92500.0|165487.87888506847|
|          128100.0|221371.33085720055|
|           99600.0| 186257.7123783962|
+------------------+------------------+
only showing top 20 rows



                                                                                

In [62]:
results = predictions.select(["median_house_value", "prediction"])
 
## prepare score-label set
results_collect = results.collect()
results_list = [ (float(i[0]), float(i[1])) for i in results_collect]
scoreAndLabels = spark.sparkContext.parallelize(results_list)

                                                                                

In [63]:
# Instantiate metrics object
metrics = RegressionMetrics(scoreAndLabels)

# Squared Error
print("MSE = ", metrics.meanSquaredError)
print("RMSE = ", metrics.rootMeanSquaredError)

# R-squared
print("R-squared = ", metrics.r2)

                                                                                

MSE =  4690473909.587973
RMSE =  68487.03460939138
R-squared =  0.44778824110748505


In [64]:
numeric_predictions = numeric_model.transform(numeric_test_data)

numeric_predictedLabes = numeric_predictions.select("median_house_value", "prediction")
numeric_predictedLabes.show() 

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101896.46677056747|
|           50800.0|184434.60668721003|
|           58100.0|110635.08713017311|
|           68400.0|  80726.3918724223|
|           72200.0|129197.00442492217|
|           67000.0|121117.80250556301|
|           81300.0|117145.66684774077|
|           70500.0|129408.49222369958|
|           60000.0|111917.55108316895|
|          109400.0|117924.63941164035|
|           74100.0| 118874.6584638399|
|           74700.0|133893.89174431656|
|           90000.0|175112.48979143891|
|          104200.0| 165773.2110000332|
|           74100.0|122401.84285740973|
|           67500.0|113396.27866047854|
|          103100.0|-7202.959182919469|
|           92500.0| 139824.6806590967|
|          128100.0| 190793.7435223218|
|           99600.0|151974.29617243353|
+------------------+------------------+
only showing top 20 rows



In [65]:
numeric_results = numeric_predictions.select(["median_house_value", "prediction"])
 
## prepare score-label set
numeric_results_collect = numeric_results.collect()
numeric_results_list = [ (float(i[0]), float(i[1])) for i in numeric_results_collect]
numeric_scoreAndLabels = spark.sparkContext.parallelize(numeric_results_list)

In [66]:
# Instantiate metrics object
numeric_metrics = RegressionMetrics(numeric_scoreAndLabels)

# Squared Error
print("MSE = ", numeric_metrics.meanSquaredError)
print("RMSE = ", numeric_metrics.rootMeanSquaredError)

# R-squared
print("R-squared = ", numeric_metrics.r2)

MSE =  4791411332.064669
RMSE =  69220.02117931393
R-squared =  0.42590728351452134


# Вывод 

Из приведенного выше анализа результатов можно сделать вывод, что модель обученная на исключительно на числовых признаках показывает  результаты хуже, чем модель с числовыми и категориальными признаками

In [76]:
 spark.stop()