In [1]:
# Standard Libraries
import inspect

# Data Ingestion and Manipulation
import numpy as np
import pandas as pd

# Preprocessing
from sklearn.impute import KNNImputer

# Model Selection and Evaluation
from sklearn.model_selection import train_test_split, ParameterGrid
from sklearn.metrics import mean_squared_error, root_mean_squared_error, mean_absolute_error, pairwise_distances

# Machine Learning Models
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestRegressor


# Display options for pandas
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_columns', 1000)
pd.set_option('display.width', 1000)


#### GridSearchImputer

This class allows for the use of supervised and unsupervised models, it's main purpose is to impute data using supervised or unsupervised data

In [2]:
class GridSearchImputer:
    """
    A grid search-based imputation class for finding optimal hyperparameters 
    to impute missing data using supervised or unsupervised methods.

    Parameters:
    ----------
    imputer_dict : dict
        Dictionary containing:
          - `param_grid`: Dictionary of hyperparameters for grid search.
          - `cost_args`: Dictionary containing:
              - `call`: Callable cost function.
              - `kwargs`: Additional arguments for the cost function.
    imputer : object, optional
        An imputer or algorithm instance (e.g., KNNImputer, RandomForestRegressor).
    task : str, optional, default="regression"
        Specifies the type of task ('regression' or 'classification').

    Attributes:
    ----------
    param_grid : dict
        Dictionary of hyperparameters for grid search.
    cost_function : callable
        The cost function used to evaluate imputation performance.
    best_score : float
        The best score achieved during grid search.
    best_params : dict
        The hyperparameters corresponding to the best score.
    best_imputer : object
        The best imputer model after grid search.
    results : list
        List of results containing parameter combinations and their respective scores.

    Notes:
    -----
    - For regression tasks, the default cost function is `mean_squared_error`.
    - For classification tasks, the default cost function is `accuracy_score`.
    """
    def __init__(self, imputer_dict, imputer=None, task="regression"):
        # Extract param_grid and cost_args from the imputer_dict
        self.param_grid = imputer_dict.get('param_grid', {})
        self.cost_args = imputer_dict.get('cost_args', {})
        
        # Initialize attributes
        self.imputer = imputer
        self.task = task.lower()
        self.best_score = float('inf')
        self.best_params = None
        self.best_imputer = None
        self.results = []
        
        # Validate hyperparameters and cost function arguments
        self.validate_imputer_params(self.param_grid)  
        self.validate_cost_function(self.cost_args)  
    
        # Initialize the cost function with provided arguments
        cost_callable = self.cost_args.get('call')
        cost_kwargs = self.cost_args.get('kwargs', {})
        
        if not callable(cost_callable):
            raise ValueError(f"The cost function must be callable. Got: {cost_callable}")
        
        # Store the name of the callable loss function before wrapping in a lambda
        self.cost_callable_name = cost_callable.__name__
        self.cost_function = lambda true, pred: cost_callable(true, pred, **cost_kwargs)
    
        # Summarize configurations
        print('\n' + "=" * 40)
        print("GridSearchImputer Initialized with Configurations")
        print("=" * 40)
        
        print(f"Task: {self.task}")
        print(f"Imputer: {self.imputer.__name__ if self.imputer else 'None'}")
        
        print("\nGrid Search Parameters:")
        for key, value in self.param_grid.items():
            print(f"\t{key}: {value}")
        
        print(f"\nCost Function: {cost_callable.__name__ if cost_callable else 'None'}")
        print("Cost Function Parameters:")
        for key, value in cost_kwargs.items():
            print(f"\t{key}: {value}")
        
        print("=" * 40 + '\n')

    def validate_imputer_params(self, params):
        """
        Validate hyperparameters against the imputer's expected parameters.
        
        Parameters:
        ----------
        params : dict
            Dictionary of hyperparameters to validate.
        
        Raises:
        ------
        ValueError:
            If any parameter is invalid for the specified imputer.
        """
        self.learning_type = self._classify_model(self.imputer())
        self._validate_constructor_params(params)

    def validate_cost_function(self, cost_args):
        """
        Validate the cost function and its arguments.

        Parameters:
        ----------
        cost_args : dict
            Dictionary containing `call` (cost function) and `kwargs` (arguments).
        
        Raises:
        ------
        ValueError:
            If the cost function is not callable or if invalid arguments are provided.
        """
        cost_callable = cost_args.get('call')
        cost_kwargs = cost_args.get('kwargs', {})
        
        if not callable(cost_callable):
            raise ValueError(f"The cost function must be callable. Got: {cost_callable}")

        cost_signature = inspect.signature(cost_callable)
        valid_args = cost_signature.parameters.keys()
        
        invalid_args = [arg for arg in cost_kwargs if arg not in valid_args]
        if invalid_args:
            raise ValueError(f"Invalid arguments for the cost function: {', '.join(invalid_args)}. "
                             f"Valid arguments are: {', '.join(valid_args)}.")

    def _classify_model(self, model):
        """
        Classify the model as supervised or unsupervised based on inheritance.

        Parameters:
        ----------
        model : object
            An instance of the model to classify.

        Returns:
        -------
        str
            'supervised' or 'unsupervised'.

        Raises:
        ------
        ValueError:
            If the model does not belong to a recognized class.
        """
        is_imputer = '_BaseImputer' in str(model.__class__.mro())
        is_estimator = 'BaseEstimator' in str(model.__class__.mro())
        is_ensemble = 'BaseEnsemble' in str(model.__class__.mro())

        if is_estimator and is_imputer:
            return "unsupervised"
        elif (is_estimator or is_ensemble) and not is_imputer:
            return "supervised"
        else:
            raise ValueError(f"The imputer {self.imputer} must be a subclass of either '_BaseImputer' (for unsupervised imputers) or 'BaseEstimator' (for supervised models).")

    def _validate_constructor_params(self, params):
        """
        Validate parameters against the imputer's constructor.

        Parameters:
        ----------
        params : dict
            Dictionary of parameters to validate.

        Raises:
        ------
        ValueError:
            If any parameter is not valid for the imputer's constructor.
        """
        constructor_signature = inspect.signature(self.imputer)
        constructor_params = constructor_signature.parameters.keys()

        invalid_params = [param for param in params if param not in constructor_params]
        if invalid_params:
            raise ValueError(f"Invalid parameters: {', '.join(invalid_params)}. "
                             f"Valid parameters are: {', '.join(constructor_params)}.")

    def make_missing(self, df, missing_percentage=0.2):
        """
        Introduce missing values into a dataframe or series by randomly selecting entries.

        Parameters:
        ----------
        df : pd.DataFrame or pd.Series
            Input data to introduce missing values.
        missing_percentage : float, optional, default=0.2
            Fraction of total entries to set as missing.

        Returns:
        -------
        tuple:
            - pd.DataFrame: Data with missing values.
            - np.ndarray: Row indices of missing values.
            - np.ndarray: Column indices of missing values.
        """
        df_with_missing = df.copy().astype('float64')
        n_rows, n_cols = df_with_missing.shape
        n_total_values = n_rows * n_cols
        n_missing = int(n_total_values * missing_percentage)

        missing_indices = np.random.choice(n_total_values, size=n_missing, replace=False)
        row_indices, col_indices = np.unravel_index(missing_indices, df_with_missing.shape)

        df_with_missing.iloc[row_indices, col_indices] = np.nan
        return df_with_missing, row_indices, col_indices

    def fit(self, X, missing_percentage=0.2, round_data=False, export_csv=None, y=None):
        """
        Fit the imputer by performing grid search to find the best hyperparameters.
    
        Parameters:
        ----------
        X : pd.DataFrame
            The input training data.
        missing_percentage : float, optional, default=0.2
            Percentage of data to set as missing during training.
        round_data : bool, optional, default=False
            Whether to round imputed values to integers.
        export_csv : str or None, optional
            Path to export results to a CSV file. If None, results are not exported.
        y : pd.DataFrame or None, optional
            Target values for supervised imputers (used during fitting).
    
        Notes:
        -----
        - For supervised imputers, the target `y` is required for training.
        - For unsupervised imputers, missing values are introduced in the data for evaluation.
        - Results of the grid search are stored in `self.results`.
        - The best model is stored in `self.best_imputer`.
    
        Raises:
        ------
        TypeError:
            If `X` contains non-numeric data.
        """
        # Ensure all data columns are numeric
        if not np.issubdtype(X[X.columns].dtypes.to_numpy().flatten()[0], np.number):
            raise TypeError("GridSearchImputer only supports numeric data.")
    
        # Perform grid search over all parameter combinations
        for params in ParameterGrid(self.param_grid):
            # Instantiate the imputer with current parameters
            imputer_instance = self.imputer(**params)
            
            # Print progress current parameters set
            print(f"Testing parameters: {params}")
            
            if self.learning_type == 'supervised':
                # Check if target `y` is provided
                if y is None:
                    raise ValueError("Target values `y` are required for supervised imputers.")
                
                # Split data into training and validation sets
                X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
                
                # Train and predict using the imputer
                imputer_instance.fit(X_train, y_train)
                imputed_values = imputer_instance.predict(X_val)
                true_values = y_val
            elif self.learning_type == 'unsupervised':
                # Introduce missing values for evaluation
                X_missing, missing_rows, missing_cols = self.make_missing(X, missing_percentage)
                imputed_data = imputer_instance.fit_transform(X_missing)
                
                # Extract true and imputed values for the same indices
                imputed_values = imputed_data[missing_rows, missing_cols]
                true_values = X.values[missing_rows, missing_cols]
    
                # Flatten the extracted values for evaluation
                imputed_values = np.array(imputed_values).flatten()
                true_values = np.array(true_values).flatten()
            else:
                raise ValueError("Unsupported learning type. Expected 'supervised' or 'unsupervised'.")
    
            # Optionally round the imputed values
            if round_data:
                imputed_values = np.round(imputed_values).astype('int64')
    
            # Compute the loss metric
            loss = self.cost_function(true_values, imputed_values)
    
            # Update the best model based on the lowest loss
            if loss < self.best_score:
                self.best_score = loss
                self.best_params = params
                self.best_imputer = imputer_instance
    
                # Log the results for analysis
                self.results.append({
                    "Parameters": params,
                    "Loss": loss,
                })
    
                # Print progress for the current parameter set
                print('\n' + ">" * 40)
                print("New best set of parameters found:\n")
                for key, value in params.items():
                    print(f"\t{key}: {value}")
                print(f"\nWith {self.cost_callable_name}: {np.round(loss, decimals=5)}")
                print(">" * 40 + '\n')
                
        # Export results to a CSV file if specified
        if export_csv:
            pd.DataFrame(self.results).to_csv(export_csv, index=False)
            print(f"Results exported to {export_csv}")


    def transform(self, X, round_data=False, y_fit=None, x_predict=None):
        """
        Impute missing values in the test data using the best found imputer.
    
        Parameters:
        ----------
        X : pd.DataFrame
            The dataset with missing values to be imputed.
        round_data : bool, optional, default=False
            If True, round imputed values to integers.
        y_fit : pd.DataFrame or None, optional
            Target values for supervised imputers (used during fitting).
        x_predict : pd.DataFrame or None, optional
            Input features for supervised imputers (used during prediction).
    
        Returns:
        -------
        np.ndarray:
            The imputed data.
    
        Raises:
        ------
        ValueError:
            If the `fit` method has not been called to determine the best imputer.
        """
        # Ensure the model has been fitted
        if self.best_params is None:
            raise ValueError("The model has not been fitted yet. Please call 'fit' first.")
    
        # Instantiate a fresh imputer with the best parameters
        imputer_instance = self.imputer(**self.best_params)
    
        if self.learning_type == 'supervised':
            # Check that target and prediction data are provided
            if y_fit is None or x_predict is None:
                raise ValueError("Both `y_fit` and `x_predict` are required for supervised imputers.")
    
            imputer_instance.fit(X, y_fit)  # Fit the imputer
            imputed_data = imputer_instance.predict(x_predict)  # Predict and fill missing values
        elif self.learning_type == 'unsupervised':
            imputed_data = imputer_instance.fit_transform(X)  # Fit and transform (impute values directly)
        else:
            raise ValueError("Unsupported learning type. Expected 'supervised' or 'unsupervised'.")
    
        # Optionally round the imputed data
        if round_data:
            imputed_data = np.round(imputed_data).astype('int64')
    
        return imputed_data


