Feat multi input sharing graph, save and load compiled graph #9754
Conversation
GetInputCriticalSectionCallbackBufferName(new_job_name));
} else if (buffer_name.rfind(kOutputCriticalSectionCallbackBufferNamePrefix, 0) == 0) {
  op_conf.mutable_critical_section_callback_tick_conf()->set_buffer_name(
      GetOutputCriticalSectionCallbackBufferName(new_job_name));

After discussing with @chengtbf, we found that the deadlock is caused by the tick-related ops above: they all communicate through global buffers, and buffer communication requires generating an independent buffer name for each job.
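The per-job buffer naming can be sketched as follows. This is a hypothetical pure-Python illustration modeled on the prefix constants visible elsewhere in this PR, not the actual OneFlow implementation.

```python
# Hypothetical sketch of per-job buffer naming; the prefix strings are
# modeled on the kInputBufferNamePrefix-style constants in this PR.
K_INPUT_CS_CALLBACK_PREFIX = "InputCriticalSectionCallback-"
K_OUTPUT_CS_CALLBACK_PREFIX = "OutputCriticalSectionCallback-"
K_SOURCE_TICK_PREFIX = "SourceTick-"

def get_input_critical_section_callback_buffer_name(job_name: str) -> str:
    # Each job gets its own buffer name, so two jobs that share ops never
    # push/pull through the same global buffer (the cause of the deadlock).
    return K_INPUT_CS_CALLBACK_PREFIX + job_name

def rewrite_buffer_name(old_buffer_name: str, new_job_name: str) -> str:
    # If the old name carries a known prefix, rebuild it for the new job.
    for prefix in (K_INPUT_CS_CALLBACK_PREFIX,
                   K_OUTPUT_CS_CALLBACK_PREFIX,
                   K_SOURCE_TICK_PREFIX):
        if old_buffer_name.startswith(prefix):
            return prefix + new_job_name
    return old_buffer_name

print(rewrite_buffer_name("SourceTick-job0", "job1"))  # SourceTick-job1
```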
buffer_mgr->Get(GetSourceTickBufferName(job_name))->Push(job_instance);
LOG(INFO) << "vm run lazy " << job_name << " push source tick "
          << " run count " << run_cnt;

This debug logging needs to be removed once debugging is done.
oneflow/core/lazy/actor/actor.cpp
Outdated
@@ -433,6 +442,9 @@ void Actor::ActUntilFail() {
  AsyncRetInplaceConsumedRegstIfNoConsumer();

  AsyncSendQueuedMsg();
  LOG(INFO) << "Actor " << actor_id_ << " name " << op_name << " finish to act count "
            << act_cnt_;
  ++act_cnt_;

This debug logging also needs to be removed once debugging is done.
The reshape problem showed up, inside sd2.

One possible fix: run build once more on the new graph. That build does not trigger the subsequent job passes, but it does produce fresh, valid shape-related configs for reshape and similar ops (random, input, etc.).

That should work; the point of re-running build is exactly to refresh attrs and the like. But how do we associate the newly built reshape ops with the corresponding reshape ops in the old graph (whose new attrs need to be filled in)?

Associate them by creation order, just like the conf attr shape association for inputs, only slightly more involved. There are several ways to do it, for example:

Is that order stable across builds and after passes run? Some passes rewrite the graph and would change the order, right? Or do we ignore passes entirely and only consider the original graph?

Only the original graph is considered; job-pass rewrites are ignored (reshape is never fused). In the original build logic, ops are created in the execution order of the Python script, which is the same every time.
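The creation-order association discussed above can be sketched in plain Python. The op and attr structures here are simplified stand-ins for the real Job proto, not OneFlow's actual types.

```python
def associate_by_creation_order(shared_op_names, new_ops):
    """Map each op name from the original (shared) build to the op at the
    same creation index in the freshly re-built graph.

    Assumes both builds ran the same Python script, so ops were created in
    the same order (job passes are ignored, per the discussion above).
    """
    assert len(shared_op_names) == len(new_ops)
    return dict(zip(shared_op_names, new_ops))

def refresh_shape_attrs(old_op_attrs, shared_to_new, op_name):
    # Overwrite the old op's shape-related attrs with the newly built
    # op's attrs (the new build carries the valid shapes).
    new_op = shared_to_new[op_name]
    for attr_name, value in new_op["attrs"].items():
        old_op_attrs[attr_name] = value
    return old_op_attrs

shared_names = ["model.linear", "model.reshape"]
new_ops = [
    {"name": "model.linear", "attrs": {}},
    {"name": "model.reshape", "attrs": {"shape": (8, 4)}},
]
mapping = associate_by_creation_order(shared_names, new_ops)
print(refresh_shape_attrs({"shape": (4, 8)}, mapping, "model.reshape"))
```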
def forward(self, x):
    y = self.linear(x)
    assert len(y.shape) == 2
    return flow.reshape(y, (y.shape[1], y.shape[0]))

The reshape test passes.
auto attr_iter = new_op_conf->user_conf().attr().find(pair.first);
CHECK_OR_RETURN(attr_iter != new_op_conf->user_conf().attr().end())
    << " There is not attr " << pair.first << " in new op " << new_op_conf->DebugString();
*pair.second.mutable_at_shape() = attr_iter->second.at_shape();

Updates the shape attr.
NewOp4SharedOpName) {
// job is a copy from a shared graph.
// The job name has already been updated in py nn.Graph.
const auto& new_job_name = job->job_conf().job_name();

Restructured this part a bit to make it clearer.
destination = OrderedDict()
destination._metadata = OrderedDict()

destination["graph_name"] = self.name

runtime_state_dict stores the information the graph needs to execute at runtime:
- job name
- job id
- input tensors and names
- output tensors and names
- variable tensors and names
- plan
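A minimal sketch of what such a runtime state dict might contain, using plain Python containers. The key names are assumptions inferred from the list above and from snippets elsewhere in this PR ("graph_name" and "exe_plan" appear verbatim); this is not the final schema.

```python
from collections import OrderedDict

def build_runtime_state_dict(graph):
    # Hypothetical shape of nn.Graph.runtime_state_dict(); key names other
    # than "graph_name" and "exe_plan" are guesses for illustration.
    destination = OrderedDict()
    destination["graph_name"] = graph["name"]
    destination["job_id"] = graph["job_id"]
    destination["inputs"] = OrderedDict(graph["inputs"])    # name -> tensor (meta)
    destination["outputs"] = OrderedDict(graph["outputs"])  # name -> tensor (meta)
    destination["states"] = OrderedDict(graph["states"])    # variable name -> tensor
    destination["exe_plan"] = graph["plan"]                 # serialized execution plan
    return destination

g = {"name": "linear_g", "job_id": 0, "inputs": {"x": "x_meta"},
     "outputs": {"y": "y_meta"}, "states": {"w": "w_tensor"}, "plan": b"<serialized>"}
sd = build_runtime_state_dict(g)
print(list(sd))  # ['graph_name', 'job_id', 'inputs', 'outputs', 'states', 'exe_plan']
```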
> input tensors and names
> output tensors and names

Why do we need to save the input and output tensors at all? Is this actually just saving the tensor meta?

Yes, only the meta is actually needed. But the machinery for saving tensors is mature, so tensors are saved directly. The C nn graph also uses only the meta internally, yet likewise stores them as tensors.
# Create a c nn graph to run with lazy runtime.
self._c_nn_graph = oneflow._oneflow_internal.nn.graph.CNNGraph(
    self._name,
    state_dict["exe_plan"],

When loading a graph, the plan is passed in directly.
Speed stats:

View latest API docs preview at: https://staging.oneflow.info/docs/Oneflow-Inc/oneflow/pr/9754/
"OutputCriticalSectionCallback-";
static const std::string kInputBufferNamePrefix = "Input-";
static const std::string kOutputBufferNamePrefix = "Output-";
static const std::string kSourceTickBufferNamePrefix = "SourceTick-";

These names are used outside this file, so they were moved out of the function.
: name_(name),
  job_id_(job_id),
  session_ctx_(session_ctx),
  plan_(plan),

Supports initializing NNGraph from a plan.
@@ -28,7 +29,15 @@ class JobCompleter final {
  JobCompleter() = default;
  ~JobCompleter() = default;

  Maybe<void> Complete(Job* job) const;
  static Maybe<void> Complete(Job* job);

The Complete function was changed to static.
@@ -262,8 +262,8 @@ def is_deprecated(func_or_class):
import oneflow.framework.session_context as session_ctx
from oneflow.framework.tensor_str import set_printoptions

__oneflow_global_unique_env = env_util.GetEnv()
session_ctx.NewDefaultSession(__oneflow_global_unique_env)
_oneflow_global_unique_env = env_util.GetEnv()

Creating the new session depends on env, so the double leading underscore was removed so that other modules can access env.
python/oneflow/nn/graph/graph.py
Outdated
@@ -910,7 +1207,13 @@ def __build_graph(self, *args, **kwargs):
with graph_build_util.graph_build_context(self.config.proto, self._session):
    # Deal with inputs
    self.__print(0, 1, self._shallow_repr() + " start building graph inputs.")
    arg_op_names, lazy_args, lazy_kwargs, self._args_repr, _ = self.__build_io(
    (
        self._input_op_names,

Isn't this field only present when enable_save_runtime_state_dict is true?

Right, I'll change it back. This previously didn't check the share switch and saved it unconditionally.
python/oneflow/nn/graph/graph.py
Outdated
output_op_names,
self._eager_outputs,
self._output_op_names,
self._build_eager_outputs,

I don't see where this field is set.

Same for out2name below.

It is set above, just not clearly; I'll clean it up.
    _,  # empty kwargs return
    outs_repr,
    out2name,
) = self.__build_io("output", graph_build_util.build_graph_output, *outputs)

The outputs were converted to a tuple earlier; why unpack them again here?

That is how __build_io is designed: building into a tuple assumes the inputs take the form (args, kwargs), which is more general.

Passing outputs in directly would also fit that design; outputs would simply be parsed as one of the args.

Now I remember: it is there to handle an edge case: #7539.
}
return std::make_shared<NNGraph>(name, job, job_id, session_ctx);
}))
.def(py::init([](const std::string& name, const std::string& serialized_plan, int64_t job_id,
                 const std::shared_ptr<MultiClientSessionContext>& session_ctx,

Is this ctx meant to restore the session from save time?

That ctx was introduced by the earlier PR that releases graph/session/env via reference counting; this PR only adds a constructor that builds the C nn graph from a plan.
// NOTE(chengcheng): Singleton<JobDesc> need be clear before GlobalJobDescScope construct.
if (Singleton<JobDesc>::Get() != nullptr) { Singleton<JobDesc>::Delete(); }

Where did this logic move to?
oneflow/core/framework/nn_graph.cpp
Outdated
Maybe<void> NNGraph::BuildWithNewInputFromSharedGraph(
    const std::vector<std::string>& shared_inputs_op_names,
    const std::vector<std::shared_ptr<one::Tensor>>& new_input_tensors,
    const std::vector<std::string>& shared_op_names, const std::string& new_serialized_job) {

What is the shared_op_names parameter for? Isn't it all contained in new_serialized_job?

shared_op_names = []
for op_idx in range(len(self._forward_job_proto.net.op)):
    shared_op_names.append(
        self._shared_graph._forward_job_proto.net.op[op_idx].name
    )

shared_op_names is taken from the original logical graph produced directly by build, while new_serialized_job already contains the optimized graph. The optimized graph no longer guarantees op order.

> new_serialized_job already contains the optimized graph.

Then how do we guarantee shared_op_names and new_serialized_job stay consistent? new_serialized_job may no longer contain some of the ops in shared_op_names.
for (int64_t idx = 0; idx < shared_inputs_op_names.size(); ++idx) {
  input_name2tensor.emplace(shared_inputs_op_names[idx], new_input_tensors[idx]);
}
const auto& InputTensor4Name =

Shouldn't this go inside RegisterInputOpNamesAndTensors?

InputTensor4Name is only used below; RegisterInputOpNamesAndTensors above doesn't depend on this lookup, so it's written as a local that is released right after use.
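The use-and-release lookup described here can be sketched as a local closure over a name-to-tensor map. The names are illustrative stand-ins, not the actual NNGraph code.

```python
def build_with_new_inputs(shared_inputs_op_names, new_input_tensors):
    # Local, short-lived map from input op name to its new tensor;
    # it only lives for the duration of this call.
    input_name2tensor = dict(zip(shared_inputs_op_names, new_input_tensors))

    def input_tensor_for_name(name):
        # Counterpart of the InputTensor4Name lambda in the C++ snippet.
        return input_name2tensor.get(name)

    # Later steps would query input_tensor_for_name(op_name) as needed.
    return [input_tensor_for_name(n) for n in shared_inputs_op_names]

print(build_with_new_inputs(["in0", "in1"], ["t0", "t1"]))  # ['t0', 't1']
```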
oneflow/core/framework/nn_graph.cpp
Outdated
for (int64_t op_idx = 0; op_idx < shared_op_names.size(); ++op_idx) {
  // Assume that the new graph and the shared graph from nn.Graph.build have the same op order.
  const auto& op = new_build_job.mutable_net()->mutable_op()->at(op_idx);
  shared_op_name2_new_op.emplace(shared_op_names[op_idx], &op);

Couldn't new_build_job just be passed directly to CompleteSharedGraphForNewInput?

This map goes from shared op name to the corresponding op in new_build_job. Op order is used to make the correspondence: each shared op name is matched by creation order to an op in new_build_job, in preparation for rewriting the shared graph's op attrs later. So passing only new_build_job is not enough. I'll rename it and add a comment.

> So passing only new_build_job is not enough.

My understanding is that this is because new_build_job does not carry the op order information?

new_build_job is a temporary job produced by the new build call (it was renamed later); it serves as a dictionary of the new graph's attrs. So the op name information has to be passed separately to maintain the correspondence between old and new ops.
    state_dict["states"]
)
if type(self) != Graph:
    # Graph init with eager module, try to share mem with eager module

Isn't this state dict loaded together with the plan? Why does it still need to deal with the eager module?

if not load_with_eager:
    # Graph initialized without an eager module
    linear_g = flow.nn.Graph()
else:
    # Graph initialized with an eager module
    class LinearGraph(flow.nn.Graph):
        def __init__(self):
            super().__init__()
            self.my_linear = linear_reshape

        def build(self, x):
            return self.my_linear(x)

    linear_g = LinearGraph()
# Load the runtime state
linear_g.load_runtime_state_dict(state_dict_list[0])

When testing sd, it used the initialization with an eager module. Without sharing parameters with the eager module there is an extra 1.8 GB of device memory overhead, so handling for this case was added.
):
    if self._enable_save_runtime_state_dict or self._enable_shared_from_this:
        self._input_op_names = input_op_names
        self._output_op_names = output_op_names

What would be the cost of saving these by default, without gating on _enable_save_runtime_state_dict?

The gating was mainly to avoid the extra cost of saving tensors, e.g. _inputs_tensor_tuple occupying extra device memory. And since the distinction was already being made, it was applied everywhere.
python/oneflow/nn/graph/graph.py
Outdated
) = oneflow._oneflow_internal.DumpVariableTensorMgr()
self._state_tensor_tuple = convert_to_tensor_tuple(state_tensors)
self._state_tensor_tuple = convert_to_tensor_tuple(self._state_tensors)

It looks like _state_tensors could be a temporary variable instead of living on self? I only see runtime_state_dict using _state_op_names and _state_tensor_tuple.

Indeed; it has been removed.
Finished reviewing and replied.
return self.my_linear(x)

linear_g = LinearGraph()
linear_g.enable_shared()

The first graph: allow it to be shared.
test_case.assertTrue(np.array_equal(of_lazy_out.numpy(), of_eager_out.numpy()))

linear_g1 = LinearGraph()
linear_g1.share_from(linear_g)

The second graph: share the first graph's optimized graph and parameters.
return self.my_linear(x)

linear_g = LinearGraph()
linear_g.enable_save_runtime_state_dict()

For offline compilation: allow the graph to save its runtime state.
return_dict["save1"] = test_case1

state_dict_list = []
state_dict0 = linear_g.runtime_state_dict()

For offline compilation: get the graph's runtime state. This state_dict can be saved with flow.save.
linear_g = LinearGraph()
if with_share is True:
    linear_g.enable_shared()
linear_g.load_runtime_state_dict(state_dict_list[0])

For online loading: the state_dict is read from disk with flow.load(), and then the graph just loads the runtime state.

Summary: this PR supports sharing the compiled graph and variables, and supports saving and loading runtime state, enabling offline compilation.
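The enable_shared / share_from / save-and-load workflow walked through in the test snippets above can be condensed into a toy stand-in. ToyGraph and everything inside it are illustrative stubs, not OneFlow code; only the method names enable_shared, share_from, enable_save_runtime_state_dict, runtime_state_dict, and load_runtime_state_dict mirror the API discussed in this PR.

```python
# Toy stand-in mirroring the call sequence of the nn.Graph sharing and
# offline-compile workflow in this PR. NOT OneFlow's implementation.
class ToyGraph:
    def __init__(self, name):
        self.name = name
        self._allow_shared = False
        self._shared_from = None
        self._save_runtime_state = False
        self._plan = None

    def enable_shared(self):
        # Graph 1: allow this graph's compiled result to be shared.
        self._allow_shared = True

    def share_from(self, other):
        # Graph 2: reuse the optimized graph and variables of `other`.
        assert other._allow_shared, "source graph must call enable_shared() first"
        self._shared_from = other

    def enable_save_runtime_state_dict(self):
        self._save_runtime_state = True

    def compile(self):
        self._plan = "plan-of-" + self.name  # stands in for the real exe plan

    def runtime_state_dict(self):
        assert self._save_runtime_state and self._plan is not None
        return {"graph_name": self.name, "exe_plan": self._plan}

    def load_runtime_state_dict(self, sd):
        # Online loading: skip compilation and run straight from the plan.
        self.name = sd["graph_name"]
        self._plan = sd["exe_plan"]

# Offline: compile once and capture the runtime state.
g = ToyGraph("linear_g")
g.enable_save_runtime_state_dict()
g.compile()
sd = g.runtime_state_dict()

# Online: a fresh graph loads the saved state without recompiling.
g2 = ToyGraph("new")
g2.load_runtime_state_dict(sd)
print(g2._plan)  # plan-of-linear_g
```

In the real API the state dict would additionally carry inputs, outputs, and variable tensors, and be persisted with flow.save / flow.load as described above.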