In [8]:
# Development setup: enable IPython's autoreload extension (mode 2) so that
# imported modules are reloaded before each cell is executed.
%load_ext autoreload
%autoreload 2
import sys
from pathlib import Path
# Make the parent of the current working directory importable. It is inserted
# at position 1, i.e. right after the first entry of `sys.path`.
sys.path.insert(1, str(Path.cwd().parent))
# Last expression of the cell: display the directory that was just added.
str(Path.cwd().parent)

'c:\\Users\\Joaquín Amat\\Documents\\GitHub\\skforecast'

In [9]:
import pandas as pd
import numpy as np
from typing import Union, Optional

class TimeSeriesFold():
    """
    Create the positional partitions (folds) used to backtest a forecaster
    on a time series.

    Each fold is a list with five elements:

    1. positions of the training observations,
    2. positions of the last window (the `window_size` observations right
       before the test span),
    3. positions of the test span including the gap,
    4. positions of the test span excluding the gap,
    5. a boolean flag telling whether the forecaster has to be fitted in
       that fold.

    Parameters
    ----------
    window_size : int
        Number of observations placed right before each test span. Must be
        greater than 0.
    initial_train_size : int, None
        Number of observations in the first training partition. If `None`,
        `split` uses `window_size` instead and treats the forecaster as
        externally fitted.
    test_size : int
        Number of observations predicted in each fold. Must be greater
        than 0.
    refit : bool, int, default `False`
        `False` (or 0): training partition is the same in every fold.
        `True`: every fold is flagged for fitting. Integer `n`: one fold
        out of every `n` is flagged for fitting.
    fixed_train_size : bool, default `True`
        Only used when `refit` is truthy. If `True` the training partition
        slides forward by `test_size` in each fold; if `False` it always
        starts at position 0 and grows by `test_size`.
    gap : int, default `0`
        Number of observations between the end of the last window and the
        first observation that is evaluated.
    skip_folds : int, list, default `None`
        Integer `n`: keep one fold out of every `n` (folds 0, n, 2n, ...).
        List: positions of the folds to drop.
    allow_incomplete_fold : bool, default `True`
        If `False`, a last fold with fewer than `test_size` evaluated
        observations is discarded.
    return_all_indexes : bool, default `False`
        If `True`, partitions are returned as `range` objects. If `False`,
        each partition is returned as `[start, stop]` (stop exclusive).
    differentiation : int, default `None`
        Number of leading observations of every training partition that
        are reported as "used for differentiation" in the verbose output.
        It does not change the returned folds.
    verbose : bool, default `True`
        Print a summary of the folds when `split` is called.

    Raises
    ------
    ValueError
        If any argument has an invalid type or value.

    """

    def __init__(
            self,
            window_size: int,
            initial_train_size: Union[int, None],
            test_size: int,
            refit: Union[bool, int] = False,
            fixed_train_size: bool = True,
            gap: int = 0,
            skip_folds: Optional[Union[int, list]] = None,
            allow_incomplete_fold: bool = True,
            return_all_indexes: bool = False,
            differentiation: Optional[int] = None,
            verbose: bool = True
    ):
        if not isinstance(window_size, int) or window_size < 1:
            raise ValueError(
                f"window_size must be an integer greater than 0. Got {window_size}."
            )
        if not isinstance(initial_train_size, int) and initial_train_size is not None:
            raise ValueError(
                f"initial_train_size must be an integer or None. Got {initial_train_size}."
            )
        if not isinstance(test_size, int) or test_size < 1:
            raise ValueError(
                f"test_size must be an integer greater than 0. Got {test_size}."
            )
        # A negative integer would produce an empty refit schedule and
        # inconsistent fit flags, so it is rejected up front.
        if not isinstance(refit, (bool, int)) or refit < 0:
            raise ValueError(
                f"refit must be a boolean or an integer greater than or equal "
                f"to 0. Got {refit}."
            )
        if not isinstance(fixed_train_size, bool):
            raise ValueError(
                f"fixed_train_size must be a boolean. Got {fixed_train_size}."
            )
        if not isinstance(gap, int) or gap < 0:
            raise ValueError(
                f"gap must be an integer greater than or equal to 0. Got {gap}."
            )
        if skip_folds is not None:
            if not isinstance(skip_folds, (int, list)):
                raise ValueError(
                    f"skip_folds must be an integer or a list. Got {skip_folds}."
                )
            if isinstance(skip_folds, int) and skip_folds < 1:
                raise ValueError(
                    f"skip_folds must be an integer greater than 0. Got {skip_folds}."
                )
            if isinstance(skip_folds, list) and any([x < 0 for x in skip_folds]):
                raise ValueError(
                    f"skip_folds list must contain integers greater than or equal to 0. "
                    f"Got {skip_folds}."
                )
        if not isinstance(allow_incomplete_fold, bool):
            raise ValueError(
                f"allow_incomplete_fold must be a boolean. Got {allow_incomplete_fold}."
            )
        if not isinstance(return_all_indexes, bool):
            raise ValueError(
                f"return_all_indexes must be a boolean. Got {return_all_indexes}."
            )
        if differentiation is not None:
            if not isinstance(differentiation, int) or differentiation < 1:
                raise ValueError(
                    f"differentiation must be an integer greater than 0. Got {differentiation}."
                )

        self.window_size = window_size
        self.initial_train_size = initial_train_size
        self.test_size = test_size
        self.refit = refit
        self.fixed_train_size = fixed_train_size
        self.gap = gap
        self.skip_folds = skip_folds
        self.allow_incomplete_fold = allow_incomplete_fold
        self.return_all_indexes = return_all_indexes
        self.differentiation = differentiation
        self.verbose = verbose

    def split(
            self,
            X: Union[pd.Series, pd.DataFrame, pd.Index, dict],
            externally_fitted: bool = False,
    ) -> list:
        """
        Split the time index of `X` into backtesting folds.

        Parameters
        ----------
        X : pandas Series, pandas DataFrame, pandas Index, dict
            Data whose index is partitioned. For a dict, the values must
            expose a pandas index; a common index spanning from the
            earliest start to the latest end is built using the frequency
            shared by the series that have one.
        externally_fitted : bool, default `False`
            Whether the forecaster is already trained. Only affects the
            verbose output. It is forced to `True` when
            `initial_train_size` is `None`.

        Returns
        -------
        folds : list
            One entry per non-skipped fold:
            `[train, last_window, test_with_gap, test_without_gap, fit]`.
            Partitions are `range` objects if `return_all_indexes` is
            `True`, otherwise `[start, stop]` lists. An empty partition
            is returned as `None`.

        Raises
        ------
        ValueError
            If `X` has an unsupported type, if the frequencies of a dict of
            series are missing or inconsistent, or if no fold can be
            created with the current configuration.

        """

        if not isinstance(X, (pd.Series, pd.DataFrame, pd.Index, dict)):
            raise ValueError(
                f"X must be a pandas Series, DataFrame, Index or a dictionary. Got {type(X)}."
            )

        index = self._extract_index(X)
        # The number of observations is taken from the index, not from `X`:
        # `len()` of a dict is the number of series, not their length.
        n_obs = len(index)

        initial_train_size = self.initial_train_size
        if initial_train_size is None:
            # No training span given: only the last window is reserved and
            # the forecaster is treated as already trained.
            initial_train_size = self.window_size
            externally_fitted = True

        idx = range(n_obs)
        folds = []
        fold_number = 0
        last_fold_excluded = False

        while initial_train_size + (fold_number * self.test_size) + self.gap < n_obs:

            offset = fold_number * self.test_size
            if self.refit:
                # With `fixed_train_size` the train span keeps its length
                # and slides by `test_size`; otherwise it grows from 0.
                train_iloc_start = offset if self.fixed_train_size else 0
                train_iloc_end = initial_train_size + offset
                test_iloc_start = train_iloc_end
            else:
                # The train span neither grows nor moves.
                train_iloc_start = 0
                train_iloc_end = initial_train_size
                test_iloc_start = initial_train_size + offset

            last_window_iloc_start = test_iloc_start - self.window_size
            test_iloc_end = test_iloc_start + self.gap + self.test_size

            folds.append([
                idx[train_iloc_start : train_iloc_end],
                idx[last_window_iloc_start : test_iloc_start],
                idx[test_iloc_start : test_iloc_end],
                idx[test_iloc_start + self.gap : test_iloc_end]
            ])
            fold_number += 1

        if (
            not self.allow_incomplete_fold
            and folds
            and len(folds[-1][3]) < self.test_size
        ):
            folds = folds[:-1]
            last_fold_excluded = True

        if not folds:
            raise ValueError(
                f"No folds can be created with {n_obs} observations, "
                f"initial_train_size={initial_train_size}, "
                f"test_size={self.test_size}, gap={self.gap} and "
                f"allow_incomplete_fold={self.allow_incomplete_fold}."
            )

        # Replace partitions of length 0 with `None`
        folds = [
            [partition if len(partition) > 0 else None for partition in fold]
            for fold in folds
        ]

        # Flag that tells whether the forecaster is trained in each fold.
        # `refit == 0` covers both `False` and the integer 0.
        refit = False if self.refit == 0 else self.refit
        if isinstance(refit, bool):
            fit_forecaster = [refit] * len(folds)
            fit_forecaster[0] = True
        else:
            fit_forecaster = [False] * len(folds)
            for pos in range(0, len(fit_forecaster), refit):
                fit_forecaster[pos] = True

        for pos, fit in enumerate(fit_forecaster):
            folds[pos].append(fit)
            if fit is False:
                # No fit in this fold: reuse the train span of the previous one.
                folds[pos][0] = folds[pos - 1][0]

        # This is done to allow parallelization when `refit` is `False`. The
        # initial Forecaster fit is outside the auxiliary function.
        folds[0][4] = False

        index_to_skip = []
        if self.skip_folds is not None:
            if isinstance(self.skip_folds, int) and self.skip_folds > 0:
                # Keep folds 0, n, 2n, ... and skip the rest.
                index_to_skip = [
                    pos for pos in range(len(folds)) if pos % self.skip_folds != 0
                ]
            if isinstance(self.skip_folds, list):
                index_to_skip = [pos for pos in self.skip_folds if pos < len(folds)]

        if self.verbose:
            self._print_info(
                index              = index,
                folds              = folds,
                index_to_skip      = index_to_skip,
                externally_fitted  = externally_fitted,
                last_fold_excluded = last_fold_excluded,
                initial_train_size = initial_train_size
            )

        folds = [fold for pos, fold in enumerate(folds) if pos not in index_to_skip]
        if not self.return_all_indexes:
            folds = [
                [self._partition_bounds(partition) for partition in fold[:4]] + [fold[4]]
                for fold in folds
            ]

        return folds

    @staticmethod
    def _extract_index(X):
        """Return the pandas index that `split` partitions for the input `X`."""
        if isinstance(X, (pd.Series, pd.DataFrame)):
            return X.index
        if isinstance(X, dict):
            freqs = [s.index.freq for s in X.values() if s.index.freq is not None]
            if not freqs:
                raise ValueError("At least one series must have a frequency.")
            if not all(f == freqs[0] for f in freqs):
                raise ValueError(
                    "All series with frequency must have the same frequency."
                )
            min_index = min([v.index[0] for v in X.values()])
            max_index = max([v.index[-1] for v in X.values()])
            return pd.date_range(start=min_index, end=max_index, freq=freqs[0])
        return X

    @staticmethod
    def _partition_bounds(partition):
        """Return `[start, stop]` (stop exclusive) of a partition, or `None`."""
        if partition is None:
            return None
        # +1 so that the last position is kept when slicing with iloc
        return [partition[0], partition[-1] + 1]

    def _print_info(
            self,
            index,
            folds: list,
            index_to_skip: list,
            externally_fitted: bool,
            last_fold_excluded: bool,
            initial_train_size: int
    ) -> None:
        """Print a summary of the backtesting folds without modifying the instance."""
        # Local copy: the instance attribute must not be modified by printing.
        differentiation = 0 if self.differentiation is None else self.differentiation

        print("Information of backtesting process")
        print("----------------------------------")
        if externally_fitted:
            print(
                f"An already trained forecaster is to be used. Window size: "
                f"{self.window_size}"
            )
        elif self.differentiation is None:
            print(
                f"Number of observations used for initial training: "
                f"{initial_train_size}"
            )
        else:
            print(
                f"Number of observations used for initial training: "
                f"{initial_train_size - self.differentiation}"
            )
            print(f"    First {self.differentiation} observation/s in training sets "
                  f"are used for differentiation"
            )
        print(
            f"Number of observations used for backtesting: "
            f"{len(index) - initial_train_size}"
        )
        print(f"    Number of folds: {len(folds)}")
        print(
            f"    Number skipped folds: "
            f"{len(index_to_skip)} {index_to_skip if index_to_skip else ''}"
        )
        print(f"    Number of steps per fold: {self.test_size}")
        print(
            f"    Number of steps to exclude from the end of each train set "
            f"before test (gap): {self.gap}"
        )
        if last_fold_excluded:
            print("    Last fold has been excluded because it was incomplete.")
        if len(folds[-1][3]) < self.test_size:
            print(f"    Last fold only includes {len(folds[-1][3])} observations.")
        print("")

        for pos, fold in enumerate(folds):
            train = fold[0]
            # The first fold always counts as trained in the report, even
            # though its flag is set to False by `split`.
            has_training = fold[-1] if pos != 0 else True
            if train is not None:
                training_start  = index[train[0] + differentiation]
                training_end    = index[train[-1]]
                training_length = len(train) - differentiation
            else:
                training_start  = None
                training_end    = None
                training_length = 0
            validation_start  = index[fold[3][0]]
            validation_end    = index[fold[3][-1]]
            validation_length = len(fold[3])

            print(f"Fold: {pos}")
            if pos in index_to_skip:
                print("    Fold skipped")
                continue
            if not externally_fitted and has_training:
                print(
                    f"    Training:   {training_start} -- {training_end}  "
                    f"(n={training_length})"
                )
            else:
                print("    Training:   No training in this fold")
            print(
                f"    Validation: {validation_start} -- {validation_end}  "
                f"(n={validation_length})"
            )

        print("")

        


