# Step 2: Multiple Imputation by Chained Equations (MICE)

In [None]:
import pandas as pd
import miceforest as mf
import os
import matplotlib.pyplot as plt
import numpy as np
from typing import Union
import json

from paths import PROCESSED_CSV_PATH, MICE_BASE_DIR, MICE_IMPUTED_METRICS_DIR, IMPUTED_DATASETS_DIR

## 1. Load Data

In [None]:
# Load the preprocessed dataset from the project-configured CSV path (read as UTF-8)
df_full = pd.read_csv(PROCESSED_CSV_PATH, encoding='utf-8')
# Show (n_rows, n_columns) as the cell output
df_full.shape

(6724, 100)

In [None]:
# Print per-column dtypes and non-null counts to see which features contain missing values
df_full.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6724 entries, 0 to 6723
Data columns (total 99 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   胎膜早破                6724 non-null   int64  
 1   胎儿宫内窘迫              6724 non-null   int64  
 2   巨大儿                 6724 non-null   int64  
 3   羊水污染                6724 non-null   int64  
 4   妊娠期糖尿病              6724 non-null   int64  
 5   妊娠期高血压              6724 non-null   int64  
 6   妊娠合并肝损害             6724 non-null   int64  
 7   妊娠合并肝内胆汁淤积症         6724 non-null   int64  
 8   孕妇产次                6724 non-null   int64  
 9   足月产次数               6724 non-null   int64  
 10  早产次数                6724 non-null   int64  
 11  流产次数                6724 non-null   int64  
 12  人流次数                6724 non-null   int64  
 13  体重                  6724 non-null   float64
 14  身高                  6724 non-null   float64
 15  1小时葡萄糖              6724 non-null   float64
 16  2小时葡萄糖

## 2. Apply MICE Imputation

In [None]:
# Imputation settings: how many completed datasets to draw and how many MICE passes to run.
# NOTE(review): the recorded outputs further down list three saved datasets — confirm the intended value.
IMPUTED_DATASETS = 1
ITERATIONS = 20

# Build the miceforest kernel on the full frame with a fixed random_state
kernel = mf.ImputationKernel(
    data=df_full,
    num_datasets=IMPUTED_DATASETS,
    random_state=42,
)

# Run the chained-equations procedure for ITERATIONS passes
kernel.mice(ITERATIONS)

# Collect one completed DataFrame per imputed dataset
imputed_datasets = [
    kernel.complete_data(dataset=i)
    for i in range(IMPUTED_DATASETS)
]

# Sanity check: every completed frame must keep the original row count and index
for i, imputed_df in enumerate(imputed_datasets, start=1):
    assert imputed_df.shape[0] == df_full.shape[0], f"Row count mismatch in dataset {i}"
    assert (imputed_df.index == df_full.index).all(), f"Index mismatch in dataset {i}"
print("All imputed datasets match the original DataFrame indexes.")


All imputed datasets match the original DataFrame indexes.


## 3. Save Imputed Datasets

In [None]:
# Make sure the output folder exists before writing, so a fresh checkout
# (where the directory may not exist yet) does not fail on the first to_csv call.
os.makedirs(IMPUTED_DATASETS_DIR, exist_ok=True)

# Save each imputed dataset under a zero-padded, 1-based name
# (imputed_01.csv, imputed_02.csv, ..., imputed_10.csv, ...)
for i, imputed_df in enumerate(imputed_datasets, start=1):
    file_name = f"imputed_{i:02d}.csv"
    output_path = os.path.join(IMPUTED_DATASETS_DIR, file_name)
    # index=False: the row index is not written to the file
    imputed_df.to_csv(output_path, index=False)
    print(f"Saved {file_name} with shape {imputed_df.shape}")

Saved imputed_01.csv with shape (6724, 99)
Saved imputed_02.csv with shape (6724, 99)
Saved imputed_03.csv with shape (6724, 99)


## 4. Get metrics

In [None]:
# Names of the features that contain missing values
def get_na_feature_names(df: pd.DataFrame):
    """Return the names of the columns of `df` that hold at least one missing value.

    The result is a plain list ordered like `df.columns`; it is empty when no
    column contains a missing value.
    """
    # One boolean per column: True when the column has any NA entry
    has_missing = df.isna().any(axis=0)
    return [name for name, flag in has_missing.items() if flag]

# Convergence diagnostic
def get_convergence_diagnostic(kernel: mf.ImputationKernel, feature_names: list[str], iterations_cap: int=ITERATIONS):
    """Save one mean-convergence plot per feature for every dataset held by `kernel`.

    For each dataset, the completed data is retrieved at every iteration from 0 up to
    and including `iterations_cap`, and the column mean of each requested feature is
    plotted against the iteration number. Each plot is written to
    ``<MICE_BASE_DIR>/Convergence Metrics <dataset number>/<feature name>.png``.

    Parameters
    ----------
    kernel : mf.ImputationKernel
        Kernel on which ``mice`` has already been run for at least
        `iterations_cap` iterations.
    feature_names : list[str]
        Columns to plot (typically the ones that had missing values).
    iterations_cap : int
        Last iteration to include in the plots.

    Returns
    -------
    None
        Plots are saved to disk and a progress line is printed per dataset.

    Notes
    -----
    NOTE(review): this assumes miceforest's convention that iteration 0 is the
    pre-MICE initialization and 1..N are the MICE passes — confirm against the
    installed version. The previous code plotted iterations 0..cap-1 but labelled
    them 1..cap, so the final iteration was never shown.

    NOTE(review): the plotted value is the mean of the whole completed column
    (observed + imputed rows), although the y-axis label reads
    "Mean of Imputed Values".
    """
    feature_names = list(feature_names)
    iterations = list(range(iterations_cap + 1))

    for dataset_id in range(kernel.num_datasets):
        # One output folder per dataset, numbered from 1
        dataset_file_dir = f"Convergence Metrics {dataset_id + 1}"
        save_dir = os.path.join(MICE_BASE_DIR, dataset_file_dir)
        os.makedirs(save_dir, exist_ok=True)

        # Materialize the completed data once per iteration and collect the means of
        # all requested features from it, instead of rebuilding it for every feature.
        means_by_feature = {name: [] for name in feature_names}
        if feature_names:
            for iteration in iterations:
                current_imputed = kernel.complete_data(dataset=dataset_id, iteration=iteration)
                for name in feature_names:
                    means_by_feature[name].append(np.mean(current_imputed[name]))

        for feature_name, means_per_iteration in means_by_feature.items():
            fig, ax = plt.subplots()
            ax.plot(iterations, means_per_iteration, marker='o')
            ax.set_xlabel("Iteration")
            ax.set_ylabel("Mean of Imputed Values")
            ax.set_title(f"Mean Convergence for '{feature_name}'")

            # One tick per iteration, labelled with the true iteration number
            ax.set_xticks(iterations)

            save_path = os.path.join(save_dir, feature_name + ".png")
            fig.savefig(save_path, bbox_inches='tight')
            # Close explicitly so figures do not accumulate across the loop
            plt.close(fig)

        print(f"{dataset_file_dir} completed.")

# Imputed distributions
def get_imputed_distributions(kernel: mf.ImputationKernel, feature_names: Union[list[str], None]=None, individual_plots: bool=True):
    """Save plots comparing original and imputed value distributions.

    The figures are produced by miceforest's own ``plot_imputed_distributions``
    method; this function only adds axis labels and a legend, then saves the
    result under ``MICE_IMPUTED_METRICS_DIR`` (created if it does not exist).

    Parameters
    ----------
    kernel : mf.ImputationKernel
        Kernel whose imputations are plotted.
    feature_names : list[str] or None
        Features to plot. Passed through as ``variables``; when None or empty,
        a single combined figure is produced regardless of `individual_plots`.
    individual_plots : bool
        True saves one ``<feature>.png`` per feature; False saves a single
        ``Combined_Distributions.png`` with all requested features.

    Returns
    -------
    None

    Notes
    -----
    NOTE(review): the code relies on ``plot_imputed_distributions`` returning an
    object with ``axes``, ``tight_layout`` and ``savefig`` (a matplotlib Figure).
    Confirm this holds for the installed miceforest version.
    """
    # Styling parameters
    legend_kwargs = {'frameon': True, 'facecolor': 'white', 'framealpha': 0.8}
    label_font = {'size': 14, 'weight': 'bold'}

    # savefig does not create missing directories; make sure the target exists,
    # as get_convergence_diagnostic does for its own output folder.
    os.makedirs(MICE_IMPUTED_METRICS_DIR, exist_ok=True)

    def _process_figure(fig, filename):
        """Add axis labels and a legend to every axes of `fig`, then save and close it."""
        for ax in fig.axes:
            # Set axis labels
            ax.set_xlabel('Value', **label_font)
            ax.set_ylabel('Density', **label_font)

            # Label lines by draw order.
            # NOTE(review): assumes the first line is the original data and the
            # second is imputed data — confirm against miceforest's plotting order.
            # Any further lines are left unlabelled.
            lines = ax.get_lines()
            if lines:
                lines[0].set_label('Original Data')
                if len(lines) > 1:
                    lines[1].set_label('Imputed Data')
                ax.legend(**legend_kwargs)

        # Adjust layout and save
        fig.tight_layout()
        fig.savefig(
            os.path.join(MICE_IMPUTED_METRICS_DIR, filename),
            dpi=300,
            bbox_inches='tight',
            pad_inches=0.2
        )
        # Release the figure so repeated calls do not accumulate open figures
        plt.close(fig)

    if individual_plots and feature_names:
        # Generate individual plots per feature
        for feature in feature_names:
            fig = kernel.plot_imputed_distributions(variables=[feature])
            _process_figure(fig, f"{feature}.png")
    else:
        # Generate combined plot
        fig = kernel.plot_imputed_distributions(variables=feature_names)
        _process_figure(fig, "Combined_Distributions.png")

    print("Imputed distributions saved successfully.")

In [None]:
# Run the diagnostics on the columns of the raw frame that contain missing values:
# first the per-iteration mean-convergence plots, then the distribution plots.
na_feature_names = get_na_feature_names(df_full)
get_convergence_diagnostic(kernel=kernel, feature_names=na_feature_names)
get_imputed_distributions(kernel=kernel, feature_names=na_feature_names)

Convergence Imputed_1 completed.
Convergence Imputed_2 completed.
Convergence Imputed_3 completed.
Imputed Distribution Legend:
	Red lines are the distribution of original data.
	Black lines are the distribution of the imputed values.
