Optimize Horovod Timeline and add Cycle Markers (#782)

alsrgv committed Jan 26, 2019
1 parent 3171a47 commit c314949a60fca30674388b69e730c83b79d22ff5
@@ -4,3 +4,42 @@
[submodule "third_party/eigen"]
path = third_party/eigen
url = https://github.com/eigenteam/eigen-git-mirror.git
[submodule "third_party/boost/assert"]
path = third_party/boost/assert
url = https://github.com/boostorg/assert.git
[submodule "third_party/boost/lockfree"]
path = third_party/boost/lockfree
url = https://github.com/boostorg/lockfree.git
[submodule "third_party/boost/static_assert"]
path = third_party/boost/static_assert
url = https://github.com/boostorg/static_assert.git
[submodule "third_party/boost/parameter"]
path = third_party/boost/parameter
url = https://github.com/boostorg/parameter.git
[submodule "third_party/boost/utility"]
path = third_party/boost/utility
url = https://github.com/boostorg/utility.git
[submodule "third_party/boost/config"]
path = third_party/boost/config
url = https://github.com/boostorg/config.git
[submodule "third_party/boost/core"]
path = third_party/boost/core
url = https://github.com/boostorg/core.git
[submodule "third_party/boost/type_traits"]
path = third_party/boost/type_traits
url = https://github.com/boostorg/type_traits.git
[submodule "third_party/boost/preprocessor"]
path = third_party/boost/preprocessor
url = https://github.com/boostorg/preprocessor.git
[submodule "third_party/boost/iterator"]
path = third_party/boost/iterator
url = https://github.com/boostorg/iterator.git
[submodule "third_party/boost/mpl"]
path = third_party/boost/mpl
url = https://github.com/boostorg/mpl.git
[submodule "third_party/boost/detail"]
path = third_party/boost/detail
url = https://github.com/boostorg/detail.git
[submodule "third_party/boost/predef"]
path = third_party/boost/predef
url = https://github.com/boostorg/predef.git
@@ -1,4 +1,4 @@
recursive-include * *.h *.cc *.md
recursive-include * *.h *.hpp *.cc *.md

include LICENSE horovod.lds horovod.exp
prune .eggs
@@ -41,3 +41,20 @@ workers were early and which were late.

* In case of `HOROVOD_HIERARCHICAL_ALLREDUCE=1`, *NCCL_ALLREDUCE* will become a sequence or a subsequence of *NCCL_REDUCESCATTER*,
*NCCL_REDUCE*, *MEMCPY_IN_HOST_BUFFER*, *MPI_ALLREDUCE*, *MEMCPY_OUT_HOST_BUFFER*, *NCCL_ALLGATHER*, *NCCL_BCAST*.

### Adding cycle markers

Horovod performs work in cycles. These cycles are used to aid [Tensor Fusion](tensor-fusion.md).

Horovod can record the moment when each cycle starts, which is useful for debugging Tensor Fusion.

![Cycle Markers](https://user-images.githubusercontent.com/16640218/51659458-64806100-1f5f-11e9-9a27-ba934ceec75f.png)

Since this information makes the timeline view very crowded, it is not enabled by default.

To add cycle markers to the timeline, set the `HOROVOD_TIMELINE_MARK_CYCLES` environment variable to `1`:

```bash
$ HOROVOD_TIMELINE=/path/to/timeline.json HOROVOD_TIMELINE_MARK_CYCLES=1 \
    mpirun -np 4 -x HOROVOD_TIMELINE -x HOROVOD_TIMELINE_MARK_CYCLES python train.py
```
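
Under the hood, cycle marking is gated by an ordinary environment variable read by the Horovod background thread. Below is a minimal, self-contained C++ sketch of that gating pattern; the helper and the logging are hypothetical and only the variable name comes from the docs above, so treat it as an illustration rather than Horovod's actual implementation.

```cpp
#include <cstdlib>
#include <iostream>

// Hypothetical helper: returns true if HOROVOD_TIMELINE_MARK_CYCLES is set
// to a positive integer in the environment.
bool MarkCyclesEnabled() {
  const char* value = std::getenv("HOROVOD_TIMELINE_MARK_CYCLES");
  return value != nullptr && std::strtol(value, nullptr, 10) > 0;
}

int main() {
  if (MarkCyclesEnabled()) {
    // In Horovod, this is the point where a cycle-start marker would be
    // written to the timeline; here we simply log that the flag is on.
    std::cout << "cycle markers enabled\n";
  }
  return 0;
}
```

Running the sketch with and without `HOROVOD_TIMELINE_MARK_CYCLES=1` in the environment shows the flag toggling the behavior.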
@@ -1,5 +1,5 @@
// Copyright 2016 The TensorFlow Authors. All Rights Reserved.
// Modifications copyright (C) 2018 Uber Technologies, Inc.
// Modifications copyright (C) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -151,13 +151,13 @@ void MPIRequest_SerializeToWire(const MPIRequest& request,

} // namespace

void MPIRequest::ParseFromString(MPIRequest& request,
const std::string& input) {
auto obj = flatbuffers::GetRoot<wire::MPIRequest>((uint8_t*)input.c_str());
void MPIRequest::ParseFromBytes(MPIRequest& request, const uint8_t* input) {
auto obj = flatbuffers::GetRoot<wire::MPIRequest>(input);
MPIRequest_ParseFromWire(request, obj);
}

void MPIRequest::SerializeToString(MPIRequest& request, std::string& output) {
void MPIRequest::SerializeToString(const MPIRequest& request,
std::string& output) {
flatbuffers::FlatBufferBuilder builder(1024);
flatbuffers::Offset<wire::MPIRequest> obj;
MPIRequest_SerializeToWire(request, builder, obj);
@@ -180,23 +180,26 @@ bool MPIRequestList::shutdown() const { return shutdown_; }

void MPIRequestList::set_shutdown(bool value) { shutdown_ = value; }

void MPIRequestList::add_requests(const MPIRequest& value) {
void MPIRequestList::add_request(const MPIRequest& value) {
requests_.push_back(value);
}

void MPIRequestList::ParseFromString(MPIRequestList& request_list,
const std::string& input) {
auto obj =
flatbuffers::GetRoot<wire::MPIRequestList>((uint8_t*)input.c_str());
void MPIRequestList::emplace_request(MPIRequest&& value) {
requests_.emplace_back(std::move(value));
}

void MPIRequestList::ParseFromBytes(MPIRequestList& request_list,
const uint8_t* input) {
auto obj = flatbuffers::GetRoot<wire::MPIRequestList>(input);
for (const auto& req_obj : *obj->requests()) {
MPIRequest request;
MPIRequest_ParseFromWire(request, req_obj);
request_list.add_requests(std::move(request));
request_list.emplace_request(std::move(request));
}
request_list.set_shutdown(obj->shutdown());
}

void MPIRequestList::SerializeToString(MPIRequestList& request_list,
void MPIRequestList::SerializeToString(const MPIRequestList& request_list,
std::string& output) {
// FlatBuffers must be built bottom-up.
flatbuffers::FlatBufferBuilder builder(1024);
@@ -269,7 +272,7 @@ void MPIResponse::set_tensor_names(const std::vector<std::string>& value) {
tensor_names_ = value;
}

void MPIResponse::add_tensor_names(const std::string& value) {
void MPIResponse::add_tensor_name(const std::string& value) {
tensor_names_.push_back(value);
}

@@ -285,7 +288,7 @@ void MPIResponse::set_devices(const std::vector<int32_t>& value) {
devices_ = value;
}

void MPIResponse::add_devices(int32_t value) { devices_.push_back(value); }
void MPIResponse::add_device(int32_t value) { devices_.push_back(value); }

const std::vector<int64_t>& MPIResponse::tensor_sizes() const {
return tensor_sizes_;
@@ -295,26 +298,25 @@ void MPIResponse::set_tensor_sizes(const std::vector<int64_t>& value) {
tensor_sizes_ = value;
}

void MPIResponse::add_tensor_sizes(int64_t value) {
void MPIResponse::add_tensor_size(int64_t value) {
tensor_sizes_.push_back(value);
}

void MPIResponse::add_allgather_response(
horovod::common::MPIResponse response) {
void MPIResponse::add_allgather_response(const MPIResponse& response) {
assert(response_type() == MPIResponse::ResponseType::ALLGATHER);
assert(response.tensor_names().size() == 1);
assert(response.devices() == devices());
add_tensor_names(response.tensor_names()[0]);
for (auto size: response.tensor_sizes()){
add_tensor_sizes(size);
add_tensor_name(response.tensor_names()[0]);
for (auto size : response.tensor_sizes()) {
add_tensor_size(size);
}
}

void MPIResponse_ParseFromWire(MPIResponse& response,
const wire::MPIResponse* obj) {
response.set_response_type((MPIResponse::ResponseType)obj->response_type());
for (const auto& tensor_name_obj : *obj->tensor_names()) {
response.add_tensor_names(tensor_name_obj->str());
response.add_tensor_name(tensor_name_obj->str());
}
response.set_error_message(obj->error_message()->str());
response.set_devices(
@@ -323,15 +325,14 @@ void MPIResponse_ParseFromWire(MPIResponse& response,
obj->tensor_sizes()->end()));
}

void MPIResponse::ParseFromString(MPIResponse& response,
const std::string& input) {
auto obj = flatbuffers::GetRoot<wire::MPIResponse>((uint8_t*)input.c_str());
void MPIResponse::ParseFromBytes(MPIResponse& response, const uint8_t* input) {
auto obj = flatbuffers::GetRoot<wire::MPIResponse>(input);
MPIResponse_ParseFromWire(response, obj);
}

void MPIResponse_SerializeToWire(const MPIResponse& response,
flatbuffers::FlatBufferBuilder& builder,
flatbuffers::Offset<wire::MPIResponse>& obj) {
flatbuffers::FlatBufferBuilder& builder,
flatbuffers::Offset<wire::MPIResponse>& obj) {
// FlatBuffers must be built bottom-up.
auto tensor_names_wire =
builder.CreateVectorOfStrings(response.tensor_names());
@@ -349,7 +350,7 @@ void MPIResponse_SerializeToWire(const MPIResponse& response,
obj = response_builder.Finish();
}

void MPIResponse::SerializeToString(MPIResponse& response,
void MPIResponse::SerializeToString(const MPIResponse& response,
std::string& output) {
flatbuffers::FlatBufferBuilder builder(1024);
flatbuffers::Offset<wire::MPIResponse> obj;
@@ -373,24 +374,27 @@ bool MPIResponseList::shutdown() const { return shutdown_; }

void MPIResponseList::set_shutdown(bool value) { shutdown_ = value; }

void MPIResponseList::add_responses(const MPIResponse& value) {
void MPIResponseList::add_response(const MPIResponse& value) {
responses_.push_back(value);
}

void MPIResponseList::ParseFromString(MPIResponseList& response_list,
const std::string& input) {
auto obj =
flatbuffers::GetRoot<wire::MPIResponseList>((uint8_t*)input.c_str());
void MPIResponseList::emplace_response(MPIResponse&& value) {
responses_.emplace_back(std::move(value));
}

void MPIResponseList::ParseFromBytes(MPIResponseList& response_list,
const uint8_t* input) {
auto obj = flatbuffers::GetRoot<wire::MPIResponseList>(input);
for (const auto& resp_obj : *obj->responses()) {
MPIResponse response;
MPIResponse_ParseFromWire(response, resp_obj);
response_list.add_responses(std::move(response));
response_list.emplace_response(std::move(response));
}
response_list.set_shutdown(obj->shutdown());
}

void MPIResponseList::SerializeToString(MPIResponseList& response_list,
std::string& output) {
void MPIResponseList::SerializeToString(const MPIResponseList& response_list,
std::string& output) {
// FlatBuffers must be built bottom-up.
flatbuffers::FlatBufferBuilder builder(1024);
std::vector<flatbuffers::Offset<wire::MPIResponse>> responses;
@@ -1,5 +1,5 @@
// Copyright 2016 The TensorFlow Authors. All Rights Reserved.
// Modifications copyright (C) 2018 Uber Technologies, Inc.
// Modifications copyright (C) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -72,8 +72,8 @@ class MPIRequest {
void set_tensor_shape(const std::vector<int64_t>& value);
void add_tensor_shape(int64_t value);

static void ParseFromString(MPIRequest& request, const std::string& input);
static void SerializeToString(MPIRequest& request, std::string& output);
static void ParseFromBytes(MPIRequest& request, const uint8_t* input);
static void SerializeToString(const MPIRequest& request, std::string& output);

private:
int32_t request_rank_ = 0;
@@ -89,13 +89,14 @@ class MPIRequestList {
public:
const std::vector<MPIRequest>& requests() const;
void set_requests(const std::vector<MPIRequest>& value);
void add_requests(const MPIRequest& value);
void add_request(const MPIRequest& value);
void emplace_request(MPIRequest&& value);
bool shutdown() const;
void set_shutdown(bool value);

static void ParseFromString(MPIRequestList& request_list,
const std::string& input);
static void SerializeToString(MPIRequestList& request_list,
static void ParseFromBytes(MPIRequestList& request_list,
const uint8_t* input);
static void SerializeToString(const MPIRequestList& request_list,
std::string& output);

private:
@@ -110,12 +111,7 @@ class MPIRequestList {
// an error message instead.
class MPIResponse {
public:
enum ResponseType {
ALLREDUCE = 0,
ALLGATHER = 1,
BROADCAST = 2,
ERROR = 3
};
enum ResponseType { ALLREDUCE = 0, ALLGATHER = 1, BROADCAST = 2, ERROR = 3 };

static const std::string& ResponseType_Name(ResponseType value);

@@ -126,27 +122,29 @@ class MPIResponse {
const std::vector<std::string>& tensor_names() const;
const std::string tensor_names_string() const;
void set_tensor_names(const std::vector<std::string>& value);
void add_tensor_names(const std::string& value);
void add_tensor_name(const std::string& value);

// Empty unless response_type is ERROR.
const std::string& error_message() const;
void set_error_message(const std::string& value);

const std::vector<int32_t>& devices() const;
void set_devices(const std::vector<int32_t>& value);
void add_devices(int32_t value);
void add_device(int32_t value);

// Empty unless response_type is ALLGATHER.
// These tensor sizes are the dimension zero sizes of all the input matrices,
// indexed by the rank.
const std::vector<int64_t>& tensor_sizes() const;
void set_tensor_sizes(const std::vector<int64_t>& value);
void add_tensor_sizes(int64_t value);
void add_tensor_size(int64_t value);

// To fuse multiple allgather responses
void add_allgather_response(MPIResponse response);
void add_allgather_response(const MPIResponse& response);

static void ParseFromString(MPIResponse& response, const std::string& input);
static void SerializeToString(MPIResponse& response, std::string& output);
static void ParseFromBytes(MPIResponse& response, const uint8_t* input);
static void SerializeToString(const MPIResponse& response,
std::string& output);

private:
ResponseType response_type_ = ResponseType::ALLREDUCE;
@@ -160,13 +158,14 @@ class MPIResponseList {
public:
const std::vector<MPIResponse>& responses() const;
void set_responses(const std::vector<MPIResponse>& value);
void add_responses(const MPIResponse& value);
void add_response(const MPIResponse& value);
void emplace_response(MPIResponse&& value);
bool shutdown() const;
void set_shutdown(bool value);

static void ParseFromString(MPIResponseList& response_list,
const std::string& input);
static void SerializeToString(MPIResponseList& response_list,
static void ParseFromBytes(MPIResponseList& response_list,
const uint8_t* input);
static void SerializeToString(const MPIResponseList& response_list,
std::string& output);

private:
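
For reference, here is a minimal, self-contained sketch of a round trip through the renamed serialization API (`SerializeToString` plus the new `ParseFromBytes`). The include path and the tensor name are assumptions made for illustration; the method names and signatures match the declarations above.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

#include "horovod/common/mpi_message.h"  // assumed include path

using horovod::common::MPIResponse;

int main() {
  // Build a response using the singular-named mutators introduced here.
  MPIResponse response;
  response.set_response_type(MPIResponse::ResponseType::ALLREDUCE);
  response.add_tensor_name("example_tensor");  // illustrative name
  response.add_device(0);

  // Serialize into a FlatBuffers-backed byte string.
  std::string wire;
  MPIResponse::SerializeToString(response, wire);

  // Parse straight from the raw bytes, avoiding the extra std::string copy
  // that the old ParseFromString entry point required.
  MPIResponse parsed;
  MPIResponse::ParseFromBytes(
      parsed, reinterpret_cast<const uint8_t*>(wire.data()));
  assert(parsed.tensor_names().size() == 1);
  return 0;
}
```

The request-side classes follow the same pattern via `MPIRequest::SerializeToString` and `MPIRequest::ParseFromBytes`.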