Skip to content

Commit

Permalink
Merge pull request #7827 from cbodley/wip-rgw-period-meta-logs
Browse files Browse the repository at this point in the history
rgw: fixes for per-period metadata logs

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
  • Loading branch information
yehudasa committed Mar 5, 2016
2 parents a9493f2 + 9a6771a commit 2286463
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 97 deletions.
46 changes: 21 additions & 25 deletions src/rgw/rgw_admin.cc
Expand Up @@ -1556,6 +1556,20 @@ static int do_period_pull(const string& remote, const string& url, const string&
return 0;
}

static int read_current_period_id(RGWRados* store, const std::string& realm_id,
const std::string& realm_name,
std::string* period_id)
{
RGWRealm realm(realm_id, realm_name);
int ret = realm.init(g_ceph_context, store);
if (ret < 0) {
std::cerr << "failed to read realm: " << cpp_strerror(-ret) << std::endl;
return ret;
}
*period_id = realm.get_current_period();
return 0;
}

int main(int argc, char **argv)
{
vector<const char*> args;
Expand Down Expand Up @@ -2137,19 +2151,12 @@ int main(int argc, char **argv)
break;
case OPT_PERIOD_GET_CURRENT:
{
RGWRealm realm(realm_id, realm_name);
int ret = realm.init(g_ceph_context, store);
if (ret < 0 ) {
cerr << "Error initializing realm " << cpp_strerror(-ret) << std::endl;
return ret;
}
string current_id = realm.get_current_period();
int ret = read_current_period_id(store, realm_id, realm_name, &period_id);
if (ret < 0) {
cerr << "Error reading current period:" << cpp_strerror(-ret) << std::endl;
return ret;
}
formatter->open_object_section("period_get_current");
encode_json("current_period", current_id, formatter);
encode_json("current_period", period_id, formatter);
formatter->close_section();
formatter->flush(cout);
}
Expand Down Expand Up @@ -2282,21 +2289,18 @@ int main(int argc, char **argv)
break;
case OPT_REALM_LIST_PERIODS:
{
RGWRealm realm(realm_id, realm_name);
int ret = realm.init(g_ceph_context, store);
int ret = read_current_period_id(store, realm_id, realm_name, &period_id);
if (ret < 0) {
cerr << "realm.init failed: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
string current_period = realm.get_current_period();
list<string> periods;
ret = store->list_periods(current_period, periods);
ret = store->list_periods(period_id, periods);
if (ret < 0) {
cerr << "list periods failed: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
formatter->open_object_section("realm_periods_list");
encode_json("current_period", current_period, formatter);
encode_json("current_period", period_id, formatter);
encode_json("periods", periods, formatter);
formatter->close_section();
formatter->flush(cout);
Expand Down Expand Up @@ -4383,14 +4387,10 @@ int main(int argc, char **argv)
int i = (specified_shard_id ? shard_id : 0);

if (period_id.empty()) {
// read current_period from the realm
RGWRealm realm(realm_id, realm_name);
ret = realm.init(g_ceph_context, store);
int ret = read_current_period_id(store, realm_id, realm_name, &period_id);
if (ret < 0) {
std::cerr << "failed to init realm: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
period_id = realm.get_current_period();
std::cerr << "No --period given, using current period="
<< period_id << std::endl;
}
Expand Down Expand Up @@ -4434,14 +4434,10 @@ int main(int argc, char **argv)
int i = (specified_shard_id ? shard_id : 0);

if (period_id.empty()) {
// read current_period from the realm
RGWRealm realm(realm_id, realm_name);
int ret = realm.init(g_ceph_context, store);
int ret = read_current_period_id(store, realm_id, realm_name, &period_id);
if (ret < 0) {
std::cerr << "failed to init realm: " << cpp_strerror(-ret) << std::endl;
return ret;
}
period_id = realm.get_current_period();
std::cerr << "No --period given, using current period="
<< period_id << std::endl;
}
Expand Down
5 changes: 4 additions & 1 deletion src/rgw/rgw_coroutine.cc
Expand Up @@ -439,7 +439,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
RGWCoroutinesStack *stack = *iter;
env.stack = stack;

int ret = stack->operate(&env);
ret = stack->operate(&env);
stack->set_is_scheduled(false);
if (ret < 0) {
ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl;
Expand Down Expand Up @@ -532,6 +532,9 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
iter = scheduled_stacks.begin();
}
if (ret == -ECANCELED) {
break;
}

if (iter == scheduled_stacks.end()) {
iter = scheduled_stacks.begin();
Expand Down
36 changes: 36 additions & 0 deletions src/rgw/rgw_cr_rados.cc
Expand Up @@ -611,3 +611,39 @@ int RGWRadosTimelogAddCR::request_complete()

return r;
}

int RGWAsyncStatObj::_send_request()
{
return store->raw_obj_stat(obj, psize, pmtime, pepoch,
nullptr, nullptr, objv_tracker);
}

RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
const rgw_obj& obj, uint64_t *psize,
time_t *pmtime, uint64_t *pepoch,
RGWObjVersionTracker *objv_tracker)
: RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados),
obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch),
objv_tracker(objv_tracker)
{
}

RGWStatObjCR::~RGWStatObjCR()
{
if (req) {
req->finish();
}
}

int RGWStatObjCR::send_request()
{
req = new RGWAsyncStatObj(stack->create_completion_notifier(),
store, obj, psize, pmtime, pepoch, objv_tracker);
async_rados->queue(req);
return 0;
}

int RGWStatObjCR::request_complete()
{
return req->get_ret_status();
}
37 changes: 37 additions & 0 deletions src/rgw/rgw_cr_rados.h
Expand Up @@ -887,5 +887,42 @@ class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
int request_complete();
};

class RGWAsyncStatObj : public RGWAsyncRadosRequest {
RGWRados *store;
rgw_obj obj;
uint64_t *psize;
time_t *pmtime;
uint64_t *pepoch;
RGWObjVersionTracker *objv_tracker;
protected:
int _send_request() override;
public:
RGWAsyncStatObj(RGWAioCompletionNotifier *cn, RGWRados *store,
const rgw_obj& obj, uint64_t *psize = nullptr,
time_t *pmtime = nullptr, uint64_t *pepoch = nullptr,
RGWObjVersionTracker *objv_tracker = nullptr)
: RGWAsyncRadosRequest(cn), store(store), obj(obj), psize(psize),
pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {}
};

class RGWStatObjCR : public RGWSimpleCoroutine {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
rgw_obj obj;
uint64_t *psize;
time_t *pmtime;
uint64_t *pepoch;
RGWObjVersionTracker *objv_tracker;
RGWAsyncStatObj *req = nullptr;
public:
RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
const rgw_obj& obj, uint64_t *psize = nullptr,
time_t *pmtime = nullptr, uint64_t *pepoch = nullptr,
RGWObjVersionTracker *objv_tracker = nullptr);
~RGWStatObjCR();

int send_request() override;
int request_complete() override;
};

