![arknights](images/California_Housing.jpg)

# Median house value prediction – pySpark pipline

---

**Цель**

Обучить модель линейной регрессии для прогнозирования медианной стоимости жилья в определенном районе.  
Метрики для оценки модели: R2, RMSE, MAE.  

---

**Входные данные**: данные о жилье в Калифорнии в 1990 году.

---

**Задачи:**  

- для решения использовать pySpark, MLlib;
- разделить данные на обучающую и отложенную выборки: train и test;
- для числовых признаков применить:
    - масштабирование;
    - полиномиальное приеобразование;
- для категориальных признаков применить:
    - one hote encoding;
    - кластеризацию на основе числовых признаков;
- при подборе гиперпараметров применить метод случайного поиска по сетке;
- подбор гиперпараметров сделать с помощью кросс-валидации;
- объединить все шаги в pipeline;
- создать 2 модели:
    - используя все доступные признаки;
    - используя только числовые признаки.
- оценить метрики созданных моделей:
    - по результатам кросс-валидации;
    - на отложенной выборке.

**Some explanations**

---

Permanent data tables named like: **data**.  

Temporary data tables named like: **df**.  

---

Intermediate conclusions are highlighted as follows:

> Intermediate conclusion.

---

The code of the cells are as independent as possible from each other in order to freely manipulate the cells.

---

## Intro

| Признак              | Описание                                              |
|:---------------------|:------------------------------------------------------|
| `longitude`          | широта                                                |
| `latitude`           | долгота                                               |
| `housing_median_age` | медианный возраст жителей жилого массива              |
| `total_rooms`        | общее количество комнат в домах жилого массива        |
| `total_bedrooms`     | общее количество спален в домах жилого массива        |
| `population`         | количество человек, которые проживают в жилом массиве |
| `households`         | количество домовладений в жилом массиве               |
| `median_income`      | медианный доход жителей жилого массива                |
| `median_house_value` | медианная стоимость дома в жилом массиве              |
| `ocean_proximity`    | близость к океану                                     |

---

## Initial

In [1]:
# # если возникают проблемы с environment

# import os

# os.environ['JAVA_HOME'] = "C:\\Program Files\\Java\\jdk-19\\"
# os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"

### Imports

In [2]:
import pandas as pd
import numpy as np

import os
import warnings

import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import LinearSegmentedColormap
import plotly.graph_objects as go
import plotly.io as pio

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from pyspark.ml.feature import QuantileDiscretizer, StringIndexer, OneHotEncoder, StandardScaler, RobustScaler
from pyspark.ml.feature import PolynomialExpansion, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

### Constants

In [3]:
PATH_LOCAL = 'datasets/'                      # local path to data
PATH_REMOTE = '/datasets/'                    # remote path to data

CR = '\n'                                     # new line
RANDOM_STATE = RANDOM_SEED = RS = 66          # random_state
TEST_FRAC = 0.1                               # доля отложенной выборки

N_TRIALS = 10                                 # количество попыток при подборе гиперпараметров
N_CV = 4                                      # количество фолдов при кросс-валидации
MAX_ITER = 1000                               # max количество итераций для LinearRegression
DEGREE_POLYNOMIAL = 5                         # степень для полиномиального преобразования

### Functions

In [4]:
def custom_read_spark(spark_session, file_name, file_format='csv', separator=',', header='true'):
    """
    чтение датасета в spark-сессии:
      сначала из локального хранилища;
      при неудаче — из удаленного хранилища
    """

    path_local = f'{PATH_LOCAL}{file_name}'
    path_remote = f'{PATH_REMOTE}{file_name}'
    
    if os.path.exists(path_local):
        return spark_session.read.load(
                                       path_local,
                                       inferSchema=True,
                                       format=file_format,
                                       sep=separator,
                                       header=header,
                                      )

    elif os.path.exists(path_remote):
        return spark_session.read.load(
                                       path_remote,
                                       inferSchema=True,
                                       format=file_format,
                                       sep=separator,
                                       header=header,
                                      )

    else:
        print(f'File "{file_name}" not found at the specified path.')

