Deprecate use of np, min_np, max_np, replace with num_proc (#3409)
* Deprecate use of np, min_np, max_np, replace with num_proc
* Test long and short num-proc horovodrun args
* Improve horovodrun help on --blacklist-cooldown-range

Signed-off-by: Enrico Minack <github@enrico.minack.dev>
EnricoMi committed Mar 12, 2022
1 parent f8c9649 commit 980ce05
Showing 22 changed files with 284 additions and 191 deletions.
30 changes: 21 additions & 9 deletions horovod/ray/elastic.py
@@ -6,6 +6,7 @@
 import random
 import math
 import threading
+import warnings

 from horovod.runner.common.util import timeout, secret
@@ -184,24 +185,28 @@ class ElasticRayExecutor:
     """

     @staticmethod
-    def create_settings(min_np: int = 1,
-                        max_np: int = None,
+    def create_settings(min_num_proc: int = 1,
+                        max_num_proc: int = None,
                         reset_limit: int = None,
                         elastic_timeout: int = 600,
                         timeout_s: int = 30,
                         ssh_identity_file: str = None,
                         nics: str = None,
+                        # min_np is deprecated, use min_num_proc instead
+                        min_np=None,
+                        # max_np is deprecated, use max_num_proc instead
+                        max_np=None,
                         **kwargs):
         """Returns a Settings object for ElasticRayExecutor.

         Note that the `discovery` property will be set at runtime.

         Args:
-            min_np (int): Minimum number of processes running for
+            min_num_proc (int): Minimum number of processes running for
                 training to continue. If number of available processes dips
                 below this threshold, then training will wait for
                 more instances to become available.
-            max_np (int): Maximum number of training processes,
+            max_num_proc (int): Maximum number of training processes,
                 beyond which no additional processes will be created.
                 If not specified, then will be unbounded.
             reset_limit (int): Maximum number of times that the training
@@ -218,6 +223,13 @@ def create_settings(min_np: int = 1,
                 the identity (private key) is read.
             nics (set): Network interfaces that can be used for communication.
         """
+        if min_np is not None:
+            min_num_proc = min_np
+            warnings.warn('min_np is deprecated, use min_num_proc instead', DeprecationWarning)
+        if max_np is not None:
+            max_num_proc = max_np
+            warnings.warn('max_np is deprecated, use max_num_proc instead', DeprecationWarning)
+
         start_timeout = timeout.Timeout(
             timeout_s,
             message="Timed out waiting for {activity}. Please "
@@ -228,11 +240,11 @@
             "~/ray_bootstrap_key.pem")
         settings = ElasticSettings(
             discovery=None,
-            min_np=min_np,
-            max_np=max_np,
+            min_num_proc=min_num_proc,
+            max_num_proc=max_num_proc,
             elastic_timeout=elastic_timeout,
             reset_limit=reset_limit,
-            num_proc=min_np,
+            num_proc=min_num_proc,
             ssh_identity_file=ssh_identity_file,
             nics=nics,
             start_timeout=start_timeout,
@@ -276,8 +288,8 @@ def start(self):
         self.driver = ElasticDriver(
             rendezvous=self.rendezvous,
             discovery=self.settings.discovery,
-            min_np=self.settings.min_np,
-            max_np=self.settings.max_np,
+            min_num_proc=self.settings.min_num_proc,
+            max_num_proc=self.settings.max_num_proc,
             timeout=self.settings.elastic_timeout,
             reset_limit=self.settings.reset_limit,
             verbose=self.settings.verbose)
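The shim above keeps the old keyword arguments working while steering callers to the new names. A minimal caller-side sketch, assuming Horovod is installed with Ray support and that create_settings behaves as in the hunks above:

import warnings

from horovod.ray import ElasticRayExecutor

# Preferred spelling after this commit:
settings = ElasticRayExecutor.create_settings(min_num_proc=1, max_num_proc=4)

# Deprecated spelling still works, but now emits a DeprecationWarning
# and is mapped onto the new names internally:
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    settings = ElasticRayExecutor.create_settings(min_np=1, max_np=4)
assert any(issubclass(w.category, DeprecationWarning) for w in caught)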
4 changes: 2 additions & 2 deletions horovod/ray/elastic_v2.py
@@ -284,8 +284,8 @@ def start(self,
         self.driver = ElasticDriver(
             rendezvous=self.rendezvous,
             discovery=self.settings.discovery,
-            min_np=self.min_workers,
-            max_np=self.max_workers,
+            min_num_proc=self.min_workers,
+            max_num_proc=self.max_workers,
             timeout=self.elastic_timeout,
             reset_limit=self.reset_limit,
             cooldown_range=self.cooldown_range,
47 changes: 32 additions & 15 deletions horovod/runner/__init__.py
@@ -12,11 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # =============================================================================
+import warnings


 class _HorovodArgs(object):
     def __init__(self):
-        self.np = 1
+        self.num_proc = 1
         self.check_build = None
         self.ssh_port = None
         self.ssh_identity_file = None
@@ -49,8 +50,8 @@ def __init__(self):
         self.autotune_gaussian_process_noise = None

         # elastic arguments
-        self.min_np = None
-        self.max_np = None
+        self.min_num_proc = None
+        self.max_num_proc = None
         self.slots = None
         self.elastic_timeout = None
         self.reset_limit = None
@@ -94,9 +95,9 @@ def run(
         func,
         args=(),
         kwargs=None,
-        np=1,
-        min_np=None,
-        max_np=None,
+        num_proc=1,
+        min_num_proc=None,
+        max_num_proc=None,
         slots=None,
         reset_limit=None,
         cooldown_range=None,
@@ -112,7 +113,13 @@ def run(
         use_mpi=None,
         mpi_args=None,
         network_interface=None,
-        executable=None):
+        executable=None,
+        # np is deprecated, use num_proc instead
+        np=None,
+        # min_np is deprecated, use min_num_proc instead
+        min_np=None,
+        # max_np is deprecated, use max_num_proc instead
+        max_np=None):
     """
     Launch a Horovod job to run the specified process function and get the return value.
@@ -121,11 +128,11 @@
                  This function must be compatible with pickle.
     :param args: Arguments to pass to `func`.
     :param kwargs: Keyword arguments to pass to `func`.
-    :param np: Number of Horovod processes.
-    :param min_np: Minimum number of processes running for training to continue. If number of
+    :param num_proc: Number of Horovod processes.
+    :param min_num_proc: Minimum number of processes running for training to continue. If number of
                    available processes dips below this threshold, then training will wait for
-                   more instances to become available. Defaults to np
-    :param max_np: Maximum number of training processes, beyond which no additional processes
+                   more instances to become available. Defaults to num_proc
+    :param max_num_proc: Maximum number of training processes, beyond which no additional processes
                    will be created. If not specified, then will be unbounded.
     :param slots: Number of slots for processes per host. Normally 1 slot per GPU per host.
                   If slots are provided by the output of the host discovery script, then that
@@ -140,7 +147,7 @@
     :param hosts: List of host names and the number of available slots
                   for running processes on each, of the form: <hostname>:<slots>
                   (e.g.: host1:2,host2:4,host3:1 indicating 2 processes can run on host1,
-                  4 on host2, and 1 on host3). If not specified, defaults to using localhost:<np>
+                  4 on host2, and 1 on host3). If not specified, defaults to using localhost:<num_proc>
     :param hostfile: Path to a host file containing the list of host names and the number of
                      available slots. Each line of the file must be of the form:
                      <hostname> slots=<slots>
@@ -174,6 +181,16 @@
     :return: Return a list which contains values return by all Horovod processes.
              The index of the list corresponds to the rank of each Horovod process.
     """
+    if np is not None:
+        num_proc = np
+        warnings.warn('np is deprecated, use num_proc instead', DeprecationWarning)
+    if min_np is not None:
+        min_num_proc = min_np
+        warnings.warn('min_np is deprecated, use min_num_proc instead', DeprecationWarning)
+    if max_np is not None:
+        max_num_proc = max_np
+        warnings.warn('max_np is deprecated, use max_num_proc instead', DeprecationWarning)
+
     from .launch import _run

     if kwargs is None:
@@ -190,9 +207,9 @@ def wrapped_func():

     hargs = _HorovodArgs()

-    hargs.np = np
-    hargs.min_np = min_np
-    hargs.max_np = max_np
+    hargs.num_proc = num_proc
+    hargs.min_num_proc = min_num_proc
+    hargs.max_num_proc = max_num_proc
     hargs.slots = slots
     hargs.reset_limit = reset_limit
     hargs.cooldown_range = cooldown_range
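At the call site the rename looks like this. A sketch, assuming a Horovod build with the Gloo controller and horovod.torch available; train is an illustrative function, not part of the diff:

from horovod.runner import run

def train(multiplier):
    # Runs in every Horovod worker; run() collects the return values
    # into a list ordered by rank.
    import horovod.torch as hvd
    hvd.init()
    return hvd.rank() * multiplier

# Preferred spelling after this commit:
results = run(train, args=(10,), num_proc=2)

# Deprecated spelling still works; it maps onto num_proc and emits
# a DeprecationWarning:
results = run(train, args=(10,), np=2)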
16 changes: 8 additions & 8 deletions horovod/runner/common/util/hosts.py
@@ -97,18 +97,18 @@ def parse_hosts(hosts_string):
     return [HostInfo.from_string(host_string) for host_string in hosts_string.split(',')]


-def get_host_assignments(hosts, min_np, max_np=None):
+def get_host_assignments(hosts, min_num_proc, max_num_proc=None):
     """Assign hosts with process capacities (slots) to ranks in the Horovod process.

     This function will try to allocate as many as possible processes on the same host to leverage
     local network.

     :param hosts: list of HostInfo objects describing host and slot capacity
     :type hosts: list[HostInfo]
-    :param min_np: minimum number of processes to be allocated
-    :type min_np: int
-    :param max_np: (optional) maximum number of processes to be allocated
-    :type max_np: int
+    :param min_num_proc: minimum number of processes to be allocated
+    :type min_num_proc: int
+    :param max_num_proc: (optional) maximum number of processes to be allocated
+    :type max_num_proc: int
     :return: a list of the allocation of process on hosts in a `SlotInfo` object.
     :rtype: list[SlotInfo]
     """
@@ -118,7 +118,7 @@ def get_host_assignments(hosts, min_np, max_np=None):
     for host_info in hosts:
         ranks = []
         for local_rank in range(host_info.slots):
-            if rank == max_np:
+            if rank == max_num_proc:
                 break

            ranks.append(rank)
@@ -130,9 +130,9 @@
         host_ranks.append((host_info, ranks))

     world_size = rank
-    if world_size < min_np:
+    if world_size < min_num_proc:
         raise ValueError('Requested more processes ({}) than there are available slots ({})'
-                         .format(min_np, world_size))
+                         .format(min_num_proc, world_size))

     alloc_list = []
     for host_info, ranks in host_ranks:
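A small sketch of the allocation semantics documented above, using only helpers defined in this file (the SlotInfo attribute names are assumptions):

from horovod.runner.common.util.hosts import get_host_assignments, parse_hosts

hosts = parse_hosts('host1:2,host2:4,host3:1')

# Ranks fill host1 first (2 slots), then host2, and stop once
# max_num_proc ranks have been assigned: ranks 0 through 4 in total.
alloc = get_host_assignments(hosts, min_num_proc=4, max_num_proc=5)
for slot in alloc:
    print(slot.hostname, slot.rank, slot.local_rank)

# Asking for more processes than there are slots raises ValueError:
# get_host_assignments(parse_hosts('host1:2'), min_num_proc=4)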
37 changes: 22 additions & 15 deletions horovod/runner/elastic/driver.py
@@ -18,6 +18,7 @@
 import queue
 import threading
 import time
+import warnings

 from collections import defaultdict
@@ -66,11 +67,18 @@ def get_results(self):


 class ElasticDriver(object):
-    def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, cooldown_range=None, verbose=0):
+    def __init__(self, rendezvous,
+                 discovery,
+                 min_num_proc,
+                 max_num_proc,
+                 timeout=None,
+                 reset_limit=None,
+                 cooldown_range=None,
+                 verbose=0):
         self._rendezvous = rendezvous
         self._host_manager = HostManager(discovery, cooldown_range)
-        self._min_np = min_np
-        self._max_np = max_np
+        self._min_num_proc = min_num_proc
+        self._max_num_proc = max_num_proc
         self._verbose = verbose

         self._host_assignments = {}
@@ -91,12 +99,12 @@ def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_li
         self._discovery_thread.daemon = True
         self._discovery_thread.start()

-    def start(self, np, create_worker_fn):
+    def start(self, num_proc, create_worker_fn):
         self._create_worker_fn = create_worker_fn
-        self._activate_workers(np)
+        self._activate_workers(num_proc)

     def resume(self):
-        self._activate_workers(self._min_np)
+        self._activate_workers(self._min_num_proc)

     def stop(self, error_message=None):
         self._results.set_error_message(error_message)
@@ -142,7 +150,7 @@ def has_rank_assignment(self, host, slot):
     def host_assignments(self):
         return self._host_assignments

-    def wait_for_available_slots(self, min_np, min_hosts=1):
+    def wait_for_available_slots(self, min_num_proc, min_hosts=1):
         extra_message = ' An elastic job also requires that at least two hosts ' \
                         'are available to resolve compatible network interfaces. If you know which interfaces ' \
                         'are compatible in your network, set `--network-interface` to skip this check.' \
                         if self._host_manager.current_hosts.count_available_slots() >= 2 else ''
@@ -151,8 +159,7 @@ def wait_for_available_slots(self, min_np, min_hosts=1):
         tmout = timeout.Timeout(
             self._timeout,
-            message='Timed out waiting for {{activity}}. Please check that you have '
-                    'enough resources to run at least {min_np} Horovod processes.{extra_message}'
-            .format(min_np=min_np, extra_message=extra_message))
+            message='Timed out waiting for {activity}. Please check that you have '
+                    f'enough resources to run at least {min_num_proc} Horovod processes.{extra_message}')

         self._wait_hosts_cond.acquire()
         try:
@@ -162,7 +169,7 @@
                 logging.debug(f"current available slots: {avail_slots}")
                 avail_hosts = len(current_hosts.available_hosts)
                 logging.debug(f"current available hosts: {avail_hosts}.")
-                if avail_slots >= min_np and avail_hosts >= min_hosts:
+                if avail_slots >= min_num_proc and avail_hosts >= min_hosts:
                     return current_hosts
                 if self._shutdown.is_set():
                     raise RuntimeError('Job has been shutdown, see above error messages for details.')
@@ -171,9 +178,9 @@
         finally:
             self._wait_hosts_cond.release()

-    def _activate_workers(self, min_np):
-        logging.info('wait for available slots: {}'.format(min_np))
-        current_hosts = self.wait_for_available_slots(min_np)
+    def _activate_workers(self, min_num_proc):
+        logging.info('wait for available slots: {}'.format(min_num_proc))
+        current_hosts = self.wait_for_available_slots(min_num_proc)
         pending_slots = self._update_host_assignments(current_hosts)
         self._worker_registry.reset(self.world_size())
         self._start_worker_processes(pending_slots)
@@ -202,7 +209,7 @@ def _discover_hosts(self):

     def _notify_workers_host_changes(self, current_hosts, update_res):
         next_host_assignments = {}
-        if current_hosts.count_available_slots() >= self._min_np:
+        if current_hosts.count_available_slots() >= self._min_num_proc:
             # Assignments are required to be stable via contract
             next_host_assignments, _ = self._get_host_assignments(current_hosts)

@@ -268,7 +275,7 @@ def _get_host_assignments(self, current_hosts):
         # Adjust the host assignments to account for added / removed hosts
         host_list = [hosts.HostInfo(host, current_hosts.get_slots(host))
                      for host in current_hosts.host_assignment_order]
-        host_assignments_list = hosts.get_host_assignments(host_list, self._min_np, self._max_np)
+        host_assignments_list = hosts.get_host_assignments(host_list, self._min_num_proc, self._max_num_proc)
         host_assignments = defaultdict(list)
         for slot_info in host_assignments_list:
             host_assignments[slot_info.hostname].append(slot_info)
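The driver only recomputes assignments once enough slots are visible. A speculative, self-contained stub of that min_num_proc gate; CurrentHostsStub merely stands in for Horovod's real host-state object:

class CurrentHostsStub:
    """Stand-in for the host state consulted by ElasticDriver."""
    def __init__(self, slots_per_host):
        self.slots_per_host = slots_per_host

    def count_available_slots(self):
        return sum(self.slots_per_host.values())

def can_assign(current_hosts, min_num_proc):
    # Mirrors _notify_workers_host_changes: assignments are only
    # recomputed when at least min_num_proc slots are available.
    return current_hosts.count_available_slots() >= min_num_proc

hosts = CurrentHostsStub({'host1': 2, 'host2': 4})
assert can_assign(hosts, min_num_proc=4)
assert not can_assign(hosts, min_num_proc=8)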
21 changes: 14 additions & 7 deletions horovod/runner/elastic/settings.py
@@ -17,14 +17,21 @@


 class ElasticSettings(BaseSettings):
-    def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, cooldown_range=None, **kwargs):
+    def __init__(self,
+                 discovery,
+                 min_num_proc,
+                 max_num_proc,
+                 elastic_timeout,
+                 reset_limit,
+                 cooldown_range=None,
+                 **kwargs):
         """
         :param discovery: object used to detect and manage available hosts
         :type discovery: horovod.runner.elastic.discovery.HostDiscovery
-        :param min_np: minimum number of processes
-        :type min_np: int
-        :param max_np: maximum number of processes
-        :type max_np: int
+        :param min_num_proc: minimum number of processes
+        :type min_num_proc: int
+        :param max_num_proc: maximum number of processes
+        :type max_num_proc: int
         :param elastic_timeout: timeout for elastic initialisation after re-scaling in seconds
         :type elastic_timeout: int
         :param reset_limit: maximum number of resets after which the job is terminated
@@ -34,8 +41,8 @@ def __init__(self, discovery, min_np, max_np, elastic_timeout, reset_limit, cool
         """
         super(ElasticSettings, self).__init__(elastic=True, **kwargs)
         self.discovery = discovery
-        self.min_np = min_np
-        self.max_np = max_np
+        self.min_num_proc = min_num_proc
+        self.max_num_proc = max_num_proc
         self.elastic_timeout = elastic_timeout
         self.reset_limit = reset_limit
         self.cooldown_range=cooldown_range
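Constructed with the new names, an ElasticSettings might look like the sketch below. This assumes BaseSettings accepts defaults for its remaining keyword arguments; discovery=None is purely illustrative (the Ray executor above fills it in at runtime):

from horovod.runner.elastic.settings import ElasticSettings

settings = ElasticSettings(
    discovery=None,       # normally a HostDiscovery implementation
    min_num_proc=1,       # renamed from min_np
    max_num_proc=4,       # renamed from max_np
    elastic_timeout=600,
    reset_limit=3,
)
assert settings.min_num_proc == 1 and settings.max_num_proc == 4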
2 changes: 1 addition & 1 deletion horovod/runner/gloo_run.py
@@ -304,7 +304,7 @@ def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfa
         _mkdir_p(settings.output_filename)

     driver = ElasticDriver(rendezvous, settings.discovery,
-                           settings.min_np, settings.max_np,
+                           settings.min_num_proc, settings.max_num_proc,
                            timeout=settings.elastic_timeout,
                            reset_limit=settings.reset_limit,
                            cooldown_range=settings.cooldown_range,
