# ⚡ Stage‑2 Imputation — Speed & Cost Demo
*Autogenerated 2025-06-07*

Run the notebook cell‑by‑cell to profile different imputation strategies, see per‑feature runtimes & estimated \$ costs, and toggle speed boosters (multi‑core Joblib, adaptive sampling, mean early‑exit, etc.).

In [None]:

# === One‑time dependency install (uncomment on Colab) ===
# !pip install polars tqdm joblib numba psutil nbformat


In [None]:

import time, json, os, math, psutil, datetime, functools, warnings
from pathlib import Path
from typing import Dict, Tuple, Optional
import numpy as np, pandas as pd
from scipy.stats import ks_2samp
from joblib import Parallel, delayed, cpu_count
import polars as pl
warnings.filterwarnings("ignore")

# Worker count for parallel jobs: every core joblib reports except one,
# but never fewer than one.
N_JOBS = max(cpu_count() - 1, 1)      # multi‑core
# Illustrative price in USD per minute of compute; cost_df() multiplies
# elapsed minutes by this rate.
PRICE_PER_MIN = 0.001                 # $ / compute‑minute (example)
# Row-count threshold: evaluate_candidate() only runs the KNN strategy when
# len(series) <= SUBSAMPLE.
SUBSAMPLE = 5_000                     # adaptive sample size


In [None]:

# Maps a wrapped function's __name__ to the list of its wall-clock durations
# (seconds), one entry per call.
_runtime_log = {}

