Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mimic: rgw: abort multipart fix #29016

Merged
merged 2 commits into from Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
120 changes: 85 additions & 35 deletions src/rgw/rgw_multi.cc
Expand Up @@ -93,10 +93,9 @@ int list_multipart_parts(RGWRados *store, RGWBucketInfo& bucket_info, CephContex

bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;

int ret;

parts.clear();

int ret;
if (sorted_omap) {
string p;
p = "part.";
Expand All @@ -109,32 +108,39 @@ int list_multipart_parts(RGWRados *store, RGWBucketInfo& bucket_info, CephContex
} else {
ret = store->omap_get_all(raw_obj, header, parts_map);
}
if (ret < 0)
if (ret < 0) {
return ret;
}

int i;
int last_num = 0;

uint32_t expected_next = marker + 1;

for (i = 0, iter = parts_map.begin(); (i < num_parts || !sorted_omap) && iter != parts_map.end(); ++iter, ++i) {
for (i = 0, iter = parts_map.begin();
(i < num_parts || !sorted_omap) && iter != parts_map.end();
++iter, ++i) {
bufferlist& bl = iter->second;
bufferlist::iterator bli = bl.begin();
RGWUploadPartInfo info;
try {
decode(info, bli);
} catch (buffer::error& err) {
ldout(cct, 0) << "ERROR: could not part info, caught buffer::error" << dendl;
ldout(cct, 0) << "ERROR: could not part info, caught buffer::error" <<
dendl;
return -EIO;
}
if (sorted_omap) {
if (info.num != expected_next) {
/* ouch, we expected a specific part num here, but we got a different one. Either
* a part is missing, or it could be a case of mixed rgw versions working on the same
* upload, where one gateway doesn't support correctly sorted omap keys for multipart
* upload just assume data is unsorted.
/* ouch, we expected a specific part num here, but we got a
* different one. Either a part is missing, or it could be a
* case of mixed rgw versions working on the same upload,
* where one gateway doesn't support correctly sorted omap
* keys for multipart upload just assume data is unsorted.
*/
return list_multipart_parts(store, bucket_info, cct, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, true);
return list_multipart_parts(store, bucket_info, cct, upload_id,
meta_oid, num_parts, marker, parts,
next_marker, truncated, true);
}
expected_next++;
}
Expand All @@ -146,21 +152,23 @@ int list_multipart_parts(RGWRados *store, RGWBucketInfo& bucket_info, CephContex
}

if (sorted_omap) {
if (truncated)
if (truncated) {
*truncated = (iter != parts_map.end());
}
} else {
/* rebuild a map with only num_parts entries */

map<uint32_t, RGWUploadPartInfo> new_parts;
map<uint32_t, RGWUploadPartInfo>::iterator piter;

for (i = 0, piter = parts.begin(); i < num_parts && piter != parts.end(); ++i, ++piter) {
for (i = 0, piter = parts.begin();
i < num_parts && piter != parts.end();
++i, ++piter) {
new_parts[piter->first] = piter->second;
last_num = piter->first;
}

if (truncated)
if (truncated) {
*truncated = (piter != parts.end());
}

parts.swap(new_parts);
}
Expand All @@ -179,10 +187,14 @@ int list_multipart_parts(RGWRados *store, struct req_state *s,
int *next_marker, bool *truncated,
bool assume_unsorted)
{
return list_multipart_parts(store, s->bucket_info, s->cct, upload_id, meta_oid, num_parts, marker, parts, next_marker, truncated, assume_unsorted);
return list_multipart_parts(store, s->bucket_info, s->cct, upload_id,
meta_oid, num_parts, marker, parts,
next_marker, truncated, assume_unsorted);
}

int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCtx *obj_ctx, RGWBucketInfo& bucket_info, RGWMPObj& mp_obj)
int abort_multipart_upload(RGWRados *store, CephContext *cct,
RGWObjectCtx *obj_ctx, RGWBucketInfo& bucket_info,
RGWMPObj& mp_obj)
{
rgw_obj meta_obj;
meta_obj.init_ns(bucket_info.bucket, mp_obj.get_meta(), RGW_OBJ_NS_MULTIPART);
Expand All @@ -196,11 +208,18 @@ int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCtx *obj_
int ret;

do {
ret = list_multipart_parts(store, bucket_info, cct, mp_obj.get_upload_id(), mp_obj.get_meta(), 1000,
marker, obj_parts, &marker, &truncated);
if (ret < 0)
ret = list_multipart_parts(store, bucket_info, cct,
mp_obj.get_upload_id(), mp_obj.get_meta(),
1000, marker, obj_parts, &marker, &truncated);
if (ret < 0) {
ldout(cct, 20) << __func__ << ": list_multipart_parts returned " <<
ret << dendl;
return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
for (auto obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) {
}

for (auto obj_iter = obj_parts.begin();
obj_iter != obj_parts.end();
++obj_iter) {
RGWUploadPartInfo& obj_part = obj_iter->second;
rgw_obj obj;
if (obj_part.manifest.empty()) {
Expand All @@ -225,15 +244,16 @@ int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCtx *obj_
}
}
} while (truncated);
/* use upload id as tag */
ret = store->send_chain_to_gc(chain, mp_obj.get_upload_id() , false); // do it async

/* use upload id as tag and do it asynchronously */
ret = store->send_chain_to_gc(chain, mp_obj.get_upload_id(), false);
if (ret < 0) {
ldout(cct, 5) << "gc->send_chain() returned " << ret << dendl;
ldout(cct, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
}

RGWRados::Object del_target(store, bucket_info, *obj_ctx, meta_obj);
RGWRados::Object::Delete del_op(&del_target);

del_op.params.bucket_owner = bucket_info.owner;
del_op.params.versioning_status = 0;
if (!remove_objs.empty()) {
Expand All @@ -242,13 +262,19 @@ int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCtx *obj_

// and also remove the metadata obj
ret = del_op.delete_obj();
if (ret < 0) {
ldout(cct, 20) << __func__ << ": del_op.delete_obj returned " <<
ret << dendl;
}
return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
}

int list_bucket_multiparts(RGWRados *store, RGWBucketInfo& bucket_info,
string& prefix, string& marker, string& delim,
int& max_uploads, vector<rgw_bucket_dir_entry> *objs,
map<string, bool> *common_prefixes, bool *is_truncated)
const string& prefix, const string& marker,
const string& delim,
const int& max_uploads,
vector<rgw_bucket_dir_entry> *objs,
map<string, bool> *common_prefixes, bool *is_truncated)
{
RGWRados::Bucket target(store, bucket_info);
RGWRados::Bucket::List list_op(&target);
Expand All @@ -260,41 +286,65 @@ int list_bucket_multiparts(RGWRados *store, RGWBucketInfo& bucket_info,
list_op.params.ns = RGW_OBJ_NS_MULTIPART;
list_op.params.filter = &mp_filter;

return(list_op.list_objects(max_uploads, objs, common_prefixes,is_truncated));
return(list_op.list_objects(max_uploads, objs, common_prefixes, is_truncated));
}

int abort_bucket_multiparts(RGWRados *store, CephContext *cct, RGWBucketInfo& bucket_info,
string& prefix, string& delim)
{
int ret, max = 1000, num_deleted = 0;
constexpr int max = 1000;
int ret, num_deleted = 0;
vector<rgw_bucket_dir_entry> objs;
RGWObjectCtx obj_ctx(store);
string marker;
bool is_truncated;

do {
ret = list_bucket_multiparts(store, bucket_info, prefix, marker, delim,
max, &objs, nullptr, &is_truncated);
max, &objs, nullptr, &is_truncated);
if (ret < 0) {
ldout(store->ctx(), 0) << __func__ <<
" ERROR : calling list_bucket_multiparts; ret=" << ret <<
"; bucket=\"" << bucket_info.bucket << "\"; prefix=\"" <<
prefix << "\"; delim=\"" << delim << "\"" << dendl;
return ret;
}
ldout(store->ctx(), 20) << __func__ <<
" INFO: aborting and cleaning up multipart upload(s); bucket=\"" <<
bucket_info.bucket << "\"; objs.size()=" << objs.size() <<
"; is_truncated=" << is_truncated << dendl;

if (!objs.empty()) {
RGWMPObj mp;
for (const auto& obj : objs) {
rgw_obj_key key(obj.key);
if (!mp.from_meta(key.name))
continue;
ret = abort_multipart_upload(store, cct, &obj_ctx, bucket_info, mp);
if (ret < 0 && ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) {
return ret;
if (ret < 0) {
// we're doing a best-effort; if something cannot be found,
// log it and keep moving forward
if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) {
ldout(store->ctx(), 0) << __func__ <<
" ERROR : failed to abort and clean-up multipart upload \"" <<
key.get_oid() << "\"" << dendl;
return ret;
} else {
ldout(store->ctx(), 10) << __func__ <<
" NOTE : unable to find part(s) of "
"aborted multipart upload of \"" << key.get_oid() <<
"\" for cleaning up" << dendl;
}
}
num_deleted++;
}
if (num_deleted) {
ldout(store->ctx(),0) << "WARNING : aborted " << num_deleted << " incomplete multipart uploads" << dendl;
ldout(store->ctx(), 0) << __func__ <<
" WARNING : aborted " << num_deleted <<
" incomplete multipart uploads" << dendl;
}
}
} while (is_truncated);

return ret;
return 0;
}
9 changes: 6 additions & 3 deletions src/rgw/rgw_multi.h
Expand Up @@ -99,9 +99,12 @@ extern int abort_multipart_upload(RGWRados *store, CephContext *cct, RGWObjectCt
RGWBucketInfo& bucket_info, RGWMPObj& mp_obj);

extern int list_bucket_multiparts(RGWRados *store, RGWBucketInfo& bucket_info,
string& prefix, string& marker, string& delim,
int& max_uploads, vector<rgw_bucket_dir_entry> *objs,
map<string, bool> *common_prefixes, bool *is_truncated);
const string& prefix,
const string& marker,
const string& delim,
const int& max_uploads,
vector<rgw_bucket_dir_entry> *objs,
map<string, bool> *common_prefixes, bool *is_truncated);

extern int abort_bucket_multiparts(RGWRados *store, CephContext *cct, RGWBucketInfo& bucket_info,
string& prefix, string& delim);
Expand Down