In [10]:
# Toy daily series: the integers 0..99 indexed from 2020-01-01.
my_series = pd.Series(
    data  = range(100),
    index = pd.date_range(start='2020-01-01', periods=100, freq='D'),
)

# No refit, no gap, no skipped folds; partitions returned as [start, stop].
tsf = TimeSeriesFold(
    window_size           = 5,
    initial_train_size    = 50,
    test_size             = 10,
    refit                 = False,
    fixed_train_size      = True,
    gap                   = 0,
    skip_folds            = None,
    allow_incomplete_fold = True,
    return_all_indexes    = False,
    differentiation       = None,
    verbose               = True,
)

folds = tsf.split(my_series)
folds

Information of backtesting process
----------------------------------
Number of observations used for initial training: 50
Number of observations used for backtesting: 50
    Number of folds: 5
    Number skipped folds: 0 
    Number of steps per fold: 10
    Number of steps to exclude from the end of each train set before test (gap): self.0

Fold: 0
    Training:   2020-01-01 00:00:00 -- 2020-02-19 00:00:00  (n=50)
    Validation: 2020-02-20 00:00:00 -- 2020-02-29 00:00:00  (n=10)
Fold: 1
    Training:   No training in this fold
    Validation: 2020-03-01 00:00:00 -- 2020-03-10 00:00:00  (n=10)
