-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
STORM-2014: Put logic around dropping messages into RetryService, rem… #1605
Conversation
+1 (non-binding). I had talked to @hmcl briefly about this. We should have his review as well |
@hmcl ping. Any input on this? |
* nextRetry = Min(nextRetry, currentTime + maxDelay) | ||
* nextRetry = Min(nextRetry, currentTime + maxDelay). | ||
* | ||
* While retrying a record, no new records are committed until the previous polled records have been acked. This guarantees at once delivery of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment should probably not go here, but rather somewhere in the Spout code. (retry service knows nothing about exactly once or at most once semantics)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I'll move it to findNextCommitOffset in the spout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few minor suggestions around comments and logging, but I am +1 overall
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); | ||
msgId.incrementNumFails(); | ||
if (!retryService.schedule(msgId)) { | ||
LOG.debug("Retry service indicated message should not be retried. Message [{}] being marked as acked.", msgId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would leave the message as it was before. "Reached maximum number of retries. Message [{}] being marked as acked."
It adds overhead and it is irrelevant to be mentioning the retry service here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it because retryService can be any implementation here. It may decide not to schedule a message for other reasons than reaching the retry cap. I'll change it back if you still prefer the previous log message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srdo I am not sure I completely follow your reasoning. The most meaningful piece of information that we want to show to the user here is that if the max number of retries has been reached, no more retries happen. If the user reads a message saying that the retry service marked it not to be retried, he still does not know the cause.
Looking at the code, the boolean schedule(msgId)
only returns false only if the max number of retries has been reached.
Although the retry service log messages hints ant the max retries cap being reached, the user will have to have both logs enabled to have both spouts. I favor the message as it was before, as it is straight to the point and provides good insight on what is happening.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hmcl I was referring to RetryService being an interface users can override, which doesn't mention max retries in the schedule javadoc, and this part of the code technically not "knowing about" max retries, because users may supply a retry service that doesn't schedule tuples for some other reason. I think you're right though, it was probably clearer before.
@@ -29,14 +29,18 @@ | |||
*/ | |||
public interface KafkaSpoutRetryService extends Serializable { | |||
/** | |||
* Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled. | |||
* Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or | |||
* updates retry time if it has already been scheduled. May also indicate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may also indicate...
I am +1 overall. Please format the commit message to be easy to read and squash the two commits. This is a simple change that should have only one commit. We can merge after that. |
Squashed. I hope this commit message is clearer. |
+1 |
1 similar comment
+1 |
…ove maxRetry setting from new KafkaSpout
https://issues.apache.org/jira/browse/STORM-2014
This PR removes maxRetry from the KafkaSpout and changes the RetryService interface slightly so the schedule method can communicate back to the spout that the message should be dropped. Retry logic belongs to the RetryService interface, and it's nice for users if they can easily plug in their own handling of messages that will be dropped (custom logging for example).