Skip to content

Commit

Permalink
finisher: Update to C++11 concurrency
Browse files Browse the repository at this point in the history
The only visible effect of this change is that the constructor no longer takes a
name for the finisher thread.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
  • Loading branch information
adamemerson committed Feb 15, 2016
1 parent a682d77 commit 81530ad
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 98 deletions.
58 changes: 30 additions & 28 deletions src/common/Finisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// vim: ts=8 sw=2 smarttab

#include "common/config.h"
#include "common/ceph_time.h"

#include "Finisher.h"

#include "common/debug.h"
Expand All @@ -12,52 +14,53 @@
void Finisher::start()
{
ldout(cct, 10) << __func__ << dendl;
finisher_thread.create(thread_name.c_str());

This comment has been minimized.

Copy link
@branch-predictor

branch-predictor Feb 16, 2016

What about thread names? It appears to me that this reverts ceph#5882 without providing any replacement (or I just don't see it...).

This comment has been minimized.

Copy link
@cbodley

cbodley Feb 16, 2016

std::thread doesn't support thread names directly, but we could always use std::thread::native_handle() to call pthread_setname_np(). since we can't add a new ctor to std::thread, maybe a new factory function like make_named_thread()?

This comment has been minimized.

Copy link
@adamemerson

adamemerson Feb 16, 2016

Author Owner

We could do make_named_thread taking a string and then the rest of the arguments to thread::thread.

If we plan to do lots of stuff we might want to consider subclassing std::thread.

This comment has been minimized.

Copy link
@branch-predictor

branch-predictor Feb 17, 2016

I vote for factory. At this point I don't think we need anything more than that, and we'll prevent any confusion (it'll still be std::thread), with only potential problem being unnamed thread when someone uses std::thread ctor instead of factory which may be annoying but nowhere near fatal. We don't need to store thread name too, making subclassing less valuable.

This comment has been minimized.

Copy link
@adamemerson

adamemerson Feb 17, 2016

Author Owner

All right. Newest push has a make_named_thread method and I use that in the Finisher

This comment has been minimized.

Copy link
@branch-predictor

branch-predictor Feb 17, 2016

Thanks! Hopefully it'll be used thorough rest of codebase.

finisher_thread = std::thread(&Finisher::finisher_thread_entry, this);
}

void Finisher::stop()
{
ldout(cct, 10) << __func__ << dendl;
finisher_lock.Lock();
finisher_stop = true;
// we don't have any new work to do, but we want the worker to wake up anyway
// to process the stop condition.
finisher_cond.Signal();
finisher_lock.Unlock();
{
lock_guard l(finisher_lock);
finisher_stop = true;
// we don't have any new work to do, but we want the worker to
// wake up anyway to process the stop condition.
finisher_cond.notify_one();
}
finisher_thread.join(); // wait until the worker exits completely
ldout(cct, 10) << __func__ << " finish" << dendl;
}

void Finisher::wait_for_empty()
{
finisher_lock.Lock();
while (!finisher_queue.empty() || finisher_running) {
ldout(cct, 10) << "wait_for_empty waiting" << dendl;
finisher_empty_cond.Wait(finisher_lock);
}
unique_lock l(finisher_lock);
ldout(cct, 10) << "wait_for_empty waiting" << dendl;
finisher_empty_cond.wait(l, [this] {
return finisher_queue.empty() && !finisher_running;
});
ldout(cct, 10) << "wait_for_empty empty" << dendl;
finisher_lock.Unlock();
}

void *Finisher::finisher_thread_entry()
void Finisher::finisher_thread_entry()
{
finisher_lock.Lock();
unique_lock l(finisher_lock);
ldout(cct, 10) << "finisher_thread start" << dendl;

utime_t start;
ceph::coarse_mono_time start;
while (!finisher_stop) {
/// Every time we are woken up, we process the queue until it is empty.
while (!finisher_queue.empty()) {
if (logger)
start = ceph_clock_now(cct);
start = ceph::coarse_mono_clock::now();
// To reduce lock contention, we swap out the queue to process.
// This way other threads can submit new contexts to complete while we are working.
// This way other threads can submit new contexts to complete
// while we are working.
vector<Context*> ls;
list<pair<Context*,int> > ls_rval;
ls.swap(finisher_queue);
ls_rval.swap(finisher_queue_rval);
finisher_running = true;
finisher_lock.Unlock();
l.unlock();
ldout(cct, 10) << "finisher_thread doing " << ls << dendl;

// Now actually process the contexts.
Expand All @@ -78,30 +81,29 @@ void *Finisher::finisher_thread_entry()
}
if (logger) {
logger->dec(l_finisher_queue_len);
logger->tinc(l_finisher_complete_lat, ceph_clock_now(cct) - start);
}
logger->tinc(l_finisher_complete_lat,
ceph::coarse_mono_clock::now() - start);
}
}
ldout(cct, 10) << "finisher_thread done with " << ls << dendl;
ls.clear();

finisher_lock.Lock();
l.lock();
finisher_running = false;
}
ldout(cct, 10) << "finisher_thread empty" << dendl;
finisher_empty_cond.Signal();
finisher_empty_cond.notify_all();
if (finisher_stop)
break;

