Skip to content

Commit

Permalink
threads: add thread names
Browse files Browse the repository at this point in the history
Adding names to threads simplifies cpu usage realtime tracking e.g. top -H -p <OSD_PID>

This commit changes Thread.create() method forcing to pass thread name.

Signed-off-by: Igor Podoski <igor.podoski@ts.fujitsu.com>
  • Loading branch information
Igor Podoski committed Sep 15, 2015
1 parent b7aaecc commit e486653
Show file tree
Hide file tree
Showing 48 changed files with 78 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/ceph_fuse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ int main(int argc, const char **argv, const char *envp[]) {

cerr << "ceph-fuse[" << getpid() << "]: starting fuse" << std::endl;
tester.init(cfuse, client);
tester.create();
tester.create("tester");
r = cfuse->loop();
tester.join(&tester_rp);
tester_r = static_cast<int>(reinterpret_cast<uint64_t>(tester_rp));
Expand Down
1 change: 1 addition & 0 deletions src/client/SyntheticClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ int SyntheticClient::start_thread()

pthread_create(&thread_id, NULL, synthetic_client_thread_entry, this);
assert(thread_id);
pthread_setname_np(thread_id, "client");
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/Finisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
void Finisher::start()
{
ldout(cct, 10) << __func__ << dendl;
finisher_thread.create();
finisher_thread.create("finisher");
}

void Finisher::stop()
Expand Down
2 changes: 1 addition & 1 deletion src/common/OutputDataSocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ bool OutputDataSocket::init(const std::string &path)
m_shutdown_rd_fd = pipe_rd;
m_shutdown_wr_fd = pipe_wr;
m_path = path;
create();
create("out data socket");
add_cleanup_file(m_path.c_str());
return true;
}
Expand Down
5 changes: 4 additions & 1 deletion src/common/Thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ int Thread::try_create(size_t stacksize)
return r;
}

void Thread::create(size_t stacksize)
void Thread::create(const char *name, size_t stacksize)
{
int ret = try_create(stacksize);
if (ret != 0) {
Expand All @@ -152,6 +152,9 @@ void Thread::create(size_t stacksize)
"failed with error %d", ret);
dout_emergency(buf);
assert(ret == 0);
} else if (thread_id > 0) {
assert(strlen(name) < 16);
pthread_setname_np(thread_id, name);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Thread {
bool am_self() const;
int kill(int signal);
int try_create(size_t stacksize);
void create(size_t stacksize = 0);
void create(const char * name, size_t stacksize = 0);
int join(void **prval = 0);
int detach();
int set_ioprio(int cls, int prio);
Expand Down
2 changes: 1 addition & 1 deletion src/common/Timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void SafeTimer::init()
{
ldout(cct,10) << "init" << dendl;
thread = new SafeTimerThread(this);
thread->create();
thread->create("safe timer");
}

void SafeTimer::shutdown()
Expand Down
4 changes: 2 additions & 2 deletions src/common/WorkQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ void ThreadPool::start_threads()
if (r < 0)
lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;

wt->create();
wt->create("thread pool");
}
}

Expand Down Expand Up @@ -356,7 +356,7 @@ void ShardedThreadPool::start_threads()
WorkThreadSharded *wt = new WorkThreadSharded(this, thread_index);
ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;
threads_shardedpool.push_back(wt);
wt->create();
wt->create("sharded th pool");
thread_index++;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/admin_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ bool AdminSocket::init(const std::string &path)
register_command("get_command_descriptions", "get_command_descriptions",
m_getdescs_hook, "list available commands");

create();
create("admin socket");
add_cleanup_file(m_path.c_str());
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/ceph_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ void CephContext::start_service_thread()
return;
}
_service_thread = new CephContextServiceThread(this);
_service_thread->create();
_service_thread->create("service");
ceph_spin_unlock(&_service_thread_lock);

