Skip to content

Commit

Permalink
rgw: identify racing writes when using copy-if-newer
Browse files Browse the repository at this point in the history
When copying an object from a different zone, and copy-if-newer is
specified, if the final meta write is canceled check whether the
destinatioin that was created is actually newer than our mtime,
otherwise retry.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit fe9c64b)
  • Loading branch information
yehudasa authored and cbodley committed Jun 10, 2016
1 parent 02f6d8a commit a38f157
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
50 changes: 46 additions & 4 deletions src/rgw/rgw_rados.cc
Expand Up @@ -898,7 +898,7 @@ int RGWPutObjProcessor::complete(string& etag, time_t *mtime, time_t set_mtime,
if (r < 0)
return r;

is_complete = true;
is_complete = canceled;
return 0;
}

Expand Down Expand Up @@ -1241,6 +1241,8 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
return r;
}

canceled = obj_op.meta.canceled;

return 0;
}

Expand Down Expand Up @@ -3454,6 +3456,8 @@ int RGWRados::Object::Write::write_meta(uint64_t size,
}
}

meta.canceled = false;

/* update quota cache */
store->quota_handler->update_stats(meta.owner, bucket, (orig_exists ? 0 : 1), size, orig_size);

Expand All @@ -3465,6 +3469,8 @@ int RGWRados::Object::Write::write_meta(uint64_t size,
ldout(store->ctx(), 0) << "ERROR: index_op.cancel()() returned ret=" << ret << dendl;
}

meta.canceled = true;

/* we lost in a race. There are a few options:
* - existing object was rewritten (ECANCELED)
* - non existing object was created (EEXIST)
Expand Down Expand Up @@ -3696,6 +3702,10 @@ class RGWRadosPutObj : public RGWGetDataCB
int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) {
return processor->complete(etag, mtime, set_mtime, attrs);
}

bool is_canceled() {
return processor->is_canceled();
}
};

/*
Expand Down Expand Up @@ -3793,6 +3803,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
RGWRESTStreamReadRequest *in_stream_req;
string tag;
map<string, bufferlist> src_attrs;
int i;
append_rand_alpha(cct, tag, tag, 32);

RGWPutObjProcessor_Atomic processor(obj_ctx,
Expand Down Expand Up @@ -3900,8 +3911,34 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
attrs = src_attrs;
}

ret = cb.complete(etag, mtime, set_mtime, attrs);
if (ret < 0) {
#define MAX_COMPLETE_RETRY 100
for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
ret = cb.complete(etag, mtime, set_mtime, attrs);
if (ret < 0) {
goto set_err_state;
}
if (copy_if_newer && cb.is_canceled()) {
ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl;
obj_ctx.invalidate(dest_obj); /* object was overwritten */
ret = get_obj_state(&obj_ctx, dest_obj, &dest_state, NULL);
if (ret < 0) {
ldout(cct, 0) << "ERROR: " << __func__ << ": get_err_state() returned ret=" << ret << dendl;
goto set_err_state;
}
if (!dest_state->exists ||
dest_state->mtime < set_mtime) {
ldout(cct, 20) << "retrying writing object mtime=" << set_mtime << " dest_state->mtime=" << dest_state->mtime << " dest_state->exists=" << dest_state->exists << dendl;
continue;
} else {
ldout(cct, 20) << "not retrying writing object mtime=" << set_mtime << " dest_state->mtime=" << dest_state->mtime << " dest_state->exists=" << dest_state->exists << dendl;
}
}
break;
}

if (i == MAX_COMPLETE_RETRY) {
ldout(cct, 0) << "ERROR: retried object completion too many times, something is wrong!" << dendl;
ret = -EIO;
goto set_err_state;
}

Expand All @@ -3912,7 +3949,12 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,

return 0;
set_err_state:
int r = opstate.set_state(RGWOpState::OPSTATE_ERROR);
RGWOpState::OpState state = RGWOpState::OPSTATE_ERROR;
if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
state = RGWOpState::OPSTATE_COMPLETE;
ret = 0;
}
int r = opstate.set_state(state);
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl;
}
Expand Down
8 changes: 6 additions & 2 deletions src/rgw/rgw_rados.h
Expand Up @@ -1570,10 +1570,11 @@ class RGWRados
const char *if_match;
const char *if_nomatch;
uint64_t olh_epoch;
bool canceled;

MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
remove_objs(NULL), set_mtime(0), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
if_match(NULL), if_nomatch(NULL), olh_epoch(0) {}
if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false) {}
} meta;

Write(RGWRados::Object *_target) : target(_target) {}
Expand Down Expand Up @@ -2307,13 +2308,14 @@ class RGWPutObjProcessor
RGWObjectCtx& obj_ctx;
bool is_complete;
RGWBucketInfo bucket_info;
bool canceled;

virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
map<string, bufferlist>& attrs,
const char *if_match = NULL, const char *if_nomatch = NULL) = 0;

public:
RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi) {}
RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi), canceled(false) {}
virtual ~RGWPutObjProcessor() {}
virtual int prepare(RGWRados *_store, string *oid_rand) {
store = _store;
Expand All @@ -2329,6 +2331,8 @@ class RGWPutObjProcessor
const char *if_match = NULL, const char *if_nomatch = NULL);

CephContext *ctx();

bool is_canceled() { return canceled; }
};

struct put_obj_aio_info {
Expand Down

0 comments on commit a38f157

Please sign in to comment.