ldout(cct, 10) << "finisher_thread sleeping" << dendl;
finisher_cond.Wait(finisher_lock);
finisher_cond.wait(l);
}
// If we are exiting, we signal the thread waiting in stop(),
// otherwise it would never unblock
finisher_empty_cond.Signal();
finisher_empty_cond.notify_all();

ldout(cct, 10) << "finisher_thread stop" << dendl;
finisher_stop = false;
finisher_lock.Unlock();
return 0;
}

139 changes: 74 additions & 65 deletions src/common/Finisher.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
Expand All @@ -7,18 +7,20 @@
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*
*/

#ifndef CEPH_FINISHER_H
#define CEPH_FINISHER_H

#include "include/atomic.h"
#include "common/Mutex.h"
#include "common/Cond.h"
#include "common/Thread.h"
#include <condition_variable>
#include <mutex>
#include <thread>

#include "include/Context.h"

#include "common/perf_counters.h"

class CephContext;
Expand All @@ -38,40 +40,52 @@ enum {
*/
class Finisher {
CephContext *cct;
Mutex finisher_lock; ///< Protects access to queues and finisher_running.
Cond finisher_cond; ///< Signaled when there is something to process.
Cond finisher_empty_cond; ///< Signaled when the finisher has nothing more to process.
bool finisher_stop; ///< Set when the finisher should stop.
bool finisher_running; ///< True when the finisher is currently executing contexts.
std::mutex finisher_lock; ///< Protects access to queues and
///< finisher_running.
using lock_guard = std::lock_guard<decltype(finisher_lock)>;
using unique_lock = std::unique_lock<decltype(finisher_lock)>;
std::condition_variable finisher_cond; ///< Signaled when there is
///< something to process.
std::condition_variable finisher_empty_cond; ///< Signaled when the
///< finisher has nothing
///< more to process.
bool finisher_stop = false; ///< Set when the finisher should stop.
bool finisher_running = false; ///< True when the finisher is currently
///< executing tasks.
/// Queue for contexts for which complete(0) will be called.
/// NULLs in this queue indicate that an item from finisher_queue_rval
/// should be completed in that place instead.
vector<Context*> finisher_queue;

string thread_name;

/// Queue for contexts for which the complete function will be called
/// with a parameter other than 0.
list<pair<Context*,int> > finisher_queue_rval;

class PerfCountersDeleter {
CephContext* cct;

public:
PerfCountersDeleter(CephContext* cct = nullptr) : cct(cct) {}
void operator()(PerfCounters* p) {
if (cct)
cct->get_perfcounters_collection()->remove(p);
delete p;
}
};

/// Performance counter for the finisher's queue length.
/// Only active for named finishers.
PerfCounters *logger;

void *finisher_thread_entry();
std::unique_ptr<PerfCounters, PerfCountersDeleter> logger;

struct FinisherThread : public Thread {
Finisher *fin;
explicit FinisherThread(Finisher *f) : fin(f) {}
void* entry() { return (void*)fin->finisher_thread_entry(); }
} finisher_thread;
std::thread finisher_thread;

public:
/// Add a context to complete, optionally specifying a parameter for the complete function.
/// Add a context to complete, optionally specifying a parameter for
/// the complete function.
void queue(Context *c, int r = 0) {
finisher_lock.Lock();
lock_guard l(finisher_lock);
if (finisher_queue.empty()) {
finisher_cond.Signal();
finisher_cond.notify_one();
}
if (r) {
finisher_queue_rval.push_back(pair<Context*, int>(c, r));
Expand All @@ -80,39 +94,41 @@ class Finisher {
finisher_queue.push_back(c);
if (logger)
logger->inc(l_finisher_queue_len);
finisher_lock.Unlock();
}
void queue(vector<Context*>& ls) {
finisher_lock.Lock();
if (finisher_queue.empty()) {
finisher_cond.Signal();
{
lock_guard l(finisher_lock);
if (finisher_queue.empty()) {
finisher_cond.notify_one();
}
finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
}
finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
finisher_lock.Unlock();
ls.clear();
}
void queue(deque<Context*>& ls) {
finisher_lock.Lock();
if (finisher_queue.empty()) {
finisher_cond.Signal();
{
lock_guard l(finisher_lock);
if (finisher_queue.empty()) {
finisher_cond.notify_one();
}
finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
}
finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
finisher_lock.Unlock();
ls.clear();
}
void queue(list<Context*>& ls) {
finisher_lock.Lock();
if (finisher_queue.empty()) {
finisher_cond.Signal();
{
lock_guard l(finisher_lock);
if (finisher_queue.empty()) {
finisher_cond.notify_one();
}
finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
}
finisher_queue.insert(finisher_queue.end(), ls.begin(), ls.end());
if (logger)
logger->inc(l_finisher_queue_len, ls.size());
finisher_lock.Unlock();
ls.clear();
}

Expand All @@ -128,40 +144,33 @@ class Finisher {
void stop();

/** @brief Blocks until the finisher has nothing left to process.
*
* This function will also return when a concurrent call to stop()
* finishes, but this class should never be used in this way. */
void wait_for_empty();

/// The worker function of the Finisher
void finisher_thread_entry();

/// Construct an anonymous Finisher.
/// Anonymous finishers do not log their queue length.
explicit Finisher(CephContext *cct_) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
thread_name("fn_anonymous"), logger(0),
finisher_thread(this) {}
explicit Finisher(CephContext *cct) : cct(cct) {}

/// Construct a named Finisher that logs its queue length.
Finisher(CephContext *cct_, string name, string tn) :
cct(cct_), finisher_lock("Finisher::finisher_lock"),
finisher_stop(false), finisher_running(false),
thread_name(tn), logger(0),
finisher_thread(this) {
Finisher(CephContext *cct, string name) :
cct(cct) {
PerfCountersBuilder b(cct, string("finisher-") + name,
l_finisher_first, l_finisher_last);
b.add_u64(l_finisher_queue_len, "queue_len");
b.add_time_avg(l_finisher_complete_lat, "complete_latency");
logger = b.create_perf_counters();
cct->get_perfcounters_collection()->add(logger);
logger = decltype(logger)(b.create_perf_counters(),
PerfCountersDeleter(cct));
cct->get_perfcounters_collection()->add(logger.get());
logger->set(l_finisher_queue_len, 0);
logger->set(l_finisher_complete_lat, 0);
}

~Finisher() {
if (logger && cct) {
cct->get_perfcounters_collection()->remove(logger);
delete logger;
}
}
~Finisher() = default;
};

/// Context that is completed asynchronously on the supplied finisher.
Expand Down
2 changes: 1 addition & 1 deletion src/mon/MonitorDBStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ class MonitorDBStore
do_dump(false),
dump_fd_binary(-1),
dump_fmt(true),
io_work(g_ceph_context, "monstore", "fn_monstore"),
io_work(g_ceph_context, "monstore"),
is_open(false) {
string::const_reverse_iterator rit;
int pos = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/os/filestore/FileStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,13 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, osflagbit
for (int i = 0; i < m_ondisk_finisher_num; ++i) {
ostringstream oss;
oss << "filestore-ondisk-" << i;
Finisher *f = new Finisher(g_ceph_context, oss.str(), "fn_odsk_fstore");
Finisher *f = new Finisher(g_ceph_context, oss.str());
ondisk_finishers.push_back(f);
}
for (int i = 0; i < m_apply_finisher_num; ++i) {
ostringstream oss;
oss << "filestore-apply-" << i;
Finisher *f = new Finisher(g_ceph_context, oss.str(), "fn_appl_fstore");
Finisher *f = new Finisher(g_ceph_context, oss.str());
apply_finishers.push_back(f);
}

Expand Down
2 changes: 1 addition & 1 deletion src/os/filestore/JournalingObjectStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class JournalingObjectStore : public ObjectStore {
explicit JournalingObjectStore(const std::string& path)
: ObjectStore(path),
journal(NULL),
finisher(g_ceph_context, "JournalObjectStore", "fn_jrn_objstore"),
finisher(g_ceph_context, "JournalObjectStore"),
apply_manager(journal, finisher),
replaying(false) {}

Expand Down
2 changes: 1 addition & 1 deletion src/tools/cephfs/MDSUtility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ MDSUtility::MDSUtility() :
objecter(NULL),
lock("MDSUtility::lock"),
timer(g_ceph_context, lock),
finisher(g_ceph_context, "MDSUtility", "fn_mds_utility"),
finisher(g_ceph_context, "MDSUtility"),
waiting_for_mds_map(NULL)
{
monc = new MonClient(g_ceph_context);
Expand Down

0 comments on commit 81530ad

Please sign in to comment.