Skip to content

Commit

Permalink
rgw: don't pass object context into async coroutines
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/15625

The async read data/attrs coroutines may outlive the callers. Instead
of introducing a complicated refcounting scheme, just don't pass it in.
Anyway, it was not thread safe, and the benefit of using it is not
clear.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit b225d17)
  • Loading branch information
yehudasa committed May 9, 2016
1 parent 1d12f82 commit 4a5f33d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 55 deletions.
10 changes: 4 additions & 6 deletions src/rgw/rgw_cr_rados.h
Expand Up @@ -174,7 +174,7 @@ template <class T>
class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
RGWObjectCtx& obj_ctx;
RGWObjectCtx obj_ctx;
bufferlist bl;

rgw_bucket pool;
Expand All @@ -188,11 +188,10 @@ class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {

public:
RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
RGWObjectCtx& _obj_ctx,
const rgw_bucket& _pool, const string& _oid,
T *_result) : RGWSimpleCoroutine(_store->ctx()),
async_rados(_async_rados), store(_store),
obj_ctx(_obj_ctx),
obj_ctx(store),
pool(_pool), oid(_oid),
pattrs(NULL),
result(_result),
Expand Down Expand Up @@ -252,7 +251,7 @@ int RGWSimpleRadosReadCR<T>::request_complete()
class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
RGWObjectCtx& obj_ctx;
RGWObjectCtx obj_ctx;
bufferlist bl;

rgw_bucket pool;
Expand All @@ -264,11 +263,10 @@ class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {

public:
RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
RGWObjectCtx& _obj_ctx,
rgw_bucket& _pool, const string& _oid,
map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
async_rados(_async_rados), store(_store),
obj_ctx(_obj_ctx),
obj_ctx(store),
pool(_pool), oid(_oid),
pattrs(_pattrs),
req(NULL) { }
Expand Down
39 changes: 12 additions & 27 deletions src/rgw/rgw_data_sync.cc
Expand Up @@ -46,18 +46,15 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
class RGWReadDataSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_data_sync_info> {
RGWDataSyncEnv *sync_env;

RGWObjectCtx& obj_ctx;

rgw_data_sync_status *sync_status;

public:
RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, RGWObjectCtx& _obj_ctx,
rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store, _obj_ctx,
RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store,
_sync_env->store->get_zone_params().log_pool,
RGWDataSyncStatusManager::sync_status_oid(_sync_env->source_zone),
&_status->sync_info),
sync_env(_sync_env),
obj_ctx(_obj_ctx),
sync_status(_status) {}

int handle_data(rgw_data_sync_info& data);
Expand All @@ -72,7 +69,7 @@ int RGWReadDataSyncStatusCoroutine::handle_data(rgw_data_sync_info& data)
map<uint32_t, rgw_data_sync_marker>& markers = sync_status->sync_markers;
RGWRados *store = sync_env->store;
for (int i = 0; i < (int)data.num_shards; i++) {
spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), &markers[i]), true);
}
return 0;
Expand Down Expand Up @@ -344,7 +341,6 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;

RGWRados *store;
RGWObjectCtx& obj_ctx;

string sync_status_oid;

Expand All @@ -354,9 +350,8 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
map<int, RGWDataChangesLogInfo> shards_info;
public:
RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env), store(sync_env->store),
obj_ctx(_obj_ctx) {
uint32_t _num_shards) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env), store(sync_env->store) {
lock_name = "sync_lock";
status.num_shards = _num_shards;

Expand Down Expand Up @@ -527,8 +522,7 @@ int RGWRemoteDataLog::get_shard_info(int shard_id)

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
RGWObjectCtx obj_ctx(store, NULL);
int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, sync_status));
int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status));
if (r == -ENOENT) {
r = 0;
}
Expand All @@ -537,8 +531,7 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
RGWObjectCtx obj_ctx(store, NULL);
return run(new RGWInitDataSyncStatusCoroutine(&sync_env, obj_ctx, num_shards));
return run(new RGWInitDataSyncStatusCoroutine(&sync_env, num_shards));
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
Expand Down Expand Up @@ -1139,8 +1132,7 @@ class RGWDataSyncShardControlCR : public RGWBackoffControlCR {

RGWCoroutine *alloc_finisher_cr() {
RGWRados *store = sync_env->store;
RGWObjectCtx obj_ctx(store, NULL);
return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id), &sync_marker);
}

