In [47]:
import pandas as pd
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.preprocessing import RobustScaler, MinMaxScaler, StandardScaler, LabelEncoder
from scipy.stats import zscore

In [48]:
#@title preprocessing_classes
class Duplicates:
    """Optional duplicate-row removal step.

    ``duplicates`` switches the step on or off; when it is falsy,
    ``handle`` passes the frame through untouched.
    """

    def __init__(self, duplicates=True):
        self.duplicates = duplicates

    def handle(self, df):
        """Drop repeated rows from ``df`` and return it.

        The frame is modified in place (its index is renumbered from 0) and
        the very same object is returned.
        """
        if not self.duplicates:
            return df
        df.drop_duplicates(inplace=True, ignore_index=True)
        return df

class MissingValues:
    """Impute missing values in numeric and non-numeric columns.

    Parameters
    ----------
    missing_num : str or None
        ``'auto'`` or ``'knn'`` imputes numeric columns with ``KNNImputer``;
        any other value leaves numeric columns untouched.
    missing_categ : str or None
        ``'most_frequent'`` fills non-numeric columns with their most
        frequent value; ``'auto'`` or ``'logreg'`` fills them using
        ``SimpleImputer(strategy='constant')`` with its default fill value;
        any other value leaves them untouched.
    """

    _NUM_STRATEGIES = ('auto', 'knn')
    _CATEG_STRATEGIES = ('auto', 'logreg', 'most_frequent')

    def __init__(self, missing_num=None, missing_categ=None):
        self.missing_num = missing_num
        self.missing_categ = missing_categ

    def handle(self, df, _n_neighbors=5):
        """Fill missing values in ``df`` (modified in place) and return it.

        ``_n_neighbors`` is forwarded to ``KNNImputer``. Nothing happens when
        no strategy is configured or the frame has no missing values.
        """
        if not (self.missing_num or self.missing_categ):
            return df
        if not df.isna().to_numpy().any():
            return df
        if self.missing_num:
            df = self._handle_missing_num(df, _n_neighbors)
        if self.missing_categ:
            df = self._handle_missing_categ(df, _n_neighbors)
        return df

    def _handle_missing_num(self, df, _n_neighbors):
        """Impute numeric columns that contain missing values.

        Only columns that actually have gaps are touched. A column whose
        observed values are all whole numbers is rounded and stored as
        nullable ``Int64`` afterwards; any other column keeps its float
        values, so real-valued data is no longer rounded away.
        """
        if self.missing_num not in self._NUM_STRATEGIES:
            return df
        for col in df.select_dtypes(include=np.number).columns:
            series = df[col]
            missing = series.isna()
            # Skip complete columns (nothing to do) and empty ones:
            # KNNImputer discards columns with no observed values by
            # default, which would break the assignment below.
            if not missing.any() or missing.all():
                continue
            observed = series[~missing].to_numpy(dtype=float)
            # pandas stores an integer column with gaps as float, so detect
            # "really an integer column" from the observed values.
            integral = bool(np.all(observed == np.round(observed)))
            # NOTE(review): fitting on a single column gives the imputer no
            # other features to measure distance on — confirm whether joint
            # imputation over all numeric columns was intended.
            imputer = KNNImputer(n_neighbors=_n_neighbors)
            df[col] = imputer.fit_transform(df[[col]]).ravel()
            if integral:
                df[col] = df[col].round().astype('Int64')
        return df

    def _handle_missing_categ(self, df, _n_neighbors):
        """Impute non-numeric columns that contain missing values.

        ``_n_neighbors`` is accepted for signature symmetry and is unused.
        """
        if self.missing_categ not in self._CATEG_STRATEGIES:
            return df
        if self.missing_categ == 'most_frequent':
            strategy = 'most_frequent'
        else:
            strategy = 'constant'
        numeric = set(df.select_dtypes(include=np.number).columns)
        # Walk the columns in frame order so the processing order is stable.
        for col in [c for c in df.columns if c not in numeric]:
            if not df[col].isna().any():
                continue
            imputer = SimpleImputer(strategy=strategy)
            df[col] = imputer.fit_transform(df[[col]]).ravel()
        return df

