rgw multisite: use a rados lock to coordinate data log trimming #10546

Merged: 5 commits, merged Nov 28, 2016
32 changes: 18 additions & 14 deletions src/rgw/rgw_cr_rados.h
@@ -484,6 +484,13 @@ class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {

int send_request();
int request_complete();

static std::string gen_random_cookie(CephContext* cct) {
#define COOKIE_LEN 16
char buf[COOKIE_LEN + 1];
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
return buf;
}
};

class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
@@ -1002,33 +1009,30 @@ class RGWContinuousLeaseCR : public RGWCoroutine {
RGWRados *store;

const rgw_bucket& pool;
string oid;
const string oid;

string lock_name;
string cookie;
const string lock_name;
const string cookie;

int interval;

Mutex lock;
atomic_t going_down;
bool locked;
bool locked{false};

RGWCoroutine *caller;

bool aborted;
bool aborted{false};

public:
RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
const rgw_bucket& _pool, const string& _oid,
const string& _lock_name, int _interval, RGWCoroutine *_caller) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
pool(_pool), oid(_oid), lock_name(_lock_name), interval(_interval),
lock("RGWContimuousLeaseCR"), locked(false), caller(_caller), aborted(false) {
#define COOKIE_LEN 16
char buf[COOKIE_LEN + 1];

gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
cookie = buf;
}
const string& _lock_name, int _interval, RGWCoroutine *_caller)
: RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
pool(_pool), oid(_oid), lock_name(_lock_name),
cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller)
{}

int operate();

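The new static gen_random_cookie() helper exists because a RADOS exclusive lock is identified by its lock name plus a per-holder cookie, and the lock can be given a duration after which it expires on its own; both RGWSimpleRadosLockCR callers and RGWContinuousLeaseCR now generate their cookies the same way. A minimal standalone sketch of the same idea using librados directly (not the PR's coroutine wrappers; make_cookie() and take_trim_lock() are hypothetical names for illustration):

```cpp
#include <rados/librados.hpp>
#include <sys/time.h>
#include <random>
#include <string>

// Hypothetical stand-in for gen_rand_alphanumeric(): build a random cookie
// that identifies this lock holder. Unlock/renew must present the same value.
static std::string make_cookie(size_t len = 16)
{
  static const char chars[] =
      "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
  std::random_device rd;
  std::mt19937 gen(rd());
  std::uniform_int_distribution<size_t> pick(0, sizeof(chars) - 2);
  std::string cookie;
  for (size_t i = 0; i < len; ++i)
    cookie += chars[pick(gen)];
  return cookie;
}

// Take an exclusive advisory lock named "data_trim" on the given object.
// The lock expires automatically after duration_sec if never released.
int take_trim_lock(librados::IoCtx& ioctx, const std::string& oid,
                   const std::string& cookie, int duration_sec)
{
  timeval tv = { duration_sec, 0 };
  return ioctx.lock_exclusive(oid, "data_trim", cookie,
                              "data log trim coordination", &tv, 0);
}
```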
199 changes: 136 additions & 63 deletions src/rgw/rgw_data_sync.cc
@@ -641,13 +641,13 @@ int RGWRemoteDataLog::get_shard_info(int shard_id)
int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
// cannot run concurrently with run_sync(), so run in a separate manager
RGWCoroutinesManager crs(store->ctx(), nullptr);
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
return crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env, sync_status));
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
RGWCoroutinesManager crs(store->ctx(), nullptr);
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
return crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env, num_shards));
}

@@ -2981,79 +2981,152 @@ class LastTimelogTrimCR : public RGWRadosTimelogTrimCR {
}
};

} // anonymous namespace
class DataLogTrimCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http;
const int num_shards;
const std::string& zone_id; //< my zone id
std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
std::vector<std::string>& last_trim; //< last trimmed marker per shard
int ret{0};

RGWDataLogTrimCR::RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
int num_shards, utime_t interval)
: RGWCoroutine(store->ctx()), store(store), http(http),
num_shards(num_shards), interval(interval),
zone(store->get_zone().id),
peer_status(store->zone_conn_map.size()),
min_shard_markers(num_shards),
last_trim(num_shards)
{
}
public:
DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
int num_shards, std::vector<std::string>& last_trim)
: RGWCoroutine(store->ctx()), store(store), http(http),
num_shards(num_shards),
zone_id(store->get_zone().id),
peer_status(store->zone_conn_map.size()),
min_shard_markers(num_shards),
last_trim(last_trim)
{}

int RGWDataLogTrimCR::operate()
int operate() override;
};

int DataLogTrimCR::operate()
{
reenter(this) {
for (;;) {
yield wait(interval);
ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
set_status("fetching sync status");
yield {
// query data sync status from each sync peer
rgw_http_param_pair params[] = {
{ "type", "data" },
{ "status", nullptr },
{ "source-zone", zone_id.c_str() },
{ nullptr, nullptr }
};

auto p = peer_status.begin();
for (auto& c : store->zone_conn_map) {
ldout(cct, 20) << "query sync status from " << c.first << dendl;
using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
false);
++p;
}
}

// must get a successful reply from all peers to consider trimming
ret = 0;
while (ret == 0 && num_spawned() > 0) {
yield wait_for_child();
collect_next(&ret);
}
drain_all();

ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
yield {
// query data sync status from each sync peer
rgw_http_param_pair params[] = {
{ "type", "data" },
{ "status", nullptr },
{ "source-zone", zone.c_str() },
{ nullptr, nullptr }
};

auto p = peer_status.begin();
for (auto& c : store->zone_conn_map) {
ldout(cct, 20) << "query sync status from " << c.first << dendl;
using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
false);
++p;
if (ret < 0) {
ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
return set_cr_error(ret);
}

ldout(cct, 10) << "trimming log shards" << dendl;
set_status("trimming log shards");
yield {
// determine the minimum marker for each shard
take_min_markers(peer_status.begin(), peer_status.end(),
min_shard_markers.begin());

for (int i = 0; i < num_shards; i++) {
const auto& m = min_shard_markers[i];
auto& stable = get_stable_marker(m);
if (stable <= last_trim[i]) {
continue;
}
ldout(cct, 10) << "trimming log shard " << i
<< " at marker=" << stable
<< " last_trim=" << last_trim[i] << dendl;
using TrimCR = LastTimelogTrimCR;
spawn(new TrimCR(store, store->data_log->get_oid(i),
stable, &last_trim[i]),
true);
}
}
return set_cr_done();
}
return 0;
}
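The helpers take_min_markers() and get_stable_marker() referenced above are defined earlier in rgw_data_sync.cc and are not part of this diff. A rough sketch of their semantics (an assumption about intent, not their actual source, with simplified stand-in types): for each shard, keep the minimum marker reported across all peers, since the data log can only be trimmed up to the point every peer has already synced past.

```cpp
#include <string>
#include <vector>

struct sync_marker_sketch {        // simplified stand-in for rgw_data_sync_marker
  std::string marker;              // position this peer has synced to
};

// one entry per shard, for one peer
using peer_status_sketch = std::vector<sync_marker_sketch>;

// For every shard, take the smallest marker seen across all peers.
void take_min_markers_sketch(const std::vector<peer_status_sketch>& peers,
                             std::vector<sync_marker_sketch>& out)
{
  if (peers.empty())
    return;
  out = peers.front();
  for (const auto& peer : peers) {
    for (size_t shard = 0; shard < out.size(); ++shard) {
      if (peer[shard].marker < out[shard].marker)
        out[shard] = peer[shard];  // this peer is further behind on this shard
    }
  }
}
```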

// must get a successful reply from all peers to consider trimming
ret = 0;
while (ret == 0 && num_spawned()) {
yield wait_for_child();
collect_next(&ret);
}
if (ret < 0) {
ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
drain_all();
class DataLogTrimPollCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http;
const int num_shards;
const utime_t interval; //< polling interval
const std::string lock_oid; //< use first data log shard for lock
const std::string lock_cookie;
std::vector<std::string> last_trim; //< last trimmed marker per shard

public:
DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
int num_shards, utime_t interval)
: RGWCoroutine(store->ctx()), store(store), http(http),
num_shards(num_shards), interval(interval),
lock_oid(store->data_log->get_oid(0)),
lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
last_trim(num_shards)
{}

int operate();
};

int DataLogTrimPollCR::operate()
{
reenter(this) {
for (;;) {
set_status("sleeping");
wait(interval);

// request a 'data_trim' lock that covers the entire wait interval to
// prevent other gateways from attempting to trim for the duration
set_status("acquiring trim lock");
yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
[inline review comment thread]

Member (@yehudasa): @cbodley, maybe use RGWContinuousLeaseCR instead? and need to shut it down when done

Contributor, Author (@cbodley): this was by design - if there are multiple gateways in a zone, we only want one of them to attempt trimming each rgw_sync_log_trim_interval. this is accomplished by leaving the lock held for the duration

(we exchanged email about it with the subject line "lease for multisite lock trimming")

Member (@yehudasa): @cbodley right.. I see that discussion. I'm not sure I like the idea of just firing a lease without making the effort of releasing it. Is it a problem releasing it at the end here? Or is it just so that we don't trigger the trim code more frequently than originally planned?

Contributor, Author (@cbodley): > Is it a problem releasing it at the end here? Or is it just so that we don't trigger the trim code more frequently than originally planned?

right. if we release the lock then all of the gateways would try to trim, when we only want it to happen once per trim interval

Contributor, Author (@cbodley): @yehudasa do you still object to this approach? any ideas for a better solution?

Member (@yehudasa): @cbodley I'm not happy with this one, but let's just comment it clearly

[end of review thread]
store->get_zone_params().log_pool,
lock_oid, "data_trim", lock_cookie,
interval.sec()));
if (retcode < 0) {
// if the lock is already held, go back to sleep and try again later
ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
<< interval.sec() << "s" << dendl;
continue;
}

ldout(cct, 10) << "trimming log shards" << dendl;
yield {
// determine the minimum marker for each shard
take_min_markers(peer_status.begin(), peer_status.end(),
min_shard_markers.begin());

for (int i = 0; i < num_shards; i++) {
const auto& m = min_shard_markers[i];
auto& stable = get_stable_marker(m);
if (stable <= last_trim[i]) {
continue;
}
ldout(cct, 10) << "trimming log shard " << i
<< " at marker=" << stable
<< " last_trim=" << last_trim[i] << dendl;
using TrimCR = LastTimelogTrimCR;
spawn(new TrimCR(store, store->data_log->get_oid(i),
stable, &last_trim[i]),
true);
}
}
set_status("trimming");
yield call(new DataLogTrimCR(store, http, num_shards, last_trim));

// note that the lock is not released. this is intentional, as it avoids
// duplicating this work in other gateways
}
}
return 0;
}

} // anonymous namespace

RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
RGWHTTPManager *http,
int num_shards, utime_t interval)
{
return new DataLogTrimPollCR(store, http, num_shards, interval);
}
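To make the coordination scheme from the review thread concrete, here is a minimal sketch of the per-gateway polling loop using plain threads and the hypothetical make_cookie()/take_trim_lock() helpers from the earlier librados sketch (not the coroutine framework the PR actually uses, and do_trim() is a hypothetical stand-in): each gateway sleeps for the trim interval, then tries to take a lock whose duration equals that interval; the winner trims and deliberately leaves the lock to expire, so every other gateway skips this round.

```cpp
#include <rados/librados.hpp>
#include <chrono>
#include <string>
#include <thread>

// assumed from the earlier sketch (hypothetical helpers, not PR code)
std::string make_cookie(size_t len = 16);
int take_trim_lock(librados::IoCtx& ioctx, const std::string& oid,
                   const std::string& cookie, int duration_sec);
void do_trim();  // hypothetical: query peers, trim each shard to the min marker

void trim_poll_loop(librados::IoCtx& ioctx, const std::string& lock_oid,
                    int interval_sec)
{
  const std::string cookie = make_cookie();
  for (;;) {
    std::this_thread::sleep_for(std::chrono::seconds(interval_sec));

    // lock duration == polling interval, so the winner's lock expires just in
    // time for the next round; the lock is intentionally never released
    if (take_trim_lock(ioctx, lock_oid, cookie, interval_sec) < 0) {
      continue;  // another gateway holds the lock and will do this round's trim
    }
    do_trim();
  }
}
```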
21 changes: 4 additions & 17 deletions src/rgw/rgw_data_sync.h
@@ -529,22 +529,9 @@ class RGWDefaultSyncModule : public RGWSyncModule {
int create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) override;
};


class RGWDataLogTrimCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http;
const int num_shards;
const utime_t interval; //< polling interval
const std::string& zone; //< my zone id
std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
std::vector<std::string> last_trim; //< last trimmed marker per shard
int ret{0};

public:
RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
int num_shards, utime_t interval);
int operate() override;
};
// DataLogTrimCR factory function
extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
RGWHTTPManager *http,
int num_shards, utime_t interval);

#endif
9 changes: 5 additions & 4 deletions src/rgw/rgw_rados.cc
@@ -3074,7 +3074,8 @@ class RGWSyncLogTrimThread : public RGWSyncProcessorThread
void stop_process() override { crs.stop(); }
public:
RGWSyncLogTrimThread(RGWRados *store, int interval)
: RGWSyncProcessorThread(store), crs(store->ctx(), nullptr), store(store),
: RGWSyncProcessorThread(store),
crs(store->ctx(), store->get_cr_registry()), store(store),
http(store->ctx(), crs.get_completion_mgr()),
trim_interval(interval, 0)
{}
@@ -3083,9 +3084,9 @@ class RGWSyncLogTrimThread : public RGWSyncProcessorThread
return http.set_threaded();
}
int process() override {
crs.run(new RGWDataLogTrimCR(store, &http,
cct->_conf->rgw_data_log_num_shards,
trim_interval));
crs.run(create_data_log_trim_cr(store, &http,
cct->_conf->rgw_data_log_num_shards,
trim_interval));
return 0;
}
};