[SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used #25582
Test build #109732 has finished for PR 25582 at commit
cc @HeartSaVioR to reflect discussion #22138 (comment)
Test build #109734 has finished for PR 25582 at commit
LGTM
val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams, true)
consumer2.release()

// The first consumer should be removed from cache and new non-cached should be returned
I'd say consumer2 should be cached, as it's created after the invalidation, but this PR only addresses tests, so that's OK.
@@ -78,7 +78,7 @@ private[kafka010] sealed trait KafkaDataConsumer {
   def release(): Unit

   /** Reference to the internal implementation that this wrapper delegates to */
-  protected def internalConsumer: InternalKafkaConsumer
+  def internalConsumer: InternalKafkaConsumer
That's technically private[kafka010] at class scope, so it seems OK.
Merging to master.
What changes were proposed in this pull request?
When a task retry happens with the Kafka source, it is not known whether the consumer caused the failure, so the old consumer is removed from the cache and a new consumer is created. The feature works fine but is not covered by tests.
In this PR I've added such tests for DStreams and Structured Streaming.
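For readers unfamiliar with the behavior under test, here is a minimal sketch of the invalidate-on-retry idea. It is not the Spark implementation: `CacheKey`, `ToyConsumer`, `ToyConsumerCache`, and the `attemptNumber` parameter are hypothetical names made up for illustration, and the sketch assumes the replacement consumer is handed out without being cached, as the test comment in this PR describes.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are not Spark classes.
final case class CacheKey(topic: String, partition: Int)

final class ToyConsumer(val key: CacheKey) {
  var closed: Boolean = false
  def close(): Unit = { closed = true }
}

object ToyConsumerCache {
  private val cache = mutable.Map.empty[CacheKey, ToyConsumer]

  def isCached(key: CacheKey): Boolean = synchronized { cache.contains(key) }

  /** Assumption: attemptNumber >= 1 means the task is being retried. */
  def acquire(key: CacheKey, attemptNumber: Int): ToyConsumer = synchronized {
    if (attemptNumber >= 1) {
      // On retry we cannot tell whether the cached consumer caused the
      // failure, so drop it from the cache and close it.
      cache.remove(key).foreach(_.close())
      // Assumption: the replacement is returned without being cached.
      new ToyConsumer(key)
    } else {
      // First attempt: reuse the cached consumer, creating it if absent.
      cache.getOrElseUpdate(key, new ToyConsumer(key))
    }
  }
}

object Demo extends App {
  val key = CacheKey("topic", 0)
  val first = ToyConsumerCache.acquire(key, attemptNumber = 0)
  val retried = ToyConsumerCache.acquire(key, attemptNumber = 1)

  assert(first ne retried)                // a different consumer is returned
  assert(first.closed)                    // the old one was invalidated
  assert(!ToyConsumerCache.isCached(key)) // nothing is left in the cache
}
```

The checks in `Demo` mirror what the new tests are meant to verify: after a retry, the consumer handed out is not the one that was cached before.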
Why are the changes needed?
No such tests exist.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing + new unit tests.