diff --git a/docs/getting_started.md b/docs/getting_started.md
index 55493f58b..b4fa8aa46 100644
--- a/docs/getting_started.md
+++ b/docs/getting_started.md
@@ -13,7 +13,7 @@ pip install neural-pipeline-search
 
 ## The 3 Main Components
 
-1. **Establish a [`pipeline_space=`](reference/pipeline_space.md)**:
+1. **Establish a [`pipeline_space`](reference/pipeline_space.md)**:
 
 ```python
 pipeline_space={
diff --git a/neps/api.py b/neps/api.py
index b7c37ca1b..2ba184c40 100644
--- a/neps/api.py
+++ b/neps/api.py
@@ -46,6 +46,7 @@ def run(  # noqa: C901, D417, PLR0913
     objective_value_on_error: float | None = None,
     cost_value_on_error: float | None = None,
     sample_batch_size: int | None = None,
+    worker_id: str | None = None,
     optimizer: (
         OptimizerChoice
         | Mapping[str, Any]
@@ -249,6 +250,22 @@ def evaluate_pipeline(some_parameter: float) -> float:
             evaluations, even if they were to come in relatively quickly.
 
+        worker_id: An optional string to identify the worker (run instance).
+            If not provided, a `worker_id` will be generated automatically using the pattern
+            `worker_<idx>`, where `<idx>` is a unique integer that increments with each new worker.
+            A list of all workers created so far is stored in
+            `root_directory/optimizer_state.pkl` under the attribute `worker_ids`.
+
+            ??? tip "Why specify a `worker_id`?"
+                Specifying a `worker_id` is useful for tracking which worker performed specific tasks
+                in the results. For example, when debugging or running on a cluster, you can include
+                the process ID and machine name in the `worker_id` for better traceability.
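+
+                A minimal sketch of one way to construct such an id (the exact
+                format is your choice; NePS only requires that it be unique):
+
+                ```python
+                import os
+                import socket
+
+                worker_id = f"{socket.gethostname()}-{os.getpid()}"
+                ```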
+
+            ??? warning "Duplication of `worker_id`"
+                Ensure that each worker has a unique `worker_id`. If a duplicate `worker_id` is detected,
+                the optimization process will be stopped with an error to prevent overwriting the results
+                of other workers.
+
         optimizer: Which optimizer to use.
 
             Not sure which to use? Leave this at `"auto"` and neps will
@@ -513,6 +530,7 @@ def __call__(
             overwrite_optimization_dir=overwrite_root_directory,
             sample_batch_size=sample_batch_size,
             write_summary_to_disk=write_summary_to_disk,
+            worker_id=worker_id,
         )
 
     post_run_csv(root_directory)
diff --git a/neps/runtime.py b/neps/runtime.py
index 30915a7fb..28b230978 100644
--- a/neps/runtime.py
+++ b/neps/runtime.py
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-import datetime
 import logging
 import os
 import shutil
@@ -52,11 +51,6 @@
 logger = logging.getLogger(__name__)
 
 
-def _default_worker_name() -> str:
-    isoformat = datetime.datetime.now(datetime.timezone.utc).isoformat()
-    return f"{os.getpid()}-{isoformat}"
-
-
 _DDP_ENV_VAR_NAME = "NEPS_DDP_TRIAL_ID"
 
 
@@ -197,12 +191,13 @@ def new(
         worker_id: str | None = None,
     ) -> DefaultWorker:
         """Create a new worker."""
+        worker_id = state.lock_and_set_new_worker_id(worker_id)
         return DefaultWorker(
             state=state,
             optimizer=optimizer,
             settings=settings,
             evaluation_fn=evaluation_fn,
-            worker_id=worker_id if worker_id is not None else _default_worker_name(),
+            worker_id=worker_id,
         )
 
     def _check_worker_local_settings(
@@ -882,6 +877,7 @@ def _launch_runtime(  # noqa: PLR0913
     max_evaluations_for_worker: int | None,
     sample_batch_size: int | None,
     write_summary_to_disk: bool = True,
+    worker_id: str | None = None,
 ) -> None:
     default_report_values = DefaultReportValues(
         objective_value_on_error=objective_value_on_error,
@@ -926,6 +922,7 @@ def _launch_runtime(  # noqa: PLR0913
                         )
                     )
                 ),
                 shared_state=None,  # TODO: Unused for the time being...
+                worker_ids=None,
             ),
         )
         break
@@ -990,5 +987,6 @@ def _launch_runtime(  # noqa: PLR0913
         optimizer=optimizer,
         evaluation_fn=evaluation_fn,
         settings=settings,
+        worker_id=worker_id,
     )
     worker.run()
diff --git a/neps/state/neps_state.py b/neps/state/neps_state.py
index 67d8bd9a8..ba46c32cd 100644
--- a/neps/state/neps_state.py
+++ b/neps/state/neps_state.py
@@ -257,6 +257,45 @@ class NePSState:
     all_best_configs: list = field(default_factory=list)
     """Trajectory to the newest incumbent"""
 
+    def lock_and_set_new_worker_id(self, worker_id: str | None = None) -> str:
+        """Acquire the state lock and set a new worker id in the optimizer state.
+
+        Args:
+            worker_id: The worker id to set. If `None`, a new worker id will be generated.
+
+        Returns:
+            The worker id that was set.
+
+        Raises:
+            NePSError: If the worker id already exists.
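+
+        Example:
+            A sketch of the intended behavior (auto-generated ids follow the
+            `worker_<idx>` pattern produced by `_get_worker_name`):
+
+            >>> state.lock_and_set_new_worker_id()  # doctest: +SKIP
+            'worker_0'
+            >>> state.lock_and_set_new_worker_id("gpu-node-1-pid-1234")  # doctest: +SKIP
+            'gpu-node-1-pid-1234'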
+        """
+        with self._optimizer_lock.lock():
+            with self._optimizer_state_path.open("rb") as f:
+                opt_state: OptimizationState = pickle.load(f)  # noqa: S301
+            assert isinstance(opt_state, OptimizationState)
+            worker_id = (
+                worker_id
+                if worker_id is not None
+                else _get_worker_name(
+                    len(opt_state.worker_ids)
+                    if opt_state.worker_ids is not None
+                    else 0
+                )
+            )
+            if opt_state.worker_ids and worker_id in opt_state.worker_ids:
+                raise NePSError(
+                    f"Worker id '{worker_id}' already exists,"
+                    f" reserved worker ids: {opt_state.worker_ids}"
+                )
+            if opt_state.worker_ids is None:
+                opt_state.worker_ids = []
+
+            opt_state.worker_ids.append(worker_id)
+            bytes_ = pickle.dumps(opt_state, protocol=pickle.HIGHEST_PROTOCOL)
+            with atomic_write(self._optimizer_state_path, "wb") as f:
+                f.write(bytes_)
+            return worker_id
+
     def lock_and_read_trials(self) -> dict[str, Trial]:
         """Acquire the state lock and read the trials."""
         with self._trial_lock.lock():
@@ -683,3 +722,7 @@
             f" {path}. Expected a `dict` or `None`."
         )
     return OptimizerInfo(name=name, info=info or {})
+
+
+def _get_worker_name(idx: int) -> str:
+    return f"worker_{idx}"
diff --git a/neps/state/optimizer.py b/neps/state/optimizer.py
index 7d4787cce..8eab08a61 100644
--- a/neps/state/optimizer.py
+++ b/neps/state/optimizer.py
@@ -47,3 +47,5 @@ class OptimizationState:
     Please reach out to @eddiebergman if you have a use case for this so we can
     make it more robust.
     """
+    worker_ids: list[str] | None = None
+    """The list of workers that have been created so far."""
diff --git a/neps_examples/basic_usage/hyperparameters.py b/neps_examples/basic_usage/hyperparameters.py
index 6b1c6a5d7..0f3fdc898 100644
--- a/neps_examples/basic_usage/hyperparameters.py
+++ b/neps_examples/basic_usage/hyperparameters.py
@@ -1,7 +1,8 @@
 import logging
 import numpy as np
 import neps
-
+import socket
+import os
 # This example demonstrates how to use NePS to optimize hyperparameters
 # of a pipeline. The pipeline is a simple function that takes in
 # five hyperparameters and returns their sum.
@@ -28,4 +29,5 @@ def evaluate_pipeline(float1, float2, categorical, integer1, integer2):
     pipeline_space=pipeline_space,
     root_directory="results/hyperparameters_example",
     evaluations_to_spend=30,
+    worker_id=f"worker_1-{socket.gethostname()}-{os.getpid()}",
 )
diff --git a/tests/test_runtime/test_worker_creation.py b/tests/test_runtime/test_worker_creation.py
new file mode 100644
index 000000000..4b741640a
--- /dev/null
+++ b/tests/test_runtime/test_worker_creation.py
@@ -0,0 +1,93 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+import pytest
+
+from neps.optimizers import OptimizerInfo
+from neps.optimizers.algorithms import random_search
+from neps.runtime import (
+    DefaultReportValues,
+    DefaultWorker,
+    OnErrorPossibilities,
+    WorkerSettings,
+)
+from neps.space import Float, SearchSpace
+from neps.state import NePSState, OptimizationState, SeedSnapshot
+
+
+@pytest.fixture
+def neps_state(tmp_path: Path) -> NePSState:
+    return NePSState.create_or_load(
+        path=tmp_path / "neps_state",
+        optimizer_info=OptimizerInfo(name="blah", info={"nothing": "here"}),
+        optimizer_state=OptimizationState(
+            budget=None, seed_snapshot=SeedSnapshot.new_capture(), shared_state={}
+        ),
+    )
+
+
+def test_create_worker_manual_id(neps_state: NePSState) -> None:
+    settings = WorkerSettings(
+        on_error=OnErrorPossibilities.IGNORE,
+        default_report_values=DefaultReportValues(),
+        evaluations_to_spend=1,
+        include_in_progress_evaluations_towards_maximum=True,
+        cost_to_spend=None,
+        max_evaluations_for_worker=None,
+        max_evaluation_time_total_seconds=None,
+        max_wallclock_time_for_worker_seconds=None,
+        max_evaluation_time_for_worker_seconds=None,
+        max_cost_for_worker=None,
+        batch_size=None,
+        fidelities_to_spend=None,
+    )
+
+    def eval_fn(config: dict) -> float:
+        return 1.0
+
+    test_worker_id = "my_worker_123"
+    optimizer = random_search(SearchSpace({"a": Float(0, 1)}))
+
+    worker = DefaultWorker.new(
+        state=neps_state,
+        settings=settings,
+        optimizer=optimizer,
+        evaluation_fn=eval_fn,
+        worker_id=test_worker_id,
+    )
+
+    assert worker.worker_id == test_worker_id
+    assert neps_state.lock_and_get_optimizer_state().worker_ids == [test_worker_id]
+
+
+def test_create_worker_auto_id(neps_state: NePSState) -> None:
+    settings = WorkerSettings(
+        on_error=OnErrorPossibilities.IGNORE,
+        default_report_values=DefaultReportValues(),
+        evaluations_to_spend=1,
+        include_in_progress_evaluations_towards_maximum=True,
+        cost_to_spend=None,
+        max_evaluations_for_worker=None,
+        max_evaluation_time_total_seconds=None,
+        max_wallclock_time_for_worker_seconds=None,
+        max_evaluation_time_for_worker_seconds=None,
+        max_cost_for_worker=None,
+        batch_size=None,
+        fidelities_to_spend=None,
+    )
+
+    def eval_fn(config: dict) -> float:
+        return 1.0
+
+    optimizer = random_search(SearchSpace({"a": Float(0, 1)}))
+
+    worker = DefaultWorker.new(
+        state=neps_state,
+        settings=settings,
+        optimizer=optimizer,
+        evaluation_fn=eval_fn,
+    )
+
+    assert worker.worker_id == "worker_0"
+    assert neps_state.lock_and_get_optimizer_state().worker_ids == [worker.worker_id]
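+
+
+# A sketch of a possible companion test (not part of the original patch):
+# `NePSState.lock_and_set_new_worker_id` documents that registering a
+# duplicate `worker_id` raises. Assumes `NePSError` is importable from
+# `neps.exceptions`.
+def test_duplicate_worker_id_raises(neps_state: NePSState) -> None:
+    from neps.exceptions import NePSError  # assumed import location
+
+    neps_state.lock_and_set_new_worker_id("dup")
+    with pytest.raises(NePSError):
+        neps_state.lock_and_set_new_worker_id("dup")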