## 3. Model m1
- With sklearn, several pipelines are built which combine further preprocessing, feature selection, dimensionality reduction, and a modeling algorithm.
- Computations are run with dask, through the joblib backend.

### Setup

In [3]:
from datetime import datetime

from dask.distributed import Client
import joblib

import pandas as pd
import numpy as np

from sklearn.neighbors import LocalOutlierFactor

from sklearn.decomposition import PCA
from sklearn.preprocessing import PolynomialFeatures

from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Ridge
from sklearn.kernel_ridge import KernelRidge
from sklearn.linear_model import Lasso
from sklearn.tree import DecisionTreeRegressor


from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme()

In [4]:
class Feature_reader():
    """
    Group feature names by the aspects they are built from.

    Each feature has a first name, optionally a second name, a statistic and a number.
    This class allows grouping features according to these aspects or combinations of these aspects.
    Each method produces a list of feature names or a list of lists of feature names.
    """

    def __init__(self, csv):
        """
        Load the feature table.

        Parameters
        ----------
        csv : path or buffer accepted by ``pd.read_csv``
            One row per feature. The grouping methods use the columns
            'name1', 'name2', 'stat' and 'n'.
        """
        # 'n' is read as text so values such as '01' can be compared as strings;
        # missing cells become '' so that every cell of a row can be joined.
        self.fts = pd.read_csv(csv, dtype={'n': "string"}).fillna('')

    def format(self, select):
        """
        Build one feature name per row of `select`.

        The cells of a row are joined with '_'; a doubled underscore (left
        behind by an empty cell) is collapsed to a single one.

        Returns
        -------
        list of str
            Empty when `select` has no rows.
        """
        # Iterating over plain row tuples keeps the result a list even for an
        # empty selection, where DataFrame.apply(axis=1) would hand back a
        # DataFrame that has no .tolist().
        return [
            '_'.join(row).replace('__', '_')
            for row in select.itertuples(index=False, name=None)
        ]

    def _matching(self, column, value):
        """Names of the features whose `column` cell equals `value`."""
        return self.format(self.fts.loc[self.fts[column] == value])

    def all(self):
        """Names of all features."""
        return self.format(self.fts)

    def first(self):
        """Names of the features whose number is '01'."""
        return self._matching('n', '01')

    def min(self):
        """Names of the features whose statistic is 'min'."""
        return self._matching('stat', 'min')

    def max(self):
        """Names of the features whose statistic is 'max'."""
        return self._matching('stat', 'max')

    def median(self):
        """Names of the features whose statistic is 'median'."""
        return self._matching('stat', 'median')

    def mean(self):
        """Names of the features whose statistic is 'mean'."""
        return self._matching('stat', 'mean')

    def std(self):
        """Names of the features whose statistic is 'std'."""
        return self._matching('stat', 'std')

    def skew(self):
        """Names of the features whose statistic is 'skew'."""
        return self._matching('stat', 'skew')

    def kurtosis(self):
        """Names of the features whose statistic is 'kurtosis'."""
        return self._matching('stat', 'kurtosis')

    def per_nns(self):
        """
        List of lists: one entry per (name1, name2) pair, each holding one
        list of feature names per stat.

        Pairs are in sorted order (groupby default); within a pair the stats
        keep their order of appearance.
        """
        return [
            [self.format(by_stat) for _, by_stat in by_name.groupby('stat', sort=False)]
            for _, by_name in self.fts.groupby(by=['name1', 'name2'])
        ]

    def per_sn(self):
        """
        List of lists: one entry per stat, each holding one list of feature
        names per number n.

        Both levels keep their order of appearance in the feature table.
        """
        return [
            [self.format(by_n) for _, by_n in by_stat.groupby('n', sort=False)]
            for _, by_stat in self.fts.groupby('stat', sort=False)
        ]

# Feature groupings are derived from the feature table stored in 'features.csv'.
fts = Feature_reader('features.csv')

In [5]:
# Target (y) and feature matrix (X) of feature set m1; header=None because the
# files are read without a header row. Plain string literals are used: the
# paths contain no placeholders, so an f-string prefix is not needed.
y = pd.read_csv('featsets/m1/y.csv', header=None)
X = pd.read_csv('featsets/m1/X.csv', delimiter=',', header=None)

In [6]:
# Start a dask distributed client with default settings; the summary displayed
# below reports the cluster it is connected to.
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:57468,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:57479,Total threads: 2
Dashboard: http://127.0.0.1:57482/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:57471,
Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-nzh9t5xd,Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-nzh9t5xd