Fold: 2
    Training:   No training in this fold
    Validation: 2020-03-11 00:00:00 -- 2020-03-20 00:00:00  (n=10)
Fold: 3
    Training:   No training in this fold
    Validation: 2020-03-21 00:00:00 -- 2020-03-30 00:00:00  (n=10)
Fold: 4
    Training:   No training in this fold
    Validation: 2020-03-31 00:00:00 -- 2020-04-09 00:00:00  (n=10)



[[[0, 50], [45, 50], [50, 60], [50, 60], False],
 [[0, 50], [55, 60], [60, 70], [60, 70], False],
 [[0, 50], [65, 70], [70, 80], [70, 80], False],
 [[0, 50], [75, 80], [80, 90], [80, 90], False],
 [[0, 50], [85, 90], [90, 100], [90, 100], False]]

In [11]:
# Same configuration as the previous cell, but every partition is returned
# as a full `range` of positions (`return_all_indexes=True`).
tsf = TimeSeriesFold(
    window_size           = 5,
    initial_train_size    = 50,
    test_size             = 10,
    refit                 = False,
    fixed_train_size      = True,
    gap                   = 0,
    skip_folds            = None,
    allow_incomplete_fold = True,
    return_all_indexes    = True,
    differentiation       = None,
    verbose               = True,
)

