Skip to content

Commit

Permalink
rgw: don't allow multiple writers to same multiobject part
Browse files Browse the repository at this point in the history
Fixes: #8269
Backport: firefly, dumpling

A client might need to retry a multipart part write. The original thread
might race with the new one, trying to clean up after it, clobbering the
part's data.
The fix is to detect whether an original part already existed, and if so
use a different part name for it.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
  • Loading branch information
yehudasa committed May 6, 2014
1 parent 650051c commit bd8e026
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 53 deletions.
82 changes: 68 additions & 14 deletions src/rgw/rgw_op.cc
Expand Up @@ -1357,22 +1357,26 @@ class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
string upload_id;

protected:
bool immutable_head() { return true; }
int prepare(RGWRados *store, void *obj_ctx);
int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);

public:
bool immutable_head() { return true; }
RGWPutObjProcessor_Multipart(const string& bucket_owner, uint64_t _p, req_state *_s) :
RGWPutObjProcessor_Atomic(bucket_owner, _s->bucket, _s->object_str, _p, _s->req_id), s(_s) {}
};

int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
RGWPutObjProcessor::prepare(store, obj_ctx);
RGWPutObjProcessor::prepare(store, obj_ctx, NULL);

string oid = obj_str;
upload_id = s->info.args.get("uploadId");
mp.init(oid, upload_id);
if (!oid_rand) {
mp.init(oid, upload_id);
} else {
mp.init(oid, upload_id, *oid_rand);
}

part_num = s->info.args.get("partNumber");
if (part_num.empty()) {
Expand All @@ -1388,7 +1392,13 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
return -EINVAL;
}

string upload_prefix = oid + "." + upload_id;
string upload_prefix = oid + ".";

if (!oid_rand) {
upload_prefix.append(upload_id);
} else {
upload_prefix.append(*oid_rand);
}

rgw_obj target_obj;
target_obj.init(bucket, oid);
Expand Down Expand Up @@ -1466,7 +1476,7 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_
}


RGWPutObjProcessor *RGWPutObj::select_processor()
RGWPutObjProcessor *RGWPutObj::select_processor(bool *is_multipart)
{
RGWPutObjProcessor *processor;

Expand All @@ -1482,6 +1492,10 @@ RGWPutObjProcessor *RGWPutObj::select_processor()
processor = new RGWPutObjProcessor_Multipart(bucket_owner, part_size, s);
}

if (is_multipart) {
*is_multipart = multipart;
}

return processor;
}

Expand All @@ -1507,6 +1521,7 @@ void RGWPutObj::execute()
map<string, bufferlist> attrs;
int len;
map<string, string>::iterator iter;
bool multipart;


perfcounter->inc(l_rgw_put);
Expand Down Expand Up @@ -1547,9 +1562,9 @@ void RGWPutObj::execute()
supplied_md5[sizeof(supplied_md5) - 1] = '\0';
}

processor = select_processor();
processor = select_processor(&multipart);

ret = processor->prepare(store, s->obj_ctx);
ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;

Expand All @@ -1572,9 +1587,48 @@ void RGWPutObj::execute()

hash.Update(data_ptr, len);

ret = processor->throttle_data(handle);
if (ret < 0)
goto done;
/* do we need this operation to be synchronous? if we're dealing with an object with immutable
* head, e.g., multipart object we need to make sure we're the first one writing to this object
*/
bool need_to_wait = (ofs == 0) && multipart;

ret = processor->throttle_data(handle, need_to_wait);
if (ret < 0) {
if (!need_to_wait || ret != -EEXIST) {
ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
goto done;
}

ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;

/* restart processing with different oid suffix */

dispose_processor(processor);
processor = select_processor(&multipart);

string oid_rand;
char buf[33];
gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
oid_rand.append(buf);

ret = processor->prepare(store, s->obj_ctx, &oid_rand);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl;
goto done;
}

ret = processor->handle_data(data, ofs, &handle);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
goto done;
}

ret = processor->throttle_data(handle, false);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
goto done;
}
}

ofs += len;
} while (len > 0);
Expand Down Expand Up @@ -1683,7 +1737,7 @@ void RGWPostObj::execute()

processor = select_processor();

ret = processor->prepare(store, s->obj_ctx);
ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;

Expand All @@ -1708,7 +1762,7 @@ void RGWPostObj::execute()

hash.Update(data_ptr, len);

ret = processor->throttle_data(handle);
ret = processor->throttle_data(handle, false);
if (ret < 0)
goto done;

Expand Down
21 changes: 11 additions & 10 deletions src/rgw/rgw_op.h
Expand Up @@ -340,7 +340,7 @@ class RGWPutObj : public RGWOp {
policy.set_ctx(s->cct);
}

RGWPutObjProcessor *select_processor();
RGWPutObjProcessor *select_processor(bool *is_multipart);
void dispose_processor(RGWPutObjProcessor *processor);

int verify_permission();
Expand Down Expand Up @@ -754,21 +754,22 @@ class RGWMPObj {
string upload_id;
public:
RGWMPObj() {}
RGWMPObj(string& _oid, string& _upload_id) {
init(_oid, _upload_id);
RGWMPObj(const string& _oid, const string& _upload_id) {
init(_oid, _upload_id, _upload_id);
}
void init(const string& _oid, const string& _upload_id) {
init(_oid, _upload_id, _upload_id);
}
void init(string& _oid, string& _upload_id) {
void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
if (_oid.empty()) {
clear();
return;
}
oid = _oid;
upload_id = _upload_id;
prefix = oid;
prefix.append(".");
prefix.append(upload_id);
meta = prefix;
meta.append(MP_META_SUFFIX);
prefix = oid + ".";
meta = prefix + upload_id + MP_META_SUFFIX;
prefix.append(part_unique_str);
}
string& get_meta() { return meta; }
string get_part(int num) {
Expand Down Expand Up @@ -799,7 +800,7 @@ class RGWMPObj {
return false;
oid = meta.substr(0, mid_pos);
upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
init(oid, upload_id);
init(oid, upload_id, upload_id);
return true;
}
void clear() {
Expand Down
42 changes: 24 additions & 18 deletions src/rgw/rgw_rados.cc
Expand Up @@ -719,10 +719,10 @@ int RGWObjManifest::generator::create_begin(CephContext *cct, RGWObjManifest *_m
manifest->set_head(_h);
last_ofs = 0;

char buf[33];
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);

if (manifest->get_prefix().empty()) {
char buf[33];
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);

string oid_prefix = ".";
oid_prefix.append(buf);
oid_prefix.append("_");
Expand Down Expand Up @@ -1006,9 +1006,9 @@ RGWPutObjProcessor::~RGWPutObjProcessor()
}
}

int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
RGWPutObjProcessor::prepare(store, obj_ctx);
RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);

obj.init(bucket, obj_str);

Expand Down Expand Up @@ -1041,7 +1041,7 @@ int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, time_t se
}


int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive)
{
if ((uint64_t)abs_ofs + bl.length() > obj_len)
obj_len = abs_ofs + bl.length();
Expand All @@ -1051,7 +1051,7 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
int r = store->aio_put_obj_data(NULL, obj,
bl,
((ofs != 0) ? ofs : -1),
false, phandle);
exclusive, phandle);

return r;
}
Expand Down Expand Up @@ -1091,18 +1091,21 @@ int RGWPutObjProcessor_Aio::drain_pending()
return ret;
}

int RGWPutObjProcessor_Aio::throttle_data(void *handle)
int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
{
if (handle) {
struct put_obj_aio_info info;
info.handle = handle;
pending.push_back(info);
}
size_t orig_size = pending.size();
while (pending_has_completed()) {
while (pending_has_completed()
|| need_to_wait) {
int r = wait_pending_front();
if (r < 0)
return r;

need_to_wait = false;
}

/* resize window in case messages are draining too fast */
Expand All @@ -1118,7 +1121,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
return 0;
}

int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle)
int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
{
if (ofs >= next_part_ofs) {
int r = prepare_next_part(ofs);
Expand All @@ -1127,7 +1130,7 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
}
}

return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}

int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
Expand Down Expand Up @@ -1168,12 +1171,15 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
}
off_t write_ofs = data_ofs;
data_ofs = write_ofs + bl.length();
return write_data(bl, write_ofs, phandle);
bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
we could be racing with another upload, to the same
object and cleanup can be messy */
return write_data(bl, write_ofs, phandle, exclusive);
}

int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
RGWPutObjProcessor::prepare(store, obj_ctx);
RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);

head_obj.init(bucket, obj_str);

Expand Down Expand Up @@ -1220,12 +1226,12 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
}
if (pending_data_bl.length()) {
void *handle;
int r = write_data(pending_data_bl, data_ofs, &handle);
int r = write_data(pending_data_bl, data_ofs, &handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
r = throttle_data(handle);
r = throttle_data(handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
Expand Down Expand Up @@ -3014,7 +3020,7 @@ class RGWRadosPutObj : public RGWGetDataCB
}
}

ret = processor->throttle_data(handle);
ret = processor->throttle_data(handle, false);
if (ret < 0)
return ret;

Expand Down Expand Up @@ -3161,7 +3167,7 @@ int RGWRados::copy_obj(void *ctx,

RGWPutObjProcessor_Atomic processor(dest_bucket_info.owner, dest_obj.bucket, dest_obj.object,
cct->_conf->rgw_obj_stripe_size, tag);
ret = processor.prepare(this, ctx);
ret = processor.prepare(this, ctx, NULL);
if (ret < 0)
return ret;

Expand Down

0 comments on commit bd8e026

Please sign in to comment.