Dump operator stats #2039

Merged: 6 commits, Jun 23, 2020
49 changes: 48 additions & 1 deletion dali/c_api/c_api.cc
@@ -109,7 +109,8 @@ void daliCreatePipeline(daliPipelineHandle *pipe_handle,
int separated_execution,
int prefetch_queue_depth,
int cpu_prefetch_queue_depth,
int gpu_prefetch_queue_depth) {
int gpu_prefetch_queue_depth,
int enable_memory_stats) {
bool se = separated_execution != 0;
auto pipeline = std::make_unique<dali::Pipeline>(std::string(serialized_pipeline, length),
batch_size, num_threads, device_id, true,
@@ -118,6 +119,7 @@ void daliCreatePipeline(daliPipelineHandle *pipe_handle,
if (se) {
pipeline->SetQueueSizes(cpu_prefetch_queue_depth, gpu_prefetch_queue_depth);
}
pipeline->EnableExecutorMemoryStats(enable_memory_stats);
pipeline->Build();
auto ws = std::make_unique<dali::DeviceWorkspace>();
auto stream = dali::CUDAStream::Create(true);
@@ -441,3 +443,48 @@ void daliGetReaderMetadata(daliPipelineHandle* pipe_handle, const char *reader_n
meta->pad_last_batch = returned_meta.pad_last_batch;
meta->stick_to_shard = returned_meta.stick_to_shard;
}

void daliGetExecutorMetadata(daliPipelineHandle* pipe_handle, daliExecutorMetadata **operator_meta,
size_t *operator_meta_num) {
dali::Pipeline* pipeline = reinterpret_cast<dali::Pipeline*>(pipe_handle->pipe);
auto returned_meta = pipeline->GetExecutorMeta();
*operator_meta_num = returned_meta.size();
*operator_meta = static_cast<daliExecutorMetadata*>(malloc(sizeof(daliExecutorMetadata) *
returned_meta.size()));

int i = 0;
for (const auto &stat : returned_meta) {
auto op_name_size = stat.first.size();
auto &op_meta = (*operator_meta)[i];
op_meta.operator_name = static_cast<char*>(malloc(sizeof(char) * (op_name_size + 1)));
stat.first.copy(op_meta.operator_name, op_name_size);
op_meta.operator_name[op_name_size] = '\0';

auto num_outputs = stat.second.size();
op_meta.out_num = num_outputs;
op_meta.real_size = static_cast<size_t*>(malloc(sizeof(size_t) * num_outputs));
op_meta.max_real_size = static_cast<size_t*>(malloc(sizeof(size_t) * num_outputs));
op_meta.reserved = static_cast<size_t*>(malloc(sizeof(size_t) * num_outputs));
op_meta.max_reserved = static_cast<size_t*>(malloc(sizeof(size_t) * num_outputs));

for (size_t j = 0; j < num_outputs; ++j) {
const auto &entry = stat.second[j];
op_meta.real_size[j] = entry.real_size;
op_meta.max_real_size[j] = entry.max_real_size;
op_meta.reserved[j] = entry.reserved;
op_meta.max_reserved[j] = entry.max_reserved;
}
++i;
}
}

void daliFreeExecutorMetadata(daliExecutorMetadata *operator_meta, size_t operator_meta_num) {
for (size_t i = 0; i < operator_meta_num; ++i) {
free(operator_meta[i].operator_name);
free(operator_meta[i].real_size);
free(operator_meta[i].max_real_size);
free(operator_meta[i].reserved);
free(operator_meta[i].max_reserved);
}
free(operator_meta);
}
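
For orientation, a minimal sketch of how a consumer of the C API might read and release these statistics (assumptions: a handle created with enable_memory_stats turned on, at least one completed run, and the public header at dali/c_api.h; this snippet is illustrative and not part of the change):

  #include <cstdio>
  #include "dali/c_api.h"

  void PrintOperatorStats(daliPipelineHandle *handle) {
    daliExecutorMetadata *meta = nullptr;
    size_t count = 0;
    daliGetExecutorMetadata(handle, &meta, &count);
    for (size_t i = 0; i < count; ++i) {
      std::printf("%s\n", meta[i].operator_name);
      for (size_t j = 0; j < meta[i].out_num; ++j) {
        std::printf("  output %zu: %zu bytes used, %zu bytes reserved\n",
                    j, meta[i].real_size[j], meta[i].reserved[j]);
      }
    }
    // Frees the per-operator arrays as well as the outer array, matching the
    // malloc calls in daliGetExecutorMetadata above.
    daliFreeExecutorMetadata(meta, count);
  }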
37 changes: 32 additions & 5 deletions dali/c_api/c_api_test.cc
@@ -166,7 +166,7 @@ TYPED_TEST(CApiTest, FileReaderPipe) {
daliPipelineHandle handle;
daliCreatePipeline(&handle, serialized.c_str(), serialized.size(), batch_size, num_thread,
device_id, false, prefetch_queue_depth, prefetch_queue_depth,
prefetch_queue_depth);
prefetch_queue_depth, false);
daliPrefetchUniform(&handle, prefetch_queue_depth);

dali::DeviceWorkspace ws;
@@ -223,7 +223,7 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocPipe) {
daliPipelineHandle handle;
daliCreatePipeline(&handle, serialized.c_str(), serialized.size(), batch_size, num_thread,
device_id, false, prefetch_queue_depth, prefetch_queue_depth,
prefetch_queue_depth);
prefetch_queue_depth, false);

for (int i = 0; i < prefetch_queue_depth; i++) {
SequentialFill(view<uint8_t>(input_cpu), 42 * i);
@@ -280,7 +280,7 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocPipe) {
daliPipelineHandle handle;
daliCreatePipeline(&handle, serialized.c_str(), serialized.size(), batch_size, num_thread,
device_id, false, prefetch_queue_depth, prefetch_queue_depth,
prefetch_queue_depth);
prefetch_queue_depth, false);

for (int i = 0; i < prefetch_queue_depth; i++) {
SequentialFill(view<uint8_t>(input_cpu), 42 * i);
@@ -340,7 +340,7 @@ TYPED_TEST(CApiTest, ExternalSourceSingleAllocDifferentBackendsTest) {
daliPipelineHandle handle;
daliCreatePipeline(&handle, serialized.c_str(), serialized.size(), batch_size, num_thread,
device_id, false, prefetch_queue_depth, prefetch_queue_depth,
prefetch_queue_depth);
prefetch_queue_depth, false);

for (int i = 0; i < prefetch_queue_depth; i++) {
SequentialFill(view<uint8_t>(input_cpu), 42 * i);
@@ -404,7 +404,7 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocDifferentBackendsTest) {
daliPipelineHandle handle;
daliCreatePipeline(&handle, serialized.c_str(), serialized.size(), batch_size, num_thread,
device_id, false, prefetch_queue_depth, prefetch_queue_depth,
prefetch_queue_depth);
prefetch_queue_depth, false);

for (int i = 0; i < prefetch_queue_depth; i++) {
SequentialFill(view<uint8_t>(input_cpu), 42 * i);
@@ -445,4 +445,31 @@ TYPED_TEST(CApiTest, ExternalSourceMultipleAllocDifferentBackendsTest) {
ComparePipelinesOutputs<OpBackend>(handle, *pipe_ptr);
}

TYPED_TEST(CApiTest, TestExecutorMeta) {
auto pipe_ptr = GetTestPipeline<TypeParam>(true, this->output_device_);
auto serialized = pipe_ptr->SerializeToProtobuf();

pipe_ptr.reset();
daliPipelineHandle handle;
daliCreatePipeline(&handle, serialized.c_str(), serialized.size(), batch_size, num_thread,
device_id, false, prefetch_queue_depth, prefetch_queue_depth,
prefetch_queue_depth, true);

daliRun(&handle);
daliOutput(&handle);
CUDA_CALL(cudaDeviceSynchronize());

size_t N;
daliExecutorMetadata *meta;
daliGetExecutorMetadata(&handle, &meta, &N);
EXPECT_EQ(N, 4);
for (size_t i = 0; i < N; ++i) {
auto &meta_entry = meta[i];
for (size_t j = 0; j < meta_entry.out_num; ++j) {
EXPECT_LE(meta_entry.real_size[j], meta_entry.reserved[j]);
}
}
daliFreeExecutorMetadata(meta, N);
}

} // namespace dali
24 changes: 24 additions & 0 deletions dali/pipeline/data/tensor_vector.h
@@ -113,6 +113,30 @@ class TensorVector {
return tensors_.size();
}

size_t nbytes() const noexcept {
if (state_ == State::contiguous) {
return tl_->nbytes();
}
// else
size_t total_nbytes = 0;
for (const auto &t : tensors_) {
total_nbytes += t->nbytes();
}
return total_nbytes;
}

size_t capacity() const noexcept {
if (state_ == State::contiguous) {
return tl_->capacity();
}
// else
size_t total_capacity = 0;
for (const auto &t : tensors_) {
total_capacity += t->capacity();
}
return total_capacity;
}

TensorListShape<> shape() const {
if (state_ == State::contiguous) {
return tl_->shape();
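
The two accessors added above follow the usual used-versus-reserved split: nbytes() reports the payload currently held and capacity() the memory backing it, taken from the backing TensorList in the contiguous state and summed over the individual tensors otherwise. A rough illustration with hypothetical sizes (SetContiguous, set_type and Resize are assumed to work as in the rest of DALI; this snippet is not part of the change):

  TensorVector<CPUBackend> tv;                // assumed to start in non-contiguous mode
  tv.set_type(TypeInfo::Create<uint8_t>());
  tv.Resize(uniform_list_shape(2, {100}));    // two samples of 100 bytes each
  // nbytes() sums the per-sample payloads: 2 * 100 = 200 bytes.
  // capacity() sums the per-sample reservations, so it can never be smaller.
  assert(tv.capacity() >= tv.nbytes());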
117 changes: 115 additions & 2 deletions dali/pipeline/executor/executor.h
@@ -21,6 +21,8 @@
#include <string>
#include <utility>
#include <vector>
#include <unordered_map>
#include <mutex>

#include "dali/core/common.h"
#include "dali/core/error_handling.h"
@@ -38,15 +40,26 @@
#include "dali/pipeline/workspace/host_workspace.h"
#include "dali/pipeline/workspace/mixed_workspace.h"
#include "dali/pipeline/workspace/workspace_data_factory.h"
#include "dali/pipeline/data/backend.h"

namespace dali {

struct DLL_PUBLIC ExecutorMeta {
size_t real_size;
size_t max_real_size;
size_t reserved;
size_t max_reserved;
};
using ExecutorMetaMap = std::unordered_map<std::string, std::vector<ExecutorMeta>>;

namespace detail {
// This is stream callback used on GPU stream to indicate that GPU work for this
// pipeline run is finished
static void gpu_finished_callback(cudaStream_t stream, cudaError_t status, void *userData);

// helper function to concatenate ExecutorMetaMap maps
static void AppendToMap(ExecutorMetaMap &ret, ExecutorMetaMap &in_stats, std::mutex &mutex);

} // namespace detail

class DLL_PUBLIC ExecutorBase {
@@ -62,6 +75,8 @@ class DLL_PUBLIC ExecutorBase {
DLL_PUBLIC virtual void ShareOutputs(DeviceWorkspace *ws) = 0;
DLL_PUBLIC virtual void ReleaseOutputs() = 0;
DLL_PUBLIC virtual void SetCompletionCallback(ExecutorCallback cb) = 0;
DLL_PUBLIC virtual void EnableMemoryStats(bool enable_memory_stats = false) = 0;
DLL_PUBLIC virtual ExecutorMetaMap GetExecutorMeta() = 0;

protected:
// virtual to allow the TestPruneWholeGraph test in gcc
@@ -94,13 +109,17 @@ class DLL_PUBLIC Executor : public ExecutorBase, public WorkspacePolicy, public
exec_error_(false),
queue_sizes_(prefetch_queue_depth),
mixed_op_stream_(0),
gpu_op_stream_(0) {
gpu_op_stream_(0),
enable_memory_stats_(false) {
DALI_ENFORCE(batch_size_ > 0, "Batch size must be greater than 0.");
DALI_ENFORCE(device_id >= 0, "Device id must be non-negative.");

stage_queue_depths_ = QueuePolicy::GetQueueSizes(prefetch_queue_depth);
}

DLL_PUBLIC void EnableMemoryStats(bool enable_memory_stats = false) override {
enable_memory_stats_ = enable_memory_stats;
}
DLL_PUBLIC void Build(OpGraph *graph, vector<string> output_names) override;
DLL_PUBLIC void Init() override {}
DLL_PUBLIC void RunCPU() override;
@@ -110,6 +129,7 @@ class DLL_PUBLIC Executor : public ExecutorBase, public WorkspacePolicy, public
DLL_PUBLIC void ShareOutputs(DeviceWorkspace *ws) override;
DLL_PUBLIC void ReleaseOutputs() override;
DLL_PUBLIC void SetCompletionCallback(ExecutorCallback cb) override;
DLL_PUBLIC ExecutorMetaMap GetExecutorMeta() override;

DLL_PUBLIC void ShutdownQueue() {
QueuePolicy::SignalStop();
@@ -118,6 +138,75 @@ class DLL_PUBLIC Executor : public ExecutorBase, public WorkspacePolicy, public
DISABLE_COPY_MOVE_ASSIGN(Executor);

protected:
template<typename T>
inline void GetMaxSizesCont(T &in, size_t &max_out_size, size_t &max_reserved_size) {
auto out_size = in.nbytes();
auto reserved_size = in.capacity();
max_out_size = std::max<size_t>(std::ceil((out_size * 1.0) / in.ntensor()), max_out_size);
max_reserved_size = std::max<size_t>(std::ceil((reserved_size * 1.0) / in.ntensor()),
max_reserved_size);
}

template<typename T>
inline void GetMaxSizesNonCont(T &in, size_t &max_out_size, size_t &max_reserved_size) {
for (size_t j = 0; j < in.ntensor(); ++j) {
max_out_size = std::max(in[j].nbytes(), max_out_size);
max_reserved_size = std::max(in[j].capacity(), max_reserved_size);
Review comment (Contributor):
I have some reservations regarding this. In the case of a contiguous TensorVector, the Tensors you are accessing with operator[] here are essentially views into the backing TensorList, so their capacity will probably match nbytes exactly, while the total capacity of the TensorVector might be bigger than nsamples * max(number of bytes) (I suspect).

This can probably be checked with a test that sets the TensorVector to contiguous mode, resizes it to some big shapes, and then sets all the shapes to, for example, half the initial size (a sketch follows this function below).

Can you check this? I think the best solution would be to take the max of this reserved size and the average reserved size you calculate for the TensorList in the function above.

}
}
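
A minimal sketch of the check suggested in the comment above (assumptions: TensorVector exposes SetContiguous, set_type and Resize as used elsewhere in DALI; the shapes are arbitrary and the snippet is not part of the PR):

  TensorVector<CPUBackend> tv;
  tv.SetContiguous(true);
  tv.set_type(TypeInfo::Create<uint8_t>());
  tv.Resize(uniform_list_shape(4, {1024}));   // allocate a large contiguous backing buffer
  tv.Resize(uniform_list_shape(4, {512}));    // shrink every sample to half the size
  // The backing TensorList keeps its reservation, so the whole TensorVector still holds
  // the larger allocation even though the per-sample views returned by operator[] may
  // now report a capacity equal to their (smaller) nbytes.
  EXPECT_GE(tv.capacity(), 4 * 1024);
  EXPECT_EQ(tv.nbytes(), 4 * 512);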

template<typename backend>
inline void GetMaxSizes(TensorList<backend> &in, size_t &max_out_size,
size_t &max_reserved_size) {
GetMaxSizesCont(in, max_out_size, max_reserved_size);
}

template<typename backend>
inline void GetMaxSizes(TensorVector<backend> &in, size_t &max_out_size,
size_t &max_reserved_size) {
if (in.IsContiguous()) {
GetMaxSizesCont(in, max_out_size, max_reserved_size);
} else {
GetMaxSizesNonCont(in, max_out_size, max_reserved_size);
}
}

template <typename W>
inline void FillStats(ExecutorMetaMap &memory_stats, W ws, std::string op_name,
std::mutex &write_mutex) {
if (enable_memory_stats_) {
size_t out_size = 0;
size_t max_out_size = 0;
size_t reserved_size = 0;
size_t max_reserved_size = 0;
std::lock_guard<std::mutex> lck(write_mutex);
auto &stats = memory_stats[op_name];
stats.resize(ws.NumOutput(), {0, 0});

for (int i = 0; i < ws.NumOutput(); ++i) {
out_size = 0;
max_out_size = 0;
reserved_size = 0;
max_reserved_size = 0;
if (ws.template OutputIsType<CPUBackend>(i)) {
auto &out = ws.template OutputRef<CPUBackend>(i);
out_size = out.nbytes();
reserved_size = out.capacity();
GetMaxSizes(out, max_out_size, max_reserved_size);
} else {
auto &out = ws.template OutputRef<GPUBackend>(i);
out_size = out.nbytes();
reserved_size = out.capacity();
GetMaxSizes(out, max_out_size, max_reserved_size);
}
stats[i].real_size = std::max(out_size, stats[i].real_size);
stats[i].max_real_size = std::max(max_out_size, stats[i].max_real_size);
stats[i].reserved = std::max(reserved_size, stats[i].reserved);
stats[i].max_reserved = std::max(max_reserved_size, stats[i].max_reserved);
}
}
}

void HandleError(const char *message = "Unknown exception") {
exec_error_ = true;
ShutdownQueue();
Expand Down Expand Up @@ -220,6 +309,12 @@ class DLL_PUBLIC Executor : public ExecutorBase, public WorkspacePolicy, public
// in some edge cases where there are no operators
std::vector<cudaEvent_t> mixed_callback_events_;

std::atomic<bool> enable_memory_stats_;
ExecutorMetaMap cpu_memory_stats_, mixed_memory_stats_, gpu_memory_stats_;
std::mutex cpu_memory_stats_mutex_;
std::mutex mixed_memory_stats_mutex_;
std::mutex gpu_memory_stats_mutex_;

private:
template <typename Workspace>
void RunHelper(OpNode &op_node, Workspace &ws) {
@@ -266,6 +361,15 @@ void Executor<WorkspacePolicy, QueuePolicy>::SetCompletionCallback(ExecutorCallb
}
}

template <typename WorkspacePolicy, typename QueuePolicy>
ExecutorMetaMap Executor<WorkspacePolicy, QueuePolicy>::GetExecutorMeta() {
ExecutorMetaMap ret;
detail::AppendToMap(ret, cpu_memory_stats_, cpu_memory_stats_mutex_);
detail::AppendToMap(ret, mixed_memory_stats_, mixed_memory_stats_mutex_);
detail::AppendToMap(ret, gpu_memory_stats_, gpu_memory_stats_mutex_);
return ret;
}

template <typename WorkspacePolicy, typename QueuePolicy>
void Executor<WorkspacePolicy, QueuePolicy>::Build(OpGraph *graph, vector<string> output_names) {
DALI_ENFORCE(graph != nullptr, "Input graph is nullptr.");
@@ -347,6 +451,7 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunCPU() {

try {
RunHelper(op_node, ws);
FillStats(cpu_memory_stats_, ws, "CPU_" + op_node.instance_name, cpu_memory_stats_mutex_);
Review comment (Contributor):
Instance names should be unique; what's the rationale for the CPU_ prefixes, etc.?

Reply (Contributor, Author):
If you have an operator with both CPU and GPU variants, it is hard to tell which instance is placed where. The instance name is unique, but it is a bit mangled and not always self-explanatory to the user.

} catch (std::exception &e) {
HandleError(e.what());
} catch (...) {
@@ -381,6 +486,8 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunMixed() {
TimeRange tr("[Executor] Run Mixed op " + op_node.instance_name,
TimeRange::kOrange);
RunHelper(op_node, ws);
FillStats(mixed_memory_stats_, ws, "MIXED_" + op_node.instance_name,
mixed_memory_stats_mutex_);
if (ws.has_stream() && ws.has_event()) {
CUDA_CALL(cudaEventRecord(ws.event(), ws.stream()));
}
@@ -438,6 +545,7 @@ void Executor<WorkspacePolicy, QueuePolicy>::RunGPU() {
TimeRange tr("[Executor] Run GPU op " + op_node.instance_name,
TimeRange::knvGreen);
RunHelper(op_node, ws);
FillStats(gpu_memory_stats_, ws, "GPU_" + op_node.instance_name, gpu_memory_stats_mutex_);
if (ws.has_event()) {
CUDA_CALL(cudaEventRecord(ws.event(), ws.stream()));
}
@@ -724,15 +832,20 @@ void Executor<WorkspacePolicy, QueuePolicy>::SetupOutputQueuesForGraph() {

using SimpleExecutor = Executor<AOT_WS_Policy<UniformQueuePolicy>, UniformQueuePolicy>;


namespace detail {

void gpu_finished_callback(cudaStream_t stream, cudaError_t status, void *userData) {
auto callback = static_cast<ExecutorBase::ExecutorCallback*>(userData);
(*callback)();
}

} // namespace detail
void AppendToMap(ExecutorMetaMap &ret, ExecutorMetaMap &in_stats, std::mutex &mutex) {
const std::lock_guard<std::mutex> lock(mutex);
ret.insert(in_stats.begin(), in_stats.end());
}

} // namespace detail

} // namespace dali

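Taken together, a rough sketch of how the merged statistics might be consumed at the Pipeline level, relying only on the calls visible in this diff (EnableExecutorMemoryStats and GetExecutorMeta); the operator name "Decoder" is made up, and the stage prefixes keep CPU and GPU instances of the same operator apart:

  pipeline.EnableExecutorMemoryStats(true);
  pipeline.Build();
  // ... run the pipeline at least once ...
  for (const auto &op : pipeline.GetExecutorMeta()) {
    // op.first is the prefixed instance name, e.g. "CPU_Decoder" or "GPU_Decoder".
    for (const auto &out : op.second) {
      std::cout << op.first << ": " << out.real_size << " bytes used ("
                << out.max_real_size << " in the largest sample), "
                << out.reserved << " bytes reserved ("
                << out.max_reserved << " in the largest sample)\n";
    }
  }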