# Предсказание стоимости жилья

В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

## Подготовка данных

In [2]:
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan, when, count, col

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator
        

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing in California") \
                    .getOrCreate()

Exception in thread "main" java.nio.file.NoSuchFileException: /tmp/tmpfl6q03ec/connection2076306384977217840.info
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
	at java.nio.file.Files.newByteChannel(Files.java:361)
	at java.nio.file.Files.createFile(Files.java:632)
	at java.nio.file.TempFileHelper.create(TempFileHelper.java:138)
	at java.nio.file.TempFileHelper.createTempFile(TempFileHelper.java:161)
	at java.nio.file.Files.createTempFile(Files.java:852)
	at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:52)
	at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.i

In [3]:
spark = SparkSession.builder.appName('spark3.2show').getOrCreate()
print('Spark info :')
spark

Spark info :


Используем Spark версии 3.0.2 .  Так же посмотрим на версию Python и выведем версию Spark

In [4]:
print('Version of python: ') 
!python -V
print('Version of pyspark :', pyspark.__version__)

Version of python: 
Python 3.9.5
Version of pyspark : 3.0.2


In [5]:
df = spark.read.option('header', 'true').csv('hous.csv', inferSchema = True) 

                                                                                

Импортируем DataFrame 

In [6]:
type(df)

pyspark.sql.dataframe.DataFrame

Убеждаемся, что это датафрейм spark

In [7]:
df.printSchema() 

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Все столбцы имеют тип данных **double (float)**, и один столбец тип **string**.

In [8]:
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [9]:
df.count() 

20640

В нашем DataFrame 20640 строк.

In [10]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [11]:
df.summary().select(['summary', 'longitude', 'latitude', 'housing_median_age', 'total_rooms']).show()

                                                                                

+-------+-------------------+-----------------+------------------+------------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|
+-------+-------------------+-----------------+------------------+------------------+
|  count|              20640|            20640|             20640|             20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488|
| stddev|  2.003531723502584|2.135952397457101| 12.58555761211163|2181.6152515827944|
|    min|            -124.35|            32.54|               1.0|               2.0|
|    25%|             -121.8|            33.93|              18.0|            1447.0|
|    50%|            -118.49|            34.26|              29.0|            2127.0|
|    75%|            -118.01|            37.71|              37.0|            3146.0|
|    max|            -114.31|            41.95|              52.0|           39320.0|
+-------+-------------------+-----------------+-------

In [12]:
Dict_Null = {col:df.filter(df[col].isNull()).count() for col in df.columns}
Dict_Null

{'longitude': 0,
 'latitude': 0,
 'housing_median_age': 0,
 'total_rooms': 0,
 'total_bedrooms': 207,
 'population': 0,
 'households': 0,
 'median_income': 0,
 'median_house_value': 0,
 'ocean_proximity': 0}

У нас 207 значений в столбце 'total_bedrooms' являются NaN.

In [13]:
df.select('total_bedrooms').show(3)

+--------------+
|total_bedrooms|
+--------------+
|         129.0|
|        1106.0|
|         190.0|
+--------------+
only showing top 3 rows



In [14]:
med2 = df.approxQuantile("total_bedrooms", [0.5], 0.25)

In [16]:
df = df.na.fill(med2[0])

In [15]:
Dict_Null = {col:df.filter(df[col].isNull()).count() for col in df.columns}
Dict_Null

{'longitude': 0,
 'latitude': 0,
 'housing_median_age': 0,
 'total_rooms': 0,
 'total_bedrooms': 0,
 'population': 0,
 'households': 0,
 'median_income': 0,
 'median_house_value': 0,
 'ocean_proximity': 0}

Заменил NaN значения в столбце total_bedrooms на медиану этого столбца.

## Обучение моделей

### Трансформация категориальных признаков 

In [16]:
df.select([c for c in df.columns if c in ['ocean_proximity']]).show(5)

+---------------+
|ocean_proximity|
+---------------+
|       NEAR BAY|
|       NEAR BAY|
|       NEAR BAY|
|       NEAR BAY|
|       NEAR BAY|
+---------------+
only showing top 5 rows



Тип колонки string 

In [17]:
ocean_proxim = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_proximity_index")
df = ocean_proxim.fit(df).transform(df)

                                                                                

Мы не можем напрямую применять OneHotEncoder к строковым столбцам. Нам нужно сначала преобразовать строковые столбцы в числовое значение. Для этого мы будем использовать StringIndexer. После этого мы можем применить OneHotEncoder.

In [18]:
df.select([c for c in df.columns if c in ['ocean_proximity_index']]).show(5)

