Skip to content

Commit

Permalink
rgw: collect skips a specific coroutine stack
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/16665

Instead of drain_all_but() that specifies number of stacks to leave behind,
added drain_all_but_stack() that has a specific stack specified. This is needed
so that we don't call wakeup() through lease_cr->go_down() on a cr stack that
was already collected.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
  • Loading branch information
yehudasa committed Jul 13, 2016
1 parent 5f313f0 commit 88dccf0
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 40 deletions.
20 changes: 12 additions & 8 deletions src/rgw/rgw_coroutine.cc
Expand Up @@ -292,15 +292,15 @@ int RGWCoroutinesStack::unwind(int retcode)
}


bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if needs to be called again */
bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
*ret = 0;
vector<RGWCoroutinesStack *> new_list;

for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
RGWCoroutinesStack *stack = *iter;
if (!stack->is_done()) {
if (stack == skip_stack || !stack->is_done()) {
new_list.push_back(stack);
ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
continue;
Expand Down Expand Up @@ -349,9 +349,9 @@ bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesS
return false;
}

bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */
bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
return collect(NULL, ret);
return collect(NULL, ret, skip_stack);
}

static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
Expand Down Expand Up @@ -712,9 +712,9 @@ RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
return stack->spawn(this, op, wait);
}

bool RGWCoroutine::collect(int *ret) /* returns true if needs to be called again */
bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
return stack->collect(this, ret);
return stack->collect(this, ret, skip_stack);
}

bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
Expand Down Expand Up @@ -752,14 +752,18 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr)
return out;
}

