In [1]:
from Impute import fill_with_meta, fill_with_et
import pandas as pd
import numpy as np

In [2]:
from hyperimpute.plugins.imputers import Imputers, ImputerPlugin

In [3]:
# stdlib
import copy
from time import time, strftime, localtime
from typing import Any
import warnings

# third party
from IPython.display import display
from joblib import Parallel, delayed
import numpy as np
import pandas as pd
from pydantic import validate_arguments
from scipy.stats import wasserstein_distance
from sklearn.preprocessing import MinMaxScaler

# hyperimpute absolute
import hyperimpute.logger as log
from hyperimpute.plugins.imputers import Imputers
from hyperimpute.plugins.utils.metrics import RMSE
from hyperimpute.plugins.utils.simulate import simulate_nan
from hyperimpute.utils.distributions import enable_reproducible_results
from hyperimpute.utils.metrics import generate_score, print_score

enable_reproducible_results()

warnings.filterwarnings("ignore")


@validate_arguments(config=dict(arbitrary_types_allowed=True))
def ampute(
        x: pd.DataFrame,
        mechanism: str,
        p_miss: float,
        column_limit: int = 8,
        sample_columns: bool = True,
) -> tuple:
    """Remove values from a subset of the columns of ``x``.

    Args:
        x: complete input frame.
        mechanism: missingness mechanism name, forwarded to ``simulate_nan``.
        p_miss: missingness ratio, forwarded to ``simulate_nan``.
        column_limit: maximum number of columns that are amputed; capped at
            the number of columns in ``x``.
        sample_columns: when True the amputed columns are drawn at random
            (without replacement) with ``np.random.choice``; otherwise the
            first ``column_limit`` columns are used. The flag is also
            forwarded to ``simulate_nan``.

    Returns:
        A tuple ``(x_full, x_miss, mask)``:
        ``x_full`` is ``x`` re-wrapped in a DataFrame, ``x_miss`` is a copy of
        ``x`` whose selected columns were replaced by the ``"X_incomp"`` output
        of ``simulate_nan``, and ``mask`` has the shape of ``x``, is zero in
        the untouched columns and holds the ``"mask"`` output of
        ``simulate_nan`` in the selected ones (presumably 1 where a value was
        removed -- confirm against ``simulate_nan``).
    """
    columns = x.columns
    column_limit = min(len(columns), column_limit)

    if sample_columns:
        sampled_columns = columns[
            np.random.choice(len(columns), size=column_limit, replace=False)
        ]
    else:
        sampled_columns = columns[list(range(column_limit))]

    x_simulated = simulate_nan(
        x[sampled_columns].values, p_miss, mechanism, sample_columns=sample_columns, opt="selfmasked"
    )

    # simulate_nan works on bare arrays. Re-attach the caller's index so the
    # column assignments below align row-by-row even when ``x`` does not use
    # the default RangeIndex (pandas aligns on index labels when assigning a
    # DataFrame into another one).
    isolated_mask = pd.DataFrame(
        x_simulated["mask"], columns=sampled_columns, index=x.index
    )
    isolated_x_miss = pd.DataFrame(
        x_simulated["X_incomp"], columns=sampled_columns, index=x.index
    )

    # Columns that were not selected keep an all-zero mask.
    mask = pd.DataFrame(np.zeros(x.shape), columns=columns, index=x.index)
    mask[sampled_columns] = isolated_mask

    x_miss = pd.DataFrame(x.copy(), columns=columns)
    x_miss[sampled_columns] = isolated_x_miss

    return (
        pd.DataFrame(x, columns=columns),
        x_miss,
        mask,
    )


@validate_arguments(config=dict(arbitrary_types_allowed=True))
def scale_data(X: pd.DataFrame) -> pd.DataFrame:
    """Rescale ``X`` with a freshly fitted ``MinMaxScaler``.

    The column labels of ``X`` are kept; the index is not passed on, so the
    returned frame carries a default index.
    """
    scaled_values = MinMaxScaler().fit_transform(X)
    return pd.DataFrame(scaled_values, columns=X.columns)


