In [1]:
import pandas as pd
import numpy as np
from sklearn.experimental import enable_iterative_imputer  
from sklearn.impute import IterativeImputer
from sklearn.impute import KNNImputer
from sklearn.linear_model import LinearRegression
from sklearn.neighbors import NearestNeighbors
from fancyimpute import SoftImpute
import torch
import torch.nn as nn
import torch.optim as optim
import os

import warnings
warnings.filterwarnings("ignore")

In [2]:
# Read the raw diagnostic-test table; decimal=',' tells the parser that a comma
# is the decimal mark in numeric fields.
input_path = '../data/raw/uc_diagnostic_tests.csv'
df = pd.read_csv(input_path, decimal=',')

# Report the size of the table and the total count of missing cells.
total_missing = df.isna().sum().sum()
print(f"Original data shape: {df.shape}")
print(f"Missing values before imputation: {total_missing}")

# Labels of every column whose dtype is numeric
numeric_cols = df.select_dtypes(include=[np.number]).columns

# Create the folder for processed outputs (no error if it already exists)
os.makedirs('../data/processed', exist_ok=True)

Original data shape: (252, 56)
Missing values before imputation: 3939


In [3]:
### 1. MICE Imputation
# Iterative (chained-equation) imputation of the numeric columns; the fixed
# random_state keeps the result repeatable.
print("Running MICE imputation...")
output_path_mice = '../data/processed/uc_diagnostic_tests_mice.csv'

mice_imputer = IterativeImputer(random_state=42, max_iter=20)
mice_values = mice_imputer.fit_transform(df[numeric_cols])

# Work on a copy so the raw frame stays untouched, then overwrite the numeric part
df_mice = df.copy()
df_mice[numeric_cols] = mice_values

df_mice.to_csv(output_path_mice, index=False)
print("MICE saved to: " + output_path_mice)

Running MICE imputation...
MICE saved to: ../data/processed/uc_diagnostic_tests_mice.csv


In [4]:
### 2. KNN Imputation
# Each missing cell is filled from the 5 nearest rows, computed on the numeric columns.
print("Running KNN imputation...")
output_path_knn = '../data/processed/uc_diagnostic_tests_knn.csv'

knn_imputer = KNNImputer(n_neighbors=5)
knn_values = knn_imputer.fit_transform(df[numeric_cols])

# Work on a copy so the raw frame stays untouched, then overwrite the numeric part
df_knn = df.copy()
df_knn[numeric_cols] = knn_values

df_knn.to_csv(output_path_knn, index=False)
print("KNN saved to: " + output_path_knn)

Running KNN imputation...
KNN saved to: ../data/processed/uc_diagnostic_tests_knn.csv


In [5]:
### 3. SoftImpute
print("Running SoftImputer...")
output_path_soft = '../data/processed/uc_diagnostic_tests_softimpute.csv'

soft_imputer = SoftImpute(verbose=False)

# The imputer is handed a plain NumPy matrix and its result is written back
# into the numeric columns of a copy of the raw frame.
numeric_matrix = df[numeric_cols].to_numpy()
soft_values = soft_imputer.fit_transform(numeric_matrix)

df_soft = df.copy()
df_soft[numeric_cols] = soft_values

df_soft.to_csv(output_path_soft, index=False)
print("SoftImpute saved to: " + output_path_soft)

Running SoftImputer...
SoftImpute saved to: ../data/processed/uc_diagnostic_tests_softimpute.csv


In [6]:
### 4. GAIN Imputation

def _gain_mlp(dim):
    """Build the three-layer MLP used for both the GAIN generator and discriminator.

    The network takes a ``2 * dim`` wide input (two ``dim``-wide tensors
    concatenated along the feature axis) and emits ``dim`` sigmoid outputs.
    """
    return nn.Sequential(
        nn.Linear(dim * 2, dim), nn.ReLU(),
        nn.Linear(dim, dim), nn.ReLU(),
        nn.Linear(dim, dim), nn.Sigmoid(),
    )


