diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 5474c16..6fd38d1 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -697,7 +697,8 @@ def subscribe(self, topic, subscription_name, crypto_key_reader=None, replicate_subscription_state_enabled=False, max_pending_chunked_message=10, - auto_ack_oldest_chunked_message_on_queue_full=False + auto_ack_oldest_chunked_message_on_queue_full=False, + start_message_id_inclusive=False ): """ Subscribe to the given topic and subscription combination. @@ -791,6 +792,10 @@ def my_listener(consumer, message): can be guarded by providing the maxPendingChunkedMessage threshold. See setMaxPendingChunkedMessage. Once, consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acking if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery. + Default: `False`. + * start_message_id_inclusive: + Set the consumer to include the given position of any reset operation like Consumer::seek. + Default: `False`. """ _check_type(str, subscription_name, 'subscription_name') @@ -810,6 +815,7 @@ def my_listener(consumer, message): _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message') _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full') + _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -838,6 +844,7 @@ def my_listener(consumer, message): conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) conf.max_pending_chunked_message(max_pending_chunked_message) conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full) + conf.start_message_id_inclusive(start_message_id_inclusive) c = Consumer() if isinstance(topic, str): diff --git a/src/config.cc b/src/config.cc index 3a9c14b..d2ed103 100644 --- a/src/config.cc +++ b/src/config.cc @@ -294,7 +294,10 @@ void export_config() { .def("auto_ack_oldest_chunked_message_on_queue_full", &ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull) .def("auto_ack_oldest_chunked_message_on_queue_full", - &ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, return_self<>()); + &ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, return_self<>()) + .def("start_message_id_inclusive", &ConsumerConfiguration::isStartMessageIdInclusive) + .def("start_message_id_inclusive",&ConsumerConfiguration::setStartMessageIdInclusive, + return_self<>()); class_("ReaderConfiguration") .def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>()) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 71af279..d0f1ba0 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -920,6 +920,37 @@ def test_seek(self): reader.close() client.close() + def test_seek_inclusive(self): + client = Client(self.serviceUrl) + topic = "my-python-topic-seek-inclusive-" + str(time.time()) + consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, start_message_id_inclusive=True) + producer = client.create_producer(topic) + + for i in range(100): + if i > 0: + time.sleep(0.02) + producer.send(b"hello-%d" % i) + + ids = [] + for i in range(100): + msg = consumer.receive(TM) + self.assertEqual(msg.data(), b"hello-%d" % i) + ids.append(msg.message_id()) + consumer.acknowledge(msg) + + # seek, and after reconnect, expected receive first message. + consumer.seek(MessageId.earliest) + time.sleep(0.5) + msg = consumer.receive(TM) + self.assertEqual(msg.data(), b"hello-0") + + # seek on messageId + consumer.seek(ids[50]) + time.sleep(0.5) + msg = consumer.receive(TM) + self.assertEqual(msg.data(), b"hello-50") + client.close() + def test_v2_topics(self): self._v2_topics(self.serviceUrl)