Author: Ahmed Sobhi - Amr Ahmed

Department: Data Science

Created_at: 2023-07-07

Objective: Fix Skewness

In [None]:
import os
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
pd.set_option('display.max_rows', 500)

import numpy as np
import matplotlib.pyplot as plt
import math

from scipy import stats
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin

In [None]:
class SkewnessTransformer(BaseEstimator, TransformerMixin):
    """Reduce the skewness of numeric features with an automatically chosen transform.

    During ``fit`` every numeric column whose absolute skewness exceeds
    ``skew_limit`` is evaluated against a set of candidate transforms
    (``'log'``, ``'sqrt'``, ``'boxcox'`` / ``'yeojohnson'``, ``'cube'``) and the
    candidate leaving the smallest absolute skewness is remembered for that
    column. ``transform`` then applies the remembered transform to each column.

    Parameters
    ----------
    skew_limit : float, default=0.8
        Columns with ``abs(skewness) > skew_limit`` are considered skewed.

    Attributes
    ----------
    method_dict : dict
        Maps a method name to the list of column labels it is applied to.
        Rebuilt by ``fit``.
    lambda_dict : dict
        Maps ``'boxcox'`` / ``'yeojohnson'`` to a ``{column label: lambda}``
        dict holding the power parameter fitted for each column.
        Rebuilt by ``fit``.

    Notes
    -----
    Inputs may be a pandas DataFrame or a NumPy array. For arrays, column
    labels are the integer column positions and ``transform`` returns an array.
    """

    def __init__(self, skew_limit=0.8):
        self.skew_limit = skew_limit
        # Fitted state. Kept here (empty) so that transform() on an unfitted
        # instance is a harmless copy, exactly as before.
        self.method_dict = {}
        self.lambda_dict = {}

    def fit(self, X, y=None):
        """Choose a transform per skewed column and fit its parameters.

        Parameters
        ----------
        X : pandas.DataFrame or numpy.ndarray
            Training data.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self
        """
        frame = self._as_frame(X)
        self.method_dict = self.extracrt_recommeneded_features(frame)

        # Only the power transforms have a parameter to learn.
        estimators = {'boxcox': stats.boxcox, 'yeojohnson': stats.yeojohnson}

        # Start from a clean slate so a re-fit never reuses stale lambdas.
        self.lambda_dict = {}
        for method, features in self.method_dict.items():
            estimator = estimators.get(method)
            if estimator is None:
                continue
            # One lambda per feature: the recommendation was scored with a
            # per-feature lambda, so a pooled lambda would not match it.
            self.lambda_dict[method] = {
                feature: estimator(self._clean_values(frame[feature]))[1]
                for feature in features
            }

        return self

    def transform(self, X):
        """Apply the transforms selected during ``fit``.

        Columns recorded in ``method_dict`` that are absent from ``X`` are
        skipped. Values outside a transform's domain (for example values
        ``<= 0`` for Box-Cox) come out as NaN/inf rather than raising.

        Parameters
        ----------
        X : pandas.DataFrame or numpy.ndarray
            Data to transform.

        Returns
        -------
        pandas.DataFrame or numpy.ndarray
            A transformed copy of ``X`` of the same container type.
        """
        X_transformed = self._as_frame(X).copy()

        for method, features in self.method_dict.items():
            for feature in features:
                if feature not in X_transformed.columns:
                    continue

                values = X_transformed[feature].astype(float).to_numpy()

                if method == 'log':
                    # log1p keeps zeros finite.
                    result = np.log1p(values)
                elif method == 'sqrt':
                    result = np.sqrt(values)
                elif method == 'boxcox':
                    result = stats.boxcox(
                        values, lmbda=self._lambda_for(method, feature))
                elif method == 'yeojohnson':
                    result = stats.yeojohnson(
                        values, lmbda=self._lambda_for(method, feature))
                elif method == 'cube':
                    # Cube-root transformation.
                    result = np.cbrt(values)
                else:
                    # Unknown method names are ignored.
                    continue

                # Replace the column by label instead of writing through
                # .iloc, so an integer column can receive float results.
                X_transformed[feature] = result

        if isinstance(X, np.ndarray):
            return X_transformed.to_numpy()
        return X_transformed

    def extracrt_recommeneded_features(self, X):
        """Return ``{method: [column labels]}`` for every skewed numeric column.

        A column is skewed when ``abs(skewness) > self.skew_limit``. Columns
        are visited in their original order.
        """
        frame = self._as_frame(X)

        # numeric_only avoids a TypeError on object/string columns.
        skewness = frame.skew(numeric_only=True)
        skew_col_lst = skewness[skewness.abs() > self.skew_limit].index.tolist()

        # Group the skewed columns by their recommended method.
        method_dict = {}
        for feature in skew_col_lst:
            method = self.recommend_skewness_reduction_method(frame[feature])
            method_dict.setdefault(method, []).append(feature)

        return method_dict

    def recommend_skewness_reduction_method(self, feature: pd.Series) -> str:
        """Return the name of the transform that best reduces skewness.

        Each candidate is scored by the absolute skewness of the transformed
        data, using the same function ``transform`` applies. Missing values
        are ignored. Candidates whose domain does not cover the data are not
        considered:

        - ``'log'`` (``log1p``) needs all values ``> -1``;
        - ``'sqrt'`` needs all values ``>= 0``;
        - ``'boxcox'`` needs all values ``> 0``; otherwise ``'yeojohnson'``
          is evaluated in its place;
        - ``'cube'`` (cube root) is always evaluated.

        Parameters
        ----------
        feature : pandas.Series
            The feature to evaluate.

        Returns
        -------
        str
            One of ``'log'``, ``'sqrt'``, ``'boxcox'``, ``'yeojohnson'``,
            ``'cube'``. Ties go to the earliest candidate in that order.
            Falls back to ``'cube'`` if no candidate has a finite skewness.

        Raises
        ------
        ValueError
            If ``feature`` has no non-missing values.
        """
        values = self._clean_values(feature)
        if values.size == 0:
            raise ValueError("Cannot recommend a transformation for an empty feature.")

        lowest = values.min()

        # Insertion order defines tie-breaking, so keep it stable.
        candidates = {}
        if lowest > -1:
            candidates['log'] = np.log1p(values)
        if lowest >= 0:
            candidates['sqrt'] = np.sqrt(values)
        if lowest <= 0:
            # Box-Cox is undefined for non-positive data.
            candidates['yeojohnson'] = stats.yeojohnson(values)[0]
        else:
            candidates['boxcox'] = stats.boxcox(values)[0]
        candidates['cube'] = np.cbrt(values)

        skewness_dict = {}
        for method, transformed in candidates.items():
            score = abs(stats.skew(transformed))
            # A NaN score must not take part in min(): NaN never compares
            # smaller or larger, so it would win merely by being first.
            if np.isfinite(score):
                skewness_dict[method] = score

        if not skewness_dict:
            # The cube root is defined for every real number.
            return 'cube'

        # Pick the method with the smallest absolute skewness.
        return min(skewness_dict, key=skewness_dict.get)

    def _get_feature_indices(self, X, feature_names):
        """Return the column positions of ``feature_names`` in ``X``.

        For a DataFrame, names are looked up in ``X.columns`` and unknown
        names are skipped. For a NumPy array, ``feature_names`` are integer
        column positions and out-of-range entries are skipped.

        Raises
        ------
        ValueError
            If ``X`` is neither a DataFrame nor a NumPy array.
        """
        if isinstance(X, pd.DataFrame):
            return [
                X.columns.get_loc(feature_name)
                for feature_name in feature_names
                if feature_name in X.columns
            ]
        if isinstance(X, np.ndarray):
            n_columns = X.shape[1] if X.ndim > 1 else 1
            return [
                int(feature_idx)
                for feature_idx in feature_names
                if isinstance(feature_idx, (int, np.integer))
                and not isinstance(feature_idx, bool)
                and 0 <= feature_idx < n_columns
            ]
        raise ValueError("Unsupported input type. Expected Pandas DataFrame or NumPy array.")

    @staticmethod
    def _as_frame(X):
        """Return ``X`` as a DataFrame; arrays get integer column labels."""
        if isinstance(X, pd.DataFrame):
            return X
        if isinstance(X, np.ndarray):
            return pd.DataFrame(X)
        raise ValueError("Unsupported input type. Expected Pandas DataFrame or NumPy array.")

    @staticmethod
    def _clean_values(series):
        """Return the non-missing values of ``series`` as a float array."""
        return series.dropna().astype(float).to_numpy()

    def _lambda_for(self, method, feature):
        """Return the fitted lambda for ``feature`` under ``method``.

        Also accepts a plain scalar stored under ``lambda_dict[method]``, so a
        manually configured instance with one lambda per method still works.
        """
        fitted = self.lambda_dict[method]
        if isinstance(fitted, dict):
            return fitted[feature]
        return fitted