Here we run some baselines to understand the level of performance to beat.
We use nested CV with a random hyper-parameter search to get realistic performance estimates.

In [1]:
from netCDF4 import Dataset
from collections import defaultdict, namedtuple
import datetime
import json
from json import JSONEncoder
import numpy as np
import itertools
import pandas as pd
import base64
import pickle
import hashlib
import sympy as sp
import matplotlib.pyplot as plt
import math
import time
import os
import pickle
import csv
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import GridSearchCV, KFold, RandomizedSearchCV
from sklearn.linear_model import Ridge
from sklearn.utils import shuffle
from sklearn import metrics
from scipy.optimize import fmin_cg, fmin_ncg
from scipy import stats
from IPython.display import clear_output

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
10991,application_1526283611315_0080,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
def read_df():
    ''' load the processed data frame from HDFS and filter its rows

        only rows with ustar > 0.1, |H| > 10 and wind > 1 are kept,
        and rows whose ds equals 201603 are discarded
    '''
    from hops import hdfs

    dframe_path = 'hdfs:///Projects/more_stuff/cabauw/processed-full-log.csv.gz'

    with hdfs.get_fs().open_file(dframe_path) as handle:
        raw = pd.read_csv(handle, na_values='--', compression='gzip')

    valid = (raw.ustar > 0.1) & (abs(raw.H) > 10) & (raw.wind > 1)
    filtered = raw[valid]
    return filtered[filtered.ds != 201603]

In [3]:
def make_index(dtimes, interval):
    ''' build, for every sample, the indices delimiting a time window

        dtimes must be already sorted in increasing order!

        returns a tuple index_above, index_below where
         - index_below[i] is the smallest j
           such that dtimes[i] - dtimes[j] <= interval
         - index_above[i] is the largest j
           such that dtimes[j] - dtimes[i] <= interval

        both arrays are empty when dtimes is empty
    '''
    n_samples = len(dtimes)

    # -1 marks entries that were not assigned yet
    # (plain int instead of np.int, which was removed from recent numpy)
    index_below = np.full(n_samples, -1, dtype=int)
    index_above = np.full(n_samples, -1, dtype=int)
    if n_samples == 0:
        return index_above, index_below

    for i, x in enumerate(dtimes):
        # dtimes is sorted, so the window start can only move forward
        j = index_below[i - 1] if i > 0 else 0
        while x - dtimes[j] > interval:
            j += 1

        index_below[i] = j
        # overwritten until the last sample whose window starts at j
        index_above[j] = i

    # samples that never start a window inherit the value
    # of the closest previous sample that does
    last_above = index_above[0]
    for i in range(n_samples):
        if index_above[i] < 0:
            index_above[i] = last_above
        else:
            last_above = index_above[i]

    return index_above, index_below


def compute_trend(df, columns, interval=3600):
    ''' add a `<col>_trend` column for each of the given columns

        the data frame is sorted by datetime and processed one z level
        at a time: each row is compared with the row make_index() reports
        as the start of its window, and the trend is 3600 times the change
        of the value divided by the change of datetime

        returns the sorted data frame and the names of the new columns
    '''
    df = df.sort_values('datetime')
    for level in df.z.unique():
        level_mask = df.z == level
        level_rows = df[level_mask]
        stamps = level_rows.datetime.values

        # only the start of each window is needed here
        _, window_start = make_index(stamps, interval)
        earlier_rows = level_rows.iloc[window_start]
        elapsed = stamps - earlier_rows.datetime.values

        for col in columns:
            delta = level_rows[col].values - earlier_rows[col].values
            df.loc[level_mask, col + '_trend'] = 3600 * delta / elapsed

    return df, [col + '_trend' for col in columns]


def get_features(df, use_trend, feature_level):
    ''' get feature names of the corresponding level,
        adding them to df if not already there

        feature_level is either an int n, selecting the first n groups of
        the built-in feature sets, or a list/tuple of column names, which
        is copied and never modified

        when use_trend is true, the trend of every feature except z is
        part of the features too; trend columns missing from df are
        computed with compute_trend()

        returns (df, features), where rows having a null in any of the
        selected features are dropped, and features is an array with the
        names of the selected columns that are not entirely null,
        in the order they have in df

        raises ValueError if feature_level is neither int, list nor tuple
    '''
    feature_sets = [
        [
            'z', 'wind', 'temp', 'soil_temp',
            'wind_10', 'wind_20', 'wind_40',
            'temp_10', 'temp_20', 'temp_40',
        ],
        ['soilheat'],
        ['netrad'],
        ['rain', 'dewpoint'],
        ['H', 'LE'],
    ]

    if isinstance(feature_level, int):
        features = [
            f for fset in feature_sets[:feature_level]
            for f in fset
        ]
    elif isinstance(feature_level, (list, tuple)):
        # work on a copy: the list is extended below, and we must neither
        # touch the caller's list nor fail when given a tuple
        features = list(feature_level)
    else:
        raise ValueError('pass list or int')

    if ('wind_40' not in df.columns or
            'wind_20' not in df.columns or
            'wind_10' not in df.columns):

        # spread wind and temp of each level into their own columns
        # (wind_10, temp_10, ...) and attach them to every row
        # of the same ds/tt
        wind_temp_levels = df.pivot_table(
            values=['wind', 'temp'], columns='z', index=['ds', 'tt']
        ).reset_index()
        wind_temp_levels.columns = [
            '%s_%d' % (a, b) if b else a
            for a, b in wind_temp_levels.columns.values
        ]
        df = df.merge(wind_temp_levels, on=['ds', 'tt'])

    if use_trend:
        with_trend = [f for f in features if f != 'z']
        missing_trend = [
            f for f in with_trend
            if f + '_trend' not in df.columns
        ]

        if missing_trend:
            df, _ = compute_trend(df, missing_trend)

        # select all trends, including those that were already in df
        features.extend(f + '_trend' for f in with_trend)

    # remove feature columns with only nulls and rows with any null
    empty_columns = df.isnull().all(axis=0)
    keep_columns = ~empty_columns & df.columns.isin(features)
    missing = df.loc[:, keep_columns.values].isnull().any(axis=1)
    df = df[~missing]
    features = keep_columns.index.values[keep_columns.values]

    return df, features


