Skip to content

Commit

Permalink
multi client launch (#5372)
Browse files Browse the repository at this point in the history
* add changes for multi dev demo

Signed-off-by: daquexian <daquexian566@gmail.com>

* add part of backward hook

Signed-off-by: daquexian <daquexian566@gmail.com>

* update

Signed-off-by: daquexian <daquexian566@gmail.com>

* add naive init_with_env

Signed-off-by: daquexian <daquexian566@gmail.com>

* update

Signed-off-by: daquexian <daquexian566@gmail.com>

* update

Signed-off-by: daquexian <daquexian566@gmail.com>

* support_multi_client

* update

Signed-off-by: daquexian <daquexian566@gmail.com>

* Remove unused code

* Fix multi client launch

* fix __main__ bug

* update abcd op

Signed-off-by: daquexian <daquexian566@gmail.com>

* fix multi client sync, make nccl instr ordered

Signed-off-by: daquexian <daquexian566@gmail.com>

* temp changes

Signed-off-by: daquexian <daquexian566@gmail.com>

* Use functional api instead of op_expr_helper::XXXOp.

* align with latest master, remove unused code

Signed-off-by: daquexian <daquexian566@gmail.com>

* local rank returns 0 when no env var, save is_multi_client in EnvDesc

Signed-off-by: daquexian <daquexian566@gmail.com>

* move is_multi_client to ProcessCtx, rename cuda_d2d device to nccl, remove unused code

Signed-off-by: daquexian <daquexian566@gmail.com>

* abcd -> return_first_input op

Signed-off-by: daquexian <daquexian566@gmail.com>

* remove launch.py for now

Signed-off-by: daquexian <daquexian566@gmail.com>

* refine

Signed-off-by: daquexian <daquexian566@gmail.com>

* update IsMultiClient in env_util.py

Signed-off-by: daquexian <daquexian566@gmail.com>

* rm multi_dev_demo.py

Signed-off-by: daquexian <daquexian566@gmail.com>

* remove exported functions in env_util.py

Signed-off-by: daquexian <daquexian566@gmail.com>

* remove unused op expr helper func

Signed-off-by: daquexian <daquexian566@gmail.com>

* fix bug

Signed-off-by: daquexian <daquexian566@gmail.com>

* remove ddp code

Signed-off-by: daquexian <daquexian566@gmail.com>

* refine env.init. only call env.init in init.py when multi client

Signed-off-by: daquexian <daquexian566@gmail.com>

* revert unrelated changes

Signed-off-by: daquexian <daquexian566@gmail.com>

* add missing parameter

Signed-off-by: daquexian <daquexian566@gmail.com>

* fix python api bug

Signed-off-by: daquexian <daquexian566@gmail.com>

* address comments

Signed-off-by: daquexian <daquexian566@gmail.com>

Co-authored-by: clackhan <han_binbin@163.com>
Co-authored-by: hjchen2 <chenhoujiangcug@gmail.com>
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
  • Loading branch information
4 people committed Jul 5, 2021
1 parent 49b95c0 commit 5806e2b
Show file tree
Hide file tree
Showing 19 changed files with 155 additions and 73 deletions.
2 changes: 1 addition & 1 deletion oneflow/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
def StartWorker(env_proto):
import oneflow._oneflow_internal

oneflow._oneflow_internal.InitEnv(env_proto)
oneflow._oneflow_internal.InitEnv(env_proto, False)


def main():
Expand Down
26 changes: 26 additions & 0 deletions oneflow/api/python/eager/multi_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <pybind11/pybind11.h>
#include "oneflow/api/python/of_api_registry.h"
#include "oneflow/core/vm/vm_util.h"

ONEFLOW_API_PYBIND11_MODULE("eager.multi_client", m) {
using namespace oneflow;
namespace py = pybind11;
m.def(
"Sync", []() { vm::MultiClientSync().GetOrThrow(); },
py::call_guard<py::gil_scoped_release>());
}
File renamed without changes.
3 changes: 3 additions & 0 deletions oneflow/api/python/env/env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ ONEFLOW_API_PYBIND11_MODULE("", m) {

m.def("GetRank", &GetRank);
m.def("GetWorldSize", &GetWorldSize);
m.def("GetNodeSize", &GetNodeSize);
m.def("GetLocalRank", &GetLocalRank);
m.def("IsMultiClient", &IsMultiClient);
}
17 changes: 12 additions & 5 deletions oneflow/api/python/env/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ inline Maybe<void> DestroyDefaultEnv() {

inline Maybe<void> DestroyEnv() {
if (Global<EnvGlobalObjectsScope>::Get() == nullptr) { return Maybe<void>::Ok(); }
if (GlobalProcessCtx::IsThisProcessMaster()) { ClusterInstruction::MasterSendHalt(); }
if (!GlobalProcessCtx::IsMultiClient()) {
if (GlobalProcessCtx::IsThisProcessMaster()) { ClusterInstruction::MasterSendHalt(); }
} else {
ClusterInstruction::HaltBarrier();
}
Global<EnvGlobalObjectsScope>::Delete();
return Maybe<void>::Ok();
}
Expand All @@ -70,12 +74,12 @@ inline Maybe<void> InitDefaultEnv(const std::string& env_proto_str) {
// Global<T>::New is not allowed to be called here
// because glog is not constructed yet and LOG(INFO) has bad bahavior
Global<EnvGlobalObjectsScope>::SetAllocated(new EnvGlobalObjectsScope());
JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto));
JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto, false));
if (!GlobalProcessCtx::IsThisProcessMaster()) { JUST(Cluster::WorkerLoop()); }
return Maybe<void>::Ok();
}

