diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 41678678f1d45..b9cc870a8e1fc 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -1496,6 +1496,7 @@ OPTION(rgw_num_async_rados_threads, OPT_INT, 32) // num of threads to use for as
 OPTION(rgw_md_notify_interval_msec, OPT_INT, 200) // metadata changes notification interval to followers
 OPTION(rgw_run_sync_thread, OPT_BOOL, true) // whether radosgw (not radosgw-admin) spawns the sync thread
 OPTION(rgw_sync_lease_period, OPT_INT, 120) // time in second for lease that rgw takes on a specific log (or log shard)
+OPTION(rgw_sync_log_trim_interval, OPT_INT, 1200) // time in seconds between attempts to trim sync logs
 OPTION(rgw_sync_data_inject_err_probability, OPT_DOUBLE, 0) // range [0, 1]
 OPTION(rgw_sync_meta_inject_err_probability, OPT_DOUBLE, 0) // range [0, 1]
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index ad6218080c2bd..a7a732f22f1df 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1795,14 +1795,13 @@ static void get_data_sync_status(const string& source_zone, list<string>& status
     return;
   }
 
-  ret = sync.read_sync_status();
-  if (ret < 0) {
+  rgw_data_sync_status sync_status;
+  ret = sync.read_sync_status(&sync_status);
+  if (ret < 0 && ret != -ENOENT) {
     push_ss(ss, status, tab) << string("failed read sync status: ") + cpp_strerror(-ret);
     return;
   }
 
-  const rgw_data_sync_status& sync_status = sync.get_sync_status();
-
   string status_str;
   switch (sync_status.sync_info.state) {
   case rgw_data_sync_info::StateInit:
@@ -5140,14 +5139,13 @@ int main(int argc, char **argv)
       return -ret;
     }
 
-    ret = sync.read_sync_status();
-    if (ret < 0) {
+    rgw_data_sync_status sync_status;
+    ret = sync.read_sync_status(&sync_status);
+    if (ret < 0 && ret != -ENOENT) {
       cerr << "ERROR: sync.read_sync_status() returned ret=" << ret << std::endl;
       return -ret;
     }
 
-    rgw_data_sync_status& sync_status = sync.get_sync_status();
-
     formatter->open_object_section("summary");
     encode_json("sync_status", sync_status, formatter);
diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h
index 12182d12935ab..ee52ad626b7ec 100644
--- a/src/rgw/rgw_bucket.h
+++ b/src/rgw/rgw_bucket.h
@@ -466,6 +466,7 @@ class RGWDataChangesLog {
   ~RGWDataChangesLog();
 
   int choose_oid(const rgw_bucket_shard& bs);
+  const std::string& get_oid(int shard_id) const { return oids[shard_id]; }
   int add_entry(rgw_bucket& bucket, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int renew_entries();
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index 19fb94944d6d9..99f8427e45178 100644
--- a/src/rgw/rgw_cr_rados.cc
+++ b/src/rgw/rgw_cr_rados.cc
@@ -651,6 +651,47 @@ int RGWRadosTimelogAddCR::request_complete()
   return r;
 }
 
+RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
+                                             const std::string& oid,
+                                             const real_time& start_time,
+                                             const real_time& end_time,
+                                             const std::string& from_marker,
+                                             const std::string& to_marker)
+  : RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
+    start_time(start_time), end_time(end_time),
+    from_marker(from_marker), to_marker(to_marker)
+{
+  set_description() << "timelog trim oid=" << oid
+      << " start_time=" << start_time << " end_time=" << end_time
+      << " from_marker=" << from_marker << " to_marker=" << to_marker;
+}
+
+RGWRadosTimelogTrimCR::~RGWRadosTimelogTrimCR()
+{
+  if (cn) {
+    cn->put();
+  }
+}
+
+int RGWRadosTimelogTrimCR::send_request()
+{
+  set_status() << "sending request";
+
+  cn = stack->create_completion_notifier();
+  cn->get();
+  return store->time_log_trim(oid, start_time, end_time, from_marker,
+                              to_marker, cn->completion());
+}
+
+int RGWRadosTimelogTrimCR::request_complete()
+{
+  int r = cn->completion()->get_return_value();
+
+  set_status() << "request complete; ret=" << r;
+
+  return r;
+}
+
 int RGWAsyncStatObj::_send_request()
 {
   return store->raw_obj_stat(obj, psize, pmtime, pepoch,
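
The new coroutine follows RGWSimpleCoroutine's two-phase contract: send_request() fires the async librados op and takes an extra reference on the completion notifier so the callback stays valid even if the coroutine stack unwinds first, while request_complete() harvests the op's return value. Below is a standalone sketch of that refcounting shape; every name in it is invented for illustration and it models the pattern, not the RGW API:

    #include <atomic>
    #include <future>
    #include <iostream>
    #include <thread>

    struct Notifier {
      std::atomic<int> refs{1};
      std::promise<int> result;
      void get() { ++refs; }
      void put() { if (--refs == 0) delete this; }
    };

    struct TrimOp {
      Notifier *cn = nullptr;

      int send_request() {
        cn = new Notifier;   // stands in for stack->create_completion_notifier()
        cn->get();           // op's reference, held until the callback runs
        std::thread([n = cn] {
          n->result.set_value(0);  // pretend the async trim succeeded
          n->put();                // the callback path drops the op's reference
        }).detach();
        return 0;
      }

      int request_complete() {
        return cn->result.get_future().get();  // AioCompletion::get_return_value()
      }

      ~TrimOp() {
        if (cn) cn->put();   // coroutine's reference, as in the destructor above
      }
    };

    int main() {
      TrimOp op;
      op.send_request();
      std::cout << "trim returned " << op.request_complete() << "\n";
    }

Whichever side finishes last (the callback or the destructor) frees the notifier, which is why both the cn->get() in send_request() and the cn->put() in the destructor are needed.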
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index 24a284d63d527..a1cbfabc8dee3 100644
--- a/src/rgw/rgw_cr_rados.h
+++ b/src/rgw/rgw_cr_rados.h
@@ -180,22 +180,21 @@ class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
   rgw_bucket pool;
   string oid;
-  map<string, bufferlist> *pattrs;
+  map<string, bufferlist> *pattrs{nullptr};
 
   T *result;
+  /// on ENOENT, call handle_data() with an empty object instead of failing
+  const bool empty_on_enoent;
 
-  RGWAsyncGetSystemObj *req;
+  RGWAsyncGetSystemObj *req{nullptr};
 
 public:
   RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
                        const rgw_bucket& _pool, const string& _oid,
-                       T *_result) : RGWSimpleCoroutine(_store->ctx()),
-                                     async_rados(_async_rados), store(_store),
-                                     obj_ctx(store),
-                                     pool(_pool), oid(_oid),
-                                     pattrs(NULL),
-                                     result(_result),
-                                     req(NULL) { }
+                       T *_result, bool empty_on_enoent = true)
+    : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+      obj_ctx(store), pool(_pool), oid(_oid), result(_result),
+      empty_on_enoent(empty_on_enoent) {}
 
   ~RGWSimpleRadosReadCR() {
     request_cleanup();
   }
@@ -235,7 +234,9 @@ int RGWSimpleRadosReadCR<T>::request_complete()
 {
   int ret = req->get_ret_status();
   retcode = ret;
-  if (ret != -ENOENT) {
+  if (ret == -ENOENT && empty_on_enoent) {
+    *result = T();
+  } else {
     if (ret < 0) {
       return ret;
     }
@@ -245,8 +246,6 @@
     } catch (buffer::error& err) {
       return -EIO;
     }
-  } else {
-    *result = T();
   }
 
   return handle_data(*result);
@@ -982,6 +981,27 @@ class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
   int request_complete();
 };
 
+class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+  RGWAioCompletionNotifier *cn{nullptr};
+ protected:
+  std::string oid;
+  real_time start_time;
+  real_time end_time;
+  std::string from_marker;
+  std::string to_marker;
+
+ public:
+  RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
+                        const real_time& start_time, const real_time& end_time,
+                        const std::string& from_marker,
+                        const std::string& to_marker);
+  ~RGWRadosTimelogTrimCR();
+
+  int send_request() override;
+  int request_complete() override;
+};
+
 class RGWAsyncStatObj : public RGWAsyncRadosRequest {
   RGWRados *store;
   rgw_obj obj;
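
The empty_on_enoent flag flips how a missing status object is reported: the default preserves the old behavior (a missing object reads as a default-constructed T and handle_data() still runs), while passing false lets callers such as RGWReadDataSyncStatusCoroutine treat ENOENT as a real error. A minimal illustration of that decision logic, where T and the function name are stand-ins rather than the RGW types:

    #include <cerrno>

    template <typename T>
    int complete_read(int ret, bool empty_on_enoent, T *result) {
      if (ret == -ENOENT && empty_on_enoent) {
        *result = T();   // "object missing" becomes "empty object", and succeeds
        return 0;
      }
      if (ret < 0) {
        return ret;      // with the flag off, ENOENT fails like any other error
      }
      // otherwise decode the fetched buffer into *result (elided here)
      return 0;
    }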
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 607cb4663f7a5..4c82795a46b34 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -23,6 +23,9 @@
 
 #define dout_subsys ceph_subsys_rgw
 
+#undef dout_prefix
+#define dout_prefix (*_dout << "data sync: ")
+
 static string datalog_sync_status_oid_prefix = "datalog.sync-status";
 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
@@ -112,34 +115,76 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-class RGWReadDataSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_data_sync_info> {
-  RGWDataSyncEnv *sync_env;
+class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  RGWDataSyncEnv *env;
+  const int num_shards;
+  int shard_id{0};
+  map<uint32_t, rgw_data_sync_marker>& markers;
+
+ public:
+  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
+                                 map<uint32_t, rgw_data_sync_marker>& markers)
+    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
+      env(env), num_shards(num_shards), markers(markers)
+  {}
+  bool spawn_next() override;
+};
+
+bool RGWReadDataSyncStatusMarkersCR::spawn_next()
+{
+  if (shard_id >= num_shards) {
+    return false;
+  }
+  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+  spawn(new CR(env->async_rados, env->store, env->store->get_zone_params().log_pool,
+               RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id),
+               &markers[shard_id]),
+        false);
+  shard_id++;
+  return true;
+}
+
+class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
   rgw_data_sync_status *sync_status;
 
 public:
   RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
-                      rgw_data_sync_status *_status)
-    : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store,
-                           _sync_env->store->get_zone_params().log_pool,
-                           RGWDataSyncStatusManager::sync_status_oid(_sync_env->source_zone),
-                           &_status->sync_info),
-      sync_env(_sync_env),
-      sync_status(_status) {}
-
-  int handle_data(rgw_data_sync_info& data);
+                                 rgw_data_sync_status *_status)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
+  {}
+  int operate() override;
 };
 
-int RGWReadDataSyncStatusCoroutine::handle_data(rgw_data_sync_info& data)
+int RGWReadDataSyncStatusCoroutine::operate()
 {
-  if (retcode == -ENOENT) {
-    return retcode;
-  }
-
-  map<uint32_t, rgw_data_sync_marker>& markers = sync_status->sync_markers;
-  RGWRados *store = sync_env->store;
-  for (int i = 0; i < (int)data.num_shards; i++) {
-    spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
-                                                         RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), &markers[i]), true);
+  reenter(this) {
+    // read sync info
+    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
+    yield {
+      bool empty_on_enoent = false; // fail on ENOENT
+      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
+                          sync_env->store->get_zone_params().log_pool,
+                          RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone),
+                          &sync_status->sync_info, empty_on_enoent));
+    }
+    if (retcode < 0) {
+      ldout(sync_env->cct, 4) << "failed to read sync status info with "
+          << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+    // read shard markers
+    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
+    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
+                                 sync_status->sync_markers));
+    if (retcode < 0) {
+      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
+          << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+    return set_cr_done();
   }
   return 0;
 }
@@ -594,16 +639,15 @@ int RGWRemoteDataLog::get_shard_info(int shard_id)
 
 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
 {
-  int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status));
-  if (r == -ENOENT) {
-    r = 0;
-  }
-  return r;
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), nullptr);
+  return crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status));
 }
 
 int RGWRemoteDataLog::init_sync_status(int num_shards)
 {
-  return run(new RGWInitDataSyncStatusCoroutine(&sync_env, num_shards));
+  RGWCoroutinesManager crs(store->ctx(), nullptr);
+  return crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env, num_shards));
 }
 
 static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
@@ -1484,18 +1528,12 @@ void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
   data_sync_cr->wakeup(shard_id, keys);
 }
 
-int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status)
+int RGWRemoteDataLog::run_sync(int num_shards)
 {
-  int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, &sync_status));
-  if (r < 0 && r != -ENOENT) {
-    ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << sync_env.source_zone << " r=" << r << dendl;
-    return r;
-  }
-
   lock.get_write();
   data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
   lock.unlock();
-  r = run(data_sync_cr);
+  int r = run(data_sync_cr);
   if (r < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
     return r;
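
RGWReadDataSyncStatusMarkersCR reads one status object per shard but, through RGWShardCollectCR, keeps at most MAX_CONCURRENT_SHARDS of those reads in flight at once. A standalone model of that bounded fan-out, with std::async standing in for the coroutine machinery and a placeholder shard-read function:

    #include <future>
    #include <iostream>
    #include <queue>
    #include <utility>
    #include <vector>

    int read_shard_marker(int shard) { return shard * 10; /* placeholder I/O */ }

    int main() {
      const int num_shards = 64, window = 16;  // window ~ MAX_CONCURRENT_SHARDS
      std::vector<int> markers(num_shards);
      std::queue<std::pair<int, std::future<int>>> inflight;

      for (int shard = 0; shard < num_shards; ++shard) {
        if ((int)inflight.size() == window) {   // window full: collect one first
          auto& f = inflight.front();
          markers[f.first] = f.second.get();
          inflight.pop();
        }
        inflight.emplace(shard, std::async(std::launch::async,
                                           read_shard_marker, shard));
      }
      while (!inflight.empty()) {               // drain the remainder
        auto& f = inflight.front();
        markers[f.first] = f.second.get();
        inflight.pop();
      }
      std::cout << "read " << markers.size() << " shard markers\n";
    }

The window keeps the status read cheap even with many shards, which matters because radosgw-admin now runs this path on every sync-status query.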
@@ -2812,3 +2850,148 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
 }
 
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "data trim: ")
+
+namespace {
+
+/// return the marker that it's safe to trim up to
+const std::string& get_stable_marker(const rgw_data_sync_marker& m)
+{
+  return m.state == m.FullSync ? m.next_step_marker : m.marker;
+}
+
+/// comparison operator for take_min_markers()
+bool operator<(const rgw_data_sync_marker& lhs,
+               const rgw_data_sync_marker& rhs)
+{
+  // sort by stable marker
+  return get_stable_marker(lhs) < get_stable_marker(rhs);
+}
+
+/// populate the container starting with 'dest' with the minimum stable marker
+/// of each shard for all of the peers in [first, last)
+template <typename IterIn, typename IterOut>
+void take_min_markers(IterIn first, IterIn last, IterOut dest)
+{
+  if (first == last) {
+    return;
+  }
+  // initialize markers with the first peer's
+  auto m = dest;
+  for (auto &shard : first->sync_markers) {
+    *m = std::move(shard.second);
+    ++m;
+  }
+  // for remaining peers, replace with smaller markers
+  for (auto p = first + 1; p != last; ++p) {
+    m = dest;
+    for (auto &shard : p->sync_markers) {
+      if (shard.second < *m) {
+        *m = std::move(shard.second);
+      }
+      ++m;
+    }
+  }
+}
+
+// wrapper to update last_trim_marker on success
+class LastTimelogTrimCR : public RGWRadosTimelogTrimCR {
+  CephContext *cct;
+  std::string *last_trim_marker;
+ public:
+  LastTimelogTrimCR(RGWRados *store, const std::string& oid,
+                    const std::string& to_marker, std::string *last_trim_marker)
+    : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
+                            std::string{}, to_marker),
+      cct(store->ctx()), last_trim_marker(last_trim_marker)
+  {}
+  int request_complete() override {
+    int r = RGWRadosTimelogTrimCR::request_complete();
+    if (r < 0 && r != -ENODATA) {
+      ldout(cct, 1) << "failed to trim datalog: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+    ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl;
+    *last_trim_marker = to_marker;
+    return 0;
+  }
+};
+
+} // anonymous namespace
+
+RGWDataLogTrimCR::RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
+                                   int num_shards, utime_t interval)
+  : RGWCoroutine(store->ctx()), store(store), http(http),
+    num_shards(num_shards), interval(interval),
+    zone(store->get_zone().id),
+    peer_status(store->zone_conn_map.size()),
+    min_shard_markers(num_shards),
+    last_trim(num_shards)
+{
+}
+
+int RGWDataLogTrimCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      yield wait(interval);
+
+      ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
+      yield {
+        // query data sync status from each sync peer
+        rgw_http_param_pair params[] = {
+          { "type", "data" },
+          { "status", nullptr },
+          { "source-zone", zone.c_str() },
+          { nullptr, nullptr }
+        };
+
+        auto p = peer_status.begin();
+        for (auto& c : store->zone_conn_map) {
+          ldout(cct, 20) << "query sync status from " << c.first << dendl;
+          using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+          spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+                false);
+          ++p;
+        }
+      }
+
+      // must get a successful reply from all peers to consider trimming
+      ret = 0;
+      while (ret == 0 && num_spawned()) {
+        yield wait_for_child();
+        collect_next(&ret);
+      }
+      if (ret < 0) {
+        ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+        drain_all();
+        continue;
+      }
+
+      ldout(cct, 10) << "trimming log shards" << dendl;
+      yield {
+        // determine the minimum marker for each shard
+        take_min_markers(peer_status.begin(), peer_status.end(),
+                         min_shard_markers.begin());
+
+        for (int i = 0; i < num_shards; i++) {
+          const auto& m = min_shard_markers[i];
+          auto& stable = get_stable_marker(m);
+          if (stable <= last_trim[i]) {
+            continue;
+          }
+          ldout(cct, 10) << "trimming log shard " << i
+              << " at marker=" << stable
+              << " last_trim=" << last_trim[i] << dendl;
+          using TrimCR = LastTimelogTrimCR;
+          spawn(new TrimCR(store, store->data_log->get_oid(i),
+                           stable, &last_trim[i]),
+                true);
+        }
+      }
+    }
+  }
+  return 0;
+}
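
The trim decision reduces to a per-shard minimum across peers: a shard's log may only be trimmed up to the marker that every peer has already consumed (and for a peer still in full sync, its next_step_marker). A runnable toy version of take_min_markers(), assuming markers compare lexicographically the way same-generation datalog markers do:

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
      // two peers, three shards: each row is one peer's stable markers
      std::vector<std::vector<std::string>> peers = {
        {"1_100", "1_250", "1_300"},   // peer A
        {"1_180", "1_200", "1_300"},   // peer B
      };
      std::vector<std::string> min_markers = peers[0];
      for (size_t p = 1; p < peers.size(); ++p) {
        for (size_t shard = 0; shard < min_markers.size(); ++shard) {
          min_markers[shard] = std::min(min_markers[shard], peers[p][shard]);
        }
      }
      // shard 0 trims to 1_100, shard 1 to 1_200, shard 2 to 1_300:
      // no peer can lose datalog entries it has not yet processed
      for (auto& m : min_markers) std::cout << m << "\n";
    }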
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
index f3fc2f2a28367..07953c5d942c5 100644
--- a/src/rgw/rgw_data_sync.h
+++ b/src/rgw/rgw_data_sync.h
@@ -6,6 +6,7 @@
 #include "rgw_bucket.h"
 
 #include "common/RWLock.h"
+#include "common/ceph_json.h"
 
 struct rgw_datalog_info {
@@ -59,6 +60,18 @@ struct rgw_data_sync_info {
     encode_json("status", s, f);
     encode_json("num_shards", num_shards, f);
   }
+  void decode_json(JSONObj *obj) {
+    std::string s;
+    JSONDecoder::decode_json("status", s, obj);
+    if (s == "building-full-sync-maps") {
+      state = StateBuildingFullSyncMaps;
+    } else if (s == "sync") {
+      state = StateSync;
+    } else {
+      state = StateInit;
+    }
+    JSONDecoder::decode_json("num_shards", num_shards, obj);
+  }
 
   rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
 };
@@ -108,6 +121,18 @@ struct rgw_data_sync_marker {
     encode_json("pos", pos, f);
     encode_json("timestamp", utime_t(timestamp), f);
   }
+  void decode_json(JSONObj *obj) {
+    int s;
+    JSONDecoder::decode_json("state", s, obj);
+    state = s;
+    JSONDecoder::decode_json("marker", marker, obj);
+    JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
+    JSONDecoder::decode_json("total_entries", total_entries, obj);
+    JSONDecoder::decode_json("pos", pos, obj);
+    utime_t t;
+    JSONDecoder::decode_json("timestamp", t, obj);
+    timestamp = t.to_real_time();
+  }
 };
 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
 
@@ -135,6 +160,10 @@ struct rgw_data_sync_status {
     encode_json("info", sync_info, f);
     encode_json("markers", sync_markers, f);
   }
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("info", sync_info, obj);
+    JSONDecoder::decode_json("markers", sync_markers, obj);
+  }
 };
 WRITE_CLASS_ENCODER(rgw_data_sync_status)
 
@@ -212,7 +241,7 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
   int get_shard_info(int shard_id);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int init_sync_status(int num_shards);
-  int run_sync(int num_shards, rgw_data_sync_status& sync_status);
+  int run_sync(int num_shards);
 
   void wakeup(int shard_id, set<string>& keys);
 };
@@ -231,7 +260,6 @@ class RGWDataSyncStatusManager {
   string source_shard_status_oid_prefix;
   rgw_obj source_status_obj;
 
-  rgw_data_sync_status sync_status;
   map<int, rgw_obj> shard_objs;
 
   int num_shards;
@@ -247,12 +275,12 @@ class RGWDataSyncStatusManager {
   int init();
   void finalize();
 
-  rgw_data_sync_status& get_sync_status() { return sync_status; }
-
   static string shard_obj_name(const string& source_zone, int shard_id);
   static string sync_status_oid(const string& source_zone);
 
-  int read_sync_status() { return source_log.read_sync_status(&sync_status); }
+  int read_sync_status(rgw_data_sync_status *sync_status) {
+    return source_log.read_sync_status(sync_status);
+  }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
 
   int read_log_info(rgw_datalog_info *log_info) {
@@ -265,7 +293,7 @@ class RGWDataSyncStatusManager {
     return source_log.read_source_log_shards_next(shard_markers, result);
   }
 
-  int run() { return source_log.run_sync(num_shards, sync_status); }
+  int run() { return source_log.run_sync(num_shards); }
 
   void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
   void stop() {
@@ -473,4 +501,21 @@ class RGWBucketSyncStatusManager {
 
 };
 
+class RGWDataLogTrimCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const utime_t interval; //< polling interval
+  const std::string& zone; //< my zone id
+  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
+  std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
+  std::vector<std::string> last_trim; //< last trimmed marker per shard
+  int ret{0};
+
+ public:
+  RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
+                   int num_shards, utime_t interval);
+  int operate() override;
+};
+
 #endif
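
These decode_json() methods have to stay exact inverses of the dump() methods above, because the trim coroutine round-trips rgw_data_sync_status through the REST status endpoint. A distilled version of the state mapping (the enumerators mirror rgw_data_sync_info; the helper names are invented):

    #include <cstdint>
    #include <string>

    enum SyncState : uint16_t { StateInit, StateBuildingFullSyncMaps, StateSync };

    std::string state_to_json(SyncState s) {
      switch (s) {
        case StateBuildingFullSyncMaps: return "building-full-sync-maps";
        case StateSync:                 return "sync";
        default:                        return "init";
      }
    }

    SyncState state_from_json(const std::string& s) {
      if (s == "building-full-sync-maps") return StateBuildingFullSyncMaps;
      if (s == "sync")                    return StateSync;
      return StateInit;  // unknown strings fall back to init, as above
    }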
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 9bbc4628c666c..e26af715075af 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -45,6 +45,9 @@
 
 #include "rgw_coroutine.h"
 
+#include "rgw_boost_asio_yield.h"
+#undef fork // fails to compile RGWPeriod::fork() below
+
 #include "common/Clock.h"
 
 #include "include/rados/librados.hpp"
@@ -2972,6 +2975,7 @@ class RGWMetaSyncProcessorThread : public RGWSyncProcessorThread
       sync.wakeup(*iter);
     }
   }
+  RGWMetaSyncStatusManager* get_manager() { return &sync; }
 
   int init() {
     int ret = sync.init();
@@ -3015,7 +3019,7 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
       sync.wakeup(iter->first, iter->second);
     }
   }
-
+  RGWDataSyncStatusManager* get_manager() { return &sync; }
 
   int init() {
     return 0;
@@ -3039,6 +3043,33 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
   }
 };
 
+class RGWSyncLogTrimThread : public RGWSyncProcessorThread
+{
+  RGWCoroutinesManager crs;
+  RGWRados *store;
+  RGWHTTPManager http;
+  const utime_t trim_interval;
+
+  uint64_t interval_msec() override { return 0; }
+  void stop_process() override { crs.stop(); }
+public:
+  RGWSyncLogTrimThread(RGWRados *store, int interval)
+    : RGWSyncProcessorThread(store), crs(store->ctx(), nullptr), store(store),
+      http(store->ctx(), crs.get_completion_mgr()),
+      trim_interval(interval, 0)
+  {}
+
+  int init() override {
+    return http.set_threaded();
+  }
+  int process() override {
+    crs.run(new RGWDataLogTrimCR(store, &http,
+                                 cct->_conf->rgw_data_log_num_shards,
+                                 trim_interval));
+    return 0;
+  }
+};
+
 void RGWRados::wakeup_meta_sync_shards(set<string>& shard_ids)
 {
   Mutex::Locker l(meta_sync_thread_lock);
@@ -3062,6 +3093,25 @@ void RGWRados::wakeup_data_sync_shards(const string& source_zone, map<int, set<
     data_sync_processor_thread->wakeup_sync_shards(shard_ids);
   }
 }
+
+RGWMetaSyncStatusManager* RGWRados::get_meta_sync_manager()
+{
+  Mutex::Locker l(meta_sync_thread_lock);
+  if (meta_sync_processor_thread) {
+    return meta_sync_processor_thread->get_manager();
+  }
+  return nullptr;
+}
+
+RGWDataSyncStatusManager* RGWRados::get_data_sync_manager(const std::string& source_zone)
+{
+  Mutex::Locker l(data_sync_thread_lock);
+  auto thread = data_sync_processor_threads.find(source_zone);
+  if (thread == data_sync_processor_threads.end()) {
+    return nullptr;
+  }
+  return thread->second->get_manager();
+}
+
 int RGWRados::get_required_alignment(rgw_bucket& bucket, uint64_t *alignment)
 {
   IoCtx ioctx;
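
RGWSyncLogTrimThread reports interval_msec() == 0 because its process() is expected to block inside crs.run() for the daemon's lifetime (RGWDataLogTrimCR loops forever around yield wait(interval)), so shutdown has to interrupt the coroutine manager via stop_process() rather than wait for the next poll cycle. A standalone sketch of that shape with plain threads; the behavior described is an assumption and the names are invented:

    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    class TrimThread {
      std::mutex m;
      std::condition_variable cv;
      bool stopping = false;
      std::thread t;

      void process() {               // analogue of crs.run(new RGWDataLogTrimCR(...))
        std::unique_lock<std::mutex> lock(m);
        while (!stopping) {
          // "yield wait(interval)": sleep, but wake immediately on stop()
          cv.wait_for(lock, std::chrono::seconds(1200));
          // ... fetch peer sync status and trim log shards here ...
        }
      }

    public:
      void start() { t = std::thread(&TrimThread::process, this); }
      void stop() {                  // analogue of stop_process() -> crs.stop()
        { std::lock_guard<std::mutex> lock(m); stopping = true; }
        cv.notify_all();
        t.join();
      }
    };

    int main() {
      TrimThread trimmer;
      trimmer.start();
      trimmer.stop();  // returns promptly instead of waiting out the interval
    }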
@@ -3136,6 +3186,9 @@ void RGWRados::finalize()
       RGWDataSyncProcessorThread *thread = iter.second;
       thread->stop();
     }
+    if (sync_log_trimmer) {
+      sync_log_trimmer->stop();
+    }
   }
   if (async_rados) {
     async_rados->stop();
@@ -3149,6 +3202,8 @@ void RGWRados::finalize()
       delete thread;
     }
     data_sync_processor_threads.clear();
+    delete sync_log_trimmer;
+    sync_log_trimmer = nullptr;
   }
   if (finisher) {
     finisher->stop();
@@ -3815,7 +3870,7 @@ int RGWRados::init_complete()
     meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados);
     ret = meta_sync_processor_thread->init();
     if (ret < 0) {
-      ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
+      ldout(cct, 0) << "ERROR: failed to initialize meta sync thread" << dendl;
       return ret;
     }
     meta_sync_processor_thread->start();
@@ -3826,12 +3881,22 @@ int RGWRados::init_complete()
       RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first);
       ret = thread->init();
       if (ret < 0) {
-        ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
+        ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
         return ret;
       }
       thread->start();
       data_sync_processor_threads[iter->first] = thread;
     }
+    auto interval = cct->_conf->rgw_sync_log_trim_interval;
+    if (interval > 0) {
+      sync_log_trimmer = new RGWSyncLogTrimThread(this, interval);
+      ret = sync_log_trimmer->init();
+      if (ret < 0) {
+        ldout(cct, 0) << "ERROR: failed to initialize sync log trim thread" << dendl;
+        return ret;
+      }
+      sync_log_trimmer->start();
+    }
   }
   data_notifier = new RGWDataNotifier(this);
   data_notifier->start();
@@ -4663,7 +4728,8 @@ int RGWRados::time_log_info_async(librados::IoCtx& io_ctx, const string& oid, cl
 }
 
 int RGWRados::time_log_trim(const string& oid, const real_time& start_time, const real_time& end_time,
-                            const string& from_marker, const string& to_marker)
+                            const string& from_marker, const string& to_marker,
+                            librados::AioCompletion *completion)
 {
   librados::IoCtx io_ctx;
 
@@ -4676,7 +4742,15 @@ int RGWRados::time_log_trim(const string& oid, const real_time& start_time, cons
   utime_t st(start_time);
   utime_t et(end_time);
 
-  return cls_log_trim(io_ctx, oid, st, et, from_marker, to_marker);
+  ObjectWriteOperation op;
+  cls_log_trim(op, st, et, from_marker, to_marker);
+
+  if (!completion) {
+    r = io_ctx.operate(oid, &op);
+  } else {
+    r = io_ctx.aio_operate(oid, completion, &op);
+  }
+  return r;
 }
 
 string RGWRados::objexp_hint_get_shardname(int shard_num)
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 65866ed0b2c23..469445298c1e7 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -33,6 +33,7 @@ class RGWLC;
 class RGWObjectExpirer;
 class RGWMetaSyncProcessorThread;
 class RGWDataSyncProcessorThread;
+class RGWSyncLogTrimThread;
 class RGWRESTConn;
 
 /* flags for put_obj_meta() */
@@ -1546,6 +1547,8 @@ class RGWPeriod
 WRITE_CLASS_ENCODER(RGWPeriod)
 
 class RGWDataChangesLog;
+class RGWMetaSyncStatusManager;
+class RGWDataSyncStatusManager;
 class RGWReplicaLogger;
 class RGWCoroutinesManagerRegistry;
 
@@ -1801,6 +1804,8 @@ class RGWRados
   RGWMetaSyncProcessorThread *meta_sync_processor_thread;
   map<string, RGWDataSyncProcessorThread *> data_sync_processor_threads;
 
+  RGWSyncLogTrimThread *sync_log_trimmer{nullptr};
+
   Mutex meta_sync_thread_lock;
   Mutex data_sync_thread_lock;
 
@@ -2616,6 +2621,9 @@ class RGWRados
   void wakeup_meta_sync_shards(set<string>& shard_ids);
   void wakeup_data_sync_shards(const string& source_zone, map<int, set<string> >& shard_ids);
 
+  RGWMetaSyncStatusManager* get_meta_sync_manager();
+  RGWDataSyncStatusManager* get_data_sync_manager(const std::string& source_zone);
+
   int set_bucket_owner(rgw_bucket& bucket, ACLOwner& owner);
   int set_buckets_enabled(std::vector<rgw_bucket>& buckets, bool enabled);
   int bucket_suspended(rgw_bucket& bucket, bool *suspended);
@@ -2851,7 +2859,8 @@ class RGWRados
   int time_log_info(const string& oid, cls_log_header *header);
   int time_log_info_async(librados::IoCtx& io_ctx, const string& oid, cls_log_header *header, librados::AioCompletion *completion);
   int time_log_trim(const string& oid, const ceph::real_time& start_time, const ceph::real_time& end_time,
-                    const string& from_marker, const string& to_marker);
+                    const string& from_marker, const string& to_marker,
+                    librados::AioCompletion *completion = nullptr);
 
   string objexp_hint_get_shardname(int shard_num);
   int objexp_key_shard(const rgw_obj_key& key);
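
time_log_trim() now has the common optional-completion shape: a null AioCompletion keeps the original blocking call, while a non-null one submits the same ObjectWriteOperation asynchronously, which is what lets RGWRadosTimelogTrimCR drive it from a coroutine. The shape in miniature, with stand-in types rather than librados:

    #include <functional>
    #include <future>

    using Op = std::function<int()>;

    // completion == nullptr: block and return the op's result (io_ctx.operate)
    // completion != nullptr: submit and return 0 for "queued" (io_ctx.aio_operate)
    int operate(const Op& op, std::future<int> *completion = nullptr) {
      if (!completion) {
        return op();
      }
      *completion = std::async(std::launch::async, op);
      return 0;
    }

Defaulting the new parameter to nullptr keeps every existing synchronous caller source-compatible.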
diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc
index 6f8bc644418d1..cf2a1cdd67c48 100644
--- a/src/rgw/rgw_rest_log.cc
+++ b/src/rgw/rgw_rest_log.cc
@@ -18,6 +18,8 @@
 #include "rgw_rest_s3.h"
 #include "rgw_rest_log.h"
 #include "rgw_client_io.h"
+#include "rgw_sync.h"
+#include "rgw_data_sync.h"
 #include "common/errno.h"
 #include "include/assert.h"
 
@@ -837,6 +839,85 @@ void RGWOp_DATALog_Delete::execute() {
   http_ret = store->data_log->trim_entries(shard_id, ut_st, ut_et, start_marker, end_marker);
 }
 
+// not in header to avoid pulling in rgw_sync.h
+class RGWOp_MDLog_Status : public RGWRESTOp {
+  rgw_meta_sync_status status;
+public:
+  int check_caps(RGWUserCaps& caps) override {
+    return caps.check_cap("mdlog", RGW_CAP_READ);
+  }
+  int verify_permission() override {
+    return check_caps(s->user->caps);
+  }
+  void execute() override;
+  void send_response() override;
+  const string name() override { return "get_metadata_log_status"; }
+};
+
+void RGWOp_MDLog_Status::execute()
+{
+  auto sync = store->get_meta_sync_manager();
+  if (sync == nullptr) {
+    ldout(s->cct, 1) << "no sync manager" << dendl;
+    http_ret = -ENOENT;
+    return;
+  }
+  http_ret = sync->read_sync_status();
+  status = sync->get_sync_status();
+}
+
+void RGWOp_MDLog_Status::send_response()
+{
+  set_req_state_err(s, http_ret);
+  dump_errno(s);
+  end_header(s);
+
+  if (http_ret >= 0) {
+    encode_json("status", status, s->formatter);
+  }
+  flusher.flush();
+}
+
+// not in header to avoid pulling in rgw_data_sync.h
+class RGWOp_DATALog_Status : public RGWRESTOp {
+  rgw_data_sync_status status;
+public:
+  int check_caps(RGWUserCaps& caps) override {
+    return caps.check_cap("datalog", RGW_CAP_READ);
+  }
+  int verify_permission() override {
+    return check_caps(s->user->caps);
+  }
+  void execute() override;
+  void send_response() override;
+  const string name() override { return "get_data_changes_log_status"; }
+};
+
+void RGWOp_DATALog_Status::execute()
+{
+  const auto source_zone = s->info.args.get("source-zone");
+  auto sync = store->get_data_sync_manager(source_zone);
+  if (sync == nullptr) {
+    ldout(s->cct, 1) << "no sync manager for source-zone " << source_zone << dendl;
+    http_ret = -ENOENT;
+    return;
+  }
+  http_ret = sync->read_sync_status(&status);
+}
+
+void RGWOp_DATALog_Status::send_response()
+{
+  set_req_state_err(s, http_ret);
+  dump_errno(s);
+  end_header(s);
+
+  if (http_ret >= 0) {
+    encode_json("status", status, s->formatter);
+  }
+  flusher.flush();
+}
+
+
 RGWOp *RGWHandler_Log::op_get() {
   bool exists;
   string type = s->info.args.get("type", &exists);
 
@@ -852,6 +933,8 @@ RGWOp *RGWHandler_Log::op_get() {
     } else {
       return new RGWOp_MDLog_List;
     }
+  } else if (s->info.args.exists("status")) {
+    return new RGWOp_MDLog_Status;
   } else {
     return new RGWOp_MDLog_Info;
   }
@@ -868,6 +951,8 @@ RGWOp *RGWHandler_Log::op_get() {
     } else {
      return new RGWOp_DATALog_List;
     }
+  } else if (s->info.args.exists("status")) {
+    return new RGWOp_DATALog_Status;
   } else {
     return new RGWOp_DATALog_Info;
   }
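
With these ops wired into op_get(), sync status becomes just another query argument on the existing /admin/log resource. The trim coroutine's parameter list above maps onto requests of this form (illustrative request lines; the zone id is a placeholder):

    GET /admin/log/?type=metadata&status
    GET /admin/log/?type=data&status&source-zone=<zone id>

Both require the corresponding read capability (mdlog or datalog) and return the encode_json() rendering of the status object, which a peer then parses with the decode_json() methods added in rgw_data_sync.h.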
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
index 8d6b1f5bc7110..3a500791e28b9 100644
--- a/src/rgw/rgw_sync.cc
+++ b/src/rgw/rgw_sync.cc
@@ -25,7 +25,7 @@
 #define dout_subsys ceph_subsys_rgw
 
 #undef dout_prefix
-#define dout_prefix (*_dout << "rgw meta sync: ")
+#define dout_prefix (*_dout << "meta sync: ")
 
 static string mdlog_sync_status_oid = "mdlog.sync-status";
 static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";