Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Gloo controller #1181

Merged
merged 8 commits into from Aug 10, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -8,42 +8,42 @@ repository=823773083436.dkr.ecr.us-east-1.amazonaws.com/buildkite

# list of all the tests
tests=( \
test-cpu-openmpi-py2_7-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
test-cpu-openmpi-py3_5-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
test-cpu-openmpi-py3_6-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
test-cpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-cpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-cpu-openmpi-py3_6-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-cpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py2_7-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
# test-cpu-openmpi-py3_5-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
# test-cpu-openmpi-py3_6-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
# test-cpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-cpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-cpu-openmpi-py3_6-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-cpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-gloo-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-gloo-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-openmpi-py3_5-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-cpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-cpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-mlsl-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-gpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-gpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-openmpi-py3_5-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-cpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-cpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-mlsl-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-gpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-gpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-gpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-gpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-gpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-mixed-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-gpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-gpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-mixed-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
)

build_test() {
@@ -109,7 +109,8 @@ void GlooContext::Initialize(const std::string& gloo_iface) {
const std::string local_scope =
std::string(HOROVOD_GLOO_LOCAL_PREFIX) + std::to_string(cross_rank);
auto local_rendezvous = HTTPStore(rendezvous_server_addr,
rendezvous_server_port, local_scope, rank);
rendezvous_server_port, local_scope,
local_rank);
LOG(DEBUG) << "Local Rendezvous started for rank " << rank
<< ", total size of " << local_size;
auto local_context =
@@ -122,7 +123,8 @@ void GlooContext::Initialize(const std::string& gloo_iface) {
const std::string cross_scope =
std::string(HOROVOD_GLOO_CROSS_PREFIX) + std::to_string(local_rank);
auto cross_rendezvous = HTTPStore(rendezvous_server_addr,
rendezvous_server_port, cross_scope, rank);
rendezvous_server_port, cross_scope,
cross_rank);
LOG(DEBUG) << "Cross-node Rendezvous started for rank " << rank
<< ", total size of " << size;
auto cross_context =
@@ -28,16 +28,16 @@
namespace horovod {
namespace common {

HTTPStore::~HTTPStore() {
PerformHTTP(std::to_string(rank_), std::vector<char>(), FINALIZE);
}
HTTPStore::~HTTPStore() { HTTPDELETE(std::to_string(rank_)); }

void HTTPStore::set(const std::string& key, const std::vector<char>& data) {
PerformHTTP(key, data, SET);
HTTPPUT(key, data);
}

std::vector<char> HTTPStore::get(const std::string& key) {
return PerformHTTP(key, std::vector<char>(), GET);
std::vector<char> result;
HTTPGET(key, result);
return result;
}

void HTTPStore::wait(const std::vector<std::string>& keys,
@@ -57,68 +57,63 @@ void HTTPStore::wait(const std::vector<std::string>& keys,
}

bool HTTPStore::CheckKeys(const std::vector<std::string>& keys) {
std::vector<char> result;
for (const auto& key : keys) {
if (PerformHTTP(key, std::vector<char>(), GET).empty()) {
if (!HTTPGET(key, result)) {
return false;
}
}
return true;
}

// Perform http request to rendezvous server with retry logic
std::vector<char> HTTPStore::PerformHTTP(const std::string& key,
const std::vector<char>& data,
Type type) {
http::Response HTTPStore::PerformHTTP(http::Request& request,
const std::string& method = HTTP_GET,
const std::string& body = "") {
int retry_cnt = 0;

while (retry_cnt < 3) {
while (retry_cnt < MAX_RETRY_TIME) {
try {
switch (type){
case GET:
return HTTPGET(key);
case SET:
HTTPPUT(key, data);
return std::vector<char>();
case FINALIZE:
HTTPDELETE(key);
return std::vector<char>();
http::Response response = request.send(method, body);
if (response.status != HTTP_OK && response.status != HTTP_NOT_FOUND) {
LOG(WARNING) << "HTTP response not OK, got" << response.status;
} else {
return response;
}
} catch (std::runtime_error& e) {
retry_cnt++;
} catch (std::exception& e) {
LOG(DEBUG) << "Exception: " << e.what();
if (retry_cnt >= 3) {
std::cerr << "HTTP request failed too many times, aborting. See "
"exception message above.";
throw e;
}
}

// sleep for 500ms before another try.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
retry_cnt++;
if (retry_cnt >= 3) {
LOG(ERROR) << "HTTP GET request failed too many times, aborting. See "
"exception message above.";
throw std::runtime_error("HTTP request failed.");
}

// sleep for 500ms before another try.
std::this_thread::sleep_for(
std::chrono::milliseconds(RETRY_WAITING_TIME_MILLSEC));
}
return std::vector<char>();

return http::Response();
}

std::vector<char> HTTPStore::HTTPGET(const std::string& key) {
bool HTTPStore::HTTPGET(const std::string& key, std::vector<char>& result) {
std::string url = "http://" + server_ip_ + ":" +
std::to_string(server_port_) + "/" + scope_ + "/" + key;
LOG(DEBUG) << "Send GET request to " << url;
http::Request request(url);
http::Response response = request.send("GET");

if (response.status != 200) {
std::string msg("HTTP response not OK, got ");
msg += std::to_string(response.status);
throw std::runtime_error(msg);
}
http::Response response = PerformHTTP(request, HTTP_GET);

if (response.body.size() == 0) {
LOG(DEBUG) << "Receive empty body, with status code " << response.status;
// If the key is not present, return false.
if (response.status == 404) {
return false;
} else {
This conversation was marked as resolved by zsh-thu

This comment has been minimized.

Copy link
@tgaddair

tgaddair Aug 9, 2019

Collaborator

Nit: you can remove the else here.

result.clear();
result.insert(result.begin(), response.body.begin(), response.body.end());
return true;
}
std::vector<char> result(response.body.begin(), response.body.end());

LOG(DEBUG) << "Got response with length " << response.body.size();
return result;
}

void HTTPStore::HTTPPUT(const std::string& key, const std::vector<char>& data) {
@@ -130,27 +125,15 @@ void HTTPStore::HTTPPUT(const std::string& key, const std::vector<char>& data) {
std::string body;
body.insert(body.size(), data.data(), data.size());

http::Response response = request.send("PUT", body);
if (response.status != 200) {
std::string msg("HTTP response not OK, got ");
msg += std::to_string(response.status);
throw std::runtime_error(msg);
}
http::Response response = PerformHTTP(request, HTTP_PUT, body);
}

void HTTPStore::HTTPDELETE(const std::string& key) {
std::string url = "http://" + server_ip_ + ":" +
std::to_string(server_port_) + "/" + scope_ + "/" + key;
LOG(DEBUG) << "Send GET request to " << url;
http::Request request(url);
http::Response response = request.send("DELETE");

if (response.status != 200) {
std::string msg("HTTP response not OK, got ");
msg += std::to_string(response.status);
throw std::runtime_error(msg);
}

http::Response response = PerformHTTP(request, HTTP_DELETE);
}

} // namespace common
@@ -22,9 +22,17 @@
namespace horovod {
namespace common {

// Constants used by HTTPStore when talking to the rendezvous server.

// Upper bound on the retry loop in HTTPStore::PerformHTTP.
#define MAX_RETRY_TIME 3
// Time to sleep between two attempts, in milliseconds.
#define RETRY_WAITING_TIME_MILLSEC 500
// HTTP method names handed to PerformHTTP as the request method.
#define HTTP_GET "GET"
#define HTTP_PUT "PUT"
#define HTTP_DELETE "DELETE"
// Status codes PerformHTTP accepts as a final answer; any other status is
// logged as a warning and the request is retried.
#define HTTP_OK 200
#define HTTP_NOT_FOUND 404
class HTTPStore : public gloo::rendezvous::Store {
public:
HTTPStore(const std::string server_ip, int port, const std::string scope, int rank)
HTTPStore(const std::string server_ip, int port, const std::string scope,
int rank)
: server_ip_(server_ip), server_port_(port), scope_(scope), rank_(rank) {}

~HTTPStore() override;
@@ -43,15 +51,22 @@ class HTTPStore : public gloo::rendezvous::Store {
bool CheckKeys(const std::vector<std::string>& keys);

protected:
enum Type { GET, SET, FINALIZE };

std::vector<char> PerformHTTP(const std::string& key,
const std::vector<char>& data, Type type);

std::vector<char> HTTPGET(const std::string& key);

// Send HTTP request to server, retry if the status code is not 200 (OK) or
// 404 (Key not found).
http::Response PerformHTTP(http::Request& request, const std::string& method,
const std::string& body);

// HTTP GET: result is an out parameter for retrieved value for the key.
// Return a bool representing whether the key is found in the store.
bool HTTPGET(const std::string& key, std::vector<char>& result);

// HTTP PUT: send HTTP PUT request to server with the key and value data.
// The key is a string and will be embedded into the URL; the data is
// the PUT body.
void HTTPPUT(const std::string& key, const std::vector<char>& data);

// HTTP DELETE: send HTTP DELETE request to server, informing the server that
// this rank has finished.
void HTTPDELETE(const std::string& key);

std::string server_ip_;
@@ -18,9 +18,8 @@
LOG_LEVEL_STR = ['FATAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'TRACE']

# List of regular expressions to ignore environment variables by.
IGNORE_REGEXES = {'BASH_FUNC_.*\(\)', 'OLDPWD', 'MICHELANGELO_INSTANCE_ID',
'PELOTON_INSTANCE_ID', 'PELOTON_TASK_ID'}
IGNORE_REGEXES = {'BASH_FUNC_.*\(\)', 'OLDPWD'}


def is_exportable(v):
return not any(re.match(r, v) for r in IGNORE_REGEXES)
return not any(re.match(r, v) for r in IGNORE_REGEXES)
@@ -162,13 +162,16 @@ def _exec_command(_command, _index):
run_command=_run_command
)
else:
env = os.environ.copy()
command = \
'ssh -o StrictHostKeyChecking=no {host} {ssh_port_arg} ' \
'\'{horovod_env} ' \
'\'{horovod_env} {env} ' \
'{run_command}\''.format(
host=host_name,
ssh_port_arg=ssh_port_arg,
horovod_env=horovod_rendez_env,
env=' '.join(quote('%s=%s' % (quote(key), quote(value))) for key, value in env.items()
if env_util.is_exportable(key)),
run_command=_run_command
)
args_list.append([command, alloc_info.rank])
@@ -191,7 +194,7 @@ def gloo_run(args, remote_host_names, common_intfs):
# Start rendezvous server and get the port that it is listening on
global_rendezv_port = global_rendezv.start_server(host_alloc_plan)

# get the server address
# get the server ipv4 address
iface = list(common_intfs)[0]
server_ip = None
for addr in net_if_addrs()[iface]:
@@ -202,21 +205,18 @@ def gloo_run(args, remote_host_names, common_intfs):
raise RuntimeError(
'Cannot find an ipv4 address of the common interface.')

env = os.environ.copy()
run_command = (
'HOROVOD_GLOO_RENDEZVOUS_ADDR={addr} '
'HOROVOD_GLOO_RENDEZVOUS_PORT={port} '
'HOROVOD_CONTROLLER=gloo '
'HOROVOD_CPU_OPERATIONS=gloo '
'HOROVOD_IFACE={iface} '
'NCCL_SOCKET_IFNAME={common_intfs} '
'{env} {command}' # expect a lot of environment variables
'{command}' # expect a lot of environment variables
.format(addr=server_ip,
port=global_rendezv_port,
iface=iface, # TODO: add multiple ifaces in future
common_intfs=','.join(common_intfs),
env=' '.join(quote('%s=%s' % (quote(key), quote(value))) for key, value in env.items()
if env_util.is_exportable(key)),
command=' '.join(quote(par) for par in args.command))
)

@@ -46,12 +46,16 @@ def do_GET(self):

_, scope, key = paths
with self.server.cache_lock:
value = self.server.cache.get(scope, {}).get(key, bytes(''))
value = self.server.cache.get(scope, {}).get(key, None)

self.send_response(200)
self.send_header("Content-Length", str(len(value)))
self.end_headers()
self.wfile.write(value)
if value is None:
self.send_status_code(404)

This conversation was marked as resolved by zsh-thu

This comment has been minimized.

Copy link
@tgaddair

tgaddair Aug 9, 2019

Collaborator

Nit: remove extra line between if/else.

else:
self.send_response(200)
self.send_header("Content-Length", str(len(value)))
self.end_headers()
self.wfile.write(value)

# Override PUT handler
def do_PUT(self):
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.