def gain_imputation(data_x, gain_parameters):
    """Impute the NaN entries of a 2-D numeric array with a GAIN model.

    Each column is min-max scaled, a generator/discriminator pair is trained on
    random mini-batches, the generator's output is used for the missing cells
    only, and the result is scaled back to the original range. Observed cells
    are returned unchanged (up to floating-point round-off of the scaling).

    Parameters
    ----------
    data_x : array-like of shape (n_rows, n_cols)
        Numeric data with ``np.nan`` marking missing entries. It is not modified.
    gain_parameters : dict
        Required keys:
          * ``'batch_size'`` - rows per training step; capped at ``n_rows`` so
            that small inputs do not break the batch tensors.
          * ``'hint_rate'``  - probability that a cell of the mask is revealed
            to the discriminator through the hint.
          * ``'alpha'``      - weight of the reconstruction (MSE) term in the
            generator loss.
          * ``'iterations'`` - number of training steps.
        Optional key:
          * ``'seed'``       - if present and not ``None``, seeds NumPy's and
            PyTorch's *global* RNGs so the run is repeatable. Omit it to keep
            whatever RNG state the caller has set up.

    Returns
    -------
    numpy.ndarray of shape (n_rows, n_cols), dtype float64
        The input with missing cells filled. A column without any observed
        value has no min/max to scale with and comes back as NaN.

    Raises
    ------
    ValueError
        If ``data_x`` is not 2-D or ``batch_size`` is smaller than 1.
    """
    batch_size = gain_parameters['batch_size']
    hint_rate = gain_parameters['hint_rate']
    alpha = gain_parameters['alpha']
    iterations = gain_parameters['iterations']
    seed = gain_parameters.get('seed')

    # Float conversion also covers integer inputs, for which the scaling below
    # would otherwise be truncated or rejected.
    data_x = np.asarray(data_x, dtype=float)
    if data_x.ndim != 2:
        raise ValueError("data_x must be a 2-D array")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    No, Dim = data_x.shape
    if No == 0 or Dim == 0:
        # Nothing to train on and nothing to impute.
        return data_x.copy()

    # A batch can never hold more rows than exist; without this cap the sampled
    # rows and the noise/hint tensors end up with different shapes.
    batch_size = min(batch_size, No)

    if seed is not None:
        np.random.seed(seed)
        torch.manual_seed(seed)

    # Mask: 1 where a value is observed, 0 where it is missing
    data_m = (~np.isnan(data_x)).astype(float)

    # Column-wise min-max scaling; max_val is the range *after* shifting by the
    # minimum, and 1e-6 guards constant columns against division by zero.
    min_val = np.nanmin(data_x, axis=0)
    shifted = data_x - min_val
    max_val = np.nanmax(shifted, axis=0)
    norm_data = np.nan_to_num(shifted / (max_val + 1e-6), nan=0)

    data_x_tensor = torch.tensor(norm_data, dtype=torch.float32)
    data_m_tensor = torch.tensor(data_m, dtype=torch.float32)

    generator = _gain_mlp(Dim)
    discriminator = _gain_mlp(Dim)
    optimizer_G = optim.Adam(generator.parameters())
    optimizer_D = optim.Adam(discriminator.parameters())

    for _ in range(iterations):
        # Sample a mini-batch of rows
        idx = np.random.permutation(No)[:batch_size]
        X_mb = data_x_tensor[idx]
        M_mb = data_m_tensor[idx]

        # Small uniform noise placed in the missing cells
        Z_mb = torch.rand((batch_size, Dim)) * 0.01

        # Hint: reveals the mask where B == 1 and shows 0.5 elsewhere
        B_mb = (torch.rand((batch_size, Dim)) > (1 - hint_rate)).float()
        H_mb = M_mb * B_mb + 0.5 * (1 - B_mb)

        X_mb_noise = M_mb * X_mb + (1 - M_mb) * Z_mb
        G_input = torch.cat([X_mb_noise, M_mb], dim=1)

        # --- Discriminator step ---
        # The generator is only evaluated here, so skip building its graph.
        optimizer_D.zero_grad()
        with torch.no_grad():
            G_sample = generator(G_input)
        Hat_X = M_mb * X_mb + (1 - M_mb) * G_sample
        D_prob = discriminator(torch.cat([Hat_X, H_mb], dim=1))
        D_loss = -torch.mean(
            M_mb * torch.log(D_prob + 1e-8)
            + (1 - M_mb) * torch.log(1 - D_prob + 1e-8)
        )
        D_loss.backward()
        optimizer_D.step()

        # --- Generator step ---
        optimizer_G.zero_grad()
        G_sample = generator(G_input)
        Hat_X = M_mb * X_mb + (1 - M_mb) * G_sample
        D_prob = discriminator(torch.cat([Hat_X, H_mb], dim=1))

        G_loss_temp = -torch.mean((1 - M_mb) * torch.log(D_prob + 1e-8))
        # Clamp the observed fraction so a batch with no observed cell gives a
        # zero reconstruction loss instead of 0/0 = NaN.
        observed_fraction = torch.clamp(torch.mean(M_mb), min=1e-8)
        MSE_loss = torch.mean((M_mb * X_mb - M_mb * G_sample) ** 2) / observed_fraction

        G_loss = G_loss_temp + alpha * MSE_loss
        G_loss.backward()
        optimizer_G.step()

    # Final imputation over the whole table
    with torch.no_grad():
        Z_final = torch.rand((No, Dim)) * 0.01
        X_final = data_m_tensor * data_x_tensor + (1 - data_m_tensor) * Z_final
        generated_data = generator(torch.cat([X_final, data_m_tensor], dim=1)).numpy()

    # Keep observed cells, take generated values for the missing ones, then
    # undo the min-max scaling.
    imputed_data_norm = data_m * norm_data + (1 - data_m) * generated_data
    imputed_data = imputed_data_norm * (max_val + 1e-6) + min_val

    return imputed_data

