### Time-Based Validation 

The `TimeBasedCV` class below provides two splitting methods:

- sliding_window method

- forward_chaining method 

In [None]:
from dateutil.relativedelta import *
from datetime import datetime

class TimeBasedCV():
    '''
    Time-ordered cross-validation splitter producing (train, test) index
    lists with either a sliding window or forward chaining.

    This class is inspired from:
    https://gist.github.com/orhermansaffar/2bd2342c81026de1c09c97d66226eb46#file-timebasedcv-py

    Parameters

    - train_period: int
      number of time units to include in each train set for sliding window method
      number of time units to initially include in first train set for forward chaining method
      default is 3 (months)

    - test_period: int
      number of time units to include in each test set
      default is 1 (month)

    - freq: string
        frequency of input parameters. possible values are: days, months, years, weeks, hours, minutes, seconds
        the value is used as a keyword argument of dateutil.relativedelta
        default is months
        NOTE(review): the window bounds are compared against plain dates, so
        sub-day frequencies (hours, minutes, seconds) look unlikely to work
        as-is - confirm before relying on them.

    Attributes

    - n_splits: int
      number of splits produced by the most recent call to sliding_window
      or forward_chaining (0 before any call)

    '''

    # Accepted values for ``freq``. Only the plural (relative) keywords are
    # allowed: the singular ones (e.g. 'month') mean "set the month to N" in
    # relativedelta, which is not an offset.
    _VALID_FREQS = ('days', 'months', 'years', 'weeks', 'hours', 'minutes', 'seconds')

    def __init__(self, train_period = 3, test_period = 1, freq = 'months'):
        self.train_period = train_period
        self.test_period = test_period
        self.freq = freq
        # lets get_n_splits() answer 0 instead of failing before any split
        self.n_splits = 0


    def _delta(self, amount):
        '''
        Build an offset of *amount* units of self.freq

        Raises

        - ValueError if self.freq is not one of the supported frequencies
        '''
        # Imported explicitly so this class does not depend on a wildcard import.
        from dateutil.relativedelta import relativedelta

        if self.freq not in self._VALID_FREQS:
            raise ValueError("freq must be one of " + ", ".join(self._VALID_FREQS)
                             + "; got " + repr(self.freq))
        # keyword expansion replaces the former eval() of a built-up string
        return relativedelta(**{self.freq: amount})


    def _split(self, data, first_split_date, date_column, gap, expanding):
        '''
        Shared implementation of both splitting methods

        expanding = False moves the train window forward by test_period on
        every step (sliding window); expanding = True keeps the train start
        fixed and extends the train end up to the end of the previous test
        window (forward chaining).
        '''
        try:
            date_values = data[date_column]
        except KeyError:
            raise KeyError(date_column) from None

        train_span = self._delta(self.train_period)
        test_span = self._delta(self.test_period)
        gap_span = self._delta(gap)

        if first_split_date is None:
            first_split_date = date_values.min().date() + train_span

        # Loop-invariant values, computed once instead of on every iteration.
        record_dates = date_values.dt.date
        last_allowed_end = date_values.max().date() + self._delta(1)

        # end_train is derived from start_train (rather than reusing
        # first_split_date) because subtracting and re-adding a month-based
        # offset does not always round-trip at month ends.
        start_train = first_split_date - train_span
        end_train = start_train + train_span
        start_test = end_train + gap_span
        end_test = start_test + test_span

        index_output = []
        while end_test <= last_allowed_end:
            # both windows are half-open: start <= date < end
            in_train = (record_dates >= start_train) & (record_dates < end_train)
            in_test = (record_dates >= start_test) & (record_dates < end_test)

            print("Train period:",start_train,"-" , end_train, ", Test period", start_test, "-", end_test)

            index_output.append((data.index[in_train.to_numpy()].tolist(),
                                 data.index[in_test.to_numpy()].tolist()))

            # update dates:
            if expanding:
                end_train = start_test + test_span
            else:
                start_train = start_train + test_span
                end_train = start_train + train_span
            start_test = end_train + gap_span
            end_test = start_test + test_span

        self.n_splits = len(index_output)

        return index_output


    def sliding_window(self, data, first_split_date = None, date_column = 'Month', gap = 0):

        '''
         Generate indices using sliding window method to split data into training and test set

         Parameters:

        - data: pandas DataFrame
          the data contains date column

        - first_split_date: datetime.date()
          first date to perform the splitting on.
          if not provided will set to be the minimum date in the data after first training set

        - date_column: string, default = 'Month'
          date of each record

        - gap: int, default = 0
          for cases the test set does not come right after the train set,
          *gap* time units (in units of freq) are left between train and test sets


        Returns

        - list of tuples (train index, test index), one tuple per split;
          each element is a list of index labels of *data*

        Raises

        - KeyError if date_column is not found in data
        - ValueError if freq is not a supported frequency

        Each split's date range is printed, and n_splits is updated.

        '''
        return self._split(data, first_split_date, date_column, gap, expanding = False)


    def forward_chaining(self, data, first_split_date = None, date_column = 'Month', gap = 0):

        '''
         Generate indices using forward chaining method to split data into training and test set.
         The train set always starts at the same date and grows on every
         split up to the end of the previous test window.

         Parameters:

        - data: pandas DataFrame
          the data contains date column

        - first_split_date: datetime.date()
          first date to perform the splitting on.
          if not provided will set to be the minimum date in the data after first training set

        - date_column: string, default = 'Month'
          date of each record

        - gap: int, default = 0
          *gap* time units (in units of freq) are left between train and test sets


        Returns

        - list of tuples (train index, test index), one tuple per split;
          each element is a list of index labels of *data*

        Raises

        - KeyError if date_column is not found in data
        - ValueError if freq is not a supported frequency

        Each split's date range is printed, and n_splits is updated.

        '''
        return self._split(data, first_split_date, date_column, gap, expanding = True)


    def get_n_splits(self):

        '''
        Returns the number of splitting iterations in the cross-validator

        Returns

        - n_splits : int
          number of splits produced by the most recent splitting call,
          0 if no split has been generated yet.

        '''
        return self.n_splits

