[FLINK-4027] Flush FlinkKafkaProducer on checkpoints #2108
Conversation
@@ -51,10 +54,11 @@
 * Flink Sink to produce data into a Kafka topic.
 *
 * Please note that this producer does not have any reliability guarantees.
 * The producer implements the checkpointed interface for allowing synchronization on checkpoints.
May want to change:
note that this producer does not have any reliability guarantees.
to
note that this producer provides at-least-once reliability guarantees when checkpoints are enabled.
Thank you for the review @eliaslevy!
protected void flush() {
	// The Kafka 0.8 producer doesn't support flushing, therefore, we are using an inefficient
	// busy wait approach
	while(pendingRecords > 0) {
missing space after while
👍
 * Please note that this producer does not have any reliability guarantees.
 * Please note that this producer provides at-least-once reliability guarantees when
 * checkpoints are enabled and setFlushOnCheckpoint(true) is set.
 * Otherwise, the producer doesn't provide any reliability guarantees.
Does it make sense to completely remove the old behaviour and always enable flush on checkpoint? I'm wondering, because who would want to use a KafkaProducer with no processing guarantees?
My reasoning here was that we first provide this as an optional feature to those users who know what they are doing / what they need, to give the feature exposure.
I want to be certain that it works in all environments before we activate it by default.
Alright, this makes total sense.
Good work @rmetzger. Well-documented code and a good idea to solve the problem. I had some comments concerning concurrent accesses to
Thank you for your review @tillrohrmann and @zentol. I tried addressing all your concerns.
	threadB.join(500);
}
Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
if (runnableError.f0 != null) {
	runnableError.f0.printStackTrace();
	Assert.fail("Error from thread B: " + runnableError.f0);
Printing the stack trace to stdout is imo not so good. The problem is that the stack trace will be intermingled with the rest of the testing log output. I think it's better to simply rethrow the Throwable here.
Changes look good to me @rmetzger :-) I had only some minor comments. +1 for merging after addressing the comments.
A user on the mailing list raised the point that our Kafka producer can be made at-least-once quite easily. The current producer code doesn't provide any guarantees.
We are using the producer's callbacks to account for unacknowledged records. When a checkpoint barrier reaches the sink, it confirms the checkpoint only once all pending records have been acknowledged.
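The pending-record accounting described above can be sketched in a few lines. This is a hypothetical, self-contained mock (the `MockProducerSink` class and its methods are invented for illustration, not the actual Flink `FlinkKafkaProducerBase` code); it simulates broker acks with a scheduled executor instead of real Kafka callbacks, and mirrors the opt-in `setFlushOnCheckpoint(true)` behaviour and the busy-wait flush from the diff:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the sink discussed in this PR; names are
// invented for illustration and do not match the real Flink API.
public class MockProducerSink {
    private final AtomicLong pendingRecords = new AtomicLong();
    // simulates the Kafka broker acknowledging records asynchronously
    private final ScheduledExecutorService broker =
            Executors.newSingleThreadScheduledExecutor();
    private boolean flushOnCheckpoint = false;

    // mirrors the opt-in discussed above: flushing is off by default
    public void setFlushOnCheckpoint(boolean flush) {
        this.flushOnCheckpoint = flush;
    }

    // "send" a record: count it as pending, and decrement in the ack
    // callback, like the producer callback described in the PR
    public void invoke(String record) {
        pendingRecords.incrementAndGet();
        broker.schedule(() -> { pendingRecords.decrementAndGet(); },
                10, TimeUnit.MILLISECONDS);
    }

    // called when a checkpoint barrier reaches the sink: confirm the
    // checkpoint only once every pending record has been acked
    public void snapshotState() throws InterruptedException {
        if (!flushOnCheckpoint) {
            return; // old behaviour: no guarantees
        }
        while (pendingRecords.get() > 0) {
            Thread.sleep(1); // busy wait, as in the 0.8 producer's flush()
        }
    }

    public long pending() {
        return pendingRecords.get();
    }

    public void close() {
        broker.shutdown();
    }

    public static void main(String[] args) throws Exception {
        MockProducerSink sink = new MockProducerSink();
        sink.setFlushOnCheckpoint(true);
        for (int i = 0; i < 100; i++) {
            sink.invoke("record-" + i);
        }
        sink.snapshotState(); // returns only after all 100 acks arrived
        System.out.println("pending after checkpoint: " + sink.pending());
        sink.close();
    }
}
```

With `flushOnCheckpoint` left at its default of `false`, `snapshotState()` returns immediately even while records are in flight, which is exactly the "no guarantees" behaviour the old Javadoc warned about.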