In [None]:
# stdlib
import warnings
from typing import Union

# third party
import numpy as np
import pandas as pd
import pytest
from scipy import stats
from scipy.stats import multivariate_normal
from sdv.tabular import TVAE
from sklearn.datasets import fetch_california_housing, fetch_covtype, load_digits
from sklearn.preprocessing import StandardScaler

# domias absolute
from domias.evaluator import evaluate_performance
from domias.models.ctgan import CTGAN
from domias.models.generator import GeneratorInterface

# Suppress every warning category for the remainder of the session.
warnings.filterwarnings("ignore")


def get_dataset(seed: Union[int, None] = None) -> np.ndarray:
    """Load the California housing features, shuffle the rows and standardize them.

    Args:
        seed: Optional seed for the row shuffle. When ``None`` (the default) the
            global NumPy random state is used, so the row order differs between
            runs unless the caller seeded NumPy. Pass an integer to get the same
            row order on every call.

    Returns:
        The feature matrix after the row shuffle and
        ``StandardScaler.fit_transform``.
    """
    X = fetch_california_housing().data
    if seed is None:
        # Legacy path: the permutation depends on the global NumPy RNG state.
        np.random.shuffle(X)
    else:
        # A dedicated generator makes the shuffle reproducible without touching
        # the global RNG state.
        np.random.default_rng(seed).shuffle(X)
    return StandardScaler().fit_transform(X)

In [None]:
def get_generator(
    gan_method: str = "TVAE",
    epochs: int = 100,
    seed: int = 0,
) -> GeneratorInterface:
    """Build an unfitted generator wrapper for the requested synthesis method.

    Args:
        gan_method: Name of the backend: "TVAE", "CTGAN" or "KDE".
        epochs: Number of training epochs forwarded to the TVAE / CTGAN
            constructors. Ignored for "KDE".
        seed: Accepted for interface compatibility; no backend in this function
            currently consumes it.

    Returns:
        A ``GeneratorInterface`` implementation exposing ``fit`` and ``generate``.

    Raises:
        RuntimeError: If ``gan_method`` is not one of the supported names. The
            returned object's ``generate`` also raises ``RuntimeError`` when the
            KDE backend is used before ``fit``.
    """
    supported_methods = ("TVAE", "CTGAN", "KDE")
    if gan_method not in supported_methods:
        raise RuntimeError(
            f"Unsupported generator method {gan_method!r}; "
            f"expected one of {supported_methods}"
        )

    class LocalGenerator(GeneratorInterface):
        """Adapter giving every backend the same ``fit`` / ``generate`` surface."""

        def __init__(self) -> None:
            self.method = gan_method
            if gan_method == "TVAE":
                self.model = TVAE(epochs=epochs)
            elif gan_method == "CTGAN":
                self.model = CTGAN(epochs=epochs)
            else:
                # KDE has no untrained state; the estimator is built in fit().
                self.model = None

        def fit(self, data: pd.DataFrame) -> "LocalGenerator":
            """Train the backend on ``data`` and return ``self`` for chaining."""
            if self.method == "KDE":
                # gaussian_kde takes the transposed data, as in the original call.
                self.model = stats.gaussian_kde(np.transpose(data))
            else:
                self.model.fit(data)

            return self

        def generate(self, count: int) -> pd.DataFrame:
            """Draw ``count`` synthetic rows from the fitted backend."""
            if self.model is None:
                # Only reachable for KDE, whose model is created in fit().
                raise RuntimeError(
                    f"{self.method} generator must be fitted before generate()"
                )

            if self.method == "KDE":
                # Transpose the resampled array back before wrapping it in a frame.
                samples = pd.DataFrame(self.model.resample(count).transpose(1, 0))
            elif self.method == "TVAE":
                samples = self.model.sample(count)
            elif self.method == "CTGAN":
                samples = self.model.generate(count)
            else:
                raise RuntimeError(f"Unsupported generator method {self.method!r}")

            return samples

    return LocalGenerator()

In [None]:
# Experiment configuration: synthetic sample size, held-out set size, and the
# grid of training epochs / training-set sizes that is swept below.
dataset = get_dataset()
gen_size = 10000
held_out_size = 10000
training_epochs = [1000, 2000, 3000]
training_sizes = [100, 500, 1000]

