Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pip install neural-pipeline-search

## The 3 Main Components

1. **Establish a [`pipeline_space=`](reference/pipeline_space.md)**:
1. **Establish a [`pipeline_space`](reference/pipeline_space.md)**:

```python
pipeline_space={
Expand Down
18 changes: 18 additions & 0 deletions neps/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def run( # noqa: C901, D417, PLR0913
objective_value_on_error: float | None = None,
cost_value_on_error: float | None = None,
sample_batch_size: int | None = None,
worker_id: str | None = None,
optimizer: (
OptimizerChoice
| Mapping[str, Any]
Expand Down Expand Up @@ -249,6 +250,22 @@ def evaluate_pipeline(some_parameter: float) -> float:
evaluations, even if they were to come in relatively
quickly.

worker_id: An optional string to identify the worker (run instance).
If not provided, a `worker_id` will be automatically generated using the pattern:
`worker_<N>`, where `<N>` is a unique integer for each worker and increments with each new worker.
A list of all workers created so far is stored in
`root_directory/optimizer_state.pkl` under the attribute `worker_ids`.

??? tip "Why specify a `worker_id`?"
Specifying a `worker_id` is useful for tracking which worker performed specific tasks
in the results. For example, when debugging or running on a cluster, you can include
the process ID and machine name in the `worker_id` for better traceability.

??? warning "Duplication of `worker_id`"
Ensure that each worker has a unique `worker_id`. If a duplicate `worker_id` is detected,
the optimization process will be stopped with an error to prevent overwriting the results
of other workers.

optimizer: Which optimizer to use.

Not sure which to use? Leave this at `"auto"` and neps will
Expand Down Expand Up @@ -513,6 +530,7 @@ def __call__(
overwrite_optimization_dir=overwrite_root_directory,
sample_batch_size=sample_batch_size,
write_summary_to_disk=write_summary_to_disk,
worker_id=worker_id,
)

post_run_csv(root_directory)
Expand Down
12 changes: 5 additions & 7 deletions neps/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import datetime
import logging
import os
import shutil
Expand Down Expand Up @@ -52,11 +51,6 @@
logger = logging.getLogger(__name__)


def _default_worker_name() -> str:
isoformat = datetime.datetime.now(datetime.timezone.utc).isoformat()
return f"{os.getpid()}-{isoformat}"


_DDP_ENV_VAR_NAME = "NEPS_DDP_TRIAL_ID"


Expand Down Expand Up @@ -197,12 +191,13 @@ def new(
worker_id: str | None = None,
) -> DefaultWorker:
"""Create a new worker."""
worker_id = state.lock_and_set_new_worker_id(worker_id)
return DefaultWorker(
state=state,
optimizer=optimizer,
settings=settings,
evaluation_fn=evaluation_fn,
worker_id=worker_id if worker_id is not None else _default_worker_name(),
worker_id=worker_id,
)

def _check_worker_local_settings(
Expand Down Expand Up @@ -882,6 +877,7 @@ def _launch_runtime( # noqa: PLR0913
max_evaluations_for_worker: int | None,
sample_batch_size: int | None,
write_summary_to_disk: bool = True,
worker_id: str | None = None,
) -> None:
default_report_values = DefaultReportValues(
objective_value_on_error=objective_value_on_error,
Expand Down Expand Up @@ -926,6 +922,7 @@ def _launch_runtime( # noqa: PLR0913
)
),
shared_state=None, # TODO: Unused for the time being...
worker_ids=None,
),
)
break
Expand Down Expand Up @@ -990,5 +987,6 @@ def _launch_runtime( # noqa: PLR0913
optimizer=optimizer,
evaluation_fn=evaluation_fn,
settings=settings,
worker_id=worker_id,
)
worker.run()
43 changes: 43 additions & 0 deletions neps/state/neps_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,45 @@ class NePSState:
all_best_configs: list = field(default_factory=list)
"""Trajectory to the newest incbumbent"""

def lock_and_set_new_worker_id(self, worker_id: str | None = None) -> str:
"""Acquire the state lock and set a new worker id in the optimizer state.

Args:
worker_id: The worker id to set. If `None`, a new worker id will be generated.

Returns:
The worker id that was set.

Raises:
NePSError: If the worker id already exists.
"""
with self._optimizer_lock.lock():
with self._optimizer_state_path.open("rb") as f:
opt_state: OptimizationState = pickle.load(f) # noqa: S301
assert isinstance(opt_state, OptimizationState)
worker_id = (
worker_id
if worker_id is not None
else _get_worker_name(
len(opt_state.worker_ids)
if opt_state.worker_ids is not None
else 0
)
)
if opt_state.worker_ids and worker_id in opt_state.worker_ids:
raise NePSError(
f"Worker id '{worker_id}' already exists, \
reserved worker ids: {opt_state.worker_ids}"
)
if opt_state.worker_ids is None:
opt_state.worker_ids = []

opt_state.worker_ids.append(worker_id)
bytes_ = pickle.dumps(opt_state, protocol=pickle.HIGHEST_PROTOCOL)
with atomic_write(self._optimizer_state_path, "wb") as f:
f.write(bytes_)
return worker_id

def lock_and_read_trials(self) -> dict[str, Trial]:
"""Acquire the state lock and read the trials."""
with self._trial_lock.lock():
Expand Down Expand Up @@ -683,3 +722,7 @@ def _deserialize_optimizer_info(path: Path) -> OptimizerInfo:
f" {path}. Expected a `dict` or `None`."
)
return OptimizerInfo(name=name, info=info or {})


def _get_worker_name(idx: int) -> str:
return f"worker_{idx}"
2 changes: 2 additions & 0 deletions neps/state/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,5 @@ class OptimizationState:
Please reach out to @eddiebergman if you have a use case for this so we can make
it more robust.
"""
worker_ids: list[str] | None = None
"""The list of workers that have been created so far."""
4 changes: 3 additions & 1 deletion neps_examples/basic_usage/hyperparameters.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import numpy as np
import neps

import socket
import os
# This example demonstrates how to use NePS to optimize hyperparameters
# of a pipeline. The pipeline is a simple function that takes in
# five hyperparameters and returns their sum.
Expand All @@ -28,4 +29,5 @@ def evaluate_pipeline(float1, float2, categorical, integer1, integer2):
pipeline_space=pipeline_space,
root_directory="results/hyperparameters_example",
evaluations_to_spend=30,
worker_id=f"worker_1-{socket.gethostname()}-{os.getpid()}",
)
93 changes: 93 additions & 0 deletions tests/test_runtime/test_worker_creation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from __future__ import annotations

from pathlib import Path

import pytest

from neps.optimizers import OptimizerInfo
from neps.optimizers.algorithms import random_search
from neps.runtime import (
DefaultReportValues,
DefaultWorker,
OnErrorPossibilities,
WorkerSettings,
)
from neps.space import Float, SearchSpace
from neps.state import NePSState, OptimizationState, SeedSnapshot


@pytest.fixture
def neps_state(tmp_path: Path) -> NePSState:
    """A fresh `NePSState` rooted in a temporary directory."""
    return NePSState.create_or_load(
        path=tmp_path / "neps_state",
        optimizer_info=OptimizerInfo(name="blah", info={"nothing": "here"}),
        optimizer_state=OptimizationState(
            budget=None, seed_snapshot=SeedSnapshot.new_capture(), shared_state={}
        ),
    )


def _worker_settings() -> WorkerSettings:
    """Minimal settings for a worker that performs a single evaluation.

    Extracted so both tests share one definition instead of duplicating the
    14-line `WorkerSettings(...)` construction.
    """
    return WorkerSettings(
        on_error=OnErrorPossibilities.IGNORE,
        default_report_values=DefaultReportValues(),
        evaluations_to_spend=1,
        include_in_progress_evaluations_towards_maximum=True,
        cost_to_spend=None,
        max_evaluations_for_worker=None,
        max_evaluation_time_total_seconds=None,
        max_wallclock_time_for_worker_seconds=None,
        max_evaluation_time_for_worker_seconds=None,
        max_cost_for_worker=None,
        batch_size=None,
        fidelities_to_spend=None,
    )


def _eval_fn(config: dict) -> float:
    """Trivial evaluation function; its result is never inspected by the tests."""
    return 1.0


def test_create_worker_manual_id(neps_state: NePSState) -> None:
    """A caller-supplied worker id is used verbatim and recorded in the state."""
    test_worker_id = "my_worker_123"
    optimizer = random_search(SearchSpace({"a": Float(0, 1)}))

    worker = DefaultWorker.new(
        state=neps_state,
        settings=_worker_settings(),
        optimizer=optimizer,
        evaluation_fn=_eval_fn,
        worker_id=test_worker_id,
    )

    assert worker.worker_id == test_worker_id
    assert neps_state.lock_and_get_optimizer_state().worker_ids == [test_worker_id]


def test_create_worker_auto_id(neps_state: NePSState) -> None:
    """Without an explicit id, the first worker is auto-named `worker_0`."""
    optimizer = random_search(SearchSpace({"a": Float(0, 1)}))

    worker = DefaultWorker.new(
        state=neps_state,
        settings=_worker_settings(),
        optimizer=optimizer,
        evaluation_fn=_eval_fn,
    )

    assert worker.worker_id == "worker_0"
    assert neps_state.lock_and_get_optimizer_state().worker_ids == [worker.worker_id]
Loading