inline Maybe<void> InitEnv(const std::string& env_proto_str) {
inline Maybe<void> InitEnv(const std::string& env_proto_str, bool is_multi_client) {
EnvProto env_proto;
CHECK_OR_RETURN(TxtString2PbMessage(env_proto_str, &env_proto))
<< "failed to parse env_proto" << env_proto_str;
Expand All @@ -84,15 +88,18 @@ inline Maybe<void> InitEnv(const std::string& env_proto_str) {
// Global<T>::New is not allowed to be called here
// because glog is not constructed yet and LOG(INFO) has bad bahavior
Global<EnvGlobalObjectsScope>::SetAllocated(new EnvGlobalObjectsScope());
JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto));
if (!GlobalProcessCtx::IsThisProcessMaster()) { JUST(Cluster::WorkerLoop()); }
JUST(Global<EnvGlobalObjectsScope>::Get()->Init(env_proto, is_multi_client));
if (!GlobalProcessCtx::IsThisProcessMaster() && !is_multi_client) { JUST(Cluster::WorkerLoop()); }
return Maybe<void>::Ok();
}

inline Maybe<long long> CurrentMachineId() { return GlobalProcessCtx::Rank(); }

inline Maybe<int64_t> GetRank() { return GlobalProcessCtx::Rank(); }
inline Maybe<size_t> GetWorldSize() { return GlobalProcessCtx::WorldSize(); }
inline Maybe<size_t> GetNodeSize() { return GlobalProcessCtx::NodeSize(); }
inline Maybe<size_t> GetLocalRank() { return GlobalProcessCtx::LocalRank(); }
inline Maybe<bool> IsMultiClient() { return GlobalProcessCtx::IsMultiClient(); }

} // namespace oneflow

