Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgw: add support for noncurrentversion expiration in s3 lifecycle. #13385

Merged
merged 1 commit into from Feb 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
248 changes: 148 additions & 100 deletions src/rgw/rgw_lc.cc
Expand Up @@ -36,7 +36,13 @@ bool LCRule::validate()
if (id.length() > MAX_ID_LEN) {
return false;
}
else if (expiration.get_days() <= 0) {
else if(expiration.empty() && noncur_expiration.empty()) {
return false;
}
else if (!expiration.empty() && expiration.get_days() <= 0) {
return false;
}
else if (!noncur_expiration.empty() && noncur_expiration.get_days() <=0) {
return false;
}
return true;
Expand All @@ -47,12 +53,22 @@ void RGWLifecycleConfiguration::add_rule(LCRule *rule)
string id;
rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
rule_map.insert(pair<string, LCRule>(id, *rule));
_add_rule(rule);
}

void RGWLifecycleConfiguration::_add_rule(LCRule *rule)
bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
{
prefix_map[rule->get_prefix()] = rule->get_expiration().get_days();
lc_op op;
if (rule->get_status().compare("Enabled") == 0) {
op.status = true;
}
if (!rule->get_expiration().empty()) {
op.expiration = rule->get_expiration().get_days();
}
if (!rule->get_noncur_expiration().empty()) {
op.noncur_expiration = rule->get_noncur_expiration().get_days();
}
auto ret = prefix_map.insert(pair<string, lc_op>(rule->get_prefix(), op));
return ret.second;
}

int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
Expand All @@ -67,9 +83,7 @@ int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
}
rule_map.insert(pair<string, LCRule>(id, *rule));

auto ret = prefix_map.insert(pair<string, int>(rule->get_prefix(), rule->get_expiration().get_days()));
//Now prefix shouldn't be the same. When we add noncurrent expiration or other action, prefix may be same.
if (!ret.second) {
if (!_add_rule(rule)) {
return -ERR_INVALID_REQUEST;
}
return 0;
Expand All @@ -82,15 +96,24 @@ bool RGWLifecycleConfiguration::validate()
if (prefix_map.size() < 2) {
return true;
}
auto next_iter = prefix_map.begin();
auto cur_iter = next_iter++;
while (next_iter != prefix_map.end()) {
string c_pre = cur_iter->first;
string n_pre = next_iter->first;
if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
return false;
}
auto cur_iter = prefix_map.begin();
while (cur_iter != prefix_map.end()) {
auto next_iter = cur_iter;
++next_iter;
while (next_iter != prefix_map.end()) {
string c_pre = cur_iter->first;
string n_pre = next_iter->first;
if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
if ((cur_iter->second.expiration > 0 && next_iter->second.expiration > 0) ||
(cur_iter->second.noncur_expiration > 0 && next_iter->second.noncur_expiration > 0)) {
return false;
} else {
++next_iter;
}
} else {
break;
}
}
++cur_iter;
}
return true;
Expand Down Expand Up @@ -230,15 +253,25 @@ bool RGWLC::obj_has_expired(double timediff, int days)
return (timediff >= cmp);
}

int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
{
if (remove_indeed) {
return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
} else {
obj_key.instance.clear();
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, obj_key);
return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
}
}

int RGWLC::bucket_lc_process(string& shard_id)
{
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
map<string, bufferlist> bucket_attrs;
string next_marker, no_ns, list_versions;
bool is_truncated;
bool default_config = false;
int default_days = 0;
vector<RGWObjEnt> objs;
RGWObjectCtx obj_ctx(store);
vector<std::string> result;
Expand Down Expand Up @@ -273,84 +306,15 @@ int RGWLC::bucket_lc_process(string& shard_id)
return -1;
}

map<string, int>& prefix_map = config.get_prefix_map();
for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (prefix_iter->first.empty()) {
default_config = true;
default_days = prefix_iter->second;
break;
}
}

