In [1]:
import pandas as pd
import numpy as np
import itertools
from sklearn.preprocessing import Binarizer, QuantileTransformer, PowerTransformer
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, OrdinalEncoder
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler, MaxAbsScaler
from scipy.stats import boxcox_normmax
from scipy.stats import boxcox
from scipy.special import boxcox1p
from pandas.api.types import CategoricalDtype

In [2]:
def encode_it(train_df,
              cols: list,
              method,
              test_df=None,
              cats='auto',
              target=None):
    """Encode the columns in ``cols`` and return encoded copies of the frames.

    Both frames are copied first, so the caller's objects are not modified.
    Every encoder / mapping is fitted on ``train_df`` only and then applied
    to ``test_df`` when one is given.

    Parameters
    ----------
    train_df : pandas.DataFrame
        Frame the encoders and mappings are fitted on.
    cols : list
        Names of the columns to encode; they are processed one at a time.
    method : str
        ``'onehot'``  - ``OneHotEncoder`` (``drop='if_binary'``, int64
        output); the source column is dropped and replaced by the generated
        indicator columns.
        ``'ordinal'`` - ``OrdinalEncoder`` with int64 output.
        ``'label'``   - ``LabelEncoder``.
        ``'freq'``    - each value is replaced by its share of rows in
        ``train_df``.
        ``'target'``  - each value is replaced by the mean of ``target``
        over the ``train_df`` rows holding that value.
        Any other value leaves the copies untouched.
    test_df : pandas.DataFrame, optional
        Second frame to encode with what was learned from ``train_df``.
    cats : 'auto' or list, default 'auto'
        Forwarded as ``categories`` to the one-hot / ordinal encoder.
        For ``'onehot'`` with a ``test_df`` it is ignored: the categories are
        the sorted union of the values seen in both frames.
        For ``'ordinal'`` with ``'auto'``, an ordered categorical column
        contributes its own category order.
    target : hashable, optional
        Column of ``train_df`` averaged by the ``'target'`` method.

    Returns
    -------
    pandas.DataFrame or tuple of pandas.DataFrame
        ``train_df`` encoded, or ``(train_df, test_df)`` when ``test_df``
        was supplied.

    Raises
    ------
    KeyError
        For ``'freq'`` / ``'target'`` when a value being encoded has no
        entry in the mapping built from ``train_df``.
    """
    train_df = train_df.copy()
    if test_df is not None:
        test_df = test_df.copy()

    # `progress_bar` is not imported by this notebook's import cell; use it
    # when the session provides it, otherwise iterate without a bar.
    try:
        col_iter = progress_bar(cols)
    except NameError:
        col_iter = cols

    for col in col_iter:

        if method == 'onehot':
            col_cats = cats
            if test_df is not None:
                # Fit on the union so test-only categories still get a column.
                col_cats = [
                    sorted(
                        set(train_df[col].unique())
                        | set(test_df[col].unique()))
                ]

            ohe_kwargs = dict(categories=col_cats,
                              drop='if_binary',
                              dtype=np.int64)
            try:
                encoder = OneHotEncoder(sparse_output=False, **ohe_kwargs)
            except TypeError:
                # Older scikit-learn only knows the `sparse` keyword.
                encoder = OneHotEncoder(sparse=False, **ohe_kwargs)

            encoder.fit(train_df[[col]])

            # `get_feature_names` was replaced by `get_feature_names_out`.
            if hasattr(encoder, 'get_feature_names_out'):
                ohe_cols = encoder.get_feature_names_out([col])
            else:
                ohe_cols = encoder.get_feature_names([col])

            train_df.loc[:, ohe_cols] = encoder.transform(train_df[[col]])
            train_df.drop(columns=col, inplace=True)

            if test_df is not None:
                test_df[ohe_cols] = encoder.transform(test_df[[col]])
                test_df.drop(columns=col, inplace=True)

        elif method == 'ordinal':
            categories = cats
            if isinstance(cats, str) and cats == 'auto':
                col_dtype = train_df[col].dtype
                # isinstance is the reliable way to detect a categorical
                # column; only an ordered one dictates the encoding order.
                if isinstance(col_dtype, pd.CategoricalDtype) and col_dtype.ordered:
                    categories = [list(col_dtype.categories)]

            encoder = OrdinalEncoder(categories=categories, dtype=np.int64)

            train_df[col] = encoder.fit_transform(train_df[[col]])

            if test_df is not None:
                test_df[col] = encoder.transform(test_df[[col]])

        elif method == 'label':
            encoder = LabelEncoder()
            train_df[col] = encoder.fit_transform(train_df[col])

            if test_df is not None:
                # Only the current column is written (was `test_df[cols]`).
                test_df[col] = encoder.transform(test_df[col])

        elif method == 'freq':
            mapping = train_df.groupby(col).size() / len(train_df)
            train_df[col] = train_df[col].apply(lambda x: mapping[x])

            if test_df is not None:
                # The mapping belongs to `col` only, so only that test column
                # is encoded with it.
                test_df[col] = test_df[col].apply(lambda x: mapping[x])

        elif method == 'target':
            mapping = train_df.groupby(col)[target].mean()
            train_df[col] = train_df[col].apply(lambda x: mapping[x])

            if test_df is not None:
                test_df[col] = test_df[col].apply(lambda x: mapping[x])

    if test_df is not None:
        return (train_df, test_df)
    return train_df