@validate_arguments(config=dict(arbitrary_types_allowed=True))
def simulate_scenarios(
        X: pd.DataFrame, column_limit: int = 8, sample_columns: bool = True, percentages: list = [0.1, 0.3, 0.5],
) -> dict:
    """Build one amputed copy of ``X`` per (mechanism, missingness ratio) pair.

    ``X`` is rescaled with ``scale_data`` first, then ``ampute`` is called for
    every mechanism in ``["MAR", "MNAR", "MCAR"]`` and every ratio in
    ``percentages``.

    Args:
        X: complete input frame.
        column_limit: forwarded to ``ampute``.
        sample_columns: forwarded to ``ampute``.
        percentages: missingness ratios to simulate. The default list is only
            read, never mutated.

    Returns:
        Nested dict ``{mechanism: {p_miss: ampute(...) result}}``.
    """
    X = scale_data(X)

    mechanisms = ["MAR", "MNAR", "MCAR"]
    datasets: dict = {}

    # Mechanism is the outer loop and ratio the inner one; the order matters
    # because ampute() may draw from the global NumPy random state.
    for ampute_mechanism in mechanisms:
        per_ratio = datasets.setdefault(ampute_mechanism, {})
        for p_miss in percentages:
            per_ratio[p_miss] = ampute(
                X,
                ampute_mechanism,
                p_miss,
                column_limit=column_limit,
                sample_columns=sample_columns,
            )

    return datasets


def ws_score(imputed: pd.DataFrame, ground: pd.DataFrame) -> float:
    """Sum of the per-column Wasserstein distances between two frames.

    Columns are matched by position, not by label.

    Args:
        imputed: imputed data.
        ground: reference data; its column count drives the iteration.

    Returns:
        The sum over columns of ``wasserstein_distance(ground[:, c], imputed[:, c])``.
    """
    # Convert once instead of once per column.
    ground_values = np.asarray(ground)
    imputed_values = np.asarray(imputed)

    return sum(
        (
            wasserstein_distance(ground_values[:, col], imputed_values[:, col])
            for col in range(ground.shape[1])
        ),
        0.0,
    )


@validate_arguments(config=dict(arbitrary_types_allowed=True))
def benchmark_model(
        name: str,
        model: Any,
        X: pd.DataFrame,
        X_miss: pd.DataFrame,
        mask: pd.DataFrame,
) -> tuple:
    """Impute ``X_miss`` with ``model`` and score the result against ``X``.

    Args:
        name: label of the benchmark; not used inside this function.
        model: object exposing ``fit_transform`` and ``name``.
        X: complete reference data.
        X_miss: data with missing values; a copy is handed to the model.
        mask: mask passed to ``RMSE`` together with the imputed and reference
            arrays.

    Returns:
        ``(rmse_score, distribution_score)`` where the second item is the
        value returned by ``ws_score``.
    """
    started_at = time()

    # The model receives a copy, so X_miss itself is never touched here.
    X_imputed = model.fit_transform(X_miss.copy())

    wasserstein_total = ws_score(X_imputed, X)

    imputed_arr = np.asarray(X_imputed)
    reference_arr = np.asarray(X)
    mask_arr = np.asarray(mask)
    rmse_value = RMSE(imputed_arr, reference_arr, mask_arr)

    # The reported duration covers imputation and scoring.
    log.info(f"benchmark {model.name()} took {time() - started_at}")
    return rmse_value, wasserstein_total


def benchmark_standard(
        model_name: str,
        X: pd.DataFrame,
        X_miss: pd.DataFrame,
        mask: pd.DataFrame,
) -> tuple:
    """Benchmark the imputer registered under ``model_name``.

    The imputer is looked up in the module-level ``imputers`` registry, which
    must exist by the time this function is called, and the work is delegated
    to ``benchmark_model``.

    Returns:
        Whatever ``benchmark_model`` returns for that imputer.
    """
    return benchmark_model(model_name, imputers.get(model_name), X, X_miss, mask)


