GOBBLIN-684: Ensure buffered messages are flushed before close() in KafkaProducerPusher. #2556
Conversation
@@ -47,6 +51,9 @@
  private final String topic;
  private final KafkaProducer<K, V> producer;
  private final Closer closer;
  private final Queue<Future<RecordMetadata>> futures = new LinkedBlockingDeque<>();
Can you add a comment that this queue must have a capacity (currently unlimited) of at least MAX_NUM_FUTURES_TO_BUFFER plus a concurrency buffer, since a non-blocking offer is used? Just to prevent someone from inadvertently adding an incorrect limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Added more comments to explain how to set the capacity for the buffer.
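For illustration, a sketch of how the requested comment might read on the declaration (the field and constant come from the diff above; the comment wording itself is an assumption, not the committed text):

// Assumption: illustrative wording, not necessarily the actual patch comment.
// NOTE: this queue must stay unbounded, or be given a capacity of at least
// MAX_NUM_FUTURES_TO_BUFFER plus a concurrency buffer, because futures are
// added with a non-blocking offer(); a smaller bound would let offer() fail
// silently, and the acks for those records would never be awaited.
private final Queue<Future<RecordMetadata>> futures = new LinkedBlockingDeque<>();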
// each of the accumulated futures.
if (this.futures.size() >= MAX_NUM_FUTURES_TO_BUFFER) {
  flush(MAX_NUM_FUTURES_TO_BUFFER);
  this.futures.clear();
This won't be thread safe: if there are concurrent calls to pushMessages(), some unacknowledged futures may be cleared. flush is already removing with poll, so I think this clear() can be removed.
Also, this forces a synchronous wait every 1000 messages. Should there be different max buffer and flush amounts to avoid waiting for the newest messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Yes, the clear() is not needed. Also changed the pushMessages() method so the flush excludes the newest added messages.
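A minimal sketch of the revised flow being described here, assuming a byte[] payload and this pushMessages() signature; the exact amount flushed and the class context are guesses at the described behavior, not the actual patch:

public void pushMessages(List<byte[]> messages) {
  for (byte[] message : messages) {
    // Enqueue the ack future for each send; offer() never blocks.
    this.futures.offer(this.producer.send(new ProducerRecord<>(this.topic, message)));
  }
  // Once the limit is hit, wait only on the older futures. flush() drains the
  // queue with poll(), so concurrent pushMessages() calls cannot clear each
  // other's unacknowledged futures, and the newest sends are not waited on.
  if (this.futures.size() >= MAX_NUM_FUTURES_TO_BUFFER) {
    flush(this.futures.size() - messages.size());
  }
}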
 * @param numRecordsToFlush
 */
private void flush(long numRecordsToFlush) {
  log.info("Flushing records from producer buffer");
Can you make this log and the one at the end debug level, or print it only when numRecordsToFlush == Long.MAX_VALUE? I think this may flood the log, since flush() is called whenever the futures queue has hit the limit.
+1
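A sketch of flush() with the suggested logging change applied, assuming the poll-and-get draining visible in the review thread; the error handling is illustrative:

private void flush(long numRecordsToFlush) {
  // Suggested change: only log on the unbounded flush issued at close();
  // routine buffer-limit flushes would otherwise flood the log.
  if (numRecordsToFlush == Long.MAX_VALUE) {
    log.info("Flushing records from producer buffer");
  }
  long numFlushed = 0;
  Future<RecordMetadata> future;
  // Drain with poll() so concurrent flushes never wait on the same future.
  while (numFlushed++ < numRecordsToFlush && (future = this.futures.poll()) != null) {
    try {
      future.get(); // block until this record is acked
    } catch (InterruptedException | ExecutionException e) {
      log.error("Failed to send a message to Kafka", e);
    }
  }
}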
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Currently, when KafkaProducerPusher is closed, it invokes KafkaProducer#close(). However, close() only guarantees delivery of in-flight messages, not the messages in the producer buffer waiting to be sent out. This results in data loss.
The fix ensures that we call flush() before close(). As a result, any buffered messages are immediately pushed out and we block until the messages are acked.
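A minimal sketch of the fix under the assumptions above: flush(Long.MAX_VALUE) drains every outstanding future (as in the review thread), and the Closer-based cleanup is inferred from the fields in the diff:

@Override
public void close() throws IOException {
  // Block until every buffered record has been sent and acked before
  // releasing the producer; per the description above, KafkaProducer#close()
  // alone does not cover records still sitting in the buffer.
  flush(Long.MAX_VALUE);
  this.closer.close(); // closes the underlying KafkaProducer
}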
Tests
Tested in a real deployment.
Commits