Provide the timeout value in useful places #2436

Merged
merged 2 commits into from
Nov 12, 2020
20 changes: 2 additions & 18 deletions horovod/common/gloo/gloo_context.cc
@@ -46,23 +46,6 @@ constexpr auto CreateDevice = gloo::transport::uv::CreateDevice;
 namespace horovod {
 namespace common {

-// Horovod Gloo rendezvous knobs.
-#define HOROVOD_GLOO_TIMEOUT_SECONDS "HOROVOD_GLOO_TIMEOUT_SECONDS"
-#define HOROVOD_GLOO_RENDEZVOUS_ADDR "HOROVOD_GLOO_RENDEZVOUS_ADDR"
-#define HOROVOD_GLOO_RENDEZVOUS_PORT "HOROVOD_GLOO_RENDEZVOUS_PORT"
-#define HOROVOD_GLOO_GLOBAL_PREFIX "global"
-#define HOROVOD_GLOO_LOCAL_PREFIX "local_"
-#define HOROVOD_GLOO_CROSS_PREFIX "cross_"
-#define HOROVOD_GLOO_GET_RANK_AND_SIZE "rank_and_size"
-#define HOROVOD_HOSTNAME "HOROVOD_HOSTNAME"
-#define HOROVOD_RANK "HOROVOD_RANK"
-#define HOROVOD_SIZE "HOROVOD_SIZE"
-#define HOROVOD_LOCAL_RANK "HOROVOD_LOCAL_RANK"
-#define HOROVOD_LOCAL_SIZE "HOROVOD_LOCAL_SIZE"
-#define HOROVOD_CROSS_RANK "HOROVOD_CROSS_RANK"
-#define HOROVOD_CROSS_SIZE "HOROVOD_CROSS_SIZE"
-#define HOROVOD_ELASTIC "HOROVOD_ELASTIC"
-
 int ParseNextInt(std::stringstream& ss) {
   assert(ss.good());

@@ -90,7 +73,8 @@ std::shared_ptr<gloo::Context> Rendezvous(const std::string& prefix,
     store.reset(new MemoryStore());
   }
   LOG(DEBUG) << prefix << " rendezvous started for rank=" << rank << ", size=" << size
-             << ", dev={" << dev->str() << "}";
+             << ", dev={" << dev->str() << "}, timeout="
+             << std::to_string(std::chrono::duration_cast<std::chrono::seconds>(timeout).count());

   auto context = std::make_shared<gloo::rendezvous::Context>(rank, size);
   context->setTimeout(timeout);
17 changes: 17 additions & 0 deletions horovod/common/gloo/gloo_context.h
@@ -25,6 +25,23 @@
 #include "../mpi/mpi_context.h"
 #endif

+// Horovod Gloo rendezvous knobs.
+#define HOROVOD_GLOO_TIMEOUT_SECONDS "HOROVOD_GLOO_TIMEOUT_SECONDS"
+#define HOROVOD_GLOO_RENDEZVOUS_ADDR "HOROVOD_GLOO_RENDEZVOUS_ADDR"
+#define HOROVOD_GLOO_RENDEZVOUS_PORT "HOROVOD_GLOO_RENDEZVOUS_PORT"
+#define HOROVOD_GLOO_GLOBAL_PREFIX "global"
+#define HOROVOD_GLOO_LOCAL_PREFIX "local_"
+#define HOROVOD_GLOO_CROSS_PREFIX "cross_"
+#define HOROVOD_GLOO_GET_RANK_AND_SIZE "rank_and_size"
+#define HOROVOD_HOSTNAME "HOROVOD_HOSTNAME"
+#define HOROVOD_RANK "HOROVOD_RANK"
+#define HOROVOD_SIZE "HOROVOD_SIZE"
+#define HOROVOD_LOCAL_RANK "HOROVOD_LOCAL_RANK"
+#define HOROVOD_LOCAL_SIZE "HOROVOD_LOCAL_SIZE"
+#define HOROVOD_CROSS_RANK "HOROVOD_CROSS_RANK"
+#define HOROVOD_CROSS_SIZE "HOROVOD_CROSS_SIZE"
+#define HOROVOD_ELASTIC "HOROVOD_ELASTIC"
+
 namespace horovod {
 namespace common {

8 changes: 6 additions & 2 deletions horovod/common/gloo/http_store.cc
@@ -14,6 +14,7 @@
 // ============================================================================

 #include "http_store.h"
+#include "gloo_context.h"

 #include <cstring>
 #include <iostream>
@@ -49,8 +50,11 @@ void HTTPStore::wait(const std::vector<std::string>& keys,
     const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
         std::chrono::steady_clock::now() - start);
     if (timeout != gloo::kNoTimeout && elapsed > timeout) {
-      GLOO_THROW_IO_EXCEPTION(GLOO_ERROR_MSG("Wait timeout for key(s): ",
-                                             ::gloo::MakeString(keys)));
+      auto timeout_seconds = std::chrono::duration_cast<std::chrono::seconds>(timeout);
Collaborator

Nice! Can you also add a suggestion here to try increasing HOROVOD_GLOO_TIMEOUT_SECONDS?

Collaborator Author

Ideally, that would be in the exception message, but this part of the code does not know (and should not assume) where the timeout value comes from; it is just an argument to this method. The env var is defined and read in gloo_context.cc.

A cleaner approach would be to catch the timeout exception there and re-throw it with an enhanced message saying where to set the timeout. For that to work, the exception must be of a very specific type, such as a dedicated Gloo timeout exception.

My hope is that seeing the timeout value in the logs at all helps users realize that whatever timeout knob they used is the wrong one. Not ideal, but better than nothing.

Any better idea?

Collaborator

We could move the reading of HOROVOD_GLOO_TIMEOUT_SECONDS into this class. It's never set to anything else in practice.

Collaborator Author

There is no gloo::TimeoutException; that would have been too good... In that case I would rather add the env var to the error message, even though it is bad practice. Moving the reading of the env var into this code would be worse, I think.

Collaborator

I think having the env variable in the error is very useful. Otherwise users will not know how to resolve the issue, and will ask questions on GitHub that could be answered by the error message (this has happened a few times). I agree it's making an assumption that isn't guaranteed by the interface, but I think it's worth it. What do you think?

Collaborator Author

Yes, in this case the information is more useful for users than clean code separation is for developers. I will go for it.

Collaborator Author

I had to move the constants into gloo_context.h: 6cd9213
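
For comparison, here is a minimal sketch of the catch-and-rethrow alternative weighed in this thread, which the PR did not adopt. It is written in Python for brevity and is not Gloo or Horovod code: `StoreTimeoutError`, `store_wait`, `rendezvous`, and the 30-second fallback are hypothetical names and values chosen for illustration. Per the thread, Gloo offers no dedicated timeout exception type, which is why this route was not available.

```python
import os

# Hypothetical names throughout; nothing here exists in Gloo or Horovod.
ENV_TIMEOUT = "HOROVOD_GLOO_TIMEOUT_SECONDS"


class StoreTimeoutError(Exception):
    """Dedicated timeout type, so callers can catch timeouts specifically."""

    def __init__(self, keys, timeout_seconds):
        super().__init__(
            "Wait timeout after {} seconds for key(s): {}".format(timeout_seconds, keys))
        self.keys = keys
        self.timeout_seconds = timeout_seconds


def store_wait(keys, timeout_seconds):
    """Stand-in for a store's wait(): it knows the timeout value, not its origin."""
    raise StoreTimeoutError(keys, timeout_seconds)


def rendezvous(keys):
    """The caller reads the env var, so it is the layer that can name the knob."""
    # The fallback of 30 seconds is an assumed placeholder for this sketch.
    timeout_seconds = int(os.environ.get(ENV_TIMEOUT, "30"))
    try:
        store_wait(keys, timeout_seconds)
    except StoreTimeoutError as e:
        # Re-raise with the hint attached, keeping the original as the cause.
        raise RuntimeError(
            "{}. You may want to increase the timeout via {}".format(e, ENV_TIMEOUT)) from e
```

The store stays unaware of environment variables, and only the layer that read the variable mentions it. The cost is that the scheme depends on a catchable, timeout-specific exception type.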

+      GLOO_THROW_IO_EXCEPTION(GLOO_ERROR_MSG("Wait timeout after ", std::to_string(timeout_seconds.count()),
+                                             " seconds for key(s): ", ::gloo::MakeString(keys),
+                                             ". You may want to increase the timeout via ",
+                                             HOROVOD_GLOO_TIMEOUT_SECONDS));
     }
     /* sleep override */
     std::this_thread::sleep_for(std::chrono::milliseconds(10));
8 changes: 6 additions & 2 deletions horovod/common/gloo/memory_store.cc
@@ -14,6 +14,7 @@
 // ============================================================================

 #include "memory_store.h"
+#include "gloo_context.h"

 #include <chrono>
 #include <thread>
@@ -47,8 +48,11 @@ void MemoryStore::wait(const std::vector<std::string>& keys,
     auto now = std::chrono::steady_clock::now();
     const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - start);
     if (timeout != gloo::kNoTimeout && elapsed > timeout) {
-      GLOO_THROW_IO_EXCEPTION(GLOO_ERROR_MSG("Wait timeout for key(s): ",
-                                             ::gloo::MakeString(keys)));
+      auto timeout_seconds = std::chrono::duration_cast<std::chrono::seconds>(timeout);
+      GLOO_THROW_IO_EXCEPTION(GLOO_ERROR_MSG("Wait timeout after ", std::to_string(timeout_seconds.count()),
+                                             " seconds for key(s): ", ::gloo::MakeString(keys),
+                                             ". You may want to increase the timeout via ",
+                                             HOROVOD_GLOO_TIMEOUT_SECONDS));
     }
     std::this_thread::sleep_for(std::chrono::milliseconds(10));
   }
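
The poll-until-deadline pattern that both store `wait` methods follow can be sketched in Python as below. This is an illustration under assumptions, not a port of the C++ code: `wait_for_keys` is a hypothetical helper, a plain dict stands in for the store, `None` plays the role of `gloo::kNoTimeout`, and the key list is joined with commas rather than formatted by `::gloo::MakeString`.

```python
import time


def wait_for_keys(store, keys, timeout_seconds, poll_interval=0.01):
    """Block until every key is in `store`, or raise once the deadline passes.

    `store` is any mapping; `timeout_seconds=None` means wait indefinitely.
    """
    start = time.monotonic()
    while not all(key in store for key in keys):
        elapsed = time.monotonic() - start
        if timeout_seconds is not None and elapsed > timeout_seconds:
            # Report the configured timeout and name the knob that controls it.
            raise TimeoutError(
                "Wait timeout after {} seconds for key(s): {}. "
                "You may want to increase the timeout via "
                "HOROVOD_GLOO_TIMEOUT_SECONDS".format(timeout_seconds, ", ".join(keys)))
        # Sleep briefly between polls, mirroring the 10 ms sleep in the diff.
        time.sleep(poll_interval)
```

Reporting the configured timeout rather than the measured elapsed time keeps the message stable and ties it directly to the value the user set.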
9 changes: 8 additions & 1 deletion horovod/runner/common/util/timeout.py
@@ -18,6 +18,7 @@

 class Timeout(object):
     def __init__(self, timeout, message):
+        self._timeout = timeout
         self._timeout_at = time.time() + timeout
         self._message = message

@@ -29,4 +30,10 @@ def timed_out(self):

     def check_time_out_for(self, activity):
         if self.timed_out():
-            raise Exception(self._message.format(activity=activity))
+            raise Exception(
+                '{}{} Timeout after {} seconds.'.format(
+                    self._message.format(activity=activity),
+                    '.' if not self._message.rstrip().endswith('.') else '',
+                    self._timeout
+                )
+            )
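
To see what the new message looks like, here is a self-contained restatement of the class as it reads after this change. The body of `timed_out` is not shown in the diff, so the one-line implementation below is an assumption made only so the sketch runs.

```python
import time


class Timeout(object):
    """Restatement of the patched class, for illustration only."""

    def __init__(self, timeout, message):
        self._timeout = timeout
        self._timeout_at = time.time() + timeout
        self._message = message

    def timed_out(self):
        # Assumed implementation; the diff elides this method's body.
        return time.time() > self._timeout_at

    def check_time_out_for(self, activity):
        if self.timed_out():
            raise Exception(
                '{}{} Timeout after {} seconds.'.format(
                    self._message.format(activity=activity),
                    # Add a period only if the message template lacks one.
                    '.' if not self._message.rstrip().endswith('.') else '',
                    self._timeout
                )
            )
```

With a 1.0 second timeout and activity `'command to run'`, both `'Timed out waiting for {activity}.'` and `'Timed out waiting for {activity}'` yield `Timed out waiting for command to run. Timeout after 1.0 seconds.`, because the period is appended only when the template does not already end with one.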
31 changes: 16 additions & 15 deletions setup.py
@@ -96,20 +96,6 @@ def build_extensions(self):
 # python packages required to use horovod in general
 require_list = ['cloudpickle', 'psutil', 'pyyaml', 'dataclasses;python_version<"3.7"']

-# python packages required / recommended to develop horovod
-# e.g., set of framework versions pinned for development, keep in sync with Dockerfile.test.cpu
-# NOTE: do not use versions with +cpu or +gpu here as users would need to add --find-links to pip
-dev_require_list = ['tensorflow-cpu==1.15.0',
-                    'keras==2.2.4',
-                    'torch==1.2.0',
-                    'torchvision==0.4.0',
-                    'mxnet==1.5.0',
-                    'pyspark==2.4.7']
-
-# python packages required only to run tests
-# Pin h5py: https://github.com/h5py/h5py/issues/1732
-test_require_list = ['mock', 'pytest', 'pytest-forked', 'parameterized', 'h5py<3']
-
 # framework dependencies
 tensorflow_require_list = ['tensorflow']
 tensorflow_cpu_require_list = ['tensorflow-cpu']
@@ -119,7 +105,8 @@ def build_extensions(self):
 mxnet_require_list = ['mxnet>=1.4.1']
 pyspark_require_list = ['pyspark>=2.3.2;python_version<"3.8"',
                         'pyspark>=3.0.0;python_version>="3.8"']
-spark_require_list = ['h5py>=2.9', 'numpy', 'petastorm>=0.9.0,!=0.9.3', 'pyarrow>=0.15.0'] + \
+# Pin h5py: https://github.com/h5py/h5py/issues/1732
+spark_require_list = ['h5py<3', 'numpy', 'petastorm>=0.9.0,!=0.9.3', 'pyarrow>=0.15.0'] + \
                      pyspark_require_list
 ray_require_list = ['ray']

@@ -131,6 +118,20 @@ def build_extensions(self):
                               mxnet_require_list + \
                               spark_require_list

+# python packages required / recommended to develop horovod
+# e.g., set of framework versions pinned for development, keep in sync with Dockerfile.test.cpu
+# NOTE: do not use versions with +cpu or +gpu here as users would need to add --find-links to pip
+dev_require_list = ['tensorflow-cpu==1.15.0',
+                    'keras==2.2.4',
+                    'torch==1.2.0',
+                    'torchvision==0.4.0',
+                    'mxnet==1.5.0',
+                    'pyspark==2.4.7'] + spark_require_list
+
+# python packages required only to run tests
+# Pin h5py: https://github.com/h5py/h5py/issues/1732
+test_require_list = ['mock', 'pytest', 'pytest-forked', 'parameterized', 'h5py<3']
+
 # Skip cffi if pytorch extension explicitly disabled
 if not os.environ.get('HOROVOD_WITHOUT_PYTORCH'):
     require_list.append('cffi>=1.4.0')
6 changes: 3 additions & 3 deletions test/integration/test_spark.py
@@ -199,7 +199,7 @@ def test_task_service_wait_for_command_start_without_timeout(self):

     def test_task_service_wait_for_command_start_with_timeout(self):
         with spark_task_service(0) as (task, client, _):
-            tmout = timeout.Timeout(1.0, 'timed out waiting for {activity}')
+            tmout = timeout.Timeout(1.0, 'Timed out waiting for {activity}.')
             start = time.time()
             d = delay(lambda: client.run_command('true', {}), 0.5)
             task.wait_for_command_start(tmout)
@@ -209,10 +209,10 @@ def test_task_service_wait_for_command_start_with_timeout(self):
             d.join()

         with spark_task_service(0) as (task, client, _):
-            tmout = timeout.Timeout(1.0, 'timed out waiting for {activity}')
+            tmout = timeout.Timeout(1.0, 'Timed out waiting for {activity}.')
             start = time.time()
             d = delay(lambda: client.run_command('true', {}), 1.5)
-            with pytest.raises(Exception, match='^timed out waiting for command to run$'):
+            with pytest.raises(Exception, match='^Timed out waiting for command to run. Timeout after 1.0 seconds.$'):
                 task.wait_for_command_start(tmout)
             duration = time.time() - start
             self.assertGreaterEqual(duration, 1.0)
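
One detail of the updated assertion: `pytest.raises(match=...)` treats its argument as a regular expression and applies `re.search`, so the unescaped dots in the expected message match any character. The test still passes, but the pattern is looser than the literal message. A small sketch of the difference, using only the standard library (the variable names are illustrative):

```python
import re

expected = 'Timed out waiting for command to run. Timeout after 1.0 seconds.'

# Unescaped: every '.' matches any single character.
loose = '^' + expected + '$'

# re.escape makes the pattern match the literal message only.
strict = '^' + re.escape(expected) + '$'

# A message that differs from the expected one only at the dot positions.
lookalike = 'Timed out waiting for command to run! Timeout after 1x0 seconds?'
```

Here `loose` accepts both `expected` and `lookalike`, while `strict` accepts only `expected`. Passing `re.escape(...)` to `match=` would be one way to tighten such a test if exact wording matters.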