Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mimic: multisite: bucket full sync does not handle delete markers #26194

Merged
merged 6 commits into the target branch on May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/rgw/rgw_data_sync.cc
Expand Up @@ -2342,6 +2342,16 @@ struct bucket_list_entry {
JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
}

RGWModifyOp get_modify_op() const {
  // A delete marker always maps to the OLH delete-marker link op,
  // regardless of what the entry's instance string contains.
  if (delete_marker) {
    return CLS_RGW_OP_LINK_OLH_DM;
  }
  // A real (non-empty, non-"null") instance identifies a versioned
  // object version, which is linked through the OLH; anything else is
  // treated as a plain unversioned write.
  const bool has_real_instance =
      !key.instance.empty() && key.instance != "null";
  return has_real_instance ? CLS_RGW_OP_LINK_OLH : CLS_RGW_OP_ADD;
}
};

struct bucket_list_result {
Expand Down Expand Up @@ -2744,7 +2754,6 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
RGWBucketFullSyncShardMarkerTrack marker_tracker;
rgw_obj_key list_marker;
bucket_list_entry *entry{nullptr};
RGWModifyOp op{CLS_RGW_OP_ADD};

int total_entries{0};

Expand Down Expand Up @@ -2818,12 +2827,11 @@ int RGWBucketShardFullSyncCR::operate()
if (!marker_tracker.start(entry->key, total_entries, real_time())) {
tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
} else {
op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
false, /* versioned, only matters for object removal */
entry->versioned_epoch, entry->mtime,
entry->owner, op, CLS_RGW_STATE_COMPLETE,
entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
entry->key, &marker_tracker, zones_trace, tn),
false);
}
Expand Down
59 changes: 51 additions & 8 deletions src/test/rgw/rgw_multi/tests.py
Expand Up @@ -675,22 +675,19 @@ def test_versioned_object_incremental_sync():
k = new_key(zone_conn, bucket, obj)

k.set_contents_from_string('version1')
v = get_latest_object_version(k)
log.debug('version1 id=%s', v.version_id)
log.debug('version1 id=%s', k.version_id)
# don't delete version1 - this tests that the initial version
# doesn't get squashed into later versions

# create and delete the following object versions to test that
# the operations don't race with each other during sync
k.set_contents_from_string('version2')
v = get_latest_object_version(k)
log.debug('version2 id=%s', v.version_id)
k.bucket.delete_key(obj, version_id=v.version_id)
log.debug('version2 id=%s', k.version_id)
k.bucket.delete_key(obj, version_id=k.version_id)

k.set_contents_from_string('version3')
v = get_latest_object_version(k)
log.debug('version3 id=%s', v.version_id)
k.bucket.delete_key(obj, version_id=v.version_id)
log.debug('version3 id=%s', k.version_id)
k.bucket.delete_key(obj, version_id=k.version_id)

for _, bucket in zone_bucket:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
Expand All @@ -706,6 +703,52 @@ def test_versioned_object_incremental_sync():
for _, bucket in zone_bucket:
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_delete_marker_full_sync():
    # Verify that bucket full sync replicates delete markers on
    # versioning-enabled buckets.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # turn on versioning for every bucket, then wait for metadata sync
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
    zonegroup_meta_checkpoint(zonegroup)

    for zone, bucket in zone_bucket:
        # write an initial (empty) object version
        initial = new_key(zone, bucket, 'obj')
        initial.set_contents_from_string('')

        # deleting without a version id lays down a delete marker
        marker = new_key(zone, bucket, 'obj')
        marker.delete()

    # wait until full sync has caught every zone up
    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_suspended_delete_marker_full_sync():
    # Verify that bucket full sync replicates delete markers on buckets
    # whose versioning was enabled and then suspended.
    zonegroup = realm.master_zonegroup()
    zonegroup_conns = ZonegroupConns(zonegroup)
    buckets, zone_bucket = create_bucket_per_zone(zonegroup_conns)

    # enable versioning, then immediately suspend it, on every bucket
    for _, bucket in zone_bucket:
        bucket.configure_versioning(True)
        bucket.configure_versioning(False)
    zonegroup_meta_checkpoint(zonegroup)

    for zone, bucket in zone_bucket:
        # write an initial (empty) object
        initial = new_key(zone, bucket, 'obj')
        initial.set_contents_from_string('')

        # deleting without a version id lays down a delete marker
        marker = new_key(zone, bucket, 'obj')
        marker.delete()

    # wait until full sync has caught every zone up
    for _, bucket in zone_bucket:
        zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)