def get_train_test(df, features, target, train_idx, test_idx, normalize):
    ''' split df into train/test features and targets by position

        when normalize is true, features and target are standardized
        using mean and standard deviation of the training rows only

        returns train_x, train_y, test_x, test_y, mean_y, std_y
        where mean_y, std_y are those used to standardize the target
        (0 and 1 when normalize is false)
    '''
    train_rows, test_rows = df.iloc[train_idx], df.iloc[test_idx]
    train_x, train_y = train_rows[features], train_rows[target]
    test_x, test_y = test_rows[features], test_rows[target]

    if not normalize:
        return train_x, train_y, test_x, test_y, 0, 1

    # statistics come from the training set only
    mean_x, std_x = train_x.mean(), train_x.std()
    mean_y, std_y = train_y.mean(), train_y.std()

    return (
        (train_x - mean_x) / std_x,
        (train_y - mean_y) / std_y,
        (test_x - mean_x) / std_x,
        (test_y - mean_y) / std_y,
        mean_y,
        std_y,
    )

In [4]:
class MOSTEstimator:
    ''' estimator for the universal functions in the monin-obukhov similarity theory
        implementing scikit's interface

        the fitted function of x = z/L has four parameters a, b, c, d:

            phi(x) = a + b * x                    if x >= 0
            phi(x) = a * (1 - c**2 * x) ** d      if x < 0

        fitting is done by minimizing the L2 regularized squared error
        via conjugate gradient (newton-cg when use_hessian is true)

        NOTE: score() returns the mean squared error, i.e. lower is better,
        which is the opposite of scikit's "greater is better" convention
    '''
    def __init__(self, regu=0.1, use_hessian=True):
        self.regu = regu
        # starting point of the optimization
        self.a, self.b, self.c, self.d = (1, 4.8, np.sqrt(19.3), -0.25)
        self.use_hessian = use_hessian
        self.symbols = None

    def _lazy_init_hessian(self):
        # we initialize these functions lazily so that we can pickle
        # this object and send it around before fitting
        if self.use_hessian and self.symbols is None:
            a, b, c, d, x = sp.symbols('a b c d x')
            self.symbols = a, b, c, d

            self._neg_H_fn = self._get_hessian_functions(
                a * sp.Pow(1 - x * c**2, d), x, a, b, c, d
            )
            self._pos_H_fn = self._get_hessian_functions(
                a + b * x, x, a, b, c, d
            )

    @staticmethod
    def _get_hessian_functions(expr, x, *symbols):
        # returns functions computing second-order partial derivatives
        # of expr. keyed by differentiation variables
        # each function takes x followed by the value of every symbol
        return {
            (s1, s2): sp.lambdify(
                [x] + list(symbols),
                sp.simplify(sp.diff(sp.diff(expr, s1), s2)),
                'numpy'
            ) for s1 in symbols for s2 in symbols
        }

    def get_params(self, deep=True):
        ''' returns the hyper-parameters of the model (only regu)
        '''
        return {'regu': self.regu}

    def set_params(self, regu):
        ''' sets the regularization coefficient and returns self
        '''
        self.regu = regu
        return self

    @classmethod
    def _compute_phi(cls, zL, a, b, c, d):
        ''' evaluates phi at every element of zL, returning a flat array
        '''
        zL = cls._to_vec(zL)
        mask = zL >= 0
        yy = np.zeros(zL.shape)
        yy[mask] = a + b * zL[mask]
        yy[~mask] = a * np.power(1 - c**2 * zL[~mask], d)
        assert np.all(np.isfinite(zL))
        assert np.all(np.isfinite(yy)), (a, b, c, d)
        return yy

    @classmethod
    def _compute_phi_prime(cls, zL, a, b, c, d):
        ''' partial derivatives of phi with respect to a, b, c and d
            evaluated at every element of zL
        '''
        zL = cls._to_vec(zL)
        dpda, dpdb, dpdc, dpdd = np.zeros((4, len(zL)))

        pos, neg = zL >= 0, zL < 0

        # linear branch, does not depend on c and d
        dpda[pos] = 1
        dpdb[pos] = zL[pos]

        # power branch, does not depend on b
        inner = 1 - c**2 * zL[neg]
        dpda[neg] = np.power(inner, d)
        dpdc[neg] = -2 * zL[neg] * a * c * d * np.power(inner, d - 1)
        dpdd[neg] = a * np.log(inner) * np.power(inner, d)

        return dpda, dpdb, dpdc, dpdd

    def _fmin_hess(self, params, xx, yy, regu):
        ''' hessian of _fmin_target with respect to the parameters
        '''
        self._lazy_init_hessian()

        preds = self._compute_phi(xx, *params)
        xpos_mask = xx >= 0
        hh1 = np.zeros((4, 4))

        # residual-weighted second derivatives of phi
        for i, s1 in enumerate(self.symbols):
            for j, s2 in enumerate(self.symbols):
                # when xx >= 0 the function is linear
                # its hessian is always 0

                neg = self._neg_H_fn[s1, s2](xx[~xpos_mask], *params)
                hh1[i, j] = np.sum((preds[~xpos_mask] - yy[~xpos_mask]) * neg)

        # outer products of the first derivatives
        hh2 = np.zeros((4, len(xx)))
        hh2[:, :] = self._compute_phi_prime(xx, *params)

        hess = 2 * ((hh2.dot(hh2.T) + hh1) / len(xx) + regu * np.eye(4))
        return hess

    @staticmethod
    def _fmin_target(params, xx, yy, regu):
        ''' mean squared error plus L2 penalty on the parameters
        '''
        preds = MOSTEstimator._compute_phi(xx, *params)
        err = np.mean((yy - preds)**2) + regu * sum(p**2 for p in params)
        return err

    @staticmethod
    def _fmin_grad(params, xx, yy, regu):
        ''' gradient of _fmin_target with respect to the parameters
        '''
        preds = MOSTEstimator._compute_phi(xx, *params)
        der = MOSTEstimator._compute_phi_prime(xx, *params)

        grads = [
            2 * np.mean((preds - yy) * parpr) + 2 * regu * par
            for par, parpr in zip(params, der)
        ]

        return np.array(grads)

    @staticmethod
    def _to_vec(mat):
        ''' flattens mat to a one-dimensional array
        '''
        mat = np.array(mat)

        # check that multi-dimensional arrays have at most one
        # dimension with more than one sample
        # e.g. 1x1x99x1 is fine, 1x2x99x1 is not
        # scalars and single samples (e.g. 1x1) are accepted too
        assert sum(1 for n in mat.shape if n > 1) <= 1
        return mat.reshape(-1)

    def fit(self, X, y, disp=False):
        ''' fits a, b, c, d starting from their current values

            X and y must be vectors (see _to_vec), disp is forwarded
            to the scipy optimizer; returns self
        '''
        X = self._to_vec(X)
        y = self._to_vec(y)

        if self.use_hessian:
            self.a, self.b, self.c, self.d = fmin_ncg(
                self._fmin_target,
                (self.a, self.b, self.c, self.d),
                self._fmin_grad,
                fhess=self._fmin_hess,
                args=(X, y, self.regu),
                disp=disp,
            )
        else:
            self.a, self.b, self.c, self.d = fmin_cg(
                self._fmin_target,
                (self.a, self.b, self.c, self.d),
                self._fmin_grad,
                args=(X, y, self.regu),
                disp=disp,
            )

        return self

    def predict(self, X):
        ''' evaluates the fitted function at X, returning a flat array
        '''
        return self._compute_phi(X, self.a, self.b, self.c, self.d)

    def score(self, X, y):
        ''' mean squared error of the predictions (lower is better)
        '''
        preds = self.predict(X)
        return metrics.mean_squared_error(y, preds)

