Skip to content

Commit

Permalink
Reduce horovodrun start timeout & improve messages (#1024)
Browse files · Browse the repository at this point in the history
* Reduce horovodrun start timeout & improve messages

Signed-off-by: Alex Sergeev <alsrgv@users.noreply.github.com>

* Avoid an error message during interpreter shutdown

Signed-off-by: Alex Sergeev <alsrgv@users.noreply.github.com>
  • Loading branch information
alsrgv committed Apr 28, 2019
1 parent bed3acb commit d628989
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 18 deletions.
2 changes: 1 addition & 1 deletion horovod/run/common/service/task_service.py
Expand Up @@ -107,7 +107,7 @@ def wait_for_initial_registration(self, timeout):
try:
while not self._initial_registration_complete:
self._wait_cond.wait(timeout.remaining())
timeout.check_time_out_for('Spark tasks to start')
timeout.check_time_out_for('tasks to start')
finally:
self._wait_cond.release()

Expand Down
8 changes: 3 additions & 5 deletions horovod/run/common/util/timeout.py
Expand Up @@ -17,8 +17,9 @@


class Timeout(object):
def __init__(self, timeout, message):
    """Create a deadline ``timeout`` seconds from now.

    :param timeout: number of seconds from now until the deadline expires.
    :param message: error text raised when the deadline is exceeded; may
        contain an ``{activity}`` placeholder that ``check_time_out_for``
        fills in with a description of what was being waited for.
    """
    self._timeout_at = time.time() + timeout
    self._message = message

def remaining(self):
    """Return the number of seconds left until the deadline, never negative."""
    seconds_left = self._timeout_at - time.time()
    return seconds_left if seconds_left > 0 else 0
Expand All @@ -28,7 +29,4 @@ def timed_out(self):

def check_time_out_for(self, activity):
    """Raise if the deadline has already passed.

    :param activity: short description of what was being waited for;
        substituted into the ``{activity}`` placeholder of the message
        supplied at construction time.
    :raises Exception: when the timeout has expired.
    """
    if self.timed_out():
        raise Exception(self._message.format(activity=activity))
14 changes: 9 additions & 5 deletions horovod/run/run.py
Expand Up @@ -17,7 +17,6 @@
import argparse
import hashlib
import os
import re
import sys
import traceback
import six
Expand Down Expand Up @@ -314,11 +313,11 @@ def parse_args():
"Otherwise, all the checks will run every time "
"horovodrun is called.")

parser.add_argument('--horovod-start-timeout', action="store",
parser.add_argument('--start-timeout', action="store",
dest="start_timeout",
help="Horovodrun has to perform all the checks and "
"start the processes before the specified "
"timeout. The default value is 600 seconds. "
"timeout. The default value is 30 seconds. "
"Alternatively, The environment variable "
"HOROVOD_START_TIMEOUT can also be used to "
"specify the initialization timeout.")
Expand Down Expand Up @@ -361,12 +360,17 @@ def run():
start_timeout = args.start_timeout
else:
# Lookup default timeout from the environment variable.
start_timeout = int(os.getenv('HOROVOD_START_TIMEOUT', '600'))
start_timeout = int(os.getenv('HOROVOD_START_TIMEOUT', '30'))

tmout = timeout.Timeout(start_timeout,
message='Timed out waiting for {activity}. Please '
'check connectivity between servers. You '
'may need to increase the --start-timeout '
'parameter if you have too many servers.')
settings = hvd_settings.Settings(verbose=args.verbose,
ssh_port=args.ssh_port,
key=secret.make_secret_key(),
timeout=timeout.Timeout(start_timeout),
timeout=tmout,
num_hosts=len(all_host_names),
num_proc=args.np)

Expand Down
9 changes: 7 additions & 2 deletions horovod/spark/__init__.py
Expand Up @@ -15,7 +15,6 @@

import os
import pyspark
import re
from six.moves import queue
import sys
import threading
Expand Down Expand Up @@ -116,9 +115,15 @@ def run(fn, args=(), kwargs={}, num_proc=None, start_timeout=None, env=None,
# Lookup default timeout from the environment variable.
start_timeout = int(os.getenv('HOROVOD_SPARK_START_TIMEOUT', '600'))

tmout = timeout.Timeout(start_timeout,
message='Timed out waiting for {activity}. Please check that you have '
'enough resources to run all Horovod processes. Each Horovod '
'process runs in a Spark task. You may need to increase the '
'start_timeout parameter to a larger value if your Spark resources '
'are allocated on-demand.')
settings = hvd_settings.Settings(verbose=verbose,
key=secret.make_secret_key(),
timeout=timeout.Timeout(start_timeout))
timeout=tmout)

spark_context = pyspark.SparkContext._active_spark_context
if spark_context is None:
Expand Down
14 changes: 9 additions & 5 deletions horovod/spark/task/mpirun_exec_fn.py
Expand Up @@ -24,11 +24,15 @@


def parent_process_monitor(initial_ppid):
    """Poll once a second and hard-exit this process when its parent dies.

    :param initial_ppid: pid of the parent process at startup; a change in
        ``os.getppid()`` means the original parent has terminated and this
        process has been re-parented.
    """
    try:
        while True:
            if initial_ppid != os.getppid():
                # Parent process died, terminate
                os._exit(1)
            time.sleep(1)
    except BaseException:
        # Swallow everything (explicit spelling of a bare ``except:``):
        # during interpreter shutdown the modules this loop touches may
        # already be torn down, raising spurious errors we must not print.
        pass


def main(driver_addresses, settings):
Expand Down

0 comments on commit d628989

Please sign in to comment.