From cf1df3edb2d0a19e10b5652d16a28b541e6f25ea Mon Sep 17 00:00:00 2001 From: Abin Shahab Date: Mon, 13 Dec 2021 09:51:44 -0800 Subject: [PATCH] Support resurrecting blacklisted hosts This adds support for resurrecting blacklisted hosts in elastic mode. Currently hosts that get blacklisted remain in the blacklist for the lifetime of the job. This cannot handle transient host failure or a scale-up after a scale-down. This is especially the case for the Kubeflow mpi-operator on Kubernetes, as it always gives pods known hostnames from its hostfile. This patch will allow blacklisted hosts to become whitelisted after a configured cooldown period. Cooldown periods can be configured with the ``--blacklist-cooldown-range`` parameter like this: .. code-block:: bash $ horovodrun -np 8 --blacklist-cooldown-range 10 100 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.py python train.py The above example configures the minimum cooldown period to 10 seconds and the maximum cooldown period to 100 seconds. The initial cooldown period would be 10 seconds. For repeat failures the cooldown period would grow with an exponential backoff delay: 10s, 20s, 30s, and so on. However, the maximum cooldown period would be capped at 100 seconds, regardless of failure count. The default behavior is to have no cooldown period, and blacklisted hosts would remain in blacklist. 
Signed-off-by: Abin Shahab --- CHANGELOG.md | 2 + docs/elastic.rst | 16 +++++- horovod/ray/elastic_v2.py | 8 +++ horovod/ray/runner.py | 2 + horovod/runner/__init__.py | 4 ++ horovod/runner/elastic/discovery.py | 89 +++++++++++++++++++++++++---- horovod/runner/elastic/driver.py | 4 +- horovod/runner/elastic/settings.py | 5 +- horovod/runner/gloo_run.py | 1 + horovod/runner/launch.py | 13 +++-- test/integration/elastic_common.py | 77 ++++++++++++++++++++++++- test/single/test_ray_elastic_v2.py | 25 ++++++++ 12 files changed, 223 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0093a4377e..dec5d0dc5f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128)) +- Added support for resurrecting blacklisted hosts. ([#3319](https://github.com/horovod/horovod/pull/3319)) + ### Changed ### Deprecated diff --git a/docs/elastic.rst b/docs/elastic.rst index 6f66a254ab..53925b30c2 100644 --- a/docs/elastic.rst +++ b/docs/elastic.rst @@ -332,8 +332,20 @@ The maximum np can be used to cap the number of processes (to prevent over-utili as a reference point for learning rate scales and data partitions (in cases where these need to be held constant regardless of the current number of workers). If unspecified, maximum np also defaults to ``-np``. -Instances that fail will be added to a blacklist, as they may have faulty hardware. Ranks that fail repeatedly -will result in job failure, as it may be the case that the training process cannot make progress. +Instances that fail will be added to a blacklist, as they may have faulty hardware. Hosts will remain in blacklist for a configured cooldown period. +After the cooldown period ends, the hosts will be whitelisted back. 
This is to account for transient failures, and cases where the same host +is added back to a job. +Cooldown periods can be configured with the ``--blacklist-cooldown-range`` parameter like this: + +.. code-block:: bash + $ horovodrun -np 8 --blacklist-cooldown-range 10 100 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.py python train.py + +The above example configures the minimum cooldown period to 10 seconds and the maximum cooldown period to 100 seconds. +The initial cooldown period would be 10 seconds. For repeat failures the cooldown period would grow with an exponential +backoff delay: 10s, 20s, 30s, and so on. However, the maximum cooldown period would be capped at 100 seconds, regardless +of failure count. The default behavior is to have no cooldown period, and blacklisted hosts would remain in blacklist. + +Ranks that fail repeatedly will result in job failure, as it may be the case that the training process cannot make progress. Running on Ray diff --git a/horovod/ray/elastic_v2.py b/horovod/ray/elastic_v2.py index 20f92d2b19..379c455aba 100644 --- a/horovod/ray/elastic_v2.py +++ b/horovod/ray/elastic_v2.py @@ -162,6 +162,8 @@ class ElasticParams(BaseParams): reset_limit (int): Maximum number of times that the training job can scale up or down the number of workers after which the job is terminated. + cooldown_range(int range): Range of seconds(min, max) a failing + host will remain in blacklist. elastic_timeout (int): Timeout for elastic initialisation after re-scaling the cluster. The default value is 600 seconds. 
Alternatively, the environment variable @@ -177,6 +179,7 @@ class ElasticParams(BaseParams): min_workers: int = 1 max_workers: int = None reset_limit: int = None + cooldown_range: List[int] = None elastic_timeout: int = 600 override_discovery: bool = True @@ -205,6 +208,8 @@ class ElasticAdapter(Adapter): reset_limit (int): Maximum number of times that the training job can scale up or down the number of workers after which the job is terminated. + cooldown_range (int range): Range of seconds(min, max) a failing + host will remain in blacklist. elastic_timeout (int): Timeout for elastic initialisation after re-scaling the cluster. The default value is 600 seconds. Alternatively, the environment variable @@ -228,6 +233,7 @@ def __init__(self, gpus_per_worker: Optional[int] = None, override_discovery: bool=True, reset_limit: int = None, + cooldown_range: List[int] = None, elastic_timeout: int = 600): self.settings = settings if override_discovery: @@ -243,6 +249,7 @@ def __init__(self, self.max_workers = max_workers self.num_workers = min_workers self.reset_limit = reset_limit + self.cooldown_range = cooldown_range self.elastic_timeout = elastic_timeout self.driver = None self.rendezvous = None @@ -275,6 +282,7 @@ def start(self, max_np=self.max_workers, timeout=self.elastic_timeout, reset_limit=self.reset_limit, + cooldown_range=self.cooldown_range, verbose=self.settings.verbose) handler = create_rendezvous_handler(self.driver) logger.debug("[ray] starting rendezvous") diff --git a/horovod/ray/runner.py b/horovod/ray/runner.py index ab94d18424..1e9a65b632 100644 --- a/horovod/ray/runner.py +++ b/horovod/ray/runner.py @@ -253,6 +253,7 @@ def __init__( min_workers: int = None, max_workers: int = None, reset_limit: int = None, + cooldown_range: List[int] = None, elastic_timeout: int = 600, override_discovery: bool = True ): @@ -268,6 +269,7 @@ def __init__( min_workers=min_workers, max_workers=max_workers, reset_limit=reset_limit, + cooldown_range=cooldown_range, 
elastic_timeout=elastic_timeout, override_discovery=override_discovery, cpus_per_worker=cpus_per_worker, diff --git a/horovod/runner/__init__.py b/horovod/runner/__init__.py index ada877222b..1b03554bf4 100644 --- a/horovod/runner/__init__.py +++ b/horovod/runner/__init__.py @@ -54,6 +54,7 @@ def __init__(self): self.slots = None self.elastic_timeout = None self.reset_limit = None + self.cooldown_range = None # timeline arguments self.timeline_filename = None @@ -98,6 +99,7 @@ def run( max_np=None, slots=None, reset_limit=None, + cooldown_range=None, hosts=None, hostfile=None, start_timeout=None, @@ -133,6 +135,7 @@ def run( job after the initial registration. So a reset_limit of 0 would mean the job cannot change membership after its initial set of workers. A reset_limit of 1 means it can resize at most once, etc. + :param cooldown_range: Range of seconds(min, max) a failing host will remain in blacklist. :param hosts: List of host names and the number of available slots for running processes on each, of the form: : @@ -192,6 +195,7 @@ def wrapped_func(): hargs.max_np = max_np hargs.slots = slots hargs.reset_limit = reset_limit + hargs.cooldown_range = cooldown_range hargs.hosts = hosts hargs.hostfile = hostfile hargs.start_timeout = start_timeout diff --git a/horovod/runner/elastic/discovery.py b/horovod/runner/elastic/discovery.py index 78233dcb30..8001ec000f 100644 --- a/horovod/runner/elastic/discovery.py +++ b/horovod/runner/elastic/discovery.py @@ -15,20 +15,33 @@ import io import logging +import random import threading - +import time from collections import defaultdict from horovod.runner.common.util import safe_shell_exec from horovod.runner.elastic.worker import HostUpdateResult - - +# Maximum time hosts are in cooldown period +COOLDOWN_UPPER_LIMIT_SECONDS = 5 * 60 +# Minimum time hosts are in cooldown +COOLDOWN_LOWER_LIMIT_SECONDS = 10 +# Unit of increasing cooldown for repeat failure +COOLDOWN_RANGE_SECONDS = [COOLDOWN_LOWER_LIMIT_SECONDS, 
COOLDOWN_UPPER_LIMIT_SECONDS] class HostState(object): - def __init__(self): + + def __init__(self, cooldown_range=None): self._event = threading.Event() - # TODO(travis): blacklisted hosts should have a timeout period that increases with each failure self._blacklisted = False + self._blacklist_count = 0 + if cooldown_range: + self._cooldown_lower_limit = cooldown_range[0] + self._cooldown_upper_limit = cooldown_range[1] + else: + self._cooldown_lower_limit = -1 + self._cooldown_upper_limit = -1 + self._cooldown_period_end_ts = 0 def get_event(self): if self._event.is_set(): @@ -39,13 +52,48 @@ def get_event(self): def set_event(self): self._event.set() + def _in_cooldown_period(self, current_time): + return self._cooldown_period_end_ts > current_time + + + def _set_cooldown_period(self, current_time): + if self._cooldown_lower_limit == -1 or self._cooldown_upper_limit == -1: + return + self._blacklist_count += 1 + def _exponential_backoff_time(): + cooldown_delay = self._cooldown_lower_limit * (1 << self._blacklist_count) + + (random.uniform(0,1) * self._cooldown_lower_limit) + logging.debug(f"{self._blacklist_count}:{self._cooldown_period_end_ts} cooldown_delay: {cooldown_delay}") + return max(self._cooldown_lower_limit, min(self._cooldown_upper_limit, cooldown_delay)) + cooldown_delta_seconds = _exponential_backoff_time() + self._cooldown_period_end_ts = current_time + cooldown_delta_seconds + logging.debug(f"cooldown delta seconds: {cooldown_delta_seconds}") + def blacklist(self): + """Moves this host to a blacklist, and starts the cooldown period.""" self._blacklisted = True + now = time.time() + if self._in_cooldown_period(now): + return + self._set_cooldown_period(now) self.set_event() + def whitelist(self): + """Ends the cooldown period and moves this host out of blacklist.""" + self._cooldown_period_end_ts = 0 + self._blacklisted = False + def is_blacklisted(self): + """Checks if the host is in the blacklist.""" return self._blacklisted + def 
is_resurrected(self): + """Checks if host is in an expired cooldown period.""" + if self._cooldown_period_end_ts > 0: + return not self._in_cooldown_period(time.time()) + return False + + class DiscoveredHosts(object): def __init__(self, host_slots, host_assignment_order): @@ -76,15 +124,17 @@ def update(self, hosts_state): if not hosts_state[host].is_blacklisted()] return self + def __str__(self): + return f"slots: {self._host_slots} order: {self._host_assignment_order}" + class HostManager(object): - def __init__(self, discovery): + def __init__(self, discovery, cooldown_range=None): self._current_hosts = DiscoveredHosts(host_slots={}, host_assignment_order=[]) - self._hosts_state = defaultdict(HostState) + self._hosts_state = defaultdict(lambda: HostState(cooldown_range)) self._discovery = discovery def update_available_hosts(self): - # TODO(travis): also check for hosts removed from the blacklist in the future def check_update(cur_host_slots, prev_host_slots): res = HostUpdateResult.no_update @@ -103,17 +153,32 @@ def check_update(cur_host_slots, prev_host_slots): elif cur_host_slots[h] < prev_host_slots[h]: # h has removed some slots res |= HostUpdateResult.removed + elif self._hosts_state[h].is_resurrected(): + res |= HostUpdateResult.added return res prev_host_slots = self._current_hosts.host_slots prev_host_assignment_order = self._current_hosts.host_assignment_order host_slots = self._discovery.find_available_hosts_and_slots() - if prev_host_slots != host_slots: - available_hosts = set([host for host in host_slots.keys() if not self._hosts_state[host].is_blacklisted()]) + + def whitelist_all_hosts(): + for host in host_slots.keys(): + if self._hosts_state[host].is_resurrected(): + self._hosts_state[host].whitelist() + + def has_resurrected_hosts(): + resurrected_hosts = [host for host in host_slots.keys() if self._hosts_state[host].is_resurrected()] + return len(resurrected_hosts) > 0 + + if prev_host_slots != host_slots or has_resurrected_hosts(): + 
available_hosts = set([host for host in host_slots.keys() \ + if not (self._hosts_state[host].is_blacklisted() and not self._hosts_state[host].is_resurrected())]) host_assignment_order = HostManager.order_available_hosts(available_hosts, prev_host_assignment_order) self._current_hosts = DiscoveredHosts(host_slots=host_slots, host_assignment_order=host_assignment_order) - return check_update(self._current_hosts.host_slots, prev_host_slots) + host_update_state = check_update(self._current_hosts.host_slots, prev_host_slots) + whitelist_all_hosts() + return host_update_state else: return HostUpdateResult.no_update @@ -123,7 +188,7 @@ def current_hosts(self): def blacklist(self, host): if not self._hosts_state[host].is_blacklisted(): - logging.warning('blacklist failing host: {}'.format(host)) + logging.debug('blacklist failing host: {}'.format(host)) self._hosts_state[host].blacklist() def is_blacklisted(self, host): diff --git a/horovod/runner/elastic/driver.py b/horovod/runner/elastic/driver.py index 73d9e9ffe7..20a2dd7b97 100644 --- a/horovod/runner/elastic/driver.py +++ b/horovod/runner/elastic/driver.py @@ -66,9 +66,9 @@ def get_results(self): class ElasticDriver(object): - def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0): + def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, cooldown_range=None, verbose=0): self._rendezvous = rendezvous - self._host_manager = HostManager(discovery) + self._host_manager = HostManager(discovery, cooldown_range) self._min_np = min_np self._max_np = max_np self._verbose = verbose diff --git a/horovod/runner/elastic/settings.py b/horovod/runner/elastic/settings.py index 8cf38cdbe1..92bfbd6aea 100644 --- a/horovod/runner/elastic/settings.py +++ b/horovod/runner/elastic/settings.py @@ -17,7 +17,7 @@ class ElasticSettings(BaseSettings): - def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kwargs): + def __init__(self, 
discovery, min_np, max_np, elastic_timeout, reset_limit, cooldown_range=None, **kwargs): """ :param discovery: object used to detect and manage available hosts :type discovery: horovod.runner.elastic.discovery.HostDiscovery @@ -29,6 +29,8 @@ def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kw :type elastic_timeout: int :param reset_limit: maximum number of resets after which the job is terminated :type reset_limit: int + :param cooldown_range: range of seconds (min, max) a failing host will remain in blacklist + :type cooldown_range: list """ super(ElasticSettings, self).__init__(elastic=True, **kwargs) self.discovery = discovery @@ -36,6 +38,7 @@ def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kw self.max_np = max_np self.elastic_timeout = elastic_timeout self.reset_limit = reset_limit + self.cooldown_range=cooldown_range # we do not serialize the discovery instance # it is not needed on the worker and might not be serializable diff --git a/horovod/runner/gloo_run.py b/horovod/runner/gloo_run.py index 544d5fb27c..3e3744c8ec 100644 --- a/horovod/runner/gloo_run.py +++ b/horovod/runner/gloo_run.py @@ -307,6 +307,7 @@ def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfa settings.min_np, settings.max_np, timeout=settings.elastic_timeout, reset_limit=settings.reset_limit, + cooldown_range=settings.cooldown_range, verbose=settings.verbose) handler = create_rendezvous_handler(driver) diff --git a/horovod/runner/launch.py b/horovod/runner/launch.py index 2c57028960..037caf0b6d 100644 --- a/horovod/runner/launch.py +++ b/horovod/runner/launch.py @@ -392,6 +392,8 @@ def parse_args(): group_elastic.add_argument('--reset-limit', action='store', dest='reset_limit', type=int, help='Maximum number of times that the training job can scale up or down ' 'the number of workers after which the job is terminated. 
(default: None)') + group_elastic.add_argument('--blacklist-cooldown-range', action='store', dest='cooldown_range', type=int, nargs=2, + help='Range of seconds(min, max) a failing host will remain in blacklist. (default: None)') group_timeline = parser.add_argument_group('timeline arguments') group_timeline.add_argument('--timeline-filename', action=make_override_action(override_args), @@ -456,23 +458,23 @@ def parse_args(): choices=config_parser.LOG_LEVELS, help='Minimum level to log to stderr from the Horovod backend. (default: WARNING).') group_logging_timestamp = group_logging.add_mutually_exclusive_group() - group_logging_timestamp.add_argument('--log-with-timestamp', + group_logging_timestamp.add_argument('--log-with-timestamp', action=make_override_true_action(override_args), help=argparse.SUPPRESS) group_logging_timestamp.add_argument('--log-without-timestamp', dest='log_with_timestamp', - action=make_override_false_action(override_args), + action=make_override_false_action(override_args), help='Hide the timestamp from Horovod internal log messages.') group_logging_timestamp.add_argument('-prefix-timestamp', '--prefix-output-with-timestamp', action='store_true', dest='prefix_output_with_timestamp', help='Timestamp each line of output to stdout, stderr, and stddiag.') - group_logging_timestamp.add_argument('--log-hide-timestamp', + group_logging_timestamp.add_argument('--log-hide-timestamp', dest='log_with_timestamp', action=make_deprecated_bool_action(override_args, False, '--log-without-timestamp'), help=argparse.SUPPRESS) - group_logging_timestamp.add_argument('--no-log-hide-timestamp', + group_logging_timestamp.add_argument('--no-log-hide-timestamp', dest='log_with_timestamp', action=make_deprecated_bool_action(override_args, True, '--log-with-timestamp'), - help=argparse.SUPPRESS) + help=argparse.SUPPRESS) group_hosts_parent = parser.add_argument_group('host arguments') group_hosts = group_hosts_parent.add_mutually_exclusive_group() @@ -647,6 +649,7 @@ 
def _run_elastic(args): max_np=args.max_np, elastic_timeout=args.elastic_timeout, reset_limit=args.reset_limit, + cooldown_range=args.cooldown_range, num_proc=args.np, verbose=2 if args.verbose else 0, ssh_port=args.ssh_port, diff --git a/test/integration/elastic_common.py b/test/integration/elastic_common.py index 1abe9942df..18a904dbaa 100644 --- a/test/integration/elastic_common.py +++ b/test/integration/elastic_common.py @@ -73,7 +73,7 @@ def __init__(self, training_script, *args, **kwargs): super(BaseElasticTests, self).__init__(*args, **kwargs) def _run(self, discovery_schedule=None, exit_schedule=None, exit_mode='exception', - np=2, min_np=2, max_np=4, hosts=None, reset_limit=None): + np=2, min_np=2, max_np=4, hosts=None, reset_limit=None, cooldown_range=None, epoch_wait=None, epochs=None): if not discovery_schedule and not hosts: raise ValueError('at least one of discovery schedule or hosts must be given') @@ -93,9 +93,16 @@ def _run(self, discovery_schedule=None, exit_schedule=None, exit_mode='exception if reset_limit is not None: command_args += ['--reset-limit', str(reset_limit)] + if cooldown_range is not None: + command_args += ['--blacklist-cooldown-range', str(cooldown_range[0]), str(cooldown_range[1])] + command_args += ['python', self._training_script, '--logfile', logfile] if discovery_schedule: command_args += ['--discovery-schedule', json.dumps(discovery_schedule)] + if epoch_wait: + command_args += ['--epoch-wait', json.dumps(epoch_wait)] + if epochs: + command_args += ['--epochs', json.dumps(epochs)] if exit_schedule: command_args += ['--exit-schedule', json.dumps(exit_schedule), '--exit-mode', exit_mode] @@ -261,3 +268,71 @@ def test_reset_limit(self, mock_get_min_start_hosts): # Job should succeed with reset_limit=2 results = self._run(discovery_schedule, np=2, min_np=2, max_np=4, reset_limit=2) self.assertEqual(len(results), 3) + + @mock.patch('horovod.runner.elastic.driver.DISCOVER_HOSTS_FREQUENCY_SECS', 0.01) + 
@mock.patch('horovod.runner.gloo_run._get_min_start_hosts', return_value=1) + def test_resurrecting_blacklisted_hosts(self, mock_get_min_start_hosts): + """Put a host on the blacklist and then resurrect""" + exit_mode = 'exception' + discovery_schedule = [ + (0, ['localhost:2', '127.0.0.1:2']), + (1, ['localhost:2', '127.0.0.1:2']), + (None, ['localhost:2', '127.0.0.1:2']), + ] + exit_schedule = { + str((1, 0)): [1], + } + + results = self._run(discovery_schedule=discovery_schedule, + exit_schedule=exit_schedule, exit_mode=exit_mode, + epoch_wait=1, cooldown_range=[1, 1], epochs=3) + + self.assertEqual(len(results), 3) + self.assertEqual(results[0]['start_rank'], 0) + self.assertEqual(results[0]['size'], 4) + self.assertEqual(results[0]['rendezvous'], 1) + + self.assertEqual(results[1]['start_rank'], 2) + self.assertEqual(results[1]['size'], 2) + self.assertEqual(results[1]['rendezvous'], 2) + + self.assertEqual(results[2]['start_rank'], 2) + self.assertEqual(results[2]['size'], 4) + self.assertEqual(results[2]['rendezvous'], 3) + + @mock.patch('horovod.runner.elastic.driver.DISCOVER_HOSTS_FREQUENCY_SECS', 0.01) + @mock.patch('horovod.runner.gloo_run._get_min_start_hosts', return_value=1) + def test_resurrecting_blacklisted_hosts_exponential_backoff(self, mock_get_min_start_hosts): + """Ensure that delay times are longer for multiple failures""" + exit_mode = 'exception' + num_epochs = 5 + discovery_schedule = [ + (0, ['localhost:2', '127.0.0.1:2']), + (2, ['localhost:2', '127.0.0.1:2']), + (None, ['localhost:2', '127.0.0.1:2']), + ] + exit_schedule = { + str((1, 0)): [3], + str((3, 0)): [3], + } + + results = self._run(discovery_schedule=discovery_schedule, + exit_schedule=exit_schedule, exit_mode=exit_mode, + epoch_wait=1, epochs=num_epochs, cooldown_range=[1, 10]) + + self.assertEqual(len(results), num_epochs) + self.assertEqual(results[0]['start_rank'], 0) + self.assertEqual(results[0]['size'], 4) + self.assertEqual(results[0]['rendezvous'], 1) + + 
self.assertEqual(results[1]['start_rank'], 0) + self.assertEqual(results[1]['size'], 2) + self.assertEqual(results[1]['rendezvous'], 2) + + self.assertEqual(results[2]['start_rank'], 0) + self.assertEqual(results[2]['size'], 2) + self.assertEqual(results[2]['rendezvous'], 2) + + self.assertEqual(results[4]['start_rank'], 0) + self.assertEqual(results[4]['size'], 2) + self.assertEqual(results[4]['rendezvous'], 3) diff --git a/test/single/test_ray_elastic_v2.py b/test/single/test_ray_elastic_v2.py index be478e98b6..ae3c36df47 100644 --- a/test/single/test_ray_elastic_v2.py +++ b/test/single/test_ray_elastic_v2.py @@ -272,6 +272,31 @@ def test_fault_tolerance_hosts_remove_and_add(ray_8_cpus): assert sum(int("started" in e) for e in events) == 7, events assert sum(int("finished" in e) for e in events) == 4, events +@pytest.mark.skipif( + not gloo_built(), reason='Gloo is required for Ray integration') +def test_fault_tolerance_hosts_remove_and_add_cooldown(ray_8_cpus): + with fault_tolerance_patches(): + discovery_schedule = [ + (10, ['host-1:2', 'host-2:1', 'host-3:2']), + (10, ['host-1:2']), + (None, ['host-1:2', 'host-2:1', 'host-3:2']), + ] + nics = list(psutil.net_if_addrs().keys())[0] + + settings = RayExecutor.create_settings(nics={nics}) + settings.discovery = SimpleTestDiscovery(discovery_schedule) + executor = RayExecutor(settings, + min_workers=1, cpus_per_worker=1, override_discovery=False, cooldown_range=[1,1]) + + training_fn = _create_training_function(iterations=30) + executor.start() + trace = StatusCallback() + results = executor.run(training_fn, callbacks=[trace]) + assert len(results) == 5 + + events = trace.fetch() + assert sum(int("started" in e) for e in events) == 5, events + assert sum(int("finished" in e) for e in events) == 5, events @pytest.mark.skipif( not gloo_built(), reason='Gloo is required for Ray integration')