folds = tsf.split(my_series)
folds

Information of backtesting process
----------------------------------
Number of observations used for initial training: 50
Number of observations used for backtesting: 50
    Number of folds: 5
    Number skipped folds: 0 
    Number of steps per fold: 10
    Number of steps to exclude from the end of each train set before test (gap): self.0

Fold: 0
    Training:   2020-01-01 00:00:00 -- 2020-02-19 00:00:00  (n=50)
    Validation: 2020-02-20 00:00:00 -- 2020-02-29 00:00:00  (n=10)
Fold: 1
    Training:   No training in this fold
    Validation: 2020-03-01 00:00:00 -- 2020-03-10 00:00:00  (n=10)
Fold: 2
    Training:   No training in this fold
    Validation: 2020-03-11 00:00:00 -- 2020-03-20 00:00:00  (n=10)
Fold: 3
    Training:   No training in this fold
    Validation: 2020-03-21 00:00:00 -- 2020-03-30 00:00:00  (n=10)
Fold: 4
    Training:   No training in this fold
    Validation: 2020-03-31 00:00:00 -- 2020-04-09 00:00:00  (n=10)



[[range(0, 50), range(45, 50), range(50, 60), range(50, 60), False],
 [range(0, 50), range(55, 60), range(60, 70), range(60, 70), False],
 [range(0, 50), range(65, 70), range(70, 80), range(70, 80), False],
 [range(0, 50), range(75, 80), range(80, 90), range(80, 90), False],
 [range(0, 50), range(85, 90), range(90, 100), range(90, 100), False]]