From 5d1529a745b295283fd84d5930fac0c3092ef036 Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Wed, 23 Oct 2019 18:36:30 +0200 Subject: [PATCH] Give compacted ledger some time to update in python test(#2321) (#5448) After compaction completes the compacted ledger is recorded as a property of a cursor. As persisting the cursor is async and we don't wait for the acknowledgement of the acknowledgement, there may be a race if we try to read the compacted ledger immediately. To avoid this race, I've added a 1 second sleep to allow the compacted ledger to be updated on the broker. --- pulsar-client-cpp/python/pulsar_test.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 36d7fca3a0b7d..97aca3c69006c 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -22,6 +22,7 @@ from unittest import TestCase, main import time import os +import uuid from pulsar import Client, MessageId, \ CompressionType, ConsumerType, PartitionsRoutingMode, \ AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition @@ -608,7 +609,7 @@ def test_reader_argument_errors(self): def test_publish_compact_and_consume(self): client = Client(self.serviceUrl) - topic = 'my-python-test_publish_compact_and_consume' + topic = 'compaction_%s' % (uuid.uuid4()) producer = client.create_producer(topic, producer_name='my-producer-name', batching_enabled=False) self.assertEqual(producer.last_sequence_id(), -1) consumer = client.subscribe(topic, 'my-sub1', is_read_compacted=True) @@ -621,7 +622,7 @@ def test_publish_compact_and_consume(self): producer.close() # issue compact command, and wait success - url=self.adminUrl + '/admin/v2/persistent/public/default/my-python-test_publish_compact_and_consume/compaction' + url='%s/admin/v2/persistent/public/default/%s/compaction' % (self.adminUrl, topic) doHttpPut(url, '') while True: s=doHttpGet(url).decode('utf-8') @@ -635,6 +636,14 @@ def test_publish_compact_and_consume(self): print(s) break + # after compaction completes the compacted ledger is recorded + # as a property of a cursor. As persisting the cursor is async + # and we don't wait for the acknowledgement of the acknowledgement, + # there may be a race if we try to read the compacted ledger immediately. + # therefore wait a second to allow the compacted ledger to be updated on + # the broker. + time.sleep(1.0) + # after compact, consumer with `is_read_compacted=True`, expected read only the second message for same key. consumer1 = client.subscribe(topic, 'my-sub1', is_read_compacted=True) msg0 = consumer1.receive()