Skip to content

Commit

Permalink
threads: add thread names
Browse files Browse the repository at this point in the history
Adding names to threads simplifies cpu usage realtime tracking e.g. top -H -p <OSD_PID>

This commit changes Thread.create() method forcing to pass thread name.

Signed-off-by: Igor Podoski <igor.podoski@ts.fujitsu.com>
  • Loading branch information
Igor Podoski committed Dec 2, 2015
1 parent 9d42f1a commit 092e960
Show file tree
Hide file tree
Showing 82 changed files with 175 additions and 146 deletions.
2 changes: 1 addition & 1 deletion src/ceph_fuse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ int main(int argc, const char **argv, const char *envp[]) {

cerr << "ceph-fuse[" << getpid() << "]: starting fuse" << std::endl;
tester.init(cfuse, client);
tester.create();
tester.create("tester");
r = cfuse->loop();
tester.join(&tester_rp);
tester_r = static_cast<int>(reinterpret_cast<uint64_t>(tester_rp));
Expand Down
10 changes: 5 additions & 5 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,11 @@ Client::Client(Messenger *m, MonClient *mc)
getgroups_cb(NULL),
can_invalidate_dentries(false),
require_remount(false),
async_ino_invalidator(m->cct),
async_dentry_invalidator(m->cct),
interrupt_finisher(m->cct),
remount_finisher(m->cct),
objecter_finisher(m->cct),
async_ino_invalidator(m->cct, "fn_ino_cli"),
async_dentry_invalidator(m->cct, "fn_dentry_cli"),
interrupt_finisher(m->cct, "fn_interr_cli"),
remount_finisher(m->cct, "fn_remount_cli"),
objecter_finisher(m->cct, "fn_objecter_cli"),
tick_event(NULL),
monclient(mc), messenger(m), whoami(m->get_myname().num()),
cap_epoch_barrier(0),
Expand Down
1 change: 1 addition & 0 deletions src/client/SyntheticClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ int SyntheticClient::start_thread()

pthread_create(&thread_id, NULL, synthetic_client_thread_entry, this);
assert(thread_id);
pthread_setname_np(thread_id, "client");
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/Finisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
void Finisher::start()
{
ldout(cct, 10) << __func__ << dendl;
finisher_thread.create();
finisher_thread.create(thread_name);
}

void Finisher::stop()
Expand Down
11 changes: 7 additions & 4 deletions src/common/Finisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class Finisher {
/// should be completed in that place instead.
vector<Context*> finisher_queue;

const char *thread_name;

/// Queue for contexts for which the complete function will be called
/// with a parameter other than 0.
list<pair<Context*,int> > finisher_queue_rval;
Expand Down Expand Up @@ -132,17 +134,18 @@ class Finisher {

/// Construct an anonymous Finisher.
/// Anonymous finishers do not log their queue length.
Finisher(CephContext *cct_) :
Finisher(CephContext *cct_, const char *tn) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
logger(0),
thread_name(tn), logger(0),
finisher_thread(this) {}

/// Construct a named Finisher that logs its queue length.
Finisher(CephContext *cct_, string name) :

Finisher(CephContext *cct_, string name, const char *tn) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
logger(0),
thread_name(tn), logger(0),
finisher_thread(this) {
PerfCountersBuilder b(cct, string("finisher-") + name,
l_finisher_first, l_finisher_last);
Expand Down
2 changes: 1 addition & 1 deletion src/common/OutputDataSocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ bool OutputDataSocket::init(const std::string &path)
m_shutdown_rd_fd = pipe_rd;
m_shutdown_wr_fd = pipe_wr;
m_path = path;
create();
create("out_data_socket");
add_cleanup_file(m_path.c_str());
return true;
}
Expand Down
5 changes: 4 additions & 1 deletion src/common/Thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ int Thread::try_create(size_t stacksize)
return r;
}

void Thread::create(size_t stacksize)
void Thread::create(const char *name, size_t stacksize)
{
int ret = try_create(stacksize);
if (ret != 0) {
Expand All @@ -152,6 +152,9 @@ void Thread::create(size_t stacksize)
"failed with error %d", ret);
dout_emergency(buf);
assert(ret == 0);
} else if (thread_id > 0) {
assert(strlen(name) < 16);
pthread_setname_np(thread_id, name);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Thread {
bool am_self() const;
int kill(int signal);
int try_create(size_t stacksize);
void create(size_t stacksize = 0);
void create(const char * name, size_t stacksize = 0);
int join(void **prval = 0);
int detach();
int set_ioprio(int cls, int prio);
Expand Down
2 changes: 1 addition & 1 deletion src/common/Timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void SafeTimer::init()
{
ldout(cct,10) << "init" << dendl;
thread = new SafeTimerThread(this);
thread->create();
thread->create("safe_timer");
}

void SafeTimer::shutdown()
Expand Down
6 changes: 4 additions & 2 deletions src/common/WorkQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ void ThreadPool::worker(WorkThread *wt)
processing++;
ldout(cct,12) << "worker wq " << wq->name << " start processing " << item
<< " (" << processing << " active)" << dendl;
assert(strlen(wq->thread_name) < 16);
pthread_setname_np(wt->get_thread_id(), wq->thread_name);
TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);
tp_handle.reset_tp_timeout();
_lock.Unlock();
Expand Down Expand Up @@ -169,7 +171,7 @@ void ThreadPool::start_threads()
if (r < 0)
lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;

wt->create();
wt->create("thread_pool");
}
}

Expand Down Expand Up @@ -356,7 +358,7 @@ void ShardedThreadPool::start_threads()
WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
threads_shardedpool.push_back(wt);
wt->create();
wt->create("sharded_th_pool");
thread_index++;
}
}
Expand Down
30 changes: 17 additions & 13 deletions src/common/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ class ThreadPool : public md_config_obs_t {
/// Basic interface to a work queue used by the worker threads.
struct WorkQueue_ {
string name;
const char *thread_name;
time_t timeout_interval, suicide_interval;
WorkQueue_(string n, time_t ti, time_t sti)
: name(n), timeout_interval(ti), suicide_interval(sti)
WorkQueue_(string n, const char *tn, time_t ti, time_t sti)
: name(n), thread_name(tn), timeout_interval(ti), suicide_interval(sti)
{ }
virtual ~WorkQueue_() {}
/// Remove all work items from the queue.
Expand Down Expand Up @@ -131,8 +132,8 @@ class ThreadPool : public md_config_obs_t {
}

public:
BatchWorkQueue(string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, ti, sti), pool(p) {
BatchWorkQueue(string n, const char *tn, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, tn, ti, sti), pool(p) {
pool->add_work_queue(this);
}
~BatchWorkQueue() {
Expand Down Expand Up @@ -229,8 +230,8 @@ class ThreadPool : public md_config_obs_t {
void _clear() {}

public:
WorkQueueVal(string n, time_t ti, time_t sti, ThreadPool *p)
: WorkQueue_(n, ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
WorkQueueVal(string n, const char *tn, time_t ti, time_t sti, ThreadPool *p)
: WorkQueue_(n, tn, ti, sti), _lock("WorkQueueVal::lock"), pool(p) {
pool->add_work_queue(this);
}
~WorkQueueVal() {
Expand Down Expand Up @@ -298,7 +299,8 @@ class ThreadPool : public md_config_obs_t {
}

public:
WorkQueue(string n, time_t ti, time_t sti, ThreadPool* p) : WorkQueue_(n, ti, sti), pool(p) {
WorkQueue(string n, const char *tn, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, tn, ti, sti), pool(p) {
pool->add_work_queue(this);
}
~WorkQueue() {
Expand Down Expand Up @@ -353,8 +355,8 @@ class ThreadPool : public md_config_obs_t {
template<typename T>
class PointerWQ : public WorkQueue_ {
public:
PointerWQ(string n, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, ti, sti), m_pool(p), m_processing(0) {
PointerWQ(string n, const char *tn, time_t ti, time_t sti, ThreadPool* p)
: WorkQueue_(n, tn, ti, sti), m_pool(p), m_processing(0) {
m_pool->add_work_queue(this);
}
~PointerWQ() {
Expand Down Expand Up @@ -529,9 +531,10 @@ class GenContextWQ :
public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
GenContextWQ(const string &name, time_t ti, ThreadPool *tp)
GenContextWQ(const string &name, const char *thread_name, time_t ti,
ThreadPool *tp)
: ThreadPool::WorkQueueVal<
GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
GenContext<ThreadPool::TPHandle&>*>(name, thread_name, ti, ti*10, tp) {}

void _enqueue(GenContext<ThreadPool::TPHandle&> *c) {
_queue.push_back(c);
Expand Down Expand Up @@ -569,8 +572,9 @@ class C_QueueInWQ : public Context {
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
ContextWQ(const string &name, time_t ti, ThreadPool *tp)
: ThreadPool::PointerWQ<Context>(name, ti, 0, tp),
ContextWQ(const string &name, const char *thread_name, time_t ti,
ThreadPool *tp)
: ThreadPool::PointerWQ<Context>(name, thread_name, ti, 0, tp),
m_lock("ContextWQ::m_lock") {
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/admin_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ bool AdminSocket::init(const std::string &path)
register_command("get_command_descriptions", "get_command_descriptions",
m_getdescs_hook, "list available commands");

create();
create("admin_socket");
add_cleanup_file(m_path.c_str());
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/ceph_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ void CephContext::start_service_thread()
return;
}
_service_thread = new CephContextServiceThread(this);
_service_thread->create();
_service_thread->create("service");
ceph_spin_unlock(&_service_thread_lock);

// make logs flush on_exit()
Expand Down
3 changes: 3 additions & 0 deletions src/common/obj_bencher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ int ObjBencher::write_bench(int secondsToRun,
pthread_t print_thread;

pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
pthread_setname_np(print_thread, "write_stat");
lock.Lock();
data.finished = 0;
data.start_time = ceph_clock_now(cct);
Expand Down Expand Up @@ -610,6 +611,7 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre

pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
pthread_setname_np(print_thread, "seq_read_stat");

utime_t finish_time = data.start_time + time_to_run;
//start initial reads
Expand Down Expand Up @@ -830,6 +832,7 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr

pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
pthread_setname_np(print_thread, "rand_read_stat");

utime_t finish_time = data.start_time + time_to_run;
//start initial reads
Expand Down
4 changes: 3 additions & 1 deletion src/compressor/AsyncCompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class AsyncCompressor {
deque<Job*> job_queue;

CompressWQ(AsyncCompressor *ac, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
: ThreadPool::WorkQueue<Job>("AsyncCompressor::CompressWQ", timeout, suicide_timeout, tp), async_compressor(ac) {}
: ThreadPool::WorkQueue<Job>("AsyncCompressor::CompressWQ", "wq_async_compr",
timeout, suicide_timeout, tp),
async_compressor(ac) {}

bool _enqueue(Job *item) {
job_queue.push_back(item);
Expand Down
2 changes: 1 addition & 1 deletion src/global/signal_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ struct SignalHandler : public Thread {
assert(r == 0);

// create thread
create();
create("sginal_handler");
}

~SignalHandler() {
Expand Down
2 changes: 1 addition & 1 deletion src/journal/JournalMetadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void JournalMetadata::init(Context *on_init) {
assert(!m_initialized);
m_initialized = true;

m_finisher = new Finisher(m_cct);
m_finisher = new Finisher(m_cct, "fn_jrn_metadata");
m_finisher->start();

m_timer = new SafeTimer(m_cct, m_timer_lock, true);
Expand Down
2 changes: 1 addition & 1 deletion src/kv/LevelDBStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,6 @@ void LevelDBStore::compact_range_async(const string& start, const string& end)
}
compact_queue_cond.Signal();
if (!compact_thread.is_started()) {
compact_thread.create();
compact_thread.create("levdbst_compact");
}
}
2 changes: 1 addition & 1 deletion src/librados/RadosClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ librados::RadosClient::RadosClient(CephContext *cct_)
timer(cct, lock),
refcnt(1),
log_last_version(0), log_cb(NULL), log_cb_arg(NULL),
finisher(cct)
finisher(cct, "fn_librados_cli")
{
}

Expand Down
4 changes: 2 additions & 2 deletions src/librbd/AioImageRequestWQ.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
namespace librbd {

AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name,
time_t ti, ThreadPool *tp)
: ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
const char *tn, time_t ti, ThreadPool *tp)
: ThreadPool::PointerWQ<AioImageRequest>(name, tn, ti, 0, tp),
m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0),
m_lock_listener(this), m_blocking_writes(false) {
Expand Down
4 changes: 2 additions & 2 deletions src/librbd/AioImageRequestWQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class ImageCtx;

class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
public:
AioImageRequestWQ(ImageCtx *image_ctx, const string &name, time_t ti,
ThreadPool *tp);
AioImageRequestWQ(ImageCtx *image_ctx, const string &name, const char *tn,
time_t ti, ThreadPool *tp);

ssize_t read(uint64_t off, uint64_t len, char *buf, int op_flags);
ssize_t write(uint64_t off, uint64_t len, const char *buf, int op_flags);
Expand Down
4 changes: 3 additions & 1 deletion src/librbd/ImageCtx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ struct C_InvalidateCache : public Context {
cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
thread_pool_singleton, "librbd::thread_pool");
aio_work_queue = new AioImageRequestWQ(this, "librbd::aio_work_queue",
"wq_librbd_aio",
cct->_conf->rbd_op_thread_timeout,
thread_pool_singleton);
op_work_queue = new ContextWQ("librbd::op_work_queue",
"wq_librbd_op",
cct->_conf->rbd_op_thread_timeout,
thread_pool_singleton);
}
Expand Down Expand Up @@ -297,7 +299,7 @@ struct C_InvalidateCache : public Context {
}

if (clone_copy_on_read) {
copyup_finisher = new Finisher(cct);
copyup_finisher = new Finisher(cct, "fn_clone_imgctx");
copyup_finisher->start();
}

Expand Down
2 changes: 1 addition & 1 deletion src/librbd/LibrbdWriteback.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ namespace librbd {
};

LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
: m_finisher(new Finisher(ictx->cct)), m_tid(0), m_lock(lock), m_ictx(ictx)
: m_finisher(new Finisher(ictx->cct, "fn_librbd_wback")), m_tid(0), m_lock(lock), m_ictx(ictx)
{
m_finisher->start();
}
Expand Down
2 changes: 1 addition & 1 deletion src/librbd/TaskFinisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TaskFinisher {
public:
TaskFinisher(CephContext &cct)
: m_cct(cct), m_lock("librbd::TaskFinisher::m_lock"),
m_finisher(new Finisher(&cct)),
m_finisher(new Finisher(&cct, "fn_task_finder")),
m_safe_timer(new SafeTimer(&cct, m_lock, false))
{
m_finisher->start();
Expand Down
2 changes: 1 addition & 1 deletion src/log/Log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ void Log::start()
pthread_mutex_lock(&m_queue_mutex);
m_stop = false;
pthread_mutex_unlock(&m_queue_mutex);
create();
create("log");
}

void Log::stop()
Expand Down

0 comments on commit 092e960

Please sign in to comment.