Skip to content

Commit

Permalink
rbd-mirror: integrate single thread pool for all processing
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed Mar 8, 2016
1 parent 3496e77 commit 7fd230f
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 11 deletions.
7 changes: 6 additions & 1 deletion src/test/rbd_mirror/image_replay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/Threads.h"

#include <string>
#include <vector>
Expand Down Expand Up @@ -128,6 +129,7 @@ int main(int argc, const char **argv)

rbd::mirror::RadosRef local(new librados::Rados());
rbd::mirror::RadosRef remote(new librados::Rados());
rbd::mirror::Threads *threads = nullptr;

int r = local->init_with_context(g_ceph_context);
if (r < 0) {
Expand Down Expand Up @@ -170,7 +172,9 @@ int main(int argc, const char **argv)

dout(5) << "starting replay" << dendl;

replayer = new rbd::mirror::ImageReplayer(local, remote, client_id,
threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
local->cct()));
replayer = new rbd::mirror::ImageReplayer(threads, local, remote, client_id,
remote_pool_id, remote_image_id);

r = replayer->start(&bootstap_params);
Expand Down Expand Up @@ -198,6 +202,7 @@ int main(int argc, const char **argv)
shutdown_async_signal_handler();

delete replayer;
delete threads;
g_ceph_context->put();

return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
Expand Down
7 changes: 7 additions & 0 deletions src/test/rbd_mirror/test_ImageReplayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "librbd/internal.h"
#include "tools/rbd_mirror/types.h"
#include "tools/rbd_mirror/ImageReplayer.h"
#include "tools/rbd_mirror/Threads.h"

#include "test/librados/test.h"
#include "gtest/gtest.h"
Expand Down Expand Up @@ -98,7 +99,11 @@ class TestImageReplayer : public ::testing::Test {
false, features, &order, 0, 0));
m_remote_image_id = get_image_id(m_remote_ioctx, m_image_name);

m_threads = new rbd::mirror::Threads(reinterpret_cast<CephContext*>(
m_local_ioctx.cct()));

