In [56]:
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error, r2_score
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.linear_model import Ridge
from sklearn.pipeline import make_pipeline
import pandas as pd
from itertools import product
import csv 
import os.path
import datetime

In [89]:
def _swcv_take_rows(data, start, stop):
    """Return rows ``start:stop`` of ``data``, selected by position."""
    # pandas objects expose positional slicing through ``.iloc``; anything else
    # (NumPy arrays, lists, ...) is sliced directly along its first axis.
    if hasattr(data, "iloc"):
        return data.iloc[start:stop]
    return data[start:stop]


def sliding_window_cv_regression(X, y, pipe, n_tr, n_ts=1, scorers=()):
    """Evaluate a regressor with sliding-window (walk-forward) cross-validation.

    For every window start ``i`` the model is refit on rows ``[i, i + n_tr)``
    and used to predict rows ``[i + n_tr, i + n_tr + n_ts)``. The predictions
    and targets of all windows are pooled and each scorer is applied once to
    the pooled values.

    The window advances one row at a time, so when ``n_ts > 1`` consecutive
    test windows overlap and a row can be scored more than once.

    Parameters
    ----------
    X : pandas.DataFrame, numpy.ndarray or other sliceable sequence
        Feature rows, in the order the window should slide over them.
    y : sequence
        Targets; must have the same length as ``X``.
    pipe : estimator
        Object with ``fit(X, y)`` and ``predict(X)``. It is refit in place for
        every window, so on return it holds the fit of the last window.
    n_tr : int
        Number of rows in each training window (>= 1).
    n_ts : int, default 1
        Number of rows in each test window (>= 1).
    scorers : iterable of callables, default ()
        Each is called as ``scorer(y_target, y_pred)`` and stored in the
        result under ``scorer.__name__``.

    Returns
    -------
    dict
        ``'time'`` (timestamp taken after the last window), ``'model'``
        (``str(pipe)``) and one entry per scorer.

    Raises
    ------
    AssertionError
        If ``X`` and ``y`` differ in length.
    ValueError
        If ``n_tr`` or ``n_ts`` is not positive, or if ``n_tr + n_ts`` exceeds
        the number of rows (no window would fit).
    """
    assert len(X) == len(y), "Length of X ({}) must match that of y ({}).".format(len(X), len(y))
    if n_tr < 1 or n_ts < 1:
        raise ValueError(
            "n_tr and n_ts must be positive (got n_tr={}, n_ts={}).".format(n_tr, n_ts)
        )

    n_samples = len(X)
    if n_tr + n_ts > n_samples:
        raise ValueError(
            "n_tr + n_ts ({}) exceeds the number of samples ({}).".format(n_tr + n_ts, n_samples)
        )

    y_pred = []
    y_target = []

    # The last test window must end exactly at n_samples:
    #   i_ts_end = i_tr_start + n_tr + n_ts == n_samples
    # so the last valid i_tr_start is n_samples - (n_tr + n_ts), inclusive.
    # range() excludes its stop value, hence the +1.
    last_tr_start = n_samples - (n_tr + n_ts)
    for i_tr_start in range(last_tr_start + 1):
        i_tr_end = i_ts_start = i_tr_start + n_tr
        i_ts_end = i_ts_start + n_ts

        Xtr = _swcv_take_rows(X, i_tr_start, i_tr_end)
        Xts = _swcv_take_rows(X, i_ts_start, i_ts_end)
        ytr = _swcv_take_rows(y, i_tr_start, i_tr_end)
        yts = _swcv_take_rows(y, i_ts_start, i_ts_end)

        pipe.fit(Xtr, ytr)
        y_pred.extend(pipe.predict(Xts))
        y_target.extend(yts)

    # Drop singleton axes from the pooled predictions. A single prediction is
    # left alone because squeezing it would produce a 0-d array.
    if len(y_pred) > 1:
        y_pred = np.squeeze(y_pred)

    agg_results = {
        'time': datetime.datetime.now(),
        'model': str(pipe),
    }
    for scorer in scorers:
        agg_results[scorer.__name__] = scorer(y_target, y_pred)

    return agg_results

