In [None]:
!pip install synthcity
!pip uninstall -y torchaudio torchdata
!pip install plotly

In [None]:
# stdlib
import sys
import warnings

# third party
import optuna
from sklearn.datasets import load_diabetes

import numpy as np
import pandas as pd

# synthcity absolute
import synthcity.logger as log
from synthcity.plugins import Plugins
from synthcity.plugins.core.dataloader import GenericDataLoader

# Register stderr as a sink for synthcity's logger, configured with level "INFO".
log.add(sink=sys.stderr, level="INFO")
# Blanket filter: ignore every Python warning raised from here on (no category
# or module restriction is given).
warnings.filterwarnings("ignore")

In [None]:
# objective function for the optuna optimization
# we optimize to minimize the detectability of synthetic vs. real data

from synthcity.utils.optuna_sample import suggest_all
from synthcity.benchmark import Benchmarks

def objective(trial: optuna.Trial):
    """
    Optuna objective: benchmark one sampled configuration of the current plugin.

    Reads the module-level names PLUGIN (plugin name) and train_loader (data
    passed to the benchmark), so both must be defined before a study runs.

    Parameters:
        trial (optuna.Trial): Trial used to sample the hyperparameters.

    Returns:
        The mean of the "mean" column over all report rows whose direction
        is "minimize".

    Raises:
        optuna.TrialPruned: If the benchmark raises any exception; the error
            and the offending parameters are printed first.
    """
    search_space = Plugins().get(PLUGIN).hyperparameter_space()
    params = suggest_all(trial, search_space)

    # ddpm is always run with is_classification switched off
    if PLUGIN == "ddpm":
        params["is_classification"] = False

    trial_id = f"trial_{trial.number}"
    detection_metrics = {"detection": ["detection_xgb"]}

    try:
        report = Benchmarks.evaluate(
            [(trial_id, PLUGIN, params)],
            train_loader,
            repeats=1,
            metrics=detection_metrics,
        )
    except Exception as err:  # invalid set of params
        print(f"{type(err).__name__}: {err}")
        print(params)
        raise optuna.TrialPruned()

    # average score across all metrics with direction="minimize"
    minimized_rows = report[trial_id].query('direction == "minimize"')
    return minimized_rows["mean"].mean()


def enforce_dtypes(dat, num_variables, cat_variables):
    """
    Cast positionally selected columns to "float64" (numeric) or "str"
    (categorical).

    Only the columns named in num_variables / cat_variables are kept; any
    other column of `dat` is absent from the result.

    Parameters:
        dat (pd.DataFrame): Input data matrix (numeric, categorical, or mixed).
        num_variables (sequence of int or None): Positions of the numeric
            columns. Any iterable of integers is accepted (list, tuple,
            range, NumPy array).
        cat_variables (sequence of int or None): Positions of the categorical
            columns. Same accepted types as num_variables.

    Returns:
        pd.DataFrame: New frame with the selected columns cast to the
        requested dtypes. When both selections are given, the columns are
        arranged by ascending original position.

    Raises:
        ValueError: If both num_variables and cat_variables are None.
    """
    if num_variables is None and cat_variables is None:
        raise ValueError("At least one of num_variables or cat_variables must be specified.")

    # Normalise to plain lists up front. `+` on NumPy arrays adds element-wise
    # (or broadcasts) instead of concatenating, which would silently drop
    # columns in the reordering step below; `range + list` would raise.
    numeric_positions = None if num_variables is None else list(num_variables)
    categorical_positions = None if cat_variables is None else list(cat_variables)

    if categorical_positions is None:
        return dat.iloc[:, numeric_positions].astype("float64")

    if numeric_positions is None:
        return dat.iloc[:, categorical_positions].astype("str")

    numeric_part = dat.iloc[:, numeric_positions].astype("float64")
    categorical_part = dat.iloc[:, categorical_positions].astype("str")
    combined = pd.concat([numeric_part, categorical_part], axis=1)

    # `combined` holds the numeric columns first, then the categorical ones.
    # argsort over the concatenated positions yields the permutation that puts
    # the columns back in the order they had in `dat`.
    original_order = np.argsort(numeric_positions + categorical_positions)
    return combined.iloc[:, original_order]


