Commit

Rework RABIT.
get host name.

send port.

all gather.

assert.

op.

begin work on bootstrap.

utilities.

work on bootstrap.

move listener.

catch.

comm.

async send.

block.

block.

Start work on async.

batch poll.

tests.

move.

Start working on tracker.

better tests.

work on tracker.

bind.

work on accepting workers.

complete allgather.

Move.

Send with JSON.

work on shutdown.

msg.

compare task.

rename.

Move.

cleanup.

Start work on broadcast.

Work comm.

Cleanup bootstrap.

hide.

Move to bootstrap.

non blocking.

cleanup.

any op.

shift.

cleanup.

Cleanup.

per-thread.

checks.

start working on nccl.

backend.

Get the prototype compile.

log.

test print.

timeout on connection.

get nccl allreduce basic.

look into federated.

proto.

scatter reduce.

allreduce prototype.

Work on tests.

cleanup.

Initialization.

Init.

Work on Python.

get args.

Start working on allgatherv.

convert some allreduce.

remove some old use.

remove cpu impl.

work on gpu.

play with dlopen.

convert.

convert.

convert.

placeholder.

backend.

work on federated.

remove.

Move.

Federated tracker.

Move.

move into comm.

GPU variant.

not just nccl.

fix.

fix.

Convert.

bitwise.

stream.

copying allgather.

replace.

Remove.

remove.

Remove device.

Remove rabit.

Remove rabit.

cmake.

tests.

use gmock.

Move.

Split.

init.

Extract.

compiler.

test timeout.

exc.

comments.

Tests for federated.

Remove.

remove.

Split up.

refactor tests.

format.

extract magic number.

Extract more commands.

refactor.

Remove.

Reduce dependency on c api.

remove old code.

throw.

coll error.

indirect.

look into dask module.

parameters.

command.

probing.

listen for error.

debug.

host.

cleanup.

dask.

loop.

working basic.

header.

guard.

test.

type.

socket.

cleanup & notes.

use a state machine.

work on tests.

header.

test channel.

cleanup.

cleanup broadcast.

unneeded changes.

allgather string.

Fixes.

cleanup rebase.

fixes after rebase.

split up nccl comm.

Move data copying.

allgatherv test.

Extract.

tests.

test allreduce.

remove the use of ctx.

tests.

rebase.

work on fed.

work on allgatherv.

name.

lint.

Split.

split.

remove gmock.

move.

CPU.

CUDA.

compile.

Cleanup.

header.

Work on tests.

checks.

fixes.

tests.

work on CUDA test.

comm.

Share the implementation.

tests.

cleanup.

cleanup.

cleanup

cleanup.

set device.

cleanup.

cleanup.

more.

cleanup.

Get it work.

wait.

revert dask changes.

time.

remove reference to encoder.

extract.

extract.

split up the training function.

Fix.

deterministic.

Fix.

debug.

Fixes.

remove.

cleanup.

fix.

Move worker env.

cleanup.

cleanup.

wait.

cleanup.

extract error handling.

get abort to work as well.

Move.

policy.

cleanups.

cleanup.

Split up.

doc.

Cleanup ctor.

tests.

tests.

tests.

configuration.

tests.

task id.

start working on metric tests.

Remove.

type.

agg.

fix seq.

tests.

start working on cuda test.

type.

fixes.

tests.

Use device ord.

Remove auc.

remove elementwise.

remove multi-class

cleanup aft.

cleanup ranking.

remove old tests.

headers.

Move.

move.

single gpu tests.

Cleanup C API.

unknown.

C API.

Small cleanup.

cleanup.

Fix.

cleanup.

work on async queue.

work on sync.

Use blocking op.

result.

Fuzzing.

result.

Remove coll error.

Move.

cleanup.

cleanup.

cleanup.

cleanup.

cleanup.

cleanup.

cleanup.

Fix removal.

test

lint.

remove.

cleanup.

cleanup.

cleanup.

invoke result.

note.

Fix rebase.

Fix rebase.

fix & cleanup.

Fix.

remove coll error for now.

cleanup.

replace.

replace.

replace.

test basic.

cleanup, fix.

deduced size.

cleanup.

Convert to new routines.

Fixes.

Fix.

Add test.

use vector.

cleanup.

safe coll.

Fix.

Fix.

Fix.

Fix.

cleanup.

cleanup.

Don't throw.

Fixes.

v6

timeout.

Cleanups.

Remove error handling for now.

Timeout.

syc.

mac.

cli.

remove.

types.

federated.

build.

build.

sortby.

build.
trivialfis committed Mar 15, 2024
1 parent 53fc175 commit 834e598
Showing 159 changed files with 2,068 additions and 6,704 deletions.
3 changes: 0 additions & 3 deletions CMakeLists.txt
@@ -282,9 +282,6 @@ if(MSVC)
   endif()
 endif()
 
-# rabit
-add_subdirectory(rabit)
-
 # core xgboost
 add_subdirectory(${xgboost_SOURCE_DIR}/src)
 target_link_libraries(objxgboost PUBLIC dmlc)
9 changes: 2 additions & 7 deletions R-package/src/Makevars.in
@@ -103,11 +103,9 @@ OBJECTS= \
$(PKGROOT)/src/collective/allreduce.o \
$(PKGROOT)/src/collective/broadcast.o \
$(PKGROOT)/src/collective/comm.o \
$(PKGROOT)/src/collective/comm_group.o \
$(PKGROOT)/src/collective/coll.o \
$(PKGROOT)/src/collective/communicator-inl.o \
$(PKGROOT)/src/collective/tracker.o \
$(PKGROOT)/src/collective/communicator.o \
$(PKGROOT)/src/collective/in_memory_communicator.o \
$(PKGROOT)/src/collective/in_memory_handler.o \
$(PKGROOT)/src/collective/loop.o \
$(PKGROOT)/src/collective/socket.o \
@@ -132,7 +130,4 @@ OBJECTS= \
$(PKGROOT)/src/common/version.o \
$(PKGROOT)/src/c_api/c_api.o \
$(PKGROOT)/src/c_api/c_api_error.o \
$(PKGROOT)/amalgamation/dmlc-minimum0.o \
$(PKGROOT)/rabit/src/engine.o \
$(PKGROOT)/rabit/src/rabit_c_api.o \
$(PKGROOT)/rabit/src/allreduce_base.o
$(PKGROOT)/amalgamation/dmlc-minimum0.o
9 changes: 2 additions & 7 deletions R-package/src/Makevars.win
@@ -103,11 +103,9 @@ OBJECTS= \
$(PKGROOT)/src/collective/allreduce.o \
$(PKGROOT)/src/collective/broadcast.o \
$(PKGROOT)/src/collective/comm.o \
$(PKGROOT)/src/collective/comm_group.o \
$(PKGROOT)/src/collective/coll.o \
$(PKGROOT)/src/collective/communicator-inl.o \
$(PKGROOT)/src/collective/tracker.o \
$(PKGROOT)/src/collective/communicator.o \
$(PKGROOT)/src/collective/in_memory_communicator.o \
$(PKGROOT)/src/collective/in_memory_handler.o \
$(PKGROOT)/src/collective/loop.o \
$(PKGROOT)/src/collective/socket.o \
@@ -132,7 +130,4 @@ OBJECTS= \
$(PKGROOT)/src/common/version.o \
$(PKGROOT)/src/c_api/c_api.o \
$(PKGROOT)/src/c_api/c_api_error.o \
$(PKGROOT)/amalgamation/dmlc-minimum0.o \
$(PKGROOT)/rabit/src/engine.o \
$(PKGROOT)/rabit/src/rabit_c_api.o \
$(PKGROOT)/rabit/src/allreduce_base.o
$(PKGROOT)/amalgamation/dmlc-minimum0.o
4 changes: 3 additions & 1 deletion cmake/Utils.cmake
@@ -119,7 +119,8 @@ function(xgboost_set_cuda_flags target)
$<$<COMPILE_LANGUAGE:CUDA>:--expt-extended-lambda>
$<$<COMPILE_LANGUAGE:CUDA>:--expt-relaxed-constexpr>
$<$<COMPILE_LANGUAGE:CUDA>:-Xcompiler=${OpenMP_CXX_FLAGS}>
$<$<COMPILE_LANGUAGE:CUDA>:-Xfatbin=-compress-all>)
$<$<COMPILE_LANGUAGE:CUDA>:-Xfatbin=-compress-all>
$<$<COMPILE_LANGUAGE:CUDA>:--brief-diagnostics=false>)

if(USE_PER_THREAD_DEFAULT_STREAM)
target_compile_options(${target} PRIVATE
@@ -151,6 +152,7 @@ function(xgboost_set_cuda_flags target)
target_include_directories(
${target} PRIVATE
${xgboost_SOURCE_DIR}/gputreeshap
${xgboost_SOURCE_DIR}/rabit/include
${CUDAToolkit_INCLUDE_DIRS})

if(MSVC)
2 changes: 1 addition & 1 deletion include/xgboost/collective/result.h
@@ -162,7 +162,7 @@ struct Result {
 
 // We don't have monad, a simple helper would do.
 template <typename Fn>
-[[nodiscard]] Result operator<<(Result&& r, Fn&& fn) {
+[[nodiscard]] std::enable_if_t<std::is_invocable_v<Fn>, Result> operator<<(Result&& r, Fn&& fn) {
   if (!r.OK()) {
     return std::forward<Result>(r);
   }
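The result.h change constrains `operator<<` so that it is only considered when the right-hand operand is callable with no arguments. A minimal sketch of how such a chaining helper could behave follows. The `Result`, `Success`, `Fail`, and `RunSteps` definitions here are simplified, hypothetical stand-ins written for this example and are not the XGBoost implementations; only the shape of `operator<<` is taken from the diff, and the `return fn();` line is an assumption because the hunk does not show the rest of the function body.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for the real Result type; it only tracks OK/failed and a message.
class Result {
  std::string msg_;
  bool ok_{true};

 public:
  Result() = default;
  explicit Result(std::string msg) : msg_{std::move(msg)}, ok_{false} {}
  [[nodiscard]] bool OK() const { return ok_; }
  [[nodiscard]] std::string const& Report() const { return msg_; }
};

inline Result Success() { return Result{}; }
inline Result Fail(std::string msg) { return Result{std::move(msg)}; }

// Only participates in overload resolution when `fn` can be called with no arguments,
// so an expression such as `result << 42` does not select this overload.
template <typename Fn>
[[nodiscard]] std::enable_if_t<std::is_invocable_v<Fn>, Result> operator<<(Result&& r, Fn&& fn) {
  if (!r.OK()) {
    return std::forward<Result>(r);  // short-circuit: keep the first failure
  }
  return fn();  // assumed continuation: run the next step
}

// Chains three steps where the second one fails, and counts how many steps ran.
inline Result RunSteps(int* n_executed) {
  assert(n_executed != nullptr);
  return Success() << [&] { ++*n_executed; return Success(); }
                   << [&] { ++*n_executed; return Fail("step 2 failed"); }
                   << [&] { ++*n_executed; return Success(); };
}
```

Calling `RunSteps` runs the first two lambdas and skips the third, because the failure from the second step is forwarded unchanged through the rest of the chain.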
33 changes: 22 additions & 11 deletions include/xgboost/collective/socket.h
@@ -449,15 +449,26 @@ class TCPSocket {
     return newsock;
   }
 
-  [[nodiscard]] Result Accept(TCPSocket *out, SockAddrV4 *addr) {
-    struct sockaddr_in caddr;
-    socklen_t caddr_len = sizeof(caddr);
-    HandleT newfd = accept(Handle(), reinterpret_cast<sockaddr *>(&caddr), &caddr_len);
-    if (newfd == InvalidSocket()) {
-      return system::FailWithCode("Failed to accept.");
-    }
-    *addr = SockAddrV4{caddr};
-    *out = TCPSocket{newfd};
+  [[nodiscard]] Result Accept(TCPSocket *out, SockAddress *addr) {
+    if (this->Domain() == SockDomain::kV4) {
+      struct sockaddr_in caddr;
+      socklen_t caddr_len = sizeof(caddr);
+      HandleT newfd = accept(Handle(), reinterpret_cast<sockaddr *>(&caddr), &caddr_len);
+      if (newfd == InvalidSocket()) {
+        return system::FailWithCode("Failed to accept.");
+      }
+      *addr = SockAddress{SockAddrV4{caddr}};
+      *out = TCPSocket{newfd};
+    } else {
+      struct sockaddr_in6 caddr;
+      socklen_t caddr_len = sizeof(caddr);
+      HandleT newfd = accept(Handle(), reinterpret_cast<sockaddr *>(&caddr), &caddr_len);
+      if (newfd == InvalidSocket()) {
+        return system::FailWithCode("Failed to accept.");
+      }
+      *addr = SockAddress{SockAddrV6{caddr}};
+      *out = TCPSocket{newfd};
+    }
     return Success();
   }
 
@@ -621,9 +632,9 @@ class TCPSocket {
    */
   std::size_t Send(StringView str);
   /**
-   * \brief Receive string, format is matched with the Python socket wrapper in RABIT.
+   * @brief Receive string, format is matched with the Python socket wrapper in RABIT.
    */
-  std::size_t Recv(std::string *p_str);
+  [[nodiscard]] Result Recv(std::string *p_str);
   /**
    * \brief Close the socket, called automatically in destructor if the socket is not closed.
    */
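The `Accept` change in socket.h picks `sockaddr_in` or `sockaddr_in6` based on the listening socket's domain. For comparison, the sketch below shows a different, commonly used POSIX pattern: accept into a `sockaddr_storage` and branch on `ss_family` afterwards. This is not what the diff does. `Peer`, `AcceptAny`, and `LoopbackAccept` are hypothetical names invented for this example, and the code assumes a POSIX system with a working loopback interface.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <string>

// Hypothetical result type for this sketch; not an XGBoost class.
struct Peer {
  int fd{-1};             // connected socket, -1 if accept failed
  int family{AF_UNSPEC};  // AF_INET or AF_INET6 on success
  std::string host;       // numeric peer address
};

// Accept one connection and decode the peer address by inspecting ss_family.
// sockaddr_storage is large enough to hold both sockaddr_in and sockaddr_in6.
inline Peer AcceptAny(int listen_fd) {
  assert(listen_fd >= 0);
  Peer peer;
  sockaddr_storage caddr{};
  socklen_t len = sizeof(caddr);
  int fd = accept(listen_fd, reinterpret_cast<sockaddr*>(&caddr), &len);
  if (fd < 0) {
    return peer;
  }
  char buf[INET6_ADDRSTRLEN] = {0};
  void const* src = nullptr;
  if (caddr.ss_family == AF_INET) {
    src = &reinterpret_cast<sockaddr_in const*>(&caddr)->sin_addr;
  } else if (caddr.ss_family == AF_INET6) {
    src = &reinterpret_cast<sockaddr_in6 const*>(&caddr)->sin6_addr;
  }
  if (src == nullptr || inet_ntop(caddr.ss_family, src, buf, sizeof(buf)) == nullptr) {
    close(fd);
    return peer;
  }
  peer.fd = fd;
  peer.family = caddr.ss_family;
  peer.host = buf;
  return peer;
}

// Loopback round trip: listen on 127.0.0.1 with an OS-chosen port, connect, accept.
// All sockets are closed before returning, so only `family` and `host` are meaningful.
inline Peer LoopbackAccept() {
  Peer peer;
  int lfd = socket(AF_INET, SOCK_STREAM, 0);
  int cfd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;  // let the OS pick a free port
  socklen_t alen = sizeof(addr);
  bool ok = lfd >= 0 && cfd >= 0 &&
            bind(lfd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0 &&
            listen(lfd, 1) == 0 &&
            getsockname(lfd, reinterpret_cast<sockaddr*>(&addr), &alen) == 0 &&
            connect(cfd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0;
  if (ok) {
    peer = AcceptAny(lfd);
  }
  if (peer.fd >= 0) close(peer.fd);
  if (cfd >= 0) close(cfd);
  if (lfd >= 0) close(lfd);
  return peer;
}
```

The approach in the diff keeps the typed `SockAddrV4` and `SockAddrV6` wrappers and needs no family check after `accept`; the `sockaddr_storage` variant trades that for a single code path that works for either listener type.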
20 changes: 1 addition & 19 deletions plugin/federated/CMakeLists.txt
@@ -31,31 +31,13 @@ protobuf_generate(
PLUGIN "protoc-gen-grpc=\$<TARGET_FILE:gRPC::grpc_cpp_plugin>"
PROTOC_OUT_DIR "${PROTO_BINARY_DIR}")

add_library(federated_old_proto STATIC federated.old.proto)
target_link_libraries(federated_old_proto PUBLIC protobuf::libprotobuf gRPC::grpc gRPC::grpc++)
target_include_directories(federated_old_proto PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
xgboost_target_properties(federated_old_proto)

protobuf_generate(
TARGET federated_old_proto
LANGUAGE cpp
PROTOC_OUT_DIR "${PROTO_BINARY_DIR}")
protobuf_generate(
TARGET federated_old_proto
LANGUAGE grpc
GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc
PLUGIN "protoc-gen-grpc=\$<TARGET_FILE:gRPC::grpc_cpp_plugin>"
PROTOC_OUT_DIR "${PROTO_BINARY_DIR}")

# Wrapper for the gRPC client.
add_library(federated_client INTERFACE)
target_sources(federated_client INTERFACE federated_client.h)
target_link_libraries(federated_client INTERFACE federated_proto)
target_link_libraries(federated_client INTERFACE federated_old_proto)

# Rabit engine for Federated Learning.
target_sources(
objxgboost PRIVATE federated_tracker.cc federated_server.cc federated_comm.cc federated_coll.cc
objxgboost PRIVATE federated_tracker.cc federated_comm.cc federated_coll.cc
)
if(USE_CUDA)
target_sources(objxgboost PRIVATE federated_comm.cu federated_coll.cu)
81 changes: 0 additions & 81 deletions plugin/federated/federated.old.proto

This file was deleted.

132 changes: 0 additions & 132 deletions plugin/federated/federated_client.h

This file was deleted.

6 changes: 1 addition & 5 deletions plugin/federated/federated_coll.cc
@@ -89,11 +89,7 @@ Coll *FederatedColl::MakeCUDAVar() {
 
 [[nodiscard]] Result FederatedColl::Broadcast(Comm const &comm, common::Span<std::int8_t> data,
                                               std::int32_t root) {
-  if (comm.Rank() == root) {
-    return BroadcastImpl(comm, &this->sequence_number_, data, root);
-  } else {
-    return BroadcastImpl(comm, &this->sequence_number_, data, root);
-  }
+  return BroadcastImpl(comm, &this->sequence_number_, data, root);
 }
 
 [[nodiscard]] Result FederatedColl::Allgather(Comm const &comm, common::Span<std::int8_t> data,