Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PulsarSpout to use consumer.redeliverUnacknowledgedMessages() ? #2210

Closed
jnioche opened this issue Jul 20, 2018 · 3 comments
Closed

PulsarSpout to use consumer.redeliverUnacknowledgedMessages() ? #2210

jnioche opened this issue Jul 20, 2018 · 3 comments
Labels
lifecycle/stale type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages type/feature The PR added a new feature or issue requested a new feature

Comments

@jnioche
Copy link

jnioche commented Jul 20, 2018

The PulsarSpout keeps track of the pending messages as well as the failed ones in a queue and map. If the JVM containing a PulsarSpout crashes, this information will get lost with it and the pending or failed messages won't get acked in Pulsar but they also won't have a chance to get retried either when a new instance of the spout is started.

Should we add consumer.redeliverUnacknowledgedMessages() to the initialisation of the spout? That way any message which hasn't been acked will get retried?

@merlimat
Copy link
Contributor

@jnioche Yes, that makes total sense. The only reason for this to not use redeliverUnacknowledgedMessages() in the spout was that the method didn't exist when we added the spout :).

Probably it would even be better to use the method at #2211 so that we can precisely control the redelivery.

Should we add consumer.redeliverUnacknowledgedMessages() to the initialisation of the spout?

When subscribing the unacked messages are automatically re-delivered. The only downside of redeliverUnacknowledgedMessages() would be that it triggers the re-delivery of unacked messages.

The most complicated part in the PulsarSpout is the caching of client and consumer instances. This was done to shared resources across threads when users set high parallelism in the topologies.

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Jul 24, 2018
@merlimat merlimat added this to the 2.2.0-incubating milestone Jul 24, 2018
@jnioche
Copy link
Author

jnioche commented Jul 25, 2018

Probably it would even be better to use the method at #2211 so that we can precisely control the redelivery.

We need both redeliverUnacknowledgedMessages() and #2211. The latter could be used for failed messages but we'd still need to keep track of the number of times it failed and a delay to apply. This won't help for messages which were being processed when the JVM died, which is why we need redeliverUnacknowledgedMessages() at init time.

When subscribing the unacked messages are automatically re-delivered.

I'm confused. Do you mean that this is what it does even without using redeliverUnacknowledgedMessages?

@tisonkun
Copy link
Member

tisonkun commented Dec 7, 2022

Closed as stale and no one worked on it. Please open a new issue if it's still relevant to the maintained versions.

@tisonkun tisonkun closed this as not planned Won't fix, can't repro, duplicate, stale Dec 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
lifecycle/stale type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

No branches or pull requests

6 participants