### Time-Based Target Encoding 
Target encoding of categorical features using the time-based validation splits defined above.

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import category_encoders.utils as util
import numpy as np

class SWTargetEncoder(BaseEstimator, TransformerMixin):

    '''
    Target encoding for categorical features using sliding window method

    Each validation window is encoded with the per-category target mean of
    the train window that precedes it, so a row is never encoded with its
    own target value.

    Parameters

    - target_name: string
      name of the target column; it must be present in the dataframe given
      to transform

    - col_names: list
      a list of columns to encode, if None, all string columns will be encoded.

    '''

    def __init__(self, target_name, col_names = None,):
        self.col_names = col_names
        self.target_name = target_name


    def fit(self, X, y = None, drop_target = True, drop_na = True, drop_cat = True):
        '''
        Nothing is learned here; all the work happens in transform.
        The arguments are accepted but ignored.

        Returns

        - self
        '''
        return self

    def transform(self, X, drop_target = True, drop_na = True, drop_cat = True):

        '''
        Add a '<col>_target_mean' column for every encoded column

        The splits come from TimeBasedCV(train_period = 1, test_period = 1)
        .sliding_window(X), which reads the dates from its default date
        column, 'Month'.

        Parameters

        - X: pandas DataFrame
          must contain the target column, the columns to encode and the
          date column. X is modified in place and also returned.

        - drop_target: bool, default = True
          drop the target column after encoding

        - drop_na: bool, default = True
          drop every row that contains a NaN in any column; this includes
          the rows that received no encoding (rows outside every validation
          window, or categories absent from the matching train window)

        - drop_cat: bool, default = True
          drop the original categorical columns after encoding

        Returns

        - X, the same dataframe object with the encoded columns added

        Raises

        - TypeError if target_name is not a string or col_names is not a list
        - ValueError if a column to encode or the target is missing from X
        '''

        # if columns aren't passed, just use every string column
        if self.col_names is None:
            self.col_names = util.get_obj_cols(X)
        else:
            self.col_names = util.convert_cols_to_list(self.col_names)

        # explicit raises instead of assert, which is skipped under python -O
        if not isinstance(self.target_name, str):
            raise TypeError("target_name must be a string")
        if not isinstance(self.col_names, list):
            raise TypeError("col_names must be a list")
        if not set(self.col_names).issubset(set(X.columns)):
            raise ValueError("column name is not found in the dataframe")
        if self.target_name not in X.columns:
            raise ValueError("target_name is not found in the dataframe")

        # split X using sliding window method and get indices
        time_cv = TimeBasedCV(train_period = 1, test_period = 1)
        sliding_window_indices = time_cv.sliding_window(X)

        for col in self.col_names:
            col_mean_name = col + '_' + 'target_mean'
            X[col_mean_name] = np.nan

            for train_indices, valid_indices in sliding_window_indices:
                X_train, X_valid = X.loc[train_indices], X.loc[valid_indices]
                # categories missing from the train window map to NaN
                category_means = X_train.groupby(col)[self.target_name].mean()
                X.loc[valid_indices, col_mean_name] = X_valid[col].map(category_means)

        # drop the rows having NaN values (not encoded)
        if drop_na:
            X.dropna(inplace = True)

        # drop target
        if drop_target:
            X.drop(self.target_name, axis = 1, inplace = True)

        # drop categorical columns
        if drop_cat:
            X.drop(list(self.col_names), axis = 1, inplace = True)

        return X