In [3]:
from sklearn.metrics import root_mean_squared_error
# Example usage

# Sample data
X = pd.DataFrame({
    'A': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    'B': [5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
})

# Sample target with missing values
y = pd.DataFrame({
    'C': [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    'D': [9, 10, 11, np.nan, 13, 14, 15, np.nan, 17, 18]
})

# Define the parameter grid for RandomForestRegressor
imputer_dict = \
    {
    'param_grid' : \
    {
        'n_estimators': [10, 50, 100],
        'max_depth': [None, 5, 10],
        'min_samples_split': [2, 5],
        'min_samples_leaf': [1, 2]
    },
    'cost_args' : \
    {
        'call' : root_mean_squared_error,
        'kwargs' : {
        
            }
    }   
}
# Initialize the GridSearchImputer
grid_imputer = GridSearchImputer(imputer_dict, imputer=RandomForestRegressor, task='regression')

# Fit the imputer using the training data
grid_imputer.fit(X, y=y['C'], round_data=True)

# Select the rows where y['D'] is missing
x_predict = X[y['D'].isna()]  # Select corresponding rows in X where y['D'] is NaN

# Transform the test data using the best imputer found, and impute the missing values in y['D']
imputed_data = grid_imputer.transform(X, y_fit=y['C'], x_predict=x_predict, round_data=True)

# Output results
print("\nBest RandomForestRegressor parameters:")
print(grid_imputer.best_params)
print(f"Best {imputer_dict['cost_args']['call'].__name__}: {grid_imputer.best_score}")
print("\nImputed Data:\n", imputed_data)



GridSearchImputer Initialized with Configurations
Task: regression
Imputer: RandomForestRegressor

Grid Search Parameters:
	n_estimators: [10, 50, 100]
	max_depth: [None, 5, 10]
	min_samples_split: [2, 5]
	min_samples_leaf: [1, 2]

Cost Function: root_mean_squared_error
Cost Function Parameters:

Testing parameters: {'max_depth': None, 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 10}

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
New best set of parameters found:

	max_depth: None
	min_samples_leaf: 1
	min_samples_split: 2
	n_estimators: 10

With root_mean_squared_error: 0.0
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Testing parameters: {'max_depth': None, 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 50}
Testing parameters: {'max_depth': None, 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 100}
Testing parameters: {'max_depth': None, 'min_samples_leaf': 1, 'min_samples_split': 5, 'n_estimators': 10}
Testing parameters: {'max_depth': None, 

In [4]:
# Sample data
X = pd.DataFrame({
    'A': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    'B': [5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
})

# Sample target with missing values
y = pd.DataFrame({
    'C': [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
    'D': [np.nan, 10, 11, np.nan, 13, np.nan, 15, np.nan, 17, np.nan]
})

# Define the parameter grid for KNNImputer
imputer_dict = \
    {
    'param_grid' : {
        'n_neighbors': np.arange(1,20,1),  # Number of neighbors for KNN
        'weights': ['uniform', 'distance']
    },
    'cost_args' : \
    {
        'call' : mean_absolute_error,
        'kwargs' : {

            }
    }   
}
# Initialize the GridSearchImputer with KNNImputer
grid_imputer = GridSearchImputer(imputer_dict, imputer=KNNImputer, task='regression')

# Fit the imputer using the training data
grid_imputer.fit(X, missing_percentage=0.2, round_data=True)

# Transform the test data using the best imputer found, and impute the missing values in y['D']
imputed_data = grid_imputer.transform(y, round_data=True, x_predict=x_predict)

# Output results
print("\nBest KNNImputer parameters:")
print(grid_imputer.best_params)
print(f"Best {imputer_dict['cost_args']['call'].__name__}: {grid_imputer.best_score}")
print("\nImputed Data:\n", imputed_data)




GridSearchImputer Initialized with Configurations
Task: regression
Imputer: KNNImputer

Grid Search Parameters:
	n_neighbors: [ 1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19]
	weights: ['uniform', 'distance']

Cost Function: mean_absolute_error
Cost Function Parameters:

Testing parameters: {'n_neighbors': 1, 'weights': 'uniform'}

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
New best set of parameters found:

	n_neighbors: 1
	weights: uniform

With mean_absolute_error: 2.75
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Testing parameters: {'n_neighbors': 1, 'weights': 'distance'}
Testing parameters: {'n_neighbors': 2, 'weights': 'uniform'}
Testing parameters: {'n_neighbors': 2, 'weights': 'distance'}

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
New best set of parameters found:

	n_neighbors: 2
	weights: distance

With mean_absolute_error: 2.25
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Testing parameters: {'n_neighbors': 3, 'weights': 'uniform'}

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>