## Предсказание стоимости жилья

В проекте нам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучим модель и сделаем предсказания на тестовой выборке. Для оценки качества модели используем метрики RMSE, MAE и R2.

# Подготовка данных

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F
from pyspark.mllib.evaluation import RegressionMetrics

RANDOM_SEED = 12345

In [2]:
spark = SparkSession.builder.master("local").appName("Spark project").getOrCreate()

In [3]:
df = spark.read.load('/datasets/housing.csv', 
                     format="csv", sep=",", inferSchema=True, header="true")
df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

In [4]:
df.show(5)
df.describe().toPandas()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


Исследуем данные и обработаем пропуски

In [5]:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [10]:
df = df.na.fill(int(df.approxQuantile('total_bedrooms', [0.5], 0)[0]))

Заполнили пропуски в кол-ве комнат медианой. Преобразуем категорию ocean_proximity в числовое значение

In [13]:
numerical_cols = ['housing_median_age','total_rooms','total_bedrooms','population','households','median_income']
categorial_cols = 'ocean_proximity'
target = 'median_house_value'

indexer = StringIndexer(inputCol=categorial_cols, 
outputCol=categorial_cols+'_idx') 
df = indexer.fit(df).transform(df)
#OHE 
encoder = OneHotEncoder(inputCol=categorial_cols+'_idx',outputCol='cat_feature')
df = encoder.fit(df).transform(df)

df.toPandas().head(5)

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,ocean_proximity_idx,cat_feature
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY,3.0,"(0.0, 0.0, 0.0, 1.0)"


Трансформируем числовые признаки

In [16]:
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)

numerical_assembler = VectorAssembler(inputCols=numerical_cols,outputCol="numerical_features")
train_data = numerical_assembler.transform(train_data)
standardScaler = StandardScaler(inputCol='numerical_features',
                              outputCol="numerical_features_scaled")
train_data = standardScaler.fit(train_data).transform(train_data)
all_features = ['cat_feature','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol="features") 
train_data = final_assembler.transform(train_data)

test_data = numerical_assembler.transform(test_data)
test_data = standardScaler.fit(test_data).transform(test_data)
test_data = final_assembler.transform(test_data)

train_data.select(all_features).show(3)
train_data.show(3)

+-------------+-------------------------+
|  cat_feature|numerical_features_scaled|
+-------------+-------------------------+
|(4,[2],[1.0])|     [4.12971097473364...|
|(4,[2],[1.0])|     [1.50893285615267...|
|(4,[2],[1.0])|     [1.35009781866292...|
+-------------+-------------------------+
only showing top 3 rows

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------+--------------------+-------------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|  cat_feature|  numerical_features|numerical_features_scaled|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------+--------------------+---------

# Обучение моделей

In [13]:
%%time
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count())
# model training using all features
lr_all = LinearRegression(labelCol=target, featuresCol='features')
model_all = lr_all.fit(train_data)
predictions_all = model_all.transform(test_data)

predictedLabes_all = predictions_all.select(target, "prediction")
predictedLabes_all.show(5)

16431 4209


22/11/12 08:32:53 WARN Instrumentation: [8a58bc9b] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0|234648.59882537296|
|          128900.0| 229397.8691104067|
|          116100.0|257398.93248751463|
|           70500.0| 188525.5161073701|
|           85600.0|212834.34324941318|
+------------------+------------------+
only showing top 5 rows

CPU times: user 19.9 ms, sys: 5.36 ms, total: 25.3 ms
Wall time: 4.49 s


In [12]:
%%time
lr_num = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled')
model_num = lr_num.fit(train_data)
predictions_num = model_num.transform(test_data)
 
predictedLabes_num = predictions_num.select(target, "prediction")
predictedLabes_num.show(5) 

22/11/12 08:32:44 WARN Instrumentation: [f156b1a6] regParam is zero, which might cause numerical instability and overfitting.
22/11/12 08:32:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/11/12 08:32:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/11/12 08:32:45 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/11/12 08:32:45 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          106700.0|204973.19455590227|
|          128900.0|182059.91676221372|
|          116100.0|213770.97939171502|
|           70500.0|152686.01998715577|
|           85600.0| 176219.0845495657|
+------------------+------------------+
only showing top 5 rows

CPU times: user 30.8 ms, sys: 0 ns, total: 30.8 ms
Wall time: 4.86 s


# Анализ результатов

In [25]:
metrics = RegressionMetrics(predictedLabes_all.rdd)
print("RMSE всех признаков = %s" % metrics.rootMeanSquaredError)

metrics = RegressionMetrics(predictedLabes_num.rdd)
print("RMSE числовых признаков = %s" % metrics.rootMeanSquaredError)

RMSE всех признаков = 68480.9925939211
RMSE числовых признаков = 74220.03657063161


In [29]:
metrics = RegressionMetrics(predictedLabes_all.rdd)
print("MAE всех признаков = %s" % metrics.meanAbsoluteError)

metrics = RegressionMetrics(predictedLabes_num.rdd)
print("MAE числовых признаков = %s" % metrics.meanAbsoluteError)

MAE всех признаков = 49644.355043246964
MAE числовых признаков = 54892.3346696914


In [28]:
metrics = RegressionMetrics(predictedLabes_all.rdd)
print("r2 всех признаков = %s" % metrics.r2)

metrics = RegressionMetrics(predictedLabes_num.rdd)
print("r2 числовых признаков = %s" % metrics.r2)

r2 всех признаков = 0.4340444213798942
r2 числовых признаков = 0.2249154048296097


In [6]:
spark.stop()

Видим, что MAE и RMSE мы получили меньшими на данных со всеми признаками, однако коэффициент зависимости r2 на числовых признаках получился в два раза меньше у числовых признаков, чем на всех признаках. 