Skip to content

Commit

Permalink
Merge From Develop
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanRisheng committed Nov 1, 2021
2 parents 16e6bf1 + fe81306 commit dfec7a0
Show file tree
Hide file tree
Showing 418 changed files with 28,313 additions and 8,643 deletions.
7 changes: 5 additions & 2 deletions cmake/external/boost.cmake
Expand Up @@ -23,7 +23,10 @@ set(BOOST_PROJECT "extern_boost")
# checked that the devtools package of CentOS 6 installs boost 1.41.0.
# So we use 1.41.0 here.
set(BOOST_VER "1.41.0")
set(BOOST_TAR "boost_1_41_0" CACHE STRING "" FORCE)
# boost_1_41_0_2021_10.tar.gz is almost the same with boost_1_41_0.tar.gz,
# except in visualc.hpp i comment a warning of "unknown compiler version",
# so if you need to change boost, you may need to block the warning similarly.
set(BOOST_TAR "boost_1_41_0_2021_10" CACHE STRING "" FORCE)
set(BOOST_URL "http://paddlepaddledeps.bj.bcebos.com/${BOOST_TAR}.tar.gz" CACHE STRING "" FORCE)

MESSAGE(STATUS "BOOST_VERSION: ${BOOST_VER}, BOOST_URL: ${BOOST_URL}")
Expand All @@ -46,7 +49,7 @@ ExternalProject_Add(
${BOOST_PROJECT}
${EXTERNAL_PROJECT_LOG_ARGS}
"${BOOST_DOWNLOAD_CMD}"
URL_MD5 f891e8c2c9424f0565f0129ad9ab4aff
URL_MD5 51be7cc203628dc0848e97eee32d79e3
PREFIX ${BOOST_PREFIX_DIR}
DOWNLOAD_DIR ${BOOST_SOURCE_DIR}
SOURCE_DIR ${BOOST_SOURCE_DIR}
Expand Down
2 changes: 1 addition & 1 deletion cmake/external/cinn.cmake
Expand Up @@ -27,7 +27,7 @@ add_definitions(-w)
include(ExternalProject)
set(CINN_SOURCE_DIR ${THIRD_PARTY_PATH}/CINN)
# TODO(zhhsplendid): Modify git tag after we have release tag
set(CINN_GIT_TAG e422c01b7875301996a2baf67a14ba61b0e6192a)
set(CINN_GIT_TAG 2122413fc74f4020ff4397b54488a793529d581b)
set(CINN_OPTIONAL_ARGS -DPY_VERSION=${PY_VERSION} -DWITH_CUDA=${WITH_GPU} -DWITH_CUDNN=${WITH_GPU} -DPUBLISH_LIBS=ON -DWITH_TESTING=ON)
set(CINN_BUILD_COMMAND $(MAKE) cinnapi -j)
ExternalProject_Add(
Expand Down
2 changes: 1 addition & 1 deletion cmake/external/xpu.cmake
Expand Up @@ -35,7 +35,7 @@ ELSE ()
ENDIF()

SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20210921")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20211020")
SET(XPU_XRE_URL "${XPU_BASE_URL}/${XPU_XRE_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
SET(XPU_XDNN_URL "${XPU_BASE_URL}/${XPU_XDNN_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
SET(XPU_XCCL_URL "${XPU_BASE_URL_WITHOUT_DATE}/20210623/${XPU_XCCL_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
Expand Down
6 changes: 4 additions & 2 deletions cmake/operators.cmake
Expand Up @@ -185,6 +185,7 @@ function(op_library TARGET)
list(REMOVE_ITEM hip_srcs "cholesky_op.cu")
list(REMOVE_ITEM hip_srcs "matrix_rank_op.cu")
list(REMOVE_ITEM hip_srcs "svd_op.cu")
list(REMOVE_ITEM hip_srcs "eigvalsh_op.cu")
list(REMOVE_ITEM hip_srcs "qr_op.cu")
list(REMOVE_ITEM hip_srcs "eigh_op.cu")
list(REMOVE_ITEM hip_srcs "multinomial_op.cu")
Expand Down Expand Up @@ -217,7 +218,8 @@ function(op_library TARGET)
"fusion_transpose_flatten_concat_op" "fusion_conv_inception_op"
"sync_batch_norm_op" "sparse_attention_op" "dgc_op" "fused_fc_elementwise_layernorm_op"
"skip_layernorm_op" "multihead_matmul_op" "fusion_group_op" "fused_bn_activation_op" "fused_embedding_eltwise_layernorm_op" "fusion_gru_op" "fusion_lstm_op"
"fused_bn_add_activation_op" "fused_attention_op" "resnet_unit_op")
"fused_bn_add_activation_op" "fused_attention_op" "resnet_unit_op" "fused_feedforward_op")

if ("${TARGET}" STREQUAL "${manual_pybind_op}")
set(pybind_flag 1)
endif()
Expand Down Expand Up @@ -298,7 +300,7 @@ function(op_library TARGET)
file(APPEND ${pybind_file} "USE_OP_DEVICE_KERNEL(${TARGET}, CUDNN);\n")
endif()

if (WITH_XPU AND ${xpu_cc_srcs_len} GREATER 0)
if (WITH_XPU AND ${pybind_flag} EQUAL 0 AND ${xpu_cc_srcs_len} GREATER 0)
file(APPEND ${pybind_file} "USE_OP_DEVICE_KERNEL(${TARGET}, XPU);\n")
endif()

Expand Down
1 change: 1 addition & 0 deletions cmake/third_party.cmake
Expand Up @@ -363,6 +363,7 @@ endif (WITH_LITE)
if (WITH_CINN)
message(STATUS "Compile Paddle with CINN.")
include(external/cinn)
add_definitions(-DPADDLE_WITH_CINN)
endif (WITH_CINN)

if (WITH_CRYPTO)
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/distributed/CMakeLists.txt
Expand Up @@ -11,6 +11,7 @@ if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()

add_subdirectory(common)
add_subdirectory(service)
add_subdirectory(table)
add_subdirectory(test)
Expand Down
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/common/CMakeLists.txt
@@ -0,0 +1,4 @@

cc_library(afs_wrapper SRCS afs_warpper.cc DEPS fs ps_framework_proto)

#set_property(GLOBAL PROPERTY COMMON_DEPS afs_warpper)
89 changes: 89 additions & 0 deletions paddle/fluid/distributed/common/afs_warpper.cc
@@ -0,0 +1,89 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/common/afs_warpper.h"
#include "paddle/fluid/framework/io/fs.h"

namespace paddle {
namespace distributed {
// AfsClient impl
int AfsClient::initialize(const FsClientParameter& fs_client_param) {
// temporarily implemented with hdfs-client
return initialize(fs_client_param.hadoop_bin(), fs_client_param.uri(),
fs_client_param.user(), fs_client_param.passwd(),
fs_client_param.buffer_size());
}
int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& user, const std::string& passwd,
int buffer_size_param) {
return initialize(hadoop_bin, uri, paddle::string::format_string(
"%s,%s", user.c_str(), passwd.c_str()),
buffer_size_param);
}
int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& ugi, int buffer_size_param) {
// temporarily implemented with hdfs-client
size_t buffer_size = 1L << 25; // 32MB
if (buffer_size_param > static_cast<int>(buffer_size)) {
buffer_size = buffer_size_param;
}
paddle::framework::hdfs_set_buffer_size(buffer_size);
paddle::framework::hdfs_set_command(paddle::string::format_string(
"2>>./hdfs_err.log %s fs -Dfs.default.name=%s -Dhadoop.job.ugi=%s "
"-Ddfs.client.block.write.retries=15 -Ddfs.rpc.timeout=300000",
hadoop_bin.c_str(), uri.c_str(), ugi.c_str()));
return 0;
}

// open file in 'w' or 'r'
std::shared_ptr<FsReadChannel> AfsClient::open_r(const FsChannelConfig& config,
uint32_t buffer_size,
int* err_no) {
std::shared_ptr<FsReadChannel> channel =
std::make_shared<FsReadChannel>(buffer_size);
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_read(config.path, err_no, config.deconverter);
channel->open(fp, config);
return channel;
}
std::shared_ptr<FsWriteChannel> AfsClient::open_w(const FsChannelConfig& config,
uint32_t buffer_size,
int* err_no) {
std::shared_ptr<FsWriteChannel> channel =
std::make_shared<FsWriteChannel>(buffer_size);
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_write(config.path, err_no, config.converter);
channel->open(fp, config);
return channel;
}

// remove file in path, path maybe a reg, such as 'part-000-*'
void AfsClient::remove(const std::string& path) {
return paddle::framework::fs_remove(path);
}
void AfsClient::remove_dir(const std::string& dir) {
return paddle::framework::fs_remove(dir);
}

// list files in path, path maybe a dir with reg
std::vector<std::string> AfsClient::list(const std::string& path) {
return paddle::framework::fs_list(path);
}

// exist or not
bool AfsClient::exist(const std::string& dir) {
return paddle::framework::fs_exists(dir);
}
} // namespace distributed
} // namespace paddle
156 changes: 156 additions & 0 deletions paddle/fluid/distributed/common/afs_warpper.h
@@ -0,0 +1,156 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace distributed {
struct FsDataConverter {
std::string converter;
std::string deconverter;
};

struct FsChannelConfig {
std::string path; // path of file
std::string converter; // data converter
std::string deconverter;
};

class FsReadChannel {
public:
FsReadChannel() : _buffer_size(0) {}
explicit FsReadChannel(uint32_t buffer_size) : _buffer_size(buffer_size) {}
virtual ~FsReadChannel() {}
FsReadChannel(FsReadChannel&&) = delete;
FsReadChannel(const FsReadChannel&) = delete;
int open(std::shared_ptr<FILE> fp, const FsChannelConfig& config) {
_file = fp;
return 0;
}
inline int close() {
_file.reset();
return 0;
}

inline uint32_t read_line(std::string& line_data) { // NOLINT
line_data.clear();
char buffer = '\0';
size_t read_count = 0;
while (1 == fread(&buffer, 1, 1, _file.get()) && buffer != '\n') {
++read_count;
line_data.append(&buffer, 1);
}
if (read_count == 0 && buffer != '\n') {
return -1;
}
return 0;
}

private:
uint32_t _buffer_size;
FsChannelConfig _config;
std::shared_ptr<FILE> _file;
};
class FsWriteChannel {
public:
FsWriteChannel() : _buffer_size(0) {}
explicit FsWriteChannel(uint32_t buffer_size) : _buffer_size(buffer_size) {}
virtual ~FsWriteChannel() {}
FsWriteChannel(FsWriteChannel&&) = delete;
FsWriteChannel(const FsWriteChannel&) = delete;

int open(std::shared_ptr<FILE> fp, const FsChannelConfig& config) {
_file = fp;

// the buffer has set in fs.cc
// if (_buffer_size != 0) {
// _buffer = std::shared_ptr<char>(new char[_buffer_size]);

// CHECK(0 == setvbuf(&*_file, _buffer.get(), _IOFBF, _buffer_size));
//}
return 0;
}

inline void flush() { return; }

inline int close() {
flush();
_file.reset();
return 0;
}

inline uint32_t write_line(const char* data, uint32_t size) {
size_t write_count = fwrite_unlocked(data, 1, size, _file.get());
if (write_count != size) {
return -1;
}
write_count = fwrite_unlocked("\n", 1, 1, _file.get());
if (write_count != 1) {
return -1;
}
return 0;
}
inline uint32_t write_line(const std::string& data) {
return write_line(data.c_str(), data.size());
}

private:
uint32_t _buffer_size;
FsChannelConfig _config;
std::shared_ptr<FILE> _file;
std::shared_ptr<char> _buffer;
};

class AfsClient {
public:
AfsClient() {}
virtual ~AfsClient() {}
AfsClient(AfsClient&&) = delete;
AfsClient(const AfsClient&) = delete;

int initialize(const FsClientParameter& fs_client_param);
int initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& user, const std::string& passwd,
int buffer_size_param = (1L << 25));
int initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& ugi, int buffer_size_param = (1L << 25));

// open file in 'w' or 'r'
std::shared_ptr<FsReadChannel> open_r(const FsChannelConfig& config,
uint32_t buffer_size = 0,
int* err_no = nullptr);
std::shared_ptr<FsWriteChannel> open_w(const FsChannelConfig& config,
uint32_t buffer_size = 0,
int* err_no = nullptr);

// remove file in path, path maybe a reg, such as 'part-000-*'
void remove(const std::string& path);
void remove_dir(const std::string& dir);

// list files in path, path maybe a dir with reg
std::vector<std::string> list(const std::string& path);

// exist or not
bool exist(const std::string& dir);
};
} // namespace distributed
} // namespace paddle

1 comment on commit dfec7a0

@paddle-bot-old
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Congratulation! Your pull request passed all required CI. You could ask reviewer(s) to approve and merge. 🎉

Please sign in to comment.