Skip to content

Commit

Permalink
refractor HTTP rendezvous
Browse files Browse the repository at this point in the history
Signed-off-by: Sihan Zeng <zsh@uber.com>
  • Loading branch information
zsh-thu committed Aug 8, 2019
1 parent 2213083 commit df3d769
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 93 deletions.
52 changes: 26 additions & 26 deletions .buildkite/gen-pipeline.sh
Expand Up @@ -8,42 +8,42 @@ repository=823773083436.dkr.ecr.us-east-1.amazonaws.com/buildkite

# list of all the tests
tests=( \
test-cpu-openmpi-py2_7-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
test-cpu-openmpi-py3_5-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
test-cpu-openmpi-py3_6-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
test-cpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-cpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-cpu-openmpi-py3_6-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-cpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py2_7-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
# test-cpu-openmpi-py3_5-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
# test-cpu-openmpi-py3_6-tf1_1_0-keras2_0_0-torch0_4_0-mxnet1_4_1-pyspark2_1_2 \
# test-cpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-cpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-cpu-openmpi-py3_6-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-cpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-gloo-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-gloo-py3_5-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-cpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-openmpi-py3_5-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-cpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-cpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-cpu-mlsl-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-gpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
test-gpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-cpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-openmpi-py3_5-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-cpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-cpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-cpu-mlsl-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py2_7-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-gpu-openmpi-py3_5-tf1_6_0-keras2_1_2-torch0_4_1-mxnet1_4_1-pyspark2_3_2 \
# test-gpu-openmpi-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
# test-gpu-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-gloo-py2_7-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-gloo-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_4_1-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-gpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-gpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
test-gpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
test-mixed-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py2_7-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py3_6-tf2_0_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-gpu-openmpi-py2_7-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-gpu-openmpi-py3_6-tfhead-kerashead-torchhead-mxnethead-pyspark2_4_0 \
# test-gpu-mpich-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
# test-mixed-openmpi-py3_6-tf1_14_0-keras2_2_4-torch1_1_0-mxnet1_5_0-pyspark2_4_0 \
)

