In [1]:
# Chunk 1: Imports

import numpy as np
import pandas as pd

# Enable IterativeImputer (experimental in scikit-learn)
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import SimpleImputer, KNNImputer, IterativeImputer

from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import ParameterGrid   # helps to iterate over param_grid


In [2]:
# Chunk 2: Define a function to perform random masking on numeric columns

def create_masked_dataframe(df, mask_ratio=0.1, random_state=42):
    """
    Randomly masks 'mask_ratio' fraction of numeric values in df.
    Returns:
        df_masked (pd.DataFrame): the masked version of the dataframe
        mask_info (dict): which cells are masked per column (boolean array)
    """
    np.random.seed(random_state)
    
    # Copy the original
    df_masked = df.copy()
    
    # We only consider numeric columns for demonstration
    numeric_cols = df_masked.select_dtypes(include=[np.number]).columns.tolist()
    
    mask_info = {}
    
    for col in numeric_cols:
        non_missing_indices = df_masked.index[df_masked[col].notna()]  # valid indices
        n_to_mask = int(len(non_missing_indices) * mask_ratio)
        # choose random subset to mask
        masked_indices = np.random.choice(non_missing_indices, size=n_to_mask, replace=False)
        
        # build a boolean mask for each column
        mask_col = np.zeros(df_masked.shape[0], dtype=bool)
        mask_col[df_masked.index.get_indexer(masked_indices)] = True
        mask_info[col] = mask_col
        
        # apply mask
        df_masked.loc[masked_indices, col] = np.nan
    
    return df_masked, mask_info


In [3]:
# Chunk 3: Grid search function to evaluate MSE/MAE for various imputation methods and hyperparameters

def grid_search_imputation(df, param_grid, mask_ratio=0.1, random_state=42):
    """
    Perform a custom grid search over imputation methods + hyperparameters.
    We will:
      1) Randomly mask a fraction of numeric data
      2) For each param setting, impute missing values
      3) Compute MSE and MAE on the masked positions
      4) Compare and return the results sorted by MSE
    
    Parameters:
    df (pd.DataFrame): The original dataframe
    param_grid (list of dict): Like in scikit-learn (a list of dicts), each dict has keys
                               e.g. "imputer": [SimpleImputer()],
                                     "imputer__strategy": ["mean", "median"]
                               etc.
    mask_ratio (float): fraction of numeric values to mask
    random_state (int): random seed
    
    Returns:
    pd.DataFrame with columns [params, MSE, MAE], sorted by MSE ascending
    """
    
    # 1) Create the masked dataframe and record which positions were masked
    df_original = df.copy()
    df_masked, mask_info = create_masked_dataframe(df_original, mask_ratio, random_state)
    
    # 2) Figure out numeric columns
    numeric_cols = df_original.select_dtypes(include=[np.number]).columns.tolist()
    if len(numeric_cols) == 0:
        raise ValueError("No numeric columns found in the dataset.")
    
    # 3) Convert param_grid into an iterable of all param combinations
    #    ParameterGrid will yield one dictionary per combination
    all_params = list(ParameterGrid(param_grid))
    
    results = []
    best_mse = float("inf")
    best_params = None
    
    # 4) Loop over each hyperparameter setting
    for param_dict in all_params:
        # We expect param_dict to define something like:
        # {"imputer": SomeImputer, "imputer__some_param": value, ...}
        
        # Identify which 'imputer' we are dealing with
        # Then set parameters accordingly
        imputer_class = param_dict.get("imputer", None)
        if imputer_class is None:
            raise ValueError("Each param_dict must include 'imputer' key with an imputer object.")
        
        # Create a clone of the imputer object to set parameters
        # (Alternatively, we can just directly assign if it's small.)
        imputer = imputer_class
        
        # For each key in param_dict, if it starts with "imputer__",
        # we set that parameter to the imputer
        for k, v in param_dict.items():
            if k.startswith("imputer__"):
                param_name = k.split("__", 1)[1]
                setattr(imputer, param_name, v)
        
        # 5) Fit-transform on the masked data
        imputed_array = imputer.fit_transform(df_masked[numeric_cols])
        imputed_df = pd.DataFrame(imputed_array, columns=numeric_cols, index=df_masked.index)
        
        # 6) Compute MSE & MAE only on the masked positions
        method_mse_list = []
        method_mae_list = []
        
        for col in numeric_cols:
            col_mask = mask_info[col]  # True = masked positions
            true_values = df_original.loc[col_mask, col]
            pred_values = imputed_df.loc[col_mask, col]
            
            if len(true_values) > 0:
                mse = mean_squared_error(true_values, pred_values)
                mae = mean_absolute_error(true_values, pred_values)
                method_mse_list.append(mse)
                method_mae_list.append(mae)
        
        avg_mse = np.mean(method_mse_list) if method_mse_list else None
        avg_mae = np.mean(method_mae_list) if method_mae_list else None
        
        # Track best
        if avg_mse is not None and avg_mse < best_mse:
            best_mse = avg_mse
            best_params = param_dict
        
        # Add to results
        results.append({
            "params": param_dict,
            "MSE": avg_mse,
            "MAE": avg_mae
        })
    
    # 7) Convert results to DataFrame and sort by MSE
    results_df = pd.DataFrame(results).sort_values(by="MSE", ascending=True, na_position="last")
    return results_df, best_params, best_mse


