In [1]:
import pandas as pd
import numpy as np
import torch
import os
import sys
from tqdm import tqdm, trange

sys.path.append("../../")
import biked_commons
from biked_commons.design_evaluation.design_evaluation import get_standard_evaluations
from biked_commons.data_loading import data_loading
from biked_commons.conditioning import conditioning

from biked_commons.design_evaluation.scoring import *
from biked_commons.benchmark_models import benchmarking_utils
from biked_commons.transformation.one_hot_encoding import encode_to_continuous, decode_to_mixed

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# --- Run configuration -------------------------------------------------------
# `save_scores` is forwarded as the `save` argument of the benchmark scorer in
# the optimisation cell; leave it False for dry runs.
save_scores = False
device = "cpu"

# Bike-Bench training frame: used both to build the optimisation problem and
# to supply the column layout handed to the scorer.
data = data_loading.load_bike_bench_train()

In [3]:
def get_condition(idx=0):
    """Return the conditioning dict for the ``idx``-th sampled test condition.

    Ten riders, use cases and image embeddings are drawn from the "test"
    split, and entry ``idx`` of each is packed under the keys ``"Rider"``,
    ``"Use Case"`` and ``"Embedding"``.
    """
    samplers = {
        "Rider": conditioning.sample_riders,
        "Use Case": conditioning.sample_use_case,
        "Embedding": conditioning.sample_image_embedding,
    }
    # Draw every batch first, then index, so all three samplers are always
    # invoked (in this order) before any lookup can fail.
    batches = {key: draw(10, split="test") for key, draw in samplers.items()}
    return {key: batch[idx] for key, batch in batches.items()}


In [4]:
import numpy as np
import pandas as pd
import torch

from pymoo.core.problem import ElementwiseProblem
from pymoo.core.variable import Real, Integer, Choice, Binary


class BikeBenchProblem(ElementwiseProblem):
    """Mixed-variable pymoo problem that scores one bike design per evaluation.

    The search space is inferred from the mixed-type (decoded) view of the
    supplied one-hot frame. Each candidate is re-encoded to the one-hot layout
    before being passed to the tensor evaluator. Requirements whose type equals
    1 are reported as objectives (``F``); all remaining requirements are
    reported as inequality constraints (``G``).
    """

    def __init__(self,
                 data_sample_df: pd.DataFrame,   # one-hot-encoded design frame
                 conditioning: dict,
                 **kwargs):
        """Build the evaluator and declare the pymoo variables.

        Args:
            data_sample_df: One-hot-encoded designs; its column order is the
                layout fed to the evaluator.
            conditioning: Condition dict forwarded unchanged to the evaluator
                on every call.
            **kwargs: Passed through to ``ElementwiseProblem.__init__``.
        """
        # Column layout the evaluator was constructed for.
        self.continuous_cols = list(data_sample_df.columns)

        evaluator, requirement_names, requirement_types = construct_tensor_evaluator(
            get_standard_evaluations("cpu"),
            self.continuous_cols
        )
        objective_mask = torch.tensor(requirement_types) == 1

        self.evaluator = evaluator
        self.isobjective = objective_mask
        objective_count = int(objective_mask.sum().item())
        constraint_count = int((~objective_mask).sum().item())
        self.conditioning = conditioning

        # Variable types are read off the decoded (mixed-dtype) frame,
        # one pymoo variable per column.
        decoded = decode_to_mixed(data_sample_df)
        variables = {
            name: self._variable_for(decoded[name]) for name in decoded.columns
        }

        super().__init__(
            vars=variables,
            n_obj=objective_count,
            n_ieq_constr=constraint_count,
            **kwargs
        )

    @staticmethod
    def _variable_for(series):
        """Map one mixed-type column to the matching pymoo variable."""
        dtype_checks = pd.api.types
        if dtype_checks.is_bool_dtype(series):
            return Binary()
        if dtype_checks.is_integer_dtype(series):
            return Integer(bounds=(int(series.min()), int(series.max())))
        if dtype_checks.is_float_dtype(series):
            # Bounds are the 1st and 99th percentiles rather than min/max.
            lower = np.quantile(series, 0.01)
            upper = np.quantile(series, 0.99)
            return Real(bounds=(float(lower), float(upper)))
        # Anything else is treated as categorical over its observed values.
        return Choice(options=series.dropna().unique().tolist())

    def _evaluate(self, X, out, *args, **kwargs):
        """Score a single candidate ``X`` (dict of column name -> mixed value).

        Writes the objective values to ``out["F"]`` and the constraint values
        to ``out["G"]``, both as flat Python lists.
        """
        # Mixed-space row -> one-hot row, reordered to the evaluator's layout.
        design_mixed = pd.DataFrame([X], columns=self.vars.keys())
        design_onehot = encode_to_continuous(design_mixed)[self.continuous_cols]

        features = torch.tensor(
            design_onehot.to_numpy().astype(np.float32), dtype=torch.float32
        )
        scores = self.evaluator(features, self.conditioning)

        objective_mask = self.isobjective
        out["F"] = scores[:, objective_mask].detach().numpy().flatten().tolist()
        out["G"] = scores[:, ~objective_mask].detach().numpy().flatten().tolist()


