Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously #2559

Closed
wants to merge 1 commit into from

Conversation

StephanEwen
Copy link
Contributor

The offset commit calls to Kafka may occasionally take very long. In that case, the notifyCheckpointComplete() method blocks for long and the KafkaConsumer cannot make progress and cannot perform checkpoints.

This pull request changes the offset committing to use Kafka's commitAsync() method.
It also makes sure that no more than one commit is concurrently in progress, to that commit requests do not pile up.

@StephanEwen
Copy link
Contributor Author

@robert and @tzulitai What is your take on this?

@tzulitai
Copy link
Contributor

tzulitai commented Sep 28, 2016

Just had a look at the API of commitAsync, and it seems like the committed offsets back to Kafka through this API (likewise for commitSync) need to be lastProcessedMessageOffset + 1 (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)).

This mainly effects that when starting from group offsets in Kafka, FlinkKafkaConsumer09 will start from the wrong offset. There's a separate JIRA for this bug: FLINK-4618.

Another contributor had already picked up FLINK-4618, so I'd say it's ok to leave this PR as it is. I'll help check on FLINK-4618 progress and make sure it gets merged after this PR.

Minus the above, this looks good to me. +1

@@ -86,6 +90,9 @@
/** Flag to mark the main work loop as alive */
private volatile boolean running = true;

/** Flag indicating whether a commit of offsets to Kafka it currently happening */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it --> "is"?

@tzulitai
Copy link
Contributor

tzulitai commented Sep 28, 2016

Btw, just curious, does 0.8 Kafka connector have the same issue with sync committing? I haven't looked into the code for this, but wondering if we need a ticket for 0.8 too.

@tzulitai
Copy link
Contributor

@StephanEwen I think you've tagged the wrong Github ID for Robert ;)

commitInProgress = false;

if (exception != null) {
LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception message isn't included in the log warning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, this is actually correct, sorry.

this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
}
else {
LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user sets a relatively short checkpoint interval, will this be flooding log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly yes. But on the other hand, this should be pretty visible if it happens.
I would expect that with proper options to participate in group checkpoint committing, most Flink jobs run without committing to Kafka/ZooKeeper.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, makes sense.

@StephanEwen
Copy link
Contributor Author

Thanks @tzulitai for looking at this. I will leave the offset then as it is (fixed via followup) and

The Kafka 0.8 connector needs a similar change. This here is encountered by a user, so I wanted to get the 0.9 fix in faster. Will do a follow-up for Kafka 0.8. Will also correct the issue tag ;-)

I have no good idea how to test this, though, so any thoughts there are welcome!

@tzulitai
Copy link
Contributor

tzulitai commented Sep 28, 2016

@StephanEwen
On a second look, I think the commitSpecificOffsetsToKafka method was designed to commit synchronously in the first place. AbstractFetcher holds a Map of all current pending offsets for committing by checkpointID, and on every notifyCheckpointComplete the offsets are removed from the Map before commitSpecificOffsetsToKafka is called.

So, for async committing, we need to remove cleaning up the offsets in AbstractFetcher#notifyCheckpointComplete() and instead clean them up in a new separate callback handle method added to AbstractFetcher.

I don't think this is super critical though, as normally the async commits shouldn't fail anyways. Perhaps we can properly add this in the follow-up for 0.8.

@tzulitai
Copy link
Contributor

tzulitai commented Sep 28, 2016

Seems like currently only the 0.8 Kafka connector have tests related to offset committing (in Kafka08ITCase).

Following the existing testing code, my two cents for testing this for now is that a IT test for correct offset committing back to Kafka in the 0.9 connector is sufficient (can take a look at Kafka08ITCase#testOffsetInZookeeper(), but replacing ZookeeperOffsetHandler with the new KafkaConsumer methods)?

@StephanEwen
Copy link
Contributor Author

@tzulitai Thanks for thorough review!

I don't understand the problem why the commitSpecificOffsetsToKafka method is designed to commit synchronously. The FlinkKafkaConsumerBase has the pending checkpoints (I think that is what you refer to). It removes the HashMap of "offsets to commit" from the pendingCheckpoints Map synchronously, before even calling the fetcher to commit.
After that, it looks to me like it does not make a difference how that Map "offsets to commit" is used (sync or async)...

@StephanEwen
Copy link
Contributor Author

Actually, just discovered that the problem is different all together.

While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data comes in Kafka, the lock is not released before the poll timeout is over.
During that time, neither a "commitSync" nor "commitAsync" call can be fired off. The notifyCheckpointComplete method hence blocks until the poll timeout is over and the lock is released.

We can fix this by making sure that the consumer is "woken up" to release the lock, and by making sure the lock acquisition is fair, so the committer will get it next.

For the sake of releasing the lock fast in the committer method, it should still be an asynchronous commit.

@StephanEwen
Copy link
Contributor Author

Closing this for #2574

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants