Add a thread name to all DALI threads
- adds the ability to name the thread pool and worker threads
  so they are easier to identify in Nsight traces
- uses the NVTX API to set full names visible in Nsight traces
  and pthread_setname_np to set truncated names visible in the GDB debugger

Signed-off-by: Janusz Lisiecki <jlisiecki@nvidia.com>
JanuszL committed May 18, 2022
1 parent e03deb8 commit ef02b63
Showing 22 changed files with 75 additions and 41 deletions.
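
The core of the change is the new NameThread helper added in dali/core/nvtx.cc (see the diff below). For orientation, here is a minimal standalone sketch of the same technique, assuming Linux and the NVTX v3 header layout; the helper name NameCurrentThread and the header path are illustrative, not part of the patch:

```cpp
// Sketch of the naming technique used in this commit (Linux-only assumption).
// NVTX accepts the full name, which Nsight shows in its timeline; glibc's
// pthread_setname_np only accepts 15 characters plus the terminating null,
// so the name is truncated for GDB's "info threads".
#include <pthread.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <cstring>
#include <nvtx3/nvToolsExt.h>  // header path assumed; older toolkits ship nvToolsExt.h

void NameCurrentThread(const char *name) {
  nvtxNameOsThreadA(syscall(SYS_gettid), name);   // full name for Nsight
  char truncated[16];                             // 15 chars + '\0'
  std::strncpy(truncated, name, sizeof(truncated) - 1);
  truncated[sizeof(truncated) - 1] = '\0';
  pthread_setname_np(pthread_self(), truncated);  // truncated name for GDB
}
```

The patch itself writes the truncation loop by hand and compiles NameThread down to a no-op when NVTX support is disabled.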
2 changes: 1 addition & 1 deletion dali/benchmark/displacement_cpu_bench.cc
@@ -94,7 +94,7 @@ void DisplacementBench(benchmark::State& st) {//NOLINT
   }
 
   // We need a thread pool
-  ThreadPool tp(4, 0, false);
+  ThreadPool tp(4, 0, false, "DisplacementBench");
 
   // Create workspace and set input and output
   HostWorkspace ws;
2 changes: 1 addition & 1 deletion dali/benchmark/operator_bench.h
@@ -69,7 +69,7 @@ class OperatorBench : public DALIBenchmark {
     // Create workspace and set input and output
     HostWorkspace ws;
     ws.AddInput(data_in);
-    ThreadPool tp(num_threads, 0, false);
+    ThreadPool tp(num_threads, 0, false, "OperatorBench");
     ws.SetThreadPool(&tp);
 
     Setup<TensorVector<CPUBackend>>(op_ptr, op_spec, ws, batch_size);
6 changes: 3 additions & 3 deletions dali/benchmark/thread_pool_bench.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
+// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@ BENCHMARK_DEFINE_F(ThreadPoolBench, AddWork)(benchmark::State& st) {
   int work_size_max = st.range(2);
   int nthreads = st.range(3);
 
-  ThreadPool thread_pool(nthreads, 0, false);
+  ThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench");
 
   std::vector<uint8_t> data(2000, 0xFF);
   std::atomic<int64_t> total_count(0);
@@ -71,7 +71,7 @@ BENCHMARK_DEFINE_F(ThreadPoolBench, AddWorkDeferred)(benchmark::State& st) {
   int work_size_max = st.range(2);
   int nthreads = st.range(3);
 
-  ThreadPool thread_pool(nthreads, 0, false);
+  ThreadPool thread_pool(nthreads, 0, false, "ThreadPoolBench");
   std::vector<uint8_t> data(2000, 0xFF);
 
   std::atomic<int64_t> total_count(0);
21 changes: 20 additions & 1 deletion dali/core/nvtx.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
+// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <sys/syscall.h>
+#include <pthread.h>
 #include <memory>
 #include "dali/core/nvtx.h"
 
@@ -56,6 +58,23 @@ DLL_PUBLIC DomainTimeRange::~DomainTimeRange() {
   DomainTimeRangeImpl::GetInstance().Stop();
 }
 
+
+DLL_PUBLIC void NameThread(const char *name) {
+  nvtxNameOsThreadA(syscall(SYS_gettid), name);
+  char tmp_name[16];
+  int i = 0;
+  while (name[i] != '\0' && i < 15) {
+    tmp_name[i] = name[i];
+    ++i;
+  }
+  tmp_name[i] = '\0';
+  pthread_setname_np(pthread_self(), tmp_name);
+}
+
+#else
+
+void NameThread(const char *name) {}
+
 #endif  // NVTX_ENABLED
 
 }  // namespace dali
2 changes: 1 addition & 1 deletion dali/kernels/test/scatter_gather_test.cc
@@ -139,7 +139,7 @@ class ScatterGatherTest : public testing::Test {
     this->template Memset<kind>(out_ptr.get(), 0, out.size());
 
     T sg(max_block);
-    ThreadPool tp(4, 0, false);
+    ThreadPool tp(4, 0, false, "test TP");
     // copy
     for (auto &r : ranges)
      sg.AddCopy(r.dst, r.src, r.size);
6 changes: 4 additions & 2 deletions dali/operators/decoder/nvjpeg/nvjpeg_decoder_decoupled_api.h
@@ -80,10 +80,12 @@ class nvJPEGDecoder : public Operator<MixedBackend>, CachedDecoderImpl {
       pinned_allocator_(nvjpeg_memory::GetPinnedAllocator()),
       thread_pool_(num_threads_,
                    spec.GetArgument<int>("device_id"),
-                   spec.GetArgument<bool>("affine") /* pin threads */),
+                   spec.GetArgument<bool>("affine") /* pin threads */,
+                   "image decoder nvJPEG"),
       nvjpeg2k_thread_(1,
                        spec.GetArgument<int>("device_id"),
-                       spec.GetArgument<bool>("affine")) {
+                       spec.GetArgument<bool>("affine"),
+                       "image decoder nvJPEG2k") {
 #if IS_HW_DECODER_COMPATIBLE
   // if hw_decoder_load is not present in the schema (crop/sliceDecoder) then it is not supported
   bool try_init_hw_decoder = false;
2 changes: 1 addition & 1 deletion dali/operators/reader/nemo_asr_reader_op.cc
@@ -131,7 +131,7 @@ NemoAsrReader::NemoAsrReader(const OpSpec& spec)
       read_idxs_(spec.GetArgument<bool>("read_idxs")),
       dtype_(spec.GetArgument<DALIDataType>("dtype")),
       num_threads_(std::max(1, spec.GetArgument<int>("num_threads"))),
-      thread_pool_(num_threads_, spec.GetArgument<int>("device_id"), false) {
+      thread_pool_(num_threads_, spec.GetArgument<int>("device_id"), false, "NemoAsrReader") {
   loader_ = InitLoader<NemoAsrLoader>(spec);
 
   prefetched_decoded_audio_.resize(prefetch_queue_depth_);
2 changes: 1 addition & 1 deletion dali/operators/reader/numpy_reader_gpu_op.cc
@@ -24,7 +24,7 @@ namespace dali {
 
 NumpyReaderGPU::NumpyReaderGPU(const OpSpec& spec)
     : NumpyReader<GPUBackend, NumpyFileWrapperGPU>(spec),
-      thread_pool_(num_threads_, spec.GetArgument<int>("device_id"), false),
+      thread_pool_(num_threads_, spec.GetArgument<int>("device_id"), false, "NumpyReaderGPU"),
       sg_(1 << 18) {
   prefetched_batch_tensors_.resize(prefetch_queue_depth_);
 
3 changes: 2 additions & 1 deletion dali/operators/reader/reader_op.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -93,6 +93,7 @@ class DataReader : public Operator<Backend> {
 
   // Main prefetch work loop
   void PrefetchWorker() {
+    NameThread(make_string("PrefetchWorker ", spec_.name()).c_str());
    DeviceGuard g(device_id_);
    ProducerWait();
    while (!finished_) {
6 changes: 3 additions & 3 deletions dali/pipeline/executor/async_pipelined_executor.h
@@ -39,9 +39,9 @@ class DLL_PUBLIC AsyncPipelinedExecutor : public PipelinedExecutor {
       QueueSizes prefetch_queue_depth = QueueSizes{2, 2})
       : PipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint, set_affinity,
                           max_num_stream, default_cuda_stream_priority, prefetch_queue_depth),
-        cpu_thread_(device_id, set_affinity),
-        mixed_thread_(device_id, set_affinity),
-        gpu_thread_(device_id, set_affinity) {}
+        cpu_thread_(device_id, set_affinity, "CPU executor"),
+        mixed_thread_(device_id, set_affinity, "Mixed executor"),
+        gpu_thread_(device_id, set_affinity, "GPU executor") {}
 
   DLL_PUBLIC ~AsyncPipelinedExecutor() override {
     Shutdown();
8 changes: 4 additions & 4 deletions dali/pipeline/executor/async_separated_pipelined_executor.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2019-2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2019-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -39,9 +39,9 @@ class DLL_PUBLIC AsyncSeparatedPipelinedExecutor : public SeparatedPipelinedExec
       : SeparatedPipelinedExecutor(batch_size, num_thread, device_id, bytes_per_sample_hint,
                                    set_affinity, max_num_stream, default_cuda_stream_priority,
                                    prefetch_queue_depth),
-        cpu_thread_(device_id, set_affinity),
-        mixed_thread_(device_id, set_affinity),
-        gpu_thread_(device_id, set_affinity) {}
+        cpu_thread_(device_id, set_affinity, "CPU executor"),
+        mixed_thread_(device_id, set_affinity, "Mixed executor"),
+        gpu_thread_(device_id, set_affinity, "GPU executor") {}
 
   DLL_PUBLIC ~AsyncSeparatedPipelinedExecutor() override {
     Shutdown();
2 changes: 1 addition & 1 deletion dali/pipeline/executor/executor.h
@@ -110,7 +110,7 @@ class DLL_PUBLIC Executor : public ExecutorBase, public QueuePolicy {
         bytes_per_sample_hint_(bytes_per_sample_hint),
         callback_(nullptr),
         event_pool_(),
-        thread_pool_(num_thread, device_id, set_affinity),
+        thread_pool_(num_thread, device_id, set_affinity, "Executor"),
         exec_error_(false),
         queue_sizes_(prefetch_queue_depth),
         enable_memory_stats_(false) {
2 changes: 1 addition & 1 deletion dali/pipeline/operator/builtin/external_source.h
@@ -225,7 +225,7 @@ class ExternalSource : public Operator<Backend>, virtual public BatchSizeProvide
         previous_dtype_(DALIDataType::DALI_NO_TYPE),
         ndim_(-1),
         layout_(),
-        sync_worker_(device_id_, false) {
+        sync_worker_(device_id_, false, "ExternalSource syncworker") {
     if (spec.TryGetArgument(ndim_, "ndim")) {
       DALI_ENFORCE(ndim_ >= 0, make_string("Incorrect number of dimensions (", ndim_,
                    "). Use positive values for tensors or 0 for scalars."));
2 changes: 1 addition & 1 deletion dali/pipeline/operator/eager_operator.h
@@ -285,7 +285,7 @@ std::vector<std::shared_ptr<TensorList<OutBackend>>> EagerOperator<Backend>::Run
 
 template <typename Backend>
 std::unique_ptr<ThreadPool> EagerOperator<Backend>::shared_thread_pool =
-    std::make_unique<ThreadPool>(1, CPU_ONLY_DEVICE_ID, false);
+    std::make_unique<ThreadPool>(1, CPU_ONLY_DEVICE_ID, false, "EagerOperator");
 
 template <typename Backend>
 CUDAStreamLease EagerOperator<Backend>::shared_cuda_stream{};
3 changes: 2 additions & 1 deletion dali/pipeline/operator/false_gpu_operator.h
@@ -44,7 +44,8 @@ class FalseGPUOperator : public Operator<GPUBackend> {
   explicit FalseGPUOperator(const OpSpec &spec)
       : Operator<GPUBackend>(spec),
         cpu_impl_(spec),
-        thread_pool_(num_threads_, spec.GetArgument<int>("device_id"), true) {
+        thread_pool_(num_threads_, spec.GetArgument<int>("device_id"), true,
+                     "FalseGPUOperator " + spec.name()) {
     cpu_ws_.SetThreadPool(&thread_pool_);
   }
   ~FalseGPUOperator() override = default;
2 changes: 1 addition & 1 deletion dali/pipeline/pipeline_debug.h
@@ -35,7 +35,7 @@ class DLL_PUBLIC PipelineDebug {
       : max_batch_size_(max_batch_size),
         device_id_(device_id),
         num_threads_(num_threads),
-        thread_pool_(num_threads, device_id, set_affinity) {
+        thread_pool_(num_threads, device_id, set_affinity, "Debug pipeline") {
     if (device_id != CPU_ONLY_DEVICE_ID) {
       DeviceGuard g(device_id);
       cuda_stream_ = CUDAStreamPool::instance().Get(device_id);
12 changes: 8 additions & 4 deletions dali/pipeline/util/thread_pool.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
+// Copyright (c) 2018-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -21,10 +21,11 @@
 #include "dali/core/format.h"
 #include "dali/core/cuda_utils.h"
 #include "dali/core/device_guard.h"
+#include "dali/core/nvtx.h"
 
 namespace dali {
 
-ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity)
+ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity, const std::string name)
     : threads_(num_thread), running_(true), work_complete_(true), started_(false)
     , active_threads_(0) {
   DALI_ENFORCE(num_thread > 0, "Thread pool must have non-zero size");
@@ -36,7 +37,8 @@ ThreadPool::ThreadPool(int num_thread, int device_id, bool set_affinity)
 #endif
   // Start the threads in the main loop
   for (int i = 0; i < num_thread; ++i) {
-    threads_[i] = std::thread(std::bind(&ThreadPool::ThreadMain, this, i, device_id, set_affinity));
+    threads_[i] = std::thread(std::bind(&ThreadPool::ThreadMain, this, i, device_id, set_affinity,
+                                        make_string("[DALI][TP", i, "]", name)));
   }
   tl_errors_.resize(num_thread);
 }
@@ -116,7 +118,9 @@ std::vector<std::thread::id> ThreadPool::GetThreadIds() const {
 }
 
 
-void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity) {
+void ThreadPool::ThreadMain(int thread_id, int device_id, bool set_affinity,
+                            const std::string &name) {
+  NameThread(name.c_str());
   DeviceGuard g(device_id);
   try {
 #if NVML_ENABLED
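
With the constructor change above, each worker names itself before entering the work loop. A hypothetical call site, using the pool name the Executor passes in this patch:

```cpp
// Hypothetical example: a 2-thread pool on device 0 with affinity pinning off.
// Nsight would list the workers as "[DALI][TP0]Executor" and "[DALI][TP1]Executor";
// GDB's "info threads" would show the 15-character truncation "[DALI][TP0]Exec".
ThreadPool tp(2, 0, false, "Executor");
```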
8 changes: 5 additions & 3 deletions dali/pipeline/util/thread_pool.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2018, NVIDIA CORPORATION. All rights reserved.
+// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -34,7 +34,8 @@ class DLL_PUBLIC ThreadPool {
   // Basic unit of work that our threads do
   typedef std::function<void(int)> Work;
 
-  DLL_PUBLIC ThreadPool(int num_thread, int device_id, bool set_affinity);
+  DLL_PUBLIC ThreadPool(int num_thread, int device_id, bool set_affinity,
+                        const std::string name);
 
   DLL_PUBLIC ~ThreadPool();
 
@@ -67,7 +68,8 @@ class DLL_PUBLIC ThreadPool {
   DISABLE_COPY_MOVE_ASSIGN(ThreadPool);
 
  private:
-  DLL_PUBLIC void ThreadMain(int thread_id, int device_id, bool set_affinity);
+  DLL_PUBLIC void ThreadMain(int thread_id, int device_id, bool set_affinity,
+                             const std::string &name);
 
   vector<std::thread> threads_;
 
7 changes: 4 additions & 3 deletions dali/pipeline/util/thread_pool_test.cc
@@ -21,7 +21,7 @@ namespace dali {
 namespace test {
 
 TEST(ThreadPool, AddWork) {
-  ThreadPool tp(16, 0, false);
+  ThreadPool tp(16, 0, false, "ThreadPool test");
   std::atomic<int> count{0};
   auto increase = [&count](int thread_id) { count++; };
   for (int i = 0; i < 64; i++) {
@@ -33,7 +33,7 @@ TEST(ThreadPool, AddWork) {
 }
 
 TEST(ThreadPool, AddWorkImmediateStart) {
-  ThreadPool tp(16, 0, false);
+  ThreadPool tp(16, 0, false, "ThreadPool test");
   std::atomic<int> count{0};
   auto increase = [&count](int thread_id) { count++; };
   for (int i = 0; i < 64; i++) {
@@ -44,7 +44,8 @@ TEST(ThreadPool, AddWorkImmediateStart) {
 }
 
 TEST(ThreadPool, AddWorkWithPriority) {
-  ThreadPool tp(1, 0, false);  // only one thread to ensure deterministic behavior
+  // only one thread to ensure deterministic behavior
+  ThreadPool tp(1, 0, false, "ThreadPool test");
   std::atomic<int> count{0};
   auto set_to_1 = [&count](int thread_id) {
     count = 1;
8 changes: 5 additions & 3 deletions dali/pipeline/util/worker_thread.h
@@ -29,6 +29,7 @@
 #include "dali/util/nvml.h"
 #endif
 #include "dali/core/device_guard.h"
+#include "dali/core/nvtx.h"
 
 namespace dali {
 
@@ -66,15 +67,15 @@ class WorkerThread {
  public:
   typedef std::function<void(void)> Work;
 
-  inline WorkerThread(int device_id, bool set_affinity) :
+  inline WorkerThread(int device_id, bool set_affinity, const std::string name) :
     running_(true), work_complete_(true), barrier_(2) {
 #if NVML_ENABLED
     if (device_id != CPU_ONLY_DEVICE_ID) {
       nvml::Init();
     }
 #endif
     thread_ = std::thread(&WorkerThread::ThreadMain,
-                          this, device_id, set_affinity);
+                          this, device_id, set_affinity, make_string("[DALI][WT]", name));
   }
 
   inline ~WorkerThread() {
@@ -161,7 +162,8 @@
   }
 
  private:
-  void ThreadMain(int device_id, bool set_affinity) {
+  void ThreadMain(int device_id, bool set_affinity, const std::string &name) {
+    NameThread(name.c_str());
     DeviceGuard g(device_id);
     try {
       if (set_affinity) {
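
WorkerThread follows the same pattern with a "[DALI][WT]" prefix. A hypothetical call site, using the name ExternalSource passes in this patch:

```cpp
// Hypothetical example: Nsight would show this thread as
// "[DALI][WT]ExternalSource syncworker"; GDB truncates it to "[DALI][WT]Exter".
WorkerThread wt(0, false, "ExternalSource syncworker");
```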
6 changes: 3 additions & 3 deletions dali/pipeline/util/worker_thread_test.cc
@@ -24,12 +24,12 @@ namespace dali {
 namespace test {
 
 TEST(WorkerThread, Destructing) {
-  WorkerThread wt(0, false);
+  WorkerThread wt(0, false, "WorkerThread test");
   // check destruction of a running worker thread
 }
 
 TEST(WorkerThread, WaitForWorkErrorHandling) {
-  WorkerThread wt(0, false);
+  WorkerThread wt(0, false, "WorkerThread test");
   ASSERT_TRUE(wt.WaitForInit());
   wt.DoWork([]() {
     std::this_thread::sleep_for(std::chrono::milliseconds(5));
@@ -45,7 +45,7 @@ TEST(WorkerThread, WaitForWorkErrorHandling) {
 }
 
 TEST(WorkerThread, ShutdownErrorHandling) {
-  WorkerThread wt(0, false);
+  WorkerThread wt(0, false, "WorkerThread test");
   ASSERT_TRUE(wt.WaitForInit());
   wt.DoWork([]() {
     std::this_thread::sleep_for(std::chrono::milliseconds(5));
4 changes: 3 additions & 1 deletion include/dali/core/nvtx.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2018, NVIDIA CORPORATION. All rights reserved.
+// Copyright (c) 2018-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -100,6 +100,8 @@ struct DomainTimeRange : RangeBase {
 #endif
 };
 
+void NameThread(const char *name);
+
 }  // namespace dali
 
 #endif  // DALI_CORE_NVTX_H_