SAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager. #420
@xinyuiscool @sborya @prateekm
QQ: How much time is 50 retries here?
@prateekm For some critical jobs, it's required that broker unavailability for an hour or so should not bring the job down (which would require fail-over to a backup colo). If required, we can reduce this retry duration.
@shanthoosh That's fine, just add a comment next to the constant 50 that this is ~83 min.
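For reference, the ~83 min figure corresponds to each of the 50 retries costing roughly 100 seconds on average. That per-retry number is only inferred from the two figures quoted in this thread, not taken from the Samza source, and `RetryBudget` with its methods is a hypothetical helper used purely for the arithmetic:

```scala
object RetryBudget {
  // Total wall-clock minutes spent if each of `retries` retries costs
  // `perRetrySeconds` seconds (wait plus failed attempt).
  def totalMinutes(retries: Int, perRetrySeconds: Double): Double =
    retries * perRetrySeconds / 60.0

  // Average per-retry cost in seconds implied by a total budget in minutes.
  // The result is an inferred average, not a value read from the codebase.
  def impliedPerRetrySeconds(retries: Int, totalMinutes: Double): Double =
    totalMinutes * 60.0 / retries
}
```

With 50 retries, `RetryBudget.totalMinutes(50, 100.0)` is a little over 83 minutes, which matches the comment requested above.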
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
if (loop.sleepCount >= MAX_RETRIES_ON_FAILURE) {
  error(s"Exhausted failure retries: $MAX_RETRIES_ON_FAILURE when writing checkpoint: $checkpoint for task: $taskName.")
  loop.done
Do you need both done and throw?
"Exhausted $MAX_RETRIES retries when ..."
Updated the patch in accordance with the above comment.
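A minimal sketch of the bounded-retry idea discussed in this thread, assuming a fixed pause between attempts. It does not use Samza's retry loop; `BoundedRetry.retry`, its parameters, and the exception type are hypothetical. It shows one reading of the "done and throw" comment: throwing already exits the loop, so no separate completion signal is needed.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object BoundedRetry {
  // Runs `op` once, then retries up to `retriesLeft` more times on failure.
  // `sleepMs` is a fixed pause between attempts (an assumption; the real
  // code may back off differently).
  @tailrec
  def retry[T](retriesLeft: Int, sleepMs: Long)(op: () => T): T =
    Try(op()) match {
      case Success(value) => value
      case Failure(e) if retriesLeft <= 0 =>
        // Throwing leaves the loop; the last failure is kept as the cause.
        throw new RuntimeException("Exhausted retries", e)
      case Failure(_) =>
        Thread.sleep(sleepMs)
        retry(retriesLeft - 1, sleepMs)(op)
    }
}
```

A caller would wrap the checkpoint write in `op` and let the exception propagate once the retry budget is spent, rather than blocking the commit phase indefinitely.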
@@ -53,6 +53,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {

var MAX_RETRIES_ON_FAILURE = 50
Make this a val. Move to line 152, where the retry loop is.
I want to override the retry count in tests (set it to 1). Since it's relevant to only one API, I don't think it's logical to inject it through the constructor.
It's fine to add it to the constructor. That's what they're for - setting up the class' state.
If you really want to keep this as a field, Scala doesn't use the ALL_CAPS convention for naming constants. Change to AllCaps instead.
Changed to AllCaps.
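For comparison, a rough sketch of the constructor-injection alternative suggested above: a default value keeps production call sites unchanged, while a test can pass 1. `CheckpointWriter` and `writeWithRetries` are hypothetical stand-ins, not the actual KafkaCheckpointManager API.

```scala
// Hypothetical stand-in for the real class; only the retry limit is modelled.
class CheckpointWriter(val maxRetriesOnFailure: Int = 50) {
  // Calls `write` until it returns true or the retry limit is exhausted.
  // Returns the number of attempts made (one initial attempt plus retries).
  def writeWithRetries(write: () => Boolean): Int = {
    var attempts = 0
    var succeeded = false
    while (!succeeded && attempts <= maxRetriesOnFailure) {
      attempts += 1
      succeeded = write()
    }
    if (!succeeded)
      throw new IllegalStateException(s"Exhausted $maxRetriesOnFailure retries")
    attempts
  }
}
```

A test would construct `new CheckpointWriter(maxRetriesOnFailure = 1)`, so no mutable field is needed to shorten the retry budget.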
d88ccb1 to f5d7a64
Fix it and ship it.
@@ -53,6 +53,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {

var maxRetriesOnFailure = 50
'MaxRetriesOnFailure'. Also, add comment saying that this is ~83 minutes. Otherwise LGTM.
Done.
f5d7a64 to 426979a
KafkaCheckpointManager.writeCheckpoint currently goes into an infinite loop when an irrecoverable failure happens; this blocks the commit phase indefinitely (thereby preventing processing). Added finite retries (50), so a failed write is retried for a bounded time before giving up.
Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>
Reviewers: Prateek M <prateekm@cs.utexas.edu>
Closes apache#420 from shanthoosh/add_fixed_retries_in_kafka_checkpoint_manager