m_replayer = new rbd::mirror::ImageReplayer(
m_threads,
rbd::mirror::RadosRef(new librados::Rados(m_local_ioctx)),
rbd::mirror::RadosRef(new librados::Rados(m_remote_ioctx)),
m_client_id, remote_pool_id, m_remote_image_id);
Expand All @@ -109,6 +114,7 @@ class TestImageReplayer : public ::testing::Test {
~TestImageReplayer()
{
delete m_replayer;
delete m_threads;

EXPECT_EQ(0, m_remote_cluster.pool_delete(m_remote_pool_name.c_str()));
EXPECT_EQ(0, m_local_cluster.pool_delete(m_local_pool_name.c_str()));
Expand Down Expand Up @@ -311,6 +317,7 @@ class TestImageReplayer : public ::testing::Test {

static int _image_number;

rbd::mirror::Threads *m_threads = nullptr;
librados::Rados m_local_cluster, m_remote_cluster;
std::string m_client_id;
std::string m_local_pool_name, m_remote_pool_name;
Expand Down
2 changes: 2 additions & 0 deletions src/tools/Makefile-client.am
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ librbd_mirror_internal_la_SOURCES = \
tools/rbd_mirror/Mirror.cc \
tools/rbd_mirror/PoolWatcher.cc \
tools/rbd_mirror/Replayer.cc \
tools/rbd_mirror/Threads.cc \
tools/rbd_mirror/types.cc
noinst_LTLIBRARIES += librbd_mirror_internal.la
noinst_HEADERS += \
Expand All @@ -94,6 +95,7 @@ noinst_HEADERS += \
tools/rbd_mirror/Mirror.h \
tools/rbd_mirror/PoolWatcher.h \
tools/rbd_mirror/Replayer.h \
tools/rbd_mirror/Threads.h \
tools/rbd_mirror/types.h

rbd_mirror_SOURCES = \
Expand Down
18 changes: 14 additions & 4 deletions src/tools/rbd_mirror/ImageReplayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
Expand All @@ -18,6 +20,7 @@
#include "librbd/internal.h"
#include "librbd/journal/Replay.h"
#include "ImageReplayer.h"
#include "Threads.h"

#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
Expand Down Expand Up @@ -158,10 +161,11 @@ class ImageReplayerAdminSocketHook : public AdminSocketHook {
Commands commands;
};

ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,
ImageReplayer::ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &client_id,
int64_t remote_pool_id,
const std::string &remote_image_id) :
m_threads(threads),
m_local(local),
m_remote(remote),
m_client_id(client_id),
Expand All @@ -183,6 +187,8 @@ ImageReplayer::ImageReplayer(RadosRef local, RadosRef remote,

ImageReplayer::~ImageReplayer()
{
m_threads->work_queue->drain();

assert(m_local_image_ctx == nullptr);
assert(m_local_replay == nullptr);
assert(m_remote_journaler == nullptr);
Expand Down Expand Up @@ -219,9 +225,13 @@ int ImageReplayer::start(const BootstrapParams *bootstrap_params)
}

CephContext *cct = static_cast<CephContext *>(m_local->cct());

commit_interval = cct->_conf->rbd_journal_commit_age;
bool remote_journaler_initialized = false;
m_remote_journaler = new ::journal::Journaler(m_remote_ioctx,
m_remote_journaler = new ::journal::Journaler(m_threads->work_queue,
m_threads->timer,
&m_threads->timer_lock,
m_remote_ioctx,
remote_journal_id,
m_client_id, commit_interval);
r = get_registered_client_status(&registered);
Expand Down Expand Up @@ -315,7 +325,7 @@ int ImageReplayer::start(const BootstrapParams *bootstrap_params)
if (m_remote_journaler) {
if (remote_journaler_initialized) {
m_remote_journaler->stop_replay();
m_remote_journaler->shutdown();
m_remote_journaler->shut_down();
}
delete m_remote_journaler;
m_remote_journaler = nullptr;
Expand Down Expand Up @@ -378,7 +388,7 @@ void ImageReplayer::stop()
m_local_ioctx.close();

m_remote_journaler->stop_replay();
m_remote_journaler->shutdown();
m_remote_journaler->shut_down();
delete m_remote_journaler;
m_remote_journaler = nullptr;

Expand Down
9 changes: 7 additions & 2 deletions src/tools/rbd_mirror/ImageReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "include/rados/librados.hpp"
#include "types.h"

class ContextWQ;

namespace journal {

class Journaler;
Expand All @@ -37,6 +39,7 @@ namespace rbd {
namespace mirror {

class ImageReplayerAdminSocketHook;
struct Threads;

/**
* Replays changes from a remote cluster for a single image.
Expand Down Expand Up @@ -64,8 +67,9 @@ class ImageReplayer {
};

public:
ImageReplayer(RadosRef local, RadosRef remote, const std::string &client_id,
int64_t remote_pool_id, const std::string &remote_image_id);
ImageReplayer(Threads *threads, RadosRef local, RadosRef remote,
const std::string &client_id, int64_t remote_pool_id,
const std::string &remote_image_id);
virtual ~ImageReplayer();
ImageReplayer(const ImageReplayer&) = delete;
ImageReplayer& operator=(const ImageReplayer&) = delete;
Expand Down Expand Up @@ -98,6 +102,7 @@ class ImageReplayer {
friend std::ostream &operator<<(std::ostream &os,
const ImageReplayer &replayer);
private:
Threads *m_threads;
RadosRef m_local, m_remote;
std::string m_client_id;
int64_t m_remote_pool_id, m_local_pool_id;
Expand Down
5 changes: 4 additions & 1 deletion src/tools/rbd_mirror/Mirror.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/debug.h"
#include "common/errno.h"
#include "Mirror.h"
#include "Threads.h"

#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
Expand All @@ -31,6 +32,8 @@ Mirror::Mirror(CephContext *cct) :
m_lock("rbd::mirror::Mirror"),
m_local(new librados::Rados())
{
cct->lookup_or_create_singleton_object<Threads>(m_threads,
"rbd_mirror::threads");
}

void Mirror::handle_signal(int signum)
Expand Down Expand Up @@ -76,7 +79,7 @@ void Mirror::update_replayers(const map<peer_t, set<int64_t> > &peer_configs)
for (auto &kv : peer_configs) {
const peer_t &peer = kv.first;
if (m_replayers.find(peer) == m_replayers.end()) {
unique_ptr<Replayer> replayer(new Replayer(m_local, peer));
unique_ptr<Replayer> replayer(new Replayer(m_threads, m_local, peer));
// TODO: make async, and retry connecting within replayer
int r = replayer->init();
if (r < 0) {
Expand Down
3 changes: 3 additions & 0 deletions src/tools/rbd_mirror/Mirror.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
namespace rbd {
namespace mirror {

struct Threads;

/**
* Contains the main loop and overall state for rbd-mirror.
*
Expand All @@ -40,6 +42,7 @@ class Mirror {
void update_replayers(const map<peer_t, set<int64_t> > &peer_configs);

CephContext *m_cct;
Threads *m_threads = nullptr;
Mutex m_lock;
Cond m_cond;
RadosRef m_local;
Expand Down
7 changes: 5 additions & 2 deletions src/tools/rbd_mirror/Replayer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ using std::vector;
namespace rbd {
namespace mirror {

Replayer::Replayer(RadosRef local_cluster, const peer_t &peer) :
Replayer::Replayer(Threads *threads, RadosRef local_cluster,
const peer_t &peer) :
m_threads(threads),
m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
m_peer(peer),
m_local(local_cluster),
Expand Down Expand Up @@ -122,7 +124,8 @@ void Replayer::set_sources(const map<int64_t, set<string> > &images)
auto &pool_replayers = m_images[pool_id];
for (const auto &image_id : kv.second) {
if (pool_replayers.find(image_id) == pool_replayers.end()) {
unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_local,
unique_ptr<ImageReplayer> image_replayer(new ImageReplayer(m_threads,
m_local,
m_remote,
m_client_id,
pool_id,
Expand Down
5 changes: 4 additions & 1 deletion src/tools/rbd_mirror/Replayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
namespace rbd {
namespace mirror {

struct Threads;

/**
* Controls mirroring for a single remote cluster.
*/
class Replayer {
public:
Replayer(RadosRef local_cluster, const peer_t &peer);
Replayer(Threads *threads, RadosRef local_cluster, const peer_t &peer);
~Replayer();
Replayer(const Replayer&) = delete;
Replayer& operator=(const Replayer&) = delete;
Expand All @@ -40,6 +42,7 @@ class Replayer {
private:
void set_sources(const std::map<int64_t, std::set<std::string> > &images);

Threads *m_threads;
Mutex m_lock;
Cond m_cond;
atomic_t m_stopping;
Expand Down
38 changes: 38 additions & 0 deletions src/tools/rbd_mirror/Threads.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "tools/rbd_mirror/Threads.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"

namespace rbd {
namespace mirror {

Threads::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") {
thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal",
cct->_conf->rbd_op_threads, "rbd_op_threads");
thread_pool->start();

work_queue = new ContextWQ("Journaler::work_queue",
cct->_conf->rbd_op_thread_timeout, thread_pool);

timer = new SafeTimer(cct, timer_lock, true);
timer->init();
}

Threads::~Threads() {
{
Mutex::Locker timer_locker(timer_lock);
timer->shutdown();
}
delete timer;

work_queue->drain();
delete work_queue;

thread_pool->stop();
delete thread_pool;
}

} // namespace mirror
} // namespace rbd
34 changes: 34 additions & 0 deletions src/tools/rbd_mirror/Threads.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_RBD_MIRROR_THREADS_H
#define CEPH_RBD_MIRROR_THREADS_H

#include "common/Mutex.h"

class CephContext;
class ContextWQ;
class SafeTimer;
class ThreadPool;

namespace rbd {
namespace mirror {

struct Threads {
ThreadPool *thread_pool = nullptr;
ContextWQ *work_queue = nullptr;

SafeTimer *timer = nullptr;
Mutex timer_lock;

explicit Threads(CephContext *cct);
Threads(const Threads&) = delete;
Threads& operator=(const Threads&) = delete;

~Threads();
};

} // namespace mirror
} // namespace rbd

#endif // CEPH_RBD_MIRROR_THREADS_H

0 comments on commit 7fd230f

Please sign in to comment.