In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from pathlib import Path
from scipy.special import expit  # sigmoid
from sklearn.metrics import mean_absolute_error, mean_squared_error
import statsmodels.api as sm

sns.set(style="whitegrid")
np.random.seed(0)

# plt.rcParams["pdf.fonttype"] = 42  
# plt.rcParams["ps.fonttype"] = 42
# PLOT_DIR = Path("./plots")


In [None]:
DATA_DIR = Path("./data")   
csv_files = sorted(DATA_DIR.glob("*.csv"))

def load_dataset(path: Path):
    df = pd.read_csv(path, index_col=0, header=0)
    mask = (~df.isna()).astype(int)
    return df, mask

def protein_stats(df: pd.DataFrame, mask: pd.DataFrame):
    mu = df.mean(skipna=True)
    r = 1 - mask.mean()
    stats = pd.DataFrame({"mu": mu, "r_real": r})
    stats = stats.dropna(subset=["mu"])  
    return stats

datasets = {}
missing_stats = {}
for f in csv_files:
    z, mask = load_dataset(f)
    datasets[f.stem] = {"data": z, "mask": mask}

    missing_rate = 1 - mask.values.mean()
    missing_stats[f.stem] = missing_rate

print(f"Loaded {len(datasets)} datasets.\n")

print("Missing rate per dataset:")
for name, rate in missing_stats.items():
    print(f" - {name}: {rate:.4f}")



In [None]:
def fit_logistic_mu(stats: pd.DataFrame):
    y = stats["r_real"].values
    X = sm.add_constant(stats["mu"].values)
    model = sm.Logit(y, X)
    res = model.fit(disp=False)
    return res.params  # [intercept, slope]

def mnar_logistic(mu, params):
    a, b = params
    return expit(a + b * mu)

def mnar_threshold(mu, q=0.3, p_hi=0.8, p_lo=0.05):
    thr = np.quantile(mu, q)
    return np.where(mu < thr, p_hi, p_lo)

def mnar_jingwei(mu, q_low=0.1, q_high=0.6, p_low=0.8, p_high=0.2):
    t_low = np.quantile(mu, q_low)
    t_high = np.quantile(mu, q_high)
    mu = np.asarray(mu, float)
    p = np.empty_like(mu, float)
    p[mu <= t_low] = p_low
    mid = (mu > t_low) & (mu < t_high)
    p[mid] = p_low + (p_high - p_low) * (mu[mid] - t_low) / (t_high - t_low + 1e-12)
    p[mu >= t_high] = 0
    return p


In [None]:
def simulate_missing_on_observed(df: pd.DataFrame, mask: pd.DataFrame, p_per_protein: pd.Series):
    p = p_per_protein.reindex(mask.columns)
    sim_missing = np.zeros_like(mask, dtype=int)

    for col_idx, col in enumerate(mask.columns):
        obs_pos = np.flatnonzero(mask.iloc[:, col_idx].to_numpy() == 1)
        if obs_pos.size == 0:
            continue
        prob = p.iloc[col_idx]
        if pd.isna(prob):
            continue
        prob = float(prob)
        sim_missing[obs_pos, col_idx] = np.random.binomial(1, prob, size=obs_pos.size)

    sim_df = pd.DataFrame(sim_missing, index=mask.index, columns=mask.columns)
    r_sim = sim_df.sum() / mask.sum().replace(0, np.nan)
    return r_sim

In [None]:
results = []
for f in csv_files:
    print(f"\nProcessing {f.name} ...")
    df, mask = load_dataset(f)
    stats_df = protein_stats(df, mask)  

    try:
        params = fit_logistic_mu(stats_df)
        p_logit = mnar_logistic(stats_df["mu"], params)
    except Exception as e:
        print("Logistic fit failed:", e)
        p_logit = pd.Series(np.full(len(stats_df), np.nan), index=stats_df.index)

    p_thr = mnar_threshold(stats_df["mu"])
    p_jw = mnar_jingwei(stats_df["mu"])

    r_sim_logit = simulate_missing_on_observed(df, mask, pd.Series(p_logit, index=stats_df.index))
    r_sim_thr   = simulate_missing_on_observed(df, mask, pd.Series(p_thr,   index=stats_df.index))
    r_sim_jw    = simulate_missing_on_observed(df, mask, pd.Series(p_jw,    index=stats_df.index))

    r_real = stats_df["r_real"]
    def compute_metrics(pred):
        pred = pred.reindex(r_real.index)
        mask_valid = ~(r_real.isna() | pred.isna())
        rr = r_real[mask_valid]
        pp = pred[mask_valid]
        mae = mean_absolute_error(rr, pp)
        rmse = np.sqrt(mean_squared_error(rr, pp))
        return mae, rmse

    mae_logit, rmse_logit = compute_metrics(r_sim_logit)
    mae_thr,   rmse_thr   = compute_metrics(r_sim_thr)
    mae_jw,    rmse_jw    = compute_metrics(r_sim_jw)

    results.append({
        "dataset": f.stem,
        "MAE_logit": mae_logit, "RMSE_logit": rmse_logit,
        "MAE_thr": mae_thr,     "RMSE_thr": rmse_thr,
        "MAE_jw": mae_jw,       "RMSE_jw": rmse_jw,
    })

    log_mu = np.log10(stats_df["mu"])
    plt.figure(figsize=(7,5))
    plt.scatter(log_mu, r_real, s=15, alpha=0.7, label="Real missing", color="black")
    plt.scatter(log_mu, r_sim_logit, s=12, alpha=0.6, label="Logistic", color="#1f77b4")
    plt.scatter(log_mu, r_sim_thr,   s=12, alpha=0.6, label="Threshold", color="#ff7f0e")
    plt.scatter(log_mu, r_sim_jw,    s=12, alpha=0.6, label="JINGWEI", color="#2ca02c")
    plt.xlabel("log10(Protein mean intensity)")
    plt.ylabel("Missing rate")
    plt.title(f"Missing curves: {f.stem}")
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
res_df = pd.DataFrame(results)
if not res_df.empty:
    display(res_df)
# res_df.to_csv("MNAR_simulation_results.csv", index=False)