#endif
125 changes: 118 additions & 7 deletions src/rgw/rgw_metadata.cc
@@ -1,6 +1,7 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/intrusive_ptr.hpp>
#include "common/ceph_json.h"
#include "rgw_metadata.h"
#include "rgw_coroutine.h"
Expand All @@ -9,6 +10,9 @@
#include "rgw_rados.h"
#include "rgw_tools.h"

#include "rgw_cr_rados.h"
#include "rgw_boost_asio_yield.h"

#define dout_subsys ceph_subsys_rgw

void LogStatusDump::dump(Formatter *f) const {
Expand Down Expand Up @@ -357,19 +361,126 @@ RGWMetadataManager::~RGWMetadataManager()
handlers.clear();
}

static RGWPeriodHistory::Cursor find_oldest_log_period(RGWRados* store)
namespace {

class FindAnyShardCR : public RGWCoroutine {
RGWRados *const store;
const RGWMetadataLog& mdlog;
const int num_shards;
int ret = 0;
public:
FindAnyShardCR(RGWRados *store, const RGWMetadataLog& mdlog, int num_shards)
: RGWCoroutine(store->ctx()), store(store), mdlog(mdlog),
num_shards(num_shards) {}

int operate() {
reenter(this) {
// send stat requests for each shard in parallel
yield {
auto async_rados = store->get_async_rados();
auto& pool = store->get_zone_params().log_pool;
auto oid = std::string{};

for (int i = 0; i < num_shards; i++) {
mdlog.get_shard_oid(i, oid);
auto obj = rgw_obj{pool, oid};
spawn(new RGWStatObjCR(async_rados, store, obj), true);
}
}
drain_all();
// if any shards were found, return success
while (collect_next(&ret)) {
if (ret == 0) {
// TODO: cancel instead of waiting for the rest
return set_cr_done();
}
ret = 0; // collect_next() won't modify &ret unless it's a failure
}
// no shards found
set_retcode(-ENOENT);
return set_cr_error(-ENOENT);
}
return 0;
}
};

// return true if any log shards exist for the given period
int find_shards_for_period(RGWRados *store, const std::string& period_id)
{
// TODO: search backwards through the period history for the first period with
// no log shard objects, and return its successor (some shards may be missing
auto cct = store->ctx();
RGWMetadataLog mdlog(cct, store, period_id);
auto num_shards = cct->_conf->rgw_md_log_max_shards;

using FindAnyShardCRRef = boost::intrusive_ptr<FindAnyShardCR>;
auto cr = FindAnyShardCRRef{new FindAnyShardCR(store, mdlog, num_shards)};

RGWCoroutinesManager mgr(cct, nullptr);
int r = mgr.run(cr.get());
if (r < 0) {
return r;
}
return cr->get_ret_status();
}

RGWPeriodHistory::Cursor find_oldest_log_period(RGWRados *store)
{
// search backwards through the period history for the first period with no
// log shard objects, and return its successor (some shards may be missing
// if they contain no metadata yet, so we need to check all shards)
return store->period_history->get_current();
auto cursor = store->period_history->get_current();
auto oldest_log = cursor;

while (cursor) {
// search for an existing log shard object for this period
int r = find_shards_for_period(store, cursor.get_period().get_id());
if (r == -ENOENT) {
ldout(store->ctx(), 10) << "find_oldest_log_period found no log shards "
"for period " << cursor.get_period().get_id() << "; returning "
"period " << oldest_log.get_period().get_id() << dendl;
return oldest_log;
}
if (r < 0) {
return RGWPeriodHistory::Cursor{r};
}
oldest_log = cursor;

// advance to the period's predecessor
if (!cursor.has_prev()) {
auto& predecessor = cursor.get_period().get_predecessor();
if (predecessor.empty()) {
// this is the first period, so our logs must start here
ldout(store->ctx(), 10) << "find_oldest_log_period returning first "
"period " << cursor.get_period().get_id() << dendl;
return cursor;
}
// pull the predecessor and add it to our history
RGWPeriod period;
int r = store->period_puller->pull(predecessor, period);
if (r < 0) {
return RGWPeriodHistory::Cursor{r};
}
auto prev = store->period_history->insert(std::move(period));
if (!prev) {
return prev;
}
ldout(store->ctx(), 10) << "find_oldest_log_period advancing to "
"predecessor period " << predecessor << dendl;
assert(cursor.has_prev());
}
cursor.prev();
}
ldout(store->ctx(), 10) << "find_oldest_log_period returning empty cursor" << dendl;
return cursor;
}

} // anonymous namespace