Expand All @@ -1160,8 +1152,6 @@ class RGWDataSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
uint32_t num_shards;

RGWObjectCtx obj_ctx;

rgw_data_sync_status sync_status;

RGWDataSyncShardMarkerTrack *marker_tracker;
Expand All @@ -1175,7 +1165,6 @@ class RGWDataSyncCR : public RGWCoroutine {
RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
num_shards(_num_shards),
obj_ctx(sync_env->store),
marker_tracker(NULL),
shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
reset_backoff(_reset_backoff) {
Expand All @@ -1191,7 +1180,7 @@ class RGWDataSyncCR : public RGWCoroutine {
reenter(this) {

/* read sync status */
yield call(new RGWReadDataSyncStatusCoroutine(sync_env, obj_ctx, &sync_status));
yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

if (retcode == -ENOENT) {
sync_status.sync_info.num_shards = num_shards;
Expand All @@ -1203,7 +1192,7 @@ class RGWDataSyncCR : public RGWCoroutine {
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
yield call(new RGWInitDataSyncStatusCoroutine(sync_env, obj_ctx, sync_status.sync_info.num_shards));
yield call(new RGWInitDataSyncStatusCoroutine(sync_env, sync_status.sync_info.num_shards));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
return set_cr_error(retcode);
Expand Down Expand Up @@ -1320,9 +1309,7 @@ void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {

int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status)
{
RGWObjectCtx obj_ctx(store, NULL);

int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, &sync_status));
if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << sync_env.source_zone << " r=" << r << dendl;
return r;
Expand Down Expand Up @@ -1604,7 +1591,6 @@ void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attr

class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWObjectCtx obj_ctx;
string oid;
rgw_bucket_shard_sync_info *status;

Expand All @@ -1614,7 +1600,6 @@ class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
const string& _bucket_name, const string _bucket_id, int _shard_id,
rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
obj_ctx(sync_env->store),
oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _bucket_name, _bucket_id, _shard_id)),
status(_status) {}
int operate();
Expand All @@ -1623,7 +1608,7 @@ class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
int RGWReadBucketSyncStatusCoroutine::operate()
{
reenter(this) {
yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store, obj_ctx,
yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
sync_env->store->get_zone_params().log_pool,
oid,
&attrs));
Expand Down
32 changes: 10 additions & 22 deletions src/rgw/rgw_sync.cc
Expand Up @@ -587,17 +587,15 @@ bool RGWListRemoteMDLogCR::spawn_next() {

class RGWInitSyncStatusCoroutine : public RGWCoroutine {
RGWMetaSyncEnv *sync_env;
RGWObjectCtx& obj_ctx;

rgw_meta_sync_info status;
vector<RGWMetadataLogInfo> shards_info;
RGWContinuousLeaseCR *lease_cr;
public:
RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
RGWObjectCtx& _obj_ctx,
const rgw_meta_sync_info &status)
: RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
obj_ctx(_obj_ctx), status(status), shards_info(status.num_shards),
status(status), shards_info(status.num_shards),
lease_cr(NULL) {}

~RGWInitSyncStatusCoroutine() {
Expand Down Expand Up @@ -689,19 +687,16 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {

class RGWReadSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_meta_sync_info> {
RGWMetaSyncEnv *sync_env;
RGWObjectCtx& obj_ctx;

rgw_meta_sync_status *sync_status;

public:
RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
RGWObjectCtx& _obj_ctx,
rgw_meta_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store, _obj_ctx,
rgw_meta_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store,
_sync_env->store->get_zone_params().log_pool,
_sync_env->status_oid(),
&_status->sync_info),
sync_env(_sync_env),
obj_ctx(_obj_ctx),
sync_status(_status) {

}
Expand All @@ -718,7 +713,7 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data)
RGWRados *store = sync_env->store;
map<uint32_t, rgw_meta_sync_marker>& markers = sync_status->sync_markers;
for (int i = 0; i < (int)data.num_shards; i++) {
spawn(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
spawn(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_env->shard_obj_name(i), &markers[i]), true);
}
return 0;
Expand Down Expand Up @@ -1636,17 +1631,14 @@ class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
rgw_meta_sync_marker sync_marker;
const std::string period_marker;

RGWObjectCtx obj_ctx;

public:
RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_bucket& _pool,
const std::string& period, RGWMetadataLog* mdlog,
uint32_t _shard_id, const rgw_meta_sync_marker& _marker,
std::string&& period_marker)
: RGWBackoffControlCR(_sync_env->cct, true), sync_env(_sync_env),
pool(_pool), period(period), mdlog(mdlog), shard_id(_shard_id),
sync_marker(_marker), period_marker(std::move(period_marker)),
obj_ctx(sync_env->store) {}
sync_marker(_marker), period_marker(std::move(period_marker)) {}

RGWCoroutine *alloc_cr() {
return new RGWMetaSyncShardCR(sync_env, pool, period, mdlog, shard_id,
Expand All @@ -1655,7 +1647,7 @@ class RGWMetaSyncShardControlCR : public RGWBackoffControlCR

RGWCoroutine *alloc_finisher_cr() {
RGWRados *store = sync_env->store;
return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, pool,
return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, pool,
sync_env->shard_obj_name(shard_id), &sync_marker);
}
};
Expand Down Expand Up @@ -1784,8 +1776,7 @@ int RGWRemoteMetaLog::read_sync_status()
return 0;
}

RGWObjectCtx obj_ctx(store, NULL);
return run(new RGWReadSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
return run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
}

int RGWRemoteMetaLog::init_sync_status()
Expand All @@ -1810,8 +1801,7 @@ int RGWRemoteMetaLog::init_sync_status()
}
}

RGWObjectCtx obj_ctx(store, NULL);
return run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx, sync_info));
return run(new RGWInitSyncStatusCoroutine(&sync_env, sync_info));
}

