<a href="https://colab.research.google.com/github/hyunho-song09/Microglia_EVs_study/blob/main/missing_value_imputation_v2_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive so files under /content/drive are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')
# Make the project folder the working directory; relative paths used later in the
# notebook (such as the "report" output directory) are resolved from here.
%cd "/content/drive/MyDrive/250325_missing_value_imputation"

Mounted at /content/drive
/content/drive/MyDrive/250325_missing_value_imputation


In [None]:
!pip install scikit-learn fancyimpute tensorflow
!pip install fpdf

Collecting fancyimpute
  Downloading fancyimpute-0.7.0.tar.gz (25 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting knnimpute>=0.1.0 (from fancyimpute)
  Downloading knnimpute-0.1.0.tar.gz (8.3 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nose (from fancyimpute)
  Downloading nose-1.3.7-py3-none-any.whl.metadata (1.7 kB)
Downloading nose-1.3.7-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.7/154.7 kB[0m [31m7.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: fancyimpute, knnimpute
  Building wheel for fancyimpute (setup.py) ... [?25l[?25hdone
  Created wheel for fancyimpute: filename=fancyimpute-0.7.0-py3-none-any.whl size=29879 sha256=dc2640ab4effc761f0819f96bd3def746d7eadf61afe38781d183b398d637397
  Stored in directory: /root/.cache/pip/wheels/1a/f3/a1/f7f10b5ae2c2459398762a3fcf4ac18c325311c7e3163d5a15
  Building wheel for knnimpute (setup.py) ... [?25l[?25hdone
  C

## Step-by-step implementation:

#### 1. Generate a complete data frame and a data frame with missing values

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

# Seed NumPy's global generator so both frames below are identical on every run.
np.random.seed(42)

# Complete frame: 1000 rows x 10 columns of standard-normal draws (col_1 .. col_10).
complete_df = pd.DataFrame(
    np.random.randn(1000, 10),
    columns=[f'col_{i}' for i in range(1, 11)],
)

# Second frame: same values, but each cell is blanked independently with
# probability `missingness_prop`.
missingness_prop = 0.1  # 10% missing values
missing_mask = np.random.rand(*complete_df.shape) < missingness_prop
# DataFrame.mask returns a new frame with NaN wherever the mask is True,
# leaving complete_df untouched.
missing_df = complete_df.mask(missing_mask)

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from sklearn.impute import SimpleImputer, KNNImputer
from fancyimpute import IterativeImputer
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import umap
import os
from fpdf import FPDF

# ----------------------------
# Deep Learning Imputers
# ----------------------------
class SimpleTransformerImputer(nn.Module):
    """Transformer-encoder network that maps a feature vector back to itself.

    Pipeline: linear projection to 64 units -> stack of Transformer encoder
    layers -> linear projection back to ``input_dim``.

    Args:
        input_dim: Number of features per row.
        n_heads: Attention heads in each encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, input_dim, n_heads=2, num_layers=2):
        super().__init__()
        hidden_dim = 64
        self.embedding = nn.Linear(input_dim, hidden_dim)
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=n_heads),
            num_layers=num_layers,
        )
        self.output_layer = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        """Return a reconstruction with the same shape as ``x``.

        A singleton axis is inserted at position 1 before the encoder and
        removed again afterwards.
        NOTE(review): the encoder layer is built without ``batch_first``, so the
        row axis is presumably treated as the sequence axis - confirm intended.
        """
        embedded = self.embedding(x)
        encoded = self.transformer(embedded.unsqueeze(1))
        return self.output_layer(encoded.squeeze(1))

class AutoEncoder(nn.Module):
    """Fully connected auto-encoder: input_dim -> 32 -> 16 -> 32 -> input_dim.

    ReLU follows every linear layer except the final output layer.

    Args:
        input_dim: Number of features per row.
    """

    def __init__(self, input_dim):
        super().__init__()
        wide, narrow = 32, 16
        # Layers are created encoder-first so parameter initialisation order is stable.
        encoder_layers = [
            nn.Linear(input_dim, wide), nn.ReLU(),
            nn.Linear(wide, narrow), nn.ReLU(),
        ]
        decoder_layers = [
            nn.Linear(narrow, wide), nn.ReLU(),
            nn.Linear(wide, input_dim),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` to the 16-unit bottleneck and decode it back."""
        latent = self.encoder(x)
        return self.decoder(latent)

# ----------------------------
# Main Evaluator Class
# ----------------------------
class ImputationEvaluator:
    """Run several imputation strategies on one DataFrame and rank them.

    For every method the evaluator records five scores (lower is better):
    absolute deviation of the column means, absolute deviation of the column
    variances, and the mean row-wise distance between a reference data set and
    the imputed data in PCA, t-SNE and UMAP coordinates.

    Args:
        df: Numeric DataFrame containing NaN for missing entries.
        reference_df: Optional complete (NaN-free) ground truth with the same
            shape and columns as ``df``, row-aligned with it. When given, every
            comparison is made row-for-row against it. When omitted, the
            complete rows of ``df`` (``df.dropna()``) are used as reference and
            compared against the *first* ``len(reference)`` imputed rows; those
            rows are generally not the same observations, so the row-wise
            distances are then only a rough heuristic.
        random_state: Seed used for t-SNE, UMAP and the PyTorch models. Pass
            ``None`` to leave the generators unseeded.

    Attributes:
        methods: Mapping of display name -> callable ``(df) -> imputed DataFrame``.
        imputed_results: Imputed DataFrame per method, filled by ``evaluate``.
        mean_deviation, variance_deviation, pca_dist, tsne_dist, umap_dist:
            Score per method, filled by ``evaluate``.
        best_method_name, best_imputed_df: Winner selected by ``evaluate``.
    """

    # Lower bound applied before inverting a score, so that a perfect score of 0
    # maps to the largest inverse value instead of raising or being ranked worst.
    _MIN_SCORE = 1e-12

    def __init__(self, df, reference_df=None, random_state=42):
        if reference_df is not None:
            if reference_df.shape != df.shape or list(reference_df.columns) != list(df.columns):
                raise ValueError("reference_df must have the same shape and columns as df")
            if reference_df.isnull().to_numpy().any():
                raise ValueError("reference_df must not contain missing values")

        self.df = df
        self.reference_df = reference_df
        self.random_state = random_state
        self.methods = {
            'Mean Imputation': self.mean_imputation,
            'Median Imputation': self.median_imputation,
            'Most Frequent Value Imputation': self.most_frequent_imputation,
            'Zero Imputation': self.zero_imputation,
            'Constant Imputation (e.g., -1)': lambda df: self.constant_imputation(df, -1),
            'K-NN Imputation': self.knn_imputation,
            'MICE Imputation': self.mice_imputation,
            'Transformer Imputation': self.transformer_imputation,
            'AutoEncoder Imputation': self.autoencoder_imputation,
        }
        self.imputed_results = {}
        self.mean_deviation = {}
        self.variance_deviation = {}
        self.pca_dist = {}
        self.tsne_dist = {}
        self.umap_dist = {}
        self.best_method_name = None
        self.best_imputed_df = None
        # 2-D embeddings per method, cached so the best method is not re-embedded.
        self._embeddings = {}

    # ----------------------------
    # Imputation Methods
    # ----------------------------
    @staticmethod
    def _as_frame(values, df):
        """Wrap an imputed array in a DataFrame carrying ``df``'s columns and index."""
        return pd.DataFrame(values, columns=df.columns, index=df.index)

    def mean_imputation(self, df):
        """Fill each column's missing entries with that column's mean."""
        return self._as_frame(SimpleImputer(strategy='mean').fit_transform(df), df)

    def median_imputation(self, df):
        """Fill each column's missing entries with that column's median."""
        return self._as_frame(SimpleImputer(strategy='median').fit_transform(df), df)

    def most_frequent_imputation(self, df):
        """Fill each column's missing entries with that column's most frequent value."""
        return self._as_frame(SimpleImputer(strategy='most_frequent').fit_transform(df), df)

    def zero_imputation(self, df):
        """Fill missing entries with 0."""
        return self._as_frame(
            SimpleImputer(strategy='constant', fill_value=0).fit_transform(df), df
        )

    def constant_imputation(self, df, fill_value):
        """Fill missing entries with ``fill_value``."""
        return self._as_frame(
            SimpleImputer(strategy='constant', fill_value=fill_value).fit_transform(df), df
        )

    def knn_imputation(self, df):
        """Fill missing entries with ``KNNImputer`` using 5 neighbours."""
        return self._as_frame(KNNImputer(n_neighbors=5).fit_transform(df), df)

    def mice_imputation(self, df):
        """Fill missing entries with ``IterativeImputer`` (default settings)."""
        return self._as_frame(IterativeImputer().fit_transform(df), df)

    def _seed_torch(self):
        """Seed PyTorch's global generator so model initialisation is repeatable."""
        if self.random_state is not None:
            torch.manual_seed(self.random_state)

    def _neural_impute(self, df, model, num_epochs):
        """Train ``model`` to reconstruct the observed cells, then fill the gaps.

        Missing cells are fed to the model as 0.0 and excluded from the loss.
        Only missing cells are taken from the model output; observed cells are
        returned unchanged.
        """
        target = torch.tensor(df.values, dtype=torch.float32)
        missing = torch.isnan(target)
        inputs = target.clone()
        inputs[missing] = 0.0

        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
        criterion = nn.MSELoss()

        model.train()
        for _ in range(num_epochs):
            optimizer.zero_grad()
            output = model(inputs)
            loss = criterion(output[~missing], target[~missing])
            loss.backward()
            optimizer.step()

        # eval() switches off dropout; torch.no_grad() alone does not.
        model.eval()
        with torch.no_grad():
            predicted = model(inputs).numpy()

        filled = np.where(missing.numpy(), predicted, df.to_numpy(dtype=float))
        return self._as_frame(filled, df)

    def transformer_imputation(self, df, num_epochs=30):
        """Impute with ``SimpleTransformerImputer`` trained for ``num_epochs`` full-batch steps."""
        self._seed_torch()
        model = SimpleTransformerImputer(input_dim=df.shape[1])
        return self._neural_impute(df, model, num_epochs)

    def autoencoder_imputation(self, df, num_epochs=30):
        """Impute with ``AutoEncoder`` trained for ``num_epochs`` full-batch steps."""
        self._seed_torch()
        model = AutoEncoder(input_dim=df.shape[1])
        return self._neural_impute(df, model, num_epochs)

    # ----------------------------
    # Evaluation Pipeline
    # ----------------------------
    def _comparison_pair(self, imputed):
        """Return ``(reference, candidate)`` frames of equal length for row-wise metrics.

        With a ground truth the pair is row-aligned. Without one, the complete
        rows of ``self.df`` are paired with the first rows of ``imputed``
        (legacy behaviour, not row-aligned).

        Raises:
            ValueError: If no ground truth was supplied and ``self.df`` has no
                complete row to use as reference.
        """
        if self.reference_df is not None:
            return self.reference_df, imputed

        reference = self.df.dropna()
        if reference.empty:
            raise ValueError(
                "df has no complete rows to use as reference; pass reference_df instead"
            )
        return reference, imputed.iloc[:len(reference)]

    def _embed_pair(self, reference, candidate):
        """Project both data sets to 2-D with PCA, t-SNE and UMAP.

        Returns:
            Dict mapping ``'PCA'``, ``'t-SNE'`` and ``'UMAP'`` to a
            ``(reference_xy, candidate_xy)`` tuple of arrays.
        """
        ref = np.asarray(reference, dtype=float)
        cand = np.asarray(candidate, dtype=float)
        n_ref = len(ref)

        pca = PCA(n_components=2)
        pca_ref = pca.fit_transform(ref)
        pca_cand = pca.transform(cand)

        # t-SNE has no transform(); two separate fits give unrelated coordinate
        # systems, so both sets are embedded together and split afterwards.
        stacked = np.vstack([ref, cand])
        tsne = TSNE(
            n_components=2,
            perplexity=min(30, len(stacked) - 1),  # must stay below the sample count
            random_state=self.random_state,
        )
        tsne_all = tsne.fit_transform(stacked)

        reducer = umap.UMAP(n_components=2, random_state=self.random_state)
        umap_ref = reducer.fit_transform(ref)
        umap_cand = reducer.transform(cand)

        return {
            'PCA': (pca_ref, pca_cand),
            't-SNE': (tsne_all[:n_ref], tsne_all[n_ref:]),
            'UMAP': (umap_ref, umap_cand),
        }

    @staticmethod
    def _mean_row_distance(pair):
        """Mean Euclidean distance between corresponding rows of two embeddings."""
        ref_xy, cand_xy = pair
        return np.mean(np.linalg.norm(ref_xy - cand_xy, axis=1))

    def evaluate(self):
        """Run every imputation method, score it, and write all report artefacts.

        Creates the ``report`` directory and writes the missing-value heatmap,
        the score bar plot, the text report, the PDF summary and the
        best-method comparison figure into it.
        """
        os.makedirs("report", exist_ok=True)

        sns.heatmap(self.df.isnull(), cbar=False, yticklabels=False, cmap='viridis')
        plt.title('Missing Values Heatmap')
        plt.savefig("report/missing_heatmap.png")
        plt.close()

        for name, func in self.methods.items():
            print(f"\n🔄 Running: {name}")
            imputed = func(self.df)
            self.imputed_results[name] = imputed
            reference, candidate = self._comparison_pair(imputed)

            self.mean_deviation[name] = np.abs(reference.mean() - imputed.mean()).mean()
            self.variance_deviation[name] = np.abs(reference.var() - imputed.var()).mean()

            embeddings = self._embed_pair(reference, candidate)
            self._embeddings[name] = embeddings
            self.pca_dist[name] = self._mean_row_distance(embeddings['PCA'])
            self.tsne_dist[name] = self._mean_row_distance(embeddings['t-SNE'])
            self.umap_dist[name] = self._mean_row_distance(embeddings['UMAP'])

        self._plot_scores()
        self._generate_report()
        self._generate_pdf()
        self._visualize_best()

    def _plot_scores(self):
        """Save a grouped bar plot of min-max scaled inverse scores per metric."""
        metrics = {
            'Mean': self.mean_deviation,
            'Variance': self.variance_deviation,
            'PCA': self.pca_dist,
            't-SNE': self.tsne_dist,
            'UMAP': self.umap_dist
        }

        data = []
        for metric, values in metrics.items():
            for method, value in values.items():
                # Clamp instead of mapping 0 -> 0: a zero deviation is the best
                # possible score and must not end up as the lowest bar.
                inverse_value = 1.0 / max(value, self._MIN_SCORE)
                data.append({'Metric': metric, 'Method': method, 'InverseValue': inverse_value})

        df_plot = pd.DataFrame(data)

        def _min_max(group):
            """Scale one metric's inverse scores to [0, 1]; all-equal groups become 0."""
            span = group.max() - group.min()
            if span == 0:
                return pd.Series(0.0, index=group.index)
            return (group - group.min()) / span

        df_plot['ScaledInverse'] = df_plot.groupby('Metric')['InverseValue'].transform(_min_max)

        plt.figure(figsize=(14, 6))
        sns.barplot(data=df_plot, x='Metric', y='ScaledInverse', hue='Method')
        plt.title("Scaled Inverse Scores Across Evaluation Metrics (Higher is Better)")
        plt.tight_layout()
        plt.savefig("report/combined_barplot.png")
        plt.close()

    def _generate_report(self):
        """Write the five scores of every method to ``report/imputation_report.txt``."""
        with open("report/imputation_report.txt", "w") as f:
            f.write("Imputation Evaluation Report\n===========================\n\n")
            for method in self.methods:
                f.write(f"Method: {method}\n")
                f.write(f"  Mean Deviation:     {self.mean_deviation[method]:.4f}\n")
                f.write(f"  Variance Deviation: {self.variance_deviation[method]:.4f}\n")
                f.write(f"  PCA Distance:       {self.pca_dist[method]:.4f}\n")
                f.write(f"  t-SNE Distance:     {self.tsne_dist[method]:.4f}\n")
                f.write(f"  UMAP Distance:      {self.umap_dist[method]:.4f}\n\n")

    def _generate_pdf(self):
        """Assemble the heatmap, bar plot and text report into ``report/imputation_summary.pdf``."""
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=14)
        pdf.cell(200, 10, txt="Imputation Evaluation Report", ln=True, align="C")
        pdf.image("report/missing_heatmap.png", x=10, y=20, w=180)
        pdf.ln(95)
        pdf.image("report/combined_barplot.png", x=10, y=115, w=180)
        pdf.ln(95)
        pdf.add_page()
        # The text report is rendered line by line in a monospaced font.
        pdf.set_font("Courier", size=10)
        with open("report/imputation_report.txt", "r") as f:
            for line in f:
                pdf.multi_cell(0, 6, line)
        pdf.output("report/imputation_summary.pdf")

    def _visualize_best(self):
        """Pick the method with the lowest average score and plot it against the reference.

        Saves ``report/best_method_comparison.png`` and stores the winner in
        ``best_method_name`` / ``best_imputed_df``.
        """
        # NOTE(review): the five scores are averaged on their raw scales, so the
        # metric with the largest magnitude dominates the ranking.
        total_score = {
            m: (self.mean_deviation[m] + self.variance_deviation[m] + self.pca_dist[m] +
                self.tsne_dist[m] + self.umap_dist[m]) / 5
            for m in self.methods
        }
        best_method = min(total_score, key=total_score.get)
        full_imputed = self.imputed_results[best_method]

        print(f"\n✅ Best imputation method: {best_method}")

        # The plot may use only a subset of rows (see _comparison_pair); the
        # complete imputed frame is what gets stored as the final result.
        embeddings = self._embeddings.get(best_method)
        if embeddings is None:
            embeddings = self._embed_pair(*self._comparison_pair(full_imputed))

        fig, axs = plt.subplots(1, 3, figsize=(18, 5))
        for ax, (title, (ref_xy, imp_xy)) in zip(axs, embeddings.items()):
            ax.scatter(ref_xy[:, 0], ref_xy[:, 1], alpha=0.5, label='Original')
            ax.scatter(imp_xy[:, 0], imp_xy[:, 1], alpha=0.5, label='Imputed')
            ax.set_title(title)
            ax.legend()

        plt.suptitle(f"Best Method: {best_method}", fontsize=16)
        plt.tight_layout()
        plt.savefig("report/best_method_comparison.png")
        plt.close()

        self.best_method_name = best_method
        self.best_imputed_df = full_imputed  # keep the full imputed result

In [None]:
# Build the evaluator on the frame containing missing values, then run every
# imputation method, score it, and write the plots and reports to ./report.
evaluator = ImputationEvaluator(missing_df)
evaluator.evaluate()


🔄 Running: Mean Imputation


  warn(



🔄 Running: Median Imputation


  warn(



🔄 Running: Most Frequent Value Imputation


  warn(



🔄 Running: Zero Imputation


  warn(



🔄 Running: Constant Imputation (e.g., -1)


  warn(



🔄 Running: K-NN Imputation


  warn(



🔄 Running: MICE Imputation


  warn(



🔄 Running: Transformer Imputation


  warn(



🔄 Running: AutoEncoder Imputation


  warn(



✅ Best imputation method: MICE Imputation


  warn(


In [None]:
# Final imputed result: the full output of the best-scoring method.
final_df = evaluator.best_imputed_df

In [None]:
# Display the imputed frame produced by the selected method.
final_df

Unnamed: 0,col_1,col_2,col_3,col_4,col_5,col_6,col_7,col_8,col_9,col_10
0,0.496714,-0.138264,-0.029151,-0.045056,-0.234153,-0.034442,1.579213,0.767435,-0.469474,0.542560
1,-0.463418,-0.465730,0.241962,-1.913280,-1.724918,-0.562288,-1.012831,0.314247,-0.908024,-1.412304
2,1.465649,-0.225776,0.067528,-1.424748,-0.544383,0.110923,-1.150994,0.375698,-0.600639,-0.291694
3,-0.601707,1.852278,-0.013497,-1.057711,0.822545,-1.220844,0.208864,-1.959670,-1.328186,0.196861
4,0.738467,0.171368,-0.115648,-0.301104,-1.478522,-0.719844,-0.460639,1.057122,-0.032466,-1.763040
...,...,...,...,...,...,...,...,...,...,...
995,0.867805,0.227405,-0.889845,-0.960780,0.254128,-0.028100,0.014991,-1.034598,0.650668,0.425911
996,-1.070666,0.043779,0.688496,-0.234508,1.589147,0.501129,-0.486631,-0.010206,0.063383,-0.728390
997,-0.912588,0.701390,0.845273,0.603781,1.515318,-0.032263,1.674271,0.003012,-1.012686,-1.759959
998,-0.445795,-0.503722,-0.032586,0.243891,-1.192973,-0.392726,-0.371462,0.010073,-0.980947,-0.770814


In [None]:
# Display the input frame with its NaN entries, for comparison with final_df.
missing_df

Unnamed: 0,col_1,col_2,col_3,col_4,col_5,col_6,col_7,col_8,col_9,col_10
0,0.496714,-0.138264,,,-0.234153,,1.579213,0.767435,-0.469474,0.542560
1,-0.463418,-0.465730,0.241962,-1.913280,-1.724918,-0.562288,-1.012831,0.314247,-0.908024,-1.412304
2,1.465649,-0.225776,0.067528,-1.424748,-0.544383,0.110923,-1.150994,0.375698,-0.600639,-0.291694
3,-0.601707,1.852278,-0.013497,-1.057711,0.822545,-1.220844,0.208864,-1.959670,-1.328186,0.196861
4,0.738467,0.171368,-0.115648,-0.301104,-1.478522,-0.719844,-0.460639,1.057122,,-1.763040
...,...,...,...,...,...,...,...,...,...,...
995,0.867805,0.227405,-0.889845,-0.960780,0.254128,,,-1.034598,0.650668,0.425911
996,-1.070666,,0.688496,-0.234508,1.589147,0.501129,-0.486631,-0.010206,0.063383,-0.728390
997,-0.912588,0.701390,0.845273,0.603781,1.515318,,1.674271,,-1.012686,-1.759959
998,-0.445795,-0.503722,,0.243891,-1.192973,-0.392726,-0.371462,,-0.980947,-0.770814
