In [None]:
%load_ext autoreload
%autoreload 2
#%pip install prettytable
import warnings
import json

warnings.filterwarnings("ignore")
# import the necessary package
from baseline.OE_GPLVM.aeb_gplvm import AEB_GPLVM, NNEncoder, kl_gaussian_loss_term
#from baseline.OE_GPLVM.composed_elbo import ComposedVariationalELBO
from baseline.OE_GPLVM.train import *
from baseline.OE_GPLVM.utils import *
from baseline.PyOD import PYOD
from gpytorch.mlls import KLGaussianAddedLossTerm
from gpytorch.distributions import MultivariateNormal
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.mlls import VariationalELBO, KLGaussianAddedLossTerm
from torch.distributions import kl_divergence
from gpytorch.priors import MultivariateNormalPrior
from tqdm import trange
from utils.data_generator import DataGenerator
from utils.myutils import Utils
import matplotlib.pyplot as plt
import numpy as np
import torch


In [None]:
# Notebook-wide setup: plotting style plus the two project helper objects
# that the cells below rely on.
plt.style.use("ggplot")
datagenerator = DataGenerator()  # produces the train/test split via .generator()
utils = Utils()  # helper object; its .metric() is used to evaluate the scores

In [None]:
# Names of the datasets that can be selected for this experiment.
# The numeric prefix is part of the name handed to the data generator.
# NOTE(review): the "99_*" entries look like synthetic 2-D toy problems
# (the scatter plot below uses exactly two feature columns) — confirm.
dataset_list = [
    "01_ALOI",
    "02_annthyroid",
    "03_backdoor",
    "04_breastw",
    "05_campaign",
    "06_cardio",
    "07_Cardiotocography",
    "08_celeba",
    "09_census",
    "99_linear",
    "99_circles",
    "99_moons",
]

In [None]:
# Pick the dataset to run on: the last entry of dataset_list ("99_moons").
dataset = dataset_list[-1]
# The generator reads the dataset name from this attribute.
datagenerator.dataset = dataset
# `data` is indexed below with the keys "X_train", "X_test", "y_train", "y_test".
# NOTE(review): `la` is presumably the fraction of labelled anomalies — confirm
# against DataGenerator.generator.
data = datagenerator.generator(la=1.0, realistic_synthetic_mode=None, noise_type=None)

In [None]:
# Convert the four generated arrays to float32 tensors in one pass:
# features become Y_* (the GPLVM observations), labels become lb_*.
Y_train, Y_test, lb_train, lb_test = (
    torch.tensor(data[key], dtype=torch.float32)
    for key in ("X_train", "X_test", "y_train", "y_test")
)

In [None]:
# Scatter plot of the first two feature columns of the training set,
# coloured by label (1 = anomaly, 0 = normal).
# Row indices per class are computed once and reused for both coordinates.
anomaly_idx = np.where(lb_train == 1)[0]
normal_idx = np.where(lb_train == 0)[0]

fig, ax = plt.subplots(figsize=(5, 5))
ax.scatter(
    data["X_train"][anomaly_idx, 0],
    data["X_train"][anomaly_idx, 1],
    label="Anomaly",
)
ax.scatter(
    data["X_train"][normal_idx, 0],
    data["X_train"][normal_idx, 1],
    label="Normal",
    alpha=0.2,  # normals are drawn translucent so the anomalies stay visible
)
# A figure should stand alone when the notebook is skimmed.
ax.set_title("Training data: " + dataset)
ax.set_xlabel("feature 0")
ax.set_ylabel("feature 1")
# Trailing semicolon suppresses the Legend repr in the cell output.
ax.legend();

In [None]:
# Bundle the data and every hyper-parameter of this run into one Experiment
# object. All arguments are positional, so their meaning is fixed by
# Experiment's signature, which is not visible in this notebook.
# The next cell reads back: N, data_dim, latent_dim, n_inducing, n_epochs,
# nn_layers, lr, method, elbo and batch_size.
# NOTE(review): consider keyword arguments here once the signature is confirmed.
experiment = Experiment(
    Y_train,
    Y_test,
    lb_train,
    lb_test,
    len(Y_train),  # number of training rows
    Y_train.shape[1],  # number of feature columns
    # NOTE(review): the four integers below presumably map to some ordering of
    # latent_dim / n_inducing / n_epochs / batch_size — confirm against Experiment.
    200,
    2,
    75,
    1600,
    (10, 10),  # presumably the encoder's hidden layer sizes (nn_layers) — confirm
    0.001,  # presumably the learning rate (lr) — confirm
    "loe",  # presumably the training method — confirm
    "hard",  # presumably the ELBO / LOE variant — confirm
)

In [None]:
# Pull the run configuration out of `experiment` into plain notebook
# variables, grouped by what they configure.

# Problem sizes.
N, data_dim, latent_dim, n_inducing = (
    experiment.N,
    experiment.data_dim,
    experiment.latent_dim,
    experiment.n_inducing,
)
# Optimisation settings.
n_epochs, nn_layers, lr = (
    experiment.n_epochs,
    experiment.nn_layers,
    experiment.lr,
)
# Loss / batching settings.
method, elbo_type, batch_size = (
    experiment.method,
    experiment.elbo,
    experiment.batch_size,
)

In [None]:
# ---------------------------------------------------------------------------
# Training cell: build the model, likelihood and optimiser, then run the
# optimisation loop and store the result on the model.
# ---------------------------------------------------------------------------