In [4]:
# Chunk 4: Example usage

# Suppose you have a dataset 'nicu_120.csv'
data_path = "../data/final/nicu_30.csv"
df = pd.read_csv(data_path)

# Define parameter grid similar to scikit-learn
param_grid = [
    {
        "imputer": [SimpleImputer()],
        "imputer__strategy": ["mean", "median", "most_frequent"]
    },
    {
        "imputer": [KNNImputer()],
        "imputer__n_neighbors": [2, 5, 10]
    },
    {
        "imputer": [IterativeImputer(random_state=42)],
        "imputer__max_iter": [5, 10, 20],
    }
]

# Run the custom grid search for imputation
results_df, best_params, best_mse = grid_search_imputation(
    df,
    param_grid=param_grid,
    mask_ratio=0.1,         # mask 10% of numeric values
    random_state=42
)

# Print results
print("\nAll Results (sorted by MSE):")
for index, row in results_df.iterrows():
    print(f"Params: {row['params']}, MSE: {row['MSE']:.6f}, MAE: {row['MAE']:.6f}")

print("\nBest Parameters Found:", best_params)
print("Best MSE:", best_mse)





All Results (sorted by MSE):
Params: {'imputer': SimpleImputer(strategy='most_frequent'), 'imputer__strategy': 'median'}, MSE: 986534.574089, MAE: 98.992708
Params: {'imputer': SimpleImputer(strategy='most_frequent'), 'imputer__strategy': 'mean'}, MSE: 987636.504422, MAE: 99.081742
Params: {'imputer': IterativeImputer(max_iter=20, random_state=42), 'imputer__max_iter': 10}, MSE: 998811.228978, MAE: 93.429664
Params: {'imputer': IterativeImputer(max_iter=20, random_state=42), 'imputer__max_iter': 20}, MSE: 998811.228978, MAE: 93.429664
Params: {'imputer': IterativeImputer(max_iter=20, random_state=42), 'imputer__max_iter': 5}, MSE: 998811.799936, MAE: 93.429756
Params: {'imputer': KNNImputer(n_neighbors=10), 'imputer__n_neighbors': 10}, MSE: 1083034.317005, MAE: 98.076498
Params: {'imputer': KNNImputer(n_neighbors=10), 'imputer__n_neighbors': 5}, MSE: 1150175.283742, MAE: 100.015752
Params: {'imputer': KNNImputer(n_neighbors=10), 'imputer__n_neighbors': 2}, MSE: 3328224.165812, MAE: 17