Skip to content

Commit

Permalink
rgw: add support for noncurrentversion expiration in s3 lifecycle.
Browse files Browse the repository at this point in the history
Fixes: http://tracker.ceph.com/issues/18916

Signed-off-by: Zhang Shaowen <zhangshaowen@cmss.chinamobile.com>
  • Loading branch information
Zhang Shaowen committed Feb 13, 2017
1 parent 148d488 commit 638a1cb
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 114 deletions.
242 changes: 145 additions & 97 deletions src/rgw/rgw_lc.cc
Expand Up @@ -36,7 +36,13 @@ bool LCRule::validate()
if (id.length() > MAX_ID_LEN) {
return false;
}
else if (expiration.get_days() <= 0) {
else if(expiration.empty() && noncur_expiration.empty()) {
return false;
}
else if (!expiration.empty() && expiration.get_days() <= 0) {
return false;
}
else if (!noncur_expiration.empty() && noncur_expiration.get_days() <=0) {
return false;
}
return true;
Expand All @@ -47,12 +53,22 @@ void RGWLifecycleConfiguration::add_rule(LCRule *rule)
string id;
rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
rule_map.insert(pair<string, LCRule>(id, *rule));
_add_rule(rule);
}

void RGWLifecycleConfiguration::_add_rule(LCRule *rule)
bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
{
prefix_map[rule->get_prefix()] = rule->get_expiration().get_days();
lc_op op;
if (rule->get_status().compare("Enabled") == 0) {
op.status = true;
}
if (!rule->get_expiration().empty()) {
op.expiration = rule->get_expiration().get_days();
}
if (!rule->get_noncur_expiration().empty()) {
op.noncur_expiration = rule->get_noncur_expiration().get_days();
}
auto ret = prefix_map.insert(pair<string, lc_op>(rule->get_prefix(), op));
return ret.second;
}

int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
Expand All @@ -67,9 +83,7 @@ int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
}
rule_map.insert(pair<string, LCRule>(id, *rule));

auto ret = prefix_map.insert(pair<string, int>(rule->get_prefix(), rule->get_expiration().get_days()));
//Now prefix shouldn't be the same. When we add noncurrent expiration or other action, prefix may be same.
if (!ret.second) {
if (!_add_rule(rule)) {
return -ERR_INVALID_REQUEST;
}
return 0;
Expand All @@ -82,15 +96,24 @@ bool RGWLifecycleConfiguration::validate()
if (prefix_map.size() < 2) {
return true;
}
auto next_iter = prefix_map.begin();
auto cur_iter = next_iter++;
while (next_iter != prefix_map.end()) {
string c_pre = cur_iter->first;
string n_pre = next_iter->first;
if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
return false;
}
auto cur_iter = prefix_map.begin();
while (cur_iter != prefix_map.end()) {
auto next_iter = cur_iter;
++next_iter;
while (next_iter != prefix_map.end()) {
string c_pre = cur_iter->first;
string n_pre = next_iter->first;
if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
if ((cur_iter->second.expiration > 0 && next_iter->second.expiration > 0) ||
(cur_iter->second.noncur_expiration > 0 && next_iter->second.noncur_expiration > 0)) {
return false;
} else {
++next_iter;
}
} else {
break;
}
}
++cur_iter;
}
return true;
Expand Down Expand Up @@ -230,15 +253,25 @@ bool RGWLC::obj_has_expired(double timediff, int days)
return (timediff >= cmp);
}

int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
{
if (remove_indeed) {
return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
} else {
obj_key.instance.clear();
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, obj_key);
return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
}
}

int RGWLC::bucket_lc_process(string& shard_id)
{
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
map<string, bufferlist> bucket_attrs;
string next_marker, no_ns, list_versions;
bool is_truncated;
bool default_config = false;
int default_days = 0;
vector<RGWObjEnt> objs;
RGWObjectCtx obj_ctx(store);
vector<std::string> result;
Expand Down Expand Up @@ -273,84 +306,15 @@ int RGWLC::bucket_lc_process(string& shard_id)
return -1;
}

map<string, int>& prefix_map = config.get_prefix_map();
for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (prefix_iter->first.empty()) {
default_config = true;
default_days = prefix_iter->second;
break;
}
}

