Skip to content

Commit

Permalink
Move KVStoreServer code to where driver ip is known, revert earlier changes
Browse files Browse the repository at this point in the history

Reverts changes to run_task.py, launch.py and http_client.py.

Signed-off-by: Enrico Minack <github@enrico.minack.dev>
  • Loading branch information
EnricoMi committed Jun 16, 2022
1 parent f3f72f9 commit a3484cf
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 143 deletions.
2 changes: 0 additions & 2 deletions horovod/runner/__init__.py
Expand Up @@ -135,7 +135,6 @@ def run(
:param min_num_proc: Minimum number of processes running for training to continue. If number of
available processes dips below this threshold, then training will wait for
more instances to become available. Defaults to num_proc
Consider providing network_interface to speed up elastic tasks startup.
:param max_num_proc: Maximum number of training processes, beyond which no additional processes
will be created. If not specified, then will be unbounded.
:param slots: Number of slots for processes per host. Normally 1 slot per GPU per host.
Expand All @@ -162,7 +161,6 @@ def run(
Providing a discovery script enables elastic training.
The job will fail immediately if execution of the script returns a non-zero exit
code on the first call. Subsequent calls will be retried until timeout.
Consider providing network_interface to speed up elastic tasks startup.
:param start_timeout: Horovodrun has to perform all the checks and
start the processes before the specified
timeout. The default value is 30 seconds.
Expand Down
68 changes: 48 additions & 20 deletions horovod/runner/gloo_run.py
Expand Up @@ -21,15 +21,16 @@
import sys
import threading
import time

from shlex import quote
from typing import Optional, Any

from horovod.runner.common.util import env as env_util, safe_shell_exec
from horovod.runner.common.util.hosts import get_host_assignments, parse_hosts
from horovod.runner.driver import driver_service
from horovod.runner.elastic.driver import ElasticDriver
from horovod.runner.elastic.rendezvous import create_rendezvous_handler
from horovod.runner.http.http_server import RendezvousServer
from horovod.runner.http.http_client import read_data_from_kvstore, put_data_into_kvstore
from horovod.runner.http.http_server import KVStoreServer, RendezvousServer
from horovod.runner.util import network, threads
from horovod.runner.util.remote import get_remote_command

Expand Down Expand Up @@ -299,7 +300,7 @@ def gloo_run(settings, nics, env, server_ip, command):
launch_gloo(command, exec_command, settings, nics, env, server_ip)


def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous):
def launch_gloo_elastic(command_or_func, exec_command, settings, env, get_common_interfaces, rendezvous, executable):
# Make the output directory if it does not exist
if settings.output_filename:
_mkdir_p(settings.output_filename)
Expand All @@ -317,29 +318,56 @@ def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfa

nics = get_common_interfaces(driver)
server_ip = network.get_driver_ip(nics)
run_func_server = None
run_func_server_port = None

event = register_shutdown_event()
run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)
if settings.run_func_mode:
# when running a func, we have to spin up the KVStoreServer
# to get the func to the remote process and the result back
run_func_server = KVStoreServer(verbose=settings.verbose)
run_func_server_port = run_func_server.start_server()
put_data_into_kvstore(server_ip, run_func_server_port, 'runfunc', 'func', command_or_func)

create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)
command = [executable, '-m', 'horovod.runner.run_task', ','.join(server_ip), str(run_func_server_port)]
else:
command = command_or_func

driver.start(settings.num_proc, create_worker)
res = driver.get_results()
driver.stop()
try:
event = register_shutdown_event()
run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)

if res.error_message is not None:
raise RuntimeError(res.error_message)
create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)

for name, value in sorted(res.worker_results.items(), key=lambda item: item[1][1]):
exit_code, timestamp = value
if exit_code != 0:
raise RuntimeError('Horovod detected that one or more processes exited with non-zero '
'status, thus causing the job to be terminated. The first process '
'to do so was:\nProcess name: {name}\nExit code: {code}\n'
.format(name=name, code=exit_code))
driver.start(settings.num_proc, create_worker)
res = driver.get_results()
driver.stop()