@validate_arguments(config=dict(arbitrary_types_allowed=True))
def evaluate_dataset(
        name: str,
        evaluated_model: Any,
        X_raw: pd.DataFrame,
        ref_methods: list = ["mean", "missforest", "ice", "gain", "sinkhorn", "softimpute"],
        scenarios: list = ["MAR", "MCAR", "MNAR"],
        miss_pct: list = [0.1, 0.3, 0.5],
        sample_columns: bool = True,
) -> tuple:
    """Score the evaluated model and the reference imputers on one dataset.

    Amputed datasets are generated once with ``simulate_scenarios``; then, for
    every requested (scenario, missingness) pair, a deep copy of
    ``evaluated_model`` and each entry of ``ref_methods`` are benchmarked on
    the same amputed data.

    Args:
        name: label forwarded to ``benchmark_model`` for the evaluated model.
        evaluated_model: model under test; deep-copied before every run.
        X_raw: complete input data.
        ref_methods: names of reference imputers, resolved by
            ``benchmark_standard``.
        scenarios: missingness mechanisms to evaluate.
        miss_pct: missingness ratios to evaluate.
        sample_columns: forwarded to ``simulate_scenarios``.

    Returns:
        ``(rmse_results, distr_results)``; both are nested dicts
        ``{scenario: {missingness: {method: score}}}`` in which the evaluated
        model is stored under the key ``"our"``. When a pair fails, the
        methods that were not reached are missing from its inner dict.
    """
    imputation_scenarios = simulate_scenarios(X_raw, sample_columns=sample_columns, percentages=miss_pct)

    rmse_results: dict = {}
    distr_results: dict = {}

    for scenario in scenarios:

        rmse_results[scenario] = {}
        distr_results[scenario] = {}

        for missingness in miss_pct:

            log.debug(f"  > eval {scenario} {missingness}")
            rmse_results[scenario][missingness] = {}
            distr_results[scenario][missingness] = {}

            try:
                x, x_miss, mask = imputation_scenarios[scenario][missingness]
                (our_rmse_score, our_distribution_score) = benchmark_model(
                    name, copy.deepcopy(evaluated_model), x, x_miss, mask
                )
                rmse_results[scenario][missingness]["our"] = our_rmse_score
                distr_results[scenario][missingness]["our"] = our_distribution_score

                # Every reference method sees the same amputed data as the
                # evaluated model.
                for method in ref_methods:
                    (
                        mse_score,
                        distribution_score,
                    ) = benchmark_standard(method, x, x_miss, mask)
                    rmse_results[scenario][missingness][method] = mse_score
                    distr_results[scenario][missingness][method] = distribution_score
            except Exception as e:
                # Best effort: report the failure and move on to the next
                # pair. Only Exception is caught so that KeyboardInterrupt and
                # SystemExit still stop a long-running benchmark.
                print(f"scenario failed {str(e)}")

                continue
    return rmse_results, distr_results


