From c5c95e7f225d59a8bdd8eda3742053b77492c40c Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 8 Jun 2016 09:37:26 -0400 Subject: [PATCH 01/13] rgw: add empty_on_enoent flag to RGWSimpleRadosReadCR RGWSimpleRadosReadCR won't currently fail with ENOENT, but instead passes an empty object to handle_data(). add an empty_on_enoent flag to the constructor, defaulting to true, to make this behavior optional for callers that do want to fail on ENOENT Signed-off-by: Casey Bodley --- src/rgw/rgw_cr_rados.h | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 24a284d63d527..81cae4d281302 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -180,22 +180,21 @@ class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { rgw_bucket pool; string oid; - map *pattrs; + map *pattrs{nullptr}; T *result; + /// on ENOENT, call handle_data() with an empty object instead of failing + const bool empty_on_enoent; - RGWAsyncGetSystemObj *req; + RGWAsyncGetSystemObj *req{nullptr}; public: RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_bucket& _pool, const string& _oid, - T *_result) : RGWSimpleCoroutine(_store->ctx()), - async_rados(_async_rados), store(_store), - obj_ctx(store), - pool(_pool), oid(_oid), - pattrs(NULL), - result(_result), - req(NULL) { } + T *_result, bool empty_on_enoent = true) + : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + obj_ctx(store), pool(_pool), oid(_oid), result(_result), + empty_on_enoent(empty_on_enoent) {} ~RGWSimpleRadosReadCR() { request_cleanup(); } @@ -235,7 +234,9 @@ int RGWSimpleRadosReadCR::request_complete() { int ret = req->get_ret_status(); retcode = ret; - if (ret != -ENOENT) { + if (ret == -ENOENT && empty_on_enoent) { + *result = T(); + } else { if (ret < 0) { return ret; } @@ -245,8 +246,6 @@ int RGWSimpleRadosReadCR::request_complete() } catch (buffer::error& 
err) { return -EIO; } - } else { - *result = T(); } return handle_data(*result); From 30823c5d4e98eb501ff1c2f7315705c3c5dd614d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 7 Jun 2016 15:50:18 -0400 Subject: [PATCH 02/13] rgw: use RGWShardCollectCR for RGWReadDataSyncStatusCoroutine this allows us to limit the number of outstanding requests for shard markers there also appeared to be issues with spawning the shard CRs from RGWReadDataSyncStatusCoroutine::handle_data(), because handle_data() was returning before the shard CRs completed Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 82 ++++++++++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 20 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f22775b5d620c..1d1f8bfcb34bc 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -112,34 +112,76 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) { JSONDecoder::decode_json("entries", entries, obj); }; -class RGWReadDataSyncStatusCoroutine : public RGWSimpleRadosReadCR { - RGWDataSyncEnv *sync_env; +class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + + RGWDataSyncEnv *env; + const int num_shards; + int shard_id{0}; + map& markers; + + public: + RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards, + map& markers) + : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), + env(env), num_shards(num_shards), markers(markers) + {} + bool spawn_next() override; +}; + +bool RGWReadDataSyncStatusMarkersCR::spawn_next() +{ + if (shard_id >= num_shards) { + return false; + } + using CR = RGWSimpleRadosReadCR; + spawn(new CR(env->async_rados, env->store, env->store->get_zone_params().log_pool, + RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id), + &markers[shard_id]), + false); + shard_id++; + return true; +} +class RGWReadDataSyncStatusCoroutine : public RGWCoroutine { + RGWDataSyncEnv 
*sync_env; rgw_data_sync_status *sync_status; public: RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store, - _sync_env->store->get_zone_params().log_pool, - RGWDataSyncStatusManager::sync_status_oid(_sync_env->source_zone), - &_status->sync_info), - sync_env(_sync_env), - sync_status(_status) {} - - int handle_data(rgw_data_sync_info& data); + rgw_data_sync_status *_status) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status) + {} + int operate() override; }; -int RGWReadDataSyncStatusCoroutine::handle_data(rgw_data_sync_info& data) +int RGWReadDataSyncStatusCoroutine::operate() { - if (retcode == -ENOENT) { - return retcode; - } - - map& markers = sync_status->sync_markers; - RGWRados *store = sync_env->store; - for (int i = 0; i < (int)data.num_shards; i++) { - spawn(new RGWSimpleRadosReadCR(sync_env->async_rados, store, store->get_zone_params().log_pool, - RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), &markers[i]), true); + reenter(this) { + // read sync info + using ReadInfoCR = RGWSimpleRadosReadCR; + yield { + bool empty_on_enoent = false; // fail on ENOENT + call(new ReadInfoCR(sync_env->async_rados, sync_env->store, + sync_env->store->get_zone_params().log_pool, + RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone), + &sync_status->sync_info, empty_on_enoent)); + } + if (retcode < 0) { + ldout(sync_env->cct, 4) << "failed to read sync status info with " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + // read shard markers + using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR; + yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards, + sync_status->sync_markers)); + if (retcode < 0) { + ldout(sync_env->cct, 4) << "failed to read sync status markers with " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + return set_cr_done(); } 
return 0; } From 821c70d710295ceb5fd12b3058df6a672ed50b0a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 16 May 2016 15:07:43 -0400 Subject: [PATCH 03/13] rgw: add dout_prefix for rgw_data_sync.cc and took out the redundant 'rgw' from 'rgw meta sync:' Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 3 +++ src/rgw/rgw_sync.cc | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 1d1f8bfcb34bc..d5ace03f37416 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -23,6 +23,9 @@ #define dout_subsys ceph_subsys_rgw +#undef dout_prefix +#define dout_prefix (*_dout << "data sync: ") + static string datalog_sync_status_oid_prefix = "datalog.sync-status"; static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard"; static string datalog_sync_full_sync_index_prefix = "data.full-sync.index"; diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 474e2c558d87f..d82d11abdee90 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -25,7 +25,7 @@ #define dout_subsys ceph_subsys_rgw #undef dout_prefix -#define dout_prefix (*_dout << "rgw meta sync: ") +#define dout_prefix (*_dout << "meta sync: ") static string mdlog_sync_status_oid = "mdlog.sync-status"; static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard"; From ebbb70bd5165e3e835d8e528e038d512e7dedba9 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 6 Jun 2016 14:43:09 -0400 Subject: [PATCH 04/13] rgw: add json decoders for data sync status Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.h | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index f3fc2f2a28367..0bdb151128578 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -6,6 +6,7 @@ #include "rgw_bucket.h" #include "common/RWLock.h" +#include "common/ceph_json.h" struct rgw_datalog_info { @@ -59,6 +60,18 @@ 
struct rgw_data_sync_info { encode_json("status", s, f); encode_json("num_shards", num_shards, f); } + void decode_json(JSONObj *obj) { + std::string s; + JSONDecoder::decode_json("status", s, obj); + if (s == "building-full-sync-maps") { + state = StateBuildingFullSyncMaps; + } else if (s == "sync") { + state = StateSync; + } else { + state = StateInit; + } + JSONDecoder::decode_json("num_shards", num_shards, obj); + } rgw_data_sync_info() : state((int)StateInit), num_shards(0) {} }; @@ -108,6 +121,18 @@ struct rgw_data_sync_marker { encode_json("pos", pos, f); encode_json("timestamp", utime_t(timestamp), f); } + void decode_json(JSONObj *obj) { + int s; + JSONDecoder::decode_json("state", s, obj); + state = s; + JSONDecoder::decode_json("marker", marker, obj); + JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); + JSONDecoder::decode_json("total_entries", total_entries, obj); + JSONDecoder::decode_json("pos", pos, obj); + utime_t t; + JSONDecoder::decode_json("timestamp", t, obj); + timestamp = t.to_real_time(); + } }; WRITE_CLASS_ENCODER(rgw_data_sync_marker) @@ -135,6 +160,10 @@ struct rgw_data_sync_status { encode_json("info", sync_info, f); encode_json("markers", sync_markers, f); } + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("info", sync_info, obj); + JSONDecoder::decode_json("markers", sync_markers, obj); + } }; WRITE_CLASS_ENCODER(rgw_data_sync_status) From 2cc533b30c0f23c0750ea8d02c51b3b3d3b4821a Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 8 Jun 2016 11:24:11 -0400 Subject: [PATCH 05/13] rgw: don't ignore ENOENT in RGWRemoteDataLog::read_sync_status() rest handlers for sync status need to return ENOENT errors. 
the only other callers are in radosgw-admin, so the ENOENT errors are ignored at those call sites instead Signed-off-by: Casey Bodley --- src/rgw/rgw_admin.cc | 4 ++-- src/rgw/rgw_data_sync.cc | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index addf36c579ab0..b457495e7f201 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1788,7 +1788,7 @@ static void get_data_sync_status(const string& source_zone, list& status } ret = sync.read_sync_status(); - if (ret < 0) { + if (ret < 0 && ret != -ENOENT) { push_ss(ss, status, tab) << string("failed read sync status: ") + cpp_strerror(-ret); return; } @@ -5091,7 +5091,7 @@ int main(int argc, char **argv) } ret = sync.read_sync_status(); - if (ret < 0) { + if (ret < 0 && ret != -ENOENT) { cerr << "ERROR: sync.read_sync_status() returned ret=" << ret << std::endl; return -ret; } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index d5ace03f37416..568ee2727d203 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -639,11 +639,7 @@ int RGWRemoteDataLog::get_shard_info(int shard_id) int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) { - int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status)); - if (r == -ENOENT) { - r = 0; - } - return r; + return run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status)); } int RGWRemoteDataLog::init_sync_status(int num_shards) From ccef4b0f59c598eff2afc908f639ccf560782015 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 21 Jul 2016 23:43:06 -0400 Subject: [PATCH 06/13] rgw: expose sync managers through RGWRados Signed-off-by: Casey Bodley --- src/rgw/rgw_rados.cc | 22 +++++++++++++++++++++- src/rgw/rgw_rados.h | 5 +++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index b38ff7cdb0e39..f05b0a139cb14 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2960,6 
+2960,7 @@ class RGWMetaSyncProcessorThread : public RGWSyncProcessorThread sync.wakeup(*iter); } } + RGWMetaSyncStatusManager* get_manager() { return &sync; } int init() { int ret = sync.init(); @@ -3003,7 +3004,7 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread sync.wakeup(iter->first, iter->second); } } - + RGWDataSyncStatusManager* get_manager() { return &sync; } int init() { return 0; @@ -3050,6 +3051,25 @@ void RGWRados::wakeup_data_sync_shards(const string& source_zone, mapwakeup_sync_shards(shard_ids); } +RGWMetaSyncStatusManager* RGWRados::get_meta_sync_manager() +{ + Mutex::Locker l(meta_sync_thread_lock); + if (meta_sync_processor_thread) { + return meta_sync_processor_thread->get_manager(); + } + return nullptr; +} + +RGWDataSyncStatusManager* RGWRados::get_data_sync_manager(const std::string& source_zone) +{ + Mutex::Locker l(data_sync_thread_lock); + auto thread = data_sync_processor_threads.find(source_zone); + if (thread == data_sync_processor_threads.end()) { + return nullptr; + } + return thread->second->get_manager(); +} + int RGWRados::get_required_alignment(rgw_bucket& bucket, uint64_t *alignment) { IoCtx ioctx; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 5b8d2ec502786..91eab4bedf502 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1516,6 +1516,8 @@ class RGWPeriod WRITE_CLASS_ENCODER(RGWPeriod) class RGWDataChangesLog; +class RGWMetaSyncStatusManager; +class RGWDataSyncStatusManager; class RGWReplicaLogger; class RGWCoroutinesManagerRegistry; @@ -2586,6 +2588,9 @@ class RGWRados void wakeup_meta_sync_shards(set& shard_ids); void wakeup_data_sync_shards(const string& source_zone, map >& shard_ids); + RGWMetaSyncStatusManager* get_meta_sync_manager(); + RGWDataSyncStatusManager* get_data_sync_manager(const std::string& source_zone); + int set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner); int set_buckets_enabled(std::vector& buckets, bool enabled); int bucket_suspended(rgw_bucket& bucket, 
bool *suspended); From b7cd4e0e8b879b5e528de75bea3307585b96cbf2 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 21 Jul 2016 23:46:20 -0400 Subject: [PATCH 07/13] rgw: change read_sync_status interface RGWDataSyncStatusManager::read_sync_status() now operates on the given parameter, rather than its internal member variable. this allows multiple concurrent readers, which is needed for the rest interface Signed-off-by: Casey Bodley --- src/rgw/rgw_admin.cc | 10 ++++------ src/rgw/rgw_data_sync.cc | 10 ++-------- src/rgw/rgw_data_sync.h | 11 +++++------ 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index b457495e7f201..92730f8735618 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1787,14 +1787,13 @@ static void get_data_sync_status(const string& source_zone, list& status return; } - ret = sync.read_sync_status(); + rgw_data_sync_status sync_status; + ret = sync.read_sync_status(&sync_status); if (ret < 0 && ret != -ENOENT) { push_ss(ss, status, tab) << string("failed read sync status: ") + cpp_strerror(-ret); return; } - const rgw_data_sync_status& sync_status = sync.get_sync_status(); - string status_str; switch (sync_status.sync_info.state) { case rgw_data_sync_info::StateInit: @@ -5090,14 +5089,13 @@ int main(int argc, char **argv) return -ret; } - ret = sync.read_sync_status(); + rgw_data_sync_status sync_status; + ret = sync.read_sync_status(&sync_status); if (ret < 0 && ret != -ENOENT) { cerr << "ERROR: sync.read_sync_status() returned ret=" << ret << std::endl; return -ret; } - rgw_data_sync_status& sync_status = sync.get_sync_status(); - formatter->open_object_section("summary"); encode_json("sync_status", sync_status, formatter); diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 568ee2727d203..e333014dfc824 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1521,18 +1521,12 @@ void RGWRemoteDataLog::wakeup(int shard_id, set& keys) { 
data_sync_cr->wakeup(shard_id, keys); } -int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status) +int RGWRemoteDataLog::run_sync(int num_shards) { - int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, &sync_status)); - if (r < 0 && r != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << sync_env.source_zone << " r=" << r << dendl; - return r; - } - lock.get_write(); data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards); lock.unlock(); - r = run(data_sync_cr); + int r = run(data_sync_cr); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl; return r; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 0bdb151128578..1c950674c2245 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -241,7 +241,7 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { int get_shard_info(int shard_id); int read_sync_status(rgw_data_sync_status *sync_status); int init_sync_status(int num_shards); - int run_sync(int num_shards, rgw_data_sync_status& sync_status); + int run_sync(int num_shards); void wakeup(int shard_id, set& keys); }; @@ -260,7 +260,6 @@ class RGWDataSyncStatusManager { string source_shard_status_oid_prefix; rgw_obj source_status_obj; - rgw_data_sync_status sync_status; map shard_objs; int num_shards; @@ -276,12 +275,12 @@ class RGWDataSyncStatusManager { int init(); void finalize(); - rgw_data_sync_status& get_sync_status() { return sync_status; } - static string shard_obj_name(const string& source_zone, int shard_id); static string sync_status_oid(const string& source_zone); - int read_sync_status() { return source_log.read_sync_status(&sync_status); } + int read_sync_status(rgw_data_sync_status *sync_status) { + return source_log.read_sync_status(sync_status); + } int init_sync_status() { return source_log.init_sync_status(num_shards); } int read_log_info(rgw_datalog_info *log_info) { @@ -294,7 +293,7 @@ class 
RGWDataSyncStatusManager { return source_log.read_source_log_shards_next(shard_markers, result); } - int run() { return source_log.run_sync(num_shards, sync_status); } + int run() { return source_log.run_sync(num_shards); } void wakeup(int shard_id, set& keys) { return source_log.wakeup(shard_id, keys); } void stop() { From 6b1e40d7a21c549b55e6576ec56973c8d3c092d1 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 22 Jul 2016 11:00:16 -0400 Subject: [PATCH 08/13] rgw: use separate cr manager for read_sync_status RGWCoroutinesManager::run() is not reentrant, so concurrent users of read_sync_status() must use different managers Signed-off-by: Casey Bodley --- src/rgw/rgw_data_sync.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e333014dfc824..fe7155c8036a4 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -639,12 +639,15 @@ int RGWRemoteDataLog::get_shard_info(int shard_id) int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) { - return run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status)); + // cannot run concurrently with run_sync(), so run in a separate manager + RGWCoroutinesManager crs(store->ctx(), nullptr); + return crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status)); } int RGWRemoteDataLog::init_sync_status(int num_shards) { - return run(new RGWInitDataSyncStatusCoroutine(&sync_env, num_shards)); + RGWCoroutinesManager crs(store->ctx(), nullptr); + return crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env, num_shards)); } static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id) From a66b4cc9faaf003e9b3389994470048f1a317405 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 20 May 2016 18:13:25 -0400 Subject: [PATCH 09/13] rgw: add rest handlers to query sync status Signed-off-by: Casey Bodley --- src/rgw/rgw_rest_log.cc | 85 +++++++++++++++++++++++++++++++++++++++++ 1 
file changed, 85 insertions(+) diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index 6f8bc644418d1..cf2a1cdd67c48 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -18,6 +18,8 @@ #include "rgw_rest_s3.h" #include "rgw_rest_log.h" #include "rgw_client_io.h" +#include "rgw_sync.h" +#include "rgw_data_sync.h" #include "common/errno.h" #include "include/assert.h" @@ -837,6 +839,85 @@ void RGWOp_DATALog_Delete::execute() { http_ret = store->data_log->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker); } +// not in header to avoid pulling in rgw_sync.h +class RGWOp_MDLog_Status : public RGWRESTOp { + rgw_meta_sync_status status; +public: + int check_caps(RGWUserCaps& caps) override { + return caps.check_cap("mdlog", RGW_CAP_READ); + } + int verify_permission() override { + return check_caps(s->user->caps); + } + void execute() override; + void send_response() override; + const string name() override { return "get_metadata_log_status"; } +}; + +void RGWOp_MDLog_Status::execute() +{ + auto sync = store->get_meta_sync_manager(); + if (sync == nullptr) { + ldout(s->cct, 1) << "no sync manager" << dendl; + http_ret = -ENOENT; + return; + } + http_ret = sync->read_sync_status(); + status = sync->get_sync_status(); +} + +void RGWOp_MDLog_Status::send_response() +{ + set_req_state_err(s, http_ret); + dump_errno(s); + end_header(s); + + if (http_ret >= 0) { + encode_json("status", status, s->formatter); + } + flusher.flush(); +} + +// not in header to avoid pulling in rgw_data_sync.h +class RGWOp_DATALog_Status : public RGWRESTOp { + rgw_data_sync_status status; +public: + int check_caps(RGWUserCaps& caps) override { + return caps.check_cap("datalog", RGW_CAP_READ); + } + int verify_permission() override { + return check_caps(s->user->caps); + } + void execute() override ; + void send_response() override; + const string name() override { return "get_data_changes_log_status"; } +}; + +void RGWOp_DATALog_Status::execute() +{ + const 
auto source_zone = s->info.args.get("source-zone"); + auto sync = store->get_data_sync_manager(source_zone); + if (sync == nullptr) { + ldout(s->cct, 1) << "no sync manager for source-zone " << source_zone << dendl; + http_ret = -ENOENT; + return; + } + http_ret = sync->read_sync_status(&status); +} + +void RGWOp_DATALog_Status::send_response() +{ + set_req_state_err(s, http_ret); + dump_errno(s); + end_header(s); + + if (http_ret >= 0) { + encode_json("status", status, s->formatter); + } + flusher.flush(); +} + + RGWOp *RGWHandler_Log::op_get() { bool exists; string type = s->info.args.get("type", &exists); @@ -852,6 +933,8 @@ RGWOp *RGWHandler_Log::op_get() { } else { return new RGWOp_MDLog_List; } + } else if (s->info.args.exists("status")) { + return new RGWOp_MDLog_Status; } else { return new RGWOp_MDLog_Info; } @@ -868,6 +951,8 @@ RGWOp *RGWHandler_Log::op_get() { } else { return new RGWOp_DATALog_List; } + } else if (s->info.args.exists("status")) { + return new RGWOp_DATALog_Status; } else { return new RGWOp_DATALog_Info; } From d67436bb0d6cb9ae7757916547b8274990f8cc6b Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 8 Jun 2016 15:17:55 -0400 Subject: [PATCH 10/13] rgw: enable async calls to time_log_trim Signed-off-by: Casey Bodley --- src/rgw/rgw_rados.cc | 13 +++++++++++-- src/rgw/rgw_rados.h | 3 ++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index f05b0a139cb14..0b3b7ec1b3c8b 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -4621,7 +4621,8 @@ int RGWRados::time_log_info_async(librados::IoCtx& io_ctx, const string& oid, cl } int RGWRados::time_log_trim(const string& oid, const real_time& start_time, const real_time& end_time, - const string& from_marker, const string& to_marker) + const string& from_marker, const string& to_marker, + librados::AioCompletion *completion) { librados::IoCtx io_ctx; @@ -4634,7 +4635,15 @@ int RGWRados::time_log_trim(const string& oid, const 
real_time& start_time, cons utime_t st(start_time); utime_t et(end_time); - return cls_log_trim(io_ctx, oid, st, et, from_marker, to_marker); + ObjectWriteOperation op; + cls_log_trim(op, st, et, from_marker, to_marker); + + if (!completion) { + r = io_ctx.operate(oid, &op); + } else { + r = io_ctx.aio_operate(oid, completion, &op); + } + return r; } string RGWRados::objexp_hint_get_shardname(int shard_num) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 91eab4bedf502..b4e43c4d9515e 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2826,7 +2826,8 @@ class RGWRados int time_log_info(const string& oid, cls_log_header *header); int time_log_info_async(librados::IoCtx& io_ctx, const string& oid, cls_log_header *header, librados::AioCompletion *completion); int time_log_trim(const string& oid, const ceph::real_time& start_time, const ceph::real_time& end_time, - const string& from_marker, const string& to_marker); + const string& from_marker, const string& to_marker, + librados::AioCompletion *completion = nullptr); string objexp_hint_get_shardname(int shard_num); int objexp_key_shard(const rgw_obj_key& key); From 20f31a412d3479efe71d5745c55575173c3fe9e8 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 8 Jun 2016 15:54:31 -0400 Subject: [PATCH 11/13] rgw: add RGWRadosTimelogTrimCR Signed-off-by: Casey Bodley --- src/rgw/rgw_cr_rados.cc | 41 +++++++++++++++++++++++++++++++++++++++++ src/rgw/rgw_cr_rados.h | 20 ++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index 19fb94944d6d9..99f8427e45178 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -651,6 +651,47 @@ int RGWRadosTimelogAddCR::request_complete() return r; } +RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store, + const std::string& oid, + const real_time& start_time, + const real_time& end_time, + const std::string& from_marker, + const std::string& to_marker) + : 
RGWSimpleCoroutine(store->ctx()), store(store), oid(oid), + start_time(start_time), end_time(end_time), + from_marker(from_marker), to_marker(to_marker) +{ + set_description() << "timelog trim oid=" << oid + << " start_time=" << start_time << " end_time=" << end_time + << " from_marker=" << from_marker << " to_marker=" << to_marker; +} + +RGWRadosTimelogTrimCR::~RGWRadosTimelogTrimCR() +{ + if (cn) { + cn->put(); + } +} + +int RGWRadosTimelogTrimCR::send_request() +{ + set_status() << "sending request"; + + cn = stack->create_completion_notifier(); + cn->get(); + return store->time_log_trim(oid, start_time, end_time, from_marker, + to_marker, cn->completion()); +} + +int RGWRadosTimelogTrimCR::request_complete() +{ + int r = cn->completion()->get_return_value(); + + set_status() << "request complete; ret=" << r; + + return r; +} + int RGWAsyncStatObj::_send_request() { return store->raw_obj_stat(obj, psize, pmtime, pepoch, diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 81cae4d281302..5174ab6e1f29b 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -981,6 +981,26 @@ class RGWRadosTimelogAddCR : public RGWSimpleCoroutine { int request_complete(); }; +class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine { + RGWRados *store; + RGWAioCompletionNotifier *cn{nullptr}; + std::string oid; + real_time start_time; + real_time end_time; + std::string from_marker; + std::string to_marker; + +public: + RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid, + const real_time& start_time, const real_time& end_time, + const std::string& from_marker, + const std::string& to_marker); + ~RGWRadosTimelogTrimCR(); + + int send_request() override; + int request_complete() override; +}; + class RGWAsyncStatObj : public RGWAsyncRadosRequest { RGWRados *store; rgw_obj obj; From 6a366f955a0caa4febdbcf855a9da482a27b7fb0 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 6 Jun 2016 16:46:56 -0400 Subject: [PATCH 12/13] rgw: add 
RGWDataLogTrimCR Signed-off-by: Casey Bodley --- src/rgw/rgw_bucket.h | 1 + src/rgw/rgw_cr_rados.h | 3 +- src/rgw/rgw_data_sync.cc | 145 +++++++++++++++++++++++++++++++++++++++ src/rgw/rgw_data_sync.h | 17 +++++ 4 files changed, 165 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 12182d12935ab..ee52ad626b7ec 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -466,6 +466,7 @@ class RGWDataChangesLog { ~RGWDataChangesLog(); int choose_oid(const rgw_bucket_shard& bs); + const std::string& get_oid(int shard_id) const { return oids[shard_id]; } int add_entry(rgw_bucket& bucket, int shard_id); int get_log_shard_id(rgw_bucket& bucket, int shard_id); int renew_entries(); diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 5174ab6e1f29b..a1cbfabc8dee3 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -984,13 +984,14 @@ class RGWRadosTimelogAddCR : public RGWSimpleCoroutine { class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine { RGWRados *store; RGWAioCompletionNotifier *cn{nullptr}; + protected: std::string oid; real_time start_time; real_time end_time; std::string from_marker; std::string to_marker; -public: + public: RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid, const real_time& start_time, const real_time& end_time, const std::string& from_marker, diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index fe7155c8036a4..051231129269a 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2838,3 +2838,148 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone, return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key(); } + +#undef dout_prefix +#define dout_prefix (*_dout << "data trim: ") + +namespace { + +/// return the marker that it's safe to trim up to +const std::string& get_stable_marker(const rgw_data_sync_marker& m) +{ + return m.state == m.FullSync ? 
m.next_step_marker : m.marker; +} + +/// comparison operator for take_min_markers() +bool operator<(const rgw_data_sync_marker& lhs, + const rgw_data_sync_marker& rhs) +{ + // sort by stable marker + return get_stable_marker(lhs) < get_stable_marker(rhs); +} + +/// populate the container starting with 'dest' with the minimum stable marker +/// of each shard for all of the peers in [first, last) +template <typename IterIn, typename IterOut> +void take_min_markers(IterIn first, IterIn last, IterOut dest) +{ + if (first == last) { + return; + } + // initialize markers with the first peer's + auto m = dest; + for (auto &shard : first->sync_markers) { + *m = std::move(shard.second); + ++m; + } + // for remaining peers, replace with smaller markers + for (auto p = first + 1; p != last; ++p) { + m = dest; + for (auto &shard : p->sync_markers) { + if (shard.second < *m) { + *m = std::move(shard.second); + } + ++m; + } + } +} + +// wrapper to update last_trim_marker on success +class LastTimelogTrimCR : public RGWRadosTimelogTrimCR { + CephContext *cct; + std::string *last_trim_marker; + public: + LastTimelogTrimCR(RGWRados *store, const std::string& oid, + const std::string& to_marker, std::string *last_trim_marker) + : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{}, + std::string{}, to_marker), + cct(store->ctx()), last_trim_marker(last_trim_marker) + {} + int request_complete() override { + int r = RGWRadosTimelogTrimCR::request_complete(); + if (r < 0 && r != -ENODATA) { + ldout(cct, 1) << "failed to trim datalog: " << cpp_strerror(r) << dendl; + return r; + } + ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl; + *last_trim_marker = to_marker; + return 0; + } +}; + +} // anonymous namespace + +RGWDataLogTrimCR::RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval) + : RGWCoroutine(store->ctx()), store(store), http(http), + num_shards(num_shards), interval(interval), + zone(store->get_zone().id), + 
peer_status(store->zone_conn_map.size()), + min_shard_markers(num_shards), + last_trim(num_shards) +{ +} + +int RGWDataLogTrimCR::operate() +{ + reenter(this) { + for (;;) { + yield wait(interval); + + ldout(cct, 10) << "fetching sync status for zone " << zone << dendl; + yield { + // query data sync status from each sync peer + rgw_http_param_pair params[] = { + { "type", "data" }, + { "status", nullptr }, + { "source-zone", zone.c_str() }, + { nullptr, nullptr } + }; + + auto p = peer_status.begin(); + for (auto& c : store->zone_conn_map) { + ldout(cct, 20) << "query sync status from " << c.first << dendl; + using StatusCR = RGWReadRESTResourceCR; + spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), + false); + ++p; + } + } + + // must get a successful reply from all peers to consider trimming + ret = 0; + while (ret == 0 && num_spawned()) { + yield wait_for_child(); + collect_next(&ret); + } + if (ret < 0) { + ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl; + drain_all(); + continue; + } + + ldout(cct, 10) << "trimming log shards" << dendl; + yield { + // determine the minimum marker for each shard + take_min_markers(peer_status.begin(), peer_status.end(), + min_shard_markers.begin()); + + for (int i = 0; i < num_shards; i++) { + const auto& m = min_shard_markers[i]; + auto& stable = get_stable_marker(m); + if (stable <= last_trim[i]) { + continue; + } + ldout(cct, 10) << "trimming log shard " << i + << " at marker=" << stable + << " last_trim=" << last_trim[i] << dendl; + using TrimCR = LastTimelogTrimCR; + spawn(new TrimCR(store, store->data_log->get_oid(i), + stable, &last_trim[i]), + true); + } + } + } + } + return 0; +} diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 1c950674c2245..07953c5d942c5 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -501,4 +501,21 @@ class RGWBucketSyncStatusManager { }; +class RGWDataLogTrimCR : public RGWCoroutine { + RGWRados *store; + 
RGWHTTPManager *http; + const int num_shards; + const utime_t interval; //< polling interval + const std::string& zone; //< my zone id + std::vector peer_status; //< sync status for each peer + std::vector min_shard_markers; //< min marker per shard + std::vector last_trim; //< last trimmed marker per shard + int ret{0}; + + public: + RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http, + int num_shards, utime_t interval); + int operate() override; +}; + #endif From 3b674bbd865cb640266751480c86925a8c07e099 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Mon, 6 Jun 2016 16:47:28 -0400 Subject: [PATCH 13/13] rgw: add RGWSyncLogTrimThread to RGWRados Signed-off-by: Casey Bodley --- src/common/config_opts.h | 1 + src/rgw/rgw_rados.cc | 49 ++++++++++++++++++++++++++++++++++++++-- src/rgw/rgw_rados.h | 3 +++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 04ac61832e588..622c1361cc5b0 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1453,6 +1453,7 @@ OPTION(rgw_num_async_rados_threads, OPT_INT, 32) // num of threads to use for as OPTION(rgw_md_notify_interval_msec, OPT_INT, 200) // metadata changes notification interval to followers OPTION(rgw_run_sync_thread, OPT_BOOL, true) // whether radosgw (not radosgw-admin) spawns the sync thread OPTION(rgw_sync_lease_period, OPT_INT, 120) // time in second for lease that rgw takes on a specific log (or log shard) +OPTION(rgw_sync_log_trim_interval, OPT_INT, 1200) // time in seconds between attempts to trim sync logs OPTION(rgw_realm_reconfigure_delay, OPT_DOUBLE, 2) // seconds to wait before reloading realm configuration OPTION(rgw_period_push_interval, OPT_DOUBLE, 2) // seconds to wait before retrying "period push" diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 0b3b7ec1b3c8b..1ba550655778b 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -43,6 +43,9 @@ #include "rgw_coroutine.h" +#include 
"rgw_boost_asio_yield.h" +#undef fork // fails to compile RGWPeriod::fork() below + #include "common/Clock.h" #include "include/rados/librados.hpp" @@ -3028,6 +3031,33 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread } }; +class RGWSyncLogTrimThread : public RGWSyncProcessorThread +{ + RGWCoroutinesManager crs; + RGWRados *store; + RGWHTTPManager http; + const utime_t trim_interval; + + uint64_t interval_msec() override { return 0; } + void stop_process() override { crs.stop(); } +public: + RGWSyncLogTrimThread(RGWRados *store, int interval) + : RGWSyncProcessorThread(store), crs(store->ctx(), nullptr), store(store), + http(store->ctx(), crs.get_completion_mgr()), + trim_interval(interval, 0) + {} + + int init() override { + return http.set_threaded(); + } + int process() override { + crs.run(new RGWDataLogTrimCR(store, &http, + cct->_conf->rgw_data_log_num_shards, + trim_interval)); + return 0; + } +}; + void RGWRados::wakeup_meta_sync_shards(set& shard_ids) { Mutex::Locker l(meta_sync_thread_lock); @@ -3144,6 +3174,9 @@ void RGWRados::finalize() RGWDataSyncProcessorThread *thread = iter.second; thread->stop(); } + if (sync_log_trimmer) { + sync_log_trimmer->stop(); + } } if (async_rados) { async_rados->stop(); @@ -3157,6 +3190,8 @@ void RGWRados::finalize() delete thread; } data_sync_processor_threads.clear(); + delete sync_log_trimmer; + sync_log_trimmer = nullptr; } if (finisher) { finisher->stop(); @@ -3796,7 +3831,7 @@ int RGWRados::init_complete() meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados); ret = meta_sync_processor_thread->init(); if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to initialize" << dendl; + ldout(cct, 0) << "ERROR: failed to initialize meta sync thread" << dendl; return ret; } meta_sync_processor_thread->start(); @@ -3807,12 +3842,22 @@ int RGWRados::init_complete() RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first); ret = thread->init(); 
if (ret < 0) { - ldout(cct, 0) << "ERROR: failed to initialize" << dendl; + ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl; return ret; } thread->start(); data_sync_processor_threads[iter->first] = thread; } + auto interval = cct->_conf->rgw_sync_log_trim_interval; + if (interval > 0) { + sync_log_trimmer = new RGWSyncLogTrimThread(this, interval); + ret = sync_log_trimmer->init(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl; + return ret; + } + sync_log_trimmer->start(); + } } data_notifier = new RGWDataNotifier(this); data_notifier->start(); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index b4e43c4d9515e..4329e5fc52c8d 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -32,6 +32,7 @@ class RGWDataNotifier; class RGWObjectExpirer; class RGWMetaSyncProcessorThread; class RGWDataSyncProcessorThread; +class RGWSyncLogTrimThread; class RGWRESTConn; /* flags for put_obj_meta() */ @@ -1778,6 +1779,8 @@ class RGWRados RGWMetaSyncProcessorThread *meta_sync_processor_thread; map data_sync_processor_threads; + RGWSyncLogTrimThread *sync_log_trimmer{nullptr}; + Mutex meta_sync_thread_lock; Mutex data_sync_thread_lock;