Skip to content

Commit

Permalink
Merge pull request #12605 from cbodley/wip-18300
Browse files Browse the repository at this point in the history
rgw: RGWMetaSyncShardCR drops stack refs on destruction

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
  • Loading branch information
yehudasa committed Jan 16, 2017
2 parents 7c55be9 + 34a2edb commit afa6cbf
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 69 deletions.
2 changes: 0 additions & 2 deletions src/rgw/rgw_coroutine.cc
Expand Up @@ -373,8 +373,6 @@ bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* re
return collect(NULL, ret, skip_stack);
}

static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);

static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
{
((RGWAioCompletionNotifier *)arg)->cb();
Expand Down
75 changes: 19 additions & 56 deletions src/rgw/rgw_metadata.cc
Expand Up @@ -179,72 +179,35 @@ int RGWMetadataLog::get_info(int shard_id, RGWMetadataLogInfo *info)
return 0;
}

static void _mdlog_info_completion(librados::completion_t cb, void *arg);

class RGWMetadataLogInfoCompletion : public RefCountedObject {
RGWMetadataLogInfo *pinfo;
RGWCompletionManager *completion_manager;
void *user_info;
int *pret;
cls_log_header header;
librados::IoCtx io_ctx;
librados::AioCompletion *completion;

public:
RGWMetadataLogInfoCompletion(RGWMetadataLogInfo *_pinfo, RGWCompletionManager *_cm, void *_uinfo, int *_pret) :
pinfo(_pinfo), completion_manager(_cm), user_info(_uinfo), pret(_pret) {
completion = librados::Rados::aio_create_completion((void *)this, NULL,
_mdlog_info_completion);
}

~RGWMetadataLogInfoCompletion() {
completion->release();
}

void finish(librados::completion_t cb) {
*pret = completion->get_return_value();
if (*pret >= 0) {
pinfo->marker = header.max_marker;
pinfo->last_update = header.max_time.to_real_time();
}
completion_manager->complete(NULL, user_info);
put();
}

librados::IoCtx& get_io_ctx() { return io_ctx; }

cls_log_header *get_header() {
return &header;
}

librados::AioCompletion *get_completion() {
return completion;
}
};

static void _mdlog_info_completion(librados::completion_t cb, void *arg)
{
RGWMetadataLogInfoCompletion *infoc = (RGWMetadataLogInfoCompletion *)arg;
auto infoc = static_cast<RGWMetadataLogInfoCompletion *>(arg);
infoc->finish(cb);
infoc->put(); // drop the ref from get_info_async()
}

int RGWMetadataLog::get_info_async(int shard_id, RGWMetadataLogInfo *info, RGWCompletionManager *completion_manager, void *user_info, int *pret)
RGWMetadataLogInfoCompletion::RGWMetadataLogInfoCompletion(info_callback_t cb)
: completion(librados::Rados::aio_create_completion((void *)this, nullptr,
_mdlog_info_completion)),
callback(cb)
{
string oid;
get_shard_oid(shard_id, oid);

RGWMetadataLogInfoCompletion *req_completion = new RGWMetadataLogInfoCompletion(info, completion_manager, user_info, pret);
}

req_completion->get();
RGWMetadataLogInfoCompletion::~RGWMetadataLogInfoCompletion()
{
completion->release();
}

int ret = store->time_log_info_async(req_completion->get_io_ctx(), oid, req_completion->get_header(), req_completion->get_completion());
if (ret < 0) {
return ret;
}
int RGWMetadataLog::get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion)
{
string oid;
get_shard_oid(shard_id, oid);

req_completion->put();
completion->get(); // hold a ref until the completion fires

return 0;
return store->time_log_info_async(completion->get_io_ctx(), oid,
&completion->get_header(),
completion->get_completion());
}

