Skip to content

Commit

Permalink
Make acknowledge APIs synchronous and improve the documents (#121)
Browse files Browse the repository at this point in the history
Fixes #114

### Motivation

Currently the `acknowledge` and `acknowledge_cumulative` methods are all
asynchronous. Even if any error happened, no exception would be raised.
For example, when acknowledging cumulatively on a consumer whose
consumer type is Shared or KeyShared, no error happens.

### Modifications

- Change these methods to synchronous and raise exceptions if the
  acknowledgment failed.
- Add `PulsarTest.test_acknowledge_failed` to test these failed cases.
- Improve the documents to describe which exceptions could be raised in
  which cases.
  • Loading branch information
BewareMyPower committed May 24, 2023
1 parent 87a3506 commit 0028893
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 9 deletions.
10 changes: 10 additions & 0 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,11 @@ def acknowledge(self, message):
message:
The received message or message id.
Raises
------
OperationNotSupported
if `message` is not allowed to acknowledge
"""
if isinstance(message, Message):
self._consumer.acknowledge(message._message)
Expand All @@ -1324,6 +1329,11 @@ def acknowledge_cumulative(self, message):
message:
The received message or message id.
Raises
------
CumulativeAcknowledgementNotAllowedError
if the consumer type is ConsumerType.KeyShared or ConsumerType.Shared
"""
if isinstance(message, Message):
self._consumer.acknowledge_cumulative(message._message)
Expand Down
17 changes: 8 additions & 9 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ Messages Consumer_batch_receive(Consumer& consumer) {
return msgs;
}

void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msg, callback); });
}

void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); });
}

void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
Expand All @@ -63,18 +64,16 @@ void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
}

void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId);
Py_END_ALLOW_THREADS
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); });
}

void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr);
Py_END_ALLOW_THREADS
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msg, callback); });
}

void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
waitForAsyncResult(
[&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msgId, callback); });
}

void Consumer_close(Consumer& consumer) {
Expand Down
23 changes: 23 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,29 @@ def send_callback(res, msg):
producer.flush()
client.close()

def test_acknowledge_failed(self):
client = Client(self.serviceUrl)
topic = 'test_acknowledge_failed'
producer = client.create_producer(topic)
consumer1 = client.subscribe(topic, 'sub1', consumer_type=ConsumerType.Shared)
consumer2 = client.subscribe(topic, 'sub2', consumer_type=ConsumerType.KeyShared)
msg_id = producer.send('hello'.encode())
msg1 = consumer1.receive()
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer1.acknowledge_cumulative(msg1)
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer1.acknowledge_cumulative(msg1.message_id())
msg2 = consumer2.receive()
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer2.acknowledge_cumulative(msg2)
with self.assertRaises(pulsar.CumulativeAcknowledgementNotAllowedError):
consumer2.acknowledge_cumulative(msg2.message_id())
consumer = client.subscribe([topic, topic + '-another'], 'sub')
# The message id does not have a topic name
with self.assertRaises(pulsar.OperationNotSupported):
consumer.acknowledge(msg_id)
client.close()


if __name__ == "__main__":
main()

0 comments on commit 0028893

Please sign in to comment.