+---------------------+
|ocean_proximity_index|
+---------------------+
|                  3.0|
|                  3.0|
|                  3.0|
|                  3.0|
|                  3.0|
+---------------------+
only showing top 5 rows



Проверяем результат

In [19]:
one_hotencoder_1 = OneHotEncoder(dropLast=True, inputCol="ocean_proximity_index", outputCol="ocean_proximity_vec")
df = one_hotencoder_1.fit(df).transform(df)

Кодировали признак при помощи OneHotEncoder

In [20]:
df.select([c for c in df.columns if c in ['ocean_proximity_vec']]).show(5)

+-------------------+
|ocean_proximity_vec|
+-------------------+
|      (4,[3],[1.0])|
|      (4,[3],[1.0])|
|      (4,[3],[1.0])|
|      (4,[3],[1.0])|
|      (4,[3],[1.0])|
+-------------------+
only showing top 5 rows



Проверяем кодировку столбца

In [22]:
categorical_assembler = VectorAssembler(inputCols=['ocean_proximity_vec'],
                                        outputCol="categorical_features")
df = categorical_assembler.transform(df) 

In [23]:
df.show(2)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+-------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_index|ocean_proximity_vec|categorical_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---------------------+-------------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|                  3.0|      (4,[3],[1.0])|       (4,[3],[1.0])|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|                  3.0|      (4,[3],[1.0])|       (4,[3],[1.

### Трансформация числовых признаков

In [24]:
numerical_cols = (['longitude', 
                   'latitude', 
                   'housing_median_age', 
                   'total_rooms', 
                   'total_bedrooms',
                   'population',
                   'households',
                   'median_income'])

target = 'median_house_value'                  # Целевой признак

В список записали числовые признаки для дальнейшей трансформации.

In [25]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols,
                                      outputCol="numerical_features")

df = numerical_assembler.transform(df) 

standardScaler = StandardScaler(inputCol='numerical_features',
                                outputCol="numerical_features_scaled")
df = standardScaler.fit(df).transform(df) 

                                                                                

In [26]:
df.printSchema() 

root
 |-- longitude: double (nullable = false)
 |-- latitude: double (nullable = false)
 |-- housing_median_age: double (nullable = false)
 |-- total_rooms: double (nullable = false)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = false)
 |-- households: double (nullable = false)
 |-- median_income: double (nullable = false)
 |-- median_house_value: double (nullable = false)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_index: double (nullable = false)
 |-- ocean_proximity_vec: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)
 |-- numerical_features: vector (nullable = true)
 |-- numerical_features_scaled: vector (nullable = true)



После всех трансформаций получается такая таблица

In [27]:
all_features = ['categorical_features', 'numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, 
                                  outputCol='features') 
df = final_assembler.transform(df)

df.select(all_features).show(3) 