In [3]:
def scale_it(train_df, cols: list, method, test_df=None):
    """Scale each column in ``cols`` independently and return scaled copies.

    Both frames are copied first. For every column the scaler is fitted on
    ``train_df`` and the same fit is then applied to ``test_df``.

    Parameters
    ----------
    train_df : pandas.DataFrame
        Frame the scaler is fitted on.
    cols : list
        Names of the columns to scale.
    method : str
        ``'standard'``, ``'robust'``, ``'minmax'`` or ``'maxabs'``, selecting
        ``StandardScaler``, ``RobustScaler``, ``MinMaxScaler`` or
        ``MaxAbsScaler`` respectively.
    test_df : pandas.DataFrame, optional
        Second frame scaled with the statistics learned from ``train_df``.

    Returns
    -------
    pandas.DataFrame or tuple of pandas.DataFrame
        ``train_df`` scaled, or ``(train_df, test_df)`` when ``test_df`` was
        supplied.

    Raises
    ------
    KeyError
        If ``method`` is not one of the four supported names.
    """
    train_df = train_df.copy()
    if test_df is not None:
        test_df = test_df.copy()

    scaler_classes = {
        'standard': StandardScaler,
        'robust': RobustScaler,
        'minmax': MinMaxScaler,
        'maxabs': MaxAbsScaler
    }

    # Only the requested scaler is instantiated; it is re-fitted per column.
    scaler = scaler_classes[method]()

    # `progress_bar` is not imported by this notebook's import cell; use it
    # when the session provides it, otherwise iterate without a bar.
    try:
        col_iter = progress_bar(cols)
    except NameError:
        col_iter = cols

    for col in col_iter:
        try:
            train_df[col] = scaler.fit_transform(train_df[[col]])
        except KeyError:
            # A column missing from train cannot be fitted, so test is skipped too.
            print(f'{col} does not exist in train_df')
            continue

        if test_df is not None:
            try:
                test_df[col] = scaler.transform(test_df[[col]])
            except KeyError:
                print(f'{col} does not exist in test_df')

    if test_df is not None:
        return (train_df, test_df)
    return train_df

In [None]:
def transform_it(train_df, cols: list, method='power', test_df=None):
    """Apply a distribution-shaping transform to ``cols`` and return copies.

    Both frames are copied first. Parameters are always estimated on
    ``train_df`` and re-used for ``test_df``.

    Parameters
    ----------
    train_df : pandas.DataFrame
        Frame the transform parameters are estimated on.
    cols : list
        Names of the columns to transform.
    method : str, default 'power'
        ``'power'`` - per column, chosen from the column minimum of every
        supplied frame:
        all minima > 0   -> Box-Cox (``boxcox_normmax`` + ``boxcox``);
        all minima >= 0  -> Box-Cox on ``1 + x`` (``boxcox1p``);
        otherwise        -> Yeo-Johnson via ``PowerTransformer``
        (``standardize=False``).
        A column missing from a frame is reported with a message and skipped.
        ``'quantile'`` - ``QuantileTransformer(output_distribution='normal')``
        fitted on all ``cols`` at once.
        Any other value leaves the copies untouched.
    test_df : pandas.DataFrame, optional
        Second frame transformed with the parameters learned from
        ``train_df``.

    Returns
    -------
    pandas.DataFrame or tuple of pandas.DataFrame
        ``train_df`` transformed, or ``(train_df, test_df)`` when ``test_df``
        was supplied.
    """
    train_df = train_df.copy()
    if test_df is not None:
        test_df = test_df.copy()

    if method == 'power':
        # `progress_bar` is not imported by this notebook's import cell; use
        # it when the session provides it, otherwise iterate without a bar.
        try:
            col_iter = progress_bar(cols)
        except NameError:
            col_iter = cols

        for col in col_iter:
            try:
                train_col = train_df[col]
                test_col = test_df[col] if test_df is not None else None

                mins = [train_col.min()]
                if test_col is not None:
                    mins.append(test_col.min())

                if all(m > 0 for m in mins):
                    lmbda = boxcox_normmax(train_col)
                    train_df[col] = boxcox(train_col, lmbda)
                    if test_col is not None:
                        test_df[col] = boxcox(test_col, lmbda)
                    print(f'Box-Cox have been used on {col}')

                elif all(m >= 0 for m in mins):
                    # Zeros present but nothing negative in either frame:
                    # shift by one so Box-Cox is defined. Requiring *every*
                    # minimum to be >= 0 keeps negative values out of this
                    # branch (a zero in one frame alone is not enough).
                    lmbda = boxcox_normmax(train_col + 1)
                    train_df[col] = boxcox1p(train_col, lmbda)
                    if test_col is not None:
                        test_df[col] = boxcox1p(test_col, lmbda)
                    print(f'Box-Cox1p have been used on {col}')

                else:
                    transformer = PowerTransformer(standardize=False,
                                                   method='yeo-johnson')
                    train_df[col] = transformer.fit_transform(train_df[[col]])
                    if test_df is not None:
                        test_df[col] = transformer.transform(test_df[[col]])
                    print(f'Yeo-johnson have been used on {col}')

            except KeyError:
                print(f'{col} does not exist')
                continue

    elif method == 'quantile':
        transformer = QuantileTransformer(output_distribution='normal')
        train_df[cols] = transformer.fit_transform(train_df[cols])

        if test_df is not None:
            test_df[cols] = transformer.transform(test_df[cols])

    if test_df is not None:
        return (train_df, test_df)
    return train_df

