In [1]:
### Импорт библиотек
import pandas as pd 
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

pyspark_version = pyspark.__version__
if int(pyspark_version[:1]) == 3:
    from pyspark.ml.feature import OneHotEncoder    
elif int(pyspark_version[:1]) == 2:
    from pyspark.ml.feature import OneHotEncodeEstimator

In [2]:
### Запуск спарк приложения  
RANDOM_SEED = 42
spark = SparkSession.builder \
                    .master('local') \
                    .appName('Titanic - Logistic regression') \
                    .getOrCreate()

## Подготовка данных

### Вывод общей информации

In [3]:
### Открытие файла с таблицей
df_housing = spark.read.load('/datasets/housing.csv', format='csv', sep=',', inferSchema=True, header='true')
df_housing.printSchema() 
df_housing.show(3)

                                                                                

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR B

In [4]:
### Вывод базовых статистик
df_housing.describe().toPandas()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


### Замена пропусков

In [5]:
### Замена пропусков медианой
df_housing = df_housing.withColumn('total_bedrooms_drop', F.expr('approx_percentile(total_bedrooms, 0.5) over()')) \
                        .withColumn('total_bedrooms', F.coalesce(F.col('total_bedrooms'), F.col('total_bedrooms_drop'))) \
                        .drop(F.col('total_bedrooms_drop'))
df_housing.describe().toPandas()

22/12/14 10:57:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,536.8388565891473,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,419.3918779216887,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


### Деление на выборки

In [6]:
### Деление на выборки
train_data, test_data = df_housing.randomSplit([.8,.2], seed=RANDOM_SEED)
display(train_data.count(), test_data.count()) 

22/12/14 10:57:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


16560

4080

In [7]:
### Деление на категориальные и числовые колонки
categorical_cols = ['ocean_proximity']
numerical_cols  = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income']
target = 'median_house_value'

### Преобразование колонки с категориальными значениями

In [8]:
### Преобразование категориальной колонки с помощью StringIndexer для train_data
indexer = StringIndexer(inputCols=categorical_cols, outputCols=[c+'_idx' for c in categorical_cols]).fit(train_data)
train_data_idx = indexer.transform(train_data)
cols_train = [c for c in train_data_idx.columns for i in categorical_cols if (c.startswith(i))]
train_data_idx.select(cols_train).show(3)

22/12/14 10:57:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 3 rows



In [9]:
### Преобразование категориальной колонки с помощью StringIndexer для test_data
test_data_idx = indexer.transform(test_data)
cols_test = [c for c in test_data_idx.columns for i in categorical_cols if (c.startswith(i))]
test_data_idx.select(cols_test).show(3)

22/12/14 10:57:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 3 rows



In [10]:
### Преобразование с помощью техники OneHotEncoder для train_data_idx
encoder = OneHotEncoder(inputCols=[c+'_idx' for c in categorical_cols], outputCols=[c+'_ohe' for c in categorical_cols]).fit(train_data_idx)
train_data_ohe = encoder.transform(train_data_idx)
cols_train_ohe = [c for c in train_data_ohe.columns for i in categorical_cols if (c.startswith(i))]
train_data_ohe.select(cols_train_ohe).show(3)

22/12/14 10:57:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



In [11]:
### Преобразование с помощью техники OneHotEncoder для test_data_idx
test_data_ohe = encoder.transform(test_data_idx)
cols_test_ohe = [c for c in test_data_ohe.columns for i in categorical_cols if (c.startswith(i))]
test_data_ohe.select(cols_test_ohe).show(3)

22/12/14 10:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------------+-------------------+-------------------+
|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|
+---------------+-------------------+-------------------+
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|
+---------------+-------------------+-------------------+
only showing top 3 rows



In [12]:
### Объединение в единый вектор для train_data_ohe
categorical_assembler = VectorAssembler(inputCols=[c+'_ohe' for c in categorical_cols], outputCol='categorical_features')
train_data_ohe = categorical_assembler.transform(train_data_ohe) 
train_data_ohe.show(3)

22/12/14 10:57:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|categorical_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|       (4,[2],[1.0])|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|       (4,[2],[1.0])|
|  -1

In [13]:
### Объединение в единый вектор для test_data_ohe
test_data_ohe = categorical_assembler.transform(test_data_ohe) 
test_data_ohe.show(3)

22/12/14 10:57:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|ocean_proximity_idx|ocean_proximity_ohe|categorical_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+-------------------+--------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|       (4,[2],[1.0])|
|  -124.23|   40.54|              52.0|     2694.0|         453.0|    1152.0|     435.0|       3.0806|          106700.0|     NEAR OCEAN|                2.0|      (4,[2],[1.0])|       (4,[2],[1.0])|
|  -1

### Преобразование колонок с числовыми значениями

In [14]:
### Масштабирование числовых колонок для train_data_ohe
numerical_assembler = VectorAssembler(inputCols=numerical_cols, outputCol='numerical_features')
train_num = numerical_assembler.transform(train_data_ohe) 
standardScaler = StandardScaler(inputCol='numerical_features', outputCol='numerical_features_scaled').fit(train_num)
train_num = standardScaler.transform(train_num) 
train_num.columns

22/12/14 10:57:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'categorical_features',
 'numerical_features',
 'numerical_features_scaled']

In [15]:
### Масштабирование числовых колонок для test_data_ohe
test_num = numerical_assembler.transform(test_data_ohe)
test_num = standardScaler.transform(test_num) 
test_num.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity',
 'ocean_proximity_idx',
 'ocean_proximity_ohe',
 'categorical_features',
 'numerical_features',
 'numerical_features_scaled']