if res.error_message is not None:
raise RuntimeError(res.error_message)

for name, value in sorted(res.worker_results.items(), key=lambda item: item[1][1]):
exit_code, timestamp = value
if exit_code != 0:
raise RuntimeError('Horovod detected that one or more processes exited with non-zero '
'status, thus causing the job to be terminated. The first process '
'to do so was:\nProcess name: {name}\nExit code: {code}\n'
.format(name=name, code=exit_code))

# fetch the result if running a func
if settings.run_func_mode:
results = [None] * settings.min_num_proc
# TODO: make it parallel to improve performance
for i in range(settings.min_num_proc):
results[i] = read_data_from_kvstore(server_ip, run_func_server_port, 'runfunc_result', str(i))
return results

return None
finally:
if run_func_server:
run_func_server.shutdown_server()


def gloo_run_elastic(settings, env, command):
def gloo_run_elastic(settings, env, command_or_func, executable) -> Optional[Any]:

def get_common_interfaces(driver):
# Host-to-host common interface detection requires at least 2 hosts in an elastic job.
Expand All @@ -349,4 +377,4 @@ def get_common_interfaces(driver):

exec_command = _exec_command_fn(settings)
rendezvous = RendezvousServer(settings.verbose)
launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous)
return launch_gloo_elastic(command_or_func, exec_command, settings, env, get_common_interfaces, rendezvous, executable)
9 changes: 4 additions & 5 deletions horovod/runner/http/http_client.py
Expand Up @@ -12,34 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================
import socket
from urllib.error import HTTPError, URLError
from urllib.request import Request
from urllib.request import urlopen

from horovod.runner.common.util import codec


def read_data_from_kvstore(addr, port, scope, key, timeout=None):
def read_data_from_kvstore(addr, port, scope, key):
try:
url = "http://{addr}:{port}/{scope}/{key}".format(
addr=addr, port=str(port), scope=scope, key=key
)
req = Request(url)
resp = urlopen(req, timeout=timeout if timeout else socket.getdefaulttimeout())
resp = urlopen(req)
# TODO: remove base64 encoding because base64 is not efficient
return codec.loads_base64(resp.read())
except (HTTPError, URLError) as e:
raise RuntimeError("Read data from KVStore server failed.", e)


def put_data_into_kvstore(addr, port, scope, key, value, timeout=None):
def put_data_into_kvstore(addr, port, scope, key, value):
try:
url = "http://{addr}:{port}/{scope}/{key}".format(
addr=addr, port=str(port), scope=scope, key=key
)
req = Request(url, data=codec.dumps_base64(value, to_ascii=False))
req.get_method = lambda: "PUT" # for urllib2 compatibility
urlopen(req, timeout=timeout if timeout else socket.getdefaulttimeout())
urlopen(req)
except (HTTPError, URLError) as e:
raise RuntimeError("Put data input KVStore server failed.", e)
114 changes: 29 additions & 85 deletions horovod/runner/launch.py
Expand Up @@ -580,35 +580,34 @@ def parse_args():
return args


def _get_ips(nics):
    """Return the IPv4 addresses of this host, optionally restricted to some NICs.

    :param nics: collection of network interface names to consider; a falsy
                 value (``None`` or empty) means every interface is considered.
    :return: list of IPv4 address strings in sorted order, with ``127.0.0.1``
             (if present) moved to the end of the list.
    :raises ValueError: if no IPv4 address is found on the considered interfaces.
    """
    import psutil
    import socket

    ips = set()
    for intf, intf_addresses in psutil.net_if_addrs().items():
        # skip interfaces the caller did not ask for
        if nics and intf not in nics:
            continue
        for addr in intf_addresses:
            if addr.family == socket.AF_INET:
                ips.add(str(addr.address))

    if not ips:
        if nics:
            raise ValueError(f'No IP found for NICs {nics}')
        raise ValueError('No IPv4 IP found')

    # Sort so the result does not depend on set iteration order: callers use
    # the first entry as the address to contact, so it has to be stable.
    ordered = sorted(ips)

    # move localhost IP to the back, it is the least useful one for remote hosts
    localhost = '127.0.0.1'
    if localhost in ips:
        ordered.remove(localhost)
        ordered.append(localhost)

    return ordered
