**Предсказание стоимости жилья**

**В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.**

## Подготовка данных

### Импортируем все нужные модули из *pyspark.ml.feature* для трансформации признаков, алгоритм линейной регрессии из *pyspark.ml.classification* и модули из *pyspark.ml.evaluation* для оценки качества модели

In [1]:
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator


In [2]:
RANDOM_SEED = 2022
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Hausing - LinearRegression") \
                    .getOrCreate()

### Прочитаем данные из файла '/datasets/housing.csv'

In [3]:
df_housing = spark.read.load('/datasets/housing.csv', format='csv', sep=',', inferSchema=True, header='true')
df_housing.printSchema() 

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

### Анализ датасета

In [4]:
# выведем названия колонок 
print(pd.DataFrame(df_housing.dtypes, columns=['column', 'type']).head(10))

# выведем первые 10 строк 
df_housing.show(10)

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500

Выведем базовые описательные статистики данных в виде таблицы в pandas

In [6]:
# выведем базовые статистики
df_housing.describe().show()

                                                                                

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           null|
| stddev|  2.0035317

Разделим колонки на два типа: числовые и текстовые, которые представляют категориальные данные.

In [7]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 
                   'total_bedrooms', 'population', 'households', 'median_income']

Видим, что есть пропущенные значения в датасете в столбце 'total_bedrooms', проверим

In [8]:
# выведите пропущенные значения в каждой колонке
columns = numerical_cols

for column in columns:
    check_col = F.col(column).cast('float').isNull()
    print(column, df_housing.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0


Заполним пропуски медианой 

In [10]:
df_housing = df_housing.fillna(int(df_housing.approxQuantile('total_bedrooms', [0.5], 0)[0]))

In [11]:
# проверим
columns = numerical_cols

for column in columns:
    check_col = F.col(column).cast('float').isNull()
    print(column, df_housing.filter(check_col).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0


Далее приступим к первому этапу — трансформации признаков.

### Разделение на выборки

In [12]:
train_data, test_data = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)
print(train_data.count(), test_data.count()) 

                                                                                

16418 4222


### Трансформация категорийных признаков

В первую очередь трансформируем категориальные признаки с помощью трансформера StringIndexer.

In [14]:
indexer = StringIndexer(inputCols=categorical_cols, 
                        outputCols=[c+'_idx' for c in categorical_cols])
indexer.setHandleInvalid("skip")
index_model = indexer.fit(train_data)
train_data = index_model.transform(train_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
train_data.select(cols).show(5) 

                                                                                

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 5 rows



Дополнительно создим OHE-кодирование для категорий

In [15]:
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols],
                        outputCols=[c+'_ohe' for c in categorical_cols])
encoder_model = encoder.fit(train_data)
train_data = encoder_model.transform(train_data)

cols = [c for c in train_data.columns for i in categorical_cols if (c.startswith(i))]
train_data.select(cols).show(5) 

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 5 rows



Объединим признаки в один вектор

In [16]:
categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols], outputCol='categorical_features')
train_data = categorical_assembler.transform(train_data) 

**Проделаем трансформацию категорийных признаков для тестовой выборки**

In [17]:
test_data = index_model.transform(test_data)

cols = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]
test_data.select(cols).show(5) 

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 5 rows



In [18]:
test_data = encoder_model.transform(test_data)

cols = [c for c in test_data.columns for i in categorical_cols if (c.startswith(i))]
test_data.select(cols).show(5) 

+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 5 rows



In [19]:
test_data = categorical_assembler.transform(test_data) 

### Трансформация числовых признаков

**- Для тренировочной выборки**

In [20]:
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol='numerical_features')
train_data = numerical_assembler.transform(train_data) 

In [21]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')
ss_model = standardScaler.fit(train_data)
train_data = ss_model.transform(train_data) 

                                                                                

После всех трансформаций получается такая таблица:

In [22]:
print(train_data.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'ocean_proximity_idx', 'ocean_proximity_ohe', 'categorical_features', 'numerical_features', 'numerical_features_scaled']


**- Для тестовой выборки**

In [23]:
test_data = numerical_assembler.transform(test_data) 

In [24]:
test_data = ss_model.transform(test_data)

In [25]:
print(test_data.columns)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value', 'ocean_proximity', 'ocean_proximity_idx', 'ocean_proximity_ohe', 'categorical_features', 'numerical_features', 'numerical_features_scaled']


### Соберем трансформированные категорийные и числовые признаки с помощью VectorAssembler

