In [1]:
import itertools
import pandas as pd
import numpy as np
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit, cross_val_score
from sklearn import svm
from sklearn.metrics import root_mean_squared_log_error
from copy import deepcopy
from prophet import Prophet

import logging
# Prophet and its cmdstanpy backend are chatty while fitting; only let
# CRITICAL records through so cell output stays readable.
for _noisy_logger in ("prophet", "cmdstanpy"):
    logging.getLogger(_noisy_logger).setLevel(logging.CRITICAL)

In [2]:
def cross_val(estimator, X, y, p_grid, inner_splits=5, outer_splits=5, inner_gap=3, outer_gap=3, test_size=30):
    """Run nested time-series cross-validation with scikit-learn's own tools.

    A ``GridSearchCV`` over ``p_grid`` (inner loop) is wrapped in
    ``cross_val_score`` (outer loop); both loops use ``TimeSeriesSplit``.

    Parameters
    ----------
    estimator : estimator instance
        Passed to ``GridSearchCV`` as ``estimator``.
    X, y : features and target, forwarded unchanged to ``cross_val_score``.
    p_grid : dict
        Parameter grid forwarded to ``GridSearchCV`` as ``param_grid``.
    inner_splits, outer_splits : int
        ``n_splits`` of the inner / outer ``TimeSeriesSplit``.
    inner_gap, outer_gap : int
        ``gap`` of the inner / outer ``TimeSeriesSplit``.
    test_size : int
        ``test_size`` shared by both splitters.

    Returns
    -------
    The per-outer-fold scores returned by ``cross_val_score``. Both loops are
    scored with ``'neg_root_mean_squared_error'``, which is a different
    metric from the RMSLE used by ``my_nested_cv``, so the two results are
    not directly comparable. The mean of the scores is also printed.
    """
    scoring = 'neg_root_mean_squared_error'

    inner_cv = TimeSeriesSplit(n_splits=inner_splits, gap=inner_gap, test_size=test_size)
    outer_cv = TimeSeriesSplit(n_splits=outer_splits, gap=outer_gap, test_size=test_size)

    # Inner loop: hyper-parameter search refitted inside every outer fold.
    model = GridSearchCV(estimator=estimator,
                         param_grid=p_grid,
                         cv=inner_cv,
                         scoring=scoring)

    # Outer loop: scores the tuned model on data the search never saw.
    nested_score = cross_val_score(model,
                                   X=X,
                                   y=y,
                                   cv=outer_cv,
                                   scoring=scoring)

    print(nested_score.mean())

    # Hand the raw fold scores back so callers can inspect or compare them.
    return nested_score

In [3]:
def my_GridSearchCV(estimator, X, y, param_grid, cv):
    """Exhaustive grid search scored by mean RMSLE over the folds of ``cv``.

    Parameters
    ----------
    estimator : callable
        Estimator class (or factory); called as ``estimator(**params)`` to
        build a fresh model for every fold.
    X : pandas.DataFrame
        Features. For Prophet the date column must be named ``visit_date``.
    y : pandas.Series
        Target. For Prophet it must be named ``visitors``.
    param_grid : dict
        Maps parameter name to an iterable of candidate values. An empty
        dict evaluates the single default-parameter candidate.
    cv : splitter
        Object whose ``split(X, y)`` yields positional
        ``(train_index, test_index)`` pairs.

    Returns
    -------
    dict
        The parameter combination with the lowest mean RMSLE.

    Raises
    ------
    ValueError
        If no combination produced a score below infinity (for example when
        the grid or the splitter yields nothing).
    """
    keys = list(param_grid)
    # product() of zero iterables yields one empty tuple, so an empty grid
    # still produces the single candidate {}.
    candidates = [dict(zip(keys, combo))
                  for combo in itertools.product(*(param_grid[k] for k in keys))]

    best_params = None
    best_model_score = float('inf')

    for params in candidates:
        scores = []

        for train_index, test_index in cv.split(X, y):
            model = estimator(**params)
            # Splitters yield positions, not labels, so index with .iloc;
            # .loc only works by accident on a default 0..n-1 index.
            x_train, x_test = X.iloc[train_index], X.iloc[test_index]
            y_train, y_test = y.iloc[train_index], y.iloc[test_index]

            # Check the model that was already built instead of constructing
            # a throwaway estimator just to inspect its type.
            if isinstance(model, Prophet):
                # Prophet wants one frame with "ds" (date) and "y" (target).
                train_df = pd.concat([x_train, y_train], axis=1).rename(
                    columns={"visit_date": "ds", "visitors": "y"})
                model.fit(train_df)

                forecast = model.predict(x_test.rename(columns={"visit_date": "ds"}))
                # RMSLE is undefined for negative values: clamp forecasts at 0.
                pred = forecast["yhat"].clip(lower=0).to_numpy()
            else:
                model.fit(x_train, y_train)
                pred = model.predict(x_test)

            scores.append(root_mean_squared_log_error(y_test, pred))

        model_score = np.mean(scores)

        # Lower RMSLE is better; ties keep the earlier candidate.
        if model_score < best_model_score:
            best_model_score = model_score
            best_params = params

    if best_params is None:
        raise ValueError("my_GridSearchCV: no parameter combination produced a finite score")

    return best_params