0,1
Comm: tcp://127.0.0.1:57485,Total threads: 2
Dashboard: http://127.0.0.1:57486/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:57473,
Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-dt8gpeys,Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-dt8gpeys

0,1
Comm: tcp://127.0.0.1:57487,Total threads: 2
Dashboard: http://127.0.0.1:57489/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:57474,
Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-rqo71jn1,Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-rqo71jn1

0,1
Comm: tcp://127.0.0.1:57480,Total threads: 2
Dashboard: http://127.0.0.1:57481/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:57472,
Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-7ga_pxez,Local directory: /Users/juliet/Documents/ugent/2021-2022/big_data/practicals/bds/regression/dask-worker-space/worker-7ga_pxez


### Linear regression

In [9]:
# Scorer names handed to GridSearchCV(scoring=...); the error metrics are in
# sklearn's negated 'neg_*' form.
measures = ['r2', 'neg_mean_squared_error', 'neg_root_mean_squared_error', 'neg_mean_absolute_error']

In [10]:
def frmt(df):
    """
    Condense a grid-search results frame into a compact score table.

    Keeps the 'params' column and the mean test score of each metric, renames
    the score columns to 'r2', 'mse', 'rmse' and 'mae', turns the three error
    scores into absolute values, rounds all scores to three decimals and uses
    'params' as the index.

    The frame passed in is not modified.
    """
    short_names = {
        'mean_test_r2': 'r2',
        'mean_test_neg_mean_squared_error': 'mse',
        'mean_test_neg_root_mean_squared_error': 'rmse',
        'mean_test_neg_mean_absolute_error': 'mae',
    }

    # Select the relevant columns (in this order) and shorten their names.
    table = df[['params', *short_names]].rename(columns=short_names)

    table['r2'] = table['r2'].round(3)
    # The error columns come from 'neg_*' scorers; report them as magnitudes.
    for error_col in ('mse', 'rmse', 'mae'):
        table[error_col] = table[error_col].abs().round(3)

    return table.set_index('params')

#### Linear regression with all fts

- Initial results, with the unscaled target (no Box-Cox transform applied yet), were:

|param|r2|mse|rmse|mae|
|-|-|-|-|-|
|intercept=true|0.017|6.272885e+07|7905.048|2520.407|
|intercept=false|0.017|6.273949e+07|7905.764|2520.480|

- Features were not scaled yet.

|params|r2|mse|rmse|mae|
|-|-|-|-|-|
|{'reg__fit_intercept': False}|0.085|1.533|1.238|0.962|
|{'reg__fit_intercept': True}|0.083|1.537|1.239|0.962|

Pretty bad.

In [12]:
# Cross-validated grid search for a plain least-squares model on all features,
# with joblib dispatching the work to the dask backend.
with joblib.parallel_backend('dask'):
    pipeline = Pipeline([
        ('reg', LinearRegression()),
    ])
    # Uncomment to list the parameter names that can be tuned:
    # print(pipeline.get_params().keys())

    # Compare fitting with and without an intercept term.
    parameters = {
        'reg__fit_intercept': [False, True]
    }

    # refit=False: only the cross-validation scores are collected, no final
    # estimator is refit afterwards.
    grid_search = GridSearchCV(pipeline, parameters, cv=5, n_jobs=-1, scoring=measures, refit=False)
    grid_search.fit(X, y)

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

Unnamed: 0_level_0,r2,mse,rmse,mae
params,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
{'reg__fit_intercept': False},-39.771,68.257,8.065,6.398
{'reg__fit_intercept': True},0.083,1.537,1.239,0.962


In [7]:
from sklearn.svm import SVR

In [13]:
# Background on why training an SVM is slow on large data sets:
# https://ai.stackexchange.com/questions/7202/why-does-training-an-svm-take-so-long-how-can-i-speed-it-up
# https://datascience.stackexchange.com/questions/989/svm-using-scikit-learn-runs-endlessly-and-never-completes-execution
# https://www.quora.com/Why-is-SVM-so-slow
#
# NOTE(review): the recorded run of this cell ended in a KeyboardInterrupt.
# The scikit-learn SVR documentation states that fit time grows more than
# quadratically with the number of samples; consider a subsample or a linear
# variant before re-running on the full data.

