Skip to content

Commit

Permalink
Merge pull request #10763: jewel: rgw: object expirer's hints might b…
Browse files Browse the repository at this point in the history
…e trimmed without processing in some circumstances

Reviewed-by: Loic Dachary <ldachary@redhat.com>
  • Loading branch information
Loic Dachary committed Aug 24, 2016
2 parents 7036418 + f298643 commit 1edbb4f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 32 deletions.
56 changes: 35 additions & 21 deletions src/rgw/rgw_object_expirer_core.cc
Expand Up @@ -124,29 +124,34 @@ void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries, /*
}

void RGWObjectExpirer::trim_chunk(const string& shard,
const utime_t& from,
const utime_t& to)
const utime_t& from,
const utime_t& to,
const string& from_marker,
const string& to_marker)
{
ldout(store->ctx(), 20) << "trying to trim removal hints to " << to << dendl;
ldout(store->ctx(), 20) << "trying to trim removal hints to=" << to
<< ", to_marker=" << to_marker << dendl;

real_time rt_from = from.to_real_time();
real_time rt_to = to.to_real_time();

int ret = store->objexp_hint_trim(shard, rt_from, rt_to);
int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
from_marker, to_marker);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
}

return;
}

void RGWObjectExpirer::process_single_shard(const string& shard,
const utime_t& last_run,
const utime_t& round_start)
bool RGWObjectExpirer::process_single_shard(const string& shard,
const utime_t& last_run,
const utime_t& round_start)
{
string marker;
string out_marker;
bool truncated = false;
bool done = true;

CephContext *cct = store->ctx();
int num_entries = cct->_conf->rgw_objexp_chunk_size;
Expand All @@ -163,57 +168,63 @@ void RGWObjectExpirer::process_single_shard(const string& shard,
int ret = l.lock_exclusive(&store->objexp_pool_ctx, shard);
if (ret == -EBUSY) { /* already locked by another processor */
dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
return;
return false;
}

do {
real_time rt_last = last_run.to_real_time();
real_time rt_start = round_start.to_real_time();

list<cls_timeindex_entry> entries;
ret = store->objexp_hint_list(shard, rt_last, rt_start,
num_entries, marker, entries,
&out_marker, &truncated);
num_entries, marker, entries,
&out_marker, &truncated);
if (ret < 0) {
ldout(cct, 10) << "cannot get removal hints from shard: " << shard << dendl;
ldout(cct, 10) << "cannot get removal hints from shard: " << shard
<< dendl;
continue;
}

bool need_trim;
garbage_chunk(entries, need_trim);

if (need_trim) {
trim_chunk(shard, last_run, round_start);
trim_chunk(shard, last_run, round_start, marker, out_marker);
}

utime_t now = ceph_clock_now(g_ceph_context);
if (now >= end) {
done = false;
break;
}

marker = out_marker;
} while (truncated);

l.unlock(&store->objexp_pool_ctx, shard);
return;
return done;
}

void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run, const utime_t& round_start)
/* Returns true if all shards have been processed successfully. */
bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
const utime_t& round_start)
{
utime_t shard_marker;

CephContext *cct = store->ctx();
CephContext * const cct = store->ctx();
int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
bool all_done = true;

for (int i = 0; i < num_shards; i++) {
string shard;
store->objexp_get_shard(i, shard);

ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;

process_single_shard(shard, last_run, round_start);
if (! process_single_shard(shard, last_run, round_start)) {
all_done = false;
}
}

return;
return all_done;
}

bool RGWObjectExpirer::going_down()
Expand Down Expand Up @@ -243,10 +254,13 @@ void *RGWObjectExpirer::OEWorker::entry() {
do {
utime_t start = ceph_clock_now(cct);
ldout(cct, 2) << "object expiration: start" << dendl;
oe->inspect_all_shards(last_run, start);
if (oe->inspect_all_shards(last_run, start)) {
/* All shards have been processed properly. Next time we can start
* from this moment. */
last_run = start;
}
ldout(cct, 2) << "object expiration: stop" << dendl;

last_run = start;

if (oe->going_down())
break;
Expand Down
30 changes: 19 additions & 11 deletions src/rgw/rgw_object_expirer_core.h
Expand Up @@ -41,9 +41,9 @@ class RGWObjectExpirer {
protected:
RGWRados *store;

int init_bucket_info(const string& tenant_name,
const string& bucket_name,
const string& bucket_id,
int init_bucket_info(const std::string& tenant_name,
const std::string& bucket_name,
const std::string& bucket_id,
RGWBucketInfo& bucket_info);

class OEWorker : public Thread {
Expand All @@ -53,7 +53,13 @@ class RGWObjectExpirer {
Cond cond;

public:
OEWorker(CephContext *_cct, RGWObjectExpirer *_oe) : cct(_cct), oe(_oe), lock("OEWorker") {}
OEWorker(CephContext * const cct,
RGWObjectExpirer * const oe)
: cct(cct),
oe(oe),
lock("OEWorker") {
}

void *entry();
void stop();
};
Expand All @@ -63,23 +69,25 @@ class RGWObjectExpirer {

public:
explicit RGWObjectExpirer(RGWRados *_store)
: store(_store)
{}
: store(_store) {
}

int garbage_single_object(objexp_hint_entry& hint);

void garbage_chunk(list<cls_timeindex_entry>& entries, /* in */
void garbage_chunk(std::list<cls_timeindex_entry>& entries, /* in */
bool& need_trim); /* out */

void trim_chunk(const string& shard,
void trim_chunk(const std::string& shard,
const utime_t& from,
const utime_t& to);
const utime_t& to,
const string& from_marker,
const string& to_marker);

void process_single_shard(const string& shard,
bool process_single_shard(const std::string& shard,
const utime_t& last_run,
const utime_t& round_start);

void inspect_all_shards(const utime_t& last_run,
bool inspect_all_shards(const utime_t& last_run,
const utime_t& round_start);

bool going_down();
Expand Down

0 comments on commit 1edbb4f

Please sign in to comment.