Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Experiment] Buffer liveness tracking & deallocation #4381

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions dali/pipeline/executor/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_map>
#include <unordered_set>

#include "dali/core/call_at_exit.h"
#include "dali/pipeline/data/backend.h"
#include "dali/pipeline/executor/executor.h"
#include "dali/pipeline/executor/queue_metadata.h"
Expand Down Expand Up @@ -188,6 +189,9 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunCPUImpl(size_t iteration_id) {
for (int cpu_op_id = 0; cpu_op_id < graph_->NumOp(OpType::CPU) && !exec_error_; ++cpu_op_id) {
OpNode &op_node = graph_->Node(OpType::CPU, cpu_op_id);
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::CPU>(cpu_idxs, *graph_, cpu_op_id);
auto finally = AtScopeExit([&]{
AdjustLiveness(liveness_info_, ws);
});

int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);
Expand Down Expand Up @@ -235,6 +239,9 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunMixedImpl(size_t iteration_id) {
try {
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::MIXED>(mixed_idxs, *graph_, i);

auto finally = AtScopeExit([&]{
AdjustLiveness(liveness_info_, ws);
});
int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);

Expand Down Expand Up @@ -300,6 +307,9 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunGPUImpl(size_t iteration_id) {
OpNode &op_node = graph_->Node(OpType::GPU, i);
try {
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::GPU>(gpu_idxs, *graph_, i);
auto finally = AtScopeExit([&]{
AdjustLiveness(liveness_info_, ws);
});

int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);
Expand Down
74 changes: 73 additions & 1 deletion dali/pipeline/executor/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,73 @@ class DLL_PUBLIC ExecutorBase {
friend class ExecutorTest;
};


// Per-buffer liveness bookkeeping used to release a buffer's payload as soon
// as its last consumer in the graph has run.
struct LivenessInfo {
  // Total number of consumers of the buffer, fixed at initialization
  // (graph consumers, plus one if the buffer is a pipeline output).
  int total_consumers{0};
  // Consumers that have not run yet in the current iteration; re-armed to
  // total_consumers when the producer writes the buffer, decremented on each
  // consumption. Atomic because CPU/mixed/GPU stages may run concurrently.
  std::atomic_int consumers_left{0};
};

/**
 * @brief Populates the liveness map with one entry per backing buffer.
 *
 * Walks every per-tensor store queue, and for each non-null buffer handle
 * records how many consumers that buffer has. The raw pointer to the buffer
 * is used as the map key.
 */
inline void InitializeLivenessInfo(
    std::map<const void *, LivenessInfo> &liveness,
    const std::vector<tensor_data_store_queue_t> &tensor_to_store_queue) {
  for (const auto &store_queue : tensor_to_store_queue) {
    tuple_for_each(store_queue, [&liveness](const auto &queue) {
      for (const auto &buffer : queue) {
        if (!buffer)
          continue;  // unused slot in the queue
        liveness[buffer.get()].total_consumers = queue.num_consumers;
      }
    });
  }
}

template <typename Backend>
void ConsumeInput(std::map<const void *, LivenessInfo> &liveness, TensorList<Backend> *tl) {
LivenessInfo &L = liveness[tl];
if (!L.total_consumers) {
// std::cout << "Tensor list " << typeid(Backend).name()
// << " has no liveness info." << std::endl;
return;
}

int left = --L.consumers_left;
// std::cout << "Consuming tensor list " << typeid(Backend).name()
// << ": " << left << " consumers left out of " << L.total_consumers << endl;
if (left == 0) {
// std::cout << "Resetting tensor list: " << (void*)tl << std::endl;
tl->Reset();
}
}

inline void AdjustLiveness(std::map<const void *, LivenessInfo> &liveness, Workspace &ws) {
for (int i = 0; i < ws.NumOutput(); i++) {
const void *handle = 0;
if (ws.OutputIsType<CPUBackend>(i))
handle = &ws.Output<CPUBackend>(i);
else if (ws.OutputIsType<GPUBackend>(i))
handle = &ws.Output<GPUBackend>(i);
else
continue;
LivenessInfo &L = liveness[handle];
L.consumers_left = L.total_consumers;
}

for (int i = 0; i < ws.NumInput(); i++) {
if (ws.InputIsType<CPUBackend>(i)) {
ConsumeInput(liveness, &ws.UnsafeMutableInput<CPUBackend>(i));
} else if (ws.InputIsType<GPUBackend>(i)) {
ConsumeInput(liveness, &ws.UnsafeMutableInput<GPUBackend>(i));
}
}

ArgumentWorkspace &aws = ws;
for (auto &input : aws) {
ConsumeInput(liveness, input.second.tvec.get());
}
}



/**
* @brief Basic executor for dali graphs. This executor enables
* prefetching of results by maintaining two copies of output
Expand Down Expand Up @@ -385,6 +452,8 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {
int checkpointing_epoch_size_ = 0;

private:
std::map<const void *, LivenessInfo> liveness_info_;

void RunHelper(OpNode &op_node, Workspace &ws, size_t iteration_id);

void RethrowError() const {
Expand Down Expand Up @@ -524,7 +593,10 @@ void Executor<WorkspacePolicy, QueuePolicy>::Build(OpGraph *graph, vector<string

// Create corresponding storage type for TensorNodes in graph
tensor_to_store_queue_ =
CreateBackingStorageForTensorNodes(*graph_, max_batch_size_, queue_sizes);
CreateBackingStorageForTensorNodes(*graph_, max_batch_size_, queue_sizes, output_names_);

InitializeLivenessInfo(liveness_info_, tensor_to_store_queue_);

// Setup stream and events that will be used for execution
if (device_id_ != CPU_ONLY_DEVICE_ID) {
DeviceGuard g(device_id_);
Expand Down
33 changes: 17 additions & 16 deletions dali/pipeline/executor/workspace_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ Workspace CreateWorkspace(
// template <typename QueuePolicy>
// struct WS_Policy {
// // Type trait describing how will the workspace be returned (usually by copy or by ref)
// template <OpType op_type>
// using ws_t = ...;
//
// // Initialize state of Workspace Storage
Expand All @@ -251,7 +250,7 @@ Workspace CreateWorkspace(
// * @return Corresponding workspace for that operator/OpNode
// */
// template <OpType op_type>
// ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx);
// ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx);
//
// /**
// * @brief Get the Workspace for given `op_type` stage, when executing queue indexes `idx` part
Expand All @@ -264,7 +263,7 @@ Workspace CreateWorkspace(
// * @return Corresponding workspace for that operator/OpNode
// */
// template <OpType op_type>
// ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node);
// ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node);
// };

/**
Expand All @@ -277,7 +276,6 @@ Workspace CreateWorkspace(
*/
template <typename QueuePolicy>
struct JIT_WS_Policy {
template <OpType op_type>
using ws_t = Workspace;

void InitializeWorkspaceStore(const OpGraph &graph, int device_id,
Expand All @@ -295,14 +293,14 @@ struct JIT_WS_Policy {
}

template <OpType op_type>
ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx) {
ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx) {
return CreateWorkspace<op_type>(graph, graph.Node(op_type, partition_idx), device_id_,
tensor_to_store_queue_, thread_pool_, mixed_op_stream_,
gpu_op_stream_, mixed_op_events_, idxs);
}

template <OpType op_type>
ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node) {
ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node) {
DALI_ENFORCE(node.op_type == op_type,
"Wrong variant of method selected. OpType does not match.");
return CreateWorkspace<op_type>(graph, node, device_id_, tensor_to_store_queue_, thread_pool_,
Expand Down Expand Up @@ -373,15 +371,16 @@ struct AOT_WS_Policy<SeparateQueuePolicy> {
for (auto &q : cpu_workspaces_)
for (auto &ws : q)
SetOrder(ws, AccessOrder::host());

for (auto &q : mixed_workspaces_)
for (auto &ws : q)
SetOrder(ws, AccessOrder::host());
SetOrder(ws, AccessOrder::host());

for (auto &q : gpu_workspaces_)
for (auto &ws : q)
SetOrder(ws, AccessOrder::host());
SetOrder(ws, AccessOrder::host());
}

template <OpType op_type>
using ws_t = Workspace &;

void InitializeWorkspaceStore(const OpGraph &graph, int device_id,
Expand Down Expand Up @@ -428,14 +427,14 @@ struct AOT_WS_Policy<SeparateQueuePolicy> {
}

template <OpType op_type>
ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx) {
ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx) {
int sequential_ws_idx = SequentialIndex(idxs, depths_, op_type);
auto &workspaces = GetWorkspacesCollection<op_type>();
return workspaces[sequential_ws_idx][partition_idx];
}

template <OpType op_type>
ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node) {
ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node) {
DALI_ENFORCE(node.op_type == op_type,
"Wrong variant of method selected. OpType does not match.");
return GetWorkspace<op_type>(idxs, graph, node.partition_index);
Expand Down Expand Up @@ -502,7 +501,6 @@ inline std::vector<std::vector<Workspace>>&
*/
template <>
struct AOT_WS_Policy<UniformQueuePolicy> {
template <OpType op_type>
using ws_t = Workspace &;

~AOT_WS_Policy() {
Expand Down Expand Up @@ -531,17 +529,19 @@ struct AOT_WS_Policy<UniformQueuePolicy> {
}

template <OpType op_type>
ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx) {
ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, OpPartitionId partition_idx) {
auto &ws_vec = std::get<static_cast<size_t>(op_type)>(wss_[idxs[op_type]].op_data);
return ws_vec[partition_idx];
auto &ws = ws_vec[partition_idx];
return ws;
}

template <OpType op_type>
ws_t<op_type> GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node) {
ws_t GetWorkspace(QueueIdxs idxs, const OpGraph &graph, const OpNode &node) {
DALI_ENFORCE(node.op_type == op_type,
"Wrong variant of method selected. OpType does not match.");
auto &ws_vec = std::get<static_cast<size_t>(op_type)>(wss_[idxs[op_type]].op_data);
return ws_vec[node.partition_index];
auto &ws = ws_vec[node.partition_index];
return ws;
}

private:
Expand Down Expand Up @@ -588,6 +588,7 @@ struct AOT_WS_Policy<UniformQueuePolicy> {
int queue_size_ = -1;
};


} // namespace dali

#endif // DALI_PIPELINE_EXECUTOR_WORKSPACE_POLICY_H_
16 changes: 15 additions & 1 deletion dali/pipeline/graph/op_graph_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,39 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <set>
#include <string>
#include <vector>

#include "dali/pipeline/graph/op_graph_storage.h"

namespace dali {

std::vector<tensor_data_store_queue_t> CreateBackingStorageForTensorNodes(
const OpGraph &op_graph, int batch_size, const std::vector<int> &queue_sizes) {
const OpGraph &op_graph, int batch_size, const std::vector<int> &queue_sizes,
const std::vector<std::string> &output_names) {
DALI_ENFORCE(static_cast<int>(queue_sizes.size()) == op_graph.NumTensor(),
"Data queue sizes undefined for some Tensor nodes.");
std::vector<tensor_data_store_queue_t> result;
result.resize(op_graph.NumTensor());

std::set<int64_t> outputs;
auto output_ids = op_graph.GetOutputs(output_names);;
outputs.insert(output_ids.begin(), output_ids.end());

// Assign data to each Tensor node in graph
for (int i = 0; i < op_graph.NumTensor(); i++) {
const auto &tensor = op_graph.Tensor(i);
auto producer_op_type = op_graph.Node(tensor.producer.node).op_type;
result[i] =
BatchFactory(producer_op_type, tensor.producer.storage_device, batch_size, queue_sizes[i]);

bool is_output = outputs.count(tensor.id) > 0;
tuple_for_each(result[i], [&](auto &x) {
x.num_consumers = tensor.consumers.size();
if (is_output)
x.num_consumers++;
});
}
return result;
}
Expand Down
4 changes: 3 additions & 1 deletion dali/pipeline/graph/op_graph_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef DALI_PIPELINE_GRAPH_OP_GRAPH_STORAGE_H_
#define DALI_PIPELINE_GRAPH_OP_GRAPH_STORAGE_H_

#include <string>
#include <vector>

#include "dali/pipeline/graph/op_graph.h"
Expand All @@ -28,7 +29,8 @@ namespace dali {
using MixedOpEventMap = std::vector<std::vector<cudaEvent_t>>;

DLL_PUBLIC std::vector<tensor_data_store_queue_t> CreateBackingStorageForTensorNodes(
const OpGraph& op_graph, int batch_size, const std::vector<int>& queue_sizes);
const OpGraph& op_graph, int batch_size, const std::vector<int>& queue_sizes,
const std::vector<std::string> &output_names);

// Mapping from MixedOp partition id to queue of corresponding events
DLL_PUBLIC MixedOpEventMap CreateEventsForMixedOps(EventPool& event_pool, const OpGraph& op_graph,
Expand Down
37 changes: 37 additions & 0 deletions dali/pipeline/workspace/workspace_data_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,41 @@

namespace dali {


// Applies `fun` to every element of a tuple-like object (anything supporting
// std::get on a TupleLike<Ts...>), in declaration order. Mutable overload.
template <template <typename...> class TupleLike, typename... Ts, typename F, std::size_t... Idxs>
void tuple_for_each_impl(TupleLike<Ts...> &t, F &&fun, std::index_sequence<Idxs...>) {
  // Pack expansion inside a braced initializer guarantees left-to-right order;
  // the leading 0 keeps the array non-empty for an empty tuple.
  using expander = int[];
  (void)expander{0, ((void)fun(std::get<Idxs>(t)), 0)...};
}

template <template <typename...> class TupleLike, typename... Ts, typename F>
void tuple_for_each(TupleLike<Ts...> &t, F &&fun) {
  tuple_for_each_impl(t, std::forward<F>(fun), std::index_sequence_for<Ts...>{});
}


// Applies `fun` to every element of a tuple-like object (anything supporting
// std::get on a TupleLike<Ts...>), in declaration order. Const overload.
template <template <typename...> class TupleLike, typename... Ts, typename F, std::size_t... Idxs>
void tuple_for_each_impl(const TupleLike<Ts...> &t, F &&fun, std::index_sequence<Idxs...>) {
  // Pack expansion inside a braced initializer guarantees left-to-right order;
  // the leading 0 keeps the array non-empty for an empty tuple.
  using expander = int[];
  (void)expander{0, ((void)fun(std::get<Idxs>(t)), 0)...};
}

template <template <typename...> class TupleLike, typename... Ts, typename F>
void tuple_for_each(const TupleLike<Ts...> &t, F &&fun) {
  tuple_for_each_impl(t, std::forward<F>(fun), std::index_sequence_for<Ts...>{});
}


/*
* Mappings from OpType, StorageDevice to index in tensor_data_store_queue_t
*/
Expand Down Expand Up @@ -113,6 +148,8 @@ struct StoreBufferQueue {
// Number of buffer slots in the underlying store queue.
size_t size() const {
return store.size();
}

int num_consumers = 0;
};

// Generator for Queue of Worskpace Output Type, indexed by GetTensorStoreIndex()
Expand Down