int RGWMetadataLog::trim(int shard_id, const real_time& from_time, const real_time& end_time,
Expand Down
33 changes: 32 additions & 1 deletion src/rgw/rgw_metadata.h
Expand Up @@ -5,13 +5,15 @@
#define CEPH_RGW_METADATA_H

#include <string>
#include <boost/optional.hpp>

#include "include/types.h"
#include "rgw_common.h"
#include "rgw_period_history.h"
#include "cls/version/cls_version_types.h"
#include "cls/log/cls_log_types.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/ceph_time.h"


Expand Down Expand Up @@ -140,6 +142,35 @@ struct RGWMetadataLogInfo {

class RGWCompletionManager;

class RGWMetadataLogInfoCompletion : public RefCountedObject {
public:
using info_callback_t = std::function<void(int, const cls_log_header&)>;
private:
cls_log_header header;
librados::IoCtx io_ctx;
librados::AioCompletion *completion;
std::mutex mutex; //< protects callback between cancel/complete
boost::optional<info_callback_t> callback; //< cleared on cancel
public:
RGWMetadataLogInfoCompletion(info_callback_t callback);
virtual ~RGWMetadataLogInfoCompletion();

librados::IoCtx& get_io_ctx() { return io_ctx; }
cls_log_header& get_header() { return header; }
librados::AioCompletion* get_completion() { return completion; }

void finish(librados::completion_t cb) {
std::lock_guard<std::mutex> lock(mutex);
if (callback) {
(*callback)(completion->get_return_value(), header);
}
}
void cancel() {
std::lock_guard<std::mutex> lock(mutex);
callback = boost::none;
}
};

class RGWMetadataLog {
CephContext *cct;
RGWRados *store;
Expand Down Expand Up @@ -193,7 +224,7 @@ class RGWMetadataLog {

int trim(int shard_id, const real_time& from_time, const real_time& end_time, const string& start_marker, const string& end_marker);
int get_info(int shard_id, RGWMetadataLogInfo *info);
int get_info_async(int shard_id, RGWMetadataLogInfo *info, RGWCompletionManager *completion_manager, void *user_info, int *pret);
int get_info_async(int shard_id, RGWMetadataLogInfoCompletion *completion);
int lock_exclusive(int shard_id, timespan duration, string&zone_id, string& owner_id);
int unlock(int shard_id, string& zone_id, string& owner_id);

Expand Down
38 changes: 28 additions & 10 deletions src/rgw/rgw_sync.cc
Expand Up @@ -1198,8 +1198,8 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine {
int max_entries = CLONE_MAX_ENTRIES;

RGWRESTReadResource *http_op = nullptr;
boost::intrusive_ptr<RGWMetadataLogInfoCompletion> completion;

int req_ret = 0;
RGWMetadataLogInfo shard_info;
rgw_mdlog_shard_data data;

Expand All @@ -1217,6 +1217,9 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine {
if (http_op) {
http_op->put();
}
if (completion) {
completion->cancel();
}
}

int operate();
Expand Down Expand Up @@ -1269,7 +1272,9 @@ class RGWMetaSyncShardCR : public RGWCoroutine {

bool *reset_backoff;

map<RGWCoroutinesStack *, string> stack_to_pos;
// hold a reference to the cr stack while it's in the map
using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
map<StackRef, string> stack_to_pos;
map<string, string> pos_to_prev;

bool can_adjust_marker = false;
Expand Down Expand Up @@ -1331,7 +1336,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
int child_ret;
RGWCoroutinesStack *child;
while (collect_next(&child_ret, &child)) {
map<RGWCoroutinesStack *, string>::iterator iter = stack_to_pos.find(child);
auto iter = stack_to_pos.find(child);
if (iter == stack_to_pos.end()) {
/* some other stack that we don't care about */
continue;
Expand Down Expand Up @@ -1371,8 +1376,6 @@ class RGWMetaSyncShardCR : public RGWCoroutine {

ldout(sync_env->cct, 0) << *this << ": adjusting marker pos=" << sync_marker.marker << dendl;
stack_to_pos.erase(iter);

child->put();
}
}

Expand Down Expand Up @@ -1443,8 +1446,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
// fetch remote and write locally
yield {
RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, MDLOG_STATUS_COMPLETE, marker_tracker), false);
stack->get();

// stack_to_pos holds a reference to the stack
stack_to_pos[stack] = iter->first;
pos_to_prev[iter->first] = marker;
}
Expand Down Expand Up @@ -1592,8 +1594,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
yield {
RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, mdlog_entry.log_data.status, marker_tracker), false);
assert(stack);
stack->get();

// stack_to_pos holds a reference to the stack
stack_to_pos[stack] = log_iter->id;
pos_to_prev[log_iter->id] = marker;
}
Expand Down Expand Up @@ -2064,7 +2065,22 @@ int RGWCloneMetaLogCoroutine::state_init()

int RGWCloneMetaLogCoroutine::state_read_shard_status()
{
int ret = mdlog->get_info_async(shard_id, &shard_info, stack->get_completion_mgr(), (void *)stack, &req_ret);
const bool add_ref = false; // default constructs with refs=1

completion.reset(new RGWMetadataLogInfoCompletion(
[this](int ret, const cls_log_header& header) {
if (ret < 0) {
ldout(cct, 1) << "ERROR: failed to read mdlog info with "
<< cpp_strerror(ret) << dendl;
} else {
shard_info.marker = header.max_marker;
shard_info.last_update = header.max_time.to_real_time();
}
// wake up parent stack
stack->get_completion_mgr()->complete(nullptr, stack);
}), add_ref);

int ret = mdlog->get_info_async(shard_id, completion.get());
if (ret < 0) {
ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
return set_cr_error(ret);
Expand All @@ -2075,6 +2091,8 @@ int RGWCloneMetaLogCoroutine::state_read_shard_status()

int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
{
completion.reset();

ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;

marker = shard_info.marker;
Expand Down

0 comments on commit afa6cbf

Please sign in to comment.