class Outliers:
    """Replace numeric outliers with KNN-imputed values.

    Parameters
    ----------
    method : str or None
        Only ``'knn'`` is recognised; any other value disables the step.
    threshold : float
        A cell is treated as an outlier when the absolute z-score of its
        value within its column exceeds this number.
    """

    def __init__(self, method=None, threshold=2.5):
        self.method = method
        self.threshold = threshold

    def handle(self, df):
        """Return ``df`` with outliers handled, or unchanged if disabled."""
        if self.method == 'knn':
            df = self._handle_knn(df)
        return df

    def _handle_knn(self, df):
        """Blank out outlier cells and fill every gap with ``KNNImputer``.

        Previously the frame was imputed first and the "outlier" branch then
        called ``fillna`` on data that no longer had gaps, so outliers were
        never modified. Now outlier cells are turned into missing values and
        imputed together with the gaps that were already there.

        Non-numeric columns and the original index are preserved; numeric
        columns come back as floats. The input frame is not modified.
        """
        num_cols = df.select_dtypes(include=np.number).columns
        if len(num_cols) == 0 or len(df) == 0:
            return df

        numeric = df[num_cols].astype(float)
        # Constant columns divide by zero and yield NaN scores; NaN never
        # compares greater than the threshold, so such cells are kept.
        with np.errstate(invalid='ignore', divide='ignore'):
            scores = zscore(numeric.to_numpy(), axis=0, nan_policy='omit')
            is_outlier = np.abs(np.asarray(scores, dtype=float)) > self.threshold
        masked = numeric.mask(is_outlier)

        # KNNImputer discards columns with no observed values by default, so
        # fit only on columns that still have at least one value.
        fit_cols = [c for c in masked.columns if masked[c].notna().any()]
        if fit_cols and masked[fit_cols].isna().to_numpy().any():
            masked[fit_cols] = KNNImputer().fit_transform(masked[fit_cols])

        result = df.copy()
        # Positional assignment keeps the caller's index untouched.
        result[num_cols] = masked.to_numpy()
        return result


class Adjust:
    """Scale numeric columns and derive datetime features.

    Parameters
    ----------
    scaler : str or None
        ``'minMax'``, ``'standard'`` or ``'robust'`` (matched
        case-insensitively); anything else disables scaling.
    extract_datetime : str or bool
        Falsy disables the step. A string names the ``Series.dt`` attribute
        to extract (for example ``'year'``) into a new ``<column>_<attr>``
        column. Any other truthy value only converts parsable columns to
        datetime.
    """

    def __init__(self, scaler=None, extract_datetime=False):
        self.scaler = scaler
        self.extract_datetime = extract_datetime

    def handle(self, df):
        """Apply the configured adjustments to ``df`` and return it.

        The frame is modified in place.
        """
        if self.scaler:
            scaler = self._make_scaler()
            num_cols = df.select_dtypes(include=np.number).columns
            # Only numeric columns can be scaled; text/datetime columns are
            # left as they are instead of making the scaler raise.
            if scaler is not None and len(num_cols) and len(df):
                df[num_cols] = scaler.fit_transform(df[num_cols])
        if self.extract_datetime:
            df = self._convert_datetime(df)
        return df

    def _make_scaler(self):
        """Return a fresh scaler for ``self.scaler``, or None if unknown."""
        key = str(self.scaler).lower()
        if key == 'minmax':
            return MinMaxScaler()
        if key == 'standard':
            return StandardScaler()
        if key == 'robust':
            return RobustScaler()
        return None

    def _convert_datetime(self, df):
        """Convert parsable non-numeric columns to datetime.

        Columns that cannot be parsed are skipped. When ``extract_datetime``
        is a string, the matching ``.dt`` attribute of every converted column
        is stored in a new column named ``<column>_<attr>``.
        """
        import warnings

        numeric = set(df.select_dtypes(include=np.number).columns)
        attr = self.extract_datetime if isinstance(self.extract_datetime, str) else None
        for col in [c for c in df.columns if c not in numeric]:
            try:
                # ``infer_datetime_format`` is deprecated in recent pandas
                # releases, so the default parsing is used instead.
                parsed = pd.to_datetime(df[col])
            except (ValueError, TypeError, OverflowError):
                # Best effort: the column does not hold dates.
                continue
            df[col] = parsed
            if attr is None:
                continue
            try:
                component = getattr(parsed.dt, attr)
            except AttributeError:
                component = None
            if component is None or callable(component):
                warnings.warn(
                    f"cannot extract datetime attribute {attr!r} from column {col!r}"
                )
                continue
            # A distinct name is required: adding the component under the
            # source column's own name would collide with that column.
            df[f"{col}_{attr}"] = component
        return df

