From ad5c97c9db10a839255bda147bcb8d7121d0a139 Mon Sep 17 00:00:00 2001 From: Xinqi Li Date: Thu, 5 Aug 2021 10:02:06 +0800 Subject: [PATCH] bugfix: DeviceId4ParallelId -> MachineId4ParallelId --- .../boxing/eager_boxing_interpreter_mgr.cpp | 24 ++++++++---- .../boxing/eager_boxing_interpreter_util.cpp | 9 ----- .../boxing/eager_boxing_interpreter_util.h | 2 - .../boxing/naive_b2p_boxing_interpreter.cpp | 2 +- .../eager_consistent_op_interpreter.cpp | 39 +++++++++++++------ .../core/functional/impl/consistent_cast.cpp | 12 ++---- 6 files changed, 49 insertions(+), 39 deletions(-) diff --git a/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_mgr.cpp b/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_mgr.cpp index ec353b69500..f3969d3a9b3 100644 --- a/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_mgr.cpp +++ b/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_mgr.cpp @@ -15,6 +15,7 @@ limitations under the License. */ #include #include "oneflow/core/common/constant.h" +#include "oneflow/core/common/cached_caller.h" #include "oneflow/core/common/container_util.h" #include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_mgr.h" #include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h" @@ -68,12 +69,10 @@ Maybe GetOneDimNcclCollectiveEagerBoxingInterpreter( out_parallel_distribution->sbp_parallel(0)))); } -} // namespace - -Maybe EagerBoxingInterpreterManager::GetEagerBoxingInterpreter( +Maybe GetBoxingInterpreter( Symbol in_parallel_distribution, Symbol out_parallel_distribution, - Symbol in_parallel_desc, Symbol out_parallel_desc) const { + Symbol in_parallel_desc, Symbol out_parallel_desc) { if (in_parallel_distribution == out_parallel_distribution && in_parallel_desc == out_parallel_desc) { static std::shared_ptr identity_boxing_interpreter = @@ -82,14 +81,13 @@ Maybe EagerBoxingInterpreterManager::GetEagerBoxingInter } if (in_parallel_distribution->sbp_parallel_size() == 1 && out_parallel_distribution->sbp_parallel_size() == 1) { - if (EagerBoxingInterpreterUtil::IsPlacementEqual(in_parallel_desc, out_parallel_desc)) { + if (in_parallel_desc == out_parallel_desc) { if (EagerBoxingInterpreterUtil::IsBoxingB2P(in_parallel_distribution->sbp_parallel(0), out_parallel_distribution->sbp_parallel(0))) { std::shared_ptr naive_bp_boxing_interpreter = std::make_shared(); return naive_bp_boxing_interpreter; - } - if (EagerBoxingInterpreterUtil::IsDeviceTypeGPU(in_parallel_desc)) { + } else if (in_parallel_desc->device_type() == DeviceType::kGPU) { return GetOneDimNcclCollectiveEagerBoxingInterpreter(in_parallel_distribution, out_parallel_distribution); } else { @@ -103,6 +101,18 @@ Maybe EagerBoxingInterpreterManager::GetEagerBoxingInter } } +auto* CachedGetBoxingInterpreter = THREAD_LOCAL_CACHED(&GetBoxingInterpreter); + +} // namespace + +Maybe EagerBoxingInterpreterManager::GetEagerBoxingInterpreter( + Symbol in_parallel_distribution, + Symbol out_parallel_distribution, + Symbol in_parallel_desc, Symbol out_parallel_desc) const { + return CachedGetBoxingInterpreter(in_parallel_distribution, out_parallel_distribution, + in_parallel_desc, out_parallel_desc); +} + COMMAND(Global::SetAllocated(new EagerBoxingInterpreterManager())); } // namespace oneflow diff --git a/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.cpp b/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.cpp index 43bad772444..796c5097fb8 100644 --- a/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.cpp +++ b/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.cpp @@ -17,15 +17,6 @@ limitations under the License. namespace oneflow { -bool EagerBoxingInterpreterUtil::IsPlacementEqual(Symbol src, - Symbol dst) { - return src == dst; -} - -bool EagerBoxingInterpreterUtil::IsDeviceTypeGPU(Symbol parallel_desc) { - return parallel_desc->device_type() == DeviceType::kGPU; -} - bool EagerBoxingInterpreterUtil::IsBoxingS2B(const cfg::SbpParallel& src, const cfg::SbpParallel& dst) { return src.has_split_parallel() && dst.has_broadcast_parallel(); diff --git a/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h b/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h index c1f0a0d2b80..d34880abbe6 100644 --- a/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h +++ b/oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h @@ -23,8 +23,6 @@ limitations under the License. namespace oneflow { struct EagerBoxingInterpreterUtil { - static bool IsPlacementEqual(Symbol src, Symbol dst); - static bool IsDeviceTypeGPU(Symbol parallel_desc); static bool IsBoxingS2B(const cfg::SbpParallel& src, const cfg::SbpParallel& dst); static bool IsBoxingS2P(const cfg::SbpParallel& src, const cfg::SbpParallel& dst); static bool IsBoxingP2S(const cfg::SbpParallel& src, const cfg::SbpParallel& dst); diff --git a/oneflow/core/framework/op_interpreter/boxing/naive_b2p_boxing_interpreter.cpp b/oneflow/core/framework/op_interpreter/boxing/naive_b2p_boxing_interpreter.cpp index eb362215326..2dbbac01959 100644 --- a/oneflow/core/framework/op_interpreter/boxing/naive_b2p_boxing_interpreter.cpp +++ b/oneflow/core/framework/op_interpreter/boxing/naive_b2p_boxing_interpreter.cpp @@ -28,7 +28,7 @@ Maybe NaiveB2PBoxingInterpreter::InterpretImpl( Symbol out_parallel_distribution, Symbol in_parallel_desc, Symbol out_parallel_desc) const { CHECK_EQ_OR_RETURN(in_parallel_desc, out_parallel_desc); - int64_t root = JUST(in_parallel_desc->DeviceId4ParallelId(0)); + int64_t root = JUST(in_parallel_desc->MachineId4ParallelId(0)); if (root == GlobalProcessCtx::LocalRank()) { std::string device_type = Device::DeviceType4ParallelDesc(in_parallel_desc->device_tag()); return JUST(one::functional::Copy(input, device_type, root)); diff --git a/oneflow/core/framework/op_interpreter/eager_consistent_op_interpreter.cpp b/oneflow/core/framework/op_interpreter/eager_consistent_op_interpreter.cpp index aebff181a78..0f6fe486cfb 100644 --- a/oneflow/core/framework/op_interpreter/eager_consistent_op_interpreter.cpp +++ b/oneflow/core/framework/op_interpreter/eager_consistent_op_interpreter.cpp @@ -32,6 +32,7 @@ limitations under the License. #include "oneflow/core/autograd/autograd_mode.h" #include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_mgr.h" #include "oneflow/user/kernels/stateful_local_opkernel.h" +#include "oneflow/core/framework/tensor_rpc_util.h" namespace oneflow { namespace one { @@ -61,6 +62,26 @@ std::string GetDynamicOpConsistentFailedDebugString(const UserOpExpr& user_op_ex return ss.str(); } +namespace { + +Maybe GetBoxingOutput(const std::shared_ptr& input, + Symbol parallel_distribution) { + const auto& ctx = JUST(LaunchTensorMetaConsistencyCheck(*input)); + // Eager boxing + const auto& boxing_interpreter = + JUST(Global::Get()->GetEagerBoxingInterpreter( + JUST(input->parallel_distribution()), parallel_distribution, JUST(input->parallel_desc()), + JUST(input->parallel_desc()))); + const auto& output = JUST(boxing_interpreter->Interpret( + input, JUST(input->parallel_distribution()), parallel_distribution, + JUST(input->parallel_desc()), JUST(input->parallel_desc()))); + JUST(RpcUtil::WaitUntilDoneOrTimeout(*ctx, RpcUtil::TimeoutSeconds())); + JUST(ctx->Check()); + return output; +} + +} // namespace + } // namespace Maybe Interpret(const UserOpExpr& user_op_expr, const TensorTuple& inputs, @@ -87,7 +108,7 @@ Maybe Interpret(const UserOpExpr& user_op_expr, const TensorTuple& inputs, outputs->at(i).reset(new ConsistentTensor(tensor_impl)); } // Do nothing if the `parallel_desc` doesn't cover current ProcessCtx. - if (!device) { return Maybe::Ok(); } + if (!parallel_id.has_value()) { return Maybe::Ok(); } // Run instruction LocalCallOpKernel const auto& kernel = JUST(user_op_expr.MutKernel4Device(*device)); CHECK_EQ_OR_RETURN(kernel->output_tuple_indexes4mut2_obns().size(), 0) @@ -95,17 +116,11 @@ Maybe Interpret(const UserOpExpr& user_op_expr, const TensorTuple& inputs, std::shared_ptr input_eager_blob_objects = std::make_shared(inputs.size()); for (int i = 0; i < inputs.size(); ++i) { - // Eager boxing - const auto& boxing_interpreter = - JUST(Global::Get()->GetEagerBoxingInterpreter( - JUST(inputs.at(i)->parallel_distribution()), - result->input_parallel_distributions().at(i), JUST(inputs.at(i)->parallel_desc()), - JUST(inputs.at(i)->parallel_desc()))); - const auto& boxing_output = JUST(boxing_interpreter->Interpret( - inputs.at(i), JUST(inputs.at(i)->parallel_distribution()), - result->input_parallel_distributions().at(i), JUST(inputs.at(i)->parallel_desc()), - JUST(inputs.at(i)->parallel_desc()))); - const auto& local_tensor = JUST(boxing_output->cur_rank_phy_tensor()); + std::shared_ptr input = inputs.at(i); + if (result->input_parallel_distributions().at(i) != JUST(input->parallel_distribution())) { + input = JUST(GetBoxingOutput(input, result->input_parallel_distributions().at(i))); + } + const auto& local_tensor = JUST(input->cur_rank_phy_tensor()); input_eager_blob_objects->at(i) = JUST(local_tensor->eager_blob_object()); } std::shared_ptr output_eager_blob_objects = diff --git a/oneflow/core/functional/impl/consistent_cast.cpp b/oneflow/core/functional/impl/consistent_cast.cpp index f35173b90f0..d8382f84706 100644 --- a/oneflow/core/functional/impl/consistent_cast.cpp +++ b/oneflow/core/functional/impl/consistent_cast.cpp @@ -30,12 +30,10 @@ limitations under the License. #include "oneflow/core/job/global_for.h" #include "oneflow/core/job/resource_desc.h" #include "oneflow/core/job/rank_group_scope.h" -#include "oneflow/core/framework/rpc_token.h" -#include "oneflow/core/framework/rpc_util.h" #include "oneflow/core/common/flat_shape.h" #include "oneflow/core/common/container_util.h" #include "oneflow/core/common/balanced_splitter.h" -#include "oneflow/core/framework/tensor_rpc_util.h" +#include "oneflow/core/framework/rpc_util.h" namespace oneflow { namespace one { @@ -145,16 +143,14 @@ Maybe FindOrCreatParallelDistributionOpExpr( Maybe ConsistentToConsistent(const std::shared_ptr& x, Symbol parallel_desc, const std::vector>& sbp_parallels) { - const auto& ctx = JUST(LaunchTensorMetaConsistencyCheck(*x)); - JUST(RpcUtil::WaitUntilDoneOrTimeout(*ctx, RpcUtil::TimeoutSeconds())); - JUST(ctx->Check()); const auto& consistent_tensor = std::dynamic_pointer_cast(x); CHECK_NOTNULL_OR_RETURN(consistent_tensor) << "consistent tensors supported only"; CHECK_OR_RETURN(consistent_tensor->is_eager()) << "eager tensors supported only"; const auto& parallel_distribution_cast_op_expr = JUST(FindOrCreatParallelDistributionOpExpr(sbp_parallels)); - return JUST(OpInterpUtil::Dispatch(*parallel_distribution_cast_op_expr, - {consistent_tensor})); + const auto& ret = JUST(OpInterpUtil::Dispatch(*parallel_distribution_cast_op_expr, + {consistent_tensor})); + return ret; } Maybe LocalToConsistent(const std::shared_ptr& x,