def _run_static(args):
# horovodrun has to finish all the checks before this timeout runs out.
if args.start_timeout:
start_timeout = args.start_timeout
else:
# Lookup default timeout from the environment variable.
start_timeout = int(os.getenv('HOROVOD_START_TIMEOUT', '30'))

tmout = timeout.Timeout(start_timeout,
message='Timed out waiting for {activity}. Please '
'check connectivity between servers. You '
'may need to increase the --start-timeout '
'parameter if you have too many servers.')
settings = hvd_settings.Settings(verbose=2 if args.verbose else 0,
ssh_port=args.ssh_port,
ssh_identity_file=args.ssh_identity_file,
extra_mpi_args=args.mpi_args,
tcp_flag=args.tcp_flag,
binding_args=args.binding_args,
key=secret.make_secret_key(),
start_timeout=tmout,
num_proc=args.num_proc,
hosts=args.hosts,
output_filename=args.output_filename,
run_func_mode=args.run_func is not None,
nics=args.nics,
prefix_output_with_timestamp=args.prefix_output_with_timestamp)

def _get_nics(args, settings):
# This cache stores the results of checks performed by horovod
# during the initialization step. It can be disabled by setting
# --disable-cache flag.
Expand Down Expand Up @@ -644,39 +643,8 @@ def _get_nics(args, settings):
if settings.verbose >= 2:
print('SSH was successful into all the remote hosts.')

return driver_service.get_common_interfaces(settings, all_host_names,
remote_host_names, fn_cache)


def _run_static(args):
# horovodrun has to finish all the checks before this timeout runs out.
if args.start_timeout:
start_timeout = args.start_timeout
else:
# Lookup default timeout from the environment variable.
start_timeout = int(os.getenv('HOROVOD_START_TIMEOUT', '30'))

tmout = timeout.Timeout(start_timeout,
message='Timed out waiting for {activity}. Please '
'check connectivity between servers. You '
'may need to increase the --start-timeout '
'parameter if you have too many servers.')
settings = hvd_settings.Settings(verbose=2 if args.verbose else 0,
ssh_port=args.ssh_port,
ssh_identity_file=args.ssh_identity_file,
extra_mpi_args=args.mpi_args,
tcp_flag=args.tcp_flag,
binding_args=args.binding_args,
key=secret.make_secret_key(),
start_timeout=tmout,
num_proc=args.num_proc,
hosts=args.hosts,
output_filename=args.output_filename,
run_func_mode=args.run_func is not None,
nics=args.nics,
prefix_output_with_timestamp=args.prefix_output_with_timestamp)
nics = driver_service.get_common_interfaces(settings, all_host_names, remote_host_names, fn_cache)

nics = _get_nics(args, settings)
if args.run_func:
# get the driver IPv4 address
driver_ip = network.get_driver_ip(nics)
Expand Down Expand Up @@ -752,32 +720,8 @@ def _run_elastic(args):

env = os.environ.copy()
config_parser.set_env_from_args(env, args)

# get the driver IPv4 address
driver_ips = _get_ips(settings.nics)
if args.run_func:
run_func_server = KVStoreServer(verbose=settings.verbose)
run_func_server_port = run_func_server.start_server()
put_data_into_kvstore(driver_ips[0], run_func_server_port,
'runfunc', 'func', args.run_func)

executable = args.executable or sys.executable
command = [executable, '-m', 'horovod.runner.run_task', ','.join(driver_ips), str(run_func_server_port)]