+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|
|       (4,[3],[1.0])|     [-61.002278409814...|
|       (4,[3],[1.0])|     [-61.012260782324...|
+--------------------+-------------------------+
only showing top 3 rows



Собраем трансформированные категорийные и числовые признаки с помощью VectorAssembler

In [28]:
df.select(['categorical_features', 'numerical_features_scaled', 'features']).show(3)


+--------------------+-------------------------+--------------------+
|categorical_features|numerical_features_scaled|            features|
+--------------------+-------------------------+--------------------+
|       (4,[3],[1.0])|     [-61.007269596069...|[0.0,0.0,0.0,1.0,...|
|       (4,[3],[1.0])|     [-61.002278409814...|[0.0,0.0,0.0,1.0,...|
|       (4,[3],[1.0])|     [-61.012260782324...|[0.0,0.0,0.0,1.0,...|
+--------------------+-------------------------+--------------------+
only showing top 3 rows



In [29]:
RANDOM_SEED = 2022
train_data, test_data = df.randomSplit([.8,.2], seed=RANDOM_SEED)
display(train_data.count(), test_data.count()) 


                                                                                

16418

4222

Разделяем наш датасет на две части — выборку для обучения и выборку для тестирования качества модели.
Разделение на выборки в Spark можно сделать с помощью метода randomSplit()


### Обучаем модель используя все данные из файла

In [30]:
lin_reg = LinearRegression(featuresCol='features', labelCol=target) 

In [31]:
grid_search = ParamGridBuilder() \
    .addGrid(lin_reg.regParam, [0.01, 0.1]) \
    .addGrid(lin_reg.elasticNetParam, [0.5, 1.0]) \
    .build()

Используем ParamGridBuilder для перебора гиперпараметров

In [32]:
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol='median_house_value',
                                metricName='rmse')

In [33]:
cv = CrossValidator(estimator=lin_reg,
                    estimatorParamMaps=grid_search,
                    evaluator=evaluator)
cv_model = cv.fit(train_data)

22/04/28 14:23:44 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/28 14:23:44 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

Используем CrossValidator для нахождения лучших гиперпараметров.

In [34]:
best_model = cv_model.bestModel
best_reg_param = best_model._java_obj.getRegParam()
best_elasticnet_param = best_model._java_obj.getElasticNetParam()
best_elasticnet_param

1.0

In [35]:
best_reg_param

0.1

Выводим лучшие гиперпараметры

In [36]:
lr = LinearRegression(featuresCol='features', 
                      labelCol=target, 
                      regParam=best_reg_param, 
                      elasticNetParam=best_elasticnet_param) 
model = lr.fit(train_data)

Обучаем модель LinearRegression

In [37]:
predictions = model.transform(test_data)
predictedLabes = predictions.select("median_house_value", "prediction")
predictedLabes.show(5) 


+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|152939.92290328303|
|           50800.0|214910.79095488274|
|           58100.0|142694.09725516802|
|           68400.0|132405.62813236937|
|           72200.0|164438.67100292165|
+------------------+------------------+
only showing top 5 rows



Делаем предсказания и выводим часть результата целевой столбец и предсказанные значения

In [38]:
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol='median_house_value')

RegressionEvaluator по умолчанию в качестве метрики использует среднюю квадратическую ошибку (MSE).

In [39]:
r2_1 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

In [40]:
mae_1 = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})

In [41]:
rmse_1 = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

### Обучаем модель используя только числовые переменные, исключив категориальные

In [42]:
lin_reg = LinearRegression(featuresCol='numerical_features_scaled', labelCol=target) 

In [43]:
grid_search = ParamGridBuilder() \
    .addGrid(lin_reg.regParam, [0.01, 0.1]) \
    .addGrid(lin_reg.elasticNetParam, [0.5, 1.0]) \
    .build()

In [44]:
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol='median_house_value',
                                metricName='rmse')

In [45]:
cv = CrossValidator(estimator=lin_reg,
                    estimatorParamMaps=grid_search,
                    evaluator=evaluator)
cv_model = cv.fit(train_data)

                                                                                

In [46]:
best_model = cv_model.bestModel
best_reg_param = best_model._java_obj.getRegParam()
best_elasticnet_param = best_model._java_obj.getElasticNetParam()
best_elasticnet_param

0.5

In [47]:
best_reg_param

0.01

In [48]:
lr = LinearRegression(featuresCol='numerical_features_scaled', 
                      labelCol=target, 
                      regParam=best_reg_param, 
                      elasticNetParam=best_elasticnet_param) 
model = lr.fit(train_data)

In [49]:
predictions = model.transform(test_data)
predictedLabes = predictions.select("median_house_value", "prediction")
predictedLabes.show(5) 

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101296.77003160724|
|           50800.0|183232.23560047522|
|           58100.0|109544.91652085586|
|           68400.0|  80314.5777566433|
|           72200.0|130293.92468634155|
+------------------+------------------+
only showing top 5 rows



In [50]:
evaluator = RegressionEvaluator(predictionCol='prediction',
                                labelCol='median_house_value')

In [51]:
r2_2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

In [52]:
mae_2 = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})

In [53]:
rmse_2 = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})

69224.32369565684

# Анализ результатов

In [54]:
schema = StructType([StructField("type", StringType())\
                     ,StructField("cat_num_signs", StringType())\
                     ,StructField("num_signs", StringType())])

test_list = [['r2', r2_1, r2_2], 
             ['mae', mae_1, mae_2],
             ['rmse', rmse_1, rmse_2]]

df_3 = spark.createDataFrame(test_list,schema=schema) 
df_3.show()

[Stage 210:>                                                        (0 + 1) / 1]

+----+------------------+------------------+
|type|     cat_num_signs|         num_signs|
+----+------------------+------------------+
|  r2|0.6534198842147974|0.6460557656693573|
| mae| 49907.92036584393| 50906.16412328148|
|rmse| 68500.40222904057| 69224.32369565684|
+----+------------------+------------------+



                                                                                

Из таблицы можно сделать вывод, что результат с категориальным признаком немного лучше, чем без него.

## Вывод

В данном проекте применил библиотеку pyspark для обработки dataframe. Импортировал df, посмотрел на данные. Заполнил пропуски медианой, затем сделал трансформацию категориальных признаком, после числовых признаков. Разбил данные 80% - 20%. Обучил модель LinearRegression на всех данных, после только на числовых признаках. Вывел метрики "r2", "mae" и "rmse". Результат работы такой, модель нужно обучать на категориальных и числовых признаков, т к без категориальных - результат немного хуже.