def compare_models(
        name: str,
        evaluated_model: Any,
        X_raw: pd.DataFrame,
        ref_methods: list = ["mean", "missforest", "ice", "gain", "sinkhorn", "softimpute"],
        scenarios: list = ["MNAR"],
        miss_pct: list = [0.1, 0.3, 0.5, 0.7],
        n_iter: int = 2,
        sample_columns: bool = True,
        display_results: bool = True,
        n_jobs: int = 1,
) -> dict:
    """Run ``evaluate_dataset`` ``n_iter`` times and aggregate the scores.

    Args:
        name: benchmark label; also the prefix of the CSV files written when
            ``display_results`` is True.
        evaluated_model: model under test; must expose ``name()``.
        X_raw: complete input data.
        ref_methods: names of the reference imputers.
        scenarios: missingness mechanisms to evaluate.
        miss_pct: missingness ratios to evaluate.
        n_iter: number of repetitions; repetition ``it`` is seeded with
            ``enable_reproducible_results(it)``.
        sample_columns: forwarded to ``evaluate_dataset``.
        display_results: when True, the formatted tables are displayed and
            written to ``./{name}_rmse.csv`` and ``./{name}_dis.csv``.
        n_jobs: number of joblib workers.

    Returns:
        Dict with keys ``"headers"`` (column titles), ``"rmse"`` and
        ``"wasserstein"``. The last two are lists of rows
        ``[scenario, missingness, score, ...]`` with one score per method as
        produced by ``generate_score``, ordered as the evaluated model
        followed by ``ref_methods``.
    """
    dispatcher = Parallel(n_jobs=n_jobs)
    start = time()

    def add_metrics(
            store: dict, scenario: str, missingness: float, method: str, score: float
    ) -> None:
        # Append ``score`` under store[scenario][missingness][method],
        # creating the intermediate containers on first use.
        store.setdefault(scenario, {}).setdefault(missingness, {}).setdefault(
            method, []
        ).append(score)

    rmse_results_dict: dict = {}
    distr_results_dict: dict = {}

    def eval_local(it: int) -> Any:
        enable_reproducible_results(it)
        log.debug(f"> evaluation trial {it}")
        return evaluate_dataset(
            name=name,
            evaluated_model=evaluated_model,
            X_raw=X_raw,
            ref_methods=ref_methods,
            scenarios=scenarios,
            miss_pct=miss_pct,
            sample_columns=sample_columns,
        )

    repeated_evals_results = dispatcher(delayed(eval_local)(it) for it in range(n_iter))

    # Collect the per-repetition scores into lists keyed by
    # scenario / missingness / method.
    for (
            local_rmse_results,
            local_distr_results,
    ) in repeated_evals_results:
        for scenario in local_rmse_results:
            for missingness in local_rmse_results[scenario]:
                for method in local_rmse_results[scenario][missingness]:
                    add_metrics(
                        rmse_results_dict,
                        scenario,
                        missingness,
                        method,
                        local_rmse_results[scenario][missingness][method],
                    )
                    add_metrics(
                        distr_results_dict,
                        scenario,
                        missingness,
                        method,
                        local_distr_results[scenario][missingness][method],
                    )

    rmse_results = []
    distr_results = []

    rmse_str_results = []
    distr_str_results = []

    for scenario in rmse_results_dict:

        for missingness in rmse_results_dict[scenario]:

            local_rmse_str_results = [scenario, missingness]
            local_distr_str_results = [scenario, missingness]

            local_rmse_results = [scenario, missingness]
            local_distr_results = [scenario, missingness]

            # NOTE(review): evaluate_dataset skips the remaining methods of a
            # pair once one of them fails, so a method may be absent here and
            # the lookup below would raise KeyError.
            for method in ["our"] + ref_methods:
                rmse_mean, rmse_std = generate_score(
                    rmse_results_dict[scenario][missingness][method]
                )
                rmse_str = print_score((rmse_mean, rmse_std))
                distr_mean, distr_std = generate_score(
                    distr_results_dict[scenario][missingness][method]
                )
                distr_str = print_score((distr_mean, distr_std))

                local_rmse_str_results.append(rmse_str)
                local_rmse_results.append((rmse_mean, rmse_std))

                local_distr_str_results.append(distr_str)
                local_distr_results.append((distr_mean, distr_std))

            rmse_str_results.append(local_rmse_str_results)
            rmse_results.append(local_rmse_results)
            distr_str_results.append(local_distr_str_results)
            distr_results.append(local_distr_results)

    # Built unconditionally: the headers are part of the return value, so
    # they must exist even when nothing is displayed.
    headers = (
            ["Scenario", "miss_pct [0, 1]"]
            + [f"Evaluated: {evaluated_model.name()}"]
            + ref_methods
    )

    if display_results:
        log.info(f"benchmark took {time() - start}")

        sep = "\n==========================================================\n\n"
        data = pd.DataFrame(rmse_str_results, columns=headers)
        data.to_csv(f"./{name}_rmse.csv", index=False)
        display(data)

        print(sep + "Wasserstein score")

        data = pd.DataFrame(distr_str_results, columns=headers)
        data.to_csv(f"./{name}_dis.csv", index=False)
        display(data)

    return {
        "headers": headers,
        "rmse": rmse_results,
        "wasserstein": distr_results,
    }