In [5]:
class AttributeKFold:
    ''' cross validator that folds over the distinct values of an attribute

        the wrapped cross validator splits the unique attribute values,
        so that samples sharing a value always end up on the same side
        of a train/test split

        the attribute of each sample is passed to the constructor, hence
        it does not have to be one of the features given to the model
    '''
    def __init__(self, cv, attr):
        self.cv = cv
        self.attr = attr

    def get_n_splits(self, *args, **kwargs):
        return self.cv.get_n_splits(*args, **kwargs)

    def split(self, X, y=None, groups=None):
        distinct = self.attr.unique()
        for train_pos, test_pos in self.cv.split(distinct):
            # translate the selected values to sample positions
            in_train = self.attr.isin(distinct[train_pos])
            in_test = self.attr.isin(distinct[test_pos])

            yield (
                np.flatnonzero(in_train.values),
                np.flatnonzero(in_test.values),
            )

In [6]:
def test_attributekfold():
    ''' sanity check of AttributeKFold on the global df, split on ds

        with 10 outer and 5 inner folds, every sample must be used
        9 (resp. 4) times for training and exactly once for testing
    '''
    n_samples = len(df)
    outer_train_hits = np.zeros(n_samples)
    outer_test_hits = np.zeros(n_samples)

    outer_cv = AttributeKFold(KFold(10, shuffle=True), df.ds)
    for outer_train_idx, outer_test_idx in outer_cv.split(df):
        outer_train_hits[outer_train_idx] += 1
        outer_test_hits[outer_test_idx] += 1

        # the inner cv only sees the training part of the outer fold
        outer_train_df = df.iloc[outer_train_idx]
        inner_train_hits = np.zeros(len(outer_train_idx))
        inner_test_hits = np.zeros(len(outer_train_idx))

        inner_cv = AttributeKFold(KFold(5, shuffle=True), outer_train_df.ds)
        for inner_train_idx, inner_test_idx in inner_cv.split(outer_train_df):
            inner_train_hits[inner_train_idx] += 1
            inner_test_hits[inner_test_idx] += 1

        assert (inner_train_hits == 4).all()
        assert (inner_test_hits == 1).all()

    assert (outer_train_hits == 9).all()
    assert (outer_test_hits == 1).all()

In [7]:
def plot_preds(ypred, ytrue):
    ''' scatter plot of predicted versus true values

        the dashed red line is the diagonal, drawn over the range
        of values covered by both predictions and true values
    '''
    lower = max(min(ypred), min(ytrue))
    upper = min(max(ypred), max(ytrue))
    diagonal = [lower, upper]

    plt.scatter(ytrue, ypred, s=2)
    plt.plot(diagonal, diagonal, 'r--')
    plt.xlabel('True')
    plt.ylabel('Predicted')

    plt.show()

In [8]:
class LogUniform:
    ''' random variable X such that log(x) is distributed uniformly

        samples are base ** e, with e uniform between expmin and expmax
    '''
    def __init__(self, base, expmin, expmax):
        self.base, self.expmin, self.expmax = base, expmin, expmax

    def rvs(self, size=None, random_state=None):
        ''' draw one sample, or an array of samples when size is given

            random_state can be None (a fresh, unseeded generator is used),
            an integer seed, or an object with a uniform() method
            such as np.random.RandomState
        '''
        if random_state is None:
            rng = np.random.RandomState()
        elif isinstance(random_state, (int, np.integer)):
            # integer seeds, including 0, give reproducible samples
            rng = np.random.RandomState(random_state)
        else:
            rng = random_state

        exp = rng.uniform(self.expmin, self.expmax, size=size)
        return np.power(self.base, exp)


class IntDistribution:
    ''' wrapper turning the samples of a random variable into integers

        all arguments of rvs() are forwarded to the wrapped variable
        and the resulting sample is converted with int()
    '''
    def __init__(self, rv):
        self.rv = rv

    def rvs(self, *args, **kwargs):
        return int(self.rv.rvs(*args, **kwargs))

In [9]:
# description of one cross-validation run, as consumed by spark_cv:
# the model to tune, the distributions its parameters are sampled from,
# the features/target columns, the number of inner/outer folds and of
# sampled parameter sets, the seeds, plus the metadata and the name
# used when saving the final results
CVSpec = namedtuple('CVSpec', [
    'model', 'param_distribution', 'features',
    'target', 'inner_cv', 'outer_cv', 'n_iter', 'normalize',
    'inner_seed', 'outer_seed', 'param_seed', 'meta',
    'save_to',
])

# final outcome of a cross-validation run, built by finalize_result
CVResult = namedtuple('CVResult', [
    'meta', 'scores', 'test_x', 'test_y', 'y_pred', 'imps'
])