In [5]:
def var_name(var):
    """
    var name determination
    """
    return [name for name in globals() if globals()[name] is var][0]

In [6]:
# найдено в Инете

class RandomGridBuilder: 
  '''
  Grid builder for random search. Sets up grids for use in CrossValidator in Spark
  using values randomly sampled from user-provided distributions.
  Distributions should be provided as lambda functions, so that the numbers are generated at call time.
  
  Parameters:
    num_models: Integer (Python) - number of models to generate hyperparameters for
    seed: Integer (Python) - seed (optional, default is None)
    
  Returns:
    param_map: list of parameter maps to use in cross validation.
    
  Example usage:
    from pyspark.ml.classification import LogisticRegression
    lr = LogisticRegression()
    paramGrid = RandomGridBuilder(2)\
               .addDistr(lr.regParam, lambda: np.random.rand()) \
               .addDistr(lr.maxIter, lambda : np.random.randint(10))\
               .build()
               
    Returns similar output as Spark ML class ParamGridBuilder and can be used in its place.
    The above paramGrid provides random hyperparameters for 2 models.
    '''
  
  def __init__(self, num_models, seed=None):
    self._param_grid = {}
    self.num_models = num_models
    self.seed = seed
    
  def addDistr(self, param, distr_generator):
    '''Add distribution based on dictionary generated by function passed to addDistr.'''
    
    if 'pyspark.ml.param.Param' in str(type(param)):
      self._param_grid[param] = distr_generator
    else:
      raise TypeError('param must be an instance of Param')

    return self
  
  def build(self):    
    param_map = []
    for n in range(self.num_models):
      if self.seed:
        # Set seeds for both numpy and random in case either is used for the random distribution
        np.random.seed(self.seed + n)
        random.seed(self.seed + n)
      param_dict = {}
      for param, distr in self._param_grid.items():
        param_dict[param] = distr()
      param_map.append(param_dict)
    
    return param_map

### Settings

In [7]:
# text styles
class f:
    BOLD = "\033[1m"
    ITALIC = "\033[3m"
    END = "\033[0m"

In [8]:
# defaults for charts

# Matplotlib, Seaborn
PLOT_DPI = 150  # dpi for charts rendering 
sns.set_style('whitegrid', {'axes.facecolor': '0.98', 'grid.color': '0.9', 'axes.edgecolor': '1.0'})
plt.rc(
       'axes',
       labelweight='bold',
       titlesize=16,
       titlepad=10,
      )

# Plotly Graph_Objects
pio.templates['my_theme'] = go.layout.Template(
                                               layout_autosize=True,
                                               # width=900,
                                               layout_height=200,
                                               layout_legend_orientation="h",
                                               layout_margin=dict(t=40, b=40),         # (l=0, r=0, b=0, t=0, pad=0)
                                               layout_template='seaborn',
                                              )
pio.templates.default = 'my_theme'

# colors, color schemes
CMAP_SYMMETRIC = LinearSegmentedColormap.from_list('', ['steelblue', 'aliceblue', 'steelblue'])

In [9]:
# Pandas defaults
pd.options.display.max_colwidth = 100
pd.options.display.max_rows = 500
pd.options.display.max_columns = 100
pd.options.display.float_format = '{:.3f}'.format
pd.options.display.colheader_justify = 'left'

In [10]:
# others
warnings.filterwarnings('ignore')

---

## Spark session open

In [11]:
spark = (
         SparkSession
         .builder
         .master('local')
         .appName('California Housing – Price Regression')
         .getOrCreate()
        )

spark

## Read and Check data

### Read data

In [12]:
data = custom_read_spark(spark, 'housing.csv', file_format='csv', separator=',', header='true')

                                                                                

### First look at data

In [14]:
data.printSchema()
# pd.DataFrame(data.dtypes, columns=['column', 'type'])      # либо так

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [15]:
data.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [16]:
data.describe().toPandas().set_index('summary')

                                                                                