class EncodeCateg:
    """Encode categorical columns as numbers.

    Parameters
    ----------
    encode_categ : str, list or None
        ``'auto'`` encodes every ``object`` column; a list encodes just the
        named columns (names missing from the frame are ignored); a falsy
        value disables the step.

    Columns with at most ``_MAX_ONE_HOT_LEVELS`` distinct values are one-hot
    encoded with ``pd.get_dummies``; columns with more are label encoded.
    """

    _MAX_ONE_HOT_LEVELS = 10

    def __init__(self, encode_categ=None):
        self.encode_categ = encode_categ

    def handle(self, df):
        """Return ``df`` with the configured columns encoded.

        One-hot encoding builds a new frame, so callers must use the return
        value; label encoding additionally writes into the input frame.
        """
        if not self.encode_categ:
            return df
        if self.encode_categ == 'auto':
            # The result must be kept: get_dummies returns a new frame.
            return self._auto_encode(df)
        if isinstance(self.encode_categ, list):
            for col in self.encode_categ:
                if col in df.columns:
                    df = self._auto_encode(df, col)
        return df

    def _auto_encode(self, df, col=None):
        """Encode ``col``, or every ``object`` column when ``col`` is None."""
        # ``is not None`` so that a falsy column label such as 0 is honoured.
        if col is not None:
            targets = [col]
        else:
            targets = list(df.select_dtypes(include='object').columns)
        for name in targets:
            if len(df[name].unique()) <= self._MAX_ONE_HOT_LEVELS:
                df = pd.get_dummies(df, columns=[name], prefix=[name])
            else:
                df[name] = LabelEncoder().fit_transform(df[name])
        return df