int RGWRemoteMetaLog::store_sync_info()
Expand Down Expand Up @@ -1867,7 +1857,6 @@ int RGWRemoteMetaLog::run_sync()
return 0;
}

RGWObjectCtx obj_ctx(store, NULL);
int r = 0;

// get shard count and oldest log period from master
Expand Down Expand Up @@ -1897,7 +1886,7 @@ int RGWRemoteMetaLog::run_sync()
ldout(store->ctx(), 1) << __func__ << "(): going down" << dendl;
return 0;
}
r = run(new RGWReadSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
return r;
Expand All @@ -1922,8 +1911,7 @@ int RGWRemoteMetaLog::run_sync()
sync_status.sync_info.period = cursor.get_period().get_id();
sync_status.sync_info.realm_epoch = cursor.get_epoch();
}
r = run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx,
sync_status.sync_info));
r = run(new RGWInitSyncStatusCoroutine(&sync_env, sync_status.sync_info));
if (r == -EBUSY) {
backoff.backoff_sleep();
continue;
Expand All @@ -1944,7 +1932,7 @@ int RGWRemoteMetaLog::run_sync()

RGWPeriodHistory::Cursor cursor;
do {
r = run(new RGWReadSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
r = run(new RGWReadSyncStatusCoroutine(&sync_env, &sync_status));
if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
return r;
Expand Down

0 comments on commit 4a5f33d

Please sign in to comment.