Unnamed: 0_level_0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
summary,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
count,20640.0,20640.0,20640.0,20640.0,20433.0,20640.0,20640.0,20640.0,20640.0,20640
mean,-119.56970445736148,35.6318614341087,28.639486434108527,2635.7630813953488,537.8705525375618,1425.4767441860463,499.5396802325581,3.8706710029070246,206855.81690891477,
stddev,2.003531723502584,2.135952397457101,12.58555761211163,2181.6152515827944,421.3850700740312,1132.46212176534,382.3297528316098,1.899821717945263,115395.6158744136,
min,-124.35,32.54,1.0,2.0,1.0,3.0,1.0,0.4999,14999.0,<1H OCEAN
max,-114.31,41.95,52.0,39320.0,6445.0,35682.0,6082.0,15.0001,500001.0,NEAR OCEAN


## Preprocessing

### Drop features

In [17]:
# data = data.drop('index')
# data.sample(0.001).show(1)

> Поле `index` содержит уникальные номера наблюдений, поэтому удалено.

### Missing values

In [18]:
def count_null(df):
    '''
    Функция для подсчета пропущенных значений в датасете.
    Выводит результат на экран.
    '''

    print(f'missing values:{CR}')

    for column in data.columns:
#         is_null = F.col(column).isNull() | F.col(column).isin([None, np.nan])
        is_null = F.isnull(F.col(column)) | F.isnan(F.col(column))
        print(f'{column:25} {data.filter(is_null).count():5d}')

In [19]:
count_null(data)

missing values:

longitude                     0
latitude                      0
housing_median_age            0
total_rooms                   0
total_bedrooms              207
population                    0
households                    0
median_income                 0
median_house_value            0
ocean_proximity               0


> Есть пропущенные значения.
> 
> Можно заменить их средними значениями с учетом контекста.  
> Предполагаемые зависимости:
> - `total_rooms`: больше комнат – больше спален среди них;
> - `population`: больше людей – больше спален;
> - `median_income`: выше доход – просторнее размещение.

> Для более взвешенного решения лучше изучить корреляции признаков.

Для того, чтобы использовать непрерывные значения для группировки, необходимо их дискретизировать.  
По 10 интервалов для каждой переменной должно хватить для достижения приемлемой точности, не создавая при этом слишком много групп.

In [22]:
df_stats_before = data.describe().toPandas().set_index('summary').dropna(axis=1).astype('float')     # before change 

                                                                                

In [23]:
data = (QuantileDiscretizer(
                            numBucketsArray=[10,10,10],
                            inputCols=['total_rooms','population','median_income'],
                            outputCols=['total_rooms_discrete','population_discrete','median_income_discrete'],
                            relativeError=0.01,
                            handleInvalid='error',
                           )
        .fit(data)
        .transform(data)
       )

In [24]:
data.select('total_rooms','total_rooms_discrete','population','population_discrete',
            'median_income','median_income_discrete').sample(0.001).show(3)

+-----------+--------------------+----------+-------------------+-------------+----------------------+
|total_rooms|total_rooms_discrete|population|population_discrete|median_income|median_income_discrete|
+-----------+--------------------+----------+-------------------+-------------+----------------------+
|     3761.0|                 8.0|    1776.0|                7.0|       4.5317|                   7.0|
|     1739.0|                 3.0|     820.0|                2.0|       4.0556|                   6.0|
|     2718.0|                 6.0|    3018.0|                9.0|       2.9076|                   3.0|
|     4624.0|                 9.0|    1572.0|                7.0|       6.5533|                   9.0|
|     1292.0|                 1.0|    2081.0|                8.0|       0.9563|                   0.0|
+-----------+--------------------+----------+-------------------+-------------+----------------------+
only showing top 5 rows



Используя оконную функцию, заполним пропуски в `total_bedrooms` средним (медианным) значением `total_bedrooms` для каждой группы.

**!!! функция percentile_approx(), используемая для вычисления медианы, добавлена в Spark, начиная с версии 3.1**

