In [None]:
"""Evaluation for the CGAN 2D w/ CNN."""

from __future__ import annotations

import os

# Ask PyTorch to fall back to the CPU for operators that the MPS backend does not implement.
# Kept ahead of the `torch` import below so the variable is already set when the library loads.
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

from engibench.utils.all_problems import BUILTIN_PROBLEMS
import numpy as np
import torch as th
import wandb

from engiopt import metrics
from engiopt.cgan_cnn_2d import Generator

# --- Evaluation settings (edit these to evaluate a different problem / checkpoint) ---

# Key of the problem to evaluate; looked up in BUILTIN_PROBLEMS.
problem_id: str = "beams2d"

# Random seed; also selects which generator artifact (`seed_<seed>`) is evaluated.
seed: int = 1

# Weights & Biases project that holds the generator artifact.
wandb_project: str = "engiopt"

# Weights & Biases entity; None leaves the entity out of the artifact path.
wandb_entity: str | None = None

# Number of designs to generate and evaluate.
n_samples: int = 5

# Kernel bandwidth passed to the MMD / DPP metrics.
sigma: float = 10


# Look up the problem class registered under `problem_id`, build an instance,
# and reset it with the configured seed.
problem_factory = BUILTIN_PROBLEMS[problem_id]
problem = problem_factory()
problem.reset(seed=seed)

# Pick the compute device: Apple MPS first, then CUDA, otherwise the CPU.
device = th.device(
    "mps" if th.backends.mps.is_available() else "cuda" if th.cuda.is_available() else "cpu"
)

In [None]:
# Test split from which conditions (and, later, reference designs) are drawn.
test_ds = problem.dataset["test"]

# Names of the conditioning variables; `problem.conditions` is iterated as (key, value) pairs.
condition_keys = [key for key, _ in problem.conditions]
conditions_ds = test_ds.select_columns(condition_keys)

# Draw `n_samples` test rows (with replacement) from a generator seeded with `seed`.
# The global NumPy RNG is never seeded in this notebook, so using it here made the
# selection differ on every run; a local generator also makes this cell idempotent.
rng = np.random.default_rng(seed)
selected_indices = rng.choice(len(test_ds), n_samples, replace=True)
sampled_conditions = conditions_ds.select(selected_indices)

# Column-wise values of the sampled rows, ordered explicitly by `condition_keys`
# rather than by dict iteration order, so the feature order fed to the generator
# follows `problem.conditions` (assumed to be the order used at training time — TODO confirm).
sampled_columns = sampled_conditions[:]
conditions_list = [sampled_columns[key] for key in condition_keys]

# Transpose from one list per condition to one list per sample.
conditions_list = list(map(list, zip(*conditions_list)))

# Build the conditioning tensor on the target device.
conditions_tensor = th.tensor(conditions_list, dtype=th.float32, device=device)

# Append two singleton trailing dimensions, matching the trailing (1, 1) of the noise
# tensor fed to the generator alongside it.
conditions_tensor = conditions_tensor.unsqueeze(-1).unsqueeze(-1)

In [None]:
# Restore the trained generator from the W&B model artifact for this problem and seed.
# The entity prefix is only added when one is configured.
artifact_name = f"{problem_id}_cgan_cnn_2d_generator:seed_{seed}"
if wandb_entity is not None:
    artifact_path = f"{wandb_entity}/{wandb_project}/{artifact_name}"
else:
    artifact_path = f"{wandb_project}/{artifact_name}"

api = wandb.Api()
artifact = api.artifact(artifact_path, type="model")
run = artifact.logged_by()  # the run that logged the artifact; its config holds the hyper-parameters
artifact_dir = artifact.download()

# Latent size used at training time, read from the logging run's config.
latent_dim = run.config["latent_dim"]

ckpt_path = os.path.join(artifact_dir, "generator.pth")
# Load onto the CPU first so a checkpoint saved on a GPU can still be restored on a
# machine without that device; the model is moved to `device` below.
ckpt = th.load(ckpt_path, map_location="cpu")
model = Generator(latent_dim=latent_dim, n_conds=len(problem.conditions), design_shape=problem.design_space.shape)
model.load_state_dict(ckpt["generator"])
model.eval()  # Set to evaluation mode
model.to(device)

