In [1]:
import warnings
import pandas as pd
import numpy as np

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.feature import OneHotEncoder

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator


In [2]:
# pd.set_option('display.float_format', '{:,.2f}'.format)
warnings.filterwarnings('ignore')

# Цель

Предсказание стоимости жилья. В проекте вам нужно обучить модель линейной регрессии на данных о жилье в Калифорнии в 1990 году. На основе данных нужно предсказать медианную стоимость дома в жилом массиве. Обучите модель и сделайте предсказания на тестовой выборке. Для оценки качества модели используйте метрики RMSE, MAE и R2.

# Загрузка и подготовка данных

In [3]:
# Инициализируйте локальную Spark-сессию.
RANDOM_SEED = 2022

spark = (SparkSession.builder
         .master("local")
         .appName("EDA California Housing")
         .getOrCreate())

## Загрузка данных

In [4]:
df = spark.read.option('header', 'true').csv('/datasets/housing.csv', inferSchema=True)
df.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



                                                                                

В колонках датасета содержатся следующие данные:

- longitude: широта;
- latitude: долгота;
- housing_median_age: медианный возраст жителей жилого массива;
- total_rooms: общее количество комнат в домах жилого массива;
- total_bedrooms: общее количество спален в домах жилого массива;
- population: количество человек, которые проживают в жилом массиве;
- households: количество домовладений в жилом массиве;
- median_income: медианный доход жителей жилого массива;
- median_house_value: медианная стоимость дома в жилом массиве;
- ocean_proximity: близость к океану.

В большинстве колонок хранятся количественные данные, кроме одной — ocean_proximity. Она хранит категориальные значения.

In [5]:
df.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [6]:
df.describe().toPandas().head()

                                                                                

Unnamed: 0,summary,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
1,mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
2,stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
3,min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
4,max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


 В датасете примерно 21 тысяча строк. Максимальная медианная стоимость дома равна полумиллиону долларов.

In [7]:
df.filter(F.col('median_house_value') > 500000).count()

965

## Предобработка

### Наличие пропусков