In [49]:
#@title master_class
class AutoDataCleaner:
    """Configurable cleaning pipeline for a pandas DataFrame.

    ``fit_transform`` runs the enabled steps in a fixed order: duplicate
    removal, missing-value imputation, outlier handling, scaling, datetime
    feature extraction and categorical encoding. A step is skipped when its
    option is falsy. String options are matched with ``in``, so each may be
    a single string or a list of strings.

    Parameters
    ----------
    duplicates : bool
        Drop duplicated rows.
    missing_strategy : str or list or None
        ``'knn'``: KNN imputation for numeric columns, most-frequent value
        for ``object`` columns. ``'most_frequent'``: most-frequent value for
        both. ``'auto'`` on its own behaves like ``'knn'``.
    outliers_method : str or list or None
        ``'knn'`` replaces outlier cells with KNN-imputed values;
        ``'zscore'`` / ``'iqr'`` drop the rows containing outliers.
    threshold : float
        z-score limit, or the IQR multiplier for ``'iqr'``.
    scaling_method : str or list or None
        ``'minmax'``, ``'standard'`` or ``'robust'``.
    extract_datetime : str or bool
        Name of the ``Series.dt`` attribute (e.g. ``'year'``) to extract from
        datetime columns into ``<column>_<attr>`` columns.
    encode_categorical : str or list or None
        ``'auto'`` encodes every ``object`` column.
    """

    def __init__(self, duplicates=True, missing_strategy=None, outliers_method=None,
                 threshold=3, scaling_method=None, extract_datetime=False, encode_categorical=None):
        self.duplicates = duplicates
        self.missing_strategy = missing_strategy
        self.outliers_method = outliers_method
        self.threshold = threshold
        self.scaling_method = scaling_method
        self.extract_datetime = extract_datetime
        self.encode_categorical = encode_categorical

    def fit_transform(self, df):
        """Run every enabled step on ``df`` and return the cleaned frame.

        Some steps modify ``df`` in place and others build a new frame, so
        always use the return value.
        """
        if self.duplicates:
            df = self.remove_duplicates(df)
        if self.missing_strategy:
            df = self.handle_missing_values(df)
        if self.outliers_method:
            df = self.handle_outliers(df)
        if self.scaling_method:
            df = self.adjust_scaling(df)
        if self.extract_datetime:
            df = self.convert_datetime(df)
        if self.encode_categorical:
            df = self.encode_categorical_features(df)
        return df

    def remove_duplicates(self, df):
        """Drop duplicated rows in place, renumber the index, return ``df``."""
        df.drop_duplicates(inplace=True, ignore_index=True)
        return df

    def handle_missing_values(self, df):
        """Impute numeric and ``object`` columns of ``df`` in place.

        Column groups without missing values are left untouched, so their
        dtypes are preserved.
        """
        groups = (
            (df.select_dtypes(include=np.number).columns, 'numeric'),
            (df.select_dtypes(include='object').columns, 'categorical'),
        )
        for cols, kind in groups:
            if len(cols) == 0 or not df[cols].isna().to_numpy().any():
                continue
            filled = self.impute_missing_values(df[cols], strategy=kind)
            # Positional assignment: no dependence on index alignment.
            df[cols] = filled.to_numpy()
        return df

    def _make_imputer(self, kind):
        """Build the imputer for ``kind`` or return None if none applies."""
        strategy = self.missing_strategy
        if 'knn' in strategy or ('most_frequent' not in strategy and 'auto' in strategy):
            if kind == 'numeric':
                return KNNImputer()
            # KNeighborsClassifier is an estimator without fit_transform and
            # cannot impute; fall back to the most frequent value for text.
            return SimpleImputer(strategy='most_frequent')
        if 'most_frequent' in strategy:
            return SimpleImputer(strategy='most_frequent')
        return None

    def impute_missing_values(self, df, strategy='numeric'):
        """Return a copy of ``df`` with missing values imputed.

        ``strategy`` is ``'numeric'`` or ``'categorical'`` and selects which
        imputer is built from ``missing_strategy``. Index and column labels
        are preserved. ``df`` itself is returned when there is nothing to
        impute or no imputer is configured.
        """
        if df.shape[1] == 0 or not df.isna().to_numpy().any():
            return df
        imputer = self._make_imputer(strategy)
        if imputer is None:
            return df
        # The imputers discard columns with no observed values by default,
        # which would misalign the output; leave such columns as they are.
        fit_cols = [c for c in df.columns if df[c].notna().any()]
        if not fit_cols:
            return df
        result = df.copy()
        result[fit_cols] = imputer.fit_transform(df[fit_cols])
        return result

    def handle_outliers(self, df):
        """Handle outliers in the numeric columns of ``df``.

        Non-numeric columns are always kept: ``'knn'`` rewrites outlier
        cells, the other methods drop whole rows. Returns a new frame.
        """
        num_cols = df.select_dtypes(include=np.number).columns
        if len(num_cols) == 0:
            return df

        if 'knn' in self.outliers_method:
            result = df.copy()
            result[num_cols] = self.handle_outliers_knn(df[num_cols]).to_numpy()
            return result
        return df[self._inlier_mask(df[num_cols])]

    @staticmethod
    def _abs_zscores(df):
        """Absolute column-wise z-scores of ``df`` as a float array.

        Missing values are ignored when computing the mean and deviation;
        their own score (and every score of a constant column) is NaN.
        """
        values = df.astype(float).to_numpy()
        with np.errstate(invalid='ignore', divide='ignore'):
            scores = zscore(values, axis=0, nan_policy='omit')
        return np.abs(np.asarray(scores, dtype=float))

    def handle_outliers_knn(self, df):
        """Replace outlier cells of the numeric frame ``df`` via KNN.

        Cells whose absolute z-score exceeds ``threshold`` are blanked and
        then imputed together with any existing gaps. Previously the frame
        was imputed first and the outlier branch was a no-op. The result is
        a float frame with the same index and columns.
        """
        if df.shape[1] == 0 or len(df) == 0:
            return df
        with np.errstate(invalid='ignore'):
            is_outlier = self._abs_zscores(df) > self.threshold
        masked = df.astype(float).mask(is_outlier)
        fit_cols = [c for c in masked.columns if masked[c].notna().any()]
        if fit_cols and masked[fit_cols].isna().to_numpy().any():
            masked[fit_cols] = KNNImputer().fit_transform(masked[fit_cols])
        return masked

    def _inlier_mask(self, df):
        """Boolean array marking the rows of numeric ``df`` to keep.

        Missing values are never counted as outliers.
        """
        keep = np.ones(len(df), dtype=bool)
        if df.shape[1] == 0 or len(df) == 0:
            return keep
        if 'zscore' in self.outliers_method:
            scores = self._abs_zscores(df)
            with np.errstate(invalid='ignore'):
                # NaN scores (gaps, constant columns) must not drop rows.
                keep = (np.isnan(scores) | (scores < self.threshold)).all(axis=1)
        elif 'iqr' in self.outliers_method:
            for col in df.columns:
                column = df[col].astype(float)
                # Quartiles come from the rows that survived earlier
                # columns, matching the original sequential filtering.
                remaining = column[keep]
                q1 = remaining.quantile(0.25)
                q3 = remaining.quantile(0.75)
                iqr = q3 - q1
                lower_bound = q1 - (self.threshold * iqr)
                upper_bound = q3 + (self.threshold * iqr)
                inside = (column >= lower_bound) & (column <= upper_bound)
                keep &= (inside | column.isna()).to_numpy()
        return keep

    def remove_outliers(self, df):
        """Return the rows of numeric ``df`` without z-score/IQR outliers."""
        return df[self._inlier_mask(df)]

    def adjust_scaling(self, df):
        """Scale the numeric columns of ``df`` in place and return it."""
        if 'minmax' in self.scaling_method:
            scaler = MinMaxScaler()
        elif 'standard' in self.scaling_method:
            scaler = StandardScaler()
        elif 'robust' in self.scaling_method:
            scaler = RobustScaler()
        else:
            return df

        # Text columns are still present at this stage of the pipeline, so
        # only the numeric ones are handed to the scaler.
        num_cols = df.select_dtypes(include=np.number).columns
        if len(num_cols) and len(df):
            df[num_cols] = scaler.fit_transform(df[num_cols])
        return df

    def convert_datetime(self, df):
        """Add ``<column>_<attr>`` features for every datetime column.

        ``attr`` is ``extract_datetime`` and must name a ``Series.dt``
        attribute; a non-string value leaves ``df`` unchanged. Returns a new
        frame when columns are added.
        """
        attr = self.extract_datetime
        if not isinstance(attr, str):
            return df
        cols = df.select_dtypes(include=['datetime64', 'datetimetz']).columns
        if len(cols) == 0:
            return df
        df = df.copy()
        for col in cols:
            # A distinct name is needed: joining the component under the
            # source column's own name raises on the overlapping label.
            df[f"{col}_{attr}"] = getattr(df[col].dt, attr)
        return df

    def encode_categorical_features(self, df):
        """Encode ``object`` columns when ``'auto'`` is requested.

        Columns with at most 10 distinct values are one-hot encoded, the
        others label encoded.
        """
        if 'auto' not in self.encode_categorical:
            return df
        for col in list(df.select_dtypes(include='object').columns):
            if len(df[col].unique()) <= 10:
                df = pd.get_dummies(df, columns=[col], prefix=[col])
            else:
                df[col] = LabelEncoder().fit_transform(df[col])
        return df
