Support resurrecting blacklisted hosts #3319

Merged
merged 1 commit on Jan 24, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

- TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128))

- Added support for resurrecting blacklisted hosts. ([#3319](https://github.com/horovod/horovod/pull/3319))

### Changed

- Moved to CMake version 3.13 with first-class CUDA language support and re-enabled parallelized builds. ([#3261](https://github.com/horovod/horovod/pull/3261))
19 changes: 17 additions & 2 deletions docs/elastic.rst
@@ -332,8 +332,23 @@ The maximum np can be used to cap the number of processes (to prevent over-utili
as a reference point for learning rate scales and data partitions (in cases where these need to be held constant
regardless of the current number of workers). If unspecified, maximum np also defaults to ``-np``.

Instances that fail will be added to a blacklist, as they may have faulty hardware. Ranks that fail repeatedly
will result in job failure, as it may be the case that the training process cannot make progress.
Instances that fail will be added to a blacklist, as they may have faulty hardware. Hosts will remain in the blacklist for a configured cooldown period.
After the cooldown period ends, the host is whitelisted again. This accounts for transient failures, and for cases where the same host
is added back to a job.
Cooldown periods can be configured with the ``--blacklist-cooldown-range`` parameter like this:

.. code-block:: bash

$ horovodrun -np 8 --blacklist-cooldown-range 10 100 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.py python train.py

The above example configures the minimum cooldown period to 10 seconds and the maximum cooldown period to 100 seconds.
The initial cooldown period is 10 seconds. For repeated failures the cooldown period grows with an exponential
backoff delay (base 2): 10s, 20s, 40s, and so on. However, the cooldown period is capped at 100 seconds,
regardless of the failure count. A random backoff fraction of the cooldown lower limit is added to the cooldown delay.
The default behavior is to have no cooldown period, in which case blacklisted hosts remain in the blacklist indefinitely.
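
As a rough illustration (this is a standalone sketch, not Horovod's internal code), the cooldown delay described above can be approximated as follows:

.. code-block:: python

    import random

    def cooldown_delay(lower, upper, failure_count):
        """Approximate cooldown delay after ``failure_count`` consecutive failures."""
        # Exponential backoff with base 2, starting at the configured lower limit,
        # plus a random fraction of the lower limit as jitter.
        delay = lower * (2 ** (failure_count - 1)) + random.uniform(0, 1) * lower
        # The delay is never allowed to exceed the configured upper limit.
        return min(upper, delay)

    # With --blacklist-cooldown-range 10 100 this yields roughly
    # 10s, 20s, 40s, 80s, 100s, 100s, ... (plus jitter, capped at 100s).
    print([round(cooldown_delay(10, 100, n)) for n in range(1, 7)])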

Ranks that fail repeatedly will result in job failure, as it may be the case that the training process cannot make progress.


Running on Ray
16 changes: 15 additions & 1 deletion horovod/ray/elastic_v2.py
@@ -1,4 +1,4 @@
from typing import Callable, List, Any, Dict, Optional
from typing import Callable, List, Any, Dict, Optional, Tuple
import logging
import ray.exceptions
import socket
@@ -162,6 +162,11 @@ class ElasticParams(BaseParams):
reset_limit (int): Maximum number of times that the training
job can scale up or down the number of workers after
which the job is terminated.
cooldown_range (Tuple[int, int]): Range (in seconds) a failing
host will remain in the blacklist.
Example: cooldown_range=(10, 100)
This sets the minimum cooldown period to 10 seconds,
and the maximum cooldown period to 100 seconds.
elastic_timeout (int): Timeout for elastic initialisation after
re-scaling the cluster. The default value is 600 seconds.
Alternatively, the environment variable
@@ -177,6 +182,7 @@ class ElasticParams(BaseParams):
min_workers: int = 1
max_workers: int = None
reset_limit: int = None
cooldown_range: Optional[Tuple[int, int]] = None
elastic_timeout: int = 600
override_discovery: bool = True

@@ -205,6 +211,11 @@ class ElasticAdapter(Adapter):
reset_limit (int): Maximum number of times that the training
job can scale up or down the number of workers after
which the job is terminated.
cooldown_range (Tuple[int, int]): Range (in seconds) a failing
host will remain in the blacklist.
Example: cooldown_range=(10, 100)
This sets the minimum cooldown period to 10 seconds,
and the maximum cooldown period to 100 seconds.
elastic_timeout (int): Timeout for elastic initialisation after
re-scaling the cluster. The default value is 600 seconds.
Alternatively, the environment variable
@@ -228,6 +239,7 @@ def __init__(self,
gpus_per_worker: Optional[int] = None,
override_discovery: bool=True,
reset_limit: int = None,
cooldown_range: Optional[Tuple[int, int]] = None,
elastic_timeout: int = 600):
self.settings = settings
if override_discovery:
@@ -243,6 +255,7 @@ def __init__(self,
self.max_workers = max_workers
self.num_workers = min_workers
self.reset_limit = reset_limit
self.cooldown_range = cooldown_range
self.elastic_timeout = elastic_timeout
self.driver = None
self.rendezvous = None
@@ -275,6 +288,7 @@ def start(self,
max_np=self.max_workers,
timeout=self.elastic_timeout,
reset_limit=self.reset_limit,
cooldown_range=self.cooldown_range,
verbose=self.settings.verbose)
handler = create_rendezvous_handler(self.driver)
logger.debug("[ray] starting rendezvous")
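
As a usage sketch (not part of this diff), the new field can be set on ElasticParams alongside the existing ones; this assumes the params object is constructed directly with keyword arguments and that the remaining fields keep their defaults:

    from horovod.ray.elastic_v2 import ElasticParams

    # Failing hosts are kept in the blacklist for 10-100 seconds and then retried.
    params = ElasticParams(
        min_workers=2,
        max_workers=8,
        reset_limit=3,
        cooldown_range=(10, 100),
    )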
2 changes: 2 additions & 0 deletions horovod/ray/runner.py
@@ -253,6 +253,7 @@ def __init__(
min_workers: int = None,
max_workers: int = None,
reset_limit: int = None,
cooldown_range: List[int] = None,
elastic_timeout: int = 600,
override_discovery: bool = True
):
@@ -268,6 +269,7 @@ def __init__(
min_workers=min_workers,
max_workers=max_workers,
reset_limit=reset_limit,
cooldown_range=cooldown_range,
elastic_timeout=elastic_timeout,
override_discovery=override_discovery,
cpus_per_worker=cpus_per_worker,
4 changes: 4 additions & 0 deletions horovod/runner/__init__.py
@@ -54,6 +54,7 @@ def __init__(self):
self.slots = None
self.elastic_timeout = None
self.reset_limit = None
self.cooldown_range = None

# timeline arguments
self.timeline_filename = None
@@ -98,6 +99,7 @@ def run(
max_np=None,
slots=None,
reset_limit=None,
cooldown_range=None,
hosts=None,
hostfile=None,
start_timeout=None,
@@ -133,6 +135,7 @@
job after the initial registration. So a reset_limit of 0 would mean the job cannot change
membership after its initial set of workers. A reset_limit of 1 means it can resize at most
once, etc.
:param cooldown_range: Range of seconds (min, max) a failing host will remain in the blacklist.

:param hosts: List of host names and the number of available slots
for running processes on each, of the form: <hostname>:<slots>
@@ -192,6 +195,7 @@ def wrapped_func():
hargs.max_np = max_np
hargs.slots = slots
hargs.reset_limit = reset_limit
hargs.cooldown_range = cooldown_range
hargs.hosts = hosts
hargs.hostfile = hostfile
hargs.start_timeout = start_timeout
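
A hedged sketch of the run API with the new argument (the training function and the np/min_np/max_np values are placeholders based on the surrounding signature; cooldown_range is the argument added in this change):

    from horovod.runner import run

    def train():
        ...  # elastic Horovod training function

    # Failing hosts are retried after a 10-100 second cooldown instead of
    # staying blacklisted for the remainder of the job.
    run(train, np=8, min_np=4, max_np=12, cooldown_range=(10, 100))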
100 changes: 90 additions & 10 deletions horovod/runner/elastic/discovery.py
@@ -15,20 +15,48 @@

import io
import logging
import random
import threading

import time
from collections import defaultdict

from horovod.runner.common.util import safe_shell_exec
from horovod.runner.elastic.worker import HostUpdateResult

# The default lower bound for cooldown period. If a range is provided,
# the provided lower limit must be at or above this lower bound
DEFAULT_COOLDOWN_LOWER_LIMIT_SECONDS = 1
# The default upper bound for cooldown period. If a range is provided,
# the provided upper limit must be at or below this upper bound
DEFAULT_COOLDOWN_UPPER_LIMIT_SECONDS = 1 * 60 * 60

class HostState(object):
def __init__(self):

def __init__(self, cooldown_range=None):
self._event = threading.Event()

# TODO(travis): blacklisted hosts should have a timeout period that increases with each failure
self._blacklisted = False
self._blacklist_count = 0
if cooldown_range:
HostState._validate_cooldown_range(cooldown_range)
self._cooldown_lower_limit, self._cooldown_upper_limit = cooldown_range
else:
self._cooldown_lower_limit = -1
self._cooldown_upper_limit = -1
self._cooldown_period_end_ts = 0

@staticmethod
def _validate_cooldown_range(cooldown_range):
cooldown_lower_limit, cooldown_upper_limit = cooldown_range

if (cooldown_lower_limit < DEFAULT_COOLDOWN_LOWER_LIMIT_SECONDS):
raise ValueError(f"Provided cooldown lower limit: {cooldown_lower_limit} \
cannot be lower than default cooldown lower limit: {DEFAULT_COOLDOWN_LOWER_LIMIT_SECONDS}")


if (cooldown_upper_limit > DEFAULT_COOLDOWN_UPPER_LIMIT_SECONDS):
raise ValueError(f"Provided cooldown upper limit: {cooldown_upper_limit} \
cannot be higher than default cooldown upper limit: {DEFAULT_COOLDOWN_UPPER_LIMIT_SECONDS}")

def get_event(self):
if self._event.is_set():
@@ -39,13 +67,48 @@ def get_event(self):
def set_event(self):
self._event.set()

def _in_cooldown_period(self, current_time):
return self._cooldown_period_end_ts > current_time


def _set_cooldown_period(self, current_time):
if self._cooldown_lower_limit == -1 or self._cooldown_upper_limit == -1:
return
self._blacklist_count += 1

cooldown_delay = self._cooldown_lower_limit * (1 << self._blacklist_count) + (random.uniform(0,1) * self._cooldown_lower_limit)
logging.debug(f"{self._blacklist_count}:{self._cooldown_period_end_ts} cooldown_delay: {cooldown_delay}")
# We need to ensure that the cooldown upper limit is the upper bound of the delay
cooldown_delta_seconds = max(self._cooldown_lower_limit, min(self._cooldown_upper_limit, cooldown_delay))

self._cooldown_period_end_ts = current_time + cooldown_delta_seconds
logging.debug(f"cooldown delta seconds: {cooldown_delta_seconds}")

def blacklist(self):
"""Moves this host to a blacklist, and starts the cooldown period."""
self._blacklisted = True
now = time.time()
if self._in_cooldown_period(now):
return
self._set_cooldown_period(now)
self.set_event()

def whitelist(self):
"""Ends the cooldown period and moves this host out of blacklist."""
self._cooldown_period_end_ts = 0
self._blacklisted = False

def is_blacklisted(self):
"""Checks if the host is in the blacklist."""
return self._blacklisted

def is_resurrected(self):
"""Checks if host is in an expired cooldown period."""
if self._cooldown_period_end_ts > 0:
return not self._in_cooldown_period(time.time())
return False



class DiscoveredHosts(object):
def __init__(self, host_slots, host_assignment_order):
@@ -76,15 +139,17 @@ def update(self, hosts_state):
if not hosts_state[host].is_blacklisted()]
return self

def __str__(self):
return f"slots: {self._host_slots} order: {self._host_assignment_order}"


class HostManager(object):
def __init__(self, discovery):
def __init__(self, discovery, cooldown_range=None):
self._current_hosts = DiscoveredHosts(host_slots={}, host_assignment_order=[])
self._hosts_state = defaultdict(HostState)
self._hosts_state = defaultdict(lambda: HostState(cooldown_range))
self._discovery = discovery

def update_available_hosts(self):
# TODO(travis): also check for hosts removed from the blacklist in the future
def check_update(cur_host_slots, prev_host_slots):
res = HostUpdateResult.no_update

@@ -103,17 +168,32 @@ def check_update(cur_host_slots, prev_host_slots):
elif cur_host_slots[h] < prev_host_slots[h]:
# h has removed some slots
res |= HostUpdateResult.removed
elif self._hosts_state[h].is_resurrected():
res |= HostUpdateResult.added
return res

prev_host_slots = self._current_hosts.host_slots
prev_host_assignment_order = self._current_hosts.host_assignment_order
host_slots = self._discovery.find_available_hosts_and_slots()
if prev_host_slots != host_slots:
available_hosts = set([host for host in host_slots.keys() if not self._hosts_state[host].is_blacklisted()])

def whitelist_all_hosts():
for host in host_slots.keys():
if self._hosts_state[host].is_resurrected():
self._hosts_state[host].whitelist()

def has_resurrected_hosts():
resurrected_hosts = [host for host in host_slots.keys() if self._hosts_state[host].is_resurrected()]
return len(resurrected_hosts) > 0

if prev_host_slots != host_slots or has_resurrected_hosts():
available_hosts = set([host for host in host_slots.keys() \
if not (self._hosts_state[host].is_blacklisted() and not self._hosts_state[host].is_resurrected())])
host_assignment_order = HostManager.order_available_hosts(available_hosts, prev_host_assignment_order)
self._current_hosts = DiscoveredHosts(host_slots=host_slots,
host_assignment_order=host_assignment_order)
return check_update(self._current_hosts.host_slots, prev_host_slots)
host_update_state = check_update(self._current_hosts.host_slots, prev_host_slots)
whitelist_all_hosts()
return host_update_state
else:
return HostUpdateResult.no_update

@@ -123,7 +203,7 @@ def current_hosts(self):

def blacklist(self, host):
if not self._hosts_state[host].is_blacklisted():
logging.warning('blacklist failing host: {}'.format(host))
logging.info('blacklist failing host: {}'.format(host))
self._hosts_state[host].blacklist()

def is_blacklisted(self, host):
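
A small sketch of the HostState lifecycle implemented above (the short (1, 5) second cooldown range is purely illustrative and satisfies the validated bounds):

    import time
    from horovod.runner.elastic.discovery import HostState

    state = HostState(cooldown_range=(1, 5))
    state.blacklist()                  # host failed; the cooldown period starts
    assert state.is_blacklisted()
    assert not state.is_resurrected()  # still inside the cooldown period

    time.sleep(5)                      # wait out the maximum cooldown
    assert state.is_resurrected()      # cooldown expired; host is eligible to return
    state.whitelist()                  # HostManager does this on the next host update
    assert not state.is_blacklisted()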
4 changes: 2 additions & 2 deletions horovod/runner/elastic/driver.py
@@ -66,9 +66,9 @@ def get_results(self):


class ElasticDriver(object):
def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0):
def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, cooldown_range=None, verbose=0):
self._rendezvous = rendezvous
self._host_manager = HostManager(discovery)
self._host_manager = HostManager(discovery, cooldown_range)
self._min_np = min_np
self._max_np = max_np
self._verbose = verbose
5 changes: 4 additions & 1 deletion horovod/runner/elastic/settings.py
@@ -17,7 +17,7 @@


class ElasticSettings(BaseSettings):
def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kwargs):
def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, cooldown_range=None, **kwargs):
"""
:param discovery: object used to detect and manage available hosts
:type discovery: horovod.runner.elastic.discovery.HostDiscovery
@@ -29,13 +29,16 @@ def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, **kw
:type elastic_timeout: int
:param reset_limit: maximum number of resets after which the job is terminated
:type reset_limit: int
:param cooldown_range: range of seconds (min, max) a failing host will remain in the blacklist
:type cooldown_range: tuple
"""
super(ElasticSettings, self).__init__(elastic=True, **kwargs)
self.discovery = discovery
self.min_np = min_np
self.max_np = max_np
self.elastic_timeout = elastic_timeout
self.reset_limit = reset_limit
self.cooldown_range = cooldown_range

# we do not serialize the discovery instance
# it is not needed on the worker and might not be serializable
1 change: 1 addition & 0 deletions horovod/runner/gloo_run.py
@@ -307,6 +307,7 @@ def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfa
settings.min_np, settings.max_np,
timeout=settings.elastic_timeout,
reset_limit=settings.reset_limit,
cooldown_range=settings.cooldown_range,
verbose=settings.verbose)

handler = create_rendezvous_handler(driver)
Expand Down