def train_test_data_split(X, my_seed):
    """
    Split the rows of X into two equally sized, disjoint halves.

    Rows are sampled by position rather than by index label, so the split is
    also correct when X has a non-unique index. For a unique, sorted index
    (e.g. the default RangeIndex) the result is the same as label-based
    sampling with the same seed.

    Parameters:
        X (pd.DataFrame): The input data.
        my_seed (int): Seed passed to np.random.seed; note that this reseeds
            NumPy's global random state.

    Returns:
        dict: {'X_train': X_train, 'X_test': X_test}. Each frame has
        len(X) // 2 rows. X_train is in sampled order, X_test in the row
        order of X. When len(X) is odd, the last remaining row is left out
        so both halves have the same size.
    """
    # Set random seed (global NumPy state, as callers may rely on it)
    np.random.seed(my_seed)

    n_rows = X.shape[0]
    n_half = n_rows // 2  # floor division: half the rows

    # Sample row positions for the training half without replacement
    train_positions = np.random.choice(n_rows, size=n_half, replace=False)

    # Every position not drawn for training is a test candidate (ascending)
    is_test = np.ones(n_rows, dtype=bool)
    is_test[train_positions] = False
    test_positions = np.flatnonzero(is_test)

    # With an odd row count there is one candidate too many: drop the last
    test_positions = test_positions[:n_half]

    return {"X_train": X.iloc[train_positions], "X_test": X.iloc[test_positions]}

In [None]:
# load the data

from sklearn.datasets import fetch_openml

# Download the Abalone dataset (version 1) as pandas objects
abalone = fetch_openml(name="abalone", version=1, as_frame=True)

# Feature frame and target; the target (Rings) is appended as column "target"
X, y = abalone.data, abalone.target
X["target"] = y

# Column positions handed to enforce_dtypes: position 0 is categorical,
# positions 1..8 are numeric (presumably 8 is the appended target -- confirm)
cat_idx = [0]
num_idx = list(range(1, 9))

X = enforce_dtypes(dat=X, num_variables=num_idx, cat_variables=cat_idx)

# Split the data into two halves with a fixed seed
aux = train_test_data_split(X, my_seed=123)
X_train, X_test = aux["X_train"], aux["X_test"]

In [None]:
# create data loaders: one per half of the split, each with "target" as the
# target column (train first, then test)

train_loader, test_loader = (
    GenericDataLoader(split, target_column="target")
    for split in (X_train, X_test)
)

In [None]:
# Number of Optuna trials per study; passed as n_trials to every
# study.optimize(...) call in the tuning cells of this notebook.

n_trials = 20

In [None]:
# run optuna for ddpm

np.random.seed(123)

# objective() reads the module-level PLUGIN to decide which plugin to tune
PLUGIN = "ddpm"
plugin_cls = type(Plugins().get(PLUGIN))

# Seed the sampler explicitly: np.random.seed does not reach Optuna's sampler,
# so without this the suggested hyperparameters would differ on every run.
# TPESampler is Optuna's default single-objective sampler.
study_ddpm = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=123),
)
study_ddpm.optimize(objective, n_trials=n_trials)
study_ddpm.best_params

In [None]:
# run optuna for arf

np.random.seed(123)

# objective() reads the module-level PLUGIN to decide which plugin to tune
PLUGIN = "arf"
plugin_cls = type(Plugins().get(PLUGIN))

# Seed the sampler explicitly: np.random.seed does not reach Optuna's sampler,
# so without this the suggested hyperparameters would differ on every run.
# TPESampler is Optuna's default single-objective sampler.
study_arf = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=123),
)
study_arf.optimize(objective, n_trials=n_trials)
study_arf.best_params

In [None]:
# run optuna for tvae

np.random.seed(123)

# objective() reads the module-level PLUGIN to decide which plugin to tune
PLUGIN = "tvae"
plugin_cls = type(Plugins().get(PLUGIN))

# Seed the sampler explicitly: np.random.seed does not reach Optuna's sampler,
# so without this the suggested hyperparameters would differ on every run.
# TPESampler is Optuna's default single-objective sampler.
study_tvae = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=123),
)
study_tvae.optimize(objective, n_trials=n_trials)
study_tvae.best_params

In [None]:
# run optuna for ctgan

np.random.seed(123)

# objective() reads the module-level PLUGIN to decide which plugin to tune
PLUGIN = "ctgan"
plugin_cls = type(Plugins().get(PLUGIN))

# Seed the sampler explicitly: np.random.seed does not reach Optuna's sampler,
# so without this the suggested hyperparameters would differ on every run.
# TPESampler is Optuna's default single-objective sampler.
study_ctgan = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.TPESampler(seed=123),
)
study_ctgan.optimize(objective, n_trials=n_trials)
study_ctgan.best_params