In [4]:
def my_nested_cv(estimator, X, y, p_grid, inner_splits=5, outer_splits=5, inner_gap=3, outer_gap=3, test_size=30):
    """Nested time-series cross-validation scored with RMSLE.

    For every outer fold the hyper-parameters are tuned with
    ``my_GridSearchCV`` on the outer *training* rows only, a fresh model is
    fitted on those rows with the winning parameters, and it is scored on
    the outer test rows.

    Parameters
    ----------
    estimator : callable
        Estimator class (or factory); called as ``estimator(**params)``.
    X : pandas.DataFrame
        Features. For Prophet the date column must be named ``visit_date``.
    y : pandas.Series
        Target. For Prophet it must be named ``visitors``.
    p_grid : dict
        Parameter grid forwarded to ``my_GridSearchCV``.
    inner_splits, outer_splits : int
        ``n_splits`` of the inner / outer ``TimeSeriesSplit``.
    inner_gap, outer_gap : int
        ``gap`` of the inner / outer ``TimeSeriesSplit``.
    test_size : int
        ``test_size`` shared by both splitters.

    Returns
    -------
    list
        RMSLE of each outer fold, in fold order. Progress, per-fold scores
        and their mean are also printed.

    Notes
    -----
    The inner splitter runs on each outer training fold, so the smallest
    outer training fold must be large enough for ``inner_splits`` folds of
    ``test_size`` rows.
    """
    cv_inner = TimeSeriesSplit(n_splits=inner_splits, gap=inner_gap, test_size=test_size)
    cv_outer = TimeSeriesSplit(n_splits=outer_splits, gap=outer_gap, test_size=test_size)

    history = []

    for fold, (train_index, test_index) in enumerate(cv_outer.split(X, y), start=1):
        print('NestedCV: {} of outer fold {}'.format(fold, cv_outer.get_n_splits()))
        # Splitters yield positions, not labels, so index with .iloc.
        x_train, x_test = X.iloc[train_index], X.iloc[test_index]
        y_train, y_test = y.iloc[train_index], y.iloc[test_index]

        # Tune on the outer training fold only: searching over the full X/y
        # would let the outer test rows influence the chosen parameters.
        # Reset to a 0..n-1 index so positional split indices are valid row
        # labels as well.
        params = my_GridSearchCV(estimator=estimator,
                                 X=x_train.reset_index(drop=True),
                                 y=y_train.reset_index(drop=True),
                                 param_grid=p_grid,
                                 cv=cv_inner)

        model = estimator(**params)

        # Check the model that was already built instead of constructing a
        # throwaway estimator just to inspect its type.
        if isinstance(model, Prophet):
            # Prophet wants one frame with "ds" (date) and "y" (target).
            train_df = pd.concat([x_train, y_train], axis=1).rename(
                columns={"visit_date": "ds", "visitors": "y"})
            model.fit(train_df)

            forecast = model.predict(x_test.rename(columns={"visit_date": "ds"}))
            # RMSLE is undefined for negative values: clamp forecasts at 0.
            pred = forecast["yhat"].clip(lower=0).to_numpy()
        else:
            model.fit(x_train, y_train)
            pred = model.predict(x_test)

        score = root_mean_squared_log_error(y_test, pred)

        print("Score:", score, "\n")
        history.append(score)

    print('Overall test performance: {}'.format(np.mean(history)))

    # Hand the fold scores back so callers can inspect or compare them.
    return history

A small example to test the nested cross-validation function.

In [5]:
from sklearn.datasets import fetch_california_housing

# Demo data: California housing features plus a "target" column in one frame.
housing = fetch_california_housing()
data = pd.concat(
    [
        pd.DataFrame(housing.data),
        pd.DataFrame(housing.target).rename(columns={0: "target"}),
    ],
    axis=1,
).reset_index(drop=True)

# Keep the demo quick: first four feature columns, first 500 rows.
X = data.loc[:, [0, 1, 2, 3]].iloc[:500]
y = data.loc[:, "target"].iloc[:500]

# Pass the class itself (not an instance): my_nested_cv builds a fresh model
# from it for each parameter set.
reg = svm.SVR
p_grid = {"C": [1, 10], "gamma": [0.01, 0.1]}

my_nested_cv(reg, X, y, p_grid)

NestedCV: 1 of outer fold 5
Score: 0.15433429155854744 

NestedCV: 2 of outer fold 5
Score: 0.21172619512825575 

NestedCV: 3 of outer fold 5
Score: 0.1709678588115569 

NestedCV: 4 of outer fold 5
Score: 0.35687660156870993 

NestedCV: 5 of outer fold 5
Score: 0.277030172805938 

Overall test performance: 0.23418702397460162


In [6]:
from sklearn.datasets import fetch_california_housing

# Rebuild the same demo subset so this cell can run on its own.
housing = fetch_california_housing()
data = pd.concat(
    [
        pd.DataFrame(housing.data),
        pd.DataFrame(housing.target).rename(columns={0: "target"}),
    ],
    axis=1,
).reset_index(drop=True)

# First four feature columns, first 500 rows.
X = data.loc[:, [0, 1, 2, 3]].iloc[:500]
y = data.loc[:, "target"].iloc[:500]

# cross_val hands the estimator to GridSearchCV, so pass an instance here.
reg = svm.SVR()
p_grid = {"C": [1, 10], "gamma": [0.01, 0.1]}

cross_val(reg, X, y, p_grid)

-0.777866656214605