# set default value to None, https://stackoverflow.com/a/18348004/521776
# (this makes every field of CVSpec and CVResult optional)
CVSpec.__new__.__defaults__ = (None,) * len(CVSpec._fields)
CVResult.__new__.__defaults__ = (None,) * len(CVResult._fields)

# metrics computed by compute_metrics for one train/test split
# train_mse is measured on the training set, all the others on the test set
Scores = namedtuple('Scores', [
    'train_mse',
    'explained_variance_score',
    'mean_absolute_error',
    'mean_squared_error',
    'median_absolute_error',
    'r2_score',
    'mean_abs_percent_error',
    'median_abs_percent_error',
])

# evaluation of one parameter set on one fold: its scores, the parameters
# (names and values are kept separately) and optionally the test data
# together with the predictions
OuterCVResult = namedtuple('OuterCVResult', [
    'scores',
    'param_keys',
    'param_values',
    'test_x',
    'test_y',
    'y_pred',
])

# result of inner_train: test mse of the fold, and an OuterCVResult
# when running plain (not nested) cross validation
# note that, unlike CVSpec and CVResult, no defaults are set here,
# so both fields must always be given
InnerCVResult = namedtuple('InnerCVResult', [
    'inner_mse', 'final_results'
])


def get_cv_fold(fold, cv_k, seed, attr):
    ''' returns train and test indices of the given fold (zero-based)
        of a shuffled, seeded cv_k-fold split over the values of attr
    '''
    assert fold < cv_k

    folder = AttributeKFold(
        KFold(cv_k, shuffle=True, random_state=seed),
        attr
    )
    splits = folder.split(attr)

    # discard the folds that come before the requested one
    to_skip = fold
    while to_skip > 0:
        next(splits)
        to_skip -= 1

    return next(splits)


def get_train_test(df, features, target, train_idx, test_idx, normalize):
    ''' select train and test rows of df by position, splitting each
        into feature columns and target column

        if normalize is set, both features and target are centered and
        scaled with the mean and standard deviation of the training rows

        returns train_x, train_y, test_x, test_y, mean_y, std_y
        (mean_y = 0 and std_y = 1 when no normalization is applied)

        NOTE: this function is also defined, with the same behavior,
        in an earlier cell of this notebook
    '''
    train_rows, test_rows = df.iloc[train_idx], df.iloc[test_idx]
    train_x, train_y = train_rows[features], train_rows[target]
    test_x, test_y = test_rows[features], test_rows[target]

    if not normalize:
        return train_x, train_y, test_x, test_y, 0, 1

    # never use the test rows to compute the statistics
    mean_x, std_x = train_x.mean(), train_x.std()
    mean_y, std_y = train_y.mean(), train_y.std()

    return (
        (train_x - mean_x) / std_x,
        (train_y - mean_y) / std_y,
        (test_x - mean_x) / std_x,
        (test_y - mean_y) / std_y,
        mean_y,
        std_y,
    )


class EncodeMoreStuff(JSONEncoder):
    ''' json encoder that also understands numpy scalars and arrays
    '''
    def default(self, obj):
        ''' converts numpy booleans, integers, floats and arrays to the
            equivalent python objects; anything else is left to the
            base encoder, which raises TypeError
        '''
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            # converts the elements as well, at any nesting depth
            return obj.tolist()
        return JSONEncoder.default(self, obj)


class CachedResults:
    ''' cache for one result, stored in a file named after
        the sha256 of the json encoding of the keyword arguments

        files are stored on HDFS when the hops library is available,
        and in a local directory otherwise
    '''

    # the cache is currently switched off: get_value always misses and
    # set_value stores nothing. set to True to actually use the files
    ENABLED = False

    def __init__(self, **kwargs):
        try:
            from hops import hdfs
        except ImportError:
            self.open_ = lambda f, m: open(f, m + 'b')
            self.cache_dir = './dev/checkpoints/'
        else:
            fs = hdfs.get_fs()
            self.open_ = lambda f, m: fs.open_file(f, m)

            # using HDFS as a filesystem cache
            # is a *terrible* idea in so many ways
            # but we have no alternative.
            self.cache_dir = 'hdfs:///Projects/more_stuff/cabauw/checkpoints/'

        self.fname = hashlib.sha256(
            json.dumps(kwargs, cls=EncodeMoreStuff).encode('utf8')
        ).hexdigest()

    def get_value(self):
        ''' returns the cached value, or None if the cache is disabled
            or the value cannot be loaded
        '''
        if not self.ENABLED:
            return None

        try:
            with self.open_(self.cache_dir + self.fname, 'r') as f:
                # NOTE: unpickling runs arbitrary code, the cache
                # directory must only contain trusted files
                return pickle.loads(f.read())
        except Exception:  # want to invalidate cache regardless of what went wrong
            return None

    def set_value(self, val):
        ''' pickles val to the cache file, unless the cache is disabled
        '''
        if not self.ENABLED:
            return

        with self.open_(self.cache_dir + self.fname, 'w') as f:
            f.write(pickle.dumps(val))


def compute_metrics(train_y, y_pred_train, test_y, y_pred, mean_y, std_y):
    ''' computes the Scores of a fitted model

        true values and predictions are first brought back to the
        original scale (multiplying by std_y and adding mean_y)
        percent errors are relative to the true test values
    '''
    def denormalize(values):
        return values * std_y + mean_y

    y_pred, y_pred_train = denormalize(y_pred), denormalize(y_pred_train)
    test_y, train_y = denormalize(test_y), denormalize(train_y)

    relative_errors = np.abs((test_y - y_pred) / test_y)
    perc_errors = 100 * relative_errors

    # these are all evaluated on the test set with the same arguments
    test_scores = {
        name: getattr(metrics, name)(test_y, y_pred)
        for name in (
            'explained_variance_score',
            'mean_absolute_error',
            'mean_squared_error',
            'median_absolute_error',
            'r2_score',
        )
    }

    return Scores(
        train_mse=metrics.mean_squared_error(train_y, y_pred_train),
        mean_abs_percent_error=np.mean(perc_errors),
        median_abs_percent_error=np.median(perc_errors),
        **test_scores
    )


