Skip to content

Commit

Permalink
Merge pull request #10372 from cbodley/wip-rgw-data-log-trim
Browse files Browse the repository at this point in the history
rgw multisite: trim data logs as peer zones catch up

Reviewed-by: Yehuda Sadeh <yehuda@redhat.com>
  • Loading branch information
yehudasa committed Oct 4, 2016
2 parents 9670719 + 3b674bb commit b974dac
Show file tree
Hide file tree
Showing 11 changed files with 524 additions and 67 deletions.
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -1496,6 +1496,7 @@ OPTION(rgw_num_async_rados_threads, OPT_INT, 32) // num of threads to use for as
OPTION(rgw_md_notify_interval_msec, OPT_INT, 200) // metadata changes notification interval to followers
OPTION(rgw_run_sync_thread, OPT_BOOL, true) // whether radosgw (not radosgw-admin) spawns the sync thread
OPTION(rgw_sync_lease_period, OPT_INT, 120) // time in second for lease that rgw takes on a specific log (or log shard)
OPTION(rgw_sync_log_trim_interval, OPT_INT, 1200) // time in seconds between attempts to trim sync logs

OPTION(rgw_sync_data_inject_err_probability, OPT_DOUBLE, 0) // range [0, 1]
OPTION(rgw_sync_meta_inject_err_probability, OPT_DOUBLE, 0) // range [0, 1]
Expand Down
14 changes: 6 additions & 8 deletions src/rgw/rgw_admin.cc
Expand Up @@ -1795,14 +1795,13 @@ static void get_data_sync_status(const string& source_zone, list<string>& status
return;
}

ret = sync.read_sync_status();
if (ret < 0) {
rgw_data_sync_status sync_status;
ret = sync.read_sync_status(&sync_status);
if (ret < 0 && ret != -ENOENT) {
push_ss(ss, status, tab) << string("failed read sync status: ") + cpp_strerror(-ret);
return;
}

const rgw_data_sync_status& sync_status = sync.get_sync_status();

string status_str;
switch (sync_status.sync_info.state) {
case rgw_data_sync_info::StateInit:
Expand Down Expand Up @@ -5140,14 +5139,13 @@ int main(int argc, char **argv)
return -ret;
}

ret = sync.read_sync_status();
if (ret < 0) {
rgw_data_sync_status sync_status;
ret = sync.read_sync_status(&sync_status);
if (ret < 0 && ret != -ENOENT) {
cerr << "ERROR: sync.read_sync_status() returned ret=" << ret << std::endl;
return -ret;
}

rgw_data_sync_status& sync_status = sync.get_sync_status();

formatter->open_object_section("summary");
encode_json("sync_status", sync_status, formatter);

Expand Down
1 change: 1 addition & 0 deletions src/rgw/rgw_bucket.h
Expand Up @@ -466,6 +466,7 @@ class RGWDataChangesLog {
~RGWDataChangesLog();

int choose_oid(const rgw_bucket_shard& bs);
const std::string& get_oid(int shard_id) const { return oids[shard_id]; }
int add_entry(rgw_bucket& bucket, int shard_id);
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
int renew_entries();
Expand Down
41 changes: 41 additions & 0 deletions src/rgw/rgw_cr_rados.cc
Expand Up @@ -651,6 +651,47 @@ int RGWRadosTimelogAddCR::request_complete()
return r;
}

RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados *store,
const std::string& oid,
const real_time& start_time,
const real_time& end_time,
const std::string& from_marker,
const std::string& to_marker)
: RGWSimpleCoroutine(store->ctx()), store(store), oid(oid),
start_time(start_time), end_time(end_time),
from_marker(from_marker), to_marker(to_marker)
{
set_description() << "timelog trim oid=" << oid
<< " start_time=" << start_time << " end_time=" << end_time
<< " from_marker=" << from_marker << " to_marker=" << to_marker;
}

RGWRadosTimelogTrimCR::~RGWRadosTimelogTrimCR()
{
if (cn) {
cn->put();
}
}

int RGWRadosTimelogTrimCR::send_request()
{
set_status() << "sending request";

cn = stack->create_completion_notifier();
cn->get();
return store->time_log_trim(oid, start_time, end_time, from_marker,
to_marker, cn->completion());
}

int RGWRadosTimelogTrimCR::request_complete()
{
int r = cn->completion()->get_return_value();

set_status() << "request complete; ret=" << r;

return r;
}

int RGWAsyncStatObj::_send_request()
{
return store->raw_obj_stat(obj, psize, pmtime, pepoch,
Expand Down
44 changes: 32 additions & 12 deletions src/rgw/rgw_cr_rados.h
Expand Up @@ -180,22 +180,21 @@ class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
rgw_bucket pool;
string oid;

map<string, bufferlist> *pattrs;
map<string, bufferlist> *pattrs{nullptr};

T *result;
/// on ENOENT, call handle_data() with an empty object instead of failing
const bool empty_on_enoent;

RGWAsyncGetSystemObj *req;
RGWAsyncGetSystemObj *req{nullptr};

public:
RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
const rgw_bucket& _pool, const string& _oid,
T *_result) : RGWSimpleCoroutine(_store->ctx()),
async_rados(_async_rados), store(_store),
obj_ctx(store),
pool(_pool), oid(_oid),
pattrs(NULL),
result(_result),
req(NULL) { }
T *_result, bool empty_on_enoent = true)
: RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
obj_ctx(store), pool(_pool), oid(_oid), result(_result),
empty_on_enoent(empty_on_enoent) {}
~RGWSimpleRadosReadCR() {
request_cleanup();
}
Expand Down Expand Up @@ -235,7 +234,9 @@ int RGWSimpleRadosReadCR<T>::request_complete()
{
int ret = req->get_ret_status();
retcode = ret;
if (ret != -ENOENT) {
if (ret == -ENOENT && empty_on_enoent) {
*result = T();
} else {
if (ret < 0) {
return ret;
}
Expand All @@ -245,8 +246,6 @@ int RGWSimpleRadosReadCR<T>::request_complete()
} catch (buffer::error& err) {
return -EIO;
}
} else {
*result = T();
}

return handle_data(*result);
Expand Down Expand Up @@ -982,6 +981,27 @@ class RGWRadosTimelogAddCR : public RGWSimpleCoroutine {
int request_complete();
};

class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
RGWRados *store;
RGWAioCompletionNotifier *cn{nullptr};
protected:
std::string oid;
real_time start_time;
real_time end_time;
std::string from_marker;
std::string to_marker;

public:
RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
const real_time& start_time, const real_time& end_time,
const std::string& from_marker,
const std::string& to_marker);
~RGWRadosTimelogTrimCR();

int send_request() override;
int request_complete() override;
};

class RGWAsyncStatObj : public RGWAsyncRadosRequest {
RGWRados *store;
rgw_obj obj;
Expand Down

0 comments on commit b974dac

Please sign in to comment.