if (default_config) {
do {

objs.clear();
list_op.params.marker = list_op.get_next_marker();
ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
if (ret < 0) {
if (ret == -ENOENT)
return 0;
ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
return ret;
}

vector<RGWObjEnt>::iterator obj_iter;
int pos = 0;
utime_t now = ceph_clock_now();
for (obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
bool prefix_match = false;
int match_days = 0;
map<string, int>& prefix_map = config.get_prefix_map();

for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (prefix_iter->first.empty()) {
continue;
}
pos = (*obj_iter).key.name.find(prefix_iter->first, 0);
if (pos != 0) {
continue;
}
prefix_match = true;
match_days = prefix_iter->second;
break;
}
int days = 0;
if (prefix_match) {
days = match_days;
} else if (default_config) {
days = default_days;
} else {
continue;
}
if (obj_has_expired(now - ceph::real_clock::to_time_t((*obj_iter).mtime), days)) {
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);
RGWObjState *state;
int ret = store->get_obj_state(&rctx, obj, &state, false);
if (ret < 0) {
return ret;
}
if (state->mtime != (*obj_iter).mtime) //Check mtime again to avoid delete a recently update object as much as possible
continue;
ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);
if (ret < 0) {
ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;
} else {
ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name <<dendl;
}
}
}
} while (is_truncated);
} else {
for(map<string, int>::iterator prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (prefix_iter->first.empty()) {
map<string, lc_op>& prefix_map = config.get_prefix_map();
list_op.params.list_versions = bucket_info.versioned();
if (!bucket_info.versioned()) {
for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (!prefix_iter->second.status || prefix_iter->second.expiration <=0) {
continue;
}
list_op.params.prefix = prefix_iter->first;

do {

objs.clear();
list_op.params.marker = list_op.get_next_marker();
ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
Expand All @@ -362,26 +326,110 @@ int RGWLC::bucket_lc_process(string& shard_id)
return ret;
}

vector<RGWObjEnt>::iterator obj_iter;
int days = prefix_iter->second;
utime_t now = ceph_clock_now();

for (obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
if (obj_has_expired(now - ceph::real_clock::to_time_t((*obj_iter).mtime), days)) {
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->mtime), prefix_iter->second.expiration)) {
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, (*obj_iter).key.name);
rgw_obj obj(bucket_info.bucket, obj_iter->key.name);
RGWObjState *state;
int ret = store->get_obj_state(&rctx, obj, &state, false);
if (ret < 0) {
return ret;
}
if (state->mtime != (*obj_iter).mtime)//Check mtime again to avoid delete a recently update object as much as possible
if (state->mtime != obj_iter->mtime)//Check mtime again to avoid delete a recently update object as much as possible
continue;
ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, (*obj_iter).key);
ret = remove_expired_obj(bucket_info, obj_iter->key, true);
if (ret < 0) {
ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
} else {
ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key.name << dendl;
}
}
}
} while (is_truncated);
}
} else {
//bucket versioning is enabled or suspended
rgw_obj_key pre_marker;
for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
if (!prefix_iter->second.status) {
continue;
}
if (prefix_iter != prefix_map.begin() &&
(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
list_op.next_marker = pre_marker;
} else {
pre_marker = list_op.get_next_marker();
}
list_op.params.prefix = prefix_iter->first;
RGWObjEnt pre_obj;
do {
if (!objs.empty()) {
pre_obj = objs.back();
}
objs.clear();
list_op.params.marker = list_op.get_next_marker();
ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);

if (ret < 0) {
if (ret == (-ENOENT))
return 0;
ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
return ret;
}

utime_t now = ceph_clock_now();
ceph::real_time mtime;
bool remove_indeed = true;
int expiration;
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
if (obj_iter->is_current()) {
if (prefix_iter->second.expiration <= 0) {
continue;
}
if (obj_iter->is_delete_marker()) {
if ((obj_iter + 1)==objs.end()) {
if (is_truncated) {
//deal with it in next round because we can't judge whether this marker is the only version
list_op.next_marker = obj_iter->key;
break;
}
} else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
continue;
}
remove_indeed = true; //we should remove the delete marker if it's the only version
} else {
remove_indeed = false;
}
mtime = obj_iter->mtime;
expiration = prefix_iter->second.expiration;
} else {
if (prefix_iter->second.noncur_expiration <=0) {
continue;
}
remove_indeed = true;
mtime = (obj_iter == objs.begin())?pre_obj.mtime:(obj_iter - 1)->mtime;
expiration = prefix_iter->second.noncur_expiration;
}
if (obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration)) {
if (obj_iter->is_visible()) {
RGWObjectCtx rctx(store);
rgw_obj obj(bucket_info.bucket, obj_iter->key.name);
obj.set_instance(obj_iter->key.instance);
RGWObjState *state;
int ret = store->get_obj_state(&rctx, obj, &state, false);
if (ret < 0) {
return ret;
}
if (state->mtime != obj_iter->mtime)//Check mtime again to avoid delete a recently update object as much as possible
continue;
}
ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
if (ret < 0) {
ldout(cct, 0) << "ERROR: rgw_remove_object " << dendl;
ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
} else {
ldout(cct, 10) << "DELETED:" << bucket_name << ":" << (*obj_iter).key.name << dendl;
ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key.name << dendl;
}
}
}
Expand Down