In [25]:
window = Window().partitionBy('total_rooms_discrete','population_discrete','median_income_discrete')

data = (data.withColumn('total_bedrooms',
                        F.when(
                               F.col('total_bedrooms').isNull(),
                               F.avg(F.col('total_bedrooms')).over(window)
#                                F.percentile_approx(F.col('total_bedrooms'), 0.5).over(window)
                              )
                         .otherwise(F.col('total_bedrooms'))
                       )
       )

Проверка результата.

In [26]:
count_null(data)

missing values:

longitude                     0
latitude                      0
housing_median_age            0
total_rooms                   0




total_bedrooms                1


                                                                                

population                    0
households                    0
median_income                 0
median_house_value            0
ocean_proximity               0
total_rooms_discrete          0
population_discrete           0
median_income_discrete        0


> Отсталось одно пропущенное значение. Вероятно, это наблюдение выбивается из общего ряда, и для него не нашлось соответствующей группы.  
> Оставшиеся пропущенные значения заменим, уменьшив количество полей для группировки (оконной функции).

In [27]:
window = Window().partitionBy('population_discrete')

data = (data.withColumn('total_bedrooms',
                        F.when(
                               F.col('total_bedrooms').isNull(),
                               F.avg(F.col('total_bedrooms')).over(window)
#                                F.percentile_approx(F.col('total_bedrooms'), 0.5).over(window)
                              )
                         .otherwise(F.col('total_bedrooms'))
                       )
       )

Проверка результата.

In [28]:
count_null(data)

missing values:

longitude                     0
latitude                      0
housing_median_age            0
total_rooms                   0


                                                                                

total_bedrooms                0
population                    0
households                    0
median_income                 0
median_house_value            0
ocean_proximity               0
total_rooms_discrete          0
population_discrete           0
median_income_discrete        0


Удаление временных полей.

In [29]:
data = data.drop('total_rooms_discrete','population_discrete','median_income_discrete')

Сравнение статистик до и после заполнения пропущенных значений.

In [30]:
df_stats_after = data.describe().toPandas().set_index('summary').dropna(axis=1).astype('float')      # after change

                                                                                

In [31]:
# разница статистик до и после заполнения пропущенных значений

df_chng = (df_stats_after - df_stats_before) / df_stats_before

df_chng.style.format('{:.3%}', na_rep='-').applymap(lambda value: 'color:red;' if abs(value) > 0.0001 else None)

Unnamed: 0_level_0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
summary,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
count,0.000%,0.000%,0.000%,0.000%,1.013%,0.000%,0.000%,0.000%,0.000%
mean,0.000%,-0.000%,0.000%,0.000%,0.025%,0.000%,0.000%,-0.000%,0.000%
stddev,0.000%,0.000%,0.000%,-0.000%,-0.072%,0.000%,0.000%,0.000%,0.000%
min,-0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%
max,-0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%,0.000%


> Пропущенные значения заполнены с учетом контекста.
> 
> Процесс получился длинным, поэтому для будущих применений можно оформить его в виде функции.

### New calculated features

Несколько расчетных признаков, которые, возможно, улучшат качество модели.

In [32]:
# rooms / people
data = data.withColumn('rooms_div_population', F.col('total_rooms') / F.col('population'))

# bedrooms / people
# data = data.withColumn('bedrooms_div_population', F.col('total_bedrooms') / F.col('population'))

# rooms / bedrooms
# data = data.withColumn('rooms_div_bedrooms', F.col('total_rooms') / F.col('total_bedrooms'))

# rooms / households
# data = data.withColumn('rooms_div_households', F.col('total_rooms') / F.col('households'))

# bedrooms / households
# data = data.withColumn('bedrooms_div_households', F.col('total_bedrooms') / F.col('households'))

# people / households
# data = data.withColumn('population_div_households', F.col('population') / F.col('households'))

# total income
# data = data.withColumn('total_income', F.col('population') * F.col('median_income'))