def timed(fn):
    """Decorator that records the wall-clock duration of every call to *fn*.

    Each call appends its elapsed time (seconds, measured with
    ``time.perf_counter``) to ``_runtime_log[fn.__name__]``. The duration is
    recorded even when *fn* raises, so failed calls still show up in the cost
    statistics; the exception itself propagates unchanged.

    Returns the wrapping function, which forwards all arguments to *fn* and
    returns its result.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kw):
        t0 = time.perf_counter()
        try:
            return fn(*args, **kw)
        finally:
            # Runs on both the return and the exception path, so compute
            # spent in failing calls is not silently dropped from the log.
            dt = time.perf_counter() - t0
            _runtime_log.setdefault(fn.__name__, []).append(dt)
    return wrapper

def cost_df():
    """Summarise ``_runtime_log`` as a DataFrame, slowest function first.

    Returns a DataFrame with one row per logged function and the columns
    ``function`` (name), ``calls`` (number of recorded calls), ``seconds``
    (total elapsed time) and ``cost_usd`` (total minutes multiplied by
    ``PRICE_PER_MIN``), sorted by ``seconds`` in descending order.

    When nothing has been timed yet, an empty DataFrame with the same four
    columns is returned.
    """
    columns = ["function", "calls", "seconds", "cost_usd"]
    rows = []
    for name, laps in _runtime_log.items():
        total = sum(laps)
        rows.append({
            "function": name,
            "calls": len(laps),
            "seconds": total,
            "cost_usd": total / 60 * PRICE_PER_MIN,
        })
    # Declaring the columns keeps "seconds" present even for an empty log;
    # without it, sort_values would raise KeyError on a column-less frame.
    return pd.DataFrame(rows, columns=columns).sort_values("seconds", ascending=False)


In [None]:

def ks_fast(a: np.ndarray, b: np.ndarray) -> float:
    """Return the p-value of the two-sample Kolmogorov-Smirnov test of a vs b.

    A value close to 1 means the test finds no evidence that the two samples
    come from different distributions; a value close to 0 means they differ.

    Args:
        a: First sample (1-D array of observations).
        b: Second sample (1-D array of observations).

    Raises:
        ValueError: Raised by SciPy when either sample is empty.
    """
    # Uses scipy.stats.ks_2samp (imported at the top of the file).
    # NOTE(review): the earlier Polars-based call indexed element [1] of its
    # result, which is taken here to be the p-value — confirm.
    return float(ks_2samp(a, b).pvalue)


In [None]:

from numba import njit
@njit(cache=True, fastmath=True)
def _rand_fill(mask, vals, seed):
    """Return a copy of *vals* with every masked position randomly re-drawn.

    Args:
        mask: 1-D boolean-like array; a truthy entry marks a position to fill.
        vals: 1-D array of values; it is both the data being filled and the
            pool the replacement values are drawn from (uniformly, with
            replacement).
        seed: Integer seed applied before drawing so results are repeatable.

    Returns:
        A new array; *vals* itself is not modified.
    """
    # The legacy np.random.seed / np.random.randint calls are used because
    # they are the random API Numba documents for nopython mode.
    np.random.seed(seed)
    out = vals.copy()
    n_vals = vals.size
    for i in range(mask.size):
        if mask[i]:
            # NOTE(review): donors are drawn from all of vals, including the
            # masked positions themselves — confirm that is intended.
            out[i] = vals[np.random.randint(0, n_vals)]
    return out


In [None]:

from sklearn.impute import KNNImputer, SimpleImputer
# Module-wide cache for the fitted KNN imputer; stays None until first use.
knn_shared = None

def get_knn(df_numeric):
    """Return the shared KNNImputer, fitting it on first use.

    The first call fits ``KNNImputer(n_neighbors=5)`` on *df_numeric* and
    stores it in the module-level ``knn_shared``. Every later call returns
    that same object and ignores its argument, so the imputer always reflects
    the data passed to the first call.
    """
    global knn_shared
    if knn_shared is not None:
        return knn_shared
    imputer = KNNImputer(n_neighbors=5)
    knn_shared = imputer.fit(df_numeric)
    return knn_shared


In [None]:

@timed
def evaluate_candidate(col, series, df_numeric):
    """Score imputation strategies for a single column and pick the best.

    Args:
        col: Name of the column being evaluated; must be a column of
            *df_numeric*.
        series: The column's values, possibly containing NaNs.
        df_numeric: Frame passed to the shared KNN imputer.

    Returns:
        A ``(best, scores)`` tuple. ``scores`` maps each strategy that was
        evaluated ('mean', 'median', 'knn') to ``{'ks': value}``, where the
        value is what ``ks_fast`` returns when comparing the observed values
        with the filled column. ``best`` is the strategy with the largest
        'ks' value.
    """
    observed = series.dropna().values
    scores = {}

    # Mean fill is scored first.
    mean_filled = series.fillna(series.mean())
    mean_score = ks_fast(observed, mean_filled.values)
    scores['mean'] = {'ks': mean_score}
    if mean_score > 0.8:
        # Early exit: the mean already scores high, skip the other strategies.
        return 'mean', scores

    # Median fill.
    median_filled = series.fillna(series.median())
    scores['median'] = {'ks': ks_fast(observed, median_filled.values)}

    # KNN is only attempted when the column has at most SUBSAMPLE rows.
    if len(series) <= SUBSAMPLE:
        imputer = get_knn(df_numeric)
        transformed = imputer.transform(df_numeric)
        knn_col = transformed[:, df_numeric.columns.get_loc(col)]
        scores['knn'] = {'ks': ks_fast(observed, knn_col[~np.isnan(knn_col)])}

    best = max(scores, key=lambda name: scores[name]['ks'])
    return best, scores


In [None]:

# ---- Demo on random DF ----
# Two columns of 50,000 standard-normal draws each. The RNG is not seeded,
# so the values (and the printed scores) differ from run to run.
df_demo = pd.DataFrame({
    'x': np.random.randn(50_000),
    'y': np.random.randn(50_000)
})
# Inject NaNs: blank out 10,000 distinct rows of 'x'; 'y' stays complete.
nan_idx = np.random.choice(df_demo.index, size=10_000, replace=False)
df_demo.loc[nan_idx, 'x'] = np.nan

# Score the imputation strategies for 'x'. The column has 50,000 rows, which
# is above SUBSAMPLE (5,000), so evaluate_candidate skips its KNN branch here.
best, met = evaluate_candidate('x', df_demo['x'], df_demo[['x', 'y']])
print('Chosen method:', best)
print(json.dumps(met, indent=2))
# Last expression of the cell: the per-function runtime/cost table.
cost_df()


In [None]:

# Show the runtime/cost table. ace_tools is not among the packages listed in
# the install cell, so fall back to printing the table when it cannot be
# imported instead of failing the cell.
try:
    import ace_tools as tools
except ImportError:
    print(cost_df())
else:
    tools.display_dataframe_to_user('runtime_costs', cost_df())
