Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ps lite #409

Merged
merged 3 commits on Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion 3rdparty/ps-lite
Submodule ps-lite updated 50 files
+6 −0 .clang-format
+5 −3 .github/workflows/main.yml
+7 −0 .gitignore
+10 −0 Makefile
+29 −4 README.md
+0 −0 docs/efa.md
+1 −0 include/ps/base.h
+28 −16 include/ps/internal/assign_op.h
+19 −14 include/ps/internal/customer.h
+6 −6 include/ps/internal/env.h
+116 −39 include/ps/internal/message.h
+150 −30 include/ps/internal/postoffice.h
+178 −0 include/ps/internal/spsc_queue.h
+69 −14 include/ps/internal/threadsafe_queue.h
+8 −8 include/ps/internal/utils.h
+41 −9 include/ps/internal/van.h
+257 −151 include/ps/kv_app.h
+168 −12 include/ps/ps.h
+2 −1 include/ps/range.h
+133 −42 include/ps/sarray.h
+29 −18 include/ps/simple_app.h
+32 −21 src/customer.cc
+82 −74 src/fabric_transport.h
+21 −27 src/fabric_utils.h
+114 −100 src/fabric_van.h
+23 −5 src/meta.h
+285 −0 src/multi_van.h
+71 −57 src/network_utils.h
+125 −39 src/postoffice.cc
+142 −91 src/rdma_transport.h
+25 −29 src/rdma_utils.h
+168 −150 src/rdma_van.h
+12 −10 src/resender.h
+928 −404 src/ucx_van.h
+325 −154 src/van.cc
+16 −15 src/van_common.h
+5 −5 src/windows/unistd.h
+105 −54 src/zmq_van.h
+0 −7 tests/README.md
+0 −37 tests/local.sh
+0 −41 tests/local_multi_workers.sh
+0 −18 tests/repeat.sh
+163 −0 tests/test.sh
+340 −77 tests/test_benchmark.cc
+492 −0 tests/test_benchmark_stress.cc
+7 −3 tests/test_ipc_benchmark.cc
+0 −319 tests/test_sparse_benchmark.cc
+65 −0 tests/test_stress.sh
+14 −0 tools/check_diff.sh
+10 −0 tools/format_code.sh
3 changes: 2 additions & 1 deletion MANIFEST.in
@@ -1,7 +1,8 @@
include */* LICENSE byteps.lds byteps.exp
recursive-include * *.cc *.h
prune .git
prune dist
recursive-include * *.cc *.h
prune bin
prune __pycache__
prune 3rdparty
graft 3rdparty/ps-lite
Expand Down
6 changes: 3 additions & 3 deletions byteps/common/global.cc
Expand Up @@ -286,8 +286,8 @@ ps::KVWorker<char>* BytePSGlobal::GetOrInitPS() {
if (!_ps && IsDistributed() &&
_my_role == BytePSRole::LOCAL_ROOT) { // only the root needs networking
// init low-level ps implementation
_ps = new ps::KVWorker<char>(0, 0);
ps::StartAsync(0, "byteps\0");
ps::StartPS(0, ps::Node::WORKER, -1, true, "byteps\0");
_ps = new ps::KVWorker<char>(0, 0, 0);
if (BytePSGlobal::IsResuming() || !ps::Postoffice::Get()->is_recovery()) {
ps::Postoffice::Get()->Barrier(
0, ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler);
Expand Down Expand Up @@ -344,7 +344,7 @@ void BytePSGlobal::Shutdown() {

if (_ps) {
// shutdown _ps and wait for the completion acks of other workers/servers
ps::Finalize(0, true);
ps::Finalize(0, ps::Node::WORKER, true);
delete _ps;
_ps = NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion byteps/common/shared_memory.cc
Expand Up @@ -31,7 +31,7 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix,
std::string shm_name(prefix);
shm_name += std::to_string(key);
int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666);
BPS_CHECK_GE(shm_fd, 0) << "shm_open failed for " << shm_name;
BPS_CHECK_GE(shm_fd, 0) << "shm_open failed for " << shm_name << " " << strerror(errno);

BPS_CHECK_GE(ftruncate(shm_fd, size), 0) << strerror(errno);

Expand Down
18 changes: 15 additions & 3 deletions byteps/server/server.cc
Expand Up @@ -408,6 +408,18 @@ void init_global_env() {
// enable to print key profile
log_key_info_ = GetEnv("PS_KEY_LOG", 0);

std::string role_str = GetEnv("DMLC_ROLE", "server");
role_ = ps::GetRole(role_str);
if (role_str == std::string("server")) {
is_server_ = true;
preferred_rank = -1;
} else {
is_server_ = false;
preferred_rank = 0;
}

LOG(INFO) << "This is a " << role_str << " is_server=" << is_server_;

// enable engine block mode (default disabled)
is_engine_blocking_ = GetEnv("BYTEPS_SERVER_ENGINE_BLOCKING", 0);
if (is_engine_blocking_)
Expand Down Expand Up @@ -480,16 +492,16 @@ extern "C" void byteps_server() {
}

// init server instance
byteps_server_ = new KVServer<SERVER_DATA_TYPE>(0);
ps::StartPS(0, role_, preferred_rank, true, "byteps\0");
byteps_server_ = new KVServer<SERVER_DATA_TYPE>(0, false, 0);
byteps_server_->set_request_handle(BytePSHandler);
StartAsync(0, "byteps_server\0");
if (!Postoffice::Get()->is_recovery()) {
Postoffice::Get()->Barrier(
0, ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler);
}

// clean the server resource
Finalize(0, true);
Finalize(0, role_, true);
if (byteps_server_) {
delete byteps_server_;
byteps_server_ = nullptr;
Expand Down
4 changes: 4 additions & 0 deletions byteps/server/server.h
Expand Up @@ -129,6 +129,10 @@ volatile bool sync_mode_ = true;
volatile bool debug_mode_ = false;
volatile bool enable_schedule_ = false;

ps::Node::Role role_;
int preferred_rank = -1;
volatile bool is_server_ = true;

// debug
uint64_t debug_key_;
std::mutex debug_mu_;
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -642,7 +642,7 @@ def get_nccl_vals():
nccl_include_dirs += ['%s/include' % nccl_home]
nccl_lib_dirs += ['%s/lib' % nccl_home, '%s/lib64' % nccl_home]

nccl_link_mode = os.environ.get('BYTEPS_NCCL_LINK', 'STATIC')
nccl_link_mode = os.environ.get('BYTEPS_NCCL_LINK', 'SHARED')
if nccl_link_mode.upper() == 'SHARED':
nccl_libs += ['nccl']
else:
Expand Down