with joblib.parallel_backend('dask'):
    pipeline = Pipeline([
        ('svr', SVR()),
    ])
    # Uncomment to list the parameter names that can be tuned:
    # print(pipeline.get_params().keys())

    # Empty grid: SVR is evaluated with its default hyper-parameters only.
    parameters = {
    }

    # cv=2 (instead of 5) keeps the number of fits low for this slow model.
    grid_search = GridSearchCV(pipeline, parameters, cv=2, n_jobs=-1, scoring=measures, refit=False)
    # SVR expects a 1-d target. y is a single-column DataFrame, so flatten it
    # explicitly instead of relying on sklearn's implicit conversion, which
    # emits a warning from column_or_1d on every fit.
    grid_search.fit(X, y.values.ravel())

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

  y = column_or_1d(y, warn=True)
  y = column_or_1d(y, warn=True)


KeyboardInterrupt: 

In [14]:
# Dimensions of the feature matrix as (rows, columns).
X.shape

(85258, 518)

#### Linear regression with pca reduction

In [13]:
# Least-squares fit on PCA-reduced features; the grid varies how many
# principal components are kept.
with joblib.parallel_backend('dask'):
    pipeline = Pipeline([
        ('pca', PCA()),
        ('reg', LinearRegression()),
    ])
    # Uncomment to list the parameter names that can be tuned:
    # print(pipeline.get_params().keys())

    # Number of principal components passed on to the regression step.
    parameters = {
        'pca__n_components': [5, 10, 20],
    }

    # refit=False: only the cross-validation scores are collected.
    grid_search = GridSearchCV(pipeline, parameters, cv=5, n_jobs=-1, scoring=measures, refit=False)
    grid_search.fit(X, y)

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

Unnamed: 0_level_0,r2,mse,rmse,mae
params,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
{'pca__n_components': 5},0.034,1.619,1.272,1.01
{'pca__n_components': 10},0.042,1.606,1.267,1.006
{'pca__n_components': 20},0.062,1.572,1.254,0.994


#### Linear regression with pca reduction and polynomial fts

- Computing polynomial features keeps running into compute errors.
- The first try was `parameters = {'pca__n_components': [5, 10, 20], 'poly__degree': [2, 5, 10]}`, which ran into trouble.
- Considering the shape of the data as visualized by the PCA in the exploration phase (a reduction to two dimensions), a polynomial of degree two should suffice.

In [14]:
# Pipeline: PCA reduction, then polynomial expansion of the components, then a
# least-squares fit.
with joblib.parallel_backend('dask'):
    
    pipeline = Pipeline([
        ('pca', PCA()),
        ('poly', PolynomialFeatures()),
        ('reg', LinearRegression()),
    ])

    # The polynomial degree is fixed at 2; only the number of components varies.
    parameters = {
        'pca__n_components': [5, 10, 20],
        'poly__degree': [2]
    }

    # refit=False: only the cross-validation scores are collected.
    grid_search = GridSearchCV(pipeline, parameters, cv=5, n_jobs=-1, scoring=measures, refit=False)
    grid_search.fit(X, y)

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

Unnamed: 0_level_0,r2,mse,rmse,mae
params,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
"{'pca__n_components': 5, 'poly__degree': 2}",0.044,1.601,1.265,1.004
"{'pca__n_components': 10, 'poly__degree': 2}",0.038,1.613,1.27,0.997
"{'pca__n_components': 20, 'poly__degree': 2}",-0.018,1.709,1.302,0.982


### Ridge regression
- https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.Ridge.html#sklearn.linear_model.Ridge


In [20]:
# Ridge regression on all features; the grid varies the regularisation
# strength alpha.
with joblib.parallel_backend('dask'):
    pipeline = Pipeline([
        ('ridge', Ridge())
    ])
    # Uncomment to list the parameter names that can be tuned:
    # print(pipeline.get_params().keys())
    
    # NOTE(review): the scikit-learn Ridge documentation advises against
    # alpha = 0 and recommends LinearRegression for that case; the alpha=0
    # entry effectively serves as the unregularised baseline here.
    parameters = {
        'ridge__alpha': [0, 1, 2],
    }

    # refit=False: only the cross-validation scores are collected.
    grid_search = GridSearchCV(pipeline, parameters, cv=5, n_jobs=-1, scoring=measures, refit=False)
    grid_search.fit(X, y)

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

dict_keys(['memory', 'steps', 'verbose', 'ridge', 'ridge__alpha', 'ridge__copy_X', 'ridge__fit_intercept', 'ridge__max_iter', 'ridge__normalize', 'ridge__positive', 'ridge__random_state', 'ridge__solver', 'ridge__tol'])