def inner_train(df_bcast, model, features, target, params, outer_cv,
                outer_fold, inner_cv, inner_fold, keys, outer_seed,
                inner_seed, normalize):
    ''' fit and evaluate the model on one inner fold

        df_bcast is a broadcast variable containing the data frame;
        keys and params are the names and values of the parameters given
        to the model before fitting. with nested cross validation, the
        inner folds are made from the training set of the given outer fold

        pass outer_cv <= 0 to have plain CV instead of nested CV

        returns an InnerCVResult with the mse on the test set of the fold;
        with plain CV it also has the scores in final_results, together
        with test data and predictions for the first fold only
    '''
    cache = CachedResults(
        model=model.__class__.__name__, features=features, target=target,
        params=params, outer_fold=outer_fold, inner_fold=inner_fold, keys=keys,
        outer_seed=outer_seed, inner_seed=inner_seed, normalize=normalize
    )
    saved = cache.get_value()
    if saved is not None:
        return saved

    df = df_bcast.value
    if any(f not in df.columns for f in features):
        print('some features are missing, reloading...')
        df, _ = get_features(read_df(), use_trend=True, feature_level=5)
    assert all(f in df.columns for f in features), (df.columns, features)

    if outer_cv > 0:
        outer_train_idx, outer_test_idx = get_cv_fold(
            outer_fold, outer_cv, outer_seed, df.ds
        )

        inner_train_idx, inner_test_idx = get_cv_fold(
            inner_fold, inner_cv, inner_seed, df.iloc[outer_train_idx].ds
        )

        # inner indices are relative to the outer training set
        train_idx = outer_train_idx[inner_train_idx]
        test_idx = outer_train_idx[inner_test_idx]
    else:
        train_idx, test_idx = get_cv_fold(
            inner_fold, inner_cv, inner_seed, df.ds
        )

    train_x, train_y, test_x, test_y, mean_y, std_y = get_train_test(
        df, features, target, train_idx, test_idx, normalize
    )

    model = model.set_params(**dict(zip(keys, params)))
    model.fit(train_x, train_y)
    y_pred_train = model.predict(train_x)
    y_pred = model.predict(test_x)

    test_mse = metrics.mean_squared_error(test_y, y_pred)

    if outer_cv > 0:
        final_results = None
    else:
        # test data and predictions are kept for the first fold only
        keep_data = inner_fold == 0
        final_results = OuterCVResult(
            scores=compute_metrics(
                train_y, y_pred_train, test_y, y_pred, mean_y, std_y
            ),
            param_keys=keys,
            param_values=params,
            test_x=test_x if keep_data else None,
            test_y=test_y if keep_data else None,
            y_pred=y_pred if keep_data else None,
        )

    result = InnerCVResult(
        inner_mse=test_mse,
        final_results=final_results,
    )

    cache.set_value(result)
    return result


def outer_train(df_bcast, model, features, target, outer_cv, outer_fold,
                params, keys, outer_seed, normalize):
    ''' fit the model on the training set of one outer fold
        and evaluate it on the test set of the same fold

        df_bcast is a broadcast variable containing the data frame;
        keys and params are the names and values of the parameters
        given to the model before fitting; outer_cv must be positive

        returns an OuterCVResult; test data and predictions are
        kept for the first fold only
    '''
    cache = CachedResults(
        model=model.__class__.__name__, features=features,
        target=target, params=params, outer_fold=outer_fold,
        keys=keys, outer_seed=outer_seed, normalize=normalize
    )
    saved = cache.get_value()
    if saved is not None:
        return saved

    df = df_bcast.value
    if any(f not in df.columns for f in features):
        print('some features are missing, reloading...')
        df, _ = get_features(read_df(), use_trend=True, feature_level=5)
    assert all(f in df.columns for f in features), (df.columns, features)

    assert outer_cv > 0
    train_idx, test_idx = get_cv_fold(
        outer_fold, outer_cv, outer_seed, df.ds
    )

    train_x, train_y, test_x, test_y, mean_y, std_y = get_train_test(
        df, features, target, train_idx, test_idx, normalize
    )

    model = model.set_params(**dict(zip(keys, params)))
    model.fit(train_x, train_y)

    y_pred_train = model.predict(train_x)
    y_pred = model.predict(test_x)

    # same policy of the plain CV: keep the data of the first fold only
    keep_data = outer_fold == 0
    result = OuterCVResult(
        scores=compute_metrics(
            train_y, y_pred_train, test_y, y_pred, mean_y, std_y
        ),
        param_keys=keys,
        param_values=params,
        test_x=test_x if keep_data else None,
        test_y=test_y if keep_data else None,
        y_pred=y_pred if keep_data else None,
    )

    cache.set_value(result)
    return result


def finalize_result(meta, results, save_to=None):
    ''' collect the results of all the folds in a single CVResult

        results is an iterable of OuterCVResult. the CVResult has the list
        of their scores, and a data frame with their parameters in imps
        test data and predictions of the folds are not propagated, the
        corresponding fields of the CVResult are always None

        if save_to is given, a textual report followed by the base64
        encoded pickle of the CVResult is written to results_<save_to>.txt,
        on HDFS if the hops library is available, in ./data/ otherwise
    '''
    scores, params = [], []
    test_x = test_y = y_pred = None
    for res in list(results):
        scores.append(res.scores)
        params.append(dict(zip(res.param_keys, res.param_values)))

    scores_df = pd.DataFrame(scores)
    params_df = pd.DataFrame(params)

    cvres = CVResult(
        meta=meta, scores=scores,
        test_x=test_x, test_y=test_y, y_pred=y_pred, imps=params_df
    )

    if save_to:
        try:
            from hops import hdfs
            fs = hdfs.get_fs()
            open_ = fs.open_file
            base_dir = 'hdfs:///Projects/more_stuff/cabauw/results/'
        except ImportError:
            open_ = open
            base_dir = './data/'

        # do not truncate the tables written to the report
        pd.set_option('display.max_columns', None)
        pd.set_option('display.max_rows', None)
        try:
            pd.set_option('display.max_colwidth', None)
        except ValueError:
            # old versions of pandas only accept -1 to mean "no limit"
            # while recent versions refuse it
            pd.set_option('display.max_colwidth', -1)

        fname = '%s/results_%s.txt' % (base_dir, save_to)
        with open_(fname, 'w') as f:
            f.write(scores_df.describe().T.to_string())
            f.write('\n\n**raw scores\n\n')
            f.write(scores_df.to_string())
            f.write('\n\n**parameters\n\n')
            f.write(params_df.describe().T.to_string())
            f.write('\n\n**raw parameters\n\n')
            f.write(params_df.to_string())
            f.write('\n\n**raw json results\n\n')

            # despite the title of the section, this is a pickle
            # written in chunks of 64k characters
            data = base64.b64encode(pickle.dumps(cvres)).decode('utf8')
            bufsize = 2**16
            for buf in range(0, len(data), bufsize):
                f.write(data[buf:buf+bufsize])

    return cvres