build_test() {
Expand Down
6 changes: 4 additions & 2 deletions horovod/common/gloo_context.cc
Expand Up @@ -109,7 +109,8 @@ void GlooContext::Initialize(const std::string& gloo_iface) {
const std::string local_scope =
std::string(HOROVOD_GLOO_LOCAL_PREFIX) + std::to_string(cross_rank);
auto local_rendezvous = HTTPStore(rendezvous_server_addr,
rendezvous_server_port, local_scope, rank);
rendezvous_server_port, local_scope,
local_rank);
LOG(DEBUG) << "Local Rendezvous started for rank " << rank
<< ", total size of " << local_size;
auto local_context =
Expand All @@ -122,7 +123,8 @@ void GlooContext::Initialize(const std::string& gloo_iface) {
const std::string cross_scope =
std::string(HOROVOD_GLOO_CROSS_PREFIX) + std::to_string(local_rank);
auto cross_rendezvous = HTTPStore(rendezvous_server_addr,
rendezvous_server_port, cross_scope, rank);
rendezvous_server_port, cross_scope,
cross_rank);
LOG(DEBUG) << "Cross-node Rendezvous started for rank " << rank
<< ", total size of " << size;
auto cross_context =
Expand Down
98 changes: 42 additions & 56 deletions horovod/common/rendezvous/http_rendezvous.cc
Expand Up @@ -29,15 +29,17 @@ namespace horovod {
namespace common {

HTTPStore::~HTTPStore() {
PerformHTTP(std::to_string(rank_), std::vector<char>(), FINALIZE);
HTTPDELETE(std::to_string(rank_));
}

void HTTPStore::set(const std::string& key, const std::vector<char>& data) {
PerformHTTP(key, data, SET);
HTTPPUT(key, data);
}

std::vector<char> HTTPStore::get(const std::string& key) {
return PerformHTTP(key, std::vector<char>(), GET);
std::vector<char> result;
HTTPGET(key, result);
return result;
}

void HTTPStore::wait(const std::vector<std::string>& keys,
Expand All @@ -57,68 +59,64 @@ void HTTPStore::wait(const std::vector<std::string>& keys,
}

bool HTTPStore::CheckKeys(const std::vector<std::string>& keys) {
std::vector<char> result;
for (const auto& key : keys) {
if (PerformHTTP(key, std::vector<char>(), GET).empty()) {
if (!HTTPGET(key, result)) {
return false;
}
}
return true;
}

// Perform http request to rendezvous server with retry logic
std::vector<char> HTTPStore::PerformHTTP(const std::string& key,
const std::vector<char>& data,
Type type) {
http::Response HTTPStore::PerformHTTP(http::Request& request,
const std::string& method = "GET",
const std::string& body = "") {
int retry_cnt = 0;

while (retry_cnt < 3) {
try {
switch (type){
case GET:
return HTTPGET(key);
case SET:
HTTPPUT(key, data);
return std::vector<char>();
case FINALIZE:
HTTPDELETE(key);
return std::vector<char>();
while(retry_cnt < 3) {
try{
http::Response response = request.send(method, body);
if (response.status != 200 && response.status != 404) {
LOG(WARNING) << "HTTP response not OK, got" << response.status;
} else {
return response;
}
} catch (std::runtime_error& e) {
retry_cnt++;
} catch (std::exception& e) {
LOG(DEBUG) << "Exception: " << e.what();
if (retry_cnt >= 3) {
std::cerr << "HTTP request failed too many times, aborting. See "
"exception message above.";
throw e;
}
}

// sleep for 500ms before another try.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
retry_cnt ++;
if (retry_cnt >= 3) {
LOG(ERROR) << "HTTP GET request failed too many times, aborting. See "
"exception message above.";
throw std::runtime_error("HTTP request failed.");
}

// sleep for 500ms before another try.
std::this_thread::sleep_for(std::chrono::milliseconds(500));

}
return std::vector<char>();

return http::Response();
}

std::vector<char> HTTPStore::HTTPGET(const std::string& key) {
bool HTTPStore::HTTPGET(const std::string& key, std::vector<char>& result) {
std::string url = "http://" + server_ip_ + ":" +
std::to_string(server_port_) + "/" + scope_ + "/" + key;
LOG(DEBUG) << "Send GET request to " << url;
http::Request request(url);
http::Response response = request.send("GET");

if (response.status != 200) {
std::string msg("HTTP response not OK, got ");
msg += std::to_string(response.status);
throw std::runtime_error(msg);
}
http::Response response = PerformHTTP(request, "GET");

if (response.body.size() == 0) {
LOG(DEBUG) << "Receive empty body, with status code " << response.status;
// If the key is not present, return false.
if(response.status == 404) {
return false;
}
else{
result.clear();
result.insert(result.begin(), response.body.begin(), response.body.end());
return true;
}
std::vector<char> result(response.body.begin(), response.body.end());

LOG(DEBUG) << "Got response with length " << response.body.size();
return result;
}

void HTTPStore::HTTPPUT(const std::string& key, const std::vector<char>& data) {
Expand All @@ -130,27 +128,15 @@ void HTTPStore::HTTPPUT(const std::string& key, const std::vector<char>& data) {
std::string body;
body.insert(body.size(), data.data(), data.size());

http::Response response = request.send("PUT", body);
if (response.status != 200) {
std::string msg("HTTP response not OK, got ");
msg += std::to_string(response.status);
throw std::runtime_error(msg);
}
http::Response response = PerformHTTP(request, "PUT", body);
}

void HTTPStore::HTTPDELETE(const std::string& key) {
std::string url = "http://" + server_ip_ + ":" +
std::to_string(server_port_) + "/" + scope_ + "/" + key;
LOG(DEBUG) << "Send GET request to " << url;
http::Request request(url);
http::Response response = request.send("DELETE");

if (response.status != 200) {
std::string msg("HTTP response not OK, got ");
msg += std::to_string(response.status);
throw std::runtime_error(msg);
}

http::Response response = PerformHTTP(request, "DELETE");
}

} // namespace common
Expand Down
6 changes: 3 additions & 3 deletions horovod/common/rendezvous/http_rendezvous.h
Expand Up @@ -45,10 +45,10 @@ class HTTPStore : public gloo::rendezvous::Store {
protected:
enum Type { GET, SET, FINALIZE };

std::vector<char> PerformHTTP(const std::string& key,
const std::vector<char>& data, Type type);
http::Response PerformHTTP(http::Request& request, const std::string& method,
const std::string& body);

std::vector<char> HTTPGET(const std::string& key);
bool HTTPGET(const std::string& key, std::vector<char>& result);

void HTTPPUT(const std::string& key, const std::vector<char>& data);

Expand Down
14 changes: 9 additions & 5 deletions horovod/run/rendezvous/http_server.py
Expand Up @@ -46,12 +46,16 @@ def do_GET(self):

_, scope, key = paths
with self.server.cache_lock:
value = self.server.cache.get(scope, {}).get(key, bytes(''))
value = self.server.cache.get(scope, {}).get(key, None)

self.send_response(200)
self.send_header("Content-Length", str(len(value)))
self.end_headers()
self.wfile.write(value)
if value is None:
self.send_status_code(404)

else:
self.send_response(200)
self.send_header("Content-Length", str(len(value)))
self.end_headers()
self.wfile.write(value)

# Override PUT handler
def do_PUT(self):
Expand Down
2 changes: 1 addition & 1 deletion horovod/run/run.py
Expand Up @@ -351,7 +351,7 @@ def run():
"using gloo.")

if not args.host:
if args.hostfiles:
if args.hostfile:
args.host = parse_host_files(args.hostfiles)
else:
# Set hosts to localhost if not specified
Expand Down

0 comments on commit df3d769

Please sign in to comment.