# AutoNSGA-II Tuning Demo (ZDT1, real VAMOS components)
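This notebook wires the VAMOS tuning module to the AutoNSGA-II builder,
runs a small random-search tuning experiment on ZDT1 (30 variables), and saves the tuning history.
A second phase then repeats the search inside a parameter space narrowed around the best phase-1 trials.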

In [None]:
from __future__ import annotations

import json
import csv
from pathlib import Path
from typing import Any, Dict, List

import numpy as np

from vamos.tuning import (
    ParamSpace,
    Real,
    Int,
    Categorical,
    Condition,
    TuningTask,
    Instance,
    EvalContext,
    RandomSearchTuner,
    TrialResult,
    filter_active_config,
    history_to_dict,
    save_history_json,
    save_history_csv,
)
from vamos.problem.registry import make_problem_selection
from vamos.algorithm.autonsga2_builder import build_autonsga2
from vamos.metrics.hv_zdt import compute_normalized_hv

In [None]:
def build_problem(name: str, n_var: int, **kwargs):
    """
    Instantiate a problem through the VAMOS problem registry.

    Looks `name` up with `n_var` decision variables via
    `make_problem_selection` and returns the result of `.instantiate()`.
    Extra keyword arguments are accepted (so callers can splat instance
    options) but are currently not forwarded to the registry.
    """
    return make_problem_selection(name, n_var=n_var).instantiate()

In [None]:
def create_param_space() -> ParamSpace:
    """
    Describe the AutoNSGA-II-style search space used for real-valued ZDT runs.

    The parameter names are the dotted keys handed to `build_autonsga2`, so
    they have to stay in sync with what that builder reads. Each `Condition`
    pairs a parameter with an expression over `cfg`; presumably the parameter
    only counts as active while the expression holds (see `vamos.tuning`).
    """

    def when_equals(param: str, selector: str, choice: str) -> Condition:
        # Tie `param` to a single value of the categorical `selector`.
        return Condition(param, f"cfg['{selector}'] == '{choice}'")

    search_dimensions = {
        # Population sizing
        "population_size": Int(40, 200),
        "offspring_size": Int(40, 200),
        "init.type": Categorical(["random", "lhs", "scatter_search"]),
        # Crossover operator and its operator-specific knobs
        "crossover.type": Categorical(
            ["sbx", "blx_alpha", "arithmetic", "pcx", "undx", "spx"]
        ),
        "crossover.prob": Real(0.6, 1.0),
        "crossover.sbx_eta": Real(5.0, 40.0),
        "crossover.blx_alpha": Real(0.0, 1.0),
        "crossover.blx_repair": Categorical(["clip", "resample", "reflect", "round"]),
        "crossover.pcx_sigma_eta": Real(0.01, 1.0),
        "crossover.pcx_sigma_zeta": Real(0.01, 1.0),
        "crossover.undx_sigma_xi": Real(0.05, 1.0),
        "crossover.undx_sigma_eta": Real(0.05, 1.0),
        "crossover.spx_epsilon": Real(0.05, 1.0),
        # Mutation operator and its operator-specific knobs
        "mutation.type": Categorical(
            [
                "uniform",
                "polynomial",
                "linked_polynomial",
                "non_uniform",
                "gaussian",
                "uniform_reset",
                "cauchy",
            ]
        ),
        "mutation.prob_factor": Real(0.1, 2.0),
        "mutation.poly_eta": Real(5.0, 40.0),
        "mutation.uniform_perturb": Real(0.0, 1.0),
        "mutation.non_uniform_perturb": Real(0.0, 1.0),
        "mutation.gaussian_sigma": Real(0.001, 1.0),
        "mutation.cauchy_gamma": Real(0.001, 1.0),
        # Selection, repair and result handling
        "selection.type": Categorical(["tournament", "random"]),
        "selection.tournament_size": Int(2, 8),
        "repair": Categorical(["clip", "resample", "round"]),
        "archive_size": Int(0, 200),
        "result_mode": Categorical(["population", "external_archive"]),
    }

    activation_rules = [
        when_equals("crossover.sbx_eta", "crossover.type", "sbx"),
        when_equals("crossover.blx_alpha", "crossover.type", "blx_alpha"),
        when_equals("crossover.blx_repair", "crossover.type", "blx_alpha"),
        when_equals("crossover.pcx_sigma_eta", "crossover.type", "pcx"),
        when_equals("crossover.pcx_sigma_zeta", "crossover.type", "pcx"),
        when_equals("crossover.undx_sigma_xi", "crossover.type", "undx"),
        when_equals("crossover.undx_sigma_eta", "crossover.type", "undx"),
        when_equals("crossover.spx_epsilon", "crossover.type", "spx"),
        # poly_eta is shared by both polynomial mutation variants.
        Condition(
            "mutation.poly_eta",
            "cfg['mutation.type'] in ['polynomial', 'linked_polynomial']",
        ),
        when_equals("selection.tournament_size", "selection.type", "tournament"),
        when_equals("mutation.uniform_perturb", "mutation.type", "uniform"),
        when_equals("mutation.non_uniform_perturb", "mutation.type", "non_uniform"),
        when_equals("mutation.gaussian_sigma", "mutation.type", "gaussian"),
        when_equals("mutation.cauchy_gamma", "mutation.type", "cauchy"),
        # Uses .get() so a config without result_mode is treated as "population".
        Condition(
            "archive_size",
            "cfg.get('result_mode', 'population') == 'external_archive'",
        ),
    ]

    return ParamSpace(params=search_dimensions, conditions=activation_rules)


In [None]:
def _objectives_from_result(result: Any) -> Any:
    """
    Pull the objective values out of an NSGA-II result.

    `result` may be a dict or an object with attributes. If it carries an
    `archive` dict whose "F" entry is present and non-empty, that entry wins;
    otherwise the result's own "F" is returned (None when it is missing).
    """
    if isinstance(result, dict):
        lookup = result.get
    else:
        def lookup(key: str) -> Any:
            return getattr(result, key, None)

    archive = lookup("archive")
    if isinstance(archive, dict):
        archived = archive.get("F")
        # An empty archive carries no information; fall back to the population.
        if archived is not None and np.size(archived) > 0:
            return archived
    return lookup("F")


def _as_objective_matrix(F: Any) -> np.ndarray:
    """
    Coerce `F` to a 2-D float array with one row per solution.

    Raises:
        RuntimeError: if `F` holds no values.
        ValueError: if the resulting matrix has fewer than 2 columns.
    """
    F_arr = np.asarray(F, dtype=float)
    # Check emptiness first so the reshapes below never see a zero-size array.
    if F_arr.size == 0:
        raise RuntimeError("Empty objective matrix returned by NSGA-II.")
    if F_arr.ndim == 0:
        # A bare scalar becomes a 1x1 matrix and is rejected by the column check.
        F_arr = F_arr.reshape(1, 1)
    elif F_arr.ndim == 1:
        if F_arr.size % 2 == 0:
            # Flat, even-length vector: read it as rows of two objectives.
            F_arr = F_arr.reshape(-1, 2)
        else:
            # Odd length: treat it as a single solution.
            F_arr = F_arr.reshape(1, -1)
    elif F_arr.ndim > 2:
        # Keep the leading axis as the solution axis and flatten the rest.
        F_arr = F_arr.reshape(F_arr.shape[0], -1)
    if F_arr.shape[1] < 2:
        raise ValueError(f"Expected at least 2 objectives, got {F_arr.shape[1]}")
    return F_arr


def eval_fn(config: Dict[str, Any], ctx: EvalContext) -> float:
    """
    Evaluate a configuration on the given instance/seed using VAMOS NSGA-II.

    Builds the problem named by `ctx.instance`, builds the algorithm from
    `config`, runs it with an `("n_eval", ctx.budget)` termination and the
    seed from `ctx`, then scores the resulting objectives.

    Args:
        config: Hyperparameter configuration passed to `build_autonsga2`.
        ctx: Evaluation context providing `instance`, `seed` and `budget`.

    Returns:
        The value of `compute_normalized_hv` for the objective matrix and the
        instance name (the normalized hypervolume).

    Raises:
        RuntimeError: if the result has no objectives, or they are empty.
        ValueError: if the objective matrix has fewer than 2 columns.
    """
    problem = build_problem(name=ctx.instance.name, n_var=ctx.instance.n_var, **ctx.instance.kwargs)
    algo = build_autonsga2(config, problem, seed=ctx.seed)
    result = algo.run(problem, termination=("n_eval", ctx.budget), seed=ctx.seed)

    # Prefer the external archive when it holds solutions.
    F = _objectives_from_result(result)
    if F is None:
        raise RuntimeError("NSGA-II result missing objectives matrix 'F'.")
    return compute_normalized_hv(_as_objective_matrix(F), ctx.instance.name)


In [None]:
# Reference configs from the AutoNSGA-II paper for quick comparison.
# Each dict is a ready-made table row: its keys are the camelCase column names
# listed in DISPLAY_COLUMNS, not the dotted keys used by create_param_space().
# Keys a row does not set (e.g. "populationSize" in the second row) are
# rendered as "-" by print_config_table().
# NOTE: several values lie outside the search space in create_param_space()
# (selectionTournamentSize 9 vs Int(2, 8), offspringPopulationSize 14 vs
# Int(40, 200), repair strategies "random"/"bounds"), and some spellings differ
# ("externalArchive"/"nonUniform" vs "external_archive"/"non_uniform").
# NOTE(review): values look hand-transcribed — verify against the paper.
PAPER_DEFAULT_CONFIGS: List[Dict[str, Any]] = [
    {
        "label": "Default NSGA-II",
        "algorithmResult": "population",
        "populationSize": 100,
        "offspringPopulationSize": 100,
        "variation": "crossoverAndMutationVariation",
        "crossover": "sbx",
        "crossoverProbability": 0.9,
        "crossoverRepairStrategy": "random",
        "sbxDistributionIndexValue": 20.0,
        "mutation": "polynomial",
        "mutationProbabilityFactor": 1.0,
        "mutationRepairStrategy": "random",
        "polynomialMutationDistributionIndex": 20.0,
        "selection": "tournament",
        "selectionTournamentSize": 2,
    },
    {
        "label": "AutoNSGA-II (paper)",
        "algorithmResult": "externalArchive",
        "populationSizeWithArchive": 56,
        "offspringPopulationSize": 14,
        "variation": "crossoverAndMutationVariation",
        "crossover": "blx_alpha",
        "crossoverProbability": 0.88,
        "crossoverRepairStrategy": "bounds",
        "blxAlphaCrossoverAlphaValue": 0.94,
        "mutation": "nonUniform",
        "mutationProbabilityFactor": 0.45,
        "mutationRepairStrategy": "round",
        "nonUniformMutationPerturbation": 0.3,
        "selection": "tournament",
        "selectionTournamentSize": 9,
    },
]

# Left-to-right column order of the comparison table printed by
# print_config_table(); that function skips any column for which no row has a
# key. The names match the keys of PAPER_DEFAULT_CONFIGS and of the rows built
# by project_config_for_display().
DISPLAY_COLUMNS = [
    "label",
    "algorithmResult",
    "populationSize",
    "populationSizeWithArchive",
    "offspringPopulationSize",
    "variation",
    "crossover",
    "crossoverProbability",
    "crossoverRepairStrategy",
    "sbxDistributionIndexValue",
    "blxAlphaCrossoverAlphaValue",
    "mutation",
    "mutationProbabilityFactor",
    "mutationRepairStrategy",
    "polynomialMutationDistributionIndex",
    "nonUniformMutationPerturbation",
    "selection",
    "selectionTournamentSize",
]

def project_config_for_display(cfg: Dict[str, Any], label: str) -> Dict[str, Any]:
    """
    Translate a tuning config (dotted keys) into one row of the display table.

    Args:
        cfg: Configuration keyed like the tuning parameter space, e.g.
            "crossover.type" or "mutation.prob_factor". Missing keys map to
            None, except where a fallback is noted below.
        label: Text placed in the row's "label" column.

    Returns:
        A dict keyed by the camelCase table column names. Both population
        columns receive `population_size`; the offspring column falls back to
        `population_size` when `offspring_size` is absent.
    """
    lookup = cfg.get
    pop_size = lookup("population_size")
    shared_repair = lookup("repair")

    row: Dict[str, Any] = {"label": label}
    row["algorithmResult"] = lookup("result_mode", "population")
    row["populationSize"] = pop_size
    row["populationSizeWithArchive"] = pop_size
    row["offspringPopulationSize"] = lookup("offspring_size", pop_size)
    row["variation"] = "crossoverAndMutationVariation"
    # Accept the un-dotted "crossover"/"mutation" keys when the dotted ones
    # are missing or falsy.
    row["crossover"] = lookup("crossover.type") or lookup("crossover")
    row["crossoverProbability"] = lookup("crossover.prob")
    # A BLX-specific repair strategy takes precedence over the shared one.
    row["crossoverRepairStrategy"] = lookup("crossover.blx_repair") or shared_repair
    row["sbxDistributionIndexValue"] = lookup("crossover.sbx_eta")
    row["blxAlphaCrossoverAlphaValue"] = lookup("crossover.blx_alpha")
    row["mutation"] = lookup("mutation.type") or lookup("mutation")
    row["mutationProbabilityFactor"] = lookup("mutation.prob_factor")
    row["mutationRepairStrategy"] = shared_repair
    row["polynomialMutationDistributionIndex"] = lookup("mutation.poly_eta")
    row["nonUniformMutationPerturbation"] = lookup("mutation.non_uniform_perturb")
    row["selection"] = lookup("selection.type")
    row["selectionTournamentSize"] = lookup("selection.tournament_size")
    return row


def print_config_table(
    rows: List[Dict[str, Any]],
    columns: List[str] | None = None,
) -> None:
    """
    Print `rows` as a plain-text table with left-aligned, space-separated columns.

    Args:
        rows: One dict per table row, keyed by column name.
        columns: Candidate column names in display order. Defaults to
            `DISPLAY_COLUMNS`. A column is printed only if at least one row
            has a key for it.

    Cells whose value is missing or None are shown as "-"; floats are
    formatted with four significant digits; everything else uses `str()`.
    """
    candidates = DISPLAY_COLUMNS if columns is None else columns
    cols = [c for c in candidates if any(c in r for r in rows)]
    widths = {col: len(col) for col in cols}

    str_rows = []
    for row in rows:
        cells = []
        for col in cols:
            val = row.get(col)
            if val is None:
                # Rows built from a filtered config carry None for inactive
                # parameters; show those like absent keys.
                text = "-"
            elif isinstance(val, float):
                text = f"{val:.4g}"
            else:
                text = f"{val}"
            widths[col] = max(widths[col], len(text))
            cells.append(text)
        str_rows.append(cells)

    header = " ".join(col.ljust(widths[col]) for col in cols)
    print(header)
    print("-" * len(header))
    for cells in str_rows:
        print(" ".join(text.ljust(widths[col]) for text, col in zip(cells, cols)))


In [None]:
def create_zdt1_tuning_task() -> TuningTask:
    """
    Build the demo tuning task: AutoNSGA-II on 30-variable ZDT1.

    The task uses the space from `create_param_space()`, a single
    `Instance("zdt1", 30)`, seeds 1 and 2, a per-run budget of 20,000,
    `np.mean` as the score aggregator, and is flagged as a maximization.
    """
    return TuningTask(
        name="autonsga2_zdt1_small",
        param_space=create_param_space(),
        instances=[Instance("zdt1", 30)],
        seeds=[1, 2],
        budget_per_run=20_000,
        maximize=True,
        aggregator=np.mean,
    )

In [None]:
def main() -> None:
    """
    Run the phase-1 random search on ZDT1, save its history and print a summary.

    `task`, `tuner`, `best_config` and `history` are published as module
    globals so later notebook cells can reuse them. The history is written to
    `results/` as JSON and CSV. The printed summary ranks trials by score in
    the task's optimisation direction rather than using the returned
    `best_config`.
    """
    global task, tuner, best_config, history

    task = create_zdt1_tuning_task()
    tuner = RandomSearchTuner(task=task, max_trials=20, seed=42)
    best_config, history = tuner.run(eval_fn, verbose=True)

    # Persist the raw history before any reporting.
    space = task.param_space
    out_dir = Path("results")
    out_dir.mkdir(parents=True, exist_ok=True)
    save_history_json(history, space, out_dir / "autonsga2_zdt1_history.json")
    save_history_csv(history, space, out_dir / "autonsga2_zdt1_history.csv")

    # Best-first ranking; only active parameters are shown for each config.
    ranked = sorted(history, key=lambda trial: trial.score, reverse=task.maximize)
    winner = ranked[0]
    winner_active = filter_active_config(winner.config, space)

    print(f"\nCompleted {len(history)} trials")
    print(f"Best score: {winner.score:.4f} (trial {winner.trial_id})")
    print("Best config:")
    print(json.dumps(winner_active, indent=2))

    print("\nTop 5 configurations:")
    header = f"{'rank':<4} {'trial':<5} {'score':<10} config"
    print(header)
    print('-' * len(header))
    for position, entry in enumerate(ranked[:5], start=1):
        active_json = json.dumps(filter_active_config(entry.config, space))
        print(f"{position:<4} {entry.trial_id:<5} {entry.score:<10.4f} {active_json}")

    # Side-by-side table: paper reference rows followed by the tuned winner.
    comparison_rows = [
        *PAPER_DEFAULT_CONFIGS,
        project_config_for_display(winner_active, "Tuned best"),
    ]
    print("\nConfiguration table (paper vs tuned best):")
    print_config_table(comparison_rows)


if __name__ == "__main__":
    main()


[trial 0] max score=0.983903
[trial 1] max score=0.935410


ValueError: All solutions must be dominated by the reference point.

In [None]:
# %%
from typing import Sequence, Tuple


def extract_top_k_trials(
    history: Sequence[TrialResult],
    k: int,
    *,
    maximize: bool = True,
) -> List[TrialResult]:
    """
    Return the best `k` trials from `history`, best first.

    Args:
        history: Trials to rank; only each trial's `.score` is read.
        k: Number of trials to keep. If `k >= len(history)` every trial is
            returned; if `k <= 0` the result is empty.
        maximize: When True (default) higher scores rank first; pass False
            for minimisation tasks so that lower scores rank first.

    Returns:
        A new list; `history` itself is not modified. Trials with equal scores
        keep their relative order from `history`.
    """
    # A negative k would otherwise slice from the end and return almost
    # everything instead of nothing.
    if k <= 0:
        return []
    return sorted(history, key=lambda t: t.score, reverse=maximize)[:k]


# Half-width of the window opened around a single observed value, as a
# fraction of the parameter's original range.
_DEGENERATE_PAD_FRACTION = 0.05


def _shrunk_bounds(orig_low, orig_high, v_min, v_max, shrink_factor, pad):
    """Return (low, high): the original bounds pulled toward [v_min, v_max]."""
    if v_min == v_max:
        # Only one distinct value observed: open a small window around it,
        # clipped to the original bounds.
        low = max(orig_low, v_min - pad)
        high = min(orig_high, v_max + pad)
    else:
        # Move each bound a fraction `shrink_factor` of the way toward the
        # observed extreme on its side.
        low = orig_low + shrink_factor * (v_min - orig_low)
        high = orig_high - shrink_factor * (orig_high - v_max)
    if low > high:
        low, high = high, low
    return low, high


def build_refined_param_space_from_history(
    base_space: ParamSpace,
    history: Sequence[TrialResult],
    k_top: int = 5,
    shrink_factor: float = 0.5,
    maximize: bool = True,
) -> ParamSpace:
    """
    Build a refined ParamSpace around the top-k configurations observed in `history`.

    For each parameter, the values it takes in the top-k configs (counting
    only configs in which `base_space.is_active` reports it active) drive the
    refinement:

    - Numeric params (Real/Int): bounds move toward the observed [min, max] by
      `shrink_factor`. A single observed value gets a window of 5% of the
      original range on each side (at least 1 for Int). The arithmetic is
      linear even when `spec.log` is set; `log` is passed through unchanged.
    - Categorical params: restricted to the choices used by the top-k
      configs, in order of first appearance.
    - Params with no observed value, or of another spec type, are kept as is.
    - Conditions are copied from the base ParamSpace.

    Args:
        base_space: Space to refine.
        history: Trials from a previous tuning run.
        k_top: Number of best trials to learn from.
        shrink_factor: Fraction of the gap between an original bound and the
            observed extreme that is removed (0 keeps the original bounds,
            1 snaps to the observed range).
        maximize: True (default) ranks higher scores as better; pass False
            when the tuning task minimizes its score.

    Raises:
        ValueError: if `history` is empty.
    """
    if not history:
        raise ValueError("Cannot build refined ParamSpace: history is empty")

    top_trials = sorted(history, key=lambda t: t.score, reverse=maximize)[:k_top]
    top_configs = [t.config for t in top_trials]

    new_params: Dict[str, Any] = {}

    for name, spec in base_space.params.items():
        values: List[Any] = [
            cfg[name]
            for cfg in top_configs
            if name in cfg and base_space.is_active(name, cfg)
        ]

        if not values:
            # Never active among the top configs: nothing to learn from.
            new_params[name] = spec
            continue

        if isinstance(spec, Real):
            span = spec.high - spec.low
            low, high = _shrunk_bounds(
                spec.low,
                spec.high,
                float(min(values)),
                float(max(values)),
                shrink_factor,
                pad=span * _DEGENERATE_PAD_FRACTION,
            )
            new_params[name] = Real(low=low, high=high, log=spec.log)

        elif isinstance(spec, Int):
            span = spec.high - spec.low
            low, high = _shrunk_bounds(
                spec.low,
                spec.high,
                int(min(values)),
                int(max(values)),
                shrink_factor,
                pad=max(1, int(round(span * _DEGENERATE_PAD_FRACTION))),
            )
            new_params[name] = Int(low=int(round(low)), high=int(round(high)), log=spec.log)

        elif isinstance(spec, Categorical):
            original_choices = list(spec.choices)
            used_choices: List[Any] = []
            for val in values:
                if val in original_choices and val not in used_choices:
                    used_choices.append(val)
            # Keep every choice if none of the observed values is a known one.
            new_params[name] = Categorical(choices=used_choices or original_choices)

        else:
            new_params[name] = spec

    return ParamSpace(params=new_params, conditions=list(base_space.conditions))


In [None]:
# %%
from typing import Tuple

def run_phase2_tuning(
    base_tuner: RandomSearchTuner,
    base_history: List[TrialResult],
    k_top: int = 5,
    shrink_factor: float = 0.5,
    max_trials: int = 30,
    seed: int = 123,
) -> Tuple[Dict[str, Any], List[TrialResult], RandomSearchTuner]:
    """
    Run a second-phase tuning using a refined ParamSpace built around the top-k
    configurations from base_history.

    The phase-2 task reuses the instances, seeds, per-run budget, optimisation
    direction and aggregator of `base_tuner.task`; only the name (suffix
    "_phase2") and the parameter space differ. The phase-2 history is written
    to `results/<task name>_history.json` and `.csv`, and the best config
    (active parameters only) is printed.

    Args:
        base_tuner: Phase-1 tuner; only its `.task` is read.
        base_history: Phase-1 trials used to narrow the search space.
        k_top: Number of best phase-1 trials to refine around.
        shrink_factor: Passed to `build_refined_param_space_from_history`.
        max_trials: Number of phase-2 trials.
        seed: Seed for the phase-2 tuner.

    Returns:
        best_config_phase2, history_phase2, tuner_phase2
    """
    base_task = base_tuner.task
    base_space = base_task.param_space

    # The refinement helper ranks trials by descending score. For a
    # minimisation task, hand it only the k_top lowest-scoring trials so that
    # "top-k" still means "best k".
    history_for_refinement = base_history
    if not base_task.maximize and k_top > 0:
        history_for_refinement = sorted(base_history, key=lambda t: t.score)[:k_top]

    refined_space = build_refined_param_space_from_history(
        base_space=base_space,
        history=history_for_refinement,
        k_top=k_top,
        shrink_factor=shrink_factor,
    )

    task_phase2 = TuningTask(
        name=f"{base_task.name}_phase2",
        param_space=refined_space,
        instances=base_task.instances,
        seeds=base_task.seeds,
        budget_per_run=base_task.budget_per_run,
        maximize=base_task.maximize,
        aggregator=base_task.aggregator,
    )

    tuner_phase2 = RandomSearchTuner(task=task_phase2, max_trials=max_trials, seed=seed)

    best_config_phase2, history_phase2 = tuner_phase2.run(eval_fn, verbose=True)

    results_dir = Path("results")
    results_dir.mkdir(parents=True, exist_ok=True)
    json_path = results_dir / f"{task_phase2.name}_history.json"
    csv_path = results_dir / f"{task_phase2.name}_history.csv"
    save_history_json(history_phase2, task_phase2.param_space, json_path)
    save_history_csv(history_phase2, task_phase2.param_space, csv_path)

    clean_best_phase2 = filter_active_config(best_config_phase2, task_phase2.param_space)
    print("Best Phase 2 config (active params only):")
    print(json.dumps(clean_best_phase2, indent=2))

    return best_config_phase2, history_phase2, tuner_phase2


In [11]:
# %%
# Run phase 2 refinement based on the first-phase tuner and history.
# Assumes `tuner` and `history` are defined from phase 1; main() publishes
# both as module globals, so run it in this session first.

best_config_phase2, history_phase2, tuner_phase2 = run_phase2_tuning(
    base_tuner=tuner,
    base_history=history,
    k_top=5,
    shrink_factor=0.5,
    max_trials=30,
    seed=123,
)

print("Phase 2 tuning completed.")
# "Best" follows the task's optimisation direction, as in main().
pick_best = max if tuner_phase2.task.maximize else min
print("Best Phase 2 score:", pick_best(t.score for t in history_phase2))


Phase 1 results not found; running phase 1 with defaults...
[trial 0] max score=0.239682
[trial 1] max score=0.822120
[trial 2] max score=0.607409
[trial 3] max score=0.664700


KeyboardInterrupt: 