int RGWMetadataManager::init(const std::string& current_period)
{
// find our oldest log so we can tell other zones where to start their sync
oldest_log_period = find_oldest_log_period(store);

if (store->is_meta_master()) {
// find our oldest log so we can tell other zones where to start their sync
oldest_log_period = find_oldest_log_period(store);
}
// open a log for the current period
current_log = get_log(current_period);
return 0;
Expand Down
12 changes: 6 additions & 6 deletions src/rgw/rgw_metadata.h
Expand Up @@ -150,12 +150,6 @@ class RGWMetadataLog {
return META_LOG_OBJ_PREFIX + period + ".";
}

void get_shard_oid(int id, string& oid) {
char buf[16];
snprintf(buf, sizeof(buf), "%d", id);
oid = prefix + buf;
}

RWLock lock;
set<int> modified_shards;

Expand All @@ -166,6 +160,12 @@ class RGWMetadataLog {
prefix(make_prefix(period)),
lock("RGWMetaLog::lock") {}

void get_shard_oid(int id, string& oid) const {
char buf[16];
snprintf(buf, sizeof(buf), "%d", id);
oid = prefix + buf;
}

int add_entry(RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl);
int store_entries_in_shard(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion);

Expand Down

0 comments on commit 2286463

Please sign in to comment.