Skip to content

Commit

Permalink
Give compacted ledger some time to update in python test(#2321) (#5448)
Browse files Browse the repository at this point in the history
After compaction completes the compacted ledger is recorded as a
property of a cursor. As persisting the cursor is async and we don't
wait for the acknowledgement of the acknowledgement, there may be a
race if we try to read the compacted ledger immediately.

To avoid this race, I've added a 1 second sleep to allow the compacted
ledger to be updated on the broker.
  • Loading branch information
ivankelly authored and merlimat committed Oct 23, 2019
1 parent 8b7245c commit 5d1529a
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pulsar-client-cpp/python/pulsar_test.py
Expand Up @@ -22,6 +22,7 @@
from unittest import TestCase, main
import time
import os
import uuid
from pulsar import Client, MessageId, \
CompressionType, ConsumerType, PartitionsRoutingMode, \
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition
Expand Down Expand Up @@ -608,7 +609,7 @@ def test_reader_argument_errors(self):

def test_publish_compact_and_consume(self):
client = Client(self.serviceUrl)
topic = 'my-python-test_publish_compact_and_consume'
topic = 'compaction_%s' % (uuid.uuid4())
producer = client.create_producer(topic, producer_name='my-producer-name', batching_enabled=False)
self.assertEqual(producer.last_sequence_id(), -1)
consumer = client.subscribe(topic, 'my-sub1', is_read_compacted=True)
Expand All @@ -621,7 +622,7 @@ def test_publish_compact_and_consume(self):
producer.close()

# issue compact command, and wait success
url=self.adminUrl + '/admin/v2/persistent/public/default/my-python-test_publish_compact_and_consume/compaction'
url='%s/admin/v2/persistent/public/default/%s/compaction' % (self.adminUrl, topic)
doHttpPut(url, '')
while True:
s=doHttpGet(url).decode('utf-8')
Expand All @@ -635,6 +636,14 @@ def test_publish_compact_and_consume(self):
print(s)
break

# after compaction completes the compacted ledger is recorded
# as a property of a cursor. As persisting the cursor is async
# and we don't wait for the acknowledgement of the acknowledgement,
# there may be a race if we try to read the compacted ledger immediately.
# therefore wait a second to allow the compacted ledger to be updated on
# the broker.
time.sleep(1.0)

# after compact, consumer with `is_read_compacted=True`, expected read only the second message for same key.
consumer1 = client.subscribe(topic, 'my-sub1', is_read_compacted=True)
msg0 = consumer1.receive()
Expand Down

0 comments on commit 5d1529a

Please sign in to comment.