From 6843072f389bf85da04dba32a9084ea97a8ba8d6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 24 May 2023 20:55:27 +0800 Subject: [PATCH] Make acknowledge APIs synchronous and improve the documents Fixes https://github.com/apache/pulsar-client-python/issues/114 ### Motivation Currently the `acknowledge` and `acknowledge_cumulative` methods are all asynchronous. Even if any error happened, no exception would be raised. For example, when acknowledging cumulatively on a consumer whose consumer type is Shared or KeyShared, no error happens. ### Modifications - Change these methods to synchronous and raise exceptions if the acknowledgment failed. - Add `PulsarTest.test_acknowledge_failed` to test these failed cases. - Improve the documents to describe which exceptions could be raised in which cases. --- pulsar/__init__.py | 10 ++++++++++ src/consumer.cc | 17 ++++++++--------- tests/pulsar_test.py | 23 +++++++++++++++++++++++ 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index f7c05e2..c85c6e3 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1305,6 +1305,11 @@ def acknowledge(self, message): message: The received message or message id. + + Raises + ------ + OperationNotSupported + if `message` is not allowed to acknowledge """ if isinstance(message, Message): self._consumer.acknowledge(message._message) @@ -1324,6 +1329,11 @@ def acknowledge_cumulative(self, message): message: The received message or message id. + + Raises + ------ + CumulativeAcknowledgementNotAllowedError + if the consumer type is ConsumerType.KeyShared or ConsumerType.Shared """ if isinstance(message, Message): self._consumer.acknowledge_cumulative(message._message) diff --git a/src/consumer.cc b/src/consumer.cc index 4b44775..67d2daa 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -50,11 +50,12 @@ Messages Consumer_batch_receive(Consumer& consumer) { return msgs; } -void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); } +void Consumer_acknowledge(Consumer& consumer, const Message& msg) { + waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msg, callback); }); +} void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { - Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr); - Py_END_ALLOW_THREADS + waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); }); } void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { @@ -63,18 +64,16 @@ void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { } void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { - Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId); - Py_END_ALLOW_THREADS + waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); }); } void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) { - Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr); - Py_END_ALLOW_THREADS + waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msg, callback); }); } void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) { - Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr); - Py_END_ALLOW_THREADS + waitForAsyncResult( + [&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msgId, callback); }); } void Consumer_close(Consumer& consumer) { diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 00e2466..eeb2a6a 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -1437,6 +1437,29 @@ def send_callback(res, msg): producer.flush() client.close() + def test_acknowledge_failed(self): + client = Client(self.serviceUrl) + topic = 'test_acknowledge_failed' + producer = client.create_producer(topic) + consumer1 = client.subscribe(topic, 'sub1', consumer_type=ConsumerType.Shared) + consumer2 = client.subscribe(topic, 'sub2', consumer_type=ConsumerType.KeyShared) + msg_id = producer.send('hello'.encode()) + msg1 = consumer1.receive() + with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError): + consumer1.acknowledge_cumulative(msg1) + with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError): + consumer1.acknowledge_cumulative(msg1.message_id()) + msg2 = consumer2.receive() + with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError): + consumer2.acknowledge_cumulative(msg2) + with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError): + consumer2.acknowledge_cumulative(msg2.message_id()) + consumer = client.subscribe([topic, topic + '-another'], 'sub') + # The message id does not have a topic name + with self.assertRaises(pulsar.OperationNotSupported): + consumer.acknowledge(msg_id) + client.close() + if __name__ == "__main__": main()