[SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation #31275
Conversation
...kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
Kubernetes integration test starting
Test build #134340 has finished for PR 31275 at commit
Kubernetes integration test status failure
@@ -192,6 +197,13 @@ private[consumer] case class FetchedData(
   * Returns the next offset to poll after draining the pre-fetched records.
   */
  def offsetAfterPoll: Long = _offsetAfterPoll

 /**
  * Returns the tuple of earliest and latest offsets that is the available offset range when
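The hunk above adds an accessor for the offset range snapshotted at poll time. As a rough illustrative sketch (the class shape, field, and method names here are simplified assumptions, not Spark's actual `KafkaDataConsumer` code), the idea looks like:

```scala
// Illustrative sketch only: FetchedData snapshots the available offset
// range at the moment the records are polled, so later validation can use
// that snapshot instead of re-querying the broker for the latest range.
case class AvailableOffsetRange(earliest: Long, latest: Long)

class FetchedData {
  // Range captured when the current batch of records was polled.
  private var _availableOffsetRange = AvailableOffsetRange(-1L, -1L)

  /** Record the range observed right after a successful poll. */
  def withNewPoll(range: AvailableOffsetRange): Unit = {
    _availableOffsetRange = range
  }

  /**
   * Returns the tuple of earliest and latest offsets that was the
   * available offset range when the records were polled.
   */
  def availableOffsetRange: (Long, Long) =
    (_availableOffsetRange.earliest, _availableOffsetRange.latest)
}
```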
nit: Use the `@return` annotation.
Sorry for being late with this, but I've just noticed it.
Hm, I think this just follows other methods above and below.
Given your assessment, could you mark it as a correctness issue, @viirya?
It looks not correct ...
... drop the record.
...l/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala
Although we don't have a test case, this looks reasonable. I left a few minor comments.
Done. Thanks.
Thank you for the update, @viirya.
[SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation

### What changes were proposed in this pull request?

This patch uses the available offset range obtained while polling Kafka records to perform the offset validation check.

### Why are the changes needed?

We have supported non-consecutive offsets for Kafka since 2.4.0. In `fetchRecord`, we validate an offset by checking whether it falls within the available offset range. Currently, however, we fetch the latest available offset range to do the check. This is not correct, because the available offset range can change during the batch, so it may differ from the range at the time the records were polled from Kafka. An offset can be valid when polling but fall outside the latest available offset range by the time we perform the check. We would then wrongly treat it as a data loss case and fail the query or drop the record.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This should pass the existing unit tests. It is hard to write a unit test for this because the Kafka producer and consumer are asynchronous; we would also need to push the offset outside the new available offset range.

Closes #31275 from viirya/SPARK-34187.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit ab6c0e5)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
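The core of the change described above can be sketched as follows. This is a simplified standalone sketch, not Spark's actual `fetchRecord` implementation; the function name, signature, and parameter names are assumptions for illustration. The point is that validation compares the offset against the range snapshotted at poll time rather than a freshly fetched latest range.

```scala
// Simplified sketch of the validation idea: `rangeAtPollTime` is the
// (earliest, latest) offset range snapshotted when the records were polled,
// not the latest range re-queried from the broker during validation.
def validateOffset(
    offset: Long,
    rangeAtPollTime: (Long, Long),
    failOnDataLoss: Boolean): Option[Long] = {
  val (earliest, latest) = rangeAtPollTime
  if (offset < earliest || offset > latest) {
    // The offset was already unavailable when we polled, so treating this
    // as data loss is justified: fail the query or drop the record.
    if (failOnDataLoss) {
      throw new IllegalStateException(
        s"Offset $offset was outside the available range [$earliest, $latest]")
    }
    None // drop the record
  } else {
    // Valid at poll time: a retention-driven change to the broker's latest
    // range after polling can no longer cause a spurious data-loss failure.
    Some(offset)
  }
}
```

Validating against the poll-time snapshot is what avoids the race: the record was demonstrably fetchable, so a range that shrank afterwards should not retroactively invalidate it.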
Merged to master/3.1. Could you make a backport to 3.0/2.4?
Test build #134414 has finished for PR 31275 at commit
Thanks @dongjoon-hyun. Will file the backport later.
Late LGTM. It took me some time to reproduce it on a cluster, but it works like a charm.
Nice finding, and thanks for the fix!