Skip to content

Commit

Permalink
Merge pull request #35629 from dillaman/wip-librbd-asio-2
Browse files Browse the repository at this point in the history
librbd: switch IO path to use new librados asio API

Reviewed-by: Mykola Golub <mgolub@suse.com>
  • Loading branch information
trociny committed Jul 18, 2020
2 parents 4fef026 + 5b2f820 commit c20868a
Show file tree
Hide file tree
Showing 159 changed files with 2,011 additions and 951 deletions.
59 changes: 46 additions & 13 deletions src/cls/rbd/cls_rbd_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "include/encoding.h"
#include "include/rbd_types.h"
#include "include/rados/librados.hpp"
#include "include/neorados/RADOS.hpp"
#include "common/bit_vector.hpp"

#include <errno.h>
Expand Down Expand Up @@ -841,10 +842,19 @@ int get_all_features(librados::IoCtx *ioctx, const std::string &oid,
return get_all_features_finish(&it, all_features);
}

void copyup(librados::ObjectWriteOperation *op, bufferlist data) {
template <typename O>
void copyup(O* op, ceph::buffer::list data) {
op->exec("rbd", "copyup", data);
}

void copyup(neorados::WriteOp* op, ceph::buffer::list data) {
copyup<neorados::WriteOp>(op, data);
}

void copyup(librados::ObjectWriteOperation *op, bufferlist data) {
copyup<librados::ObjectWriteOperation>(op, data);
}

int copyup(librados::IoCtx *ioctx, const std::string &oid,
bufferlist data) {
librados::ObjectWriteOperation op;
Expand All @@ -853,15 +863,26 @@ int copyup(librados::IoCtx *ioctx, const std::string &oid,
return ioctx->operate(oid, &op);
}

void sparse_copyup(librados::ObjectWriteOperation *op,
const std::map<uint64_t, uint64_t> &extent_map,
bufferlist data) {
template <typename O, typename E>
void sparse_copyup(O* op, const E& extent_map, ceph::buffer::list data) {
bufferlist bl;
encode(extent_map, bl);
encode(data, bl);
op->exec("rbd", "sparse_copyup", bl);
}

void sparse_copyup(neorados::WriteOp* op,
const std::map<uint64_t, uint64_t> &extent_map,
ceph::buffer::list data) {
sparse_copyup<neorados::WriteOp>(op, extent_map, data);
}

void sparse_copyup(librados::ObjectWriteOperation *op,
const std::map<uint64_t, uint64_t> &extent_map,
bufferlist data) {
sparse_copyup<librados::ObjectWriteOperation>(op, extent_map, data);
}

int sparse_copyup(librados::IoCtx *ioctx, const std::string &oid,
const std::map<uint64_t, uint64_t> &extent_map,
bufferlist data) {
Expand Down Expand Up @@ -1732,6 +1753,27 @@ void migration_remove(librados::ObjectWriteOperation *op) {
op->exec("rbd", "migration_remove", bl);
}

template <typename O>
void assert_snapc_seq(O* op, uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state) {
bufferlist bl;
encode(snapc_seq, bl);
encode(state, bl);
op->exec("rbd", "assert_snapc_seq", bl);
}

void assert_snapc_seq(neorados::WriteOp* op,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state) {
assert_snapc_seq<neorados::WriteOp>(op, snapc_seq, state);
}

void assert_snapc_seq(librados::ObjectWriteOperation *op,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state) {
assert_snapc_seq<librados::ObjectWriteOperation>(op, snapc_seq, state);
}

int assert_snapc_seq(librados::IoCtx *ioctx, const std::string &oid,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state) {
Expand All @@ -1740,15 +1782,6 @@ int assert_snapc_seq(librados::IoCtx *ioctx, const std::string &oid,
return ioctx->operate(oid, &op);
}

void assert_snapc_seq(librados::ObjectWriteOperation *op,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state) {
bufferlist bl;
encode(snapc_seq, bl);
encode(state, bl);
op->exec("rbd", "assert_snapc_seq", bl);
}

void mirror_uuid_get_start(librados::ObjectReadOperation *op) {
bufferlist bl;
op->exec("rbd", "mirror_uuid_get", bl);
Expand Down
14 changes: 11 additions & 3 deletions src/cls/rbd/cls_rbd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

class Context;
namespace ceph { template <uint8_t> class BitVector; }
namespace neorados { struct WriteOp; }

namespace librbd {
namespace cls_client {
Expand Down Expand Up @@ -626,17 +627,24 @@ int namespace_list(librados::IoCtx *ioctx,
std::list<std::string> *entries);

// operations on data objects
int assert_snapc_seq(librados::IoCtx *ioctx, const std::string &oid,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state);
void assert_snapc_seq(neorados::WriteOp* op,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state);
void assert_snapc_seq(librados::ObjectWriteOperation *op,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state);
int assert_snapc_seq(librados::IoCtx *ioctx, const std::string &oid,
uint64_t snapc_seq,
cls::rbd::AssertSnapcSeqState state);

void copyup(neorados::WriteOp* op, ceph::buffer::list data);
void copyup(librados::ObjectWriteOperation *op, ceph::buffer::list data);
int copyup(librados::IoCtx *ioctx, const std::string &oid,
ceph::buffer::list data);

void sparse_copyup(neorados::WriteOp* op,
const std::map<uint64_t, uint64_t> &extent_map,
ceph::buffer::list data);
void sparse_copyup(librados::ObjectWriteOperation *op,
const std::map<uint64_t, uint64_t> &extent_map,
ceph::buffer::list data);
Expand Down
3 changes: 2 additions & 1 deletion src/common/Timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ void SafeTimer::timer_thread()
if (schedule.empty()) {
cond.wait(l);
} else {
cond.wait_until(l, schedule.begin()->first);
auto when = schedule.begin()->first;
cond.wait_until(l, when);
}
ldout(cct,20) << "timer_thread awake" << dendl;
}
Expand Down
25 changes: 19 additions & 6 deletions src/include/neorados/RADOS.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,23 +551,25 @@ class RADOS final
template<typename CompletionToken>
auto execute(const Object& o, const IOContext& ioc, ReadOp&& op,
ceph::buffer::list* bl,
CompletionToken&& token, uint64_t* objver = nullptr) {
CompletionToken&& token, uint64_t* objver = nullptr,
const blkin_trace_info* trace_info = nullptr) {
boost::asio::async_completion<CompletionToken, Op::Signature> init(token);
execute(o, ioc, std::move(op), bl,
ReadOp::Completion::create(get_executor(),
std::move(init.completion_handler)),
objver);
objver, trace_info);
return init.result.get();
}

template<typename CompletionToken>
auto execute(const Object& o, const IOContext& ioc, WriteOp&& op,
CompletionToken&& token, uint64_t* objver = nullptr) {
CompletionToken&& token, uint64_t* objver = nullptr,
const blkin_trace_info* trace_info = nullptr) {
boost::asio::async_completion<CompletionToken, Op::Signature> init(token);
execute(o, ioc, std::move(op),
Op::Completion::create(get_executor(),
std::move(init.completion_handler)),
objver);
objver, trace_info);
return init.result.get();
}

Expand Down Expand Up @@ -939,6 +941,15 @@ class RADOS final
std::move(init.completion_handler)));
return init.result.get();
}

template<typename CompletionToken>
auto wait_for_latest_osd_map(CompletionToken&& token) {
boost::asio::async_completion<CompletionToken, SimpleOpSig> init(token);
wait_for_latest_osd_map(
SimpleOpComp::create(get_executor(), std::move(init.completion_handler)));
return init.result.get();
}

uint64_t instance_id() const;

private:
Expand All @@ -954,10 +965,11 @@ class RADOS final

void execute(const Object& o, const IOContext& ioc, ReadOp&& op,
ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
uint64_t* objver);
uint64_t* objver, const blkin_trace_info* trace_info);

void execute(const Object& o, const IOContext& ioc, WriteOp&& op,
std::unique_ptr<Op::Completion> c, uint64_t* objver);
std::unique_ptr<Op::Completion> c, uint64_t* objver,
const blkin_trace_info* trace_info);