> Все перечисленные расчетные признаки проверены экспериментально.  
> Небольшой прирост метрики дал только `rooms_div_population` – количество комнат на одного человека.

### Target and Feature lists (categorical and numerical)

In [33]:
target = 'median_house_value'

In [36]:
# creation of nunerical and categorical lists

num_feature_list = []
cat_feature_list = []

for feature in data.dtypes:
    if feature[1] in ['double','float','int']:
        num_feature_list.append(feature[0])
    elif feature[1] in ['string']:
        cat_feature_list.append(feature[0])

num_feature_list = list(set(num_feature_list) - set([target]))     # исключение целевого признака из списков
cat_feature_list = list(set(cat_feature_list) - set([target]))     # универсальный вариант – почти готовая функция

In [37]:
num_feature_list

['housing_median_age',
 'latitude',
 'total_rooms',
 'households',
 'total_bedrooms',
 'rooms_div_population',
 'population',
 'median_income',
 'longitude']

In [38]:
cat_feature_list

['ocean_proximity']

### Split data for train and test

In [39]:
data_train, data_test = data.randomSplit([1-TEST_FRAC, TEST_FRAC], seed=RS)

print(f'train samples: {data_train.count():6d}')
print(f'test samples:  {data_test.count():6d}')

                                                                                

train samples:  18557




test samples:    2083


                                                                                

## Stages for pipeline

### Numerical features

#### Initial num vector

In [40]:
def crteate_initial_num_vector():
    
    return VectorAssembler(
                           inputCols=num_feature_list,
                           outputCol='num_features'
                          )

#### Scaler

In [41]:
def create_scaler():
    
    return RobustScaler(
                        inputCol='num_features',
                        outputCol='num_features_scaled'
                       )

#### PolynomialExpansion

In [42]:
def create_polynomial(degree=2):
    
    return PolynomialExpansion(
                               degree=degree,
                               inputCol='num_features_scaled',
                               outputCol='num_features_poly'
                              )

### Categoriacal features

#### One hot encoding

In [43]:
def create_string_indexer():
    
    return StringIndexer(
                         inputCols=cat_feature_list,
                         outputCols=[f'{feature}_indexer' for feature in cat_feature_list]
                        )

In [44]:
def create_ohe():
    
    return OneHotEncoder(
                         inputCols=[f'{feature}_indexer' for feature in cat_feature_list],
                         outputCols=[f'{feature}_ohe' for feature in cat_feature_list]
                        )

#### K-Means clustering

In [45]:
def create_KMeans_prepare(feature_list, outputCol_name):
    
    return VectorAssembler(
                           inputCols=feature_list,
                           outputCol=outputCol_name
                          )

In [46]:
def create_KMeans_clusters(featuresCol_name, predictionCol_name, k_clusters=5):
    
    return KMeans(
                  k=k_clusters,
#                   maxIter=100,
                  featuresCol=featuresCol_name,
                  predictionCol=predictionCol_name,
                  seed=RS,
                 )

#### VectorAssembler for categoriacal features

In [47]:
def create_cat_features_encoded_vector():
    
    return VectorAssembler(
                           inputCols=[f'{feature}_ohe' for feature in cat_feature_list] + ['cluster_group'],
                           outputCol='cat_features_encoded'
                          )

### Final VectorAssembler

In [48]:
def create_final_feature_vector(switch='all'):
    
    if switch == 'all':
        return VectorAssembler(inputCols=['cat_features_encoded','num_features_poly'],
                               outputCol='features')
    
    elif switch == 'num_only':
        return VectorAssembler(inputCols=['num_features_poly'],
                               outputCol='features')

### Crossvalidation

In [49]:
def create_estimator(estimator_name, features_col_name):
    
    if estimator_name == 'LinearRegression':
        
        return LinearRegression(
                                labelCol=target,
                                featuresCol=features_col_name,
                                standardization=False,
                                maxIter=MAX_ITER,
                               )

In [50]:
# def create_params_grid(estimator):
    
#     return (ParamGridBuilder()
#             .addGrid(estimator.regParam, [0.0, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0])
#             .addGrid(estimator.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
#             .build()
#            )

