Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-13841: Allow manual Pulsar message acknowledgments #3094

Conversation

masahoriyama-toast
Copy link
Contributor

@masahoriyama-toast masahoriyama-toast commented Aug 12, 2019

Addresses CAMEL-13841

Motivation

Pulsar messages are currently acknowledged immediately upon consumption. This could lead to lost messages if the application crashes or does not finish its unit of work. In such situations, it may be desirable to leave these messages unacknowledged so that they can be redelivered to another consumer. [Correction March 3, 2020 -- This is not correct. Pulsar messages are acknowledged after successful processing of the route, not immediately after consumption from the topic. Exceptions or errors on the route will correctly cause the message to remain unacknowledged.]

Allow manual control of Pulsar message acknowledgement related routines. The end user can decide when to acknowledge, or negative acknowledge a Pulsar message within the route. This will become a necessity when support for asynchronous processing of Pulsar messages is added.

Modifications

Changed the PulsarMessageListener's received method to not acknowledge the message if allowManualAcknowledgement is true. Instead, we add an instance of PulsarMessageReceipt as a header on the Exchange so that the user can manually acknowledge the message at the appropriate time.

Added allowManualAcknowledgement, ackTimeoutMillis, and ackGroupTimeMillis as URI parameters. The latter two are Pulsar consumer configurations to allow for finer control over acknowledgements.

Added allowManualAcknowledgement also as a component option so that it can default to true if absent in the Pulsar endpoint URI.
Added the pulsarMessageReceiptFactory component option for providing an alternate implementation of MessageReceipt.

Followed the same approach as camel-kafka:
CAMEL-11933

Verifying this change

Added the following test classes:

PulsarConsumerAcknowledgementTest
PulsarConsumerNoAcknowledgementTest
PulsarCustomMessageReceiptTest
PulsarNegativeAcknowledgementTest

And added to:
PulsarComponentTest

Documentation

Added to pulsar-component.adoc

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.PulsarClientException;

public interface PulsarMessageReceipt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add javadoc to this interface and its methods

Copy link
Contributor Author

@masahoriyama-toast masahoriyama-toast Aug 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added Javadocs to PulsarMessageReceipt and PulsarMessageReceiptFactory. Please let me know what you think. Thank-you.

@davsclaus
Copy link
Contributor

Looks really good just would be nice with javadoc on that interface that some end users may implement for custom behaviour

@davsclaus davsclaus merged commit 860b654 into apache:master Aug 13, 2019
@masahoriyama-toast masahoriyama-toast deleted the feature/pulsar_manual_acknowledgement branch August 13, 2019 12:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants