Skip to content

Commit

Permalink
rgw: clean up RGWInitDataSyncStatusCoroutine
Browse files Browse the repository at this point in the history
RGWInitDataSyncStatusCoroutine operates on a given rgw_data_sync_status
pointer, which saves us from having to read it back from rados

Signed-off-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
cbodley committed Apr 26, 2017
1 parent 7c23713 commit 69be410
Showing 1 changed file with 67 additions and 70 deletions.
137 changes: 67 additions & 70 deletions src/rgw/rgw_data_sync.cc
Expand Up @@ -456,22 +456,25 @@ bool RGWListRemoteDataLogCR::spawn_next() {
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
static constexpr uint32_t lock_duration = 30;
RGWDataSyncEnv *sync_env;

RGWRados *store;
const rgw_pool& pool;
const uint32_t num_shards;

string sync_status_oid;

string lock_name;
string cookie;
rgw_data_sync_info status;
rgw_data_sync_status *status;
map<int, RGWDataChangesLogInfo> shards_info;
public:
RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
uint32_t _num_shards) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env), store(sync_env->store) {
RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
rgw_data_sync_status *status)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
pool(store->get_zone_params().log_pool),
num_shards(num_shards), status(status) {
lock_name = "sync_lock";
status.num_shards = _num_shards;

#define COOKIE_LEN 16
char buf[COOKIE_LEN + 1];
Expand All @@ -485,80 +488,81 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
int operate() override {
int ret;
reenter(this) {
yield {
uint32_t lock_duration = 30;
call(new RGWSimpleRadosLockCR(sync_env->async_rados, store,
rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
return set_cr_error(retcode);
}
using LockCR = RGWSimpleRadosLockCR;
yield call(new LockCR(sync_env->async_rados, store,
rgw_raw_obj{pool, sync_status_oid},
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
return set_cr_error(retcode);
}
yield {
call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados,
store,
rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
status));
}
yield { /* take lock again, we just recreated the object */
uint32_t lock_duration = 30;
call(new RGWSimpleRadosLockCR(sync_env->async_rados,
store,
rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
return set_cr_error(retcode);
}
using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
yield call(new WriteInfoCR(sync_env->async_rados, store,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
return set_cr_error(retcode);
}

/* take lock again, we just recreated the object */
yield call(new LockCR(sync_env->async_rados, store,
rgw_raw_obj{pool, sync_status_oid},
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
return set_cr_error(retcode);
}

/* fetch current position in logs */
yield {
RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
if (!conn) {
ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
return set_cr_error(-EIO);
}
for (int i = 0; i < (int)status.num_shards; i++) {
for (uint32_t i = 0; i < num_shards; i++) {
spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
}
}
}
while (collect(&ret, NULL)) {
if (ret < 0) {
return set_state(RGWCoroutine_Error);
}
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
return set_state(RGWCoroutine_Error);
}
yield;
}
yield {
for (int i = 0; i < (int)status.num_shards; i++) {
rgw_data_sync_marker marker;
for (uint32_t i = 0; i < num_shards; i++) {
RGWDataChangesLogInfo& info = shards_info[i];
marker.next_step_marker = info.marker;
marker.timestamp = info.last_update;
spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i)),
marker), true);
auto& marker = status->sync_markers[i];
marker.next_step_marker = info.marker;
marker.timestamp = info.last_update;
const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
spawn(new WriteMarkerCR(sync_env->async_rados, store,
rgw_raw_obj{pool, oid}, marker), true);
}
}
yield {
status.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
status));
}
yield { /* unlock */
call(new RGWSimpleRadosUnlockCR(sync_env->async_rados,
store,
rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
lock_name, cookie));
}
while (collect(&ret, NULL)) {
if (ret < 0) {
return set_state(RGWCoroutine_Error);
}
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
return set_state(RGWCoroutine_Error);
}
yield;
}
drain_all();

status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
yield call(new WriteInfoCR(sync_env->async_rados, store,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
return set_cr_error(retcode);
}
yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
rgw_raw_obj{pool, sync_status_oid},
lock_name, cookie));
return set_cr_done();
}
return 0;
Expand Down Expand Up @@ -666,6 +670,7 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
rgw_data_sync_status sync_status;
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
int ret = http_manager.set_threaded();
Expand All @@ -675,7 +680,7 @@ int RGWRemoteDataLog::init_sync_status(int num_shards)
}
RGWDataSyncEnv sync_env_local = sync_env;
sync_env_local.http_manager = &http_manager;
ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards));
ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, &sync_status));
http_manager.stop();
return ret;
}
Expand Down Expand Up @@ -1458,20 +1463,12 @@ class RGWDataSyncCR : public RGWCoroutine {
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
yield call(new RGWInitDataSyncStatusCoroutine(sync_env, sync_status.sync_info.num_shards));
yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, &sync_status));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
sync_status.sync_info.num_shards = num_shards;
sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
/* update new state */
yield call(set_sync_info_cr());

if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
// sets state = StateBuildingFullSyncMaps

*reset_backoff = true;
}
Expand Down

0 comments on commit 69be410

Please sign in to comment.