def test_bucket_versioning():
buckets, zone_bucket = create_bucket_per_zone_in_realm()
Expand Down
42 changes: 31 additions & 11 deletions src/test/rgw/rgw_multi/zone_rados.py
@@ -1,4 +1,5 @@
import logging
from boto.s3.deletemarker import DeleteMarker

try:
from itertools import izip_longest as zip_longest
Expand All @@ -16,6 +17,13 @@ def check_object_eq(k1, k2, check_extra = True):
assert k2
log.debug('comparing key name=%s', k1.name)
eq(k1.name, k2.name)
eq(k1.version_id, k2.version_id)
eq(k1.is_latest, k2.is_latest)
eq(k1.last_modified, k2.last_modified)
if isinstance(k1, DeleteMarker):
assert isinstance(k2, DeleteMarker)
return

eq(k1.get_contents_as_string(), k2.get_contents_as_string())
eq(k1.metadata, k2.metadata)
eq(k1.cache_control, k2.cache_control)
Expand All @@ -24,16 +32,13 @@ def check_object_eq(k1, k2, check_extra = True):
eq(k1.content_disposition, k2.content_disposition)
eq(k1.content_language, k2.content_language)
eq(k1.etag, k2.etag)
eq(k1.last_modified, k2.last_modified)
if check_extra:
eq(k1.owner.id, k2.owner.id)
eq(k1.owner.display_name, k2.owner.display_name)
eq(k1.storage_class, k2.storage_class)
eq(k1.size, k2.size)
eq(k1.version_id, k2.version_id)
eq(k1.encrypted, k2.encrypted)


class RadosZone(Zone):
def __init__(self, name, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
super(RadosZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
Expand All @@ -57,14 +62,17 @@ def check_bucket_eq(self, zone_conn, bucket_name):
b1 = self.get_bucket(bucket_name)
b2 = zone_conn.get_bucket(bucket_name)

b1_versions = b1.list_versions()
log.debug('bucket1 objects:')
for o in b1.get_all_versions():
for o in b1_versions:
log.debug('o=%s', o.name)

b2_versions = b2.list_versions()
log.debug('bucket2 objects:')
for o in b2.get_all_versions():
for o in b2_versions:
log.debug('o=%s', o.name)

for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
for k1, k2 in zip_longest(b1_versions, b2_versions):
if k1 is None:
log.critical('key=%s is missing from zone=%s', k2.name, self.name)
assert False
Expand All @@ -74,11 +82,23 @@ def check_bucket_eq(self, zone_conn, bucket_name):

check_object_eq(k1, k2)

# now get the keys through a HEAD operation, verify that the available data is the same
k1_head = b1.get_key(k1.name)
k2_head = b2.get_key(k2.name)

check_object_eq(k1_head, k2_head, False)
if isinstance(k1, DeleteMarker):
# verify that HEAD sees a delete marker
assert b1.get_key(k1.name) is None
assert b2.get_key(k2.name) is None
else:
# now get the keys through a HEAD operation, verify that the available data is the same
k1_head = b1.get_key(k1.name, version_id=k1.version_id)
k2_head = b2.get_key(k2.name, version_id=k2.version_id)
check_object_eq(k1_head, k2_head, False)

if k1.version_id:
# compare the olh to make sure they agree about the current version
k1_olh = b1.get_key(k1.name)
k2_olh = b2.get_key(k2.name)
# if there's a delete marker, HEAD will return None
if k1_olh or k2_olh:
check_object_eq(k1_olh, k2_olh, False)

log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

Expand Down