Skip to content

Commit

Permalink
replace using IDMgr interface
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves-zwx committed May 31, 2021
1 parent 0868a61 commit 98b5db9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 105 deletions.
190 changes: 94 additions & 96 deletions oneflow/core/graph/boxing/slice_boxing_sub_task_graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,37 +160,38 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
UNIMPLEMENTED();
}
dst_node->Init(lbi, dst_slice, kSliceBoxingTaskModeCopy, src_node->machine_id(), thrd_id,
Global<IDMgr>::Get()->CpuMemZoneId());
EncodeMemZoneIdToInt64(kCPUMemZoneId));
dst_node->ConnectToSrcNodeWithSlice(src_node, NewEdge(), src_slice);
return dst_node;
};
const auto BuildSubTaskGphS2B =
[&ctx, &CreateBoxingNode121, &NewEdge, &lbi](
const ParallelDesc& in_pd, const ParallelDesc& out_pd, const SbpParallel& in_sbp,
const SbpParallel& out_sbp, const BlobDesc& blob_desc,
const std::vector<TaskNode*>& in_nodes, std::vector<TaskNode*>* out_nodes) {
CHECK(SubTskGphBuilderUtil::IsBoxingS2B(in_sbp, out_sbp));
const std::vector<TensorSliceView> in_slices =
GetTensorSliceView(in_pd.parallel_num(), in_sbp, blob_desc);
CHECK(!ContainsEmptySlice(in_slices));
const TensorSliceView out_slice = GetBroadcastTensorSliceView(blob_desc);
FOR_RANGE(int64_t, out_id, 0, out_pd.parallel_num()) {
SliceBoxingTaskNode* out_node =
CreateBoxingNode121(out_pd, out_id, out_slice, kSliceBoxingTaskModeCopy);
FOR_RANGE(int64_t, in_id, 0, in_pd.parallel_num()) {
const TensorSliceView& in_slice = in_slices.at(in_id);
TaskNode* in_node = in_nodes.at(in_id);
if (SubTskGphBuilderUtil::IsOnSameGPU(in_node, out_node)) {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
} else {
TaskNode* proxy_node = ctx->task_graph()->GetProxyNode(
in_node, lbi, out_node->machine_id(), Global<IDMgr>::Get()->CpuMemZoneId());
out_node->ConnectToSrcNodeWithSlice(proxy_node, NewEdge(), in_slice);
}
}
out_nodes->push_back(out_node);
const auto BuildSubTaskGphS2B = [&ctx, &CreateBoxingNode121, &NewEdge, &lbi](
const ParallelDesc& in_pd, const ParallelDesc& out_pd,
const SbpParallel& in_sbp, const SbpParallel& out_sbp,
const BlobDesc& blob_desc,
const std::vector<TaskNode*>& in_nodes,
std::vector<TaskNode*>* out_nodes) {
CHECK(SubTskGphBuilderUtil::IsBoxingS2B(in_sbp, out_sbp));
const std::vector<TensorSliceView> in_slices =
GetTensorSliceView(in_pd.parallel_num(), in_sbp, blob_desc);
CHECK(!ContainsEmptySlice(in_slices));
const TensorSliceView out_slice = GetBroadcastTensorSliceView(blob_desc);
FOR_RANGE(int64_t, out_id, 0, out_pd.parallel_num()) {
SliceBoxingTaskNode* out_node =
CreateBoxingNode121(out_pd, out_id, out_slice, kSliceBoxingTaskModeCopy);
FOR_RANGE(int64_t, in_id, 0, in_pd.parallel_num()) {
const TensorSliceView& in_slice = in_slices.at(in_id);
TaskNode* in_node = in_nodes.at(in_id);
if (SubTskGphBuilderUtil::IsOnSameGPU(in_node, out_node)) {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
} else {
TaskNode* proxy_node =
ctx->task_graph()->GetProxyNode(in_node, lbi, out_node->machine_id(), kCPUMemZoneId);
out_node->ConnectToSrcNodeWithSlice(proxy_node, NewEdge(), in_slice);
}
};
}
out_nodes->push_back(out_node);
}
};
const auto BuildSubTaskGphS2S = [&ctx, &lbi, &CreateBoxingNode121, &CreateBoxingNodeToHost,
&GetBoxingGpuThrdId,
&NewEdge](const ParallelDesc& in_pd, const ParallelDesc& out_pd,
Expand Down Expand Up @@ -283,82 +284,80 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
#endif
}
local_concat_node->Init(lbi, concat_slice, kSliceBoxingTaskModeCopy, in_machine_id,
local_concat_thrd_id, Global<IDMgr>::Get()->CpuMemZoneId());
local_concat_thrd_id, EncodeMemZoneIdToInt64(kCPUMemZoneId));
for (const int64_t in_id : in_parallel_ids) {
if (!in_id2intersection.at(in_id).IsEmpty()) {
local_concat_node->ConnectToSrcNodeWithSlice(in_nodes.at(in_id), NewEdge(),
in_slices.at(in_id));
}
}
TaskNode* local_add_proxy_node = ctx->task_graph()->GetProxyNode(
local_concat_node, lbi, out_node->machine_id(), Global<IDMgr>::Get()->CpuMemZoneId());
local_concat_node, lbi, out_node->machine_id(), kCPUMemZoneId);
out_node->ConnectToSrcNodeWithSlice(local_add_proxy_node, NewEdge(), concat_slice);
}
}
out_nodes->push_back(out_node);
}
};
const auto BuildSubTaskGphP2S = [&ctx, &lbi, &CreateBoxingNode121, &CreateBoxingNodeToHost,
&GetBoxingGpuThrdId,
&NewEdge](const ParallelDesc& in_pd, const ParallelDesc& out_pd,
const SbpParallel& in_sbp, const SbpParallel& out_sbp,
const BlobDesc& blob_desc,
const std::vector<TaskNode*>& in_nodes,
std::vector<TaskNode*>* out_nodes) {
CHECK(SubTskGphBuilderUtil::IsBoxingP2S(in_sbp, out_sbp));
const TensorSliceView in_slice = GetBroadcastTensorSliceView(blob_desc);
const std::vector<TensorSliceView> out_slices =
GetTensorSliceView(out_pd.parallel_num(), out_sbp, blob_desc);
CHECK(!ContainsEmptySlice(out_slices));
HashMap<int64_t, std::vector<int64_t>> machine_id2in_parallel_ids;
GroupParallelIdByMachine(in_pd, &machine_id2in_parallel_ids);
FOR_RANGE(int64_t, out_id, 0, out_pd.parallel_num()) {
const TensorSliceView& out_slice = out_slices.at(out_id);
SliceBoxingTaskNode* out_node =
CreateBoxingNode121(out_pd, out_id, out_slice, kSliceBoxingTaskModeAdd);
for (const auto& pair : machine_id2in_parallel_ids) {
const int64_t in_machine_id = pair.first;
const std::vector<int64_t>& in_parallel_ids = pair.second;
if (out_node->machine_id() == in_machine_id) {
for (const int64_t in_id : in_parallel_ids) {
TaskNode* in_node = in_nodes.at(in_id);
if (SubTskGphBuilderUtil::IsOnSameGPU(in_node, out_node)) {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
} else if (in_pd.device_type() == DeviceType::kGPU) {
SliceBoxingTaskNode* copy_to_host =
CreateBoxingNodeToHost(in_node, in_slice, out_slice);
out_node->ConnectToSrcNodeWithSlice(copy_to_host, NewEdge(), out_slice);
const auto BuildSubTaskGphP2S =
[&ctx, &lbi, &CreateBoxingNode121, &CreateBoxingNodeToHost, &GetBoxingGpuThrdId, &NewEdge](
const ParallelDesc& in_pd, const ParallelDesc& out_pd, const SbpParallel& in_sbp,
const SbpParallel& out_sbp, const BlobDesc& blob_desc,
const std::vector<TaskNode*>& in_nodes, std::vector<TaskNode*>* out_nodes) {
CHECK(SubTskGphBuilderUtil::IsBoxingP2S(in_sbp, out_sbp));
const TensorSliceView in_slice = GetBroadcastTensorSliceView(blob_desc);
const std::vector<TensorSliceView> out_slices =
GetTensorSliceView(out_pd.parallel_num(), out_sbp, blob_desc);
CHECK(!ContainsEmptySlice(out_slices));
HashMap<int64_t, std::vector<int64_t>> machine_id2in_parallel_ids;
GroupParallelIdByMachine(in_pd, &machine_id2in_parallel_ids);
FOR_RANGE(int64_t, out_id, 0, out_pd.parallel_num()) {
const TensorSliceView& out_slice = out_slices.at(out_id);
SliceBoxingTaskNode* out_node =
CreateBoxingNode121(out_pd, out_id, out_slice, kSliceBoxingTaskModeAdd);
for (const auto& pair : machine_id2in_parallel_ids) {
const int64_t in_machine_id = pair.first;
const std::vector<int64_t>& in_parallel_ids = pair.second;
if (out_node->machine_id() == in_machine_id) {
for (const int64_t in_id : in_parallel_ids) {
TaskNode* in_node = in_nodes.at(in_id);
if (SubTskGphBuilderUtil::IsOnSameGPU(in_node, out_node)) {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
} else if (in_pd.device_type() == DeviceType::kGPU) {
SliceBoxingTaskNode* copy_to_host =
CreateBoxingNodeToHost(in_node, in_slice, out_slice);
out_node->ConnectToSrcNodeWithSlice(copy_to_host, NewEdge(), out_slice);
} else {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
}
}
} else {
out_node->ConnectToSrcNodeWithSlice(in_node, NewEdge(), in_slice);
}
}
} else {
auto* local_add_node = ctx->task_graph()->NewNode<SliceBoxingTaskNode>();
int64_t local_add_thrd_id = -1;
if (in_pd.device_type() == DeviceType::kCPU) {
local_add_thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(in_machine_id);
} else if (in_pd.device_type() == DeviceType::kGPU) {
auto* local_add_node = ctx->task_graph()->NewNode<SliceBoxingTaskNode>();
int64_t local_add_thrd_id = -1;
if (in_pd.device_type() == DeviceType::kCPU) {
local_add_thrd_id = Global<IDMgr>::Get()->PickCpuThrdIdEvenly(in_machine_id);
} else if (in_pd.device_type() == DeviceType::kGPU) {
#ifdef WITH_CUDA
TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
local_add_thrd_id =
GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(), CudaWorkType::kCopyD2H);
TaskNode* node = in_nodes.at(in_parallel_ids.at(out_id % in_parallel_ids.size()));
local_add_thrd_id = GetBoxingGpuThrdId(node->machine_id(), node->GpuPhyId(),
CudaWorkType::kCopyD2H);
#else
UNIMPLEMENTED();
UNIMPLEMENTED();
#endif
}
local_add_node->Init(lbi, out_slice, kSliceBoxingTaskModeAdd, in_machine_id,
local_add_thrd_id, EncodeMemZoneIdToInt64(kCPUMemZoneId));
for (const int64_t in_id : in_parallel_ids) {
local_add_node->ConnectToSrcNodeWithSlice(in_nodes.at(in_id), NewEdge(), in_slice);
}
TaskNode* local_add_proxy_node = ctx->task_graph()->GetProxyNode(
local_add_node, lbi, out_node->machine_id(), kCPUMemZoneId);
out_node->ConnectToSrcNodeWithSlice(local_add_proxy_node, NewEdge(), out_slice);
}
}
local_add_node->Init(lbi, out_slice, kSliceBoxingTaskModeAdd, in_machine_id,
local_add_thrd_id, Global<IDMgr>::Get()->CpuMemZoneId());
for (const int64_t in_id : in_parallel_ids) {
local_add_node->ConnectToSrcNodeWithSlice(in_nodes.at(in_id), NewEdge(), in_slice);
}
TaskNode* local_add_proxy_node = ctx->task_graph()->GetProxyNode(
local_add_node, lbi, out_node->machine_id(), Global<IDMgr>::Get()->CpuMemZoneId());
out_node->ConnectToSrcNodeWithSlice(local_add_proxy_node, NewEdge(), out_slice);
out_nodes->push_back(out_node);
}
}
out_nodes->push_back(out_node);
}
};
};

const auto BuildSubTaskGphP2B = [&ctx, &lbi, &GetBoxingGpuThrdId, &NewEdge](
const ParallelDesc& in_pd, const ParallelDesc& out_pd,
Expand Down Expand Up @@ -406,33 +405,32 @@ Maybe<SubTskGphBuilderStatus> SliceBoxingSubTskGphBuilder::Build(
const int64_t out_machine_id = machine_id7out_parallel_ids.first;
TaskNode* in_box_node = nullptr;
if (out_box_nodes.size() == 1) {
in_box_node = ctx->task_graph()->GetProxyNode(out_box_nodes.front(), lbi,
machine_id7out_parallel_ids.first,
Global<IDMgr>::Get()->CpuMemZoneId());
in_box_node = ctx->task_graph()->GetProxyNode(
out_box_nodes.front(), lbi, machine_id7out_parallel_ids.first, kCPUMemZoneId);
} else {
auto* add_node = ctx->task_graph()->NewNode<SliceBoxingTaskNode>();
add_node->Init(lbi, slice, kSliceBoxingTaskModeAdd, machine_id7out_parallel_ids.first,
Global<IDMgr>::Get()->PickCpuThrdIdEvenly(machine_id7out_parallel_ids.first),
Global<IDMgr>::Get()->CpuMemZoneId());
EncodeMemZoneIdToInt64(kCPUMemZoneId));
for (TaskNode* out_box_node : out_box_nodes) {
TaskNode* out_boxing_node_proxy = ctx->task_graph()->GetProxyNode(
out_box_node, lbi, out_machine_id, Global<IDMgr>::Get()->CpuMemZoneId());
TaskNode* out_boxing_node_proxy =
ctx->task_graph()->GetProxyNode(out_box_node, lbi, out_machine_id, kCPUMemZoneId);
add_node->ConnectToSrcNodeWithSlice(out_boxing_node_proxy, NewEdge(), slice);
}
in_box_node = add_node;
}
for (const int64_t out_id : machine_id7out_parallel_ids.second) {
int64_t mem_zone_id;
if (out_pd.device_type() == DeviceType::kCPU) {
mem_zone_id = Global<IDMgr>::Get()->CpuMemZoneId();
(*out_nodes)[out_id] =
ctx->task_graph()->GetProxyNode(in_box_node, lbi, out_machine_id, kCPUMemZoneId);
} else if (out_pd.device_type() == DeviceType::kGPU) {
mem_zone_id =
Global<IDMgr>::Get()->GpuMemZoneId(CHECK_JUST(out_pd.DeviceId4ParallelId(out_id)));
int64_t dev_id = CHECK_JUST(out_pd.DeviceId4ParallelId(out_id));
(*out_nodes)[out_id] = ctx->task_graph()->GetProxyNode(
in_box_node, lbi, out_machine_id,
MemZoneId{DeviceType::kGPU, static_cast<MemZoneId::device_index_t>(dev_id)});
} else {
UNIMPLEMENTED();
}
(*out_nodes)[out_id] =
ctx->task_graph()->GetProxyNode(in_box_node, lbi, out_machine_id, mem_zone_id);
}
}
};
Expand Down
13 changes: 7 additions & 6 deletions oneflow/core/graph/slice_boxing_task_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ OperatorConf SliceBoxingTaskNode::GetBoxingOpConf() {
}

