test/rgw/notifications: fix lifecycle tests
* tests were passing only because they were not performing their asserts
* tests are now separated with their own attribute
* their topics are now marked "persistent" to work around the issue in:
  https://tracker.ceph.com/issues/65645

Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
yuvalif committed Apr 24, 2024
1 parent 9bfc669 commit 9e8cc6f
Showing 1 changed file with 17 additions and 37 deletions.
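
Each lifecycle test now carries its own 'lifecycle_test' attribute instead of the shared 'http_test' one, so the two tests below can be selected (or excluded) independently. As a hypothetical invocation, assuming the suite is still run through nose's attrib plugin (which provides the @attr decorator used in this file) and its usual BNTESTS_CONF-based configuration:

    BNTESTS_CONF=bntests.conf python -m nose -s -v -a 'lifecycle_test' test_bn.py

The configuration file name and extra flags are assumptions here; the commit itself only introduces the 'lifecycle_test' attribute filter.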
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -1807,7 +1807,7 @@ def test_ps_s3_opaque_data_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
-@attr('http_test')
+@attr('lifecycle_test')
 def test_ps_s3_lifecycle_on_master():
     """ test that when object is deleted due to lifecycle policy, notification is sent on master """
     hostname = get_ip()
@@ -1828,7 +1828,7 @@ def test_ps_s3_lifecycle_on_master():
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
     opaque_data = 'http://1.2.3.4:8888'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
     topic_arn = topic_conf.set_config()
@@ -1858,6 +1858,8 @@ def test_ps_s3_lifecycle_on_master():
     time_diff = time.time() - start_time
     print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
+    keys = list(bucket.list())
+
     # create lifecycle policy
     client = boto3.client('s3',
                           endpoint_url='http://'+conn.host+':'+str(conn.port),
@@ -1878,15 +1880,15 @@
 
     # start lifecycle processing
     admin(['lc', 'process'], get_config_cluster())
-    print('wait for 5sec for the messages...')
-    time.sleep(5)
+    print('wait for 20s for the lifecycle...')
+    time.sleep(20)
 
-    # check http receiver does not have messages
-    keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
+    no_keys = list(bucket.list())
+    wait_for_queue_to_drain(topic_name)
+    assert_equal(len(no_keys), 0)
     event_keys = []
     events = http_server.get_and_reset_events()
-    assert_equal(number_of_objects * 2, len(events))
+    assert number_of_objects * 2 <= len(events)
     for event in events:
         assert_in(event['Records'][0]['eventName'],
                   ['LifecycleExpiration:Delete',
@@ -1895,7 +1897,7 @@ def test_ps_s3_lifecycle_on_master():
     for key in keys:
         key_found = False
         for event_key in event_keys:
-            if event_key == key:
+            if event_key == key.name:
                 key_found = True
                 break
         if not key_found:
@@ -1922,7 +1924,7 @@ def start_and_abandon_multipart_upload(bucket, key_name, content):
     except Exception as e:
         print('Error: ' + str(e))
 
-@attr('http_test')
+@attr('lifecycle_test')
 def test_ps_s3_lifecycle_abort_mpu_on_master():
     """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
     hostname = get_ip()
@@ -1933,7 +1935,6 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     host = get_ip()
     port = random.randint(10000, 20000)
     # start an http server in a separate thread
-    number_of_objects = 1
     http_server = HTTPServerWithEvents((host, port))
 
     # create bucket
@@ -1943,7 +1944,7 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
-    endpoint_args = 'push-endpoint='+endpoint_address
+    endpoint_args = 'push-endpoint='+endpoint_address+'&persistent=true'
     opaque_data = 'http://1.2.3.4:8888'
     topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
     topic_arn = topic_conf.set_config()
@@ -1958,19 +1959,14 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
     assert_equal(status/100, 2)
 
     # start and abandon a multpart upload
-    # create objects in the bucket
     obj_prefix = 'ooo'
-    start_time = time.time()
     content = 'bar'
 
     key_name = obj_prefix + str(1)
     thr = threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,))
     thr.start()
     thr.join()
 
-    time_diff = time.time() - start_time
-    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
     # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect
     client = boto3.client('s3',
                           endpoint_url='http://'+conn.host+':'+str(conn.port),
@@ -1990,32 +1986,16 @@ def test_ps_s3_lifecycle_abort_mpu_on_master():
 
     # start lifecycle processing
     admin(['lc', 'process'], get_config_cluster())
-    print('wait for 20s (2 days) for the messages...')
+    print('wait for 20s for the lifecycle...')
     time.sleep(20)
 
-    # check http receiver does not have messages
-    keys = list(bucket.list())
-    print('total number of objects: ' + str(len(keys)))
-    event_keys = []
+    wait_for_queue_to_drain(topic_name)
     events = http_server.get_and_reset_events()
     for event in events:
-        # I hope Boto doesn't gak on the unknown eventName
-        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMultipartUpload')
-        event_keys.append(event['Records'][0]['s3']['object']['key'])
-    for key in keys:
-        key_found = False
-        for event_key in event_keys:
-            if event_key == key:
-                key_found = True
-                break
-        if not key_found:
-            err = 'no lifecycle event found for key: ' + str(key)
-            log.error(events)
-            assert False, err
+        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMPU')
+        assert key_name in event['Records'][0]['s3']['object']['key']
 
     # cleanup
-    for key in keys:
-        key.delete()
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
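
A note on the persistent-topic workaround above: with a non-persistent topic, radosgw delivers notifications synchronously from the context that triggered them, and a delivery that fails is not retried (the lifecycle-specific symptom is tracked in https://tracker.ceph.com/issues/65645). A persistent topic queues events and retries delivery asynchronously, which is also what makes the added wait_for_queue_to_drain(topic_name) calls meaningful: each test blocks until the queue is empty, so get_and_reset_events() observes the complete set of notifications before any assert runs.

Below is a minimal sketch of creating such a topic directly with boto3, mirroring what PSTopicS3 does when endpoint_args contains 'push-endpoint=...&persistent=true'; the endpoint URL, credentials, and names are placeholders, not values from this test:

    import boto3

    # RGW exposes topic management through an SNS-compatible API;
    # 'persistent': 'true' requests queued, retried delivery.
    sns = boto3.client('sns',
                       endpoint_url='http://localhost:8000',  # placeholder radosgw endpoint
                       region_name='default',
                       aws_access_key_id='ACCESS_KEY',        # placeholder credentials
                       aws_secret_access_key='SECRET_KEY')

    resp = sns.create_topic(Name='mytopic',
                            Attributes={'push-endpoint': 'http://10.0.0.1:9001',
                                        'persistent': 'true',
                                        'OpaqueData': 'http://1.2.3.4:8888'})
    print(resp['TopicArn'])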