if (default_config) {
do {

objs.clear();
list_op.params.marker = list_op.get_next_marker();
ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
if (ret < 0) {
if (ret == -ENOENT)
return 0;
ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
return ret;
}

vector<RGWObjEnt>::iterator obj_iter;
int pos = 0;
utime_t now = ceph_clock_now();
for (obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
bool prefix_match = false;
int match_days = 0;
map<string, int>& prefix_map = config.get_prefix_map();

for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (prefix_iter->first.empty()) {
continue;
}
pos = (*obj_iter).key.name.find(prefix_iter->first, 0);
if (pos != 0) {
continue;
}
prefix_match = true;
match_days = prefix_iter->second;
break;
}
int days = 0;
if (prefix_match) {
days = match_days;
} else if (default_config) {
days = default_days;
} else {
continue;
}
if (obj_has_expired(now - ceph::real_clock::to_time_t((*obj_iter).mtime), days)) {
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);
RGWObjState *state;
int ret = store->get_obj_state(&rctx, obj, &state, false);
if (ret < 0) {
return ret;
}
if (state->mtime != (*obj_iter).mtime) //Check mtime again to avoid delete a recently update object as much as possible
continue;
ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);
if (ret < 0) {
ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;
} else {
ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name <<dendl;
}
}
}
} while (is_truncated);
} else {
for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (prefix_iter->first.empty()) {
map<string, lc_op>& prefix_map = config.get_prefix_map();
list_op.params.list_versions = bucket_info.versioned();
if (!bucket_info.versioned()) {
for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (!prefix_iter->second.status || prefix_iter->second.expiration <=0) {
continue;
}
list_op.params.prefix = prefix_iter->first;

do {

objs.clear();
list_op.params.marker = list_op.get_next_marker();
ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
Expand All @@ -362,12 +326,10 @@ int RGWLC::bucket_lc_process(string& shard_id)
return ret;
}

vector<RGWObjEnt>::iterator obj_iter;
int days = prefix_iter->second;
utime_t now = ceph_clock_now();

for (obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
if (obj_has_expired(now - ceph::real_clock::to_time_t((*obj_iter).mtime), days)) {
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
if (obj_has_expired(now - ceph::real_clock::to_time_t((*obj_iter).mtime), prefix_iter->second.expiration)) {
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);
RGWObjState *state;
Expand All @@ -377,9 +339,95 @@ int RGWLC::bucket_lc_process(string& shard_id)
}
if (state->mtime != (*obj_iter).mtime)//Check mtime again to avoid delete a recently update object as much as possible
continue;
ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);
ret = remove_expired_obj(bucket_info, (*obj_iter).key, true);
if (ret < 0) {
ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
} else {
ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name << dendl;
}
}
}
} while (is_truncated);
}
} else {
//bucket versioning is enabled or suspended
rgw_obj_key pre_marker;
for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (!prefix_iter->second.status) {
continue;
}
if (prefix_iter != prefix_map.begin() &&
(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
list_op.next_marker = pre_marker;
} else {
pre_marker = list_op.get_next_marker();
}
list_op.params.prefix = prefix_iter->first;
RGWObjEnt pre_obj;
do {
if (!objs.empty()) {
pre_obj = objs.back();
}
objs.clear();
list_op.params.marker = list_op.get_next_marker();
ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);

if (ret < 0) {
if (ret == (-ENOENT))
return 0;
ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
return ret;
}

utime_t now = ceph_clock_now();
ceph::real_time mtime;
bool remove_indeed = true;
int expiration;
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
if ((*obj_iter).is_current()) {
if (prefix_iter->second.expiration <= 0) {
continue;
}
if ((*obj_iter).is_delete_marker()) {
if ((obj_iter + 1)==objs.end()) {
if (is_truncated) {
//deal with it in next round because we can't judge whether this marker is the only version
list_op.next_marker = (*obj_iter).key;
break;
}
} else if ((*obj_iter).key.name.compare((*(obj_iter + 1)).key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
continue;
}
remove_indeed = true; //we should remove the delete marker if it's the only version
} else {
remove_indeed = false;
}
mtime = (*obj_iter).mtime;
expiration = prefix_iter->second.expiration;
} else {
if (prefix_iter->second.noncur_expiration <=0) {
continue;
}
remove_indeed = true;
mtime = (obj_iter == objs.begin())?pre_obj.mtime:(*(obj_iter - 1)).mtime;
expiration = prefix_iter->second.noncur_expiration;
}
if (obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration)) {
if ((*obj_iter).is_visible()) {
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);
obj.set_instance((*obj_iter).key.instance);
RGWObjState *state;
int ret = store->get_obj_state(&rctx, obj, &state, false);
if (ret < 0) {
return ret;
}
if (state->mtime != (*obj_iter).mtime)//Check mtime again to avoid delete a recently update object as much as possible
continue;
}
ret = remove_expired_obj(bucket_info, (*obj_iter).key, remove_indeed);
if (ret < 0) {
ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;
ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
} else {
ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name << dendl;
}
Expand Down

0 comments on commit 638a1cb

Please sign in to comment.