Skip to content

Commit

Permalink
osd, cls/cas: fix to make inc/dec operation idempotent
Browse files Browse the repository at this point in the history
Current dedup allow to contain multiple same sources using
multiset, which results in inconsistent situation as follow
(during set_chunk, but not confined in set_chunk).

1. complete primary write
2. Failure occurs before replication is done
3. cancel repop
4. requeue the op

Due to 4, inc. op is applied one more time. At this time,
current inc op---this is similar to ++ operation--- increases
the ref. count once again.
To prevent this, inc/dec op needs to be idempotent using
absolute value.

fixes: https://tracker.ceph.com/issues/51000

Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
  • Loading branch information
myoungwon committed Jul 20, 2021
1 parent cd0bb38 commit d2ff8a4
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 19 deletions.
65 changes: 55 additions & 10 deletions src/cls/cas/cls_cas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,26 @@ static int chunk_create_or_get_ref(cls_method_context_t hctx,
} else if (ret < 0) {
return ret;
} else {
// existing chunk; inc ref
if (op.flags & cls_cas_chunk_create_or_get_ref_op::FLAG_VERIFY) {
bufferlist old;
cls_cxx_read(hctx, 0, 0, &old);
if (!old.contents_equal(op.data)) {
return -ENOMSG;
if (objr.get_type() == chunk_refs_t::TYPE_BY_OBJECT) {
int expected_count = op.count;
ceph_assert(expected_count != -1);
int cur_count = static_cast<chunk_refs_by_object_t *>(objr.r.get())->count(op.source);
if (expected_count == cur_count) {
return 0;
}
CLS_LOG(5, "expected=%d, cur=%d\n", expected_count, cur_count);
while (expected_count > cur_count) {
objr.get(op.source);
cur_count++;
}
} else {
// existing chunk; inc ref
if (op.flags & cls_cas_chunk_create_or_get_ref_op::FLAG_VERIFY) {
bufferlist old;
cls_cxx_read(hctx, 0, 0, &old);
if (!old.contents_equal(op.data)) {
return -ENOMSG;
}
}
}
CLS_LOG(10, "inc ref oid=%s\n",
Expand Down Expand Up @@ -137,10 +151,24 @@ static int chunk_get_ref(cls_method_context_t hctx,
return ret;
}

if (objr.get_type() == chunk_refs_t::TYPE_BY_OBJECT) {
int expected_count = op.count;
ceph_assert(expected_count != -1);
int cur_count = static_cast<chunk_refs_by_object_t *>(objr.r.get())->count(op.source);
if (expected_count == cur_count) {
return 0;
}
CLS_LOG(5, "expected=%d, cur=%d\n", expected_count, cur_count);
while (expected_count > cur_count) {
objr.get(op.source);
cur_count++;
}
} else {
objr.get(op.source);
}
// existing chunk; inc ref
CLS_LOG(10, "oid=%s\n", op.source.oid.name.c_str());

objr.get(op.source);

ret = chunk_set_refcount(hctx, objr);
if (ret < 0) {
Expand All @@ -167,9 +195,26 @@ static int chunk_put_ref(cls_method_context_t hctx,
if (ret < 0)
return ret;

if (!objr.put(op.source)) {
CLS_LOG(10, "oid=%s (no ref)\n", op.source.oid.name.c_str());
return -ENOLINK;
if (objr.get_type() == chunk_refs_t::TYPE_BY_OBJECT) {
int expected_count = op.count;
ceph_assert(expected_count != -1);
int cur_count = static_cast<chunk_refs_by_object_t *>(objr.r.get())->count(op.source);
if (expected_count == cur_count) {
return 0;
}
CLS_LOG(5, "expected=%d, cur=%d\n", expected_count, cur_count);
while (expected_count < cur_count) {
if (!objr.put(op.source)) {
CLS_LOG(10, "oid=%s (no ref)\n", op.source.oid.name.c_str());
return -ENOLINK;
}
cur_count--;
}
} else {
if (!objr.put(op.source)) {
CLS_LOG(10, "oid=%s (no ref)\n", op.source.oid.name.c_str());
return -ENOLINK;
}
}

if (objr.empty()) {
Expand Down
12 changes: 9 additions & 3 deletions src/cls/cas/cls_cas_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ void cls_cas_chunk_create_or_get_ref(
librados::ObjectWriteOperation& op,
const hobject_t& soid,
const bufferlist& data,
bool verify)
bool verify,
int count)
{
bufferlist in;
cls_cas_chunk_create_or_get_ref_op call;
call.source = soid;
call.count = count;
if (verify) {
call.flags |= cls_cas_chunk_create_or_get_ref_op::FLAG_VERIFY;
}
Expand All @@ -33,22 +35,26 @@ void cls_cas_chunk_create_or_get_ref(

void cls_cas_chunk_get_ref(
librados::ObjectWriteOperation& op,
const hobject_t& soid)
const hobject_t& soid,
int count)
{
bufferlist in;
cls_cas_chunk_get_ref_op call;
call.source = soid;
call.count = count;
encode(call, in);
op.exec("cas", "chunk_get_ref", in);
}

void cls_cas_chunk_put_ref(
librados::ObjectWriteOperation& op,
const hobject_t& soid)
const hobject_t& soid,
int count)
{
bufferlist in;
cls_cas_chunk_put_ref_op call;
call.source = soid;
call.count = count;
encode(call, in);
op.exec("cas", "chunk_put_ref", in);
}
Expand Down
9 changes: 6 additions & 3 deletions src/cls/cas/cls_cas_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ void cls_cas_chunk_create_or_get_ref(
librados::ObjectWriteOperation& op,
const hobject_t& soid,
const bufferlist& data,
bool verify=false);
bool verify=false,
int count = -1);

/// get ref on existing chunk
void cls_cas_chunk_get_ref(
librados::ObjectWriteOperation& op,
const hobject_t& soid);
const hobject_t& soid,
int count = -1);

/// drop reference on existing chunk
void cls_cas_chunk_put_ref(
librados::ObjectWriteOperation& op,
const hobject_t& soid);
const hobject_t& soid,
int count = -1);


//
Expand Down
3 changes: 3 additions & 0 deletions src/cls/cas/cls_cas_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ struct chunk_refs_by_object_t : public chunk_refs_t::refs_t {
uint64_t count() const override {
return by_object.size();
}
uint64_t count(hobject_t& o) const {
return by_object.count(o);
}
void get(const hobject_t& o) override {
by_object.insert(o);
}
Expand Down
9 changes: 9 additions & 0 deletions src/cls/cas/cls_cas_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct cls_cas_chunk_create_or_get_ref_op {
hobject_t source;
uint64_t flags = 0;
bufferlist data;
int count;

cls_cas_chunk_create_or_get_ref_op() {}

Expand All @@ -24,6 +25,7 @@ struct cls_cas_chunk_create_or_get_ref_op {
encode(source, bl);
encode(flags, bl);
encode(data, bl);
encode(count, bl);
ENCODE_FINISH(bl);
}

Expand All @@ -32,6 +34,7 @@ struct cls_cas_chunk_create_or_get_ref_op {
decode(source, bl);
decode(flags, bl);
decode(data, bl);
decode(count, bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter *f) const {
Expand All @@ -48,18 +51,21 @@ WRITE_CLASS_ENCODER(cls_cas_chunk_create_or_get_ref_op)

struct cls_cas_chunk_get_ref_op {
hobject_t source;
int count;

cls_cas_chunk_get_ref_op() {}

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode(source, bl);
encode(count, bl);
ENCODE_FINISH(bl);
}

void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
decode(source, bl);
decode(count, bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter *f) const {
Expand All @@ -74,18 +80,21 @@ WRITE_CLASS_ENCODER(cls_cas_chunk_get_ref_op)

struct cls_cas_chunk_put_ref_op {
hobject_t source;
int count;

cls_cas_chunk_put_ref_op() {}

void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode(source, bl);
encode(count, bl);
ENCODE_FINISH(bl);
}

void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
decode(source, bl);
decode(count, bl);
DECODE_FINISH(bl);
}

Expand Down
12 changes: 9 additions & 3 deletions src/osd/PrimaryLogPG.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3425,7 +3425,7 @@ int PrimaryLogPG::get_manifest_ref_count(ObjectContextRef obc, std::string& fp_o
if (!clone_obc) {
break;
}
if (recover_adjacent_clones(clone_obc, op)) {
if (op && recover_adjacent_clones(clone_obc, op)) {
return -EAGAIN;
}
get_adjacent_clones(clone_obc, obc_l, obc_g);
Expand Down Expand Up @@ -3685,21 +3685,29 @@ ceph_tid_t PrimaryLogPG::refcount_manifest(hobject_t src_soid, hobject_t tgt_soi

ObjectOperation obj_op;
bufferlist in;
ObjectContextRef src_obc = get_object_context(src_soid, false, NULL);
ceph_assert(src_obc);
ObjectContextRef head = get_object_context(src_soid.get_head(), false, nullptr);
ceph_assert(head);
if (type == refcount_t::INCREMENT_REF) {
cls_cas_chunk_get_ref_op call;
call.source = src_soid.get_head();
call.count = get_manifest_ref_count(head, tgt_soid.oid.name, nullptr) + 1;
::encode(call, in);
obj_op.call("cas", "chunk_get_ref", in);
} else if (type == refcount_t::DECREMENT_REF) {
cls_cas_chunk_put_ref_op call;
call.source = src_soid.get_head();
// refcount_manifest is called after tgt_soid is deleted from chunk_info_t
call.count = get_manifest_ref_count(head, tgt_soid.oid.name, nullptr);
::encode(call, in);
obj_op.call("cas", "chunk_put_ref", in);
} else if (type == refcount_t::CREATE_OR_GET_REF) {
cls_cas_chunk_create_or_get_ref_op get_call;
get_call.source = src_soid.get_head();
ceph_assert(chunk);
get_call.data = move(*chunk);
get_call.count = get_manifest_ref_count(head, tgt_soid.oid.name, nullptr) + 1;
::encode(get_call, in);
obj_op.call("cas", "chunk_create_or_get_ref", in);
} else {
Expand All @@ -3712,8 +3720,6 @@ ceph_tid_t PrimaryLogPG::refcount_manifest(hobject_t src_soid, hobject_t tgt_soi
}

object_locator_t oloc(tgt_soid);
ObjectContextRef src_obc = get_object_context(src_soid, false, NULL);
ceph_assert(src_obc);
auto tid = osd->objecter->mutate(
tgt_soid.oid, oloc, obj_op, SnapContext(),
ceph::real_clock::from_ceph_timespec(src_obc->obs.oi.mtime),
Expand Down

0 comments on commit d2ff8a4

Please sign in to comment.