Unnamed: 0_level_0,r2,mse,rmse,mae
params,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
{'ridge__alpha': 0},0.083,1.537,1.239,0.962
{'ridge__alpha': 1},0.083,1.536,1.239,0.962
{'ridge__alpha': 2},0.083,1.536,1.239,0.962


### Kernel ridge regression

In [27]:
# Kernel ridge regression on all features, evaluated with default settings.
# NOTE(review): no score table was recorded for this cell. KernelRidge works on
# a kernel matrix over the training samples, which is presumably too large for
# the full data set on this cluster - confirm that the cell can complete.
with joblib.parallel_backend('dask'):
    pipeline = Pipeline([
        ('kridge', KernelRidge())
    ])
    # Prints the parameter names that can be used in the grid below.
    print(pipeline.get_params().keys())
    
    # Empty grid: only the default hyper-parameters are evaluated.
    parameters = {
        # 'kridge__alpha': [0, 1, 2],
    }

    # refit=False: only the cross-validation scores are collected.
    grid_search = GridSearchCV(pipeline, parameters, cv=5, n_jobs=-1, scoring=measures, refit=False)
    grid_search.fit(X, y)

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

dict_keys(['memory', 'steps', 'verbose', 'kridge', 'kridge__alpha', 'kridge__coef0', 'kridge__degree', 'kridge__gamma', 'kridge__kernel', 'kridge__kernel_params'])


### Lasso

- https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.Lasso.html#sklearn.linear_model.Lasso
- https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LassoLarsCV.html#sklearn.linear_model.LassoLarsCV

In [22]:
# Lasso regression on all features, evaluated with default settings.
with joblib.parallel_backend('dask'):
    pipeline = Pipeline([
        ('lasso', Lasso())
    ])
    # Prints the parameter names that can be used in the grid below.
    print(pipeline.get_params().keys())
    
    # Empty grid: alpha is not tuned yet, only the defaults are evaluated.
    parameters = {
        # 'lasso__alpha': [0, 1, 2],
    }

    # refit=False: only the cross-validation scores are collected.
    grid_search = GridSearchCV(pipeline, parameters, cv=5, n_jobs=-1, scoring=measures, refit=False)
    grid_search.fit(X, y)

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

dict_keys(['memory', 'steps', 'verbose', 'lasso', 'lasso__alpha', 'lasso__copy_X', 'lasso__fit_intercept', 'lasso__max_iter', 'lasso__normalize', 'lasso__positive', 'lasso__precompute', 'lasso__random_state', 'lasso__selection', 'lasso__tol', 'lasso__warm_start'])


Unnamed: 0_level_0,r2,mse,rmse,mae
params,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
{},-0.0,1.676,1.295,1.031


### Tree regression

- https://scikit-learn.org/stable/modules/generated/sklearn.tree.DecisionTreeRegressor.html
- https://scikit-learn.org/stable/modules/generated/sklearn.tree.ExtraTreeRegressor.html#sklearn.tree.ExtraTreeRegressor
- https://george-jen.gitbook.io/data-science-and-apache-spark/decision-tree-regression

In [25]:
# Decision tree regression on all features, evaluated with default settings.
# NOTE(review): no random_state is set on the tree; consider fixing a seed so
# repeated runs are guaranteed to give the same scores.
with joblib.parallel_backend('dask'):
    pipeline = Pipeline([
        ('tree', DecisionTreeRegressor())
    ])
    # Prints the parameter names that can be used in the grid below.
    print(pipeline.get_params().keys())
    
    # Empty grid: only the default hyper-parameters are evaluated.
    parameters = {
        # TODO: add tree parameters here, using the 'tree__' prefix (e.g. 'tree__max_depth').
    }

    # refit=False: only the cross-validation scores are collected.
    grid_search = GridSearchCV(pipeline, parameters, cv=5, n_jobs=-1, scoring=measures, refit=False)
    grid_search.fit(X, y)

    results = pd.DataFrame(grid_search.cv_results_)

# Display the condensed score table as the cell output.
frmt(results)

dict_keys(['memory', 'steps', 'verbose', 'tree', 'tree__ccp_alpha', 'tree__criterion', 'tree__max_depth', 'tree__max_features', 'tree__max_leaf_nodes', 'tree__min_impurity_decrease', 'tree__min_samples_leaf', 'tree__min_samples_split', 'tree__min_weight_fraction_leaf', 'tree__random_state', 'tree__splitter'])


Unnamed: 0_level_0,r2,mse,rmse,mae
params,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
{},-0.758,2.946,1.716,1.35


### Close client

In [16]:
# Close the dask client that was opened in the setup section.
client.close()