diff --git a/ax/core/tests/test_utils.py b/ax/core/tests/test_utils.py index ccd64b6c4ec..83208838ac2 100644 --- a/ax/core/tests/test_utils.py +++ b/ax/core/tests/test_utils.py @@ -10,7 +10,6 @@ from datetime import datetime, timedelta from unittest.mock import patch -import numpy as np import pandas as pd from ax.core.arm import Arm from ax.core.batch_trial import BatchTrial @@ -28,11 +27,8 @@ from ax.core.utils import ( _maybe_update_trial_status_to_complete, batch_trial_only, - best_feasible_objective, compute_metric_availability, extract_pending_observations, - get_missing_metrics, - get_missing_metrics_by_name, get_model_times, get_model_trace_of_times, get_pending_observation_features, @@ -40,7 +36,6 @@ get_target_trial_index, is_bandit_experiment, MetricAvailability, - MissingMetrics, ) from ax.exceptions.core import AxError from ax.utils.common.constants import Keys @@ -95,120 +90,11 @@ def setUp(self) -> None: trial_index=self.hss_trial.index, metadata=self.hss_cand_metadata, ) - self.df = pd.DataFrame( - [ - { - "arm_name": "0_0", - "mean": 2.0, - "sem": 0.2, - "trial_index": 1, - "metric_name": "a", - "start_time": "2018-01-01", - "end_time": "2018-01-02", - "metric_signature": "a", - }, - { - "arm_name": "0_0", - "mean": 1.8, - "sem": 0.3, - "trial_index": 1, - "metric_name": "b", - "start_time": "2018-01-01", - "end_time": "2018-01-02", - "metric_signature": "b", - }, - { - "arm_name": "0_1", - "mean": float("nan"), - "sem": float("nan"), - "trial_index": 1, - "metric_name": "a", - "start_time": "2018-01-01", - "end_time": "2018-01-02", - "metric_signature": "a", - }, - { - "arm_name": "0_1", - "mean": 3.7, - "sem": 0.5, - "trial_index": 1, - "metric_name": "b", - "start_time": "2018-01-01", - "end_time": "2018-01-02", - "metric_signature": "b", - }, - { - "arm_name": "0_2", - "mean": 0.5, - "sem": None, - "trial_index": 1, - "metric_name": "a", - "start_time": "2018-01-01", - "end_time": "2018-01-02", - "metric_signature": "a", - }, - { - "arm_name": "0_2", - "mean": float("nan"), - "sem": float("nan"), - "trial_index": 1, - "metric_name": "b", - "start_time": "2018-01-01", - "end_time": "2018-01-02", - "metric_signature": "b", - }, - { - "arm_name": "0_2", - "mean": float("nan"), - "sem": float("nan"), - "trial_index": 1, - "metric_name": "c", - "start_time": "2018-01-01", - "end_time": "2018-01-02", - "metric_signature": "c", - }, - ] - ) - - self.data = Data(df=self.df) - - self.optimization_config = OptimizationConfig( - objective=Objective(metric=Metric(name="a"), minimize=False), - outcome_constraints=[ - OutcomeConstraint( - metric=Metric(name="b"), - op=ComparisonOp.GEQ, - bound=0, - relative=False, - ) - ], - ) self.batch_experiment = get_branin_experiment(with_completed_trial=False) self.batch_experiment.status_quo = Arm( name="status_quo", parameters={"x1": 0.0, "x2": 0.0} ) - def test_get_missing_metrics_by_name(self) -> None: - expected = {"a": {("0_1", 1)}, "b": {("0_2", 1)}} - actual = get_missing_metrics_by_name(self.data, ["a", "b"]) - self.assertEqual(actual, expected) - - def test_get_missing_metrics(self) -> None: - expected = MissingMetrics( - {"a": {("0_1", 1)}}, - {"b": {("0_2", 1)}}, - {"c": {("0_0", 1), ("0_1", 1), ("0_2", 1)}}, - ) - actual = get_missing_metrics(self.data, self.optimization_config) - self.assertEqual(actual, expected) - - def test_best_feasible_objective(self) -> None: - bfo = best_feasible_objective( - self.optimization_config, - values={"a": np.array([1.0, 3.0, 2.0]), "b": np.array([0.0, -1.0, 0.0])}, - ) - self.assertEqual(list(bfo), [1.0, 1.0, 2.0]) - def test_get_model_times(self) -> None: exp = get_branin_experiment(num_trial=2) fit_times, gen_times = get_model_trace_of_times(exp) diff --git a/ax/core/utils.py b/ax/core/utils.py index b2af1adb463..e0e8b298f46 100644 --- a/ax/core/utils.py +++ b/ax/core/utils.py @@ -12,126 +12,29 @@ from enum import Enum from functools import wraps from logging import Logger -from typing import Any, NamedTuple +from typing import Any -import numpy as np -import numpy.typing as npt import pandas as pd from ax.core.arm import Arm from ax.core.base_trial import BaseTrial, TrialStatus from ax.core.batch_trial import BatchTrial -from ax.core.data import Data from ax.core.experiment import Experiment from ax.core.generator_run import GeneratorRun from ax.core.map_metric import MapMetric -from ax.core.objective import MultiObjective from ax.core.observation import ObservationFeatures from ax.core.optimization_config import OptimizationConfig from ax.core.trial import Trial -from ax.core.types import ComparisonOp from ax.exceptions.core import AxError from ax.utils.common.constants import Keys from ax.utils.common.logger import get_logger from pyre_extensions import none_throws logger: Logger = get_logger(__name__) -TArmTrial = tuple[str, int] # Threshold for switching to pending points extraction based on trial status. MANY_TRIALS_IN_EXPERIMENT = 100 OLD_TRIAL_THRESHOLD_DAYS = 10 -# --------------------------- Data integrity utils. --------------------------- - - -class MissingMetrics(NamedTuple): - objective: dict[str, set[TArmTrial]] - outcome_constraints: dict[str, set[TArmTrial]] - tracking_metrics: dict[str, set[TArmTrial]] - - -def get_missing_metrics( - data: Data, optimization_config: OptimizationConfig -) -> MissingMetrics: - """Return all arm_name, trial_index pairs, for which some of the - observations of optimization config metrics are missing. - - Args: - data: Data to search. - optimization_config: provides metric_names to search for. - - Returns: - A NamedTuple(missing_objective, Dict[str, missing_outcome_constraint]) - """ - objective = optimization_config.objective - if isinstance(objective, MultiObjective): - objective_metric_names = [m.name for m in objective.metrics] - else: - objective_metric_names = [optimization_config.objective.metric.name] - - outcome_constraints_metric_names = [ - outcome_constraint.metric.name - for outcome_constraint in optimization_config.outcome_constraints - ] - missing_objectives = { - objective_metric_name: _get_missing_arm_trial_pairs(data, objective_metric_name) - for objective_metric_name in objective_metric_names - } - missing_outcome_constraints = get_missing_metrics_by_name( - data, outcome_constraints_metric_names - ) - all_metric_names = set(data.df["metric_name"]) - optimization_config_metric_names = set(missing_objectives.keys()).union( - outcome_constraints_metric_names - ) - missing_tracking_metric_names = all_metric_names.difference( - optimization_config_metric_names - ) - missing_tracking_metrics = get_missing_metrics_by_name( - data=data, metric_names=missing_tracking_metric_names - ) - return MissingMetrics( - objective={k: v for k, v in missing_objectives.items() if len(v) > 0}, - outcome_constraints={ - k: v for k, v in missing_outcome_constraints.items() if len(v) > 0 - }, - tracking_metrics={ - k: v for k, v in missing_tracking_metrics.items() if len(v) > 0 - }, - ) - - -def get_missing_metrics_by_name( - data: Data, metric_names: Iterable[str] -) -> dict[str, set[TArmTrial]]: - """Return all arm_name, trial_index pairs missing some observations of - specified metrics. - - Args: - data: Data to search. - metric_names: list of metrics to search for. - - Returns: - A Dict[str, missing_metrics], one entry for each metric_name. - """ - missing_metrics = { - metric_name: _get_missing_arm_trial_pairs(data=data, metric_name=metric_name) - for metric_name in metric_names - } - return missing_metrics - - -def _get_missing_arm_trial_pairs(data: Data, metric_name: str) -> set[TArmTrial]: - """Return arm_name and trial_index pairs missing a specified metric.""" - metric_df = data.df[data.df.metric_name == metric_name] - present_metric_df = metric_df[metric_df["mean"].notnull()] - arm_trial_pairs = set(zip(data.df["arm_name"], data.df["trial_index"])) - arm_trial_pairs_with_metric = set( - zip(present_metric_df["arm_name"], present_metric_df["trial_index"]) - ) - missing_arm_trial_pairs = arm_trial_pairs.difference(arm_trial_pairs_with_metric) - return missing_arm_trial_pairs - # ------------------- Utils shared by Client and BatchClient-------------------- def _maybe_update_trial_status_to_complete( @@ -169,40 +72,6 @@ def _maybe_update_trial_status_to_complete( # -------------------- Experiment result extraction utils. --------------------- -def best_feasible_objective( - optimization_config: OptimizationConfig, - values: dict[str, npt.NDArray], -) -> npt.NDArray: - """Compute the best feasible objective value found by each iteration. - - Args: - optimization_config: Optimization config. - values: Dictionary from metric name to array of value at each - iteration. If optimization config contains outcome constraints, values - for them must be present in `values`. - - Returns: Array of cumulative best feasible value. - """ - # Get objective at each iteration - objective = optimization_config.objective - f = values[objective.metric.signature] - # Set infeasible points to have infinitely bad values - infeas_val = np.inf if objective.minimize else -np.inf - for oc in optimization_config.outcome_constraints: - if oc.relative: - raise ValueError( - "Benchmark aggregation does not support relative constraints" - ) - g = values[oc.metric.signature] - feas = g <= oc.bound if oc.op == ComparisonOp.LEQ else g >= oc.bound - f[~feas] = infeas_val - - # Get cumulative best - minimize = objective.minimize - accumulate = np.minimum.accumulate if minimize else np.maximum.accumulate - return accumulate(f) - - def _extract_generator_runs(trial: BaseTrial) -> list[GeneratorRun]: if isinstance(trial, BatchTrial): return trial.generator_runs