// make logs flush on_exit()
Expand Down
3 changes: 3 additions & 0 deletions src/common/obj_bencher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ int ObjBencher::write_bench(int secondsToRun,
pthread_t print_thread;

pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
pthread_setname_np(print_thread, "write stat");
lock.Lock();
data.finished = 0;
data.start_time = ceph_clock_now(cct);
Expand Down Expand Up @@ -610,6 +611,7 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre

pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
pthread_setname_np(print_thread, "seq read stat");

utime_t finish_time = data.start_time + time_to_run;
//start initial reads
Expand Down Expand Up @@ -828,6 +830,7 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr

pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
pthread_setname_np(print_thread, "rand read stat");

utime_t finish_time = data.start_time + time_to_run;
//start initial reads
Expand Down
2 changes: 1 addition & 1 deletion src/global/signal_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ struct SignalHandler : public Thread {
assert(r == 0);

// create thread
create();
create("sginal handler");
}

~SignalHandler() {
Expand Down
2 changes: 1 addition & 1 deletion src/log/Log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ void Log::start()
pthread_mutex_lock(&m_queue_mutex);
m_stop = false;
pthread_mutex_unlock(&m_queue_mutex);
create();
create("log");
}

void Log::stop()
Expand Down
10 changes: 5 additions & 5 deletions src/mds/MDLog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,17 @@ void MDLog::create(MDSInternalContextBase *c)
logger->set(l_mdl_expos, journaler->get_expire_pos());
logger->set(l_mdl_wrpos, journaler->get_write_pos());

submit_thread.create();
submit_thread.create("md sumbmit");
}

void MDLog::open(MDSInternalContextBase *c)
{
dout(5) << "open discovering log bounds" << dendl;

recovery_thread.set_completion(c);
recovery_thread.create();
recovery_thread.create("md recov open");

submit_thread.create();
submit_thread.create("md sumbit");
// either append() or replay() will follow.
}

Expand Down Expand Up @@ -211,7 +211,7 @@ void MDLog::reopen(MDSInternalContextBase *c)
journaler = NULL;

recovery_thread.set_completion(new C_ReopenComplete(this, c));
recovery_thread.create();
recovery_thread.create("md recov reopen");
}

void MDLog::append()
Expand Down Expand Up @@ -837,7 +837,7 @@ void MDLog::replay(MDSInternalContextBase *c)
assert(num_events == 0 || already_replayed);
already_replayed = true;

replay_thread.create();
replay_thread.create("md log replay");
}


Expand Down
3 changes: 1 addition & 2 deletions src/mds/MDSRank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ void MDSRankDispatcher::init()
// Expose the OSDMap (already populated during MDS::init) to anyone
// who is interested in it.
handle_osd_map();

progress_thread.create();
progress_thread.create("mds rank progr");

finisher->start();
}
Expand Down
2 changes: 1 addition & 1 deletion src/msg/async/AsyncMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ void WorkerPool::start()
{
if (!started) {
for (uint64_t i = 0; i < workers.size(); ++i) {
workers[i]->create();
workers[i]->create("woker");
}
started = true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/msg/simple/Accepter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ int Accepter::start()
ldout(msgr->cct,1) << "accepter.start" << dendl;

// start thread
create();
create("accepter");

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/msg/simple/DispatchQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ void DispatchQueue::start()
{
assert(!stop);
assert(!dispatch_thread.is_started());
dispatch_thread.create();
local_delivery_thread.create();
dispatch_thread.create("dispatch queue");
local_delivery_thread.create("local delivery");
}

void DispatchQueue::wait()
Expand Down
6 changes: 3 additions & 3 deletions src/msg/simple/Pipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void Pipe::start_reader()
reader_needs_join = false;
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
reader_thread.create("pipe reader", msgr->cct->_conf->ms_rwthread_stack_bytes);
}

void Pipe::maybe_start_delay_thread()
Expand All @@ -142,7 +142,7 @@ void Pipe::maybe_start_delay_thread()
msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) {
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
delay_thread = new DelayedDelivery(this);
delay_thread->create();
delay_thread->create("pipe");
}
}

Expand All @@ -151,7 +151,7 @@ void Pipe::start_writer()
assert(pipe_lock.is_locked());
assert(!writer_running);
writer_running = true;
writer_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
writer_thread.create("pipe writer", msgr->cct->_conf->ms_rwthread_stack_bytes);
}