def make_nested_cv_tasks(df_bcast, spec, cv_vals):
    ''' build the rdd performing the nested cross validation of spec

        cv_vals is the list of (outer fold, inner fold, parameter values)
        to evaluate. for each outer fold, the parameters with the smallest
        total mse over the inner folds are used to train on that outer fold

        the rdd contains the CVResult made by finalize_result
        nothing is computed until an action is performed on the rdd
    '''
    return (sc.parallelize(cv_vals, len(cv_vals))

         # train and evaluate on inner fold for each outer fold
         # x is (outer, inner, param values)
         # key by (outer fold, param values)
         .map(lambda x: ((x[0], x[2]), inner_train(
             df_bcast=df_bcast,
             model=spec.model,
             features=spec.features,
             target=spec.target,
             outer_cv=spec.outer_cv,
             outer_fold=x[0],
             inner_cv=spec.inner_cv,
             inner_fold=x[1],
             params=x[2],
             keys=tuple(spec.param_distribution.keys()),
             outer_seed=spec.outer_seed,
             inner_seed=spec.inner_seed,
             normalize=spec.normalize
         )))

         # for each outer fold/parameters, compute sum of mse
         # InnerCVResult has no default values, and there are
         # no final results for the inner folds of a nested cv
         .reduceByKey(lambda res1, res2: InnerCVResult(
                 inner_mse=res1.inner_mse + res2.inner_mse,
                 final_results=None,
             ), numPartitions=spec.outer_cv)

        # for each outer fold, find parameters with best mse
        .map(lambda x: (x[0][0], (x[0][1], x[1])))
        .reduceByKey(lambda x, y: x if x[1].inner_mse < y[1].inner_mse else y)

        # for each outer fold, validate using best parameters
        # x is (outer fold, (parameters, mse))
        .map(lambda x: outer_train(
            df_bcast=df_bcast,
             model=spec.model,
             features=spec.features,
             target=spec.target,
             outer_cv=spec.outer_cv,
             outer_fold=x[0],
             params=x[1][0],
             keys=tuple(spec.param_distribution.keys()),
             outer_seed=spec.outer_seed,
             normalize=spec.normalize
         ))

         # finalize results, optionally saving
         #
         # we could use a coalesce(1) -> mapPartitions, but this would
         # compute all the outer folds of all specs in the last stage
         # (the one that contains collect), which means that it will only
         # use len(cv_specs) executors to run sum(s.outer_cv for s in cv_specs)
         # tasks. needless to say, that sucks
         #
         # by adding a keyBy/groupByKey/map, we force the previous outer cv
         # to be in its own stage, thus there will be one partition for each
         # outer fold of each spec, and we can fully utilize the executors
         .keyBy(lambda x: 1)

         # create one partition for each CV result
         .groupByKey(spec.outer_cv)
         .map(lambda x: finalize_result(spec.meta, x[1], spec.save_to))

         # with this, we force finalize_result to be in its own stage
         #
         # since the default scheduler is FIFO, this means we will finalize
         # the results as soon as the outer cv finishes
         # without this, we would need to wait for all the outer cvs to finish
         # and the results will all be finalized together at the end
         # that would also suck
         .coalesce(1)
         .repartition(2))


def make_cv_tasks(df_bcast, spec, cv_vals):
    ''' build the rdd performing the plain (not nested)
        cross validation of spec

        cv_vals is the list of (outer fold, inner fold, parameter values)
        to evaluate. the parameters with the smallest total mse over all
        the folds are selected, and the results they obtained on each
        fold are given to finalize_result
    '''

    def merge_cv_results(res1, res2):
        # sum the mse and concatenate the final results
        # of two (possibly already merged) InnerCVResult
        fin1, fin2 = res1.final_results, res2.final_results

        # make a list containing all the final results
        #
        # we cannot compare types directly because pyspark serialization
        # will use a different type (that has the same name and attributes)
        #
        # moreover, lists are serialized as tuples, thus we need to
        # convert them back to lists before joining
        #
        # a single result is recognized by its 'scores' attribute
        list_fin_1 = [fin1] if hasattr(fin1, 'scores') else list(fin1)
        list_fin_2 = [fin2] if hasattr(fin2, 'scores') else list(fin2)
        all_fin = list_fin_1 + list_fin_2

        return InnerCVResult(
            inner_mse=res1.inner_mse + res2.inner_mse,
            final_results=all_fin,
        )

    return (sc.parallelize(cv_vals, len(cv_vals))
        # train and evaluate on each fold
        # x is (outer, inner, param values)
        # key by param values
        .map(lambda x: (x[2], inner_train(
            df_bcast=df_bcast,
            model=spec.model,
            features=spec.features,
            target=spec.target,
            outer_cv=spec.outer_cv,
            outer_fold=x[0],
            inner_cv=spec.inner_cv,
            inner_fold=x[1],
            params=x[2],
            keys=tuple(spec.param_distribution.keys()),
            outer_seed=spec.outer_seed,
            inner_seed=spec.inner_seed,
            normalize=spec.normalize
        )))

        # for each parameters, compute sum of scores
        # and preserve results of each fold
        .reduceByKey(lambda res1, res2: merge_cv_results(res1, res2))

        # find parameters with best mse
        # (everything is moved under the same key to compare all of them)
        .map(lambda x: (1, x[1]))
        .reduceByKey(lambda x, y: x if x.inner_mse < y.inner_mse else y)

        # at this point we only have the InnerCVResult of the best parameters
        # which contains the scores on each fold
        .flatMap(lambda x: x[1].final_results)

        # finalize results, optionally saving
        .keyBy(lambda x: 1)
        .groupByKey(spec.inner_cv)
        .map(lambda x: finalize_result(spec.meta, x[1], spec.save_to))
        .coalesce(1)
        .repartition(2))


