In [6]:
import sys
sys.path.append('..')

In [10]:
# Import Dependencies
import time
import warnings

import numpy as np
import pandas as pd
from pyod.models.cd import CD
from pyod.utils.data import get_outliers_inliers
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
from spellchecker import SpellChecker

from clean_package import CleanData

warnings.simplefilter(action='ignore', category=FutureWarning)

In [1]:
#FIXME: Insert time measurement feature to evaluate the performance of the function as a "printed" output to evaluate performance live (Identify Pivot opportunities)

class CleanData:
    def __init__(self, data: pd.DataFrame, na: np.nan):
        self.data = data
        self.na = na
    
    #!############################# # Memory Optimisation # ##############################

    class Memory:
        """Helpers that shrink the in-memory footprint of a DataFrame."""

        def __init__(self, data):
            self.data = data

        #* (1) Method
        @classmethod
        def optimise_mem(cls, data: pd.DataFrame, verbose=True) -> pd.DataFrame:
            """Downcast numeric columns to the narrowest dtype that holds their values.

            Signed integer columns are narrowed to int8/int16/int32 when their
            min/max fit; float64 columns are narrowed to float32 when their range
            fits. float16/float32 columns are left untouched so the call never
            widens a column. Non-numeric columns are ignored.

            NOTE: float64 -> float32 keeps the value range but reduces precision.

            Args:
                data (pd.DataFrame): Frame to optimise. It is modified in place.
                verbose (bool, optional): Print the resulting size and the
                    percentage saved. Defaults to True.

            Returns:
                pd.DataFrame: The same `data` object, with downcast columns.
            """
            numerics = ["int8", "int16", "int32", "int64", "float16", "float32", "float64"]
            start_mem = data.memory_usage().sum() / 1024 ** 2

            for col in data.columns:
                col_type = data[col].dtypes
                if col_type not in numerics:
                    continue

                # Value range of the column decides the target dtype
                c_min = data[col].min()
                c_max = data[col].max()

                # ? Treating integer columns
                if str(col_type).startswith("int"):
                    for candidate in (np.int8, np.int16, np.int32):
                        limits = np.iinfo(candidate)
                        # Inclusive bounds: a column holding exactly -128..127 fits int8
                        if limits.min <= c_min and c_max <= limits.max:
                            data[col] = data[col].astype(candidate)
                            break
                # ? Treating float columns (only float64 can be narrowed safely)
                elif str(col_type) == "float64":
                    limits = np.finfo(np.float32)
                    if limits.min <= c_min and c_max <= limits.max:
                        data[col] = data[col].astype(np.float32)

            # Returning the end megabytes calculation (reduction)
            end_mem = data.memory_usage().sum() / 1024 ** 2

            if verbose:
                # Guard the ratio so a zero-sized frame cannot divide by zero
                reduction = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
                print("Mem. usage decreased to {:.2f} Mb ({:.1f}% reduction)".format(end_mem, reduction))

            return data
    
    
    
    
    
    #!############################# # Treating NA values subclass # ##############################
    
    class TreatNA:
        """Locate, remove, impute and diagnose missing values in a DataFrame."""

        # Text placeholders counted as missing on top of real NaN/None/NaT cells.
        _MISSING_TOKENS = ['missing', 'null', '', 'empty']

        def __init__(self, data):
            self.data = data

        @classmethod
        def _missing_row_mask(cls, data: pd.DataFrame) -> pd.Series:
            """Return a boolean Series that is True for rows holding any missing cell.

            A cell is missing when `isna()` flags it or when its string form is
            one of `_MISSING_TOKENS`.
            """
            is_token = data.astype(str).isin(cls._MISSING_TOKENS)
            return (data.isna() | is_token).any(axis=1)

        #* (1) Method
        @classmethod
        def IdentifyNAs(cls, data: pd.DataFrame) -> pd.DataFrame:
            """Return the rows that contain at least one missing value.

            Real NaN cells are detected with `isna()`; comparing `str(value)`
            against `np.nan` (as done previously) can never match.

            Args:
                data (pd.DataFrame): Frame to inspect.

            Returns:
                pd.DataFrame: Subset of `data` whose rows hold a missing value.
            """
            return data[cls._missing_row_mask(data)]

        #* (2) Method
        @classmethod
        def complete_case_na(cls, data: pd.DataFrame) -> pd.DataFrame:
            """Return the rows a complete-case analysis would discard.

            These are the rows with at least one NaN or placeholder token
            ('missing', 'null', '', 'empty').

            Args:
                data (pd.DataFrame): Frame to inspect.

            Returns:
                pd.DataFrame: Rows of `data` that contain a missing value.
            """
            return data[cls._missing_row_mask(data)]

        #* (3) Method
        @classmethod
        def drop_complete_case_na(cls, data: pd.DataFrame) -> pd.DataFrame:
            """Return `data` without the rows that contain a missing value.

            Rows are filtered with a boolean mask rather than dropped by index
            label, so complete rows sharing a duplicated label are kept.

            Args:
                data (pd.DataFrame): Frame to clean. It is not modified.

            Returns:
                pd.DataFrame: Only the rows with no NaN and no placeholder token.
            """
            return data[~cls._missing_row_mask(data)]

        #* (4) Method
        @classmethod
        def DataImpute(cls, data: pd.DataFrame, features: list, missing_values=np.nan, numeric_strategy='mean', string_strategy='constant', string_fill_value=None):
            """Apply univariate imputation to the requested numeric and object columns.

            Suitable for MCAR cases (a variable is missing completely at random
            when the probability of being missing is the same for all observations).

            Args:
                data (pd.DataFrame): Source frame. It is not modified.
                features (list): Columns to impute. Only these columns are changed;
                    each is routed to the numeric or the string imputer by dtype.
                missing_values: Placeholder for the missing values. Defaults to
                    np.nan; can also be pd.NA, int, float or str.
                numeric_strategy (str, optional): SimpleImputer strategy for the
                    numeric columns. Defaults to 'mean'.
                string_strategy (str, optional): SimpleImputer strategy for the
                    object columns. Defaults to 'constant'.
                string_fill_value (optional): Replacement used when
                    `string_strategy == 'constant'`. Defaults to None.

            Returns:
                pd.DataFrame: Copy of `data` with the requested columns imputed.
            """
            imputed_data = data.copy()
            requested = [features] if isinstance(features, str) else list(features)

            numeric_columns = set(data.select_dtypes(include=np.number).columns)
            string_columns = set(data.select_dtypes(include=object).columns)
            # Restrict the work to what was asked for: previously one matching
            # feature caused every numeric/object column to be imputed.
            numeric_features = [col for col in requested if col in numeric_columns]
            string_features = [col for col in requested if col in string_columns]

            # Numeric imputation: Mean, Median, and Mode
            if numeric_features:
                numeric_imputer = SimpleImputer(strategy=numeric_strategy, missing_values=missing_values)
                imputed_data[numeric_features] = numeric_imputer.fit_transform(data[numeric_features])

            # String/object imputation: Custom (user based)
            if string_features:
                string_imputer = SimpleImputer(strategy=string_strategy, fill_value=string_fill_value, missing_values=missing_values)
                imputed_data[string_features] = string_imputer.fit_transform(data[string_features])

            return imputed_data

        #* (5) Method
        @classmethod
        def MNAR(cls, data: pd.DataFrame, features: list) -> pd.DataFrame:
            """Keep only `features` and drop the rows where any of them is NA.

            Intended for values missing not at random (MNAR), i.e. when being
            missing depends on information not recorded in the dataset. Example:
            in a transaction dataset the other fields are missing whenever
            transaction_number is missing.

            Args:
                data (pd.DataFrame): Source frame. It is not modified.
                features (list): Columns to select and check for NA.

            Returns:
                pd.DataFrame: The `features` columns of the rows in which none of
                those columns is NA.
            """
            return data.loc[:, features].dropna()

        # NOTE: the occurrence of missing values can depend on a certain value within a class;
        #   for example, in our dataset girls of some ages will not disclose their weight.
        # - Spark beyond - Israel
        # - Use logistic regression to evaluate soft predictions for each of the variables.
        # Idea: plot features based on the project, dataset and purpose of the analysis.
        #   Explore the feature that has missing values against other features and identify
        #   interactions with its boolean representation (is_na as 'hue'). For a feature that
        #   shows an interaction, read the independent variable from the user with `input`,
        #   then fill in the gaps as requested through a second `input` call.
        @classmethod
        def logistic_regression_MAR_identifier(cls, df: pd.DataFrame, max_iter=1000) -> pd.DataFrame:
            """Fit one logistic regression per binary column to probe for MAR patterns.

            Every column that contains NA is replaced by a 0/1 flag column named
            'Flag 1', 'Flag 2', ... (in the order the NA columns appear). Each
            remaining column with at most two distinct values is then used as the
            target of a logistic regression on the standardised, one-hot encoded
            other columns. Columns with more than two distinct values are skipped
            with a printed notice; fitting errors are printed and skipped.

            Args:
                df (pd.DataFrame): Frame to analyse. It is not modified.
                max_iter (int, optional): Iteration cap passed to
                    LogisticRegression. Defaults to 1000.

            Returns:
                pd.DataFrame: One row per (target, predictor) pair with the columns
                'Column', 'Feature' and 'Coefficient'. Empty, but with those
                columns, when no model could be fitted.
            """
            result_columns = ['Column', 'Feature', 'Coefficient']

            # Identify columns containing NA/NaN values
            columns_with_na = df.columns[df.isna().any()].tolist()

            # Create NA flag columns
            flag_columns = {f'Flag {i+1}': df[col].isna().astype(int) for i, col in enumerate(columns_with_na)}

            # Append the flags as new columns, then drop the originals that hold NA
            df = pd.concat([df, pd.DataFrame(flag_columns)], axis=1)
            df = df.drop(columns=columns_with_na)

            results = []
            for col in df.columns:
                y = df[col]

                # Only binary (or constant) targets are candidates
                if y.nunique() > 2:
                    print(f"Skipping '{str(col).upper()}' - not suitable for logistic regression.")
                    continue

                X = pd.get_dummies(df.drop(columns=[col]), drop_first=True)
                try:
                    # Scaling sits inside the try so one unusable target cannot abort the run
                    X_scaled = StandardScaler().fit_transform(X)
                    lr = LogisticRegression(max_iter=max_iter, n_jobs=-1).fit(X_scaled, y)
                except ValueError as e:
                    print(f"Error fitting model with target {col}: {e}")
                    continue

                # Collect coefficients along with the column name
                for feature, coef in zip(X.columns, lr.coef_[0]):
                    results.append({'Column': col, 'Feature': feature, 'Coefficient': float(coef)})

            # Naming the columns up front keeps astype() valid when `results` is empty
            results_df = pd.DataFrame(results, columns=result_columns)
            results_df = results_df.astype({'Column': 'str', 'Feature': 'str', 'Coefficient': 'float'})

            return results_df




    #!############################# # Treat Duplicated Values Class # ##############################

    class FindTreatDuplicates:
        """Find and remove duplicated rows in a DataFrame."""

        def __init__(self, data):
            self.data = data

        #* (1) Method
        @classmethod
        def find_duplicates(cls, data: pd.DataFrame, subset=None, identify_all=False) -> pd.DataFrame:
            """Return the duplicated rows of a DataFrame.

            Args:
                data (pd.DataFrame): Frame to inspect.
                subset (list | str, optional): Column label(s) compared to decide
                    whether rows are duplicates. Defaults to None (all columns).
                identify_all ('first' | 'last' | False, optional): Forwarded as
                    `keep` to `DataFrame.duplicated`. False (default) returns every
                    member of a duplicate group; 'first' leaves the first
                    occurrence out of the result; 'last' leaves the last one out.

            Returns:
                pd.DataFrame: The rows of `data` flagged as duplicates.
            """
            duplicated_mask = data.duplicated(subset=subset, keep=identify_all)
            return data[duplicated_mask]

        #* (2) Method
        @classmethod
        def drop_duplicates(cls, data: pd.DataFrame, subset=None, identify_all='first') -> pd.DataFrame:
            """Drop duplicated rows from a DataFrame, in place, and return it.

            Args:
                data (pd.DataFrame): Frame to clean. It is modified in place.
                subset (list | str, optional): Column label(s) compared to decide
                    whether rows are duplicates. Defaults to None (all columns).
                identify_all ('first' | 'last' | False, optional): Forwarded as
                    `keep` to `DataFrame.drop_duplicates`. 'first' (default) keeps
                    the first occurrence, 'last' keeps the last one, False drops
                    every member of a duplicate group.

            Returns:
                pd.DataFrame: The same `data` object, without the duplicates.
            """
            # With inplace=True pandas returns None; returning that value handed
            # callers None instead of the cleaned frame.
            data.drop_duplicates(subset=subset, keep=identify_all, inplace=True)
            return data


    

    #!############################# # Find & Treat Text Typos # ##############################

    class TextTypos:
        """Normalise text columns and correct spelling mistakes."""

        # Shared SpellChecker, created on first use instead of once per word.
        _spell_checker = None

        def __init__(self, data: pd.DataFrame):
            self.data = data

        #* (1) Method
        @classmethod
        def strip_and_lower_strings(cls, data: pd.DataFrame) -> pd.DataFrame:
            """Lower-case and strip surrounding whitespace in every text column.

            Only object/string columns are touched; datetime, boolean and other
            non-text columns are skipped because the `.str` accessor rejects them.

            Args:
                data (pd.DataFrame): Frame to normalise. It is modified in place.

            Returns:
                pd.DataFrame: The same `data` object with normalised text columns.
            """
            for col in data.columns:
                column = data[col]
                is_text = pd.api.types.is_object_dtype(column) or pd.api.types.is_string_dtype(column)
                if not is_text:
                    continue
                try:
                    data[col] = column.str.lower().str.strip()
                except AttributeError:
                    # Object column without string values: nothing to normalise
                    continue
            return data

        #* (2) Method
        @classmethod
        def object_to_numeric(cls, df: pd.DataFrame, features: list) -> pd.DataFrame:
            """Convert the given columns to numeric, turning unparsable values into NaN.

            Args:
                df (pd.DataFrame): Frame to convert. It is modified in place.
                features (list | str): Column label(s) to convert.

            Returns:
                pd.DataFrame: The same `df` object with `features` converted.
                Previously only the last converted column was returned, and an
                empty `features` list raised NameError.
            """
            requested = [features] if isinstance(features, str) else list(features)
            for col in df[requested].columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
            return df

        #* (3) Method
        @classmethod
        def correct_word(cls, word: str) -> str:
            """Return the most likely spelling of `word`.

            Args:
                word (str): Single word to check.

            Returns:
                str: The suggested correction, or `word` itself when the checker
                offers none (so callers can always join the results).
            """
            if cls._spell_checker is None:
                cls._spell_checker = SpellChecker()
            suggestion = cls._spell_checker.correction(word)
            return word if suggestion is None else suggestion

        #* (4) Method
        @classmethod
        def correct_sentence(cls, strings: str) -> str:
            """Spell-check a sentence word by word.

            Args:
                strings (str): Sentence whose words are separated by single spaces.

            Returns:
                str: The sentence with each word replaced by its correction.
            """
            words = strings.split(' ')  # Split the string into words
            # Empty tokens come from repeated spaces; keep them so spacing is preserved
            corrected_words = [cls.correct_word(word) if word else word for word in words]
            return ' '.join(corrected_words)


    

    #!############################# # Find & Treat Anomalies # ##############################
    #FIXME: finalise the Anomalies class -> create a function that finds categorical anomalies with value_counts(normalize=True).

    class Anomalies:
        """Detect gaps in date coverage and outliers in numeric features."""

        def __init__(self, data):
            self.data = data

        @staticmethod
        def _three_sigma_contamination(data: pd.DataFrame, features: list, center_measure: str) -> float:
            """Estimate the outlier share from the 3-standard-deviation rule.

            For each feature, compute the fraction of rows lying more than three
            standard deviations from the centre; return the median of those
            fractions. The spread is the standard deviation for both centre
            measures.

            Raises:
                ValueError: If `center_measure` is neither 'mean' nor 'median'.
            """
            values = data[features]
            if center_measure == 'mean':
                centers = values.mean()
            elif center_measure == 'median':
                centers = values.median()
            else:
                raise ValueError("Invalid value for center_measure. Please provide 'mean' or 'median'.")
            spreads = values.std()

            # Compare each feature with its own centre/spread (aligned by column
            # label) rather than with whichever column sits at the same position.
            beyond = (values < centers - 3 * spreads) | (values > centers + 3 * spreads)
            fractions = np.asarray(beyond.mean(), dtype=float)
            return float(np.median(fractions))

        #* (1) Method
        @classmethod # class method for date anomalies
        def find_date_anomalies(cls, data: pd.DataFrame, date_column: str, identify_by='month'):
            """Report months or years that have fewer rows than expected.

            Rows are counted per (year, month) of `date_column`; the count is
            reported as 'days_count', so it equals the number of days only when
            the data holds one row per day. `data` is not modified.

            Args:
                data (pd.DataFrame): Frame holding the dates.
                date_column (str): Name of a datetime column of `data`.
                identify_by (str, optional): 'month' flags (year, month) pairs with
                    fewer than 28 rows. 'year' flags years with fewer rows than
                    days in that year (365, or 366 for leap years) and prints one
                    line per flagged year. Defaults to 'month'.

            Returns:
                pd.DataFrame: For 'month', the columns 'year', 'month' and
                'days_count' of the flagged months. For 'year', the columns 'year',
                'days_count' and 'missing_days' of the flagged years.

            Raises:
                ValueError: If `identify_by` is neither 'month' nor 'year'.
            """
            if identify_by not in ('month', 'year'):
                raise ValueError("Invalid value for identify_by. Please provide 'month' or 'year'.")

            # Derive year/month in a scratch frame so the caller's data keeps its columns
            dates = data[date_column]
            parts = pd.DataFrame({'year': dates.dt.year, 'month': dates.dt.month})
            eval_data = parts.groupby(['year', 'month']).size().reset_index(name="days_count")

            if identify_by == 'month':
                return eval_data.loc[eval_data['days_count'] < 28]

            year_count = eval_data.groupby('year')['days_count'].sum().reset_index()
            years = year_count['year']
            is_leap = (years % 4 == 0) & ((years % 100 != 0) | (years % 400 == 0))
            expected_days = np.where(is_leap, 366, 365)

            year_missing = year_count[year_count['days_count'] < expected_days].copy()
            year_missing['missing_days'] = expected_days[year_missing.index] - year_missing['days_count']

            # Print statements for missing days in each year
            for year, missing in zip(year_missing['year'], year_missing['missing_days']):
                missing_days = int(missing)
                print(f"Year {int(year)} is missing {missing_days} days ({round(missing_days/30, 2)} month/s)")

            return year_missing

        #* (2) Method
        @classmethod
        def nonlinear_outliers_influencers_knn(cls, data: pd.DataFrame, features: list, neighbors_fraction: float = 0.1, contamination='auto', center_measure='mean'):
            """Detect outliers with the Local Outlier Factor (nearest-neighbour based).

            Prints the elapsed time and the number of neighbours used.

            Args:
                data (pd.DataFrame): The dataset to analyze. It is not modified.
                features (list): List of features to consider for outlier detection.
                neighbors_fraction (float, optional): Fraction of the dataset size
                    used as the number of neighbours (at least 1). Defaults to 0.1.
                contamination (str, optional): 'auto' lets LocalOutlierFactor pick
                    its threshold; '3std' uses the median share of values lying
                    more than 3 standard deviations from the centre. Defaults to 'auto'.
                center_measure (str, optional): Centre used by '3std' ('mean' or
                    'median'). Defaults to 'mean'.

            Returns:
                pd.DataFrame: The `features` columns of the rows flagged as
                outliers, keeping the original index. Empty when '3std' finds no
                value beyond 3 standard deviations.

            Raises:
                ValueError: If `contamination` is neither 'auto' nor '3std', or if
                    '3std' is used with an invalid `center_measure`.
            """
            if contamination not in ('auto', '3std'):
                raise ValueError("Invalid value for contamination. Please provide 'auto' or '3std'.")

            # Start the timer
            start_time = time.time()
            X = data[features]

            if contamination == '3std':
                contamination = cls._three_sigma_contamination(data, features, center_measure)
                # LocalOutlierFactor only accepts a contamination in (0, 0.5];
                # an estimate of zero means no outliers are expected.
                if not contamination > 0:
                    return X.iloc[0:0]
                contamination = min(contamination, 0.5)

            # Calculate the number of neighbors based on a fraction of the dataset size
            n_neighbors = max(1, int(len(data) * neighbors_fraction))

            # Fit the model and predict outliers (-1 for outliers, 1 for inliers)
            clf = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)
            y_pred = clf.fit_predict(X)

            # Select the flagged rows with a mask; no helper column is written to X
            outliers = X[y_pred == -1]

            # Elapsed time in milliseconds
            elapsed_time_ms = (time.time() - start_time) * 1000
            print("Elapsed time:", elapsed_time_ms, "milliseconds")
            print("n_nighbors:", n_neighbors)
            return outliers

        #* (3) Method
        @classmethod
        def linear_outliers_influencers(cls, data: pd.DataFrame, features: list, center_measure='mean'):
            """Detect outliers in linear data with pyod's Cook's distance detector (CD).

            A Cook's D > 1 indicates significant influence, while a Cook's D > 0.5
            is worth investigating. The detector's contamination is the median
            share of values lying more than 3 standard deviations from the centre.

            Args:
                data (pd.DataFrame): The dataset to analyze. It is not modified.
                features (list): List of features to consider for outlier detection.
                center_measure (str, optional): Central distribution measure to use
                    ('mean' or 'median'). Defaults to 'mean'.

            Returns:
                pd.DataFrame: The `features` columns of the rows flagged as
                outliers, with a fresh 0..n-1 index. Empty when no value lies
                beyond 3 standard deviations.

            Raises:
                ValueError: If `center_measure` is neither 'mean' nor 'median'.
            """
            X = data[features]

            contamination = cls._three_sigma_contamination(data, features, center_measure)
            # The detector needs a contamination in (0, 0.5]; an estimate of zero
            # means no outliers are expected.
            if not contamination > 0:
                return X.iloc[0:0].reset_index(drop=True)
            contamination = min(contamination, 0.5)

            # Instantiate and fit the Cook's D evaluator
            cooks_D = CD(contamination=contamination)
            cooks_D.fit(X)

            # predict() returns (labels, confidence); labels are 1 for outliers, 0 for inliers
            pred = cooks_D.predict(X, return_confidence=True)
            labels = np.asarray(pred[0])

            # Extracting inliers and outliers from the raw array
            X_outliers, X_inliners = get_outliers_inliers(X.to_numpy(), labels)

            # Result outliers
            return pd.DataFrame(X_outliers, columns=X.columns)

NameError: name 'pd' is not defined