Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgw: fix versioned bucket index logic so that incomplete index transactions can be fixed #55165

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 40 additions & 0 deletions qa/workunits/rgw/test_rgw_versioning.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,46 @@ def do_delete():
assert num_leftover_olh_entries == 0, \
'Found leftover olh entries after concurrent deletes'


# TESTCASE 'verify that index entries can be cleaned up if index completion ops are not applied'
log.debug('TEST: verify that index entries can be cleaned up if index completion ops are not applied\n')
bucket.object_versions.all().delete()

def delete_request(key, version_id):
    """Delete one specific version of ``key`` from the test bucket.

    Used as one of the interchangeable "fix" callables in this test; all of
    them are invoked as ``fix_func(key, version_id)``.
    """
    target_version = connection.ObjectVersion(bucket.name, key, version_id)
    target_version.delete()

def check_olh(*args):
    """Run ``radosgw-admin bucket check olh --fix`` on ``BUCKET_NAME``.

    Positional arguments are accepted and ignored so that this helper can be
    called with the same ``(key, version_id)`` shape as the other fix
    callables in this test.
    """
    fix_cmd = f'radosgw-admin bucket check olh --fix --bucket {BUCKET_NAME}'
    exec_cmd(fix_cmd)

def check_unlinked(*args):
    """Run ``radosgw-admin bucket check unlinked --fix`` on ``BUCKET_NAME``.

    The command is issued with ``--min-age-hours=0``. Positional arguments
    are accepted and ignored so that this helper can be called with the same
    ``(key, version_id)`` shape as the other fix callables in this test.
    """
    fix_cmd = f'radosgw-admin bucket check unlinked --fix --min-age-hours=0 --bucket {BUCKET_NAME}'
    exec_cmd(fix_cmd)

fix_funcs = [delete_request, check_olh, check_unlinked]
bucket.object_versions.all().delete()

for fix_func in fix_funcs:
key = str(uuid.uuid4())
put_resp = bucket.put_object(Key=key, Body=b"data")
version_id = put_resp.version_id
try:
exec_cmd('ceph config set client rgw_debug_inject_skip_index_clear_olh true')
exec_cmd('ceph config set client rgw_debug_inject_skip_index_complete_del true')
time.sleep(1)
connection.ObjectVersion(bucket.name, key, version_id).delete()
finally:
exec_cmd('ceph config rm client rgw_debug_inject_skip_index_clear_olh')
exec_cmd('ceph config rm client rgw_debug_inject_skip_index_complete_del')

out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}')
json_out = json.loads(out.replace(b'\x80', b'0x80'))
assert len(json_out) == 3, 'failed to find leftover bi entries'

fix_func(key, version_id)

out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME} --object {key}')
json_out = json.loads(out.replace(b'\x80', b'0x80'))
assert len(json_out) == 0, f'{fix_func.__name__} did not remove leftover index entries for {key}'

# Clean up
log.debug("Deleting bucket {}".format(BUCKET_NAME))
bucket.object_versions.all().delete()
Expand Down
35 changes: 17 additions & 18 deletions src/cls/rgw/cls_rgw.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,9 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s: couldn't find tag for pending operation with tag %s",
__func__, op.tag.c_str());
if (op.op == CLS_RGW_OP_DEL || op.op == CLS_RGW_OP_CANCEL) {
return 0;
}
return -EINVAL;
}
CLS_LOG_BITX(bitx_inst, 20,
Expand Down Expand Up @@ -1121,7 +1124,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
"INFO: %s: key=%s not on disk, no action",
__func__, escape_str(idx).c_str());
log_op = false;
} else if (!entry.pending_map.size()) {
} else {
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: removing map entry with key=%s",
__func__, escape_str(idx).c_str());
Expand All @@ -1132,20 +1135,6 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
__func__, escape_str(idx).c_str(), rc);
return rc;
}
} else {
entry.exists = false;
bufferlist new_key_bl;
encode(entry, new_key_bl);
CLS_LOG_BITX(bitx_inst, 20,
"INFO: %s: setting map entry at key=%s",
__func__, escape_str(idx).c_str());
rc = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
if (rc < 0) {
CLS_LOG_BITX(bitx_inst, 1,
"ERROR: %s: unable to set map val, key=%s, rc=%d",
__func__, escape_str(idx).c_str(), rc);
return rc;
}
}
} // CLS_RGW_OP_DEL
else if (op.op == CLS_RGW_OP_ADD) {
Expand Down Expand Up @@ -1921,6 +1910,16 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
olh.set_tag(op.olh_tag);

obj.set_epoch(1);
} else {
const string& olh_tag = olh.get_tag();
if (op.olh_tag != olh_tag) {
if (!olh.pending_removal()) {
CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str());
return -ECANCELED;
}
/* if pending removal, this is a new olh instance */
olh.set_tag(op.olh_tag);
}
}

