[SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector #25135
Test build #107595 has finished for PR 25135 at commit
cc @HeartSaVioR
The change mostly looks good. Some minor comments.
Btw, just wondering: if you managed to test the situation where the metadata update is delayed (and finally times out), what happens? Does the query fail, or are the relevant tasks reattempted?
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
...l/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
I presume you're interested in what the new behavior is. If an exception arrives
Test build #107681 has finished for PR 25135 at commit
Ah OK there's already a retry loop. Thanks for the info.
LGTM
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
LGTM again.
Test build #107731 has finished for PR 25135 at commit
retest this please
Test build #107732 has finished for PR 25135 at commit
Created a quick fix #25171
retest this please
Test build #107735 has finished for PR 25135 at commit
Checking the issue...
Test build #107739 has finished for PR 25135 at commit
A little explanation of the last change:
So the final judgement is that the actual implementation works, but as a side track I'm going to analyze the
@gaborgsomogyi yeah, I think it's better to ask the Kafka guys to clarify how to use this new poll API. It seems it cannot be used directly without retrying.
@zsxwing the main approach was created in agreement with the Kafka guys. The only thing that bothers me is why the following approaches produce different results (maybe an edge case is hit with 0?!).
Fails consistently with no flakiness (worth mentioning that it's only a couple of tests, so maybe the tests are implemented incorrectly):
Maybe it's only me, but I'm not sure I understand why there's no alternative being proposed for widely used patterns. If we can't use
If that were true, several users would be protesting with blocker bugs on the Kafka side, but I haven't seen such a situation (polling without assignment is not possible). I've seen several heavy users who are doing the polling (and no other magic) with
Anyway, I'm going to take two weeks of vacation, but after that I'm going to sit down with the Kafka guys and take a deeper look at this (again, only a couple of Spark tests are failing consistently, and it may be that we're not using the API properly).
Here's part of the test code Kafka has been using with the new poll.
Kafka still has some parts of its test code relying on the deprecated API. Sometimes Kafka calls
In many cases of calling
I guess in our case we need to either leverage
Btw, I'm seeing KIP-288, which proposed a new public API
First of all thanks @HeartSaVioR for the deep look, it helped! @zsxwing I've had another round with the Kafka guys on this and here are the conclusions:
Here is the client log:
To resolve this situation I've considered mainly 2 things:
The second approach is overkill from my point of view. Such configuration can be added later if we see the need.
Test build #108719 has finished for PR 25135 at commit
Test build #108720 has finished for PR 25135 at commit
The latest changes look good to me.
Maybe we should document that it might be safer to set CONSUMER_POLL_TIMEOUT
to a higher value so that it can account for the metadata update as well. Not sure where the best place for such guidance is; maybe the release notes? Let's hear the committers' voices.
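For reference, CONSUMER_POLL_TIMEOUT corresponds to the Kafka source option `kafkaConsumer.pollTimeoutMs`. A sketch of what raising it would look like on the read side; the broker address, topic, and timeout value are illustrative placeholders, not from this PR:

```java
// Sketch only: raise the Kafka source poll timeout so that metadata updates
// have more headroom before the call gives up. All values are placeholders.
Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   // placeholder address
    .option("subscribe", "topic1")                      // placeholder topic
    .option("kafkaConsumer.pollTimeoutMs", "300000")    // illustrative value
    .load();
```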
private def getPartitions(): ju.Set[TopicPartition] = {
  consumer.poll(jt.Duration.ZERO)
  var partitions = consumer.assignment()
  val startTimeMs = System.currentTimeMillis()
For this kind of logic it's better to use System.nanoTime()
which is monotonic. Also you can do a little less computation this way:
val deadline = System.nanoTime() + someTimeout
while (... && System.nanoTime() < deadline) {
}
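The suggested deadline pattern can be sketched as a self-contained, runnable example; `pollForAssignment` is a hypothetical stub standing in for the consumer poll plus assignment lookup, not the real Kafka client:

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

public class DeadlineLoop {
    static int calls = 0;

    // Stub standing in for consumer.poll(...) followed by consumer.assignment():
    // returns a non-empty set on the third call, simulating metadata that
    // arrives after a short delay. (Hypothetical; not the real Kafka client.)
    static Set<Integer> pollForAssignment() {
        calls++;
        Set<Integer> partitions = new HashSet<>();
        if (calls >= 3) {
            partitions.add(0);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Compute the deadline once from the monotonic clock; unlike
        // System.currentTimeMillis(), System.nanoTime() is unaffected by
        // wall-clock adjustments (NTP corrections, manual changes).
        long deadline = System.nanoTime() + Duration.ofSeconds(2).toNanos();

        Set<Integer> partitions = pollForAssignment();
        while (partitions.isEmpty() && System.nanoTime() < deadline) {
            partitions = pollForAssignment();
        }
        System.out.println(partitions); // [0] since the assignment arrived before the deadline
    }
}
```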
Good point. Since @zsxwing suggested the new API usage, I would wait here and check the Kafka side.
val startTimeMs = System.currentTimeMillis()
while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
  // Poll to get the latest assigned partitions
  consumer.poll(jt.Duration.ofMillis(100))
So using this new API will pull data to the driver, right? The previous poll(0)
is basically a hack to avoid fetching data to the driver. Maybe we should ask the Kafka community to add a new API to pull metadata only.
Good point. While the Kafka doc says the behavior of such a hack has been nondeterministic and Kafka never supported it officially, we rely on that behavior anyway.
I've initiated a thread to ask about viable alternatives to poll(0)
and the possibility of adding a public API to update metadata only.
https://lists.apache.org/thread.html/017cf631ef981ab1b494b1249be5c11d7edfe5f4867770a18188ebdc@%3Cdev.kafka.apache.org%3E
I'm aware of that, but since the doc says:
there is no guarantee that poll(0) won't return records the first time it's called
I've considered the poll(0)
usage a design decision, with no problem if a small amount of data comes. Since you say it is not guaranteed but has been working like that all the time, the situation is different.
@HeartSaVioR thanks for initiating the discussion and let's see where it goes.
Yeah, you're right, that's true as well. One slight difference between the two is that we are providing a small timeout (not 0), and we don't know how much of the remaining timeout would be spent polling records (instead of polling metadata). It would be unlikely to be exactly 0, as the call would time out if it went below 0.
To give a small summary of the discussion in the Kafka dev channel, the proposal that tends toward agreement is the following:
Worth mentioning that such filtering happens on the client side in
@gaborgsomogyi thanks for coordinating with the Kafka community. I think we can close this PR for now and revisit it when they have a release with the new API.
@zsxwing yep, not much to do for now. Thanks for your help finding this out.
@HeartSaVioR gave me the pointer to this PR.
Sorry for the confusion. The KIP-396 proposal was accepted in time, but the code seems to have been merged after the code freeze of Kafka 2.4. (Though it's just 4 days later... I thought it was merged in time.)
Thank you for the updates, @HeartSaVioR.
@gaborgsomogyi just for my understanding: since upstream Spark now uses the Kafka 2.5 client, is there no obstacle to using the new KafkaConsumer.poll API in the Kafka connector? Thanks for this improvement!
@Tagar In short, it's OK to use the new API in executors (we may need to set a longer timeout then), but it's still not ideal to use it in the driver. It requires Spark to implement the subscription modes by itself with the AdminClient API, which is not impossible but brings complexity to match the same behavior we expect from the Kafka client.
That said, it can be unblocked if we are seriously concerned about the infinite timeout (i.e., we see the behavior fairly often) and consider it critical enough to tolerate the additional complexity brought to the driver side. I'm even OK with going with a partial fix: address the executor usage first and leave an issue open for the driver. This would greatly reduce the chance of an infinite timeout, since only the Kafka consumer in the driver could then hang. @zsxwing @gaborgsomogyi WDYT?
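For context, the driver-side alternative mentioned above, resolving partitions via the AdminClient instead of a consumer, could start from something roughly like this. This is only a sketch: it assumes a plain topic list, needs a reachable broker and the kafka-clients dependency, and deliberately omits pattern subscription and error handling, which are the hard parts.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

// Sketch only: resolve topic partitions via the admin client, never touching
// a consumer, so no records can ever be fetched on the driver.
final class PartitionLookup {
    static Set<TopicPartition> fetchPartitions(String bootstrapServers, List<String> topics)
            throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            Set<TopicPartition> result = new HashSet<>();
            for (TopicDescription desc : admin.describeTopics(topics).all().get().values()) {
                desc.partitions().forEach(
                    p -> result.add(new TopicPartition(desc.name(), p.partition())));
            }
            return result;
        }
    }
}
```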
@HeartSaVioR I think it makes sense to split up the problem, so I've created sub-tasks, and the executor-side PR is on the way. The driver side requires further attention and discussion with the Kafka folks. Re-implementing the subscription in Spark is overkill even if we want to avoid the infinite wait. We need another way...
@gaborgsomogyi and @HeartSaVioR thanks. Saw the first PR for SPARK-32033; that's great!
What changes were proposed in this pull request?
Spark uses an old, deprecated API named
KafkaConsumer.poll(long)
which never returns and stays in a livelock if the metadata is not updated (for instance, when the broker disappears at consumer creation). Please see the Kafka documentation and the standalone test application for further details. In this PR I've applied the new
KafkaConsumer.poll(Duration)
API.

How was this patch tested?
Existing unit tests + started Kafka to Kafka query on cluster.
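The behavioral difference described above can be illustrated with a hypothetical stand-in. `StubConsumer` is invented for this sketch and is not the real KafkaConsumer: the real poll(long) blocks inside the client on a metadata update it cannot bound, which the stub merely simulates with a flag.

```java
import java.time.Duration;

// Hypothetical stand-in illustrating the semantic difference this PR relies
// on: with the deprecated poll(long), the timeout did not bound metadata
// fetching, so a vanished broker could block the call forever; with
// poll(Duration), the whole call (metadata update included) is bounded and
// returns empty once the duration elapses.
class StubConsumer {
    private final boolean metadataAvailable;

    StubConsumer(boolean metadataAvailable) {
        this.metadataAvailable = metadataAvailable;
    }

    // New-style poll: gives up when the deadline passes instead of looping
    // forever on a metadata update that never completes.
    int poll(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!metadataAvailable && System.nanoTime() < deadline) {
            // spin, standing in for network I/O
        }
        return metadataAvailable ? 1 : 0; // number of records fetched
    }
}

public class PollDemo {
    public static void main(String[] args) {
        // Broker "gone": the old API would livelock here; the new one returns.
        StubConsumer noBroker = new StubConsumer(false);
        System.out.println(noBroker.poll(Duration.ofMillis(50))); // 0

        StubConsumer healthy = new StubConsumer(true);
        System.out.println(healthy.poll(Duration.ofMillis(50))); // 1
    }
}
```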