Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control Graph / Session / Env's python c++ object destruction #5845

Merged
merged 41 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
64ec102
ref count del of session and env
strint Aug 11, 2021
38206e3
Merge branch 'master' into fea/destruct_session_and_graph
strint Aug 11, 2021
834cc77
auto format by CI
oneflow-ci-bot Aug 11, 2021
40e13c1
merge master
strint Aug 12, 2021
6093d85
Merge branch 'fea/destruct_session_and_graph' of https://github.com/O…
strint Aug 12, 2021
c2e1452
add test
strint Aug 12, 2021
4a9b1bb
pass graph in global test
strint Aug 13, 2021
ba191c9
Merge branch 'master' of https://github.com/Oneflow-Inc/oneflow into …
strint Aug 13, 2021
7ace8ee
Merge branch 'master' into fea/destruct_session_and_graph
strint Aug 14, 2021
b9af0b7
auto format by CI
oneflow-ci-bot Aug 14, 2021
061badd
rm cuda
strint Aug 14, 2021
17f5cc3
Merge branch 'fea/destruct_session_and_graph' of https://github.com/O…
strint Aug 14, 2021
ba84109
rm debug lof
strint Aug 14, 2021
93b7698
rm debug info
strint Aug 14, 2021
d41bdcd
Merge branch 'master' into fea/destruct_session_and_graph
strint Aug 14, 2021
9322958
move graph del sync from c to python
strint Aug 14, 2021
86cdff9
Merge branch 'fea/destruct_session_and_graph' of https://github.com/O…
strint Aug 14, 2021
b39de50
rm log
strint Aug 14, 2021
bd1fa84
auto format by CI
oneflow-ci-bot Aug 14, 2021
f3517da
refine test to pass ci
strint Aug 14, 2021
711fa24
Merge branch 'fea/destruct_session_and_graph' of https://github.com/O…
strint Aug 14, 2021
cfe2975
Merge branch 'master' of https://github.com/Oneflow-Inc/oneflow into …
strint Aug 14, 2021
802bac6
fix env sync call
strint Aug 14, 2021
b7dd0a3
auto format by CI
oneflow-ci-bot Aug 14, 2021
3a20855
deal with graph destruction when graph not compiled
strint Aug 14, 2021
03be911
Merge branch 'master' of https://github.com/Oneflow-Inc/oneflow into …
strint Aug 14, 2021
381c29f
Merge branch 'fea/destruct_session_and_graph' of https://github.com/O…
strint Aug 14, 2021
cb79eb7
deal with not compiled graph destruction
strint Aug 14, 2021
f017c2b
print log
strint Aug 14, 2021
94f6f35
deal with single client sync
strint Aug 14, 2021
142b9c2
Merge branch 'master' into fea/destruct_session_and_graph
strint Aug 14, 2021
dcafd9c
auto format by CI
oneflow-ci-bot Aug 14, 2021
f5ed490
rm py object ref
strint Aug 16, 2021
411ddbf
Merge branch 'fea/destruct_session_and_graph' of https://github.com/O…
strint Aug 16, 2021
7dd0209
rm python obj ref count
strint Aug 16, 2021
15316b5
add log of env
strint Aug 16, 2021
7c063f0
Merge branch 'master' of https://github.com/Oneflow-Inc/oneflow into …
strint Aug 16, 2021
af9e2af
address comment
strint Aug 16, 2021
52db422
Merge branch 'master' into fea/destruct_session_and_graph
strint Aug 16, 2021
ea05697
auto format by CI
oneflow-ci-bot Aug 16, 2021
543a4f9
Merge branch 'master' into fea/destruct_session_and_graph
oneflow-ci-bot Aug 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion oneflow/api/python/session/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ ONEFLOW_API_PYBIND11_MODULE("", m) {
// multi-client lazy global session context
m.def("CreateMultiClientSessionContext", &CreateMultiClientSessionContext);
m.def("InitMultiClientSessionContext", &InitMultiClientSessionContext);
m.def("TryDestroyMultiClientSessionContext", &TryDestroyMultiClientSessionContext);
m.def("MultiClientSessionContextAddCGraph", &MultiClientSessionContextAddCGraph);
m.def("TryDestroyMultiClientSessionContext", &TryDestroyMultiClientSessionContext,
py::call_guard<py::gil_scoped_release>());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 gil 锁是必须要加的么

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

试了下,现在不必要了,已经去掉


using namespace oneflow;
m.def("NewSessionId", &NewSessionId);
Expand Down
7 changes: 7 additions & 0 deletions oneflow/api/python/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/framework/config_def.h"
#include "oneflow/core/framework/multi_client_session_context.h"
#include "oneflow/core/framework/nn_graph.h"
#include "oneflow/core/persistence/tee_persistent_log_stream.h"

namespace oneflow {
Expand Down Expand Up @@ -127,6 +128,12 @@ inline Maybe<void> InitMultiClientSessionContext(const std::string& config_proto
return Maybe<void>::Ok();
}

inline Maybe<void> MultiClientSessionContextAddCGraph(
const std::shared_ptr<oneflow::NNGraph>& c_graph_ptr) {
JUST(Global<MultiClientSessionContext>::Get()->AddCGraph(c_graph_ptr));
return Maybe<void>::Ok();
}

inline Maybe<void> TryDestroyMultiClientSessionContext() {
// Global<T>::Delete is not allowed to be called here
// because glog is not constructed yet and LOG(INFO) has bad bahavior
Expand Down
6 changes: 6 additions & 0 deletions oneflow/api/python/session/session_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#define ONEFLOW_API_PYTHON_SESSION_SESSION_API_H_

#include "oneflow/api/python/session/session.h"
#include "oneflow/core/framework/nn_graph.h"

inline bool IsSessionInited() { return oneflow::IsSessionInited().GetOrThrow(); }

Expand All @@ -42,6 +43,11 @@ inline void InitMultiClientSessionContext(const std::string& config_proto_str) {
return oneflow::InitMultiClientSessionContext(config_proto_str).GetOrThrow();
}

inline void MultiClientSessionContextAddCGraph(
const std::shared_ptr<oneflow::NNGraph>& c_graph_ptr) {
return oneflow::MultiClientSessionContextAddCGraph(c_graph_ptr).GetOrThrow();
}

inline void TryDestroyMultiClientSessionContext() {
return oneflow::TryDestroyMultiClientSessionContext().GetOrThrow();
}
Expand Down
17 changes: 14 additions & 3 deletions oneflow/core/framework/multi_client_session_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,21 @@ Maybe<void> MultiClientSessionContext::TryInit(const ConfigProto& config_proto)
return Maybe<void>::Ok();
}

Maybe<void> MultiClientSessionContext::AddCGraph(
const std::shared_ptr<oneflow::NNGraph>& c_graph_ptr) {
graphs_.push_back(c_graph_ptr);
return Maybe<void>::Ok();
}

Maybe<void> MultiClientSessionContext::TryClose() {
if (is_inited_) {
VLOG(2) << "Try to delete multi client session." << std::endl;
JUST(vm::MultiClientSync());
VLOG(2) << "Start to delete multi client session." << std::endl;
VLOG(2) << "Try to delete multi client session context." << std::endl;
for (auto wk_graph_ptr : graphs_) {
if (auto sh_graph_ptr = wk_graph_ptr.lock()) {
VLOG(2) << "grap name " << sh_graph_ptr->job_name() << " not closed, try to close it.";
JUST(sh_graph_ptr->Close());
}
}
{
// NOTE(chengcheng): delete runtime global objects
Global<boxing::collective::CollectiveBoxingDeviceCtxPoller>::Delete();
Expand All @@ -148,6 +158,7 @@ Maybe<void> MultiClientSessionContext::TryClose() {
Global<ResourceDesc, ForSession>::New(Global<ResourceDesc, ForEnv>::Get()->resource(),
GlobalProcessCtx::NumOfProcessPerNode());
}
VLOG(2) << "Finish delete multi client session context." << std::endl;
return Maybe<void>::Ok();
}

Expand Down
3 changes: 3 additions & 0 deletions oneflow/core/framework/multi_client_session_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include "oneflow/core/common/util.h"
#include "oneflow/core/job/job_set.pb.h"
#include "oneflow/core/common/maybe.h"
#include "oneflow/core/framework/nn_graph.h"
#include "oneflow/core/framework/tensor.h"

namespace oneflow {
Expand All @@ -30,6 +31,7 @@ class MultiClientSessionContext {
~MultiClientSessionContext() {}

Maybe<void> TryInit(const ConfigProto& config_proto);
Maybe<void> AddCGraph(const std::shared_ptr<oneflow::NNGraph>& c_graph_ptr);
Maybe<void> TryClose();

// NOTE(chengcheng): for nn.Graph catch free EagerTensor in Graph.build().
Expand All @@ -48,6 +50,7 @@ class MultiClientSessionContext {
bool is_inited_;
HashMap<std::string, std::vector<std::pair<std::string, std::shared_ptr<one::Tensor>>>>
graph_name2free_eager_tensors_;
std::vector<std::weak_ptr<NNGraph>> graphs_;
};

} // namespace oneflow
Expand Down
21 changes: 16 additions & 5 deletions oneflow/core/framework/nn_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,28 @@ limitations under the License.
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/job/job_instance.h"
#include "oneflow/core/job/plan_util.h"
#include "oneflow/core/job/runtime.h"
#include "oneflow/core/persistence/tee_persistent_log_stream.h"
#include "oneflow/core/vm/vm_util.h"

namespace oneflow {

NNGraph::~NNGraph() {
VLOG(2) << "Try to delete c nn graph name " << name_ << "." << std::endl;
CloseRuntimeBuffers();
runtime_.reset();
Global<MultiClientSessionContext>::Get()->RemoveGraphFreeEagerTensors(name_);
if (!is_closed_) {
strint marked this conversation as resolved.
Show resolved Hide resolved
VLOG(2) << "graph destructor Try to close c nn graph name " << name_ << "." << std::endl;
Close();
}
}

Maybe<void> NNGraph::Close() {
if (!is_closed_) {
VLOG(2) << "Try to close c nn graph name " << name_ << "." << std::endl;
CloseRuntimeBuffers();
runtime_.reset();
Global<MultiClientSessionContext>::Get()->RemoveGraphFreeEagerTensors(name_);
is_closed_ = true;
VLOG(2) << "Finish close c nn graph name " << name_ << "." << std::endl;
}
return Maybe<void>::Ok();
}

const std::vector<std::string>& NNGraph::inputs_op_names() const { return input_op_names_; }
Expand Down
7 changes: 5 additions & 2 deletions oneflow/core/framework/nn_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ limitations under the License.
#include "oneflow/core/framework/tensor_tuple.h"
#include "oneflow/core/job/job.pb.h"
#include "oneflow/core/job/plan.pb.h"
#include "oneflow/core/job/runtime.h"

namespace oneflow {

class Blob;
class Runtime;

class NNGraph final : public NNGraphIf {
public:
explicit NNGraph(const std::string& name) : name_(name), runtime_inited_(false) {}
explicit NNGraph(const std::string& name)
: name_(name), runtime_inited_(false), is_closed_(false) {}
~NNGraph();

const std::string& job_name() const { return name_; }
Expand All @@ -43,6 +44,7 @@ class NNGraph final : public NNGraphIf {
const std::vector<std::string>& variable_op_names,
const std::vector<std::shared_ptr<one::Tensor>>& variable_tensors);
Maybe<void> CompileAndInitRuntime();
Maybe<void> Close();

private:
Maybe<void> RegisterFreeEagerTensorsToVariableOpNames();
Expand All @@ -59,6 +61,7 @@ class NNGraph final : public NNGraphIf {
// TODO(chengcheng): temp impl using runtime now, need reimplement for dynamic multi nn.Graph.
std::unique_ptr<Runtime> runtime_;
bool runtime_inited_;
bool is_closed_;
};

Maybe<void> RunLazyNNGraph(const one::TensorTuple& inputs, const one::TensorTuple& outputs,
Expand Down
4 changes: 4 additions & 0 deletions oneflow/core/job/env_global_objects_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ limitations under the License.
#include "oneflow/core/transport/transport.h"
#include "oneflow/core/device/node_device_descriptor_manager.h"
#include "oneflow/core/vm/symbol_storage.h"
#include "oneflow/core/framework/multi_client_session_context.h"
#include "oneflow/core/framework/symbol_id_cache.h"
#include "oneflow/core/operator/op_node_signature.cfg.h"
#include "oneflow/core/operator/op_conf.cfg.h"
Expand Down Expand Up @@ -199,6 +200,9 @@ Maybe<void> EnvGlobalObjectsScope::Init(const EnvProto& env_proto) {
}

EnvGlobalObjectsScope::~EnvGlobalObjectsScope() {
auto session_ctx = Global<MultiClientSessionContext>::Get();
if (session_ctx != nullptr) { session_ctx->TryClose(); }

if (!Global<ResourceDesc, ForSession>::Get()->enable_dry_run()) {
#ifdef __linux__
if (Global<ResourceDesc, ForSession>::Get()->process_ranks().size() > 1) {
Expand Down
30 changes: 9 additions & 21 deletions python/oneflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,16 @@ def is_deprecated(func_or_class):
import oneflow.framework.session_context as session_ctx
from oneflow.framework.multi_client_session import MultiClientSession

if not env_util.HasAllMultiClientEnvVars():
env_util.SetDefaultMultiClientEnvVars()
oneflow._oneflow_internal.SetIsMultiClient(True)
env_util.api_env_init()
env_util.GetEnvHolder()

oneflow._oneflow_internal.InitDefaultConsistentTransportTokenScope()
session_ctx.OpenDefaultSession(
MultiClientSession(oneflow._oneflow_internal.NewSessionId())
MultiClientSession(
oneflow._oneflow_internal.NewSessionId(), env_util.GetEnvHolder()
)
)
scope_util.InitScopeStack()
oneflow._oneflow_internal.EnableEagerEnvironment(True)
del env_util
from oneflow.framework import python_callback, register_python_callback

oneflow._oneflow_internal.RegisterGlobalForeignCallback(
Expand All @@ -93,24 +92,13 @@ def is_deprecated(func_or_class):
del register_python_callback


def _SyncOnMasterFn():
import oneflow

def Sync():
if not oneflow._oneflow_internal.IsEnvInited():
return
if oneflow.framework.distribute.is_multi_client():
oneflow._oneflow_internal.eager.multi_client.Sync()
elif oneflow.framework.distribute.get_rank() == 0:
oneflow._oneflow_internal.eager.single_client.Sync()
def _ExitOneFlow():
session_ctx.TryCloseDefaultSession()
env_util.DelEnvHolder()

return Sync

atexit.register(_ExitOneFlow)

strint marked this conversation as resolved.
Show resolved Hide resolved
atexit.register(oneflow._oneflow_internal.SetShuttingDown)
atexit.register(oneflow._oneflow_internal.DestroyEnv)
atexit.register(oneflow.framework.session_context.TryCloseDefaultSession)
atexit.register(_SyncOnMasterFn)
strint marked this conversation as resolved.
Show resolved Hide resolved
del atexit
del oneflow
import oneflow.framework.docstr as docstr
Expand Down
42 changes: 41 additions & 1 deletion python/oneflow/framework/env_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,52 @@ def str2int(env_config):
if os.getenv("GLOG_log_dir"):
cpp_logging_conf.log_dir = os.getenv("GLOG_log_dir")
if os.getenv("GLOG_logtostderr"):
cpp_logging_conf.logtostderr = os.getenv("GLOG_logtostderr")
cpp_logging_conf.logtostderr = int(os.getenv("GLOG_logtostderr"))
if os.getenv("GLOG_logbuflevel"):
cpp_logging_conf.logbuflevel = os.getenv("GLOG_logbuflevel")
env_proto.cpp_logging_conf.CopyFrom(cpp_logging_conf)


def _SyncOnMasterFn():
def Sync():
if not oneflow._oneflow_internal.IsEnvInited():
return
if oneflow.framework.distribute.is_multi_client():
oneflow._oneflow_internal.eager.multi_client.Sync()
elif oneflow.framework.distribute.get_rank() == 0:
oneflow._oneflow_internal.eager.single_client.Sync()

return Sync


class EnvHolder(object):
def __init__(self):
if not HasAllMultiClientEnvVars():
SetDefaultMultiClientEnvVars()
oneflow._oneflow_internal.SetIsMultiClient(True)
api_env_init()

def __del__(self):
_SyncOnMasterFn()
strint marked this conversation as resolved.
Show resolved Hide resolved
oneflow._oneflow_internal.DestroyEnv()
oneflow._oneflow_internal.SetShuttingDown()


def GetEnvHolder():
global _env_holder
if _env_holder is not None:
return _env_holder
else:
_env_holder = EnvHolder()
return _env_holder


def DelEnvHolder():
global _env_holder
del _env_holder


_env_holder = None
device_tag2default_parallel_conf = {}
default_env_proto = _DefaultEnvProto()
config_master_addr = ctrl_bootstrap_pb.Address()
Expand Down
15 changes: 10 additions & 5 deletions python/oneflow/framework/multi_client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class Status(enum.Enum):
INITED = 2
CLOSED = 3

def __init__(self, sess_id):
def __init__(self, sess_id, env_holder):
self._env_holder = env_holder
self.sess_ = oneflow._oneflow_internal.RegsiterSession(sess_id)
oneflow._oneflow_internal.CreateMultiClientSessionContext()
self.config_proto_ = self._make_config_proto()
Expand All @@ -39,22 +40,26 @@ def __init__(self, sess_id):
self._update_scope_attr_name2defaultVal()
self.status_ = self.Status.CREATED

def __del__(self):
self.TryClose()

def TryInit(self):
self._check_status(self.Status.CREATED, self.Status.INITED)
if self.status_ == self.Status.CREATED:
config_proto_str = text_format.MessageToString(self.config_proto)
oneflow._oneflow_internal.InitMultiClientSessionContext(config_proto_str)
self.status_ = self.Status.INITED

def TryClose(self):
def AddCGraph(self, graph):
self._check_status(self.Status.INITED)
oneflow._oneflow_internal.MultiClientSessionContextAddCGraph(graph)

def _try_close(self):
if self.status_ != self.Status.CLOSED:
oneflow._oneflow_internal.TryDestroyMultiClientSessionContext()
oneflow._oneflow_internal.ClearSessionById(self.id)
self.status_ = self.Status.CLOSED

def __del__(self):
self._try_close()

@property
def status(self):
return self.status_
Expand Down
3 changes: 1 addition & 2 deletions python/oneflow/framework/session_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ def TryCloseDefaultSession():
default_sess_id = oneflow._oneflow_internal.GetDefaultSessionId()
assert default_sess_id in _sess_id2sess
if default_sess_id in _sess_id2sess:
_sess_id2sess[default_sess_id].TryClose()
del _sess_id2sess[default_sess_id]
del _sess_id2sess[default_sess_id]


def try_init_default_session(func):
Expand Down
13 changes: 9 additions & 4 deletions python/oneflow/nn/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self):
self._args_repr = []
self._outs_repr = []
self._debug = False
self._session = None

@property
def name(self):
Expand Down Expand Up @@ -163,11 +164,12 @@ def _compile(self, *args):
def _build_forward_graph(self, *args):
self._generate_optimizer_and_variable_configs()

session = session_ctx.GetDefaultSession()
assert type(session) is MultiClientSession
session.TryInit()
self._session = session_ctx.GetDefaultSession()
assert type(self._session) is MultiClientSession
self._session.TryInit()
self._session.AddCGraph(self._c_nn_graph)

with graph_build_util.graph_build_context(self.config.proto, session):
with graph_build_util.graph_build_context(self.config.proto, self._session):
# Deal with inputs
arg_op_names, lazy_args, self._args_repr = self._build_io(
"input", graph_build_util.build_graph_input_arg, *args
Expand Down Expand Up @@ -459,6 +461,9 @@ def _shallow_repr(self):
shallow_repr = "(GRAPH:" + self._name + ":" + self.__class__.__name__ + ")"
return shallow_repr

def __del__(self):
oneflow._oneflow_internal.eager.multi_client.Sync()


class GraphConfig(FunctionConfig):
def __init__(self):
Expand Down