In [None]:
class CustomOrdinalEncoder(BaseEstimator, TransformerMixin):
    '''
    Ordinal encoder for categorical DataFrame columns with dedicated codes
    for missing and unseen values.

    For each categorical column, the non-missing unique values found at fit
    time are sorted and mapped to the integers 1..N (N being the number of
    such values). Two more integers are then reserved:

    - N+1 -> '<MISSING>': missing values (NaN/None)
    - N+2 -> '<UNKNOWN>': values that were not seen during fitting

    The frames given to ``transform`` / ``inverse_transform`` must contain
    exactly the fitted categorical columns, in the same order.
    '''

    def __init__(self, categorical_columns, verbose=False):
        '''
        Parameters
        ----------
        categorical_columns : list
            Names of the columns to encode.
        verbose : bool, default False
            When True, timing information is printed by each method.
        '''
        # List of the categorical columns to encode
        self.categorical_columns = categorical_columns

        # Per-column mappings, filled by "fit()"
        self.direct_mapping = dict()   # column -> {original value -> integer}
        self.inverse_mapping = dict()  # column -> {integer -> original value}

        # Dtypes of the frame seen at fit time (used by "inverse_transform()")
        self._categorical_columns_types_dict = dict()

        # Verbosity switch
        self.verbose = verbose

    def _check_column_names(self, cols):
        '''
        Verify that ``cols`` matches, name by name and in order, the columns
        for which a mapping was built at fit time.

        Raises
        ------
        AssertionError
            If the number of columns differs (type kept for backward
            compatibility; raised explicitly so it survives ``python -O``).
        SystemError
            If a column name differs at some position (type kept for
            backward compatibility).
        '''
        expected_columns = list(self.direct_mapping)
        found_columns = list(cols)

        if len(found_columns) != len(expected_columns):
            raise AssertionError(
                f'expected {len(expected_columns)} column(s) as at fit time, '
                f'found {len(found_columns)}.'
            )

        for position, (expected, found) in enumerate(
            zip(expected_columns, found_columns), start=1
        ):
            if found != expected:
                raise SystemError(
                    f'ERROR: in position "{position}", there is a different '
                    f'column name w.r.t. fit time. '
                    f'Expected: "{expected}" - Found: "{found}"'
                )

    def fit(self, X, y=None):
        '''
        Learn the value <-> integer mappings of every categorical column.

        Parameters
        ----------
        X : pandas.DataFrame
            Frame containing (at least) ``self.categorical_columns``.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self
        '''
        start_time = time.time()

        # Remember the original dtypes for "inverse_transform()"
        self._categorical_columns_types_dict = X.dtypes.to_dict()

        # Build fresh mappings so that a re-fit never keeps stale columns
        direct_mapping = dict()
        inverse_mapping = dict()

        for cat_column in self.categorical_columns:
            column = X[cat_column]

            # Warn when the column has missing values (printed regardless of
            # the verbosity level, as a data-quality hint)
            missing_count = int(column.isna().sum())
            if missing_count:
                print(f'WARN: the "{cat_column}" categorical column contains missing values '
                      f'({round((missing_count / len(column) * 100), 2)}% of the total rows).')

            # Missing values get their own code below, so they must not take
            # part in the sorted 1..N enumeration (NaN is also not orderable
            # against strings)
            known_values = sorted(column.dropna().unique())

            direct = {value: code for code, value in enumerate(known_values, start=1)}
            inverse = {code: value for value, code in direct.items()}

            # Reserve the two special codes:
            # - N+1 -> missing values ["<MISSING>"]
            # - N+2 -> unknown values ["<UNKNOWN>"]
            # Derived from the number of known values so that an all-missing
            # column (N == 0) is handled as well.
            missing_code = len(known_values) + 1
            unknown_code = len(known_values) + 2
            direct['<MISSING>'] = missing_code
            inverse[missing_code] = '<MISSING>'
            direct['<UNKNOWN>'] = unknown_code
            inverse[unknown_code] = '<UNKNOWN>'

            direct_mapping[cat_column] = direct
            inverse_mapping[cat_column] = inverse

        self.direct_mapping = direct_mapping
        self.inverse_mapping = inverse_mapping

        if self.verbose:
            print(f'INFO: fitting of the custom Ordinal Encoder finished correctly in'
                  f' {round(time.time()-start_time, 3)} seconds.')

        return self

    def transform(self, X, y=None):
        '''
        Encode the categorical columns of ``X`` as ``int64`` codes.

        Parameters
        ----------
        X : pandas.DataFrame
            Frame with exactly the fitted categorical columns, in fit order.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        pandas.DataFrame
            A copy of ``X`` with every categorical column replaced by its
            integer codes; missing values get the '<MISSING>' code and
            values not seen at fit time get the '<UNKNOWN>' code.
        '''
        X = X.copy()
        start_time = time.time()

        # Check whether the list of columns is equal to the one seen at fit time
        self._check_column_names(cols=X.columns)

        for cat_column in self.categorical_columns:
            mapping = self.direct_mapping[cat_column]
            column = X[cat_column]

            # Positional boolean masks (rather than index labels) keep the
            # assignment correct even when the index has duplicate labels
            is_missing = column.isna().to_numpy()
            known_values = [value for value in mapping
                            if value not in ['<MISSING>', '<UNKNOWN>']]
            is_unknown = ~is_missing & ~column.isin(known_values).to_numpy()

            # Cast to generic object type, then map the known values;
            # everything else becomes NaN and is filled just below
            encoded = column.astype('object').map(mapping)
            encoded.loc[is_missing] = mapping['<MISSING>']
            encoded.loc[is_unknown] = mapping['<UNKNOWN>']

            # Every row now holds a code, so the cast to integer is safe
            X[cat_column] = encoded.astype('int64')

        if self.verbose:
            print(f'INFO: transforming with the custom Ordinal Encoder finished correctly in'
                  f' {round(time.time()-start_time, 3)} seconds.')

        return X

    def inverse_transform(self, X, y=None):
        '''
        Map integer codes back to the original categorical values.

        The '<MISSING>' code is turned back into NaN, while the '<UNKNOWN>'
        code is kept as the literal '<UNKNOWN>' token (the original value is
        not recoverable). Columns that were ``object`` or ``category`` at
        fit time are cast back to that dtype.

        Parameters
        ----------
        X : pandas.DataFrame
            Frame with exactly the fitted categorical columns, in fit order.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        pandas.DataFrame
            A decoded copy of ``X``.
        '''
        X = X.copy()

        if self.verbose:
            print('INFO: inverse transforming with the fitted custom Ordinal Encoder...')
        start_time = time.time()

        # Check whether the list of columns is equal to the one seen at fit time
        self._check_column_names(cols=X.columns)

        for cat_column in self.categorical_columns:
            # Use the inverse mapping for mapping back to the original values
            decoded = X[cat_column].map(self.inverse_mapping[cat_column])

            # Convert the "<MISSING>" token to NaN (boolean mask: ".at" only
            # accepts scalar labels and cannot address several rows)
            is_missing = (decoded == '<MISSING>').to_numpy()
            if is_missing.any():
                decoded.loc[is_missing] = np.nan

            # Convert the column to the original type
            original_dtype = self._categorical_columns_types_dict[cat_column]
            if original_dtype == 'object':
                decoded = decoded.astype('object')

            elif original_dtype == 'category':
                new_categories = original_dtype.categories.tolist()
                # "<UNKNOWN>" is not one of the fit-time categories: add it,
                # otherwise those rows would silently become NaN
                if (decoded == '<UNKNOWN>').any():
                    new_categories.append('<UNKNOWN>')
                decoded = pd.Categorical(values=decoded, categories=new_categories, ordered=False)

            X[cat_column] = decoded

        if self.verbose:
            print(f'INFO: inverse transforming with the custom Ordinal Encoder finished correctly in'
                  f' {round(time.time()-start_time, 3)} seconds.')

        return X