# Bookkeeping containers. Only loss_list, noise_trace and report are filled
# by this cell; the others are initialised here and left empty.
model_dict = {}
noise_trace_dict = {}
loss_list = []
noise_trace = []
report = Reporter(experiment)
lln_list = []
kln_list = []
lla_list = []
kla_list = []

# Latent prior: zero mean for every training point, identity covariance.
X_prior_mean = torch.zeros(experiment.N, latent_dim)  # shape: N x Q
X_prior_covar = torch.eye(X_prior_mean.shape[1])  # shape: Q x Q
prior_x = MultivariateNormalPrior(X_prior_mean, X_prior_covar)
encoder = NNEncoder(experiment.N, latent_dim, prior_x, data_dim, nn_layers)
model = AEB_GPLVM(experiment.N, data_dim, latent_dim, n_inducing, encoder, nn_layers)
likelihood = GaussianLikelihood()
# One optimiser over both the model and the likelihood parameters.
optimizer = torch.optim.Adam(
    [{"params": model.parameters()}, {"params": likelihood.parameters()}], lr=lr
)
# NOTE: `elbo` is constructed but not used by the loop below; the loss is
# assembled from calculate_elbo() and loss_loe() instead.
elbo = VariationalELBO(likelihood, model, num_data=len(Y_train), combine_terms=False)

# Iterations at which ELBO terms / batch info are saved and the progress-bar
# description is refreshed.
report_iters = (0, 10, 50, 250, 500, 1000)

# Put both modules in training mode explicitly so that re-running this cell
# never depends on state left behind by another cell.
model.train()
likelihood.train()
iterator = trange(n_epochs, leave=True)
for i in iterator:
    optimizer.zero_grad()

    # Draw a mini-batch. The idx_n / idx_a returned here are immediately
    # replaced by the split computed by get_loe_idx below; only batch_index
    # and ratio are actually consumed from this call.
    idx_n, idx_a, batch_index, ratio = model._get_batch_indices(
        batch_size, lb_train, method=method
    )
    idx_n, idx_a = get_loe_idx(
        model, likelihood, Y_train, batch_index, train_data=experiment.N, ratio=ratio
    )
    target_n, target_a = Y_train[idx_n], Y_train[idx_a]

    # ELBO terms (log-likelihood and KL) for each of the two groups.
    ll_n, kl_n = calculate_elbo(
        model,
        likelihood,
        target_n,
        num_data=experiment.N,
        batch_size=batch_size,
        elbo_shape=elbo_type,
    )
    ll_a, kl_a = calculate_elbo(
        model,
        likelihood,
        target_a,
        num_data=experiment.N,
        batch_size=batch_size,
        elbo_shape=elbo_type,
    )
    loss_normal, loss_anomaly = (ll_n - kl_n).sum(), (ll_a - kl_a).sum()
    # Negated because the optimiser minimises.
    loss = -loss_loe(method, loss_normal, loss_anomaly).sum()

    # Read the scalar once and reuse it for logging and reporting.
    loss_value = loss.item()
    loss_list.append(loss_value)
    noise_trace.append(np.round(likelihood.noise_covar.noise.item(), 3))
    if i in report_iters:
        report.save_elbo_terms(ll_n.sum(), kl_n.sum(), ll_a.sum(), kl_a.sum())
        report.save_batch_info(i, idx_n, idx_a, loss_value)
        # The original description ended in "iter no: " without the number.
        iterator.set_description(
            "Loss: " + str(float(np.round(loss_value, 2))) + ", iter no: " + str(i)
        )
    loss.backward()
    optimizer.step()
model.store(loss_list, likelihood)

In [None]:
# Visualise the training run. Presumably this draws what was recorded through
# report.save_elbo_terms / report.save_batch_info in the training loop — confirm
# against Reporter.
report.plot_train_evolution()

In [None]:
# Switch the model and the likelihood to evaluation mode for the cells below.
# The former `with torch.no_grad():` wrapper was dropped: switching modes is
# not a tensor computation, and a no_grad context ends with its own block, so
# it never applied to the prediction / scoring cells anyway. Inference that
# should skip autograd needs its own no_grad block where it actually runs.
model.eval()
likelihood.eval();

In [None]:
# Plot test-time results for the trained model and likelihood.
# NOTE(review): what exactly is drawn is defined by Reporter.plot_test — confirm.
report.plot_test(model, likelihood)

In [None]:
# Run the trained model on the test observations. Each call returns a pair,
# unpacked here as (mean, covariance):
#   reconstruct_y  -> reconstruction of Y_test in data space
#   predict_latent -> latent representation of Y_test
# NOTE(review): none of these four results is used by the cells visible here;
# consider wrapping the calls in torch.no_grad() if gradients are not needed.
Y_pred_mean, Y_pred_covar = model.reconstruct_y(Y_test)
X_pred_mean, X_pred_covar = model.predict_latent(Y_test)

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Anomaly scoring on the test set.
# The ELBO terms are evaluated with the whole test set as a single batch.
# This is pure inference, so autograd is disabled instead of building a graph
# that would be thrown away by .detach() right after.
with torch.no_grad():
    lln, kln = calculate_elbo(
        model,
        likelihood,
        Y_test,
        num_data=len(Y_test),
        batch_size=len(Y_test),
        elbo_shape=elbo_type,
    )
# Score = negated (log-likelihood - KL), so a lower ELBO gives a higher score.
score = -(lln - kln).detach().numpy()
# Rescale the scores to [0, 1] as a single column.
score = MinMaxScaler().fit_transform(np.reshape(score, (-1, 1)))
print(utils.metric(y_true=lb_test, y_score=score))