if (!olh.start_modify(op.olh_epoch)) {
Expand Down Expand Up @@ -2053,7 +2052,7 @@ static int rgw_bucket_read_olh_log(cls_method_context_t hctx, bufferlist *in, bu
return ret;
}

if (olh_data_entry.tag != op.olh_tag) {
if (olh_data_entry.tag != op.olh_tag && !olh_data_entry.pending_removal) {
CLS_LOG(1, "NOTICE: %s: olh_tag_mismatch olh_data_entry.tag=%s op.olh_tag=%s", __func__, olh_data_entry.tag.c_str(), op.olh_tag.c_str());
return -ECANCELED;
}
Expand Down Expand Up @@ -2160,8 +2159,8 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe
return ret;
}

if (olh_data_entry.tag != op.olh_tag) {
CLS_LOG(1, "NOTICE: %s: olh_tag_mismatch olh_data_entry.tag=%s op.olh_tag=%s", __func__, olh_data_entry.tag.c_str(), op.olh_tag.c_str());
if (!olh_data_entry.pending_removal) {
CLS_LOG(1, "NOTICE: %s: olh_data_entry.pending_removal=false", __func__);
return -ECANCELED;
}

Expand Down
22 changes: 22 additions & 0 deletions src/common/options/rgw.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -2672,6 +2672,28 @@ options:
with_legacy: true
services:
- rgw
- name: rgw_debug_inject_skip_index_clear_olh
type: bool
level: dev
desc: Whether to inject a condition to skip bucket index clear olh ops
when unlinking a bucket index entry. This is useful to simulate behavior
for testing cases where the op is lost due to a crash or other
abnormal scenario.
default: false
with_legacy: true
services:
- rgw
- name: rgw_debug_inject_skip_index_complete_del
type: bool
level: dev
desc: Whether to inject a condition to skip bucket index complete_del ops
when removing a versioned bucket object instance. This is useful to simulate
behavior for testing cases where the op is lost due to a crash or other
abnormal scenario.
default: false
with_legacy: true
services:
- rgw
- name: rgw_reshard_batch_size
type: uint
level: advanced
Expand Down
12 changes: 11 additions & 1 deletion src/rgw/driver/rados/rgw_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,19 @@ static int check_index_unlinked(rgw::sal::RadosStore* const rados_store,
entry.idx << dendl;
continue;
}
if (dir_entry.versioned_epoch != 0 || dir_entry.meta.mtime > not_after) {
if (dir_entry.meta.mtime > not_after) {
continue;
}
const bool has_pending_delete_op = std::any_of(dir_entry.pending_map.begin(),
dir_entry.pending_map.end(),
[&](const auto& pi) {
return pi.second.op == CLS_RGW_OP_DEL &&
pi.second.timestamp < not_after;
});
if (dir_entry.versioned_epoch != 0 && !has_pending_delete_op) {
continue;
}

bool listable;
ret = is_versioned_instance_listable(dpp, bs, dir_entry.key, listable, y);
if (ret < 0) {
Expand Down
85 changes: 50 additions & 35 deletions src/rgw/driver/rados/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5766,14 +5766,14 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
}
}

if (!state->exists) {
target->invalidate_state();
return -ENOENT;
}
bool need_invalidate = !state->exists;
bool exists = state->exists;

r = target->prepare_atomic_modification(dpp, op, false, NULL, NULL, NULL, true, false, y);
if (r < 0)
return r;
if (exists) {
r = target->prepare_atomic_modification(dpp, op, false, NULL, NULL, NULL, true, false, y);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This prepare_atomic_modification is protected by a check of exists. Yet I don't see a similar protection below in the corresponding call to complete_atomic_modification. Is that an issue or am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-check my analysis here, but it looks to me like the complete_atomic_modification method cleans up the tail of an object, and it seems possible for the tail not to be cleaned up (if the process is aborted or killed, or the request thread is terminated early in some other way). I was thinking that subsequent attempts to delete the object should again attempt to delete any tail objects, in case some were left behind. It's a good point though, and a scenario that we should probably test to make sure there are no surprises.

if (r < 0)
return r;
}

RGWBucketInfo& bucket_info = target->get_bucket_info();

Expand All @@ -5787,13 +5787,15 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
if (r < 0)
return r;

store->remove_rgw_head_obj(op);

auto& ioctx = ref.ioctx;
r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
if (exists) {
store->remove_rgw_head_obj(op);

r = rgw_rados_operate(dpp, ioctx, ref.obj.oid, &op, y);
/* raced with another operation, object state is indeterminate */
need_invalidate = (r == -ECANCELED);
}

/* raced with another operation, object state is indeterminate */
const bool need_invalidate = (r == -ECANCELED);

int64_t poolid = ioctx.get_id();
if (r >= 0) {
Expand All @@ -5802,7 +5804,10 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
tombstone_entry entry{*state};
obj_tombstone_cache->add(obj, entry);
}
r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y, log_op);

if (!store->ctx()->_conf->rgw_debug_inject_skip_index_complete_del) {
r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y, log_op);
}

