Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jewel: rgw: object expirer's hints might be trimmed without processing in some circumstances #10763

Merged
merged 3 commits into from Aug 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 35 additions & 21 deletions src/rgw/rgw_object_expirer_core.cc
Expand Up @@ -124,29 +124,34 @@ void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries, /*
}

void RGWObjectExpirer::trim_chunk(const string& shard,
const utime_t& from,
const utime_t& to)
const utime_t& from,
const utime_t& to,
const string& from_marker,
const string& to_marker)
{
ldout(store->ctx(), 20) << "trying to trim removal hints to " << to << dendl;
ldout(store->ctx(), 20) << "trying to trim removal hints to=" << to
<< ", to_marker=" << to_marker << dendl;

real_time rt_from = from.to_real_time();
real_time rt_to = to.to_real_time();

int ret = store->objexp_hint_trim(shard, rt_from, rt_to);
int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
from_marker, to_marker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
}

return;
}

void RGWObjectExpirer::process_single_shard(const string& shard,
const utime_t& last_run,
const utime_t& round_start)
bool RGWObjectExpirer::process_single_shard(const string& shard,
const utime_t& last_run,
const utime_t& round_start)
{
string marker;
string out_marker;
bool truncated = false;
bool done = true;

CephContext *cct = store->ctx();
int num_entries = cct->_conf->rgw_objexp_chunk_size;
Expand All @@ -163,57 +168,63 @@ void RGWObjectExpirer::process_single_shard(const string& shard,
int ret = l.lock_exclusive(&store->objexp_pool_ctx, shard);
if (ret == -EBUSY) { /* already locked by another processor */
dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
return;
return false;
}

do {
real_time rt_last = last_run.to_real_time();
real_time rt_start = round_start.to_real_time();

list<cls_timeindex_entry> entries;
ret = store->objexp_hint_list(shard, rt_last, rt_start,
num_entries, marker, entries,
&out_marker, &truncated);
num_entries, marker, entries,
&out_marker, &truncated);
if (ret < 0) {
ldout(cct, 10) << "cannot get removal hints from shard: " << shard << dendl;
ldout(cct, 10) << "cannot get removal hints from shard: " << shard
<< dendl;
continue;
}

bool need_trim;
garbage_chunk(entries, need_trim);

if (need_trim) {
trim_chunk(shard, last_run, round_start);
trim_chunk(shard, last_run, round_start, marker, out_marker);
}

utime_t now = ceph_clock_now(g_ceph_context);
if (now >= end) {
done = false;
break;
}

marker = out_marker;
} while (truncated);

l.unlock(&store->objexp_pool_ctx, shard);
return;
return done;
}

void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run, const utime_t& round_start)
/* Returns true if all shards have been processed successfully. */
bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
const utime_t& round_start)
{
utime_t shard_marker;

CephContext *cct = store->ctx();
CephContext * const cct = store->ctx();
int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
bool all_done = true;

for (int i = 0; i < num_shards; i++) {
string shard;
store->objexp_get_shard(i, shard);

ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;

process_single_shard(shard, last_run, round_start);
if (! process_single_shard(shard, last_run, round_start)) {
all_done = false;
}
}

return;
return all_done;
}

bool RGWObjectExpirer::going_down()
Expand Down Expand Up @@ -243,10 +254,13 @@ void *RGWObjectExpirer::OEWorker::entry() {
do {
utime_t start = ceph_clock_now(cct);
ldout(cct, 2) << "object expiration: start" << dendl;
oe->inspect_all_shards(last_run, start);
if (oe->inspect_all_shards(last_run, start)) {
/* All shards have been processed properly. Next time we can start
* from this moment. */
last_run = start;
}
ldout(cct, 2) << "object expiration: stop" << dendl;

last_run = start;

if (oe->going_down())
break;
Expand Down
30 changes: 19 additions & 11 deletions src/rgw/rgw_object_expirer_core.h
Expand Up @@ -41,9 +41,9 @@ class RGWObjectExpirer {
protected:
RGWRados *store;

int init_bucket_info(const string& tenant_name,
const string& bucket_name,
const string& bucket_id,
int init_bucket_info(const std::string& tenant_name,
const std::string& bucket_name,
const std::string& bucket_id,
RGWBucketInfo& bucket_info);

class OEWorker : public Thread {
Expand All @@ -53,7 +53,13 @@ class RGWObjectExpirer {
Cond cond;

public:
OEWorker(CephContext *_cct, RGWObjectExpirer *_oe) : cct(_cct), oe(_oe), lock("OEWorker") {}
OEWorker(CephContext * const cct,
RGWObjectExpirer * const oe)
: cct(cct),
oe(oe),
lock("OEWorker") {
}

void *entry();
void stop();
};
Expand All @@ -63,23 +69,25 @@ class RGWObjectExpirer {

public:
explicit RGWObjectExpirer(RGWRados *_store)
: store(_store)
{}
: store(_store) {
}

int garbage_single_object(objexp_hint_entry& hint);

void garbage_chunk(list<cls_timeindex_entry>& entries, /* in */
void garbage_chunk(std::list<cls_timeindex_entry>& entries, /* in */
bool& need_trim); /* out */

void trim_chunk(const string& shard,
void trim_chunk(const std::string& shard,
const utime_t& from,
const utime_t& to);
const utime_t& to,
const string& from_marker,
const string& to_marker);

void process_single_shard(const string& shard,
bool process_single_shard(const std::string& shard,
const utime_t& last_run,
const utime_t& round_start);

void inspect_all_shards(const utime_t& last_run,
bool inspect_all_shards(const utime_t& last_run,
const utime_t& round_start);

bool going_down();
Expand Down