In [8]:
for column in df.columns:
    print(column, df.filter(df[column].isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 207
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


Заполним пустые значения

In [9]:
df = df.na.fill(0, ['total_bedrooms'])
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [10]:
for column in df.columns:
    print(column, df.filter(df[column].isNull()).count())

longitude 0
latitude 0
housing_median_age 0
total_rooms 0
total_bedrooms 0
population 0
households 0
median_income 0
median_house_value 0
ocean_proximity 0


### Преобразование колонок

#### Разделение на выборки

In [11]:
train_data, test_data = df.randomSplit([.8, .2], seed=RANDOM_SEED)
print(f'train_data: {train_data.count()}, test_data: {test_data.count()}')

train_data: 16418, test_data: 4222


#### Трансформация категорийных признаков

In [12]:
cat_columns = ['ocean_proximity']
cat_columns

['ocean_proximity']

В первую очередь трансформируем категориальные признаки с помощью трансформера StringIndexer

In [13]:
indexer = StringIndexer(inputCols=cat_columns, outputCols=[c + '_idx' for c in cat_columns], handleInvalid='keep')
indexer_model = indexer.fit(train_data)
train_data = indexer_model.transform(train_data)
test_data = indexer_model.transform(test_data)

                                                                                

In [14]:
cols = [i for i in train_data.columns for j in cat_columns if (i.startswith(j))]
train_data.select(cols).show(3)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 3 rows



Дополнительно создаем OHE-кодирование для категорий.

In [15]:
encoder = OneHotEncoder(inputCols=[c + '_idx' for c in cat_columns], outputCols=[c + '_ohe' for c in cat_columns])
encoder_model = encoder.fit(train_data)
train_data = encoder_model.transform(train_data)
test_data = encoder_model.transform(test_data)

In [16]:
train_data.select(cols).show(3)

+---------------+-------------------+
|ocean_proximity|ocean_proximity_idx|
+---------------+-------------------+
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
|     NEAR OCEAN|                2.0|
+---------------+-------------------+
only showing top 3 rows



Объединение признаков в один вектор

In [17]:
categorical_assembler = VectorAssembler(inputCols=[c + '_ohe' for c in cat_columns], outputCol="categorical_features")
train_data = categorical_assembler.transform(train_data)
test_data = categorical_assembler.transform(test_data)

In [18]:
train_data.select('categorical_features').show(3)

+--------------------+
|categorical_features|
+--------------------+
|       (4,[2],[1.0])|
|       (4,[2],[1.0])|
|       (4,[2],[1.0])|
+--------------------+
only showing top 3 rows



#### Трансформация числовых признаков

In [19]:
num_columns = ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population',
               'households', 'median_income']
num_columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [20]:
numerical_assembler = VectorAssembler(inputCols=num_columns, outputCol="numerical_features")
train_data = numerical_assembler.transform(train_data)
test_data = numerical_assembler.transform(test_data)

In [21]:
standardScaler = StandardScaler(inputCol='numerical_features', outputCol="numerical_features_scaled")
standardScaler_model = standardScaler.fit(train_data) 
train_data = standardScaler_model.transform(train_data)
test_data = standardScaler_model.transform(test_data)

                                                                                

In [22]:
train_data.select('numerical_features_scaled').show(3)

+-------------------------+
|numerical_features_scaled|
+-------------------------+
|     [-61.952887791441...|
|     [-61.927977100733...|
|     [-61.913030686308...|
+-------------------------+
only showing top 3 rows



Объеденим категорийные и числовые признаки в колонку features

In [23]:
features_assembler = VectorAssembler(inputCols=['categorical_features', 'numerical_features_scaled'], outputCol="features")
train_data = features_assembler.transform(train_data)
test_data = features_assembler.transform(test_data)

In [24]:
train_data.select(['categorical_features', 'numerical_features_scaled', 'features']).show(3)

+--------------------+-------------------------+--------------------+
|categorical_features|numerical_features_scaled|            features|
+--------------------+-------------------------+--------------------+
|       (4,[2],[1.0])|     [-61.952887791441...|[0.0,0.0,1.0,0.0,...|
|       (4,[2],[1.0])|     [-61.927977100733...|[0.0,0.0,1.0,0.0,...|
|       (4,[2],[1.0])|     [-61.913030686308...|[0.0,0.0,1.0,0.0,...|
+--------------------+-------------------------+--------------------+
only showing top 3 rows



## Обучение моделей

In [25]:
evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction")

'''Функция создание модели. Вернет rmse, mae, r2'''
def model_fit(featuresCol):
    lr = LinearRegression(labelCol='median_house_value', featuresCol=featuresCol)
    model = lr.fit(train_data)
    predictions = model.transform(test_data)
    # predictions.select("median_house_value", "prediction").show()
    '''Оценка качества модели'''
    rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
    mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
    r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
    return rmse, mae, r2

### Обучение LogisticRegression на всех данных

In [26]:
rmse_features, mae_features, r2_features = model_fit('features')
print(f"RMSE: {rmse_features:.2f}")
print(f"MAE: {mae_features:.2f}")
print(f"R2: {r2_features:.2f}")

24/04/23 15:50:23 WARN Instrumentation: [298daea8] regParam is zero, which might cause numerical instability and overfitting.
24/04/23 15:50:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/04/23 15:50:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/04/23 15:50:25 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/04/23 15:50:25 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
24/04/23 15:50:25 WARN Instrumentation: [298daea8] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

RMSE: 68547.35
MAE: 49977.32
R2: 0.65


### Обучение LogisticRegression на числовых

In [27]:
rmse_num_features, mae_num_features, r2_num_features = model_fit('numerical_features')
print(f"RMSE: {rmse_num_features:.2f}")
print(f"MAE: {mae_num_features:.2f}")
print(f"R2: {r2_num_features:.2f}")

24/04/23 15:50:29 WARN Instrumentation: [7fdd35d8] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

RMSE: 69278.39
MAE: 51010.98
R2: 0.65


In [28]:
spark.stop()

### Сравнение результатов

Из полученных результатов можно сделать следующие выводы:

1. Значения метрик RMSE и MAE в первом и втором случае довольно близки, что говорит о том, что модель, построенная на всех данных, показывает практически такие же результаты, как и модель, построенная только на числовых переменных.


2. Значение коэффициента детерминации R2 в обоих случаях составляет 0.65, что говорит о том, что модели объясняют 65% дисперсии зависимой переменной. Это является приемлемым результатом, но не очень высоким.


3. Небольшое увеличение значений RMSE и MAE во втором случае (при использовании только числовых переменных) может говорить о том, что добавление категориальных переменных в модель улучшает ее качество. Однако, разница в значениях метрик не очень велика, что свидетельствует о том, что категориальные переменные не оказывают очень сильного влияния на качество модели.

Таким образом, можно сделать вывод, что обе модели показывают примерно одинаковые результаты, и использование всех доступных переменных (как числовых, так и категориальных) позволяет построить немного более качественную модель, чем использование только числовых переменных

<font color='green'>Немного полезного материала:
+  https://sparkbyexamples.com/pyspark-tutorial/, https://sparkbyexamples.com/
+  https://github.com/dvgodoy/handyspark
+  https://www.tutorialspoint.com/pyspark/index.htm
+  https://www.guru99.com/pyspark-tutorial.html
+  https://databricks.com/spark/getting-started-with-apache-spark/machine-learning#load-sample-data
</font>