int ret = target->complete_atomic_modification(dpp, y);
if (ret < 0) {
Expand All @@ -5820,6 +5825,9 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
target->invalidate_state();
}

if (!exists) {
return -ENOENT;
}
if (r < 0)
return r;

Expand Down Expand Up @@ -8147,7 +8155,7 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,

/* update olh object */
r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &op, y);
if (r < 0) {
if (r < 0 && !(r == -ENOENT && need_to_remove)) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << ": could not apply olh update to oid \"" << ref.obj.oid << "\", r=" << r << dendl;
return r;
}
Expand Down Expand Up @@ -8203,31 +8211,38 @@ int RGWRados::clear_olh(const DoutPrefixProvider *dpp,
if (r < 0) {
return r;
}
map<string, bufferlist> pending_entries;
rgw_filter_attrset(s->attrset, RGW_ATTR_OLH_PENDING_PREFIX, &pending_entries);

map<string, bufferlist> rm_pending_entries;
check_pending_olh_entries(dpp, pending_entries, &rm_pending_entries);

if (!rm_pending_entries.empty()) {
r = remove_olh_pending_entries(dpp, bucket_info, *s, obj, rm_pending_entries, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: rm_pending_entries returned ret=" << r << dendl;
return r;
if (s->exists) {
map<string, bufferlist> pending_entries;
rgw_filter_attrset(s->attrset, RGW_ATTR_OLH_PENDING_PREFIX, &pending_entries);

map<string, bufferlist> rm_pending_entries;
check_pending_olh_entries(dpp, pending_entries, &rm_pending_entries);

if (!rm_pending_entries.empty()) {
r = remove_olh_pending_entries(dpp, bucket_info, *s, obj, rm_pending_entries, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: rm_pending_entries returned ret=" << r << dendl;
return r;
}
}

bufferlist tag_bl;
tag_bl.append(tag.c_str(), tag.length());
rm_op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, tag_bl);
rm_op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_EQ, ver);
cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true); /* fail if found one of these, pending modification */
rm_op.remove();

r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &rm_op, y);
if (r == -ECANCELED) {
return r; /* someone else made a modification in the meantime */
}
}

bufferlist tag_bl;
tag_bl.append(tag.c_str(), tag.length());
rm_op.cmpxattr(RGW_ATTR_OLH_ID_TAG, CEPH_OSD_CMPXATTR_OP_EQ, tag_bl);
rm_op.cmpxattr(RGW_ATTR_OLH_VER, CEPH_OSD_CMPXATTR_OP_EQ, ver);
cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true); /* fail if found one of these, pending modification */
rm_op.remove();

r = rgw_rados_operate(dpp, ref.ioctx, ref.obj.oid, &rm_op, y);
if (r == -ECANCELED) {
return r; /* someone else made a modification in the meantime */
if (cct->_conf->rgw_debug_inject_skip_index_clear_olh) {
return 0;
}

/*
* only clear if was successful, otherwise we might clobber pending operations on this object
*/
Expand Down