In [51]:
def create_params_grid(estimator):
    
    return (RandomGridBuilder(N_TRIALS)
            .addDistr(estimator.regParam, lambda: np.random.rand())
            .addDistr(estimator.elasticNetParam, lambda: np.random.rand())
            .build()
           )

In [52]:
def create_crossvalidator(estimator, params_grid):

    return CrossValidator(
                          estimator=estimator,
                          estimatorParamMaps=params_grid,
                          evaluator=RegressionEvaluator(labelCol=target, predictionCol='prediction',  metricName='r2'),
                          numFolds=N_CV,
                          parallelism=16,
                          seed=RS,
                         )

## Modeling

### Pipeline

In [53]:
def create_pipeline(estimator_name, features_col_name, switch='all'):

    # numerical features preprocessing
    
    initial_num_vector = crteate_initial_num_vector()
    
    scaler = create_scaler()
    
    polynomial = create_polynomial(degree=DEGREE_POLYNOMIAL)
    

    # categorical features preprocessing

    string_indexer = create_string_indexer()
    
    ohe = create_ohe()
    
    kmeans_prepare = create_KMeans_prepare(['latitude','longitude', 'median_income'], 'KMeans_group')
    kmeans_clusters = create_KMeans_clusters('KMeans_group', 'cluster_group', k_clusters=6)

    cat_features_encoded_vector = create_cat_features_encoded_vector()
    
    
    # preprocessing finalization
    
    final_feature_vector = create_final_feature_vector(switch=switch)


    # hyperparameters searching with cross-validation

    estimator = create_estimator(estimator_name, features_col_name)
    params_grid = create_params_grid(estimator)
    crossvalidator = create_crossvalidator(estimator, params_grid)
    
    return Pipeline(stages=[
                            initial_num_vector,            # объединение числовых признаков в вектор
                            scaler,                        # масштабирование числовых признаков
                            polynomial,                    # полиномиальное преобразование вектора числовых признаков
                            string_indexer,                # преобразование категориальных признаков в числовые (а-ля OrdinalEncoder)
                            ohe,                           # OHE для категориальных признаков, преобразованых ранее в числовые
                            kmeans_prepare,                # подготовка вектора признаков для кластеризации посредством K-Means
                            kmeans_clusters,               # кластеризация посредством K-Means
                            cat_features_encoded_vector,   # объединение категориальных признаков в вектор
                            final_feature_vector,          # объединение числового и категориального векторов в общий вектор признаков 
                            crossvalidator,                # кроссвалидация
                           ]
                   )

### Fit pipelines

Поиск оптимальных гиперпараметров.

In [54]:
%%time

# для всех признаков
pipeline_num_cat = create_pipeline('LinearRegression', 'features', switch='all').fit(data_train)

23/03/16 14:40:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/03/16 14:40:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/03/16 14:41:34 WARN MemoryStore: Not enough space to cache rdd_529_187 in memory! (computed 47.1 MiB so far)
23/03/16 14:41:34 WARN BlockManager: Persisting block rdd_529_187 to disk instead.
23/03/16 14:41:34 WARN MemoryStore: Not enough space to cache rdd_529_187 in memory! (computed 47.1 MiB so far)
23/03/16 14:42:19 WARN MemoryStore: Not enough space to cache rdd_529_187 in memory! (computed 47.1 MiB so far)
23/03/16 14:42:58 WARN MemoryStore: Not enough space to cache rdd_529_187 in memory! (computed 47.1 MiB so far)
23/03/16 14:43:37 WARN MemoryStore: Not enough space to cache rdd_529_187 in memory! (computed 47.1 MiB so far)
23/03/16 14:44:17 WARN MemoryStore: Not enough space to cache rdd_529_187 in memory! (computed 47.1 MiB so far)
23/03/16 14:44:56 WARN MemoryStore

CPU times: user 3.59 s, sys: 1.48 s, total: 5.07 s
Wall time: 34min 7s


                                                                                