void Pipe::join_reader()
Expand Down
2 changes: 1 addition & 1 deletion src/msg/simple/SimpleMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ int SimpleMessenger::start()
lock.Unlock();

reaper_started = true;
reaper_thread.create();
reaper_thread.create("reaper");
return 0;
}

Expand Down
4 changes: 2 additions & 2 deletions src/os/FileJournal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,10 @@ void FileJournal::start_writer()
{
write_stop = false;
aio_stop = false;
write_thread.create();
write_thread.create("journal write");
#ifdef HAVE_LIBAIO
if (aio)
write_finish_thread.create();
write_finish_thread.create("journal wrt fin");
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion src/os/FileStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1591,7 +1591,7 @@ int FileStore::mount()
}

wbthrottle.start();
sync_thread.create();
sync_thread.create("filestore sync");

if (!(generic_flags & SKIP_JOURNAL_REPLAY)) {
ret = journal_replay(initial_op_seq);
Expand Down
2 changes: 1 addition & 1 deletion src/os/LevelDBStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,6 @@ void LevelDBStore::compact_range_async(const string& start, const string& end)
}
compact_queue_cond.Signal();
if (!compact_thread.is_started()) {
compact_thread.create();
compact_thread.create("levdbst compact");
}
}
2 changes: 1 addition & 1 deletion src/os/WBThrottle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void WBThrottle::start()
Mutex::Locker l(lock);
stopping = false;
}
create();
create("wb throttle");
}

void WBThrottle::stop()
Expand Down
4 changes: 2 additions & 2 deletions src/os/newstore/NewStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ int NewStore::_aio_start()
int r = aio_queue.init();
if (r < 0)
return r;
aio_thread.create();
aio_thread.create("newstore aio");
}
return 0;
}
Expand Down Expand Up @@ -1004,7 +1004,7 @@ int NewStore::mount()
finisher.start();
fsync_tp.start();
wal_tp.start();
kv_sync_thread.create();
kv_sync_thread.create("newstore kv syn");

mounted = true;
return 0;
Expand Down
5 changes: 2 additions & 3 deletions src/osd/OSD.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ void OSDService::init()
objecter->start();
watch_timer.init();
agent_timer.init();

agent_thread.create();
agent_thread.create("osd srv agent");
}

void OSDService::activate_map()
Expand Down Expand Up @@ -1921,7 +1920,7 @@ int OSD::init()
set_disk_tp_priority();

// start the heartbeat
heartbeat_thread.create();
heartbeat_thread.create("osd srv heartbt");

// tick
tick_timer.add_event_after(cct->_conf->osd_heartbeat_interval, new C_Tick(this));
Expand Down
2 changes: 1 addition & 1 deletion src/osdc/ObjectCacher.h
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ class ObjectCacher {
~ObjectCacher();

void start() {
flusher_thread.create();
flusher_thread.create("flusher");
}
void stop() {
assert(flusher_thread.is_started());
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class RGWDataChangesLog {
}

renew_thread = new ChangesRenewThread(cct, this);
renew_thread->create();
renew_thread->create("rgw dt lg renew");
}

~RGWDataChangesLog();
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ bool RGWGC::going_down()
void RGWGC::start_processor()
{
worker = new GCWorker(cct, this);
worker->create();
worker->create("rgw gc");
}

void RGWGC::stop_processor()
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ class RGWProcessFrontend : public RGWFrontend {
int run() {
assert(pprocess); /* should have initialized by init() */
thread = new RGWProcessControlThread(pprocess);
thread->create();
thread->create("rgw frontend");
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_object_expirer_core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ bool RGWObjectExpirer::going_down()
void RGWObjectExpirer::start_processor()
{
worker = new OEWorker(store->ctx(), this);
worker->create();
worker->create("rgw obj expirer");
}

void RGWObjectExpirer::stop_processor()
Expand Down

0 comments on commit e486653

Please sign in to comment.