# GAIN parameters
gain_parameters = {
    'batch_size': 32,
    'hint_rate': 0.9,
    'alpha': 100,
    'iterations': 5000
}

# gain_imputation draws its mini-batches from NumPy's global RNG and its
# noise, hints and initial network weights from PyTorch's global RNG. Seed
# both so the saved CSV is the same on every Restart & Run All (the MICE cell
# pins random_state=42 for the same reason).
GAIN_SEED = 42
np.random.seed(GAIN_SEED)
torch.manual_seed(GAIN_SEED)

print("Running GAIN imputation...")
imputed_data_gain = gain_imputation(df[numeric_cols].values, gain_parameters)

# Write the imputed matrix back into a copy so the raw frame stays untouched
df_gain = df.copy()
df_gain[numeric_cols] = imputed_data_gain

output_path_gain = '../data/processed/uc_diagnostic_tests_gain.csv'
df_gain.to_csv(output_path_gain, index=False)
print(f"GAIN saved to: {output_path_gain}")

Running GAIN imputation...
GAIN saved to: ../data/processed/uc_diagnostic_tests_gain.csv


In [None]:
### 5. PMM Imputation (Predictive Mean Matching)
# Progress message, printed before the PMM helper below is defined and called.
print("Running PMM imputation...")

def pmm_imputation(df, numeric_cols):
    """Fill missing numeric values by single-donor predictive mean matching.

    For every column in ``numeric_cols`` that has missing entries, a linear
    regression is fitted on the rows where the column is observed, using the
    other numeric columns (mean-filled) as predictors. Each missing row then
    receives the *observed* value of the row whose predicted value is closest
    to its own prediction.

    Parameters
    ----------
    df : pandas.DataFrame
        Source table. It is not modified.
    numeric_cols : sequence of column labels
        Numeric columns to impute; they also serve as each other's predictors.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with the missing entries of ``numeric_cols`` filled.
        A column without any observed value has no donors and is left as is.
    """
    df_pmm = df.copy()
    numeric = df_pmm[numeric_cols]

    # Mean-fill the predictors. A column with no observed value has a NaN mean
    # and would stay NaN, which the regression cannot accept, so drop it from
    # the predictor set.
    df_temp = numeric.fillna(numeric.mean()).dropna(axis=1)

    for col in numeric_cols:
        is_nan = df[col].isna()
        if not is_nan.any():
            continue
        if is_nan.all():
            # No observed rows: nothing to fit on and no donor values to copy.
            continue

        X = df_temp.drop(columns=[col])
        y = df[col]

        X_obs, y_obs = X[~is_nan], y[~is_nan]
        X_mis = X[is_nan]

        model = LinearRegression()
        model.fit(X_obs, y_obs)

        pred_obs = model.predict(X_obs).reshape(-1, 1)
        pred_mis = model.predict(X_mis).reshape(-1, 1)

        # Nearest observed prediction for every missing row. The local is
        # deliberately not called `nn`, which is the notebook's alias for torch.nn.
        donor_finder = NearestNeighbors(n_neighbors=1)
        donor_finder.fit(pred_obs)
        _, indices = donor_finder.kneighbors(pred_mis)

        # `indices` are positions within the observed rows, hence positional lookup.
        imputed_values = y_obs.iloc[indices.flatten()].values
        df_pmm.loc[is_nan, col] = imputed_values

    return df_pmm

# Run PMM on the raw frame's numeric columns
output_path_pmm = '../data/processed/uc_diagnostic_tests_pmm.csv'
df_pmm = pmm_imputation(df, numeric_cols)

# Persist the imputed table without the index column
df_pmm.to_csv(output_path_pmm, index=False)
print("PMM saved to: " + output_path_pmm)

Running PMM imputation...
PMM saved to: ../data/processed/uc_diagnostic_tests_pmm.csv


In [8]:
# Sanity check: count the cells still missing in each imputed table.
print("\nAll imputations complete.")
imputed_frames = (
    ("MICE", df_mice),
    ("KNN", df_knn),
    ("SoftImpute", df_soft),
    ("GAIN", df_gain),
    ("PMM", df_pmm),
)
for method_label, imputed_frame in imputed_frames:
    print(f"{method_label} missing: {imputed_frame.isna().sum().sum()}")


All imputations complete.
MICE missing: 0
KNN missing: 0
SoftImpute missing: 0
GAIN missing: 0
PMM missing: 0