def spark_cv(df, *cv_specs):
    ''' cross-validation performing random search

        optimized for running several nested cv in parallel, each with
        different models and/or grids

        use outer_cv > 0 for nested cross validation

        every spec is a CVSpec, whose unset fields default to 10 inner
        folds, 10 outer folds, 10 sampled parameter sets, normalization
        enabled and random seeds for the folds. parameters are sampled
        from a generator seeded with param_seed

        returns the list of results collected from all the specs,
        which is empty if no spec is given
    '''
    if not cv_specs:
        return []

    df_bcast = sc.broadcast(df)
    result_rdd = None
    for spec in cv_specs:
        # default values
        # normalize and the seeds are compared with None, so that
        # normalize=False and a seed of 0 are not overridden
        spec = spec._replace(
            inner_cv=spec.inner_cv or 10,
            outer_cv=10 if spec.outer_cv is None else spec.outer_cv,
            n_iter=spec.n_iter or 10,
            normalize=True if spec.normalize is None else spec.normalize,
            inner_seed=(
                np.random.randint(2**32, dtype=np.uint)
                if spec.inner_seed is None else spec.inner_seed
            ),
            outer_seed=(
                np.random.randint(2**32, dtype=np.uint)
                if spec.outer_seed is None else spec.outer_seed
            ),
        )

        # build list of all grids to try and inner/outer combinations
        # every parameter is sampled from rnd, so that
        # the same param_seed always gives the same values
        rnd = np.random.RandomState(spec.param_seed)
        cv_vals = []
        for outer_fold in range(max(spec.outer_cv, 1)):
            for _ in range(spec.n_iter):
                params = tuple(
                    distr.rvs(random_state=rnd)
                    if hasattr(distr, 'rvs')
                    else rnd.choice(distr)
                    for distr in spec.param_distribution.values()
                )

                for inner_fold in range(spec.inner_cv):
                    cv_vals.append((outer_fold, inner_fold, params))

        if spec.outer_cv > 0:
            results = make_nested_cv_tasks(df_bcast, spec, cv_vals)
        else:
            results = make_cv_tasks(df_bcast, spec, cv_vals)

        # we build the result rdd gradually with unions so that we can proceed
        # to the outer cvs as soon as the spec's inner cv has finished
        # if we built the whole result rdd in one go we would need to wait on
        # all inner cvs from all specs to finish, before being able to start
        # the outer cvs
        if result_rdd is None:
            result_rdd = results
        else:
            result_rdd = result_rdd.union(results)

    cv_results = result_rdd.collect()
    return cv_results


def fit_most_estimator(df, most_only, spark=True):
    ''' cross-validate the MOSTEstimator, predicting phi_m from zL

        if most_only is set, only the rows with -2 < zL < 1 are used
        results are saved under a name made of the model name, the
        value of most_only and the current UTC time

        NOTE(review): nested_cv_spark, nested_cv, grid and trend are not
        defined in the visible part of this notebook, so both branches
        look like they would fail with a NameError; spark_cv seems to
        be the current entry point for the spark branch -- TODO confirm
    '''
    if most_only:
        df = df[(df.zL > -2) & (df.zL < 1)]

    dtime = datetime.datetime.utcnow().strftime('%Y%m%d-%H%M%S')
    fname = '%s_%smost-%s' % (
        'MOSTEstimator', '' if most_only else 'no', dtime
    )

    meta = {
        'model': 'MOSTEstimator',
        'most': most_only
    }

    if spark:
        # 10x10 nested cv, sampling 10 regularization
        # coefficients between 1e-6 and 10
        spec = CVSpec(
            model=MOSTEstimator(),
            param_distribution={'regu': LogUniform(10, -6, 1)},
            features=['zL'],
            target='phi_m',
            inner_cv=10,
            outer_cv=10,
            n_iter=10,
            normalize=False,
            meta=meta,
            save_to=fname,
        )
    else:
        # NOTE(review): nested_cv, grid and trend are not defined here
        results, _, _, _ = nested_cv(
            df, MOSTEstimator, grid, ['zL'], 'phi_m',
            most_only, use_trend=trend,
        )
        # in this branch spec holds the finalized result, not a CVSpec
        spec = finalize_result(None, results.values, save_to=fname)

    if spark:
        # NOTE(review): nested_cv_spark is not defined in the visible source
        return nested_cv_spark(df, spec)
    else:
        return spec

In [14]:
def do_test(df, most_only, outer_cv):
    ''' build the random-search specs and run them all with spark_cv

    only the models listed in enabled_models are run (currently just
    gradient boosting), and only for the (trend, fset) combinations that
    appear in repeats, each as many times as indicated there

    df: data frame handed to get_features and to spark_cv
    most_only: if true, keep only the rows with -2 < zL < 1
    outer_cv: forwarded unchanged to every CVSpec

    returns the value of spark_cv
    '''
    import random

    if most_only:
        df = df[(df.zL > -2) & (df.zL < 1)]

    ridge_spec = Ridge, {
        'alpha': LogUniform(10, -6, 1)
    }

    knn_spec = KNeighborsRegressor, {
        'n_neighbors': IntDistribution(LogUniform(10, 0, 1.7)),  # 10**1.7 ~= 50
        'weights': ['uniform', 'distance'],
        'p': [1, 2],
    }

    gbr_spec = GradientBoostingRegressor, {
        'max_depth': IntDistribution(stats.uniform(loc=1, scale=9)),
        'subsample': stats.uniform(loc=0.25, scale=0.75),
        'max_features': stats.uniform(0.1, 0.9),
        'loss': ['lad', 'ls', 'huber'],
        'n_estimators': IntDistribution(LogUniform(10, 2, 3)),
        'learning_rate': LogUniform(10, -5, -1),
        'alpha': stats.uniform(0.01, 0.98),
    }

    # add ridge_spec and/or knn_spec here to run those models as well
    enabled_models = [gbr_spec]

    # (trend, fset) -> how many specs to create for that combination;
    # combinations that are not listed get no spec at all
    repeats = {
        (True, 1): 1,
        (True, 3): 2,
        (False, 4): 3,
        (False, 5): 2,
    }

    all_specs = []
    for trend in [True, False]:
        for fset in range(1, 6):
            # the feature list is requested once per (trend, fset), it was
            # previously requested again for every single repeat
            _, features = get_features(df, trend, fset)
            n_repeats = repeats.get((trend, fset), 0)

            for model_cls, grid in enabled_models:
                for _ in range(n_repeats):
                    dtime = datetime.datetime.utcnow().strftime('%Y%m%d-%H%M%S')
                    fname = '%s_f%d_%strend_%smost_%doutercv-%s' % (
                        model_cls.__name__, fset,
                        '' if trend else 'no',
                        '' if most_only else 'no',
                        outer_cv, dtime
                    )

                    all_specs.append(CVSpec(
                        model=model_cls(),
                        param_distribution=grid,
                        features=features,
                        target='phi_m',
                        inner_cv=10,
                        outer_cv=outer_cv,
                        n_iter=25,
                        normalize=True,
                        meta={
                            'trend': trend,
                            'fset': fset,
                            'model': model_cls,
                            'most': most_only
                        },
                        save_to=fname,
                    ))

    # submit the specs in random order
    random.shuffle(all_specs)

    return spark_cv(df, *all_specs)

