diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 86f920e04417c..8ff4044174dd4 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1556,6 +1556,20 @@ static int do_period_pull(const string& remote, const string& url, const string& return 0; } +static int read_current_period_id(RGWRados* store, const std::string& realm_id, + const std::string& realm_name, + std::string* period_id) +{ + RGWRealm realm(realm_id, realm_name); + int ret = realm.init(g_ceph_context, store); + if (ret < 0) { + std::cerr << "failed to read realm: " << cpp_strerror(-ret) << std::endl; + return ret; + } + *period_id = realm.get_current_period(); + return 0; +} + int main(int argc, char **argv) { vector args; @@ -2137,19 +2151,12 @@ int main(int argc, char **argv) break; case OPT_PERIOD_GET_CURRENT: { - RGWRealm realm(realm_id, realm_name); - int ret = realm.init(g_ceph_context, store); - if (ret < 0 ) { - cerr << "Error initializing realm " << cpp_strerror(-ret) << std::endl; - return ret; - } - string current_id = realm.get_current_period(); + int ret = read_current_period_id(store, realm_id, realm_name, &period_id); if (ret < 0) { - cerr << "Error reading current period:" << cpp_strerror(-ret) << std::endl; return ret; } formatter->open_object_section("period_get_current"); - encode_json("current_period", current_id, formatter); + encode_json("current_period", period_id, formatter); formatter->close_section(); formatter->flush(cout); } @@ -2282,21 +2289,18 @@ int main(int argc, char **argv) break; case OPT_REALM_LIST_PERIODS: { - RGWRealm realm(realm_id, realm_name); - int ret = realm.init(g_ceph_context, store); + int ret = read_current_period_id(store, realm_id, realm_name, &period_id); if (ret < 0) { - cerr << "realm.init failed: " << cpp_strerror(-ret) << std::endl; return -ret; } - string current_period = realm.get_current_period(); list periods; - ret = store->list_periods(current_period, periods); + ret = store->list_periods(period_id, periods); if (ret < 0) { cerr << "list periods failed: " << cpp_strerror(-ret) << std::endl; return -ret; } formatter->open_object_section("realm_periods_list"); - encode_json("current_period", current_period, formatter); + encode_json("current_period", period_id, formatter); encode_json("periods", periods, formatter); formatter->close_section(); formatter->flush(cout); @@ -4383,14 +4387,10 @@ int main(int argc, char **argv) int i = (specified_shard_id ? shard_id : 0); if (period_id.empty()) { - // read current_period from the realm - RGWRealm realm(realm_id, realm_name); - ret = realm.init(g_ceph_context, store); + int ret = read_current_period_id(store, realm_id, realm_name, &period_id); if (ret < 0) { - std::cerr << "failed to init realm: " << cpp_strerror(-ret) << std::endl; return -ret; } - period_id = realm.get_current_period(); std::cerr << "No --period given, using current period=" << period_id << std::endl; } @@ -4434,14 +4434,10 @@ int main(int argc, char **argv) int i = (specified_shard_id ? shard_id : 0); if (period_id.empty()) { - // read current_period from the realm - RGWRealm realm(realm_id, realm_name); - int ret = realm.init(g_ceph_context, store); + int ret = read_current_period_id(store, realm_id, realm_name, &period_id); if (ret < 0) { - std::cerr << "failed to init realm: " << cpp_strerror(-ret) << std::endl; return ret; } - period_id = realm.get_current_period(); std::cerr << "No --period given, using current period=" << period_id << std::endl; } diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index ba36756501c33..cdbc9a5d8376e 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -439,7 +439,7 @@ int RGWCoroutinesManager::run(list& stacks) RGWCoroutinesStack *stack = *iter; env.stack = stack; - int ret = stack->operate(&env); + ret = stack->operate(&env); stack->set_is_scheduled(false); if (ret < 0) { ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl; @@ -532,6 +532,9 @@ int RGWCoroutinesManager::run(list& stacks) handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count); iter = scheduled_stacks.begin(); } + if (ret == -ECANCELED) { + break; + } if (iter == scheduled_stacks.end()) { iter = scheduled_stacks.begin(); diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index 0b5243f7dd337..0d5fa5dd9f8ae 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -611,3 +611,39 @@ int RGWRadosTimelogAddCR::request_complete() return r; } + +int RGWAsyncStatObj::_send_request() +{ + return store->raw_obj_stat(obj, psize, pmtime, pepoch, + nullptr, nullptr, objv_tracker); +} + +RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store, + const rgw_obj& obj, uint64_t *psize, + time_t *pmtime, uint64_t *pepoch, + RGWObjVersionTracker *objv_tracker) + : RGWSimpleCoroutine(store->ctx()), store(store), async_rados(async_rados), + obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch), + objv_tracker(objv_tracker) +{ +} + +RGWStatObjCR::~RGWStatObjCR() +{ + if (req) { + req->finish(); + } +} + +int RGWStatObjCR::send_request() +{ + req = new RGWAsyncStatObj(stack->create_completion_notifier(), + store, obj, psize, pmtime, pepoch, objv_tracker); + async_rados->queue(req); + return 0; +} + +int RGWStatObjCR::request_complete() +{ + return req->get_ret_status(); +} diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index e2692741d2abc..3a29b64bec083 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -887,5 +887,42 @@ class RGWRadosTimelogAddCR : public RGWSimpleCoroutine { int request_complete(); }; +class RGWAsyncStatObj : public RGWAsyncRadosRequest { + RGWRados *store; + rgw_obj obj; + uint64_t *psize; + time_t *pmtime; + uint64_t *pepoch; + RGWObjVersionTracker *objv_tracker; +protected: + int _send_request() override; +public: + RGWAsyncStatObj(RGWAioCompletionNotifier *cn, RGWRados *store, + const rgw_obj& obj, uint64_t *psize = nullptr, + time_t *pmtime = nullptr, uint64_t *pepoch = nullptr, + RGWObjVersionTracker *objv_tracker = nullptr) + : RGWAsyncRadosRequest(cn), store(store), obj(obj), psize(psize), + pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {} +}; + +class RGWStatObjCR : public RGWSimpleCoroutine { + RGWRados *store; + RGWAsyncRadosProcessor *async_rados; + rgw_obj obj; + uint64_t *psize; + time_t *pmtime; + uint64_t *pepoch; + RGWObjVersionTracker *objv_tracker; + RGWAsyncStatObj *req = nullptr; + public: + RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store, + const rgw_obj& obj, uint64_t *psize = nullptr, + time_t *pmtime = nullptr, uint64_t *pepoch = nullptr, + RGWObjVersionTracker *objv_tracker = nullptr); + ~RGWStatObjCR(); + + int send_request() override; + int request_complete() override; +}; #endif diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 2f0d1e98174c9..027781bb1e983 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -1,6 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab +#include #include "common/ceph_json.h" #include "rgw_metadata.h" #include "rgw_coroutine.h" @@ -9,6 +10,9 @@ #include "rgw_rados.h" #include "rgw_tools.h" +#include "rgw_cr_rados.h" +#include "rgw_boost_asio_yield.h" + #define dout_subsys ceph_subsys_rgw void LogStatusDump::dump(Formatter *f) const { @@ -357,19 +361,126 @@ RGWMetadataManager::~RGWMetadataManager() handlers.clear(); } -static RGWPeriodHistory::Cursor find_oldest_log_period(RGWRados* store) +namespace { + +class FindAnyShardCR : public RGWCoroutine { + RGWRados *const store; + const RGWMetadataLog& mdlog; + const int num_shards; + int ret = 0; + public: + FindAnyShardCR(RGWRados *store, const RGWMetadataLog& mdlog, int num_shards) + : RGWCoroutine(store->ctx()), store(store), mdlog(mdlog), + num_shards(num_shards) {} + + int operate() { + reenter(this) { + // send stat requests for each shard in parallel + yield { + auto async_rados = store->get_async_rados(); + auto& pool = store->get_zone_params().log_pool; + auto oid = std::string{}; + + for (int i = 0; i < num_shards; i++) { + mdlog.get_shard_oid(i, oid); + auto obj = rgw_obj{pool, oid}; + spawn(new RGWStatObjCR(async_rados, store, obj), true); + } + } + drain_all(); + // if any shards were found, return success + while (collect_next(&ret)) { + if (ret == 0) { + // TODO: cancel instead of waiting for the rest + return set_cr_done(); + } + ret = 0; // collect_next() won't modify &ret unless it's a failure + } + // no shards found + set_retcode(-ENOENT); + return set_cr_error(-ENOENT); + } + return 0; + } +}; + +// return true if any log shards exist for the given period +int find_shards_for_period(RGWRados *store, const std::string& period_id) { - // TODO: search backwards through the period history for the first period with - // no log shard objects, and return its successor (some shards may be missing + auto cct = store->ctx(); + RGWMetadataLog mdlog(cct, store, period_id); + auto num_shards = cct->_conf->rgw_md_log_max_shards; + + using FindAnyShardCRRef = boost::intrusive_ptr; + auto cr = FindAnyShardCRRef{new FindAnyShardCR(store, mdlog, num_shards)}; + + RGWCoroutinesManager mgr(cct, nullptr); + int r = mgr.run(cr.get()); + if (r < 0) { + return r; + } + return cr->get_ret_status(); +} + +RGWPeriodHistory::Cursor find_oldest_log_period(RGWRados *store) +{ + // search backwards through the period history for the first period with no + // log shard objects, and return its successor (some shards may be missing // if they contain no metadata yet, so we need to check all shards) - return store->period_history->get_current(); + auto cursor = store->period_history->get_current(); + auto oldest_log = cursor; + + while (cursor) { + // search for an existing log shard object for this period + int r = find_shards_for_period(store, cursor.get_period().get_id()); + if (r == -ENOENT) { + ldout(store->ctx(), 10) << "find_oldest_log_period found no log shards " + "for period " << cursor.get_period().get_id() << "; returning " + "period " << oldest_log.get_period().get_id() << dendl; + return oldest_log; + } + if (r < 0) { + return RGWPeriodHistory::Cursor{r}; + } + oldest_log = cursor; + + // advance to the period's predecessor + if (!cursor.has_prev()) { + auto& predecessor = cursor.get_period().get_predecessor(); + if (predecessor.empty()) { + // this is the first period, so our logs must start here + ldout(store->ctx(), 10) << "find_oldest_log_period returning first " + "period " << cursor.get_period().get_id() << dendl; + return cursor; + } + // pull the predecessor and add it to our history + RGWPeriod period; + int r = store->period_puller->pull(predecessor, period); + if (r < 0) { + return RGWPeriodHistory::Cursor{r}; + } + auto prev = store->period_history->insert(std::move(period)); + if (!prev) { + return prev; + } + ldout(store->ctx(), 10) << "find_oldest_log_period advancing to " + "predecessor period " << predecessor << dendl; + assert(cursor.has_prev()); + } + cursor.prev(); + } + ldout(store->ctx(), 10) << "find_oldest_log_period returning empty cursor" << dendl; + return cursor; } +} // anonymous namespace + int RGWMetadataManager::init(const std::string& current_period) { - // find our oldest log so we can tell other zones where to start their sync - oldest_log_period = find_oldest_log_period(store); - + if (store->is_meta_master()) { + // find our oldest log so we can tell other zones where to start their sync + oldest_log_period = find_oldest_log_period(store); + } // open a log for the current period current_log = get_log(current_period); return 0; diff --git a/src/rgw/rgw_metadata.h b/src/rgw/rgw_metadata.h index d4e511e42ac8e..ff3efa2973260 100644 --- a/src/rgw/rgw_metadata.h +++ b/src/rgw/rgw_metadata.h @@ -150,12 +150,6 @@ class RGWMetadataLog { return META_LOG_OBJ_PREFIX + period + "."; } - void get_shard_oid(int id, string& oid) { - char buf[16]; - snprintf(buf, sizeof(buf), "%d", id); - oid = prefix + buf; - } - RWLock lock; set modified_shards; @@ -166,6 +160,12 @@ class RGWMetadataLog { prefix(make_prefix(period)), lock("RGWMetaLog::lock") {} + void get_shard_oid(int id, string& oid) const { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", id); + oid = prefix + buf; + } + int add_entry(RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl); int store_entries_in_shard(list& entries, int shard_id, librados::AioCompletion *completion); diff --git a/src/rgw/rgw_period_history.cc b/src/rgw/rgw_period_history.cc index caad00c7de194..29926236b46ad 100644 --- a/src/rgw/rgw_period_history.cc +++ b/src/rgw/rgw_period_history.cc @@ -116,15 +116,17 @@ RGWPeriodHistory::Impl::Impl(CephContext* cct, Puller* puller, const RGWPeriod& current_period) : cct(cct), puller(puller) { - // copy the current period into a new history - auto history = new History; - history->periods.push_back(current_period); + if (!current_period.get_id().empty()) { + // copy the current period into a new history + auto history = new History; + history->periods.push_back(current_period); - // insert as our current history - current_history = histories.insert(*history).first; + // insert as our current history + current_history = histories.insert(*history).first; - // get a cursor to the current period - current_cursor = make_cursor(current_history, current_period.get_realm_epoch()); + // get a cursor to the current period + current_cursor = make_cursor(current_history, current_period.get_realm_epoch()); + } } RGWPeriodHistory::Impl::~Impl() @@ -135,6 +137,10 @@ RGWPeriodHistory::Impl::~Impl() Cursor RGWPeriodHistory::Impl::attach(RGWPeriod&& period) { + if (current_history == histories.end()) { + return Cursor{-EINVAL}; + } + const auto epoch = period.get_realm_epoch(); std::string predecessor_id; @@ -177,6 +183,10 @@ Cursor RGWPeriodHistory::Impl::attach(RGWPeriod&& period) Cursor RGWPeriodHistory::Impl::insert(RGWPeriod&& period) { + if (current_history == histories.end()) { + return Cursor{-EINVAL}; + } + std::lock_guard lock(mutex); auto cursor = insert_locked(std::move(period)); @@ -195,7 +205,8 @@ Cursor RGWPeriodHistory::Impl::insert(RGWPeriod&& period) Cursor RGWPeriodHistory::Impl::lookup(epoch_t realm_epoch) { - if (current_history->contains(realm_epoch)) { + if (current_history != histories.end() && + current_history->contains(realm_epoch)) { return make_cursor(current_history, realm_epoch); } return Cursor{}; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 7cd7acbc00861..d666f180d4e8d 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3665,28 +3665,28 @@ int RGWRados::init_complete() obj_expirer->start_processor(); } + /* not point of running sync thread if there is a single zone or + we don't have a master zone configured or there is no rest_master_conn */ + if (get_zonegroup().zones.size() < 2 || get_zonegroup().master_zone.empty() || !rest_master_conn) { + run_sync_thread = false; + } + + async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads); + async_rados->start(); + ret = meta_mgr->init(current_period.get_id()); if (ret < 0) { lderr(cct) << "ERROR: failed to initialize metadata log: " << cpp_strerror(-ret) << dendl; return ret; } - auto md_log = meta_mgr->get_log(current_period.get_id()); - meta_notifier = new RGWMetaNotifier(this, md_log); if (is_meta_master()) { + auto md_log = meta_mgr->get_log(current_period.get_id()); + meta_notifier = new RGWMetaNotifier(this, md_log); meta_notifier->start(); } - /* not point of running sync thread if there is a single zone or - we don't have a master zone configured or there is no rest_master_conn */ - if (get_zonegroup().zones.size() < 2 || get_zonegroup().master_zone.empty() || !rest_master_conn) { - run_sync_thread = false; - } - - async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads); - async_rados->start(); - if (run_sync_thread) { Mutex::Locker l(meta_sync_thread_lock); meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados); diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index b165b401b7755..3ff085470ade1 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -81,13 +81,13 @@ void RGWOp_MDLog_List::execute() { http_ret = -EINVAL; return; } - RGWMetadataLog *meta_log = store->meta_mgr->get_log(period); + RGWMetadataLog meta_log{s->cct, store, period}; - meta_log->init_list_entries(shard_id, ut_st, ut_et, marker, &handle); + meta_log.init_list_entries(shard_id, ut_st, ut_et, marker, &handle); do { - http_ret = meta_log->list_entries(handle, max_entries, entries, - &last_marker, &truncated); + http_ret = meta_log.list_entries(handle, max_entries, entries, + &last_marker, &truncated); if (http_ret < 0) break; @@ -95,7 +95,7 @@ void RGWOp_MDLog_List::execute() { max_entries -= entries.size(); } while (truncated && (max_entries > 0)); - meta_log->complete_list_entries(handle); + meta_log.complete_list_entries(handle); } void RGWOp_MDLog_List::send_response() { @@ -161,9 +161,9 @@ void RGWOp_MDLog_ShardInfo::execute() { http_ret = -EINVAL; return; } - RGWMetadataLog *meta_log = store->meta_mgr->get_log(period); + RGWMetadataLog meta_log{s->cct, store, period}; - http_ret = meta_log->get_info(shard_id, &info); + http_ret = meta_log.get_info(shard_id, &info); } void RGWOp_MDLog_ShardInfo::send_response() { @@ -215,9 +215,9 @@ void RGWOp_MDLog_Delete::execute() { http_ret = -EINVAL; return; } - RGWMetadataLog *meta_log = store->meta_mgr->get_log(period); + RGWMetadataLog meta_log{s->cct, store, period}; - http_ret = meta_log->trim(shard_id, ut_st, ut_et, start_marker, end_marker); + http_ret = meta_log.trim(shard_id, ut_st, ut_et, start_marker, end_marker); } void RGWOp_MDLog_Lock::execute() { @@ -250,7 +250,7 @@ void RGWOp_MDLog_Lock::execute() { return; } - RGWMetadataLog *meta_log = store->meta_mgr->get_log(period); + RGWMetadataLog meta_log{s->cct, store, period}; unsigned dur; dur = (unsigned)strict_strtol(duration_str.c_str(), 10, &err); if (!err.empty() || dur <= 0) { @@ -259,7 +259,7 @@ void RGWOp_MDLog_Lock::execute() { return; } utime_t time(dur, 0); - http_ret = meta_log->lock_exclusive(shard_id, time, zone_id, locker_id); + http_ret = meta_log.lock_exclusive(shard_id, time, zone_id, locker_id); if (http_ret == -EBUSY) http_ret = -ERR_LOCKED; } @@ -292,8 +292,8 @@ void RGWOp_MDLog_Unlock::execute() { return; } - RGWMetadataLog *meta_log = store->meta_mgr->get_log(period); - http_ret = meta_log->unlock(shard_id, zone_id, locker_id); + RGWMetadataLog meta_log{s->cct, store, period}; + http_ret = meta_log.unlock(shard_id, zone_id, locker_id); } void RGWOp_MDLog_Notify::execute() { diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index bcb6d02b65f1d..6b56dfcec875e 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -24,6 +24,9 @@ #define dout_subsys ceph_subsys_rgw +#undef dout_prefix +#define dout_prefix (*_dout << "rgw meta sync: ") + static string mdlog_sync_status_oid = "mdlog.sync-status"; static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard"; static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index"; @@ -1286,18 +1289,18 @@ class RGWMetaSyncShardCR : public RGWCoroutine { } if (!lost_lock) { - yield { - /* update marker to reflect we're done with full sync */ - if (can_adjust_marker) { - sync_marker.state = rgw_meta_sync_marker::IncrementalSync; - sync_marker.marker = sync_marker.next_step_marker; - sync_marker.next_step_marker.clear(); - } - // XXX: why write the marker if !can_adjust_marker? + /* update marker to reflect we're done with full sync */ + if (can_adjust_marker) yield { + sync_marker.state = rgw_meta_sync_marker::IncrementalSync; + sync_marker.marker = sync_marker.next_step_marker; + sync_marker.next_step_marker.clear(); + RGWRados *store = sync_env->store; ldout(sync_env->cct, 0) << *this << ": saving marker pos=" << sync_marker.marker << dendl; - call(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, pool, - sync_env->shard_obj_name(shard_id), sync_marker)); + using WriteMarkerCR = RGWSimpleRadosWriteCR; + call(new WriteMarkerCR(sync_env->async_rados, store, pool, + sync_env->shard_obj_name(shard_id), + sync_marker)); } if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; @@ -1503,7 +1506,10 @@ class RGWMetaSyncCR : public RGWCoroutine { RGWPeriodHistory::Cursor next; //< next period in history rgw_meta_sync_status sync_status; - map shard_crs; + std::mutex mutex; //< protect access to shard_crs + + using ControlCRRef = boost::intrusive_ptr; + map shard_crs; public: RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor, @@ -1538,6 +1544,9 @@ class RGWMetaSyncCR : public RGWCoroutine { auto& period_id = sync_status.sync_info.period; auto mdlog = sync_env->store->meta_mgr->get_log(period_id); + // prevent wakeup() from accessing shard_crs while we're spawning them + std::lock_guard lock(mutex); + // sync this period on each shard for (const auto& m : sync_status.sync_markers) { uint32_t shard_id = m.first; @@ -1558,18 +1567,21 @@ class RGWMetaSyncCR : public RGWCoroutine { auto cr = new RGWMetaSyncShardControlCR(sync_env, pool, period_id, mdlog, shard_id, marker, std::move(period_marker)); - // XXX: do we need to hold a ref on cr while it's in shard_crs? shard_crs[shard_id] = cr; spawn(cr, false); } } // wait for each shard to complete collect(&ret); + drain_all(); + { + // drop shard cr refs under lock + std::lock_guard lock(mutex); + shard_crs.clear(); + } if (ret < 0) { - drain_all(); return set_cr_error(ret); } - drain_all(); // advance to the next period assert(next); cursor = next; @@ -1587,7 +1599,8 @@ class RGWMetaSyncCR : public RGWCoroutine { } void wakeup(int shard_id) { - map::iterator iter = shard_crs.find(shard_id); + std::lock_guard lock(mutex); + auto iter = shard_crs.find(shard_id); if (iter == shard_crs.end()) { return; } @@ -1629,8 +1642,11 @@ int RGWRemoteMetaLog::init_sync_status() return r; } sync_info.num_shards = mdlog_info.num_shards; - sync_info.period = mdlog_info.period; - sync_info.realm_epoch = mdlog_info.realm_epoch; + auto cursor = store->period_history->get_current(); + if (cursor) { + sync_info.period = cursor.get_period().get_id(); + sync_info.realm_epoch = cursor.get_epoch(); + } } RGWObjectCtx obj_ctx(store, NULL); @@ -1666,9 +1682,9 @@ static RGWPeriodHistory::Cursor get_period_at(RGWRados* store, return cursor; } - // read the period from rados - RGWPeriod period(info.period); - int r = period.init(store->ctx(), store, store->realm.get_id()); + // read the period from rados or pull it from the master + RGWPeriod period; + int r = store->period_puller->pull(info.period, period); if (r < 0) { lderr(store->ctx()) << "ERROR: failed to read period id " << info.period << ": " << cpp_strerror(r) << dendl; @@ -1707,17 +1723,25 @@ int RGWRemoteMetaLog::run_sync() return r; } - if (!mdlog_info.period.empty() && sync_status.sync_info.period.empty()) { - // restart sync if the remote has a period but our status does not - sync_status.sync_info.state = rgw_meta_sync_info::StateInit; + if (!mdlog_info.period.empty()) { + // restart sync if the remote has a period, but: + // a) our status does not, or + // b) our sync period comes before the remote's oldest log period + if (sync_status.sync_info.period.empty() || + sync_status.sync_info.realm_epoch < mdlog_info.realm_epoch) { + sync_status.sync_info.state = rgw_meta_sync_info::StateInit; + } } if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) { ldout(store->ctx(), 20) << __func__ << "(): init" << dendl; sync_status.sync_info.num_shards = mdlog_info.num_shards; - // use the period/epoch from the master's oldest log - sync_status.sync_info.period = mdlog_info.period; - sync_status.sync_info.realm_epoch = mdlog_info.realm_epoch; + auto cursor = store->period_history->get_current(); + if (cursor) { + // run full sync, then start incremental from the current period/epoch + sync_status.sync_info.period = cursor.get_period().get_id(); + sync_status.sync_info.realm_epoch = cursor.get_epoch(); + } r = run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx, sync_status.sync_info)); if (r == -EBUSY) {