void SliceBoxingTaskNode::InitProducedRegstMemCase(MemoryCase* mem_case) {
if (Global<IDMgr>::Get()->IsCpuMemZone(mem_zone_id_)) {
auto mem_zone_id = DecodeMemZoneIdFromInt64(mem_zone_id_);
if (mem_zone_id.device_type() == DeviceType::kCPU) {
HostMemory* host_mem = mem_case->mutable_host_mem();
if (device_type() == DeviceType::kGPU) {
host_mem->mutable_cuda_pinned_mem()->set_device_id(GpuPhyId());
StreamId stream_id = DeserializeStreamIdFromInt64(thrd_id());
if (stream_id.device_id().device_type() == DeviceType::kGPU) {
host_mem->mutable_cuda_pinned_mem()->set_device_id(stream_id.device_id().device_index());
}
} else if (Global<IDMgr>::Get()->IsGpuMemZone(mem_zone_id_)) {
mem_case->mutable_device_cuda_mem()->set_device_id(
Global<IDMgr>::Get()->GetGpuPhyIdFromMemZoneId(mem_zone_id_));
} else if (mem_zone_id.device_type() == DeviceType::kGPU) {
mem_case->mutable_device_cuda_mem()->set_device_id(mem_zone_id.device_index());
} else {
UNIMPLEMENTED();
}
Expand Down
3 changes: 0 additions & 3 deletions oneflow/core/graph/task_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ class TaskGraph final : public Graph<TaskNode, TaskEdge> {
void EnableInplaceMemSharing(const std::function<bool(const std::string&, const std::string&)>&
IsOpNameDataOrCtrlReachable);

TaskNode* GetProxyNode(TaskNode* src_node, const LogicalBlobId& lbi, int64_t dst_machine_id,
int64_t dst_mem_zone_id);

TaskNode* GetProxyNode(TaskNode* src_node, const LogicalBlobId& lbi,
int64_t dst_machine_id, const MemZoneId& dst_mem_zone_id);

Expand Down

0 comments on commit 98b5db9

Please sign in to comment.