In [60]:
# Synthetic regression problem: 300 samples x 64 features. Inputs and targets
# are drawn independently from a uniform distribution, so y carries no
# information about X by construction.
# A locally seeded generator makes the run reproducible after a kernel restart
# without touching NumPy's global random state.
rng = np.random.default_rng(0)
X = rng.random(size=(300, 64))
y = rng.random(X.shape[0])
pipe = make_pipeline(
    RobustScaler(),
    Ridge()
)
n_tr = 30
n_ts = 2
res = sliding_window_cv_regression(X, y, pipe, n_tr=n_tr, n_ts=n_ts, scorers=[mean_squared_error,
mean_absolute_percentage_error, r2_score])

In [51]:
# Display the aggregated results dict returned by the sliding-window run.
# NOTE(review): the recorded output below has no 'time'/'model' entries, which
# sliding_window_cv_regression adds to every result; it appears to come from
# an earlier version of the function, so re-run this cell to refresh it.
res

{'mean_squared_error': 0.16525680880996493,
 'mean_absolute_percentage_error': 6.4147208375498765,
 'r2_score': -0.9257532760198532}

In [90]:
def batch_test_swcv_regression(list_X, list_y, list_pipe, list_n_tr, list_n_ts, scorers, savefile):
    """Run sliding-window CV over a grid of settings and append the results to a CSV.

    Each ``(X, y)`` pair (paired positionally from ``list_X`` and ``list_y``)
    is evaluated with every combination of pipeline, ``n_tr`` and ``n_ts``
    through ``sliding_window_cv_regression``.

    Parameters
    ----------
    list_X, list_y : iterables
        Feature sets and their matching targets.
    list_pipe : iterable
        Estimators to evaluate.
    list_n_tr, list_n_ts : iterables of int
        Training and test window sizes to try.
    scorers : iterable of callables
        Passed through unchanged to ``sliding_window_cv_regression``.
    savefile : str or None
        CSV path. Rows are appended; the header is written only when the file
        is missing or empty. ``None`` disables saving.

    Returns
    -------
    None
    """
    # Build the grid once so it can be reused for every dataset, even when the
    # arguments are one-shot iterators.
    configs = list(product(list_pipe, list_n_tr, list_n_ts))

    results = []
    for X, y in zip(list_X, list_y):
        for pipe, n_tr, n_ts in configs:
            result = sliding_window_cv_regression(X, y, pipe, n_tr, n_ts, scorers)
            results.append(result)
            # The results dict has no 'comment' entry; report the settings
            # that identify this run instead.
            print("A test completed. (model: {}, n_tr: {}, n_ts: {})".format(
                result.get('model', pipe), n_tr, n_ts))

    # Nothing to write: saving disabled, or the grid/datasets were empty.
    if savefile is None or not results:
        return

    # Append so earlier runs are kept; a header is only needed when the file
    # is new or still empty.
    needs_header = (not os.path.isfile(savefile)) or os.path.getsize(savefile) == 0

    with open(savefile, 'a', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(results[0].keys()), delimiter=',', lineterminator='\n')

        if needs_header:
            writer.writeheader()

        writer.writerows(results)

In [91]:
# Sweep every (pipeline, n_tr, n_ts) combination on the demo data.
batch_test_swcv_regression(
    list_X=[X],
    list_y=[y],
    list_pipe=[pipe],
    list_n_tr=[30, 45],
    list_n_ts=[1, 10],
    scorers=[mean_squared_error, mean_absolute_percentage_error, r2_score],
    savefile='temp_check.csv',
)

In [88]:
# String form of the pipeline; sliding_window_cv_regression stores this same
# value under the 'model' key of each results dict.
str(pipe)

"Pipeline(steps=[('robustscaler', RobustScaler()), ('ridge', Ridge())])"