
LazyInterpreter for FetchOutputOpExpr and set op parallel_distribution #5527

Merged: 8 commits, Jul 19, 2021
72 changes: 62 additions & 10 deletions oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
@@ -50,7 +50,12 @@ bool GetIsDynamicOfTensor(const std::shared_ptr<Tensor>& tensor) {

Maybe<void> GenParallelDistributionByTensor(ParallelDistribution* parallel_distribution,
const std::shared_ptr<Tensor>& tensor) {
- // TODO(chengcheng)
+ parallel_distribution->clear_sbp_parallel();
+ if (tensor->is_local()) {
+   parallel_distribution->add_sbp_parallel()->mutable_broadcast_parallel();
+ } else {
+   JUST(tensor->parallel_distribution())->ToProto(parallel_distribution);
+ }
return Maybe<void>::Ok();
}
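The helper above resolves the parallel distribution for an interface blob: a local (mirrored) tensor is published as broadcast SBP, while a consistent tensor keeps whatever SBP it already carries. As a conceptual aside (plain NumPy, not OneFlow API; the 2-rank placement and shapes are made up for illustration), broadcast means every rank holds the full tensor, while split(axis=0) means each rank holds one shard:

import numpy as np

full = np.arange(8).reshape(4, 2)  # the logical (consistent) tensor

# broadcast SBP: every rank holds an identical copy of the full tensor
broadcast_shards = [full.copy() for _ in range(2)]

# split(axis=0) SBP: each rank holds one contiguous slice along axis 0
split_shards = np.array_split(full, 2, axis=0)

assert all(s.shape == (4, 2) for s in broadcast_shards)
assert all(s.shape == (2, 2) for s in split_shards)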

@@ -79,9 +84,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedInputOpExpr& op_expr, const Ten
input_tensor->shape()->ToProto(blob_conf->mutable_shape());
blob_conf->set_data_type(input_tensor->dtype());
blob_conf->set_is_dynamic(GetIsDynamicOfTensor(input_tensor));
- if (input_tensor->is_consistent()) {
-   JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
- }
+ JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));

auto infer_ctx = JUST(GetCurInferCtx());
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
@@ -136,9 +139,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const
// NOTE(chengcheng): VariableOpConf initializer_conf is useless because variable is inited
// by EagerTensor.
var_conf->mutable_initializer()->mutable_empty_conf();
- if (input_tensor->is_consistent()) {
-   // TODO(chengcheng): GenerateParallelDistributionString by tensor.
- }
+ // TODO(chengcheng): GenerateParallelDistributionString by tensor.
if (!input_tensor->requires_grad()) { var_conf->set_trainable(false); }
// TODO(chengcheng, xuxiaoyu): Set L1/L2 RegularizerConf by nn.Graph Optimizer

@@ -175,9 +176,60 @@ Maybe<void> LazyInterpreter::ApplyImpl(const FeedVariableOpExpr& op_expr, const

Maybe<void> LazyInterpreter::ApplyImpl(const FetchOutputOpExpr& op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const OpExprInterpContext& ctx) const {
- // TODO(chengcheng)
- OF_UNIMPLEMENTED() << "The type " << op_expr.op_type_name()
-                    << " has not been supported in LazyInterpreter::Apply.";
+ // NOTE(chengcheng): inputs[0] is the LazyTensor
+ CHECK_EQ_OR_RETURN(inputs.size(), 1);
+ CHECK_EQ_OR_RETURN(op_expr.input_size(), 1);
+ const std::shared_ptr<Tensor>& input_tensor = inputs.at(0);
+ CHECK_OR_RETURN(input_tensor->is_lazy());
+ // NOTE(chengcheng): Lazy tensors are always consistent.
+ CHECK_OR_RETURN(input_tensor->is_consistent());
+ const std::string& input_lbn = TensorNameScope::Global()->Lookup(input_tensor);
+ CHECK_OR_RETURN(!input_lbn.empty());  // the lbn must already exist.
+
+ const auto& scope = JUST(GetCurrentScope());
+ int64_t scope_symbol_id = JUST(scope->symbol_id());
+
+ OperatorConf op_conf;
+ op_conf.set_name(op_expr.op_name());  // constructed by python nn.Graph
+ op_conf.set_scope_symbol_id(scope_symbol_id);  // TODO(chengcheng): NewScope by cur scope.
+ op_conf.set_device_tag(GetDeviceTagOfTensor(input_tensor));
+ // NOTE(chengcheng):
+ //   We construct an OutputOpConf instead of a FetchOutputOpConf because FetchOutputOpExpr
+ //   exists only to fetch the nn.Graph output LazyTensor.
+ OutputOpConf* output_conf = op_conf.mutable_output_conf();
+ output_conf->set_in(input_lbn);
+ output_conf->set_out("out");
+ InterfaceBlobConf* blob_conf = output_conf->mutable_blob_conf();
+ input_tensor->shape()->ToProto(blob_conf->mutable_shape());
+ blob_conf->set_data_type(input_tensor->dtype());
+ blob_conf->set_is_dynamic(GetIsDynamicOfTensor(input_tensor));
+ JUST(GenParallelDistributionByTensor(blob_conf->mutable_parallel_distribution(), input_tensor));
+
+ auto infer_ctx = JUST(GetCurInferCtx());
+ OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(op_conf));
+
+ const std::string& op_name = op_conf.name();
+
+ // temp debug log
+ std::cout << "cclog: Lazy nn.Graph AddOpName: " << op_name << std::endl
+           << " and the origin op_conf is: " << op_conf.DebugString();
+
+ int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(op_conf));
+ const std::shared_ptr<ParallelDesc>& blob_parallel_desc_sym =
+     JUST(GetSymbol<cfg::ParallelConf, ParallelDesc>(parallel_desc_sym_id));
+
+ // Check outputs num and set up output tensor properties.
+ CHECK_EQ_OR_RETURN(outputs->size(), 1);
+ CHECK_EQ_OR_RETURN(op_expr.output_size(), 1);
+
+ const std::string obn = "out";  // NOTE(chengcheng): obn is NOT op_expr.indexed_obns
+ const auto& parallel_attr =
+     JUST(compatible_py::GetOpArgParallelAttribute(blob_parallel_desc_sym, op_attr, obn));
+ const auto& blob_attr = JUST(compatible_py::GetOpArgBlobAttribute(op_attr, obn));
+
+ CHECK_OR_RETURN(!outputs->at(0).get());
+ // TODO(chengcheng): Build EagerLocalTensor if parallel attr is on this rank.
+ (*outputs)[0] = JUST(OpInterpUtil::BuildTensor(blob_attr, parallel_attr, /*is_lazy=*/false));
+ return Maybe<void>::Ok();
}
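In short, the new ApplyImpl wraps the lazy input's lbn in an OutputOpConf, runs it through the current JobBuildAndInferCtx, and builds a non-lazy output tensor from the inferred blob and parallel attributes. A condensed sketch of how this path is driven from Python, lifted from the test added below (session and job setup omitted; lazy_tensor_in_c is the consistent lazy tensor produced by a FeedInputOpExpr):

attrs = oneflow._oneflow_internal.MutableCfgAttrMap()
output_conf = (
    oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf()
)
output_conf.set_in_0("LazyTensorInput")  # placeholder; the interpreter looks up the real lbn
output_conf.set_out_0("out_0")
output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr(
    "cc_Output_0", output_conf, ["in_0"], ["out_0"]
)
eager_tensor = output_op.apply([lazy_tensor_in_c], attrs)[0]  # non-lazy, consistent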

105 changes: 105 additions & 0 deletions oneflow/python/test/graph/test_output_op_expr.py
@@ -0,0 +1,105 @@
"""
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest

import numpy as np
import os

os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "12139"
os.environ["WORLD_SIZE"] = "1"
os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"

import oneflow
import oneflow.experimental as flow
import oneflow.python.framework.session_context as session_ctx
import oneflow._oneflow_internal
from oneflow.python.framework.multi_client_session import MultiClientSession
import oneflow.python.framework.c_api_util as c_api_util


@flow.unittest.skip_unless_1n1d()
class TestFetchOutputTensor(unittest.TestCase):
    def test_fetch_output_tensor(test_case):
        test_case.assertTrue(oneflow.distributed.is_multi_client())
        test_case.assertTrue(
            oneflow.python.framework.env_util.HasAllMultiClientEnvVars()
        )

        x = flow.Tensor(1, 1, 10, 10)
        flow.nn.init.uniform_(x, a=-1.0, b=1.0)

        session = session_ctx.GetDefaultSession()
        test_case.assertTrue(isinstance(session, MultiClientSession))
        session.TryInit()

        with oneflow._oneflow_internal.lazy_mode.gard(True):

            oneflow._oneflow_internal.JobBuildAndInferCtx_Open(
                "cc_test_output_op_expr_job"
            )
            job_conf = (
                oneflow._oneflow_internal.oneflow.core.job.job_conf.JobConfigProto()
            )
            job_conf.set_job_name("cc_test_output_op_expr_job")
            job_conf.mutable_predict_conf()
            c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_conf)

            attrs = oneflow._oneflow_internal.MutableCfgAttrMap()

            input_conf = (
                oneflow._oneflow_internal.oneflow.core.operator.op_conf.FeedInputOpConf()
            )
            input_conf.set_in_0("EagerTensorInput")
            input_conf.set_out_0("out_0")
            input_op = oneflow._oneflow_internal.one.FeedInputOpExpr(
                "cc_Input_0", input_conf, ["in_0"], ["out_0"]
            )

            output_conf = (
                oneflow._oneflow_internal.oneflow.core.operator.op_conf.FetchOutputOpConf()
            )
            output_conf.set_in_0(
                "LazyTensorInput"
            )  # the lbn of a feed/fetch op conf doesn't matter
            output_conf.set_out_0("out_0")
            output_op = oneflow._oneflow_internal.one.FetchOutputOpExpr(
                "cc_Output_0", output_conf, ["in_0"], ["out_0"]
            )

            if not x.is_determined:
                x.determine()
            x_tensor_in_c = x._local_or_consistent_tensor

            lazy_tensor = input_op.apply([x_tensor_in_c], attrs)[0]
            test_case.assertEqual(lazy_tensor.shape, (1, 1, 10, 10))
            test_case.assertTrue(lazy_tensor.is_lazy)
            test_case.assertTrue(lazy_tensor.is_consistent)

            if not lazy_tensor.is_determined:
                lazy_tensor.determine()
            lazy_tensor_in_c = lazy_tensor._local_or_consistent_tensor

            eager_tensor = output_op.apply([lazy_tensor_in_c], attrs)[0]
            test_case.assertEqual(eager_tensor.shape, (1, 1, 10, 10))
            test_case.assertTrue(not eager_tensor.is_lazy)
            test_case.assertTrue(eager_tensor.is_consistent)
            print("output eager tensor: ", eager_tensor)


if __name__ == "__main__":
    unittest.main()
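Since the required multi-client environment variables are set at the top of the file, this test can presumably be run directly on one device with python3 oneflow/python/test/graph/test_output_op_expr.py.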