bool RGWCoroutine::drain_children(int num_cr_left)
bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
{
bool done = false;
assert(num_cr_left >= 0);
if (num_cr_left == 0 && skip_stack) {
num_cr_left = 1;
}
reenter(&drain_cr) {
while (num_spawned() > (size_t)num_cr_left) {
yield wait_for_child();
int ret;
while (collect(&ret)) {
while (collect(&ret, skip_stack)) {
if (ret < 0) {
ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
/* we should have reported this error */
Expand Down
12 changes: 8 additions & 4 deletions src/rgw/rgw_coroutine.h
Expand Up @@ -265,11 +265,11 @@ class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {

void call(RGWCoroutine *op); /* call at the same stack we're in */
RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
bool collect(int *ret); /* returns true if needs to be called again */
bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */

int wait(const utime_t& interval);
bool drain_children(int num_cr_left); /* returns true if needed to be called again */
bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
void wakeup();
void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */

Expand Down Expand Up @@ -306,6 +306,10 @@ do { \
drain_cr = boost::asio::coroutine(); \
yield_until_true(drain_children(n))

#define drain_all_but_stack(stack) \
drain_cr = boost::asio::coroutine(); \
yield_until_true(drain_children(1, stack))

template <class T>
class RGWConsumerCR : public RGWCoroutine {
list<T> product;
Expand Down Expand Up @@ -371,7 +375,7 @@ class RGWCoroutinesStack : public RefCountedObject {
RGWCoroutinesStack *parent;

RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
bool collect(RGWCoroutine *op, int *ret); /* returns true if needs to be called again */
bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
public:
RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
Expand Down Expand Up @@ -442,7 +446,7 @@ class RGWCoroutinesStack : public RefCountedObject {
int wait(const utime_t& interval);
void wakeup();

bool collect(int *ret); /* returns true if needs to be called again */
bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */

RGWAioCompletionNotifier *create_completion_notifier();
RGWCompletionManager *get_completion_mgr();
Expand Down
37 changes: 20 additions & 17 deletions src/rgw/rgw_data_sync.cc
Expand Up @@ -470,7 +470,7 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
}
}
while (collect(&ret)) {
while (collect(&ret, NULL)) {
if (ret < 0) {
return set_state(RGWCoroutine_Error);
}
Expand All @@ -495,7 +495,7 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie));
}
while (collect(&ret)) {
while (collect(&ret, NULL)) {
if (ret < 0) {
return set_state(RGWCoroutine_Error);
}
Expand Down Expand Up @@ -722,7 +722,7 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
EIO, string("failed to build bucket instances map")));
}
while (collect(&ret)) {
while (collect(&ret, NULL)) {
if (ret < 0) {
yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
-ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
Expand Down Expand Up @@ -980,6 +980,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
set<string> spawned_keys;

RGWContinuousLeaseCR *lease_cr;
RGWCoroutinesStack *lease_stack;
string status_oid;


Expand Down Expand Up @@ -1007,7 +1008,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
Expand Down Expand Up @@ -1074,7 +1075,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
lease_stack = spawn(lease_cr, false);
}

int full_sync() {
Expand Down Expand Up @@ -1246,7 +1247,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
int ret;
while (collect(&ret)) {
while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we have reported this error */
Expand Down Expand Up @@ -2230,6 +2231,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
int total_entries;

RGWContinuousLeaseCR *lease_cr;
RGWCoroutinesStack *lease_stack;

string status_oid;

Expand All @@ -2245,7 +2247,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
full_marker(_full_marker), marker_tracker(NULL),
spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL),
op(CLS_RGW_OP_ADD),
total_entries(0), lease_cr(NULL) {
total_entries(0), lease_cr(nullptr), lease_stack(nullptr) {
status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
logger.init(sync_env, "BucketFull", bucket_shard_str(bucket_name, bucket_id, shard_id));
}
Expand All @@ -2272,7 +2274,7 @@ int RGWBucketShardFullSyncCR::operate()
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
Expand Down Expand Up @@ -2320,7 +2322,7 @@ int RGWBucketShardFullSyncCR::operate()
}
while ((int)num_spawned() > spawn_window) {
yield wait_for_child();
while (collect(&ret)) {
while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we have reported this error */
Expand All @@ -2331,7 +2333,7 @@ int RGWBucketShardFullSyncCR::operate()
} while (list_result.is_truncated);
set_status("done iterating over all objects");
/* wait for all operations to complete */
drain_all_but(1); /* still need to hold lease cr */
drain_all_but_stack(lease_stack); /* still need to hold lease cr */
/* update sync state to incremental */
yield {
rgw_bucket_shard_sync_info sync_status;
Expand Down Expand Up @@ -2371,6 +2373,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
int spawn_window;
bool updated_status;
RGWContinuousLeaseCR *lease_cr;
RGWCoroutinesStack *lease_stack;
string status_oid;

string name;
Expand All @@ -2391,7 +2394,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
bucket_info(_bucket_info),
inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL),
spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false),
lease_cr(NULL) {
lease_cr(nullptr), lease_stack(nullptr) {
status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
set_status("init");
Expand Down Expand Up @@ -2420,7 +2423,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
Expand All @@ -2441,7 +2444,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
inc_marker.position, &list_result));
if (retcode < 0 && retcode != -ENOENT) {
/* wait for all operations to complete */
drain_all_but(1);
drain_all_but_stack(lease_stack);
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
Expand Down Expand Up @@ -2517,7 +2520,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
}
ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
yield wait_for_child();
while (collect(&ret)) {
while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
/* we have reported this error */
Expand Down Expand Up @@ -2549,7 +2552,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
while (collect(&ret)) {
while (collect(&ret, lease_stack)) {
if (ret < 0) {
ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we have reported this error */
Expand All @@ -2565,13 +2568,13 @@ int RGWBucketShardIncrementalSyncCR::operate()
}
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl;
drain_all_but(1);
drain_all_but_stack(lease_stack);
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}

drain_all_but(1);
drain_all_but_stack(lease_stack);
lease_cr->go_down();
/* wait for all operations to complete */
drain_all();
Expand Down
26 changes: 15 additions & 11 deletions src/rgw/rgw_sync.cc
Expand Up @@ -593,12 +593,13 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
rgw_meta_sync_info status;
vector<RGWMetadataLogInfo> shards_info;
RGWContinuousLeaseCR *lease_cr;
RGWCoroutinesStack *lease_stack;
public:
RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
const rgw_meta_sync_info &status)
: RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
status(status), shards_info(status.num_shards),
lease_cr(NULL) {}
lease_cr(nullptr), lease_stack(nullptr) {}

~RGWInitSyncStatusCoroutine() {
if (lease_cr) {
Expand All @@ -618,7 +619,7 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_env->status_oid(),
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
Expand Down Expand Up @@ -651,7 +652,7 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
}
}

drain_all_but(1); /* the lease cr still needs to run */
drain_all_but_stack(lease_stack); /* the lease cr still needs to run */

yield {
set_status("updating sync status");
Expand All @@ -674,7 +675,7 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
}
set_status("drop lock lease");
yield lease_cr->go_down();
while (collect(&ret)) {
while (collect(&ret, NULL)) {
if (ret < 0) {
return set_cr_error(ret);
}
Expand Down Expand Up @@ -737,6 +738,7 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
RGWShardedOmapCRManager *entries_index;

RGWContinuousLeaseCR *lease_cr;
RGWCoroutinesStack *lease_stack;
bool lost_lock;
bool failed;

Expand All @@ -746,7 +748,8 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
num_shards(_num_shards),
ret_status(0), entries_index(NULL), lease_cr(NULL), lost_lock(false), failed(false), markers(_markers) {
ret_status(0), entries_index(NULL), lease_cr(nullptr), lease_stack(nullptr),
lost_lock(false), failed(false), markers(_markers) {
}

~RGWFetchAllMetaCR() {
Expand Down Expand Up @@ -791,7 +794,7 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, sync_env->store->get_zone_params().log_pool, sync_env->status_oid(),
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
lease_stack = spawn(lease_cr, false);
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
Expand Down Expand Up @@ -870,12 +873,12 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
}
}

drain_all_but(1); /* the lease cr still needs to run */
drain_all_but_stack(lease_stack); /* the lease cr still needs to run */

yield lease_cr->go_down();

int ret;
while (collect(&ret)) {
while (collect(&ret, NULL)) {
if (ret < 0) {
return set_cr_error(ret);
}
Expand Down Expand Up @@ -1253,6 +1256,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
boost::asio::coroutine full_cr;

RGWContinuousLeaseCR *lease_cr = nullptr;
RGWCoroutinesStack *lease_stack = nullptr;
bool lost_lock = false;

bool *reset_backoff;
Expand Down Expand Up @@ -1383,7 +1387,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
sync_env->shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
lease_stack = spawn(lease_cr, false);
lost_lock = false;
}
while (!lease_cr->is_locked()) {
Expand Down Expand Up @@ -1505,7 +1509,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
sync_env->shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
lease_stack = spawn(lease_cr, false);
lost_lock = false;
}
while (!lease_cr->is_locked()) {
Expand Down Expand Up @@ -1728,7 +1732,7 @@ class RGWMetaSyncCR : public RGWCoroutine {
}
}
// wait for each shard to complete
collect(&ret);
collect(&ret, NULL);
drain_all();
{
// drop shard cr refs under lock
Expand Down

0 comments on commit 88dccf0

Please sign in to comment.