In [None]:
def cycle_it(df, col, max_val):
    """Replace a cyclical column by its sine / cosine projection, in place.

    ``df[col]`` is mapped to the angle ``2 * pi * value / max_val``; the
    sine and cosine of that angle are stored as ``<col>_sin`` and
    ``<col>_cos`` and the original column is dropped.

    Note that ``df`` itself is modified; the same object is returned.
    """
    angle = 2 * np.pi * df[col] / max_val
    for suffix, trig in (('_sin', np.sin), ('_cos', np.cos)):
        df[col + suffix] = trig(angle)
    df.drop(columns=col, inplace=True)
    return df

In [None]:
def corr_filter(df, target, border, sign='above'):
    """List the columns whose absolute correlation with ``target`` passes ``border``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose numeric columns are correlated with each other.
    target : hashable
        Name of the column the others are compared against. It is never part
        of the result.
    border : float
        Threshold applied to the absolute correlation.
    sign : {'above', 'below'}, default 'above'
        ``'above'`` keeps columns with ``|corr| >= border``;
        ``'below'`` keeps columns with ``|corr| < border``.

    Returns
    -------
    list
        Selected column names, ordered by decreasing absolute correlation
        with ``target``. A summary line is also printed.

    Raises
    ------
    ValueError
        If ``sign`` is neither ``'above'`` nor ``'below'``.
    """
    if sign not in ('above', 'below'):
        raise ValueError(f"sign must be 'above' or 'below', got {sign!r}")

    try:
        # Restrict to numeric columns explicitly; recent pandas no longer
        # drops non-numeric columns on its own.
        corr = df.corr(numeric_only=True)
    except TypeError:
        # Older pandas has no `numeric_only` keyword on DataFrame.corr.
        corr = df.corr()

    corr_abs = corr.abs().sort_values(target, axis=0, ascending=False)

    if sign == 'above':
        cols = list(corr_abs.index[corr_abs[target] >= border])
    else:
        cols = list(corr_abs.index[corr_abs[target] < border])

    # The target correlates perfectly with itself, so it is usually absent
    # from the 'below' selection; only remove it when it was selected.
    if target in cols:
        cols.remove(target)

    print(f'''{len(cols)} out of {len(corr_abs)} numerical columns 
    have correlation with {target} {sign} |{border}|''')

    return cols

In [None]:
def useless_cols(df, percent=99.9, dropna=True):
    """List the columns dominated by a single value.

    A column is reported when its most frequent value covers more than
    ``percent`` percent of the rows of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to inspect.
    percent : float, default 99.9
        Dominance threshold, in percent of ``len(df)``.
    dropna : bool, default True
        Forwarded to ``Series.value_counts``. With the default, missing
        values are not counted as a value (but still count as rows), and a
        column holding only missing values is skipped. Pass ``False`` to
        treat NaN like any other value.

    Returns
    -------
    list
        Names of the dominated columns, in column order.
    """
    n_rows = len(df)
    dominated = []
    for col in df.columns:
        counts = df[col].value_counts(dropna=dropna)
        if counts.empty:
            # No countable values (all-missing column or empty frame):
            # there is no dominant value to measure.
            continue
        # value_counts sorts descending, so the first entry is the mode count.
        top_count = counts.iloc[0]
        if top_count / n_rows * 100 > percent:
            dominated.append(col)
    return dominated