# Seed torch so the latent noise (and therefore the generated designs) is reproducible.
th.manual_seed(seed)

# Sample noise as generator input
z = th.randn((n_samples, latent_dim, 1, 1), device=device, dtype=th.float)

# Generate a batch of designs; no gradients are needed for evaluation.
with th.no_grad():
    gen_designs = model(z, conditions_tensor)
gen_designs_np = gen_designs.detach().cpu().numpy()
gen_designs_np = gen_designs_np.reshape(n_samples, *problem.design_space.shape)

# Clamp the generated values to [1e-3, 1].
gen_designs_np = np.clip(gen_designs_np, 1e-3, 1)

In [None]:
# Index of the generated sample to inspect in detail.
i = 1

# Dataset design stored for the same test row the sampled conditions came from.
baseline = np.array(test_ds["optimal_design"][selected_indices[i]])

# Render the raw GAN output, optimize starting from it, and render the result.
problem.render(gen_designs_np[i])
GAN_opt_design, GAN_opt_history = problem.optimize(gen_designs_np[i], config=sampled_conditions[i])
problem.render(GAN_opt_design)

# Same procedure starting from the dataset design.
problem.render(baseline)
opt_design, opt_history = problem.optimize(baseline, config=sampled_conditions[i])
problem.render(opt_design)

# Report each objective next to the design it belongs to: the GAN labels use the GAN
# designs and the Dataset labels use the dataset designs (the two optimized designs
# were previously printed under each other's label).
print(
    "Obj_GAN_output:",
    problem.simulate(gen_designs_np[i], config=sampled_conditions[i]),
    "Obj_GAN_optimized:",
    problem.simulate(GAN_opt_design, config=sampled_conditions[i]),
)
print(
    "Obj_Dataset:",
    problem.simulate(baseline, config=sampled_conditions[i]),
    "Obj_Dataset_optimized:",
    problem.simulate(opt_design, config=sampled_conditions[i]),
)

In [None]:
# Distribution-level metrics: MMD compares the generated designs with the dataset
# designs of the sampled test rows; DPP is computed on the generated designs alone.
reference_designs = np.array(test_ds["optimal_design"])[selected_indices]
mmd_value = metrics.mmd(gen_designs_np, reference_designs, sigma=sigma)
print("MMD:", mmd_value)
dpp_value = metrics.dpp_diversity(gen_designs_np, sigma=sigma)
print("DPP:", dpp_value)

In [None]:
# Compute the average instantaneous (IOG), cumulative (COG) and final (FOG) optimality
# gap over the generated designs.
cog_list = []
iog_list = []
fog_list = []

# Dataset designs for the sampled rows, read once instead of re-reading the column per iteration.
reference_designs = np.array(test_ds["optimal_design"])[selected_indices]

for sample_idx in range(n_samples):
    sample_conditions = sampled_conditions[sample_idx]

    # Optimize starting from the generated design under this sample's conditions.
    opt_design, opt_history = problem.optimize(gen_designs_np[sample_idx], config=sample_conditions)

    # Evaluate the reference design under the SAME conditions; without `config` the
    # reference objective would not be comparable with the optimization history.
    reference_optimum = problem.simulate(reference_designs[sample_idx], config=sample_conditions)
    opt_history_gaps = metrics.optimality_gap(opt_history, reference_optimum)

    iog_list.append(opt_history_gaps[0])  # gap at the first history entry
    cog_list.append(np.sum(opt_history_gaps))  # gap summed over the whole history
    fog_list.append(opt_history_gaps[-1])  # gap at the last history entry


average_iog = np.mean(iog_list)
average_cog = np.mean(cog_list)
average_fog = np.mean(fog_list)

print("Average COG:", average_cog, "Average IOG:", average_iog, "Average FOG:", average_fog)

In [None]:
# Aggregate metrics for the generated designs against the dataset designs of the sampled rows.
# Stored under its own name: assigning the result to `metrics` would rebind the imported
# `metrics` module, so re-running this cell (or any earlier metrics cell) would fail.
metrics_summary = metrics.metrics(
    problem, gen_designs_np, np.array(test_ds["optimal_design"])[selected_indices], sampled_conditions, sigma=sigma
)
metrics_summary