# results[method][density_estimator][training_size][training_epoch] -> metrics
results = {}
for method in ["TVAE"]:
    results[method] = {}
    # NOTE: density_estimator is only used as a results key in this cell; it is
    # not forwarded to evaluate_performance.
    for density_estimator in ["prior"]:
        results[method][density_estimator] = {}
        for training_size in training_sizes:
            results[method][density_estimator][training_size] = {}
            for training_epoch in training_epochs:
                generator = get_generator(
                    gan_method=method,
                    epochs=training_epoch,
                )
                try:
                    perf = evaluate_performance(
                        generator,
                        dataset,
                        training_size,
                        held_out_size,
                        training_epoch,
                        gen_size_list=[gen_size],
                    )
                except Exception as e:
                    # Best effort: a failed configuration is reported and skipped,
                    # leaving no entry for it in `results`. Catching Exception
                    # (not BaseException) lets KeyboardInterrupt / SystemExit
                    # stop this long-running sweep.
                    print("task failed", e)
                    continue

                print(
                    f"""
                        SIZE_PARAM = {training_size} ADDITION_SIZE  = {held_out_size} TRAINING_EPOCH = {training_epoch}
                            metrics = {perf}
                    """
                )

                results[method][density_estimator][training_size][training_epoch] = perf

In [None]:
# Display the nested results dict:
# method -> density estimator -> training size -> training epoch -> metrics.
results

In [None]:
# third party
import cloudpickle

# Back up the collected metrics so the plots below can be rebuilt without
# re-running the training sweep.
with open("experiment_1_results.bkp", "wb") as backup_file:
    cloudpickle.dump(results, backup_file)

In [None]:
# List the metric names stored for one configuration (the first size / epoch of
# the sweep). The innermost key follows the
# "<training_size>_<training_epoch>_<held_out_size>" pattern used by the
# aggregation cells below, so it is derived from the configuration rather than
# hard-coded.
sample_size, sample_epoch = training_sizes[0], training_epochs[0]
results["TVAE"]["prior"][sample_size][sample_epoch][
    f"{sample_size}_{sample_epoch}_{held_out_size}"
].keys()

## AUC by the number of iterations

In [None]:
# Fix the training-set size and compare the AUCs across training epochs.
training_size = 1000

# Gather one record per (epoch, source) pair and build the frame once. Growing a
# DataFrame with pd.concat inside the loop is quadratic, starts from an empty
# object-dtype frame, and leaves repeated index labels (0, 1, 2, 0, 1, 2, ...).
auc_records = []
for training_epoch in training_epochs:
    epoch_res = results["TVAE"]["prior"][training_size][training_epoch]
    epoch_res = epoch_res[f"{training_size}_{training_epoch}_{held_out_size}"]

    auc_by_src = {
        # The baselines entry is indexed by column "auc"; its first value is used.
        "baseline": epoch_res[f"{gen_size}_Baselines"]["auc"].values[0],
        "eq1": epoch_res[f"{gen_size}_Eqn1AUC"],
        "eq2": epoch_res[f"{gen_size}_Eqn2AUC"],
    }
    for src, auc in auc_by_src.items():
        auc_records.append({"epoch": training_epoch, "src": src, "AUC": auc})

output = pd.DataFrame(auc_records, columns=["epoch", "src", "AUC"])

output

In [None]:
# third party
import seaborn as sns

# Pass the frame through the explicit `data=` keyword so the call does not
# depend on the positional order of lineplot's parameters, and label the figure
# so it can be read on its own.
ax = sns.lineplot(data=output, x="epoch", y="AUC", hue="src")
ax.set(
    title="AUC by number of training epochs",
    xlabel="Training epochs",
    ylabel="AUC",
);

## AUC by the training dataset size

In [None]:
# Fix the number of training epochs and compare the AUCs across training-set sizes.
training_epoch = 3000

# Gather one record per (training size, source) pair and build the frame once.
# Growing a DataFrame with pd.concat inside the loop is quadratic, starts from an
# empty object-dtype frame, and leaves repeated index labels (0, 1, 2, 0, 1, 2, ...).
auc_records = []
for training_size in training_sizes:
    epoch_res = results["TVAE"]["prior"][training_size][training_epoch]
    epoch_res = epoch_res[f"{training_size}_{training_epoch}_{held_out_size}"]

    auc_by_src = {
        # The baselines entry is indexed by column "auc"; its first value is used.
        "baseline": epoch_res[f"{gen_size}_Baselines"]["auc"].values[0],
        "eq1": epoch_res[f"{gen_size}_Eqn1AUC"],
        "eq2": epoch_res[f"{gen_size}_Eqn2AUC"],
    }
    for src, auc in auc_by_src.items():
        auc_records.append({"training_size": training_size, "src": src, "AUC": auc})

output = pd.DataFrame(auc_records, columns=["training_size", "src", "AUC"])

output

In [None]:
# third party
import seaborn as sns

# Pass the frame through the explicit `data=` keyword so the call does not
# depend on the positional order of lineplot's parameters, and label the figure
# so it can be read on its own.
ax = sns.lineplot(data=output, x="training_size", y="AUC", hue="src")
ax.set(
    title="AUC by training dataset size",
    xlabel="Training dataset size",
    ylabel="AUC",
);