In [11]:
# load the processed observations from HDFS; read_df also applies its own
# row filters (ustar, H, wind and ds) before returning the frame
df = read_df()

In [12]:
# keep only the first value returned by get_features for feature level 5
# with trend; the second value (used as the feature list in do_test) is
# discarded here
ddf, _ = get_features(df, use_trend=True, feature_level=5)



In [None]:
# run the specs built by do_test on the rows with -2 < zL < 1; outer_cv=0
# is forwarded to every CVSpec, and the task-building code earlier in this
# notebook uses make_cv_tasks instead of make_nested_cv_tasks unless
# spec.outer_cv > 0; the return value is discarded
_ = do_test(ddf, most_only=True, outer_cv=0)

In [13]:
# MOSTEstimator baseline restricted to the rows with -2 < zL < 1
# NOTE(review): the saved output of this cell is a NameError on
# nested_cv_spark, so most_most_res was not assigned by that run
most_most_res = fit_most_estimator(ddf, most_only=True, spark=True)

global name 'nested_cv_spark' is not defined
Traceback (most recent call last):
  File "<stdin>", line 542, in fit_most_estimator
NameError: global name 'nested_cv_spark' is not defined



In [14]:
# MOSTEstimator baseline on all rows (no zL filtering)
# NOTE(review): the saved output of this cell is a NameError on
# nested_cv_spark, so most_nomost_res was not assigned by that run
most_nomost_res = fit_most_estimator(ddf, most_only=False, spark=True)

global name 'nested_cv_spark' is not defined
Traceback (most recent call last):
  File "<stdin>", line 542, in fit_most_estimator
NameError: global name 'nested_cv_spark' is not defined



In [16]:
# same as the most_only=True run, but without the zL filtering; the return
# value is discarded
# NOTE(review): the saved output is a KeyboardInterrupt, i.e. this run was
# stopped by hand
_ = do_test(ddf, most_only=False, outer_cv=0)

KeyboardInterrupt: 

Feature importances on F4 with trend and MOST only. Check whether these make sense once F1 has been tried.

In [54]:
# per-column mean and standard deviation of `aa` (one column per entry of
# `features`, presumably the feature importances of the fitted models),
# plus imp = mean / std, sorted by increasing mean
pd.DataFrame(aa, columns=features).describe(
    percentiles=[]
).T.loc[
    :, ['mean', 'std']
].assign(
    imp=lambda summary: summary['mean'] / summary['std']
).sort_values('mean')

                     mean       std         imp
rain             0.012715  0.000319   39.906414
rain_trend       0.014674  0.000254   57.770853
temp_20_trend    0.015496  0.000298   51.973389
temp_10_trend    0.015859  0.000298   53.249730
temp_40_trend    0.016946  0.000542   31.290230
temp_20          0.017517  0.000331   52.876421
temp_10          0.017995  0.000322   55.843363
temp_trend       0.018592  0.000457   40.718311
temp_40          0.021400  0.000289   74.118898
wind_20_trend    0.022053  0.000320   68.947329
temp             0.023832  0.000249   95.614397
wind_trend       0.027351  0.000338   80.966239
soilheat_trend   0.028076  0.000390   71.923880
netrad_trend     0.029169  0.000350   83.305127
wind_10_trend    0.030118  0.000562   53.555488
wind_40_trend    0.032436  0.000715   45.386153
dewpoint_trend   0.033121  0.000399   83.083066
soilheat         0.039414  0.000597   66.060931
soil_temp_trend  0.040030  0.001077   37.150942
z                0.040719  0.000519   78

In [18]:
# MOSTEstimator baseline on the unfiltered df, searching over regu; only
# the first of the five returned values is kept
# NOTE(review): nested_cv_spark is not defined by the cells visible here
# (the fit_most_estimator runs above failed with a NameError on it), so
# this cell presumably ran against an older definition - confirm before
# re-running
most_res, _, _, _, _ = nested_cv_spark(df, MOSTEstimator(), {
    'regu': LogUniform(10, -6, 1)
}, 'zL', 'phi_m', normalize=False, n_iter=10)

                          count        mean        std         min  \
explained_variance_score   10.0    0.604488   0.009749    0.588177   
mean_absolute_error        10.0    0.395237   0.010928    0.380288   
mean_squared_error         10.0    0.332043   0.021671    0.303898   
median_absolute_error      10.0    0.282347   0.006470    0.274640   
r2_score                   10.0    0.604319   0.009802    0.587802   
mean_abs_percent_error     10.0  184.221323  38.776409  113.325184   
median_abs_percent_error   10.0   24.624156   1.413938   22.637900   

                                 25%         50%         75%         max  
explained_variance_score    0.599939    0.602954    0.610194    0.619261  
mean_absolute_error         0.388139    0.392386    0.402019    0.416708  
mean_squared_error          0.316976    0.324940    0.345071    0.368475  
median_absolute_error       0.276705    0.282892    0.284840    0.294774  
r2_score                    0.599853    0.602929    0.610082    