In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import category_encoders.utils as util
import numpy as np

class FCTargetEncoder(BaseEstimator, TransformerMixin):

    '''
    Target encoding for categorical features using forward chaining method

    Each validation window is encoded with the per-category target mean of
    the train window that precedes it, so a row is never encoded with its
    own target value.

    Parameters

    - target_name: string
      name of the target column; it must be present in the dataframe given
      to transform

    - col_names: list
      a list of columns to encode, if None, all string columns will be encoded.

    '''

    def __init__(self, target_name, col_names = None):
        self.target_name = target_name
        self.col_names = col_names

    def fit(self, X, y = None, drop_target = True, drop_na = True, drop_cat = True):
        '''
        Nothing is learned here; all the work happens in transform.
        The arguments are accepted but ignored.

        Returns

        - self
        '''
        return self

    def transform(self, X, drop_target = True, drop_na = True, drop_cat = True):

        '''
        Add a '<col>_target_mean' column for every encoded column

        The splits come from TimeBasedCV(train_period = 1, test_period = 1)
        .forward_chaining(X), which reads the dates from its default date
        column, 'Month'.

        Parameters

        - X: pandas DataFrame
          must contain the target column, the columns to encode and the
          date column. X is modified in place and also returned.

        - drop_target: bool, default = True
          drop the target column after encoding

        - drop_na: bool, default = True
          drop every row that contains a NaN in any column; this includes
          the rows that received no encoding (rows outside every validation
          window, or categories absent from the matching train window)

        - drop_cat: bool, default = True
          drop the original categorical columns after encoding

        Returns

        - X, the same dataframe object with the encoded columns added

        Raises

        - TypeError if target_name is not a string or col_names is not a list
        - ValueError if a column to encode or the target is missing from X
        '''

        # if columns aren't passed, just use every string column
        if self.col_names is None:
            self.col_names = util.get_obj_cols(X)
        else:
            self.col_names = util.convert_cols_to_list(self.col_names)

        # explicit raises instead of assert, which is skipped under python -O
        if not isinstance(self.target_name, str):
            raise TypeError("target_name must be a string")
        if not isinstance(self.col_names, list):
            raise TypeError("col_names must be a list")
        if not set(self.col_names).issubset(set(X.columns)):
            raise ValueError("column name is not found in the dataframe")
        if self.target_name not in X.columns:
            raise ValueError("target_name is not found in the dataframe")

        # split X using  forward chaining method and get indices
        time_cv = TimeBasedCV(train_period = 1, test_period = 1)
        forward_chaining_indices = time_cv.forward_chaining(X)

        for col in self.col_names:
            col_mean_name = col + '_' + 'target_mean'
            X[col_mean_name] = np.nan

            for train_indices, valid_indices in forward_chaining_indices:
                X_train, X_valid = X.loc[train_indices], X.loc[valid_indices]
                # categories missing from the train window map to NaN
                category_means = X_train.groupby(col)[self.target_name].mean()
                X.loc[valid_indices, col_mean_name] = X_valid[col].map(category_means)

        # drop the rows having NaN values (not encoded)
        if drop_na:
            X.dropna(inplace = True)

        # drop target
        if drop_target:
            X.drop(self.target_name, axis = 1, inplace = True)

        # drop categorical columns
        if drop_cat:
            X.drop(list(self.col_names), axis = 1, inplace = True)

        return X