In [55]:
%%time

# только для числовых исходных признаков
pipeline_num_only = create_pipeline('LinearRegression', 'features', switch='num_only').fit(data_train)

23/03/16 15:15:15 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:15:15 WARN BlockManager: Persisting block rdd_2756_187 to disk instead.
23/03/16 15:15:15 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:15:56 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:16:36 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:17:14 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:17:52 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:18:32 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:19:11 WARN MemoryStore: Not enough space to cache rdd_2756_187 in memory! (computed 47.1 MiB so far)
23/03/16 15:

CPU times: user 3.72 s, sys: 1.51 s, total: 5.23 s
Wall time: 34min 28s


                                                                                

### Best models

Фиксация модели с лучшими гиперпараматрами.

In [56]:
model_num_cat = pipeline_num_cat.stages[-1].bestModel
model_num_only = pipeline_num_only.stages[-1].bestModel

## Results

### Prediction

In [57]:
def prediction_results(pipeline):
    '''
    Выводит несколько строк для сравнения правильного значения с прогнозом
    '''
    print(f'{CR}Predictions for LinearRegression with {f.BOLD}{(var_name(pipeline))}{f.END}{CR}')
    pipeline.transform(data_test).select('median_house_value', 'prediction').sample(0.01).show(5)    

In [58]:
prediction_results(pipeline_num_cat)
prediction_results(pipeline_num_only)


