# In [1]:
'''Import the packages needed to run'''
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from scipy.stats import normaltest
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
import nbformat
from nbconvert import PythonExporter
import pandas as pd

# In [None]:
class data_preprocessing():
    """Encode, impute and tidy a pandas DataFrame in three chained stages.

    The stages build on each other: ``handle_missing_values`` calls
    ``nan_pipeline``, which in turn calls ``data_pipeline``. Every stage
    works on a copy, so the DataFrame given to the constructor is never
    modified.
    """

    # scipy.stats.normaltest is built on skewtest, which needs at least
    # 8 observations; smaller samples are skipped instead of tested.
    _MIN_NORMALTEST_SAMPLES = 8

    def __init__(self, df):
        """Store the DataFrame to process.

        Args:
            df: pandas DataFrame holding the raw data.
        """
        self.df = df

    @staticmethod
    def _make_one_hot_encoder():
        """Build a dense-output OneHotEncoder across scikit-learn versions."""
        try:
            # Newer scikit-learn spells the dense-output switch `sparse_output`.
            return OneHotEncoder(sparse_output=False, handle_unknown='ignore')
        except TypeError:
            # Older releases only know the original `sparse` keyword.
            return OneHotEncoder(sparse=False, handle_unknown='ignore')

    @staticmethod
    def _is_continuous(series):
        """Return True for numeric, non-boolean columns.

        Uses the pandas dtype helpers because ``np.issubdtype`` raises on
        pandas extension dtypes such as ``category``.
        """
        return (pd.api.types.is_numeric_dtype(series)
                and not pd.api.types.is_bool_dtype(series))

    @classmethod
    def _normality_table(cls, df, stat_name, p_name):
        """Run ``normaltest`` on every continuous column with enough data.

        Returns a DataFrame with the columns ``'column'``, ``stat_name`` and
        ``p_name``; columns that are non-numeric or too short are left out.
        """
        rows = []
        for col in df.columns:
            if not cls._is_continuous(df[col]):
                continue
            observed = df[col].dropna()
            if len(observed) < cls._MIN_NORMALTEST_SAMPLES:
                continue
            statistic, p_value = normaltest(observed.to_numpy(dtype=float))
            rows.append((col, statistic, p_value))
        return pd.DataFrame(rows, columns=['column', stat_name, p_name])

    @staticmethod
    def _impute_columns(df, columns, strategy):
        """Fill NaNs in ``columns`` of ``df`` in place with a SimpleImputer.

        Only columns holding both missing and observed values are touched:
        complete columns need no work, and SimpleImputer drops columns with
        no observed value, which would break the positional write-back.

        Returns the list of column names that were actually imputed.
        """
        targets = [
            col for col in columns
            if df[col].isna().any() and df[col].notna().any()
        ]
        if not targets:
            return targets
        filled = SimpleImputer(strategy=strategy).fit_transform(df[targets])
        for position, col in enumerate(targets):
            df[col] = filled[:, position]
        return targets

    def data_pipeline(self):
        """Encode string/object columns and leave all other columns as is.

        Object columns whose non-null values are all distinct are treated as
        identifiers and skipped. Of the rest, columns with more than two
        categories (missing values count as the extra category 'Unknown')
        are one-hot encoded and the others are label encoded.

        Returns:
            A new DataFrame; the DataFrame stored on the instance is left
            untouched.
        """
        df = self.df.copy()

        # Identify string or object data types
        cat_vars = df.select_dtypes(include='object').columns.tolist()
        if not cat_vars:
            print('No string or object data types found.')
            return df

        for col in cat_vars:
            # nunique() and count() both ignore NaN, so a missing entry can
            # neither hide nor fake an identifier column.
            non_null = df[col].count()
            if non_null > 0 and df[col].nunique() == non_null:
                print(f"Skipping column {col} as it has a unique identifier.")
                continue

            filled = df[col].fillna('Unknown')
            if filled.nunique() > 2:
                # More than two categories: one-hot encode
                ohe = self._make_one_hot_encoder()
                encoded = ohe.fit_transform(filled.to_frame())
                new_cols = [f"{col}_{cat}" for cat in ohe.categories_[0]]
                encoded_df = pd.DataFrame(
                    encoded, columns=new_cols, index=df.index
                )
                df = pd.concat([df.drop(columns=col), encoded_df], axis=1)
            else:
                # Two categories or fewer: label encode
                df[col] = LabelEncoder().fit_transform(filled)

        return df

    def nan_pipeline(self):
        """Impute numeric NaNs with IterativeImputer and report the effect.

        Runs ``data_pipeline`` first. Non-numeric columns (for example
        skipped identifier columns) and numeric columns without any observed
        value are passed through unchanged. After imputation the
        D'Agostino-Pearson statistic of every continuous column is compared
        before and after, and a short verdict is printed.

        Returns:
            The imputed DataFrame, keeping the original index and column
            order.
        """
        df = self.data_pipeline()

        has_nan = df.isna().any()
        if not has_nan.any():
            print('No NaN values found.')
            return df
        nan_cols = set(df.columns[has_nan])

        # IterativeImputer needs numeric input and discards features that
        # have no observed value, so restrict it to numeric columns with data.
        features = [
            col for col in df.columns
            if pd.api.types.is_numeric_dtype(df[col]) and df[col].notna().any()
        ]
        if not any(col in nan_cols for col in features):
            print('No NaN values found in numeric columns; '
                  'non-numeric columns are left unchanged.')
            return df

        imputer = IterativeImputer(max_iter=100, random_state=342)
        imputed = imputer.fit_transform(df[features])
        df_imputed = df.copy()
        for position, col in enumerate(features):
            df_imputed[col] = imputed[:, position]

        # Check for normality of each variable before and after imputation
        normality_df = pd.merge(
            self._normality_table(df, 'normal_test_before', 'p_value_before'),
            self._normality_table(
                df_imputed, 'normal_test_after', 'p_value_after'
            ),
            on='column',
        )
        normality_df['normal_test_diff'] = (
            normality_df['normal_test_before']
            - normality_df['normal_test_after']
        )

        # Suggest a course of action based on how much the statistic moved
        if (normality_df['normal_test_diff'] < -1000).any():
            print('Imputed NaN values have greatly affected normality. Consider a different approach.')
        elif (normality_df['normal_test_diff'] < 0).any():
            print('Imputed NaN values have slightly affected normality. Proceed with caution.')
        else:
            print('Imputed NaN values have not affected normality significantly.')

        return df_imputed

    def handle_missing_values(self):
        """Fill whatever is still missing after ``nan_pipeline``.

        Steps, in order: binary columns get their mode and are recoded to
        int8 codes; object columns made only of numeric strings are reported
        as identifiers and left alone; continuous columns are imputed with
        the mean (normal) or median (non-normal at p < 0.05); categorical
        columns get their mode. ``nan_pipeline`` already fills numeric gaps
        wherever it can, so the numeric steps here act as a fallback.

        Returns:
            The processed DataFrame.
        """
        df = self.nan_pipeline()

        if not df.isna().any().any():
            print('No missing values found.')
            return df

        # Binary columns: fill with the mode, then recode to 0/1
        binary_vars = []
        for col in df.columns:
            if df[col].nunique() == 2:
                binary_vars.append(col)
                # Assign the result back; a chained in-place fillna would
                # only modify a temporary under copy-on-write.
                filled = df[col].fillna(df[col].mode()[0])
                df[col] = filled.astype('category').cat.codes.astype('int8')
        if binary_vars:
            print(f"Binary variables replaced NaNs with mode: {binary_vars}")

        # String columns with ID-like values are reported and left as is
        id_vars = []
        for col in df.select_dtypes(include='object').columns:
            observed = df[col].dropna()
            # NaN entries are dropped first: they are floats and have no
            # str.isnumeric().
            if len(observed) > 0 and all(
                isinstance(value, str) and value.isnumeric()
                for value in observed
            ):
                id_vars.append(col)
        if id_vars:
            print(f"Columns with unique identifiers: {id_vars}")

        # Check for normality of each variable
        normality_df = self._normality_table(df, 'normal_test', 'p_value')
        non_normal_vars = normality_df[normality_df['p_value'] < 0.05]['column']
        skewed = non_normal_vars.tolist()
        numeric_cols = [
            col for col in df.columns if self._is_continuous(df[col])
        ]

        if not skewed:
            # Every tested variable looks normal: the mean is a fair fill
            self._impute_columns(df, numeric_cols, 'mean')
            print('Missing values replaced with mean for all variables.')
        else:
            # The median is robust for the non-normal variables...
            self._impute_columns(df, skewed, 'median')
            print('Missing values replaced with median for non-normal variables:', skewed)
            # ...and the remaining numeric variables still get the mean.
            remaining = [col for col in numeric_cols if col not in skewed]
            mean_filled = self._impute_columns(df, remaining, 'mean')
            if mean_filled:
                print('Missing values replaced with mean for normal variables:', mean_filled)

        # Categorical variables get their mode regardless of the numeric path
        cat_vars = df.select_dtypes(include=['category']).columns.tolist()
        if cat_vars:
            self._impute_columns(df, cat_vars, 'most_frequent')
            print('Missing values replaced with mode for categorical variables:', cat_vars)

        return df