In [26]:
all_features = ['categorical_features','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol='features') 
train_data = final_assembler.transform(train_data)
test_data = final_assembler.transform(test_data)

In [27]:
train_data.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+--------------------+-------------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|categorical_features|  numerical_features|numerical_features_scaled|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+--------------------+-------------------------+--------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|       [0.0,0.0,1.0]|[

                                                                                

In [28]:
test_data.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+--------------------+-------------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|categorical_features|  numerical_features|numerical_features_scaled|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+--------------------+-------------------------+--------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|       [0.0,0.0,1.0]|[

In [29]:
df_final_train = train_data.select(['median_house_value', 'numerical_features_scaled', 'features'])
df_final_train.show(3)

+------------------+-------------------------+--------------------+
|median_house_value|numerical_features_scaled|            features|
+------------------+-------------------------+--------------------+
|           94600.0|     [-61.952887791441...|[0.0,0.0,1.0,-61....|
|           85800.0|     [-61.927977100733...|[0.0,0.0,1.0,-61....|
|           79000.0|     [-61.913030686308...|[0.0,0.0,1.0,-61....|
+------------------+-------------------------+--------------------+
only showing top 3 rows



In [30]:
df_final_test = test_data.select(['median_house_value', 'numerical_features_scaled', 'features'])
df_final_test.show(3)

+------------------+-------------------------+--------------------+
|median_house_value|numerical_features_scaled|            features|
+------------------+-------------------------+--------------------+
|          103600.0|     [-61.927977100733...|[0.0,0.0,1.0,-61....|
|           50800.0|     [-61.893102133741...|[0.0,0.0,1.0,-61....|
|           58100.0|     [-61.883137857458...|[0.0,0.0,1.0,-61....|
+------------------+-------------------------+--------------------+
only showing top 3 rows



## Обучение моделей

### Обучение модели

#### Используя все данные из файла

In [31]:
regressor_all = LinearRegression(featuresCol = 'features', labelCol = 'median_house_value', maxIter=10, regParam=0.3, elasticNetParam=0.8)
regressor_all = regressor_all.fit(df_final_train)

23/03/03 07:30:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/03/03 07:30:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

#### Используя только числовые переменные, исключив категориальные

In [32]:
regressor_num = LinearRegression(featuresCol = 'numerical_features_scaled', labelCol = 'median_house_value', maxIter=10, regParam=0.3, elasticNetParam=0.8)
regressor_num = regressor_num.fit(df_final_train)

                                                                                

### Предсказание на обучающей выборке

#### Для всех данных

In [33]:
pred_results_all = regressor_all.evaluate(df_final_train)
print("The RMSE for the model is: %2f"% pred_results_all.rootMeanSquaredError)
print("The MAE for the model is: %2f"% pred_results_all.meanAbsoluteError)
print("The r2 for the model is: %2f"% pred_results_all.r2)

                                                                                

The RMSE for the model is: 69434.153485
The MAE for the model is: 50178.684349
The r2 for the model is: 0.636301


#### Только для числовых переменных

In [34]:
pred_results_num = regressor_num.evaluate(df_final_train)
print("The RMSE for the model_num is: %2f"% pred_results_num.rootMeanSquaredError)
print("The MAE for the model_num is: %2f"% pred_results_num.meanAbsoluteError)
print("The r2 for the model_num is: %2f"% pred_results_num.r2)

                                                                                

The RMSE for the model_num is: 69927.877248
The MAE for the model_num is: 51014.808131
The r2 for the model_num is: 0.631110


## Анализ результатов

#### Для всех данных

In [35]:
pred_results_all = regressor_all.evaluate(df_final_test)
print("The RMSE for the model is: %2f"% pred_results_all.rootMeanSquaredError)
print("The MAE for the model is: %2f"% pred_results_all.meanAbsoluteError)
print("The r2 for the model is: %2f"% pred_results_all.r2)

The RMSE for the model is: 68791.573039
The MAE for the model is: 50066.800849
The r2 for the model is: 0.649826


#### Только для числовых переменных

In [36]:
pred_results_num = regressor_num.evaluate(df_final_test)
print("The RMSE for the model_num is: %2f"% pred_results_num.rootMeanSquaredError)
print("The MSE for the model_num is: %2f"% pred_results_num.meanAbsoluteError)
print("The r2 for the model_num is: %2f"% pred_results_num.r2)

The RMSE for the model_num is: 68979.298440
The MSE for the model_num is: 50760.829894
The r2 for the model_num is: 0.647913


In [37]:
spark.stop()

**Модель обученая на всех данных (числовых и категориальных) показывает лучшие параметры:**

    - RMSE ниже
    - MAE ниже
    - r2 выше