Predictions for LinearRegression with [1mpipeline_num_cat[0m



                                                                                

+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          322900.0|234548.19174208213|
|          177800.0| 222221.7127292715|
|          229500.0|238866.84257583087|
|           48800.0| 89042.94886647398|
|           71400.0| 120957.3273287355|
+------------------+------------------+
only showing top 5 rows


Predictions for LinearRegression with [1mpipeline_num_only[0m





+------------------+------------------+
|median_house_value|        prediction|
+------------------+------------------+
|          166800.0| 244661.6385811898|
|          328000.0|  315869.907160006|
|          239600.0|262549.26060199086|
|          195400.0| 182979.9533693944|
|           71500.0|137075.68152344413|
+------------------+------------------+
only showing top 5 rows



                                                                                

### Model summary

In [59]:
def model_summary(model):
    '''
    Выводит ряд метрик и параметров/гиперпараметров модели, определенных после кросс-валидации.
    '''

    print(f'{CR}Summary for LinearRegression with {f.BOLD}{var_name(model)}{f.END}{CR}')
    
    # metrics
    print(f'R2   = {model.summary.r2:0.3f}')
    print(f'RMSE = {model.summary.rootMeanSquaredError:0.0f}')
    print(f'MAE  = {model.summary.meanAbsoluteError:0.0f}')
    
    # params
#     print(f'{CR}intercept: {model.intercept:0.0f}')
    # print(f'coeffs: {np.round(model.coefficients, 0)}{CR}')
    print(f'ElasticNetParam: {model.getElasticNetParam()}')
    print(f'RegParam: {model.getRegParam()}{CR}')

In [60]:
model_summary(model_num_cat)
model_summary(model_num_only)


Summary for LinearRegression with [1mmodel_num_cat[0m

R2   = 0.739
RMSE = 58886
MAE  = 41969

intercept: -2875445
ElasticNetParam: 0.8317666450465439
RegParam: 0.8955744454811672


Summary for LinearRegression with [1mmodel_num_only[0m

R2   = 0.732
RMSE = 59703
MAE  = 42825

intercept: -2112386
ElasticNetParam: 0.2973922567847659
RegParam: 0.6324419065503356



> Модель без использования категориального признака из входных данных оказалась немного хуже.  
> Вероятно, почти вся полезная информация содержится в других признаках. Либо деление на категории слишком грубое. Но тем не менее, какую-то часть полезной информации он все-таки содержит.

<div class="alert alert-block alert-success">
<b>Комментарий от ревьюера v1</b>
    
✔️ С выводом согласен.
</div> 


### Metrics with test data

In [61]:
def model_metric(pipeline, metric_dict={'r2':3}):
    '''
    Выводит ряд метрик для тестовой (отложенной) выборки.
    '''
    predicted = pipeline.transform(data_test)
    
    print(f'{CR}{f.BOLD}{var_name(pipeline)}{f.END} metrics:')
    
    for metric_name, metric_precission in metric_dict.items():
        metric = RegressionEvaluator(labelCol=target, predictionCol='prediction', metricName=metric_name).evaluate(predicted)
        print(f'{metric_name.upper():4} = {metric :{metric_precission}}')

In [62]:
model_metric(pipeline_num_cat, {'r2':'0.3f','rmse':'0.0f','mae':'0.0f'})
model_metric(pipeline_num_only, {'r2':'0.3f','rmse':'0.0f','mae':'0.0f'})

                                                                                


[1mpipeline_num_cat[0m metrics:


                                                                                

R2   = 0.736


                                                                                

RMSE = 59750


                                                                                

MAE  = 42369


                                                                                


[1mpipeline_num_only[0m metrics:


                                                                                

R2   = 0.730


                                                                                

RMSE = 60509




MAE  = 43166


                                                                                

> На тестовой выборке значения метрик примерно такие же.

> Для повышения качества модели вместо категориального признака `ocean proximity` стоит попробовать числовой признак. Для его генерации можно использовать имеющиеся долготу-широту и сторонние гео-данные линии побережья.

## Conclusion

### Краткий обзор проведенной работы

1. Выполнена проверка входных данных и незначительная их правка.
1. Создан (и далее проверен) ряд расчетных признаков.
1. Из исходного набора данных выделена тестовая (отложенная) выборка для последующего тестирования моделей.
1. Создан пайплайн.  
1. Обучены две модели на разных наборах признаков.
1. Оценены метрики и параметры созданных моделей.

### Основная часть

#### Проверка данных

Удалены лишние признаки (уникальный id).

В данных обнаружено незначительное количество пропущенных значений (1% от всех наблюдений).  
Наблюдения с пропусками удалены.

Сделана попытка заполнения пропусков средними значениями с учетом контекста. Заполнение выполнено успешно, но после этой операции наблюдается критическое падение скорости обучения моделей – примерно в 20-40 раз. Причину такого поведения выяснить не удалось, поиск причины продолжается.

#### Подготовка данных для моделей

Создан и проверен ряд признаков на основе арифметических операций. Небольшой прирост метрики дал признак `количество комнат на одного человека`.

Выделена целевая переменная и признаки.

Из исходного набора данных выделена тестовая (отложенная) выборка для последующего тестирования моделей.

#### Пайплайн

Разработанный пайплайн включает:

a) стандартизацию числовых признаков;  
b) полиномиальное преобразование числовых признаков;  
c) One-Hote-Encoding категориальных признаков;  
d) создание нового признака посредством кластеризации;  
e) кросс-валидацию для подбора гиперпараметров.

В пайплайн не включены некоторые шаги по предобработке. Их можно добавить после решения описанной выше проблемы с падением скорости обучения.

#### Результат

Обучены две модели:  
a) на всем входном наборе признаков;  
b) без использования категориальных входных признаков.

Оценены метрики и параметры созданных моделей:  
a) по результатам кроссвалидации;  
b) на тестовой (отложенной) выборке.

Модель на полном наборе признаков показала результат выше, чем модель без использования входных категориальных признаков. Но отличие невелико.

Обе модели ведут себя устойчиво: значения метрик для тестовой выборки отличаются незначительно от результатов на кроссвалидации.

### Рекомендации и риски

Необходимо найти причину описанной выше проблемы с падением скорости обучения. 

Сделать полноценный исследовательский анализ. Основная цель — выбор наиболее подходящих признаков для обучения.  
В качестве примера можно отметить, что во входных данных есть признаки клиппинга: целевой признак обрезан на уровне 500К.

По результатам исследовательского анализа провести feature engineering.

По мере решения описанных задач включать решение в разработанный пайплайн.