In [4]:
# Module-level plugin registry; benchmark_standard() resolves reference
# imputers through this object, so it must exist before any benchmark runs.
imputers = Imputers()

In [5]:
class EtImputer(ImputerPlugin):
    """Imputer plugin that fills incomplete columns with ``fill_with_et``.

    Columns are processed one at a time, from the lowest to the highest share
    of missing values, so later columns are filled using a frame in which the
    earlier ones are already complete.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Callable invoked as ``fill_with_et(df, col)``; the column ``col`` of
        # its result is used as the filled column.
        self._model = fill_with_et

    @staticmethod
    def name():
        """Key under which the plugin is registered."""
        return "et"

    @staticmethod
    def hyperparameter_space():
        """The plugin exposes no tunable hyperparameters."""
        return []

    def _fit(self, *args, **kwargs):
        """Nothing is fitted ahead of time; all the work happens in ``_transform``."""
        return self

    def _transform(self, df):
        """Return a copy of ``df`` in which every incomplete column is filled.

        The input frame is left untouched.
        """
        # Work on a copy so the caller's frame is not modified in place.
        df = df.copy()

        # Order the columns by their share of missing values (ascending) and
        # keep only those that actually contain missing values.
        miss_rate = df.isnull().sum() / df.shape[0]
        cols = [col for col in miss_rate.sort_values().index.tolist() if miss_rate[col] > 0]

        for col in cols:
            df_col_filled = self._model(df, col)
            df[col] = df_col_filled[col]
        return df


# Register the custom plugin under the key "et" so that it can be fetched
# with imputers.get("et").
imputers.add("et", EtImputer)

<hyperimpute.plugins.imputers.Imputers at 0x230197d1dd0>

In [6]:
# Fetch the "et" plugin from the registry; it is the model evaluated in the
# benchmark loop below.
imputer = imputers.get("et")

In [7]:
from pathlib import Path

# p = Path('./dataset')
# datasets = list(p.glob('*.csv'))
# datasets = [d for d in datasets if
#             d.stem != '3D_printer' and d.stem != 'Concrete Compressive Strength' and d.stem != "Bala_regression_dataset"]

# Benchmark a single, explicitly listed CSV file (the glob-based discovery
# above is commented out).
datasets = [Path("./dataset/Bala_regression_dataset.csv")]

# Bare expression so the notebook echoes the selected paths.
datasets

[WindowsPath('dataset/Bala_regression_dataset.csv')]

In [9]:
from tqdm import tqdm

# Run the benchmark once per dataset path; tqdm only adds a progress bar.
for d in tqdm(datasets):
    # Disabled resume check: it would skip a dataset whose RMSE CSV exists.
    # if Path.exists(Path(f"./Et-knn-{d.stem}_rmse.csv")):
    #     print(f"skip {d.stem}")
    #     continue
    df = pd.read_csv(d)
    # Keep the numeric columns only.
    df = df.select_dtypes(include=[np.number])
    # With display_results left at its default, compare_models displays the
    # tables and writes "./<name>_rmse.csv" and "./<name>_dis.csv".
    compare_models(
        name=f"Et-adsadasdasd-{d.stem}",
        evaluated_model=imputer,
        X_raw=df,
        ref_methods=["mean", "hyperimpute", "missforest"],
        scenarios=["MCAR"],
        miss_pct=[0.1],
        n_iter=1
    )

  0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,Scenario,"miss_pct [0, 1]",Evaluated: et,mean,hyperimpute,missforest
0,MCAR,0.1,0.0691 +/- 0.0,0.187 +/- 0.0,0.0769 +/- 0.0,0.0769 +/- 0.0




Wasserstein score


Unnamed: 0,Scenario,"miss_pct [0, 1]",Evaluated: et,mean,hyperimpute,missforest
0,MCAR,0.1,0.01 +/- 0.0,0.0766 +/- 0.0,0.0229 +/- 0.0,0.023 +/- 0.0


100%|██████████| 1/1 [00:23<00:00, 23.49s/it]