In [5]:
from pymoo.core.mixed import MixedVariableGA
from pymoo.algorithms.moo.nsga2 import RankAndCrowdingSurvival
from pymoo.optimize import minimize

# Run NSGA-II-style mixed-variable optimisation for each test condition and
# score the resulting set of designs with the benchmark utilities.
for i in range(1): #replace with range(10) for full run
    condition = get_condition(i)
    problem = BikeBenchProblem(data, condition)

    algorithm = MixedVariableGA(pop_size=100, survival=RankAndCrowdingSurvival())

    res = minimize(problem, algorithm, ('n_gen', 10), seed=1, verbose=True)

    # pymoo leaves res.X as None when no feasible solution was found; fail with
    # a clear message instead of the opaque TypeError from list(None).
    if res.X is None:
        raise RuntimeError(
            f"NSGA2 returned no feasible design for condition {i}; nothing to score."
        )

    # atleast_1d keeps the usual array-of-dicts untouched and wraps a lone
    # solution dict, so list() never iterates over a dict's keys.
    solutions = list(np.atleast_1d(res.X))
    mixed_df = pd.DataFrame(solutions, columns=problem.vars.keys())

    # The scorer is told the columns are `data.columns`, so the encoded frame
    # must be put in exactly that order (the same realignment _evaluate does).
    res_df_onehot = encode_to_continuous(mixed_df)[list(data.columns)]
    results_tens = torch.tensor(
        res_df_onehot.to_numpy().astype(np.float32), dtype=torch.float32
    )

    main_scores, detailed_scores = benchmarking_utils.evaluate_uncond(
        results_tens, "NSGA2", i, data.columns, device="cpu", save=save_scores
    )
    print(main_scores)
    print(detailed_scores)

n_gen  |  n_eval  |     cv_min    |     cv_avg    |     f_avg     |     f_min    
     1 |      100 |  0.000000E+00 |  1.413745E+02 |  0.5436578223 |  0.2826067209
     2 |      200 |  0.000000E+00 |  2.181522E+01 |  0.5504693723 |  0.2826067209
     3 |      300 |  0.000000E+00 |  0.2809061462 |  0.5634386267 |  0.2826067209
     4 |      400 |  0.000000E+00 |  0.000000E+00 |  0.5781799173 |  0.2826067209
     5 |      500 |  0.000000E+00 |  0.000000E+00 |  0.5865576488 |  0.6891837716
     6 |      600 |  0.000000E+00 |  0.000000E+00 |  0.5960802871 |  0.3868730068
     7 |      700 |  0.000000E+00 |  0.000000E+00 |  0.6168394673 |  0.4928049445
     8 |      800 |  0.000000E+00 |  0.000000E+00 |  0.6005525100 |  0.2378673553
     9 |      900 |  0.000000E+00 |  0.000000E+00 |  0.6069310242 |  0.4850115776
    10 |     1000 |  0.000000E+00 |  0.000000E+00 |  0.5943281734 |  0.5973629951
Hypervolume                     0.265555
Constraint Satisfaction Rate    1.000000
Maximum Mean Dis