void execute(const Object& o, std::int64_t pool, ReadOp&& op,
ceph::buffer::list* bl, std::unique_ptr<Op::Completion> c,
Expand Down Expand Up @@ -1070,6 +1082,7 @@ class RADOS final
void enable_application(std::string_view pool, std::string_view app_name,
bool force, std::unique_ptr<SimpleOpComp> c);

void wait_for_latest_osd_map(std::unique_ptr<SimpleOpComp> c);

// Proxy object to provide access to low-level RADOS messaging clients
std::unique_ptr<detail::Client> impl;
Expand Down
2 changes: 2 additions & 0 deletions src/include/rados/librados_fwd.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef __LIBRADOS_FWD_HPP
#define __LIBRADOS_FWD_HPP

struct blkin_trace_info;

namespace libradosstriper {

class RadosStriper;
Expand Down
62 changes: 31 additions & 31 deletions src/librbd/AsioEngine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
// vim: ts=8 sw=2 smarttab

#include "librbd/AsioEngine.h"
#include "include/Context.h"
#include "include/stringify.h"
#include "include/neorados/RADOS.hpp"
#include "include/rados/librados.hpp"
#include "common/dout.h"
#include "librbd/asio/ContextWQ.h"
#include <boost/system/error_code.hpp>

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
Expand All @@ -13,43 +16,40 @@

namespace librbd {

AsioEngine::AsioEngine(CephContext* cct)
: m_cct(cct) {
init();
AsioEngine::AsioEngine(std::shared_ptr<librados::Rados> rados)
: m_rados_api(std::make_shared<neorados::RADOS>(
neorados::RADOS::make_with_librados(*rados))),
m_cct(m_rados_api->cct()),
m_io_context(m_rados_api->get_io_context()),
m_api_strand(std::make_unique<boost::asio::io_context::strand>(
m_io_context)),
m_context_wq(std::make_unique<asio::ContextWQ>(m_cct, m_io_context)) {
ldout(m_cct, 20) << dendl;

auto rados_threads = m_cct->_conf.get_val<uint64_t>("librados_thread_count");
auto rbd_threads = m_cct->_conf.get_val<uint64_t>("rbd_op_threads");
if (rbd_threads > rados_threads) {
// inherit the librados thread count -- but increase it if librbd wants to
// utilize more threads
m_cct->_conf.set_val("librados_thread_count", stringify(rbd_threads));
}
}

AsioEngine::~AsioEngine() {
shut_down();
AsioEngine::AsioEngine(librados::IoCtx& io_ctx)
: AsioEngine(std::make_shared<librados::Rados>(io_ctx)) {
}

void AsioEngine::init() {
auto thread_count = m_cct->_conf.get_val<uint64_t>("rbd_op_threads");
m_threads.reserve(thread_count);

// prevent IO context from exiting if no work is currently scheduled
m_work_guard.emplace(boost::asio::make_work_guard(m_io_context));

ldout(m_cct, 5) << "spawning " << thread_count << " threads" << dendl;
for (auto i = 0U; i < thread_count; i++) {
m_threads.emplace_back([=] {
boost::system::error_code ec;
m_io_context.run(ec);
});
}

m_work_queue = std::make_unique<asio::ContextWQ>(m_io_context);
AsioEngine::~AsioEngine() {
ldout(m_cct, 20) << dendl;
m_api_strand.reset();
}

void AsioEngine::shut_down() {
ldout(m_cct, 5) << "joining threads" << dendl;

m_work_guard.reset();
for (auto& thread : m_threads) {
thread.join();
}
m_threads.clear();
void AsioEngine::dispatch(Context* ctx, int r) {
dispatch([ctx, r]() { ctx->complete(r); });
}

ldout(m_cct, 5) << "done" << dendl;
void AsioEngine::post(Context* ctx, int r) {
post([ctx, r]() { ctx->complete(r); });
}

} // namespace librbd
Loading

0 comments on commit c20868a

Please sign in to comment.