Expand Down
10 changes: 8 additions & 2 deletions oneflow/api/python/env/env_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ inline void EnableEagerEnvironment(bool enable_eager_execution) {

inline bool IsEnvInited() { return oneflow::IsEnvInited().GetOrThrow(); }

inline void InitEnv(const std::string& env_proto_str) {
return oneflow::InitEnv(env_proto_str).GetOrThrow();
inline void InitEnv(const std::string& env_proto_str, bool is_multi_client) {
return oneflow::InitEnv(env_proto_str, is_multi_client).GetOrThrow();
}

inline void InitDefaultEnv(const std::string& env_proto_str) {
Expand All @@ -44,4 +44,10 @@ inline int64_t GetRank() { return oneflow::GetRank().GetOrThrow(); }

inline size_t GetWorldSize() { return oneflow::GetWorldSize().GetOrThrow(); }

inline size_t GetNodeSize() { return oneflow::GetNodeSize().GetOrThrow(); }

inline size_t GetLocalRank() { return oneflow::GetLocalRank().GetOrThrow(); }

inline bool IsMultiClient() { return oneflow::IsMultiClient().GetOrThrow(); }

#endif // ONEFLOW_API_PYTHON_ENV_ENV_API_H_
2 changes: 1 addition & 1 deletion oneflow/core/control/ctrl_bootstrap.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message ProcessCtx {
repeated Address ctrl_addr = 1;
required int64 rank = 2;
required int64 node_size = 3;
required bool is_multi_client = 4;
}

message BootstrapConf {
Expand All @@ -19,7 +20,6 @@ message BootstrapConf {
optional string host = 4;
optional int32 ctrl_port = 5 [default = -1];
optional int64 node_size = 6 [default = -1];
optional NumProcessPerNode num_process_per_node = 7;
}

message NumProcessPerNode {
Expand Down
3 changes: 2 additions & 1 deletion oneflow/core/job/env_global_objects_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ Resource GetDefaultResource(const EnvProto& env_proto) {

} // namespace

Maybe<void> EnvGlobalObjectsScope::Init(const EnvProto& env_proto) {
Maybe<void> EnvGlobalObjectsScope::Init(const EnvProto& env_proto, bool is_multi_client) {
is_default_physical_env_ = env_proto.is_default_physical_env();
InitLogging(env_proto.cpp_logging_conf(), JUST(is_default_physical_env_));
#ifdef WITH_CUDA
InitGlobalCudaDeviceProp();
#endif
Global<EnvDesc>::New(env_proto);
Global<ProcessCtx>::New();
Global<ProcessCtx>::Get()->set_is_multi_client(is_multi_client);
// Avoid dead lock by using CHECK_JUST instead of JUST. because it maybe be blocked in
// ~CtrlBootstrap.
if (Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/job/env_global_objects_scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class EnvGlobalObjectsScope final {
EnvGlobalObjectsScope() : is_default_physical_env_(Error::ValueError("Not initialized")) {}
~EnvGlobalObjectsScope();

Maybe<void> Init(const EnvProto& env_proto);
Maybe<void> Init(const EnvProto& env_proto, bool is_multi_client);
const Maybe<bool>& is_default_physical_env() const { return is_default_physical_env_; }

const std::shared_ptr<const ParallelDesc>& MutParallelDesc4Device(const Device& device);
Expand Down
2 changes: 2 additions & 0 deletions oneflow/core/rpc/include/global_process_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ namespace oneflow {
struct GlobalProcessCtx {
static void GetCurrentMachineIdAndDeviceId(int64_t* machine_id, int64_t* device_id);
static int64_t Rank();
static int64_t LocalRank();
static int64_t NodeSize();
static int64_t ThisNodeId();
static int64_t NumOfProcessPerNode();
static bool IsThisProcessMaster();
static bool IsMultiClient();
static size_t WorldSize();
static std::string LogDirEntry();
};
Expand Down
14 changes: 14 additions & 0 deletions oneflow/core/rpc/lib/global_process_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/common/global.h"
#include "oneflow/core/common/str_util.h"
#include "oneflow/core/control/ctrl_bootstrap.pb.h"
#include "oneflow/core/rpc/include/global_process_ctx.h"

Expand All @@ -29,6 +30,14 @@ int64_t GlobalProcessCtx::Rank() {
return Global<ProcessCtx>::Get()->rank();
}

int64_t GlobalProcessCtx::LocalRank() {
char* local_rank_env = std::getenv("LOCAL_RANK");
if (!local_rank_env) { return 0; }
CHECK(IsStrInt(local_rank_env));
static int64_t local_rank = std::stol(local_rank_env);
return local_rank;
}

int64_t GlobalProcessCtx::NodeSize() {
CHECK_NOTNULL(Global<ProcessCtx>::Get());
return Global<ProcessCtx>::Get()->node_size();
Expand All @@ -53,6 +62,11 @@ bool GlobalProcessCtx::IsThisProcessMaster() {
return Global<ProcessCtx>::Get()->rank() == 0;
}

bool GlobalProcessCtx::IsMultiClient() {
CHECK_NOTNULL(Global<ProcessCtx>::Get());
return Global<ProcessCtx>::Get()->is_multi_client();
}

size_t GlobalProcessCtx::WorldSize() {
CHECK_NOTNULL(Global<ProcessCtx>::Get());
return Global<ProcessCtx>::Get()->ctrl_addr().size();
Expand Down
13 changes: 13 additions & 0 deletions oneflow/core/vm/vm_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,18 @@ Maybe<void> SingleClientSync() {
return Maybe<void>::Ok();
}

Maybe<void> MultiClientSync() {
BlockingCounter bc(1);
JUST(PhysicalRun([&bc](InstructionsBuilder* builder) -> Maybe<void> {
JUST(builder->ComputeGlobalFrontSeqBarrier());
JUST(builder->ComputeRankFrontSeqCallback([&bc]() { bc.Decrease(); }));
return Maybe<void>::Ok();
}));

bc.WaitUntilCntEqualZero();

return Maybe<void>::Ok();
}

} // namespace vm
} // namespace oneflow
1 change: 1 addition & 0 deletions oneflow/core/vm/vm_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ ObjectMsgPtr<InstructionMsg> NewInstruction(const std::string& instr_type_name);

Maybe<void> Run(vm::InstructionMsgList* instr_msg_list);
Maybe<void> SingleClientSync();
Maybe<void> MultiClientSync();

} // namespace vm
} // namespace oneflow
Expand Down
16 changes: 12 additions & 4 deletions oneflow/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,19 @@

import oneflow.python.framework.env_util as env_util

env_util.init_default_physical_env()

if env_util.HasAllMultiClientEnvVars():
env_util.env_init(True)
else:
env_util.init_default_physical_env()

del env_util


# capture oneflow methods so that they can be still accessed after `del oneflow`
def _SyncOnMasterFn(get_rank, sync):
def _SyncOnMasterFn(is_multi_client, get_rank, sync):
def SyncOnMaster():
if get_rank() == 0:
if is_multi_client or get_rank() == 0:
sync()

return SyncOnMaster
Expand All @@ -88,8 +93,11 @@ def SyncOnMaster():
# so sync vm in advance to avoid data race
atexit.register(
_SyncOnMasterFn(
oneflow.python.framework.distribute.is_multi_client(),
oneflow.python.framework.distribute.get_rank,
oneflow._oneflow_internal.eager.single_client.Sync,
oneflow._oneflow_internal.eager.multi_client.Sync
if oneflow.python.framework.distribute.is_multi_client()
else oneflow._oneflow_internal.eager.single_client.Sync,
)
)
del atexit
Expand Down
4 changes: 2 additions & 2 deletions oneflow/python/framework/c_api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ def InitDefaultEnv(env_proto):
oneflow._oneflow_internal.InitDefaultEnv(env_proto_str)


def InitEnv(env_proto):
def InitEnv(env_proto, is_multi_client):
assert type(env_proto) is env_pb2.EnvProto
env_proto_str = text_format.MessageToString(env_proto)
oneflow._oneflow_internal.InitEnv(env_proto_str)
oneflow._oneflow_internal.InitEnv(env_proto_str, is_multi_client)


def InitLazyGlobalSession(config_proto):
Expand Down
10 changes: 10 additions & 0 deletions oneflow/python/framework/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ def assert_is_valid_distribute(
expected: 1) oneflow.distribute.split(axis); 2) oneflow.distribute.broadcast(); 3) oneflow.distribute.auto()"""


@oneflow_export("distributed.get_local_rank")
def get_local_rank():
return oneflow._oneflow_internal.GetLocalRank()


@oneflow_export("distributed.get_rank")
def get_rank():
r"""Returns the rank of current process group.
Expand All @@ -203,3 +208,8 @@ def get_world_size():
"""
return oneflow._oneflow_internal.GetWorldSize()


@oneflow_export("distributed.is_multi_client")
def is_multi_client():
return oneflow._oneflow_internal.IsMultiClient()
Loading

0 comments on commit 5806e2b

Please sign in to comment.