In [16]:
### Объединение числовых и категориальных признаков
all_features = ['categorical_features','numerical_features_scaled']
final_assembler = VectorAssembler(inputCols=all_features, outputCol='features') 
df_train = final_assembler.transform(train_num)
df_test = final_assembler.transform(test_num)
df_train.select(all_features).show(3) 
df_test.select(all_features).show(3) 

22/12/14 10:57:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-61.931952286653...|
|       (4,[2],[1.0])|     [-61.907050013920...|
|       (4,[2],[1.0])|     [-61.892108650280...|
+--------------------+-------------------------+
only showing top 3 rows



22/12/14 10:57:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------------+-------------------------+
|categorical_features|numerical_features_scaled|
+--------------------+-------------------------+
|       (4,[2],[1.0])|     [-61.907050013920...|
|       (4,[2],[1.0])|     [-61.872186832094...|
|       (4,[2],[1.0])|     [-61.872186832094...|
+--------------------+-------------------------+
only showing top 3 rows



In [17]:
### Выделение числовых признаков
num_features = ['numerical_features_scaled']
final_assembler_num = VectorAssembler(inputCols=num_features, outputCol='features_sec') 
df_train = final_assembler_num.transform(df_train)
df_test = final_assembler_num.transform(df_test)
df_train.select(num_features).show(3) 
df_test.select(num_features).show(3) 

22/12/14 10:57:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.931952286653...|
|     [-61.907050013920...|
|     [-61.892108650280...|
+-------------------------+
only showing top 3 rows



22/12/14 10:57:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.907050013920...|
|     [-61.872186832094...|
|     [-61.872186832094...|
+-------------------------+
only showing top 3 rows



## Обучение моделей

In [18]:
### Модель линейной регрессии на всем наборе данных
lr = LinearRegression(labelCol=target, featuresCol='features')
model = lr.fit(df_train) 
predictions = model.transform(df_test)
predictedLabes = predictions.select('median_house_value', 'prediction')
predictedLabes.show(10) 

22/12/14 10:57:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:28 WARN Instrumentation: [5f858273] regParam is zero, which might cause numerical instability and overfitting.
22/12/14 10:57:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/14 10:57:28 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/12/14 10:57:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/12/14 10:57:29 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
22/12/14 10:57:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partitio

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0| 150533.0466631665|
|          106700.0| 217624.6519294884|
|           73200.0|125169.93075912399|
|           90100.0|195019.31599252112|
|           67000.0|152291.13957492216|
|           86400.0| 186076.1616788553|
|           70500.0|163936.40808554692|
|           85100.0|180008.60114514176|
|           80500.0|181580.61779531697|
|           96000.0|170314.90779798012|
+------------------+------------------+
only showing top 10 rows



In [19]:
### Модель линейной регрессии на числовых данных (без категориальных)
lr_sec = LinearRegression(labelCol=target, featuresCol='features_sec')
model_sec = lr_sec.fit(df_train) 
predictions_sec = model_sec.transform(df_test)
predictedLabes_sec = predictions_sec.select('median_house_value', 'prediction')
predictedLabes_sec.show(10) 

22/12/14 10:57:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:31 WARN Instrumentation: [7726603f] regParam is zero, which might cause numerical instability and overfitting.
22/12/14 10:57:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          103600.0|100751.20280909305|
|          106700.0| 190787.2397370534|
|           73200.0| 74736.22098583262|
|           90100.0|162342.36984607764|
|           67000.0|119469.73784150137|
|           86400.0| 155921.9725331883|
|           70500.0|131193.75530479895|
|           85100.0|150446.14696751675|
|           80500.0|150167.76520445198|
|           96000.0|133759.40251472825|
+------------------+------------------+
only showing top 10 rows



In [20]:
### Расчет метрик качества моделей (rmse)
rmse_one = round(RegressionEvaluator(labelCol='median_house_value', metricName='rmse').evaluate(predictions), 3)
rmse_sec = round(RegressionEvaluator(labelCol='median_house_value', metricName='rmse').evaluate(predictions_sec), 3)
### Расчет метрик качества моделей (mae)
mae_one = round(RegressionEvaluator(labelCol='median_house_value', metricName='mae').evaluate(predictions), 3)
mae_sec = round(RegressionEvaluator(labelCol='median_house_value', metricName='mae').evaluate(predictions_sec), 3)
### Расчет метрик качества моделей (r2)
r2_one = round(RegressionEvaluator(labelCol='median_house_value', metricName='r2').evaluate(predictions), 3)
r2_sec = round(RegressionEvaluator(labelCol='median_house_value', metricName='r2').evaluate(predictions_sec), 3)

22/12/14 10:57:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/14 10:57:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


## Анализ результатов

In [21]:
### Объединение в одну таблицу
data_results = spark.createDataFrame([('model_cat_and_num', rmse_one, mae_one, r2_one), 
                                      ('model_num_only', rmse_sec, mae_sec, r2_sec)], ['model', 'RMSE', 'MAE', 'R2'])
data_results.printSchema()
data_results.show()
### Остановка сессии 
spark.stop()

root
 |-- model: string (nullable = true)
 |-- RMSE: double (nullable = true)
 |-- MAE: double (nullable = true)
 |-- R2: double (nullable = true)



                                                                                

+-----------------+---------+---------+-----+
|            model|     RMSE|      MAE|   R2|
+-----------------+---------+---------+-----+
|model_cat_and_num|70786.462|50863.551|0.638|
|   model_num_only|71791.284|51804.393|0.627|
+-----------------+---------+---------+-----+

