Support resurrecting blacklisted hosts
This adds support for resurrecting blacklisted hosts in elastic mode.
Currently hosts that get blacklisted remain in the blacklist for the lifetime of the job.
This cannot handle transient host failures or a scale-up after a scale-down.
This is especially problematic for the Kubeflow mpi-operator on Kubernetes, as it always
assigns pods the same well-known hostnames in its hostfile.

This patch allows blacklisted hosts to become whitelisted again after a configured cooldown period.
Cooldown periods can be configured with the ``--blacklist-cooldown-range`` parameter like this:

.. code-block:: bash

    $ horovodrun -np 8 --blacklist-cooldown-range 10 100 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.py python train.py

The above example configures the minimum cooldown period to 10 seconds and the maximum cooldown period to 100 seconds.
The initial cooldown period would be 10 seconds. For repeated failures the cooldown period would grow with an exponential
backoff delay: 10s, 20s, 40s, and so on. However, the cooldown period would be capped at 100 seconds, regardless
of the failure count. The default behavior is to have no cooldown period, in which case blacklisted hosts remain in the blacklist for the lifetime of the job.
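
A rough sketch of how the cooldown delay grows (simplified from the changes to ``horovod/runner/elastic/discovery.py`` below; the actual implementation also adds a small random jitter to each delay):

.. code-block:: python

    # Simplified cooldown computation; jitter omitted. The range comes from
    # --blacklist-cooldown-range <min> <max>.
    def cooldown_delay(min_seconds, max_seconds, failure_count):
        delay = min_seconds * (2 ** failure_count)        # doubles with each repeated failure
        return max(min_seconds, min(max_seconds, delay))  # clamped to the configured range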

Signed-off-by: Abin Shahab <ashahab@linkedin.com>
ashahab committed Jan 4, 2022
1 parent 976a879 commit cf1df3e
Showing 12 changed files with 223 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

- TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128))

- Added support for resurrecting blacklisted hosts. ([#3319](https://github.com/horovod/horovod/pull/3319))

### Changed

### Deprecated
16 changes: 14 additions & 2 deletions docs/elastic.rst
@@ -332,8 +332,20 @@ The maximum np can be used to cap the number of processes (to prevent over-utili
as a reference point for learning rate scales and data partitions (in cases where these need to be held constant
regardless of the current number of workers). If unspecified, maximum np also defaults to ``-np``.

Instances that fail will be added to a blacklist, as they may have faulty hardware. Ranks that fail repeatedly
will result in job failure, as it may be the case that the training process cannot make progress.
Instances that fail will be added to a blacklist, as they may have faulty hardware. Hosts will remain in the blacklist for a configured cooldown period.
After the cooldown period ends, the hosts will be whitelisted back. This accounts for transient failures and for cases where the same host
is added back to the job.
Cooldown periods can be configured with the ``--blacklist-cooldown-range`` parameter like this:

.. code-block:: bash

    $ horovodrun -np 8 --blacklist-cooldown-range 10 100 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.py python train.py

The above example configures the minimum cooldown period to 10 seconds and the maximum cooldown period to 100 seconds.
The initial cooldown period would be 10 seconds. For repeated failures the cooldown period would grow with an exponential
backoff delay: 10s, 20s, 40s, and so on. However, the cooldown period would be capped at 100 seconds, regardless
of the failure count. The default behavior is to have no cooldown period, in which case blacklisted hosts remain in the blacklist for the lifetime of the job.

Ranks that fail repeatedly will result in job failure, as it may be the case that the training process cannot make progress.
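
For reference, the ``--host-discovery-script`` used above can be any executable that prints the currently
available hosts, one ``<hostname>:<slots>`` pair per line. A minimal sketch (the hostnames below are
placeholders; a real script would query the cluster, e.g. the Kubernetes API for mpi-operator pods):

.. code-block:: python

    #!/usr/bin/env python
    # discover_hosts.py -- print one "<hostname>:<slots>" line per available host.
    available_hosts = {"worker-0": 4, "worker-1": 4}
    for hostname, slots in available_hosts.items():
        print(f"{hostname}:{slots}")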


Running on Ray
8 changes: 8 additions & 0 deletions horovod/ray/elastic_v2.py
@@ -162,6 +162,8 @@ class ElasticParams(BaseParams):
reset_limit (int): Maximum number of times that the training
job can scale up or down the number of workers after
which the job is terminated.
cooldown_range (List[int]): Range of seconds (min, max) a failing
host will remain in the blacklist.
elastic_timeout (int): Timeout for elastic initialisation after
re-scaling the cluster. The default value is 600 seconds.
Alternatively, the environment variable
@@ -177,6 +179,7 @@ class ElasticParams(BaseParams):
min_workers: int = 1
max_workers: int = None
reset_limit: int = None
cooldown_range: List[int] = None
elastic_timeout: int = 600
override_discovery: bool = True

@@ -205,6 +208,8 @@ class ElasticAdapter(Adapter):
reset_limit (int): Maximum number of times that the training
job can scale up or down the number of workers after
which the job is terminated.
cooldown_range (List[int]): Range of seconds (min, max) a failing
host will remain in the blacklist.
elastic_timeout (int): Timeout for elastic initialisation after
re-scaling the cluster. The default value is 600 seconds.
Alternatively, the environment variable
@@ -228,6 +233,7 @@ def __init__(self,
gpus_per_worker: Optional[int] = None,
override_discovery: bool=True,
reset_limit: int = None,
cooldown_range: List[int] = None,
elastic_timeout: int = 600):
self.settings = settings
if override_discovery:
@@ -243,6 +249,7 @@ def __init__(self,
self.max_workers = max_workers
self.num_workers = min_workers
self.reset_limit = reset_limit
self.cooldown_range = cooldown_range
self.elastic_timeout = elastic_timeout
self.driver = None
self.rendezvous = None
@@ -275,6 +282,7 @@ def start(self,
max_np=self.max_workers,
timeout=self.elastic_timeout,
reset_limit=self.reset_limit,
cooldown_range=self.cooldown_range,
verbose=self.settings.verbose)
handler = create_rendezvous_handler(self.driver)
logger.debug("[ray] starting rendezvous")
2 changes: 2 additions & 0 deletions horovod/ray/runner.py
@@ -253,6 +253,7 @@ def __init__(
min_workers: int = None,
max_workers: int = None,
reset_limit: int = None,
cooldown_range: List[int] = None,
elastic_timeout: int = 600,
override_discovery: bool = True
):
@@ -268,6 +269,7 @@ def __init__(
min_workers=min_workers,
max_workers=max_workers,
reset_limit=reset_limit,
cooldown_range=cooldown_range,
elastic_timeout=elastic_timeout,
override_discovery=override_discovery,
cpus_per_worker=cpus_per_worker,
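
For Ray users, the new keyword would be passed through ``RayExecutor`` together with the existing elastic
arguments shown in the diff above. A rough sketch, assuming ``RayExecutor.create_settings()`` and the usual
``start``/``run``/``shutdown`` flow; the training function and cluster address are placeholders:

.. code-block:: python

    import ray
    from horovod.ray import RayExecutor

    def train_fn():
        ...  # placeholder: elastic Horovod training function

    ray.init(address="auto")  # attach to an existing Ray cluster

    settings = RayExecutor.create_settings(timeout_s=30)
    executor = RayExecutor(
        settings,
        min_workers=4,
        max_workers=12,
        reset_limit=3,
        cooldown_range=[10, 100],  # blacklisted hosts may rejoin after 10-100 seconds
        elastic_timeout=600,
        cpus_per_worker=1,
    )
    executor.start()
    executor.run(train_fn)
    executor.shutdown()
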
4 changes: 4 additions & 0 deletions horovod/runner/__init__.py
@@ -54,6 +54,7 @@ def __init__(self):
self.slots = None
self.elastic_timeout = None
self.reset_limit = None
self.cooldown_range = None

# timeline arguments
self.timeline_filename = None
@@ -98,6 +99,7 @@ def run(
max_np=None,
slots=None,
reset_limit=None,
cooldown_range=None,
hosts=None,
hostfile=None,
start_timeout=None,
@@ -133,6 +135,7 @@ job after the initial registration. So a reset_limit of 0 would mean the job cannot change
job after the initial registration. So a reset_limit of 0 would mean the job cannot change
membership after its initial set of workers. A reset_limit of 1 means it can resize at most
once, etc.
:param cooldown_range: Range of seconds (min, max) a failing host will remain in the blacklist.
:param hosts: List of host names and the number of available slots
for running processes on each, of the form: <hostname>:<slots>
@@ -192,6 +195,7 @@ def wrapped_func():
hargs.max_np = max_np
hargs.slots = slots
hargs.reset_limit = reset_limit
hargs.cooldown_range = cooldown_range
hargs.hosts = hosts
hargs.hostfile = hostfile
hargs.start_timeout = start_timeout
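
The same setting would also be reachable from the Python ``run`` API defined in this file. A rough sketch,
assuming ``np``/``min_np`` are accepted alongside the keyword arguments visible in the diff above, with a
placeholder training function:

.. code-block:: python

    from horovod.runner import run

    def train():
        ...  # placeholder elastic training function

    # Pass the cooldown range together with the other elastic arguments.
    run(train, np=8, min_np=4, max_np=12, reset_limit=3, cooldown_range=[10, 100])
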
89 changes: 77 additions & 12 deletions horovod/runner/elastic/discovery.py
@@ -15,20 +15,33 @@

import io
import logging
import random
import threading

import time
from collections import defaultdict

from horovod.runner.common.util import safe_shell_exec
from horovod.runner.elastic.worker import HostUpdateResult


# Maximum time hosts are in cooldown period
COOLDOWN_UPPER_LIMIT_SECONDS = 5 * 60
# Minimum time hosts are in cooldown
COOLDOWN_LOWER_LIMIT_SECONDS = 10
# Default cooldown range (min, max) in seconds for repeated failures
COOLDOWN_RANGE_SECONDS = [COOLDOWN_LOWER_LIMIT_SECONDS, COOLDOWN_UPPER_LIMIT_SECONDS]
class HostState(object):
def __init__(self):

def __init__(self, cooldown_range=None):
self._event = threading.Event()

# TODO(travis): blacklisted hosts should have a timeout period that increases with each failure
self._blacklisted = False
self._blacklist_count = 0
if cooldown_range:
self._cooldown_lower_limit = cooldown_range[0]
self._cooldown_upper_limit = cooldown_range[1]
else:
self._cooldown_lower_limit = -1
self._cooldown_upper_limit = -1
self._cooldown_period_end_ts = 0

def get_event(self):
if self._event.is_set():
@@ -39,13 +52,48 @@ def get_event(self):
def set_event(self):
self._event.set()

def _in_cooldown_period(self, current_time):
return self._cooldown_period_end_ts > current_time


def _set_cooldown_period(self, current_time):
if self._cooldown_lower_limit == -1 or self._cooldown_upper_limit == -1:
return
self._blacklist_count += 1
def _exponential_backoff_time():
cooldown_delay = (self._cooldown_lower_limit * (1 << self._blacklist_count)
                  + (random.uniform(0, 1) * self._cooldown_lower_limit))
logging.debug(f"{self._blacklist_count}:{self._cooldown_period_end_ts} cooldown_delay: {cooldown_delay}")
return max(self._cooldown_lower_limit, min(self._cooldown_upper_limit, cooldown_delay))
cooldown_delta_seconds = _exponential_backoff_time()
self._cooldown_period_end_ts = current_time + cooldown_delta_seconds
logging.debug(f"cooldown delta seconds: {cooldown_delta_seconds}")

def blacklist(self):
"""Moves this host to a blacklist, and starts the cooldown period."""
self._blacklisted = True
now = time.time()
if self._in_cooldown_period(now):
return
self._set_cooldown_period(now)
self.set_event()

def whitelist(self):
"""Ends the cooldown period and moves this host out of blacklist."""
self._cooldown_period_end_ts = 0
self._blacklisted = False

def is_blacklisted(self):
"""Checks if the host is in the blacklist."""
return self._blacklisted

def is_resurrected(self):
"""Checks if host is in an expired cooldown period."""
if self._cooldown_period_end_ts > 0:
return not self._in_cooldown_period(time.time())
return False



class DiscoveredHosts(object):
def __init__(self, host_slots, host_assignment_order):
@@ -76,15 +124,17 @@ def update(self, hosts_state):
if not hosts_state[host].is_blacklisted()]
return self

def __str__(self):
return f"slots: {self._host_slots} order: {self._host_assignment_order}"


class HostManager(object):
def __init__(self, discovery):
def __init__(self, discovery, cooldown_range=None):
self._current_hosts = DiscoveredHosts(host_slots={}, host_assignment_order=[])
self._hosts_state = defaultdict(HostState)
self._hosts_state = defaultdict(lambda: HostState(cooldown_range))
self._discovery = discovery

def update_available_hosts(self):
# TODO(travis): also check for hosts removed from the blacklist in the future
def check_update(cur_host_slots, prev_host_slots):
res = HostUpdateResult.no_update

@@ -103,17 +153,32 @@ def check_update(cur_host_slots, prev_host_slots):
elif cur_host_slots[h] < prev_host_slots[h]:
# h has removed some slots
res |= HostUpdateResult.removed
elif self._hosts_state[h].is_resurrected():
res |= HostUpdateResult.added
return res

prev_host_slots = self._current_hosts.host_slots
prev_host_assignment_order = self._current_hosts.host_assignment_order
host_slots = self._discovery.find_available_hosts_and_slots()
if prev_host_slots != host_slots:
available_hosts = set([host for host in host_slots.keys() if not self._hosts_state[host].is_blacklisted()])

def whitelist_all_hosts():
for host in host_slots.keys():
if self._hosts_state[host].is_resurrected():
self._hosts_state[host].whitelist()

def has_resurrected_hosts():
resurrected_hosts = [host for host in host_slots.keys() if self._hosts_state[host].is_resurrected()]
return len(resurrected_hosts) > 0

if prev_host_slots != host_slots or has_resurrected_hosts():
available_hosts = set([host for host in host_slots.keys() \
if not (self._hosts_state[host].is_blacklisted() and not self._hosts_state[host].is_resurrected())])
host_assignment_order = HostManager.order_available_hosts(available_hosts, prev_host_assignment_order)
self._current_hosts = DiscoveredHosts(host_slots=host_slots,
host_assignment_order=host_assignment_order)
return check_update(self._current_hosts.host_slots, prev_host_slots)
host_update_state = check_update(self._current_hosts.host_slots, prev_host_slots)
whitelist_all_hosts()
return host_update_state
else:
return HostUpdateResult.no_update

@@ -123,7 +188,7 @@ def current_hosts(self):

def blacklist(self, host):
if not self._hosts_state[host].is_blacklisted():
logging.warning('blacklist failing host: {}'.format(host))
logging.debug('blacklist failing host: {}'.format(host))
self._hosts_state[host].blacklist()

def is_blacklisted(self, host):
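
To illustrate the new lifecycle, a rough usage sketch of ``HostState`` as added above (the real delay
includes a random jitter, so the sleep below is deliberately longer than the maximum cooldown):

.. code-block:: python

    import time
    from horovod.runner.elastic.discovery import HostState

    state = HostState(cooldown_range=[1, 4])  # 1s minimum, 4s maximum cooldown
    state.blacklist()                         # first failure starts the cooldown period
    assert state.is_blacklisted()
    assert not state.is_resurrected()         # still inside the cooldown window

    time.sleep(5)                             # wait past the maximum cooldown
    assert state.is_resurrected()             # cooldown expired; the host may be whitelisted again
    state.whitelist()
    assert not state.is_blacklisted()
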
4 changes: 2 additions & 2 deletions horovod/runner/elastic/driver.py
@@ -66,9 +66,9 @@ def get_results(self):


class ElasticDriver(object):
def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0):
def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, cooldown_range=None, verbose=0):
self._rendezvous = rendezvous
self._host_manager = HostManager(discovery)
self._host_manager = HostManager(discovery, cooldown_range)
self._min_np = min_np
self._max_np = max_np
self._verbose = verbose
5 changes: 4 additions & 1 deletion horovod/runner/elastic/settings.py
@@ -17,7 +17,7 @@


class ElasticSettings(BaseSettings):
def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kwargs):
def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, cooldown_range=None, **kwargs):
"""
:param discovery: object used to detect and manage available hosts
:type discovery: horovod.runner.elastic.discovery.HostDiscovery
@@ -29,13 +29,16 @@ def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kw
:type elastic_timeout: int
:param reset_limit: maximum number of resets after which the job is terminated
:type reset_limit: int
:param cooldown_range: range of seconds (min, max) a failing host will remain in the blacklist
:type cooldown_range: list
"""
super(ElasticSettings, self).__init__(elastic=True, **kwargs)
self.discovery = discovery
self.min_np = min_np
self.max_np = max_np
self.elastic_timeout = elastic_timeout
self.reset_limit = reset_limit
self.cooldown_range = cooldown_range

# we do not serialize the discovery instance
# it is not needed on the worker and might not be serializable
1 change: 1 addition & 0 deletions horovod/runner/gloo_run.py
@@ -307,6 +307,7 @@ def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfa
settings.min_np, settings.max_np,
timeout=settings.elastic_timeout,
reset_limit=settings.reset_limit,
cooldown_range=settings.cooldown_range,
verbose=settings.verbose)

handler = create_rendezvous_handler(driver)
13 changes: 8 additions & 5 deletions horovod/runner/launch.py
@@ -392,6 +392,8 @@ def parse_args():
group_elastic.add_argument('--reset-limit', action='store', dest='reset_limit', type=int,
help='Maximum number of times that the training job can scale up or down '
'the number of workers after which the job is terminated. (default: None)')
group_elastic.add_argument('--blacklist-cooldown-range', action='store', dest='cooldown_range', type=int, nargs=2,
help='Range of seconds (min, max) a failing host will remain in the blacklist. (default: None)')

group_timeline = parser.add_argument_group('timeline arguments')
group_timeline.add_argument('--timeline-filename', action=make_override_action(override_args),
@@ -456,23 +458,23 @@ def parse_args():
choices=config_parser.LOG_LEVELS,
help='Minimum level to log to stderr from the Horovod backend. (default: WARNING).')
group_logging_timestamp = group_logging.add_mutually_exclusive_group()
group_logging_timestamp.add_argument('--log-with-timestamp',
group_logging_timestamp.add_argument('--log-with-timestamp',
action=make_override_true_action(override_args),
help=argparse.SUPPRESS)
group_logging_timestamp.add_argument('--log-without-timestamp', dest='log_with_timestamp',
action=make_override_false_action(override_args),
action=make_override_false_action(override_args),
help='Hide the timestamp from Horovod internal log messages.')
group_logging_timestamp.add_argument('-prefix-timestamp', '--prefix-output-with-timestamp', action='store_true',
dest='prefix_output_with_timestamp',
help='Timestamp each line of output to stdout, stderr, and stddiag.')
group_logging_timestamp.add_argument('--log-hide-timestamp',
group_logging_timestamp.add_argument('--log-hide-timestamp',
dest='log_with_timestamp',
action=make_deprecated_bool_action(override_args, False, '--log-without-timestamp'),
help=argparse.SUPPRESS)
group_logging_timestamp.add_argument('--no-log-hide-timestamp',
group_logging_timestamp.add_argument('--no-log-hide-timestamp',
dest='log_with_timestamp',
action=make_deprecated_bool_action(override_args, True, '--log-with-timestamp'),
help=argparse.SUPPRESS)
help=argparse.SUPPRESS)

group_hosts_parent = parser.add_argument_group('host arguments')
group_hosts = group_hosts_parent.add_mutually_exclusive_group()
@@ -647,6 +649,7 @@ def _run_elastic(args):
max_np=args.max_np,
elastic_timeout=args.elastic_timeout,
reset_limit=args.reset_limit,
cooldown_range=args.cooldown_range,
num_proc=args.np,
verbose=2 if args.verbose else 0,
ssh_port=args.ssh_port,