try:
gloo_run_elastic(settings, env, command)
np = (args.min_num_proc or args.num_proc)
results = [None] * np
# TODO: make it parallel to improve performance
for i in range(np):
results[i] = read_data_from_kvstore(driver_ips[0], run_func_server_port,
'runfunc_result', str(i))
return results
finally:
run_func_server.shutdown_server()
else:
gloo_run_elastic(settings, env, args.command)
return None
executable = args.executable or sys.executable
return gloo_run_elastic(settings, env, args.run_func if args.run_func else args.command, executable)


def is_gloo_used(use_gloo=None, use_mpi=None, use_jsrun=None):
Expand Down
34 changes: 4 additions & 30 deletions horovod/runner/run_task.py
Expand Up @@ -14,39 +14,13 @@
# =============================================================================

import sys
from urllib.error import URLError

from horovod.runner.common.util.env import get_env_rank_and_size
from horovod.runner.http.http_client import read_data_from_kvstore, put_data_into_kvstore


def _get_func(addrs, port, timeout=None):
    """Read the run function from the driver's KV store, probing each address in turn.

    :param addrs: sequence of driver addresses to try, in the given order.
    :param port: port of the driver's KV store server.
    :param timeout: value passed on as ``timeout`` to ``read_data_from_kvstore``
                    for every attempt.
    :return: tuple ``(addr, func)`` with the first address that answered and
             the function read from it.
    :raises RuntimeError: if ``addrs`` holds exactly one address and reading
                          from it fails; the error is re-raised unchanged.
    :raises ValueError: if none of the addresses could be used; chained to the
                        last failure so the root cause stays visible.
    """
    # we try all provided addresses to connect to the kvstore
    # the first addr that works will be returned, together with the run func
    # the per-address timeout is chosen by the caller
    last_error = None
    for addr in addrs:
        try:
            func = read_data_from_kvstore(addr, port, 'runfunc', 'func', timeout=timeout)
            return addr, func
        except RuntimeError as e:
            # when there is only one addr in addrs, raise this error as is
            # that was the behaviour before introducing multiple addrs
            if len(addrs) == 1:
                raise

            # remember the failure so the final error can be chained to it
            last_error = e

            # when the RuntimeError is caused by an URLError, the addr is probably not reachable for us
            if len(e.args) >= 2 and isinstance(e.args[1], URLError):
                # provide a warning when multiple addrs are provided on how to improve this situation
                print(f'Driver is not reachable at {addr} within {timeout} seconds. '
                      f'Consider restricting the driver to some NICs, '
                      f'which reduces the number of IPs probed here: {e}')

    raise ValueError(
        f"None of the provided IPs could be used to connect to driver's KV store: {', '.join(addrs)}"
    ) from last_error


def main(addrs, port):
addr, func = _get_func(addrs, port, 10 if len(addrs) > 1 else None)
def main(addr, port):
func = read_data_from_kvstore(addr, port, 'runfunc', 'func')
try:
ret_val = func()
except BaseException as e:
Expand All @@ -58,6 +32,6 @@ def main(addrs, port):


if __name__ == '__main__':
_, driver_addrs, run_func_server_port_str = sys.argv
_, driver_addr, run_func_server_port_str = sys.argv
run_func_server_port = int(run_func_server_port_str)
main(driver_addrs.split(','), run_func_server_port)
main(driver_addr, run_func_server_port)
2 changes: 1 addition & 1 deletion horovod/spark/gloo_run.py
Expand Up @@ -99,4 +99,4 @@ def gloo_run_elastic(settings, driver, env, stdout=None, stderr=None):
exec_command = _exec_command_fn(driver, settings.key, settings, env,
stdout, stderr, settings.prefix_output_with_timestamp)
rendezvous = SparkRendezvousServer(driver, settings.verbose)
launch_gloo_elastic(command, exec_command, settings, env, lambda _: nics, rendezvous)
launch_gloo_elastic(command, exec_command, settings, env, lambda _: nics, rendezvous, sys.executable)

0 comments on commit a3484cf

Please sign in to comment.