Merge branch 'develop' into asgd_optimizer
tiancaishaonvjituizi committed May 6, 2022
2 parents 3a21267 + 2c5cecb commit 46e5362
Showing 521 changed files with 18,296 additions and 9,465 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
@@ -57,6 +57,7 @@
| reyoung | Yang Yu |
| [Sand3r-](https://raw.githubusercontent.com/jczaja/Paddle/paddle-poland-team/doc/images/paddle_poland_team.jpg)| Michal Gallus |
| [sfraczek](https://raw.githubusercontent.com/jakpiase/Paddle/new_paddle_intel_authors/img/img.jpg)| Sylwester Fraczek |
| Silv3S | Slawomir Siwek |
| sneaxiy | Jin-Le Zeng |
| Superjom | Chun-Wei Yan |
| tensor-tang | Jian Tang |
8 changes: 6 additions & 2 deletions CMakeLists.txt
@@ -100,7 +100,11 @@ if(APPLE AND WITH_ARM)
endif()

if(WITH_ASCEND_CL AND NOT WITH_ASCEND_CXX11)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=0")
if(WITH_ARM_BRPC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=1")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_GLIBCXX_USE_CXX11_ABI=0")
endif()
endif()

if(WIN32)
@@ -386,7 +390,7 @@ if(WITH_DISTRIBUTE)
if(LINUX)
set(WITH_GLOO ON CACHE STRING "Enable GLOO when compiling WITH_DISTRIBUTE=ON." FORCE)
endif()
if(WITH_ASCEND_CL)
if(WITH_ASCEND_CL AND NOT WITH_ARM_BRPC)
# disable WITH_PSCORE for NPU before include third_party
MESSAGE(WARNING "Disable WITH_PSCORE when compiling with NPU. Force WITH_PSCORE=OFF.")
set(WITH_PSCORE OFF CACHE BOOL "Disable WITH_PSCORE when compiling with NPU" FORCE)
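Note on the `_GLIBCXX_USE_CXX11_ABI` hunk above: the macro selects between libstdc++'s old (copy-on-write string) and new (C++11) ABI, and every object linked into the same binary must agree on it — hence forcing it to 1 for the ARM-brpc build. A minimal sketch (not part of this commit, assuming a GCC/libstdc++ toolchain) of how a translation unit can report which ABI it was built under:

```cpp
// Standalone probe for the libstdc++ dual ABI (GCC 5+).
#include <cstdio>
#include <string>

int main() {
#if defined(_GLIBCXX_USE_CXX11_ABI) && _GLIBCXX_USE_CXX11_ABI
  // New ABI: std::string is small-string-optimized, not copy-on-write.
  std::printf("cxx11 ABI, sizeof(std::string)=%zu\n", sizeof(std::string));
#else
  std::printf("pre-cxx11 ABI, sizeof(std::string)=%zu\n", sizeof(std::string));
#endif
}
```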
2 changes: 1 addition & 1 deletion README.md
@@ -49,7 +49,7 @@ Now our developers can acquire Tesla V100 online computing resources for free. I
[Click here to learn more](https://github.com/PaddlePaddle/Fleet)


- **High-Performance Inference Engines for Comprehensive Deployment Enviroments**
- **High-Performance Inference Engines for Comprehensive Deployment Environments**

PaddlePaddle is not only compatible with models trained in third-party open-source frameworks, but also offers complete inference products for various production scenarios. Our inference product line includes [Paddle Inference](https://paddle-inference.readthedocs.io/en/latest/product_introduction/summary.html): a native inference library for high-performance server and cloud inference; [Paddle Serving](https://github.com/PaddlePaddle/Serving): a service-oriented framework suitable for distributed and pipelined production; [Paddle Lite](https://github.com/PaddlePaddle/Paddle-Lite): an ultra-lightweight inference engine for mobile and IoT environments; and [Paddle.js](https://www.paddlepaddle.org.cn/paddle/paddlejs): a frontend inference engine for browsers and mini-apps. Furthermore, through extensive optimization for the leading hardware in each scenario, Paddle inference engines outperform most other mainstream frameworks.

2 changes: 1 addition & 1 deletion cmake/coverallsGcovJsons.cmake
@@ -238,7 +238,7 @@ foreach (GCOV_FILE ${GCOV_FILES})
message("MD5: ${GCOV_SRC_PATH} = ${GCOV_CONTENTS_MD5}")

# Loads the gcov file as a list of lines.
# (We first open the file and replace all occurences of [] with _
# (We first open the file and replace all occurrences of [] with _
# because CMake will fail to parse a line containing unmatched brackets...
# also the \ to escaped \n in macros screws up things.)
# https://public.kitware.com/Bug/view.php?id=15369
4 changes: 2 additions & 2 deletions cmake/external/xpu.cmake
@@ -9,15 +9,15 @@ SET(XPU_RT_LIB_NAME "libxpurt.so")

if(NOT DEFINED XPU_BASE_URL)
SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220411")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220425")
else()
SET(XPU_BASE_URL "${XPU_BASE_URL}")
endif()

# ubuntu and centos: use output by XDNN API team
if(NOT DEFINED XPU_XDNN_BASE_URL)
SET(XPU_XDNN_BASE_URL_WITHOUT_DATE "https://klx-sdk-release-public.su.bcebos.com/xdnn/dev")
SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL_WITHOUT_DATE}/20220412")
SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL_WITHOUT_DATE}/20220425")
else()
SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL}")
endif()
5 changes: 4 additions & 1 deletion cmake/flags.cmake
@@ -158,12 +158,15 @@ if(WITH_IPU)
)
endif()

if(WITH_ASCEND_CL AND WITH_ARM_BRPC)
set(COMMON_FLAGS ${COMMON_FLAGS} -faligned-new)
endif()

if(NOT APPLE)
if((${CMAKE_CXX_COMPILER_VERSION} VERSION_GREATER 8.0) OR (WITH_ROCM))
set(COMMON_FLAGS
${COMMON_FLAGS}
-Wno-format-truncation # Warning in boost gcc 8.2
-Wno-error=cast-function-type # Warning in boost gcc 8.2
-Wno-error=parentheses # Warning in boost gcc 8.2
-Wno-error=catch-value # Warning in boost gcc 8.2
-Wno-error=nonnull-compare # Warning in boost gcc 8.2
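The `-faligned-new` flag added above enables C++17-style aligned allocation when compiling in earlier language modes, which types with extended alignment need to be heap-allocated correctly. A minimal sketch (an assumption-based example, not from this commit) of the kind of code that requires it under pre-C++17 g++:

```cpp
// Over-aligned type: plain `new` must honor alignas(64). Pre-C++17 g++ only
// routes this through the aligned operator new when -faligned-new is passed.
#include <cstdint>
#include <cstdio>

struct alignas(64) CacheLinePadded {
  double data[8];
};

int main() {
  auto* p = new CacheLinePadded();
  std::printf("address mod 64 = %ju\n",  // expect 0 with aligned new
              static_cast<std::uintmax_t>(
                  reinterpret_cast<std::uintptr_t>(p) % 64));
  delete p;
}
```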
110 changes: 106 additions & 4 deletions paddle/fluid/distributed/collective/ProcessGroupHeter.cc
@@ -13,6 +13,7 @@
// limitations under the License.

#include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
#include <chrono>
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
@@ -24,6 +25,8 @@ namespace paddle {
namespace distributed {

using Place = paddle::platform::Place;
int ProcessGroupHeter::send_count = 0;
int ProcessGroupHeter::recv_count = 0;

std::shared_ptr<ProcessGroupHeter::HeterTask> ProcessGroupHeter::CreateTask(
int rank, CommType comm_type, const std::vector<phi::DenseTensor>& inputs) {
@@ -47,15 +50,19 @@ bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
ProcessGroupHeter::ProcessGroupHeter(
const std::shared_ptr<Store>& store, int rank, int size,
const platform::Place& place, int gid, int local_rank, int local_size,
int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint)
int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint,
int src_rank, int dst_rank)
: ProcessGroup(rank, size, place, gid),
store_(store),
local_rank_(local_rank),
local_size_(local_size),
gloo_rank_(gloo_rank),
gloo_size_(gloo_size),
with_switch_(with_switch),
switch_endpoint_(switch_endpoint) {
switch_endpoint_(switch_endpoint),
src_rank_(src_rank),
dst_rank_(dst_rank) {
return;
#if defined(PADDLE_WITH_NCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
place_, IGNORE_ID);
@@ -116,7 +123,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor = cpu_tensors[0];
std::vector<int> send_size;
std::vector<int64_t> send_size;
send_size.push_back(dense_cpu_tensor.numel());
int ret = client_->Send(
gid_, {dense_cpu_tensor.name()}, send_size, dense_cpu_tensor.data(),
@@ -212,7 +219,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto dense_cpu_tensor = cpu_tensors[0];
if (gloo_rank_ == 0) {
std::vector<int> send_size;
std::vector<int64_t> send_size;
send_size.push_back(dense_cpu_tensor.numel());
int ret = client_->Send(
gid_, {dense_cpu_tensor.name()}, send_size,
@@ -246,5 +253,100 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
return CreateTask(rank_, CommType::BROADCAST, in_tensors);
}

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif

PADDLE_ENFORCE_EQ(
in_tensors.size(), 1,
platform::errors::PreconditionNotMet(
"For each send operation, there can only be one tensor to send."));
// Copy Tensor to cpu
auto start = std::chrono::high_resolution_clock::now();
phi::DenseTensor cpu_tensor;
auto& gpu_tensor = in_tensors[0];
framework::TensorCopySync(gpu_tensor, platform::CPUPlace(), &cpu_tensor);
PADDLE_ENFORCE_EQ(with_switch_, true,
platform::errors::PreconditionNotMet(
"Gloo does not support the send operation."));
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from gpu to cpu for send " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;

// Send to switch
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
int64_t tensor_size =
cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype());
std::vector<int64_t> send_size;
send_size.push_back(tensor_size);
auto id = src_rank_ * 10000 + dst_rank_;
std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
std::string("_") + std::to_string(send_count++);
VLOG(2) << "tensor_name:" << tensor_name;
int ret = client_->Send(gid_, {tensor_name}, send_size, cpu_tensor.data(),
tensor_size);
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"Send to the switch module error."));
return CreateTask(rank_, CommType::SEND, in_tensors);
}
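Worth noting for readers of the hunk above: the switch routes purely by name, so `Send` derives a unique key from the group id, an encoded (src, dst) pair, and a static per-process counter. A standalone sketch of that keying scheme (`MakeTensorName` is a hypothetical helper, not in the diff):

```cpp
#include <cstdint>
#include <string>

// Mirrors the naming logic above: gid + "_id_" + (src*10000 + dst) + "_" + n.
// The matching Recv builds the same key with its own counter, so send/recv
// pairs line up as long as both sides issue operations in the same order.
std::string MakeTensorName(int gid, int src_rank, int dst_rank, int count) {
  const std::int64_t id =
      static_cast<std::int64_t>(src_rank) * 10000 + dst_rank;
  return std::to_string(gid) + "_id_" + std::to_string(id) + "_" +
         std::to_string(count);
}
```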

std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) {
#if defined(PADDLE_WITH_NCCL)
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors), true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
#endif

PADDLE_ENFORCE_EQ(
out_tensors.size(), 1,
platform::errors::PreconditionNotMet(
"For each rece operation, there can only be one tensor to receive."));

// Copy Tensor to cpu
phi::DenseTensor cpu_tensor;
auto& gpu_tensor = out_tensors[0];
cpu_tensor.Resize(gpu_tensor.dims());
cpu_tensor.set_layout(gpu_tensor.layout());
cpu_tensor.mutable_data(platform::CPUPlace(), gpu_tensor.dtype());

PADDLE_ENFORCE_EQ(with_switch_, true,
platform::errors::PreconditionNotMet(
"Gloo does not support the send operation."));
// recv from switch
HeterClient* client_ =
HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
auto id = src_rank_ * 10000 + dst_rank_;
std::string tensor_name = std::to_string(gid_) + "_id_" + std::to_string(id) +
std::string("_") + std::to_string(recv_count++);
VLOG(2) << "tensor_name: " << tensor_name;
auto start = std::chrono::high_resolution_clock::now();
int ret = client_->Recv(
gid_, {tensor_name}, cpu_tensor.data(),
cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"receive to the switch module error."));
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> diff = end - start;
double goodput = cpu_tensor.numel() *
framework::DataTypeSize(cpu_tensor.dtype()) / diff.count();
VLOG(2) << "Goodput: " << goodput << "B/s" << std::endl;
start = std::chrono::high_resolution_clock::now();
framework::TensorCopySync(cpu_tensor, gpu_tensor.place(), &gpu_tensor);
end = std::chrono::high_resolution_clock::now();
diff = end - start;
VLOG(2) << "Time to copy tensor of dims(" << cpu_tensor.dims()
<< ") from gpu to cpu for recv " << std::setw(9)
<< " is: " << diff.count() << " s" << std::endl;
return CreateTask(rank_, CommType::RECV, out_tensors);
}
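The `Goodput` line logged in `Recv` above is simply payload bytes over wall-clock receive time. A small self-contained sketch of the same arithmetic (illustrative only, with assumed example numbers):

```cpp
#include <cstdint>
#include <cstdio>

// Goodput = numel * bytes-per-element / seconds, as in the VLOG above.
double GoodputBytesPerSec(std::int64_t numel, std::int64_t dtype_size,
                          double seconds) {
  return static_cast<double>(numel * dtype_size) / seconds;
}

int main() {
  // A 2^20-element float32 tensor received in 50 ms is roughly 84 MB/s.
  std::printf("%.0f B/s\n", GoodputBytesPerSec(1 << 20, 4, 0.05));
}
```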

} // namespace distributed
} // namespace paddle
13 changes: 12 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroupHeter.h
@@ -83,7 +83,8 @@ class ProcessGroupHeter : public ProcessGroup {
ProcessGroupHeter(const std::shared_ptr<Store>& store, int rank, int size,
const platform::Place& place, int gid, int local_rank,
int local_size, int gloo_rank, int gloo_size,
bool with_switch, std::string switch_endpoints);
bool with_switch, std::string switch_endpoints,
int src_rank, int dst_rank);

const std::string GetBackendName() const override {
return std::string(HETER_BACKEND_NAME);
@@ -97,6 +98,12 @@ class ProcessGroupHeter : public ProcessGroup {
std::vector<phi::DenseTensor>&, std::vector<phi::DenseTensor>&,
const BroadcastOptions& = BroadcastOptions()) override;

std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>& in_tensors, int peer) override;

std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& out_tensors, int peer) override;

protected:
virtual std::shared_ptr<ProcessGroupHeter::HeterTask> CreateTask(
int rank, CommType opType, const std::vector<phi::DenseTensor>& inputs);
@@ -112,6 +119,10 @@
int gloo_size_;
bool with_switch_;
std::string switch_endpoint_;
int src_rank_;
int dst_rank_;
static int send_count;
static int recv_count;
};

} // namespace distributed
10 changes: 6 additions & 4 deletions paddle/fluid/distributed/collective/reducer.cc
@@ -447,10 +447,12 @@ void EagerReducer::TraverseBackwardGraph(const std::vector<Tensor> &outputs) {
while (!queue.empty()) {
egr::GradNodeBase *node = queue.front();
queue.pop();
const std::vector<std::vector<egr::Edge>> &edges = node->GetEdges();
for (size_t i = 0; i < edges.size(); i++) {
for (size_t j = 0; j < edges[i].size(); j++) {
const egr::Edge &edge = edges[i][j];
const paddle::small_vector<std::vector<egr::GradSlotMeta>,
egr::kSlotSmallVectorSize> &metas =
node->OutputMeta();
for (size_t i = 0; i < metas.size(); i++) {
for (size_t j = 0; j < metas[i].size(); j++) {
const egr::Edge &edge = metas[i][j].GetEdge();
auto next_node_shared = edge.GetMutableGradNode();
if (!next_node_shared || !next_node_shared.get()) {
continue;
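The reducer hunk above swaps bulk `GetEdges()` for per-output-slot metadata: each slot's `GradSlotMeta` entries now carry their own edge. A generic analogue of the new traversal, with stand-in types that are assumptions rather than Paddle's real `GradNodeBase`/`GradSlotMeta`:

```cpp
#include <queue>
#include <vector>

struct Node;  // forward declaration for Edge

struct Edge {
  Node* next = nullptr;
};

struct SlotMeta {
  Edge edge;
  const Edge& GetEdge() const { return edge; }
};

struct Node {
  std::vector<std::vector<SlotMeta>> metas;  // one inner vector per output slot
  const std::vector<std::vector<SlotMeta>>& OutputMeta() const { return metas; }
};

// BFS over the grad graph, reading edges slot-by-slot as the new code does.
// (The real reducer also tracks visited nodes; omitted here for brevity.)
void Traverse(Node* root) {
  std::queue<Node*> queue;
  queue.push(root);
  while (!queue.empty()) {
    Node* node = queue.front();
    queue.pop();
    for (const auto& slot : node->OutputMeta()) {
      for (const auto& meta : slot) {
        if (Node* next = meta.GetEdge().next) {
          queue.push(next);
        }
      }
    }
  }
}
```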
5 changes: 5 additions & 0 deletions paddle/fluid/distributed/ps/service/CMakeLists.txt
@@ -1,10 +1,15 @@
set(BRPC_SRCS ps_client.cc server.cc)
set_source_files_properties(${BRPC_SRCS})


if(WITH_HETERPS)

set(BRPC_DEPS brpc ssl crypto protobuf gflags glog zlib leveldb snappy gflags glog device_context rocksdb)

else()

set(BRPC_DEPS brpc ssl crypto protobuf gflags glog zlib leveldb snappy gflags glog device_context)

endif()

brpc_library(sendrecv_rpc SRCS
2 changes: 0 additions & 2 deletions paddle/fluid/distributed/ps/service/brpc_ps_client.cc 100644 → 100755
@@ -55,8 +55,6 @@ DEFINE_int32(pserver_sparse_merge_thread, 1, "pserver sparse merge thread num");
DEFINE_int32(pserver_sparse_table_shard_num, 1000,
"sparse table shard for save & load");

DEFINE_int32(heter_world_size, 100, "group size"); // configurable

namespace paddle {
namespace framework {
class Scope;
17 changes: 12 additions & 5 deletions paddle/fluid/distributed/ps/service/heter_client.cc 100644 → 100755
@@ -17,10 +17,14 @@
#include "paddle/fluid/framework/convert_utils.h"
#include "paddle/fluid/platform/profiler.h"

DEFINE_int32(heter_world_size, 100, "group size"); // group max size
DEFINE_int32(switch_send_recv_timeout_s, 600, "switch_send_recv_timeout_s");

namespace paddle {
namespace distributed {

std::shared_ptr<HeterClient> HeterClient::s_instance_ = nullptr;
std::mutex HeterClient::mtx_;
std::shared_ptr<HeterClient> HeterClient::switch_s_instance_ = nullptr;

int GetMicroId(const platform::DeviceContext& ctx,
const framework::Scope* scope) {
@@ -222,6 +226,7 @@ int HeterClient::Send(const platform::DeviceContext& ctx,
distributed::MultiVarMsg request;
// 1. set req message_name(string)
request.set_message_name(message_name);
request.set_group_id(0);

// 2. set req send_var_names(<string>)
for (auto& send_var_name : send_var_names) {
@@ -263,7 +268,7 @@ }
}

int HeterClient::Send(int group_id, const std::vector<std::string>& var_names,
const std::vector<int>& vars_len, void* data_ptr,
const std::vector<int64_t>& vars_size, void* data_ptr,
int64_t data_size) {
OnHeterRpcDone* closure = new OnHeterRpcDone([](void* done) {
auto* closure = reinterpret_cast<OnHeterRpcDone*>(done);
@@ -282,7 +287,7 @@ int HeterClient::Send(int group_id, const std::vector<std::string>& var_names,
for (auto& send_var_name : var_names) {
request.add_send_var_names(send_var_name);
}
for (auto var_len : vars_len) {
for (auto var_len : vars_size) {
request.add_vars_len(var_len);
}
auto& request_buffer = closure->cntl.request_attachment();
@@ -301,6 +306,7 @@ int HeterClient::Send(int group_id, const std::vector<std::string>& var_names,
::paddle::distributed::PsService_Stub stub(channel);
stub.SendToSwitch(&closure->cntl, &request, &closure->ps_response, closure);
fut.wait();
delete closure;
return 0;
}
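The `delete closure;` added above plugs a leak: the call blocks on a future the completion callback fulfills, so the closure can be freed as soon as the wait returns. A generic sketch of that pattern, with a stand-in `Closure` type rather than the real brpc `OnHeterRpcDone`:

```cpp
#include <future>

struct Closure {
  std::promise<int> done;  // fulfilled by the RPC completion callback
};

int BlockingRpc() {
  auto* closure = new Closure();
  std::future<int> fut = closure->done.get_future();
  // stub.SendToSwitch(..., closure);  // hypothetical async call; its
  closure->done.set_value(0);          // callback would set the value
  fut.wait();
  const int rc = fut.get();
  delete closure;  // safe here: the callback has already run
  return rc;
}
```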

@@ -325,6 +331,7 @@ int HeterClient::Recv(const platform::DeviceContext& ctx,
distributed::MultiVarMsg request;
// 1. set req message_name(string)
request.set_message_name(message_name);
request.set_group_id(0);

// 2. set req recv_var_names(<string>)
for (auto& recv_var_name : recv_var_names) {
@@ -396,8 +403,8 @@ int HeterClient::Recv(int group_id, const std::vector<std::string>& var_names,
// save in worker
auto& res_io_buffer = closure->cntl.response_attachment();
butil::IOBufBytesIterator io_buffer_itr(res_io_buffer);
io_buffer_itr.copy_and_forward(reinterpret_cast<void*>(data_ptr),
data_size * sizeof(float));
io_buffer_itr.copy_and_forward(reinterpret_cast<void*>(data_ptr), data_size);
delete closure;
VLOG(4) << "Recv done";
return 0;
}
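The one-line `copy_and_forward` change above fixes the size units: `data_size` now arrives as a byte count, so scaling it by `sizeof(float)` would over-read for any payload. A minimal sketch of the corrected copy, assuming the same byte-count contract:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// data_size_bytes is already a byte count (set from numel * DataTypeSize),
// so no per-element scaling is applied when draining the response buffer.
void CopyPayload(const void* src, void* dst, std::int64_t data_size_bytes) {
  std::memcpy(dst, src, static_cast<std::size_t>(data_size_bytes));
}
```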

1 comment on commit 46e5362

@paddle-bot-old

🕵️ CI failures summary

🔍 PR: #42805 Commit ID: 46e5362 contains failed CI.

🔹 Failed: PR-CI-GpuPS

Unknown Failed
Unknown Failed
