Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve resiliency of trial reservation #693

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions docs/src/user/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Full Example of Global Configuration
heartbeat: 120
interrupt_signal_code: 130
max_broken: 10
max_idle_time: 60
reservation_timeout: 60
max_trials: 1000000000
user_script_config: config

Expand Down Expand Up @@ -365,7 +365,7 @@ Worker
heartbeat: 120
interrupt_signal_code: 130
max_broken: 10
max_idle_time: 60
reservation_timeout: 60
max_trials: 1000000000
user_script_config: config

Expand Down Expand Up @@ -464,21 +464,37 @@ max_broken
Maximum number of broken trials before worker stops.


.. _config_worker_reservation_timeout:

reservation_timeout
~~~~~~~~~~~~~~~~~~~

:Type: int
:Default: 60
:Env var: ORION_RESERVATION_TIMEOUT
:Description:
Maximum time the experiment can spend trying to reserve a new suggestion. Such timeout are
generally caused by slow database, large number of concurrent workers leading to many race
conditions or small search spaces with integer/categorical dimensions that may be fully
explored.


.. _config_worker_max_idle_time:

max_idle_time
~~~~~~~~~~~~~

.. warning::

**DEPRECATED.** This argument will be removed in v0.3.
Use :ref:`config_worker_reservation_timeout` instead.

:Type: int
:Default: 60
:Env var: ORION_MAX_IDLE_TIME
:Description:
Maximum time the producer can spend trying to generate a new suggestion.Such timeout are
generally caused by slow database, large number of concurrent workers leading to many race
conditions or small search spaces with integer/categorical dimensions that may be fully
explored.

(DEPRECATED) This argument will be removed in v0.3. Use :ref:`config_worker_reservation_timeout`
instead.


.. _config_worker_interrupt_signal_code:
Expand Down
8 changes: 4 additions & 4 deletions src/orion/algo/hyperband.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def tabulate_status(brackets):
row.append(r_i)
data.append(row)
table = tabulate(data, header, tablefmt="github")
logger.debug(table)
logger.info(table)


def display_budgets(budgets_tab, max_resources, reduction_factor):
Expand All @@ -116,7 +116,7 @@ def display_budgets(budgets_tab, max_resources, reduction_factor):
table_str += "max resource={}, eta={}, trials number of one execution={}\n".format(
max_resources, reduction_factor, total_trials
)
logger.debug(table_str)
logger.info(table_str)


class Hyperband(BaseAlgorithm):
Expand Down Expand Up @@ -360,7 +360,7 @@ def executed_times(self):
def _refresh_brackets(self):
"""Refresh bracket if one hyperband execution is done"""
if all(bracket.is_done for bracket in self.brackets):
logger.debug(
logger.info(
"Hyperband execution %i is done, required to execute %s times",
self.executed_times,
str(self.repetitions),
Expand Down Expand Up @@ -399,7 +399,7 @@ def observe(self, trials):
for trial in trials:

if not self.has_suggested(trial):
logger.info(
logger.debug(
"Ignoring trial %s because it was not sampled by current algo.",
trial,
)
Expand Down
18 changes: 12 additions & 6 deletions src/orion/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
Provides functions for communicating with `orion.core`.

"""
import logging

import orion.core.io.experiment_builder as experiment_builder
from orion.client.cli import (
interrupt_trial,
Expand All @@ -30,6 +32,8 @@
"workon",
]

log = logging.getLogger(__name__)


def create_experiment(name, **config):
"""Build an experiment to be executable
Expand Down Expand Up @@ -136,11 +140,8 @@ def build_experiment(
Working directory created for the experiment inside which a unique folder will be created
for each trial. Defaults to a temporary directory that is deleted at end of execution.
max_idle_time: int, optional
Maximum time the producer can spend trying to generate a new suggestion.
Such timeout are generally caused by slow database, large number of
concurrent workers leading to many race conditions or small search spaces
with integer/categorical dimensions that may be fully explored.
Defaults to ``orion.core.config.worker.max_idle_time``.
Deprecated and will be removed in v0.3.0.
Use experiment.workon(reservation_timeout) instead.
heartbeat: int, optional
Frequency (seconds) at which the heartbeat of the trial is updated.
If the heartbeat of a `reserved` trial is larger than twice the configured
Expand Down Expand Up @@ -202,6 +203,11 @@ def build_experiment(
If the algorithm, storage or strategy specified is not properly installed.

"""
if max_idle_time:
log.warning(
"max_idle_time is deprecated. Use experiment.workon(reservation_timeout) instead."
)

setup_storage(storage=storage, debug=debug)

try:
Expand Down Expand Up @@ -241,7 +247,7 @@ def build_experiment(
"repository."
) from e

producer = Producer(experiment, max_idle_time)
producer = Producer(experiment)

return ExperimentClient(experiment, producer, executor, heartbeat)

Expand Down
103 changes: 79 additions & 24 deletions src/orion/client/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"""
import inspect
import logging
import time
import traceback
from contextlib import contextmanager

Expand All @@ -18,7 +19,7 @@
BrokenExperiment,
CompletedExperiment,
InvalidResult,
SampleTimeout,
ReservationTimeout,
UnsupportedOperation,
WaitingForTrials,
)
Expand All @@ -32,28 +33,51 @@
log = logging.getLogger(__name__)


def reserve_trial(experiment, producer, pool_size, _depth=1):
def reserve_trial(experiment, producer, pool_size, timeout):
"""Reserve a new trial, or produce and reserve a trial if none are available."""
log.debug("Trying to reserve a new trial to evaluate.")
trial = experiment.reserve_trial()

if trial is None and not producer.is_done:
trial = None
start = time.time()
failure_count = 0

if _depth > 10:
raise WaitingForTrials(
"No trials are available at the moment "
"wait for current trials to finish"
)
n_trials_at_start = len(experiment.fetch_trials())

log.debug("#### Failed to pull a new trial from database.")
while (
trial is None
and not (experiment.is_done or experiment.is_broken)
and time.time() - start < timeout
):
trial = experiment.reserve_trial()

log.debug("#### Fetch most recent completed trials and update algorithm.")
producer.update()
if trial is not None:
break

log.debug("#### Produce new trials.")
producer.produce(pool_size)
failure_count += 1

return reserve_trial(experiment, producer, pool_size, _depth=_depth + 1)
# TODO: Add backoff
log.debug(
"#### Failed %s time to pull a new trial from database.",
failure_count,
)

if not (experiment.is_done or experiment.is_broken):
log.debug("#### Fetch most recent completed trials and update algorithm.")
producer.update()

log.debug("#### Produce new trials.")
produced = producer.produce(pool_size)
log.debug("#### %s trials produced.", produced)

if trial is None and time.time() - start > timeout:
new_trials_meanwhile = len(experiment.fetch_trials()) - n_trials_at_start
raise ReservationTimeout(
f"Unable to reserve a trial in less than {timeout} seconds. "
f"Failed to reserve {failure_count} times. "
f"{new_trials_meanwhile} new trials were generated meanwhile and reserved "
"by other workers. Consider increasing worker.pool_size if you have many workers "
"or increasing worker.reservation_timeout if only a few trials were generated."
)

return trial

Expand Down Expand Up @@ -500,7 +524,7 @@ def release(self, trial, status="interrupted"):
finally:
self._release_reservation(trial, raise_if_unreserved=raise_if_unreserved)

def suggest(self, pool_size=0):
def suggest(self, pool_size=0, timeout=None):
"""Suggest a trial to execute.

Experiment must be in executable ('x') mode.
Expand All @@ -518,6 +542,12 @@ def suggest(self, pool_size=0):
trials but may return less. Note: The method will still return only 1 trial even though
if the pool size is larger than 1. This is because atomic reservation of trials
can only be done one at a time.
timeout: int, optional
Maximum time allowed to try reserving a trial. ReservationTimeout will be raised if
timeout is reached. Such timeout are generally caused by slow database, large number
of concurrent workers leading to many race conditions or small search spaces with
integer/categorical dimensions that may be fully explored.
Defaults to ``orion.core.config.worker.reservation_timeout``.

Returns
-------
Expand All @@ -534,8 +564,8 @@ def suggest(self, pool_size=0):
if too many trials failed to run and the experiment cannot continue.
This is determined by ``max_broken`` in the configuration of the experiment.

:class:`orion.core.utils.exceptions.SampleTimeout`
if the algorithm of the experiment could not sample new unique trials.
:class:`orion.core.utils.exceptions.ReservationTimeout`
If a trial could not be reserved in less than ``timeout`` seconds.

:class:`orion.core.utils.exceptions.CompletedExperiment`
if the experiment was completed and algorithm could not sample new trials.
Expand All @@ -549,6 +579,8 @@ def suggest(self, pool_size=0):
pool_size = orion.core.config.worker.pool_size
if not pool_size:
pool_size = 1
if not timeout:
timeout = orion.core.config.worker.reservation_timeout

if self.is_broken:
raise BrokenExperiment("Trials failed too many times")
Expand All @@ -557,17 +589,19 @@ def suggest(self, pool_size=0):
raise CompletedExperiment("Experiment is done, cannot sample more trials.")

try:
trial = reserve_trial(self._experiment, self._producer, pool_size)
trial = reserve_trial(self._experiment, self._producer, pool_size, timeout)

except (WaitingForTrials, SampleTimeout) as e:
except (WaitingForTrials, ReservationTimeout) as e:
if self.is_broken:
raise BrokenExperiment("Trials failed too many times") from e

raise e

# This is to handle cases where experiment was completed during call to `reserve_trial`
if trial is None:
if trial is None and self.is_done:
raise CompletedExperiment("Producer is done, cannot sample more trials.")
elif trial is None and self.is_broken:
raise BrokenExperiment("Trials failed too many times")

self._maintain_reservation(trial)
return TrialCM(self, trial)
Expand Down Expand Up @@ -649,6 +683,7 @@ def workon(
fct,
n_workers=None,
pool_size=0,
reservation_timeout=None,
max_trials=None,
max_trials_per_worker=None,
max_broken=None,
Expand All @@ -673,6 +708,12 @@ def workon(
config if defined. Increase it to improve the sampling speed if workers spend too much
time waiting for algorithms to sample points. An algorithm will try sampling
`pool_size` trials but may return less.
reservation_timeout: int, optional
Maximum time allowed to try reserving a trial. ReservationTimeout will be raised if
timeout is reached. Such timeout are generally caused by slow database, large number
of concurrent workers leading to many race conditions or small search spaces with
integer/categorical dimensions that may be fully explored.
Defaults to ``orion.core.config.worker.reservation_timeout``.
max_trials: int, optional
Maximum number of trials to execute within ``workon``. If the experiment or algorithm
reach status is_done before, the execution of ``workon`` terminates.
Expand Down Expand Up @@ -714,7 +755,7 @@ def workon(
if too many trials failed to run and the experiment cannot continue.
This is determined by ``max_broken`` in the configuration of the experiment.

:class:`orion.core.utils.exceptions.SampleTimeout`
:class:`orion.core.utils.exceptions.ReservationTimeout`
if the algorithm of the experiment could not sample new unique points.

:class:`orion.core.utils.exceptions.UnsupportedOperation`
Expand All @@ -738,6 +779,9 @@ def workon(
if not pool_size:
pool_size = n_workers

if not reservation_timeout:
reservation_timeout = orion.core.config.worker.reservation_timeout

if max_trials is None:
max_trials = self.max_trials

Expand All @@ -758,6 +802,7 @@ def workon(
self._optimize,
fct,
pool_size,
reservation_timeout,
max_trials_per_worker,
max_broken,
trial_arg,
Expand All @@ -770,15 +815,25 @@ def workon(
return sum(trials)

def _optimize(
self, fct, pool_size, max_trials, max_broken, trial_arg, on_error, **kwargs
self,
fct,
pool_size,
reservation_timeout,
max_trials,
max_broken,
trial_arg,
on_error,
**kwargs,
):
worker_broken_trials = 0
trials = 0
kwargs = flatten(kwargs)
max_trials = min(max_trials, self.max_trials)
while not self.is_done and trials - worker_broken_trials < max_trials:
try:
with self.suggest(pool_size=pool_size) as trial:
with self.suggest(
pool_size=pool_size, timeout=reservation_timeout
) as trial:

kwargs.update(flatten(trial.params))

Expand Down
17 changes: 16 additions & 1 deletion src/orion/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,23 @@ def define_worker_config(config):
option_type=int,
default=60,
env_var="ORION_MAX_IDLE_TIME",
deprecate=dict(
version="v0.3",
alternative="worker.reservation_timeout",
name="worker.max_idle_time",
),
help=(
"This argument will be removed in v0.3.0. Use reservation_timeout instead."
),
)

worker_config.add_option(
"reservation_timeout",
option_type=int,
default=60,
env_var="ORION_RESERVATION_TIMEOUT",
help=(
"Maximum time the producer can spend trying to generate a new suggestion."
"Maximum time the experiment can spend trying to reserve a new suggestion."
"Such timeout are generally caused by slow database, large number of "
"concurrent workers leading to many race conditions or small search spaces "
"with integer/categorical dimensions that may be fully explored."
Expand Down
Loading