Track liveness and delete dead buffers.
Signed-off-by: Michal Zientkiewicz <michalz@nvidia.com>
mzient committed Oct 3, 2023
1 parent c2d2bd4 commit 2c3c120
Showing 5 changed files with 121 additions and 78 deletions.
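
In short, this commit moves buffer-liveness bookkeeping out of the workspace policies and into the executor itself, and applies it in all three stages (CPU, mixed, GPU): every intermediate buffer records how many operators consume it, each finished operator decrements the outstanding count of its inputs, and a buffer whose count drops to zero is freed immediately with Reset(). Below is a minimal, standalone sketch of that counting scheme; the names and the toy Buffer type are illustrative only, while the real code keys the map by TensorList pointers and runs the bookkeeping at operator scope exit.

// Standalone sketch of the consumer-counting scheme (illustrative names only).
#include <atomic>
#include <cstdio>
#include <map>
#include <vector>

struct Liveness {
  int total_consumers = 0;            // how many operators read this buffer
  std::atomic_int consumers_left{0};  // how many reads remain in this iteration
};

struct Buffer {
  std::vector<char> data;
  void Reset() { data.clear(); data.shrink_to_fit(); }  // release the memory early
};

int main() {
  std::map<const void *, Liveness> liveness;
  Buffer buf;
  buf.data.resize(1 << 20);

  // The graph says two operators consume this buffer.
  liveness[&buf].total_consumers = 2;

  // The producer ran: re-arm the counter for this iteration's consumers.
  liveness[&buf].consumers_left = liveness[&buf].total_consumers;

  // Each consumer decrements the counter when it finishes; the last one frees.
  for (int consumer = 0; consumer < 2; consumer++) {
    if (--liveness[&buf].consumers_left == 0)
      buf.Reset();
  }
  // Typically prints 0 after shrink_to_fit.
  std::printf("buffer capacity after last consumer: %zu\n", buf.data.capacity());
}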
8 changes: 7 additions & 1 deletion dali/pipeline/executor/executor.cc
@@ -190,7 +190,7 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunCPUImpl(size_t iteration_id) {
OpNode &op_node = graph_->Node(OpType::CPU, cpu_op_id);
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::CPU>(cpu_idxs, *graph_, cpu_op_id);
auto finally = AtScopeExit([&]{
ws_policy_.WorkspaceUsed(ws);
AdjustLiveness(liveness_info_, ws);
});

int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
@@ -239,6 +239,9 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunMixedImpl(size_t iteration_id) {
try {
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::MIXED>(mixed_idxs, *graph_, i);

auto finally = AtScopeExit([&]{
AdjustLiveness(liveness_info_, ws);
});
int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);

@@ -304,6 +307,9 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunGPUImpl(size_t iteration_id) {
OpNode &op_node = graph_->Node(OpType::GPU, i);
try {
decltype(auto) ws = ws_policy_.template GetWorkspace<OpType::GPU>(gpu_idxs, *graph_, i);
auto finally = AtScopeExit([&]{
AdjustLiveness(liveness_info_, ws);
});

int batch_size = InferBatchSizeFromInput(ws, stage_batch_size);
ws.SetBatchSizes(batch_size);
72 changes: 72 additions & 0 deletions dali/pipeline/executor/executor.h
@@ -96,6 +96,73 @@ class DLL_PUBLIC ExecutorBase {
friend class ExecutorTest;
};


struct LivenessInfo {
int total_consumers{0};
std::atomic_int consumers_left{0};
};

inline void InitializeLivenessInfo(
std::map<const void *, LivenessInfo> &liveness,
const std::vector<tensor_data_store_queue_t> &tensor_to_store_queue) {
for (auto &t : tensor_to_store_queue) {
tuple_for_each(t, [&](auto &q) {
for (auto &ptr : q) {
if (ptr) {
liveness[ptr.get()].total_consumers = q.num_consumers;
}
}
});
}
}

template <typename Backend>
void ConsumeInput(std::map<const void *, LivenessInfo> &liveness, TensorList<Backend> *tl) {
LivenessInfo &L = liveness[tl];
if (!L.total_consumers) {
// std::cout << "Tensor list " << typeid(Backend).name()
// << " has no liveness info." << std::endl;
return;
}

int left = --L.consumers_left;
// std::cout << "Consuming tensor list " << typeid(Backend).name()
// << ": " << left << " consumers left out of " << L.total_consumers << endl;
if (left == 0) {
// std::cout << "Resetting tensor list: " << (void*)tl << std::endl;
tl->Reset();
}
}

inline void AdjustLiveness(std::map<const void *, LivenessInfo> &liveness, Workspace &ws) {
for (int i = 0; i < ws.NumOutput(); i++) {
const void *handle = 0;
if (ws.OutputIsType<CPUBackend>(i))
handle = &ws.Output<CPUBackend>(i);
else if (ws.OutputIsType<GPUBackend>(i))
handle = &ws.Output<GPUBackend>(i);
else
continue;
LivenessInfo &L = liveness[handle];
L.consumers_left = L.total_consumers;
}

for (int i = 0; i < ws.NumInput(); i++) {
if (ws.InputIsType<CPUBackend>(i)) {
ConsumeInput(liveness, &ws.UnsafeMutableInput<CPUBackend>(i));
} else if (ws.InputIsType<GPUBackend>(i)) {
ConsumeInput(liveness, &ws.UnsafeMutableInput<GPUBackend>(i));
}
}

ArgumentWorkspace &aws = ws;
for (auto &input : aws) {
ConsumeInput(liveness, input.second.tvec.get());
}
}
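
For context: the executor calls AdjustLiveness from the AtScopeExit handlers added in executor.cc above, i.e. after each operator has produced its outputs. The function first re-arms consumers_left to total_consumers for every output (the data has just become live for this iteration's consumers), and then consumes every regular and argument input, decrementing the corresponding counter and calling Reset() on a buffer once its last consumer has run. consumers_left is an std::atomic_int, presumably because the CPU, mixed and GPU stages can consume the same buffer from different threads.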



/**
* @brief Basic executor for dali graphs. This executor enables
* prefetching of results by maintaining two copies of output
@@ -383,6 +450,8 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {
int checkpointing_epoch_size_;

private:
std::map<const void *, LivenessInfo> liveness_info_;

void RunHelper(OpNode &op_node, Workspace &ws, size_t iteration_id);

void RethrowError() const {
@@ -512,6 +581,9 @@ void Executor<WorkspacePolicy, QueuePolicy>::Build(OpGraph *graph, vector<string
// Create corresponding storage type for TensorNodes in graph
tensor_to_store_queue_ =
CreateBackingStorageForTensorNodes(*graph_, max_batch_size_, queue_sizes);

InitializeLivenessInfo(liveness_info_, tensor_to_store_queue_);

// Setup stream and events that will be used for execution
if (device_id_ != CPU_ONLY_DEVICE_ID) {
DeviceGuard g(device_id_);
78 changes: 1 addition & 77 deletions dali/pipeline/executor/workspace_policy.h
@@ -307,8 +307,6 @@ struct JIT_WS_Policy {
mixed_op_stream_, gpu_op_stream_, mixed_op_events_, idxs);
}

void WorkspaceUsed(Workspace &ws) {}

private:
// TODO(klecki): should consider if storing copy of backing storage is good idea
int device_id_;
@@ -360,56 +358,6 @@ inline void SetOrder(Workspace &ws, AccessOrder order) {
}
}

struct LivenessInfo {
int total_conumers{0};
std::atomic_int consumers_left{0};
};

inline void InitializeLivenessInfo(std::map<const void *, LivenessInfo> &liveness, Workspace &ws) {
for (int i = 0; i < ws.NumInput(); i++) {
const void *handle = 0;
if (ws.InputIsType<CPUBackend>(i))
handle = &ws.Input<CPUBackend>(i);
else if (ws.InputIsType<GPUBackend>(i))
handle = &ws.Input<GPUBackend>(i);
else
continue;
liveness[handle].total_conumers++;
}
}

template <typename Backend>
void ConsumeInput(std::map<const void *, LivenessInfo> &liveness, TensorList<Backend> *tl) {
LivenessInfo &L = liveness[tl];
if (--L.consumers_left == 0) {
fprintf(stderr, "Resetting tensor list: %p\n", tl);
tl->Reset();
}
}

inline void AdjustLiveness(std::map<const void *, LivenessInfo> &liveness, Workspace &ws) {
for (int i = 0; i < ws.NumOutput(); i++) {
const void *handle = 0;
if (ws.OutputIsType<CPUBackend>(i))
handle = &ws.Output<CPUBackend>(i);
else if (ws.OutputIsType<GPUBackend>(i))
handle = &ws.Output<GPUBackend>(i);
else
continue;
LivenessInfo &L = liveness[handle];
L.consumers_left = L.total_conumers;
}

for (int i = 0; i < ws.NumInput(); i++) {
if (ws.InputIsType<CPUBackend>(i)) {
ConsumeInput(liveness, &ws.UnsafeMutableInput<CPUBackend>(i));
} else if (ws.InputIsType<GPUBackend>(i)) {
ConsumeInput(liveness, &ws.UnsafeMutableInput<CPUBackend>(i));
}
}
}


/**
* @brief Ahead Of Time Workspace Policy for Separated Executor
*
@@ -476,7 +424,6 @@ struct AOT_WS_Policy<SeparateQueuePolicy> {
}
}
}

}

template <OpType op_type>
@@ -493,9 +440,6 @@ struct AOT_WS_Policy<SeparateQueuePolicy> {
return GetWorkspace<op_type>(idxs, graph, node.partition_index);
}

void WorkspaceUsed(Workspace &ws) {
}

private:
StageQueues depths_;
// ws_id -> op_id -> workspace
@@ -559,8 +503,6 @@ template <>
struct AOT_WS_Policy<UniformQueuePolicy> {
using ws_t = Workspace &;

bool auto_liveness = true;

~AOT_WS_Policy() {
for (auto &wss : wss_) {
for (auto &ws : std::get<static_cast<int>(OpType::CPU)>(wss.op_data))
@@ -584,19 +526,6 @@ struct AOT_WS_Policy<UniformQueuePolicy> {
PrepareWSB(i, graph, device_id, tensor_to_store_queue, thread_pool, mixed_op_stream,
gpu_op_stream, mixed_op_events);
}

InitializeLiveness();
}

void InitializeLiveness() {
for (auto &wss : wss_) {
for (auto &ws : std::get<static_cast<int>(OpType::CPU)>(wss.op_data))
InitializeLivenessInfo(liveness_info_, ws);
for (auto &ws : std::get<static_cast<int>(OpType::MIXED)>(wss.op_data))
InitializeLivenessInfo(liveness_info_, ws);
for (auto &ws : std::get<static_cast<int>(OpType::GPU)>(wss.op_data))
InitializeLivenessInfo(liveness_info_, ws);
}
}

template <OpType op_type>
@@ -615,11 +544,6 @@ struct AOT_WS_Policy<UniformQueuePolicy> {
return ws;
}

void WorkspaceUsed(Workspace &ws) {
if (auto_liveness)
AdjustLiveness(liveness_info_, ws);
}

private:
void PrepareWSB(
int queue_idx, const OpGraph &graph, int device_id,
@@ -662,9 +586,9 @@ struct AOT_WS_Policy<UniformQueuePolicy> {
};
vector<WorkspaceBlob> wss_;
int queue_size_ = -1;
std::map<const void *, LivenessInfo> liveness_info_;
};


} // namespace dali

#endif // DALI_PIPELINE_EXECUTOR_WORKSPACE_POLICY_H_
4 changes: 4 additions & 0 deletions dali/pipeline/graph/op_graph_storage.cc
@@ -31,6 +31,10 @@ std::vector<tensor_data_store_queue_t> CreateBackingStorageForTensorNodes(
auto producer_op_type = op_graph.Node(tensor.producer.node).op_type;
result[i] =
BatchFactory(producer_op_type, tensor.producer.storage_device, batch_size, queue_sizes[i]);

tuple_for_each(result[i], [&](auto &x) {
x.num_consumers = tensor.consumers.size();
});
}
return result;
}
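
For context: this is where the consumer count originates. For each TensorNode, the number of consuming operators is taken from tensor.consumers and stamped onto every buffer variant in the per-tensor store queue via the tuple_for_each helper added below; InitializeLivenessInfo in executor.h later copies that count into the executor's liveness map, keyed by the buffer pointer.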
37 changes: 37 additions & 0 deletions dali/pipeline/workspace/workspace_data_factory.h
@@ -28,6 +28,41 @@

namespace dali {


template <template <typename...> class TupleLike, typename... Ts, typename F>
void tuple_for_each_impl(TupleLike<Ts...> &t, F &&fun,
std::integral_constant<int, sizeof...(Ts)>) {}

template <template <typename...> class TupleLike, typename... Ts, typename F, int N>
std::enable_if_t<N != sizeof...(Ts)>
tuple_for_each_impl(TupleLike<Ts...> &t, F &&fun, std::integral_constant<int, N>) {
fun(std::get<N>(t));
tuple_for_each_impl(t, std::forward<F>(fun), std::integral_constant<int, N+1>());
}

template <template <typename...> class TupleLike, typename... Ts, typename F>
void tuple_for_each(TupleLike<Ts...> &t, F &&fun) {
tuple_for_each_impl(t, std::forward<F>(fun), std::integral_constant<int, 0>());
}


template <template <typename...> class TupleLike, typename... Ts, typename F>
void tuple_for_each_impl(const TupleLike<Ts...> &t, F &&fun,
std::integral_constant<int, sizeof...(Ts)>) {}

template <template <typename...> class TupleLike, typename... Ts, typename F, int N>
std::enable_if_t<N != sizeof...(Ts)>
tuple_for_each_impl(const TupleLike<Ts...> &t, F &&fun, std::integral_constant<int, N>) {
fun(std::get<N>(t));
tuple_for_each_impl(t, std::forward<F>(fun), std::integral_constant<int, N+1>());
}

template <template <typename...> class TupleLike, typename... Ts, typename F>
void tuple_for_each(const TupleLike<Ts...> &t, F &&fun) {
tuple_for_each_impl(t, std::forward<F>(fun), std::integral_constant<int, 0>());
}
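
tuple_for_each is a small compile-time loop over the elements of any tuple-like type; in this commit it is used to visit every StoreBufferQueue variant stored in a tensor_data_store_queue_t. A standalone usage sketch, assuming the overloads defined above are in scope (the tuple and the lambda are illustrative, not DALI types):

#include <iostream>
#include <string>
#include <tuple>

void tuple_for_each_demo() {
  std::tuple<int, double, std::string> t{1, 2.5, "three"};
  // The generic lambda is instantiated for each element type and is
  // invoked on the elements in order, printing: 1, 2.5, three
  tuple_for_each(t, [](auto &element) {
    std::cout << element << '\n';
  });
}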


/*
* Mappings from OpType, StorageDevice to index in tensor_data_store_queue_t
*/
@@ -113,6 +148,8 @@ struct StoreBufferQueue {
size_t size() const {
return store.size();
}

int num_consumers = 0;
};

// Generator for Queue of Workspace Output Type, indexed by GetTensorStoreIndex()
