In [97]:
import pandas as pd 
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib
import pyspark
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [98]:
RANDOM_SEED = 2022

spark = SparkSession.builder \
                    .master("local") \
                    .appName("Housing - Linear regression") \
                    .getOrCreate()

df = spark.read.load('/datasets/housing.csv',
                                        format='csv',
                                             sep=',',
                                    inferSchema=True,
                                         header=True)

In [99]:
df.count()

20640

Таблица состоит из 20640 строк.

In [100]:
columns = df.columns

for column in columns:
    print(column, df.where(F.isnan(column) | F.col(column).isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Так как в таблице 20640 строк, а пропусков всего 207, заполним их медианным значением.

In [101]:
print(pd.DataFrame(df.dtypes, columns=['column', 'type']))

               column    type
0           longitude  double
1            latitude  double
2  housing_median_age  double
3         total_rooms  double
4      total_bedrooms  double
5          population  double
6          households  double
7       median_income  double
8  median_house_value  double
9     ocean_proximity  string


In [102]:
median = df.approxQuantile('total_bedrooms', [0.5], 0)[0]

In [103]:
df= df.fillna(median, subset=['total_bedrooms'])
df.select(
    [F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]
    ).toPandas().T.rename(columns={0:'NA'})

Unnamed: 0,NA
longitude,0
latitude,0
housing_median_age,0
total_rooms,0
total_bedrooms,0
population,0
households,0
median_income,0
median_house_value,0
ocean_proximity,0


In [104]:
df.describe().toPandas()

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,536.8388565891473,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,419.3918779216887,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


На данном этапе мы успешно загрузили DataFrame, посмотрели типы колонок, увидели, что в одном из толпцов всего спален 207 пропусков, и так как это около 1% от общего числа, поэтому мы решили заменить его медианным значением. Данные подготовлены, и далее можно будет с ними работать.

Разделим все признаки на категориальные, числовый и выделим целевой.

In [105]:
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value' 

In [106]:
df_num=df.select(numerical_cols )
df_num.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|
|  -122.25|   37.85|              52.0|     1627.0|         280.0|     565.0|     259.0|       3.8462|
|  -122.25|   37.85|              52.0|      919.0|         213.0|     413.0|     193.0|       4.0368|
|  -122.25|   37.84|              52.0|     2535.0|         489.0|    109

Перед тем, как трансформировать признаки и обучать модели, мы разделим датасет на тренировочную и обучающую выборки, чтобы не было утечки данных.

In [107]:
train, test = df.randomSplit([.8,.2], seed=RANDOM_SEED)
print('Количество объектов в обучающей выборке:', train.count())
print('Количество объектов в тестовой выборке:', test.count())

Количество объектов в обучающей выборке: 16418
Количество объектов в тестовой выборке: 4222


Трансформируем категориальные признаки с помощью StringIndexer

In [109]:
indexer = StringIndexer(inputCol = 'ocean_proximity', 
                        outputCol= 'ocean_proximity_idx') 
indexer_model = indexer.fit(train)
train_indexed = indexer_model.transform(train)
test_indexed = indexer_model.transform(test)

train_indexed.show()
test_indexed.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|                2.0|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|                2.0|
|  -124.27|   40.69|              36.0|     2349.0|         528.0|    1194.0|     465.0|       2.5179|           79000.0|     NEAR OCEAN|                2.0|
|  -124.26|   40.58|              52.0|     2217.0| 

Далее применим OHE-кодирование

In [110]:
encoder = OneHotEncoder(inputCol = 'ocean_proximity_idx', 
                        outputCol= 'ocean_proximity_ohe')
encoder_model = encoder.fit(train_indexed)
train_ohe= encoder_model.transform(train_indexed)
test_ohe = encoder_model.transform(test_indexed)

train_ohe.show()
test_ohe.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|
|  -124.27|   40.69|              36.0|     2349.0|         528.0|    1194.0|     465.0|       2.5179|        

Далее преобразовывем наш единственный категориальный признак в векторы.

In [111]:
categorical_cols = ['ocean_proximity_ohe']
categorical_assembler = VectorAssembler(inputCols= categorical_cols, outputCol="categorical_features")
df_vect_categ_train = categorical_assembler.transform(train_ohe)
df_vect_categ_test = categorical_assembler.transform(test_ohe)

df_vect_categ_train.show()
df_vect_categ_test.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|categorical_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|       [0.0,0.0,1.0]|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|                2.0|      (3,[2],[1.0])|       [0.0,0.0,1.0]|
|  -1

На данном этапе мы будем трансформировать числовые признаки с помощью стандарт-скеллера, чтобы на предсказания модели сильно не влияли выбросы.

In [112]:
numerical_cols = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="numerical_features")

df_scaler_train = numerical_assembler.transform(df_vect_categ_train) 
df_scaler_test = numerical_assembler.transform(df_vect_categ_test)

In [113]:
scaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled')
scaler_model = scaler.fit(df_scaler_train)
df_housing_train = scaler_model.transform(df_scaler_train)
df_housing_test = scaler_model.transform(df_scaler_test)
df_housing_train.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- ocean_proximity_idx: double (nullable = false)
 |-- ocean_proximity_ohe: vector (nullable = true)
 |-- categorical_features: vector (nullable = true)
 |-- numerical_features: vector (nullable = true)
 |-- numerical_features_scaled: vector (nullable = true)



На данном шаге преобразуем категориальные и числовые признаки в один вектор.

In [114]:
all_features = ['ocean_proximity_ohe','numerical_features_scaled']

final_assembler = VectorAssembler(inputCols=all_features, outputCol='features')
df_housing_train = final_assembler.transform(df_housing_train)

In [115]:
df_housing_test = final_assembler.transform(df_housing_test)

In [116]:
df_housing_test.select('features','numerical_features_scaled').show(5)

+--------------------+-------------------------+
|            features|numerical_features_scaled|
+--------------------+-------------------------+
|[0.0,0.0,1.0,-61....|     [-61.927977100733...|
|[0.0,0.0,1.0,-61....|     [-61.893102133741...|
|[0.0,0.0,1.0,-61....|     [-61.883137857458...|
|[0.0,0.0,1.0,-61....|     [-61.883137857458...|
|[0.0,0.0,1.0,-61....|     [-61.868191443033...|
+--------------------+-------------------------+
only showing top 5 rows



**Обучение модели.**

Необходимо построить модели линейной регрессии  
1)lr_all - на всех данных  
2) lr_num - на числовых данных.

In [117]:
lr_all = LinearRegression(labelCol=target, featuresCol='features',
                          maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_all = lr_all.fit(df_housing_train) 

24/08/28 14:43:04 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/08/28 14:43:04 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [118]:
lr_num = LinearRegression(labelCol=target, featuresCol='numerical_features_scaled',
                          maxIter=10, regParam=0.3, elasticNetParam=0.8)
model_num = lr_num.fit(df_housing_train)

In [119]:
predictions_all = model_all.transform(df_housing_test)
predicted_all = predictions_all.select('median_house_value', 'prediction')
predicted_all.show(5)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|183497.42937253427|
|           50800.0|235433.95229170576|
|           58100.0|158833.03767158056|
|           68400.0|162229.71770885703|
|           72200.0| 181567.9706344204|
+------------------+------------------+
only showing top 5 rows



In [120]:
predictions_num = model_num.transform(df_housing_test)
predicted_num = predictions_num.select('median_house_value', 'prediction')
predicted_num.show(5)

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|101298.92550544953|
|           50800.0|185022.49009660957|
|           58100.0|109505.56520016352|
|           68400.0| 78896.52190991072|
|           72200.0|129260.54787339922|
+------------------+------------------+
only showing top 5 rows



Оценим качество модели по всем признакам и только по числовым.

In [122]:
def two_metrics(model1, model2):
    summ1 = model1.summary
    summ2 = model2.summary
    
    print("All Features Metrics:")
    print('RMSE: %f' % summ1.rootMeanSquaredError)
    print('MAE: %f' % summ1.meanAbsoluteError)
    print('R2: %f' % summ1.r2)

    print("Numweric Features Metrics:")
    print('RMSE: %f' % summ2.rootMeanSquaredError)
    print('MAE: %f' % summ2.meanAbsoluteError)
    print('R2: %f' % summ2.r2)


In [123]:
two_metrics(model_all,model_num)

All Features Metrics:
RMSE: 69434.153485
MAE: 50178.684349
R2: 0.636301
Numweric Features Metrics:
RMSE: 69927.877248
MAE: 51014.808131
R2: 0.631110


Проанализировав две модели, можно сделать выводы, что модель первая, которая обучена на всех признаках, имеет более низкий показатель корня среднеквадратичной ошибки и более высокий коэффициент детерминации, что позволяет сказать, что модель номер один показала себя лучше, чем модель обученная только на числовых признаках. Также на это указывает то, что средняя абсолютная ошибка в модели номер один ниже, чем показатель MAE на второй модели. Поэтому модель обученная на всех признаках в данном случае более предпочтительна. Хотя я не могу сказать, что результат положительный и модель показывает себя хорошо. Видно, что можно еще ее улучшать и добиваться более высоких показателей.

In [124]:
spark.stop()

**ОБЩИЙ ВЫВОД**

Этот проект был посвящен тому, чтобы обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году и использовать библиотеку MLlib в Spark.   
После обучения обе модели показали достаточно удовлетворительный результат, хотя есть что улучшать и к чему стремиться. Мы видим, что качество предсказаний модели, обученной на всех признаках, немного выше, чем модель, обученная только на числовых признаках, хотя разница совершенно незначительная. Благодаря тому, что результаты двух моделей близки, можно сделать вывод, что модели стабильны, нормального качества и не переобучились.  

All Features Metrics:
RMSE: 69434.153485,
MAE: 50178.684349,
R2: 0.636301,


Numweric Features Metrics:
RMSE: 69927.877248,
MAE: 51014.808131,
R2: 0.631110.