Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-34187][SS][3.0] Use available offset range obtained during polling when checking offset validation #31328

Closed
wants to merge 1 commit into from

Conversation

viirya
Copy link
Member

@viirya viirya commented Jan 25, 2021

What changes were proposed in this pull request?

This patch uses the available offset range obtained during polling Kafka records to do offset validation check.

Why are the changes needed?

We support non-consecutive offsets for Kafka since 2.4.0. In fetchRecord, we do offset validation by checking if the offset is in available offset range. But currently we obtain latest available offset range to do the check. It looks not correct as the available offset range could be changed during the batch, so the available offset range is different than the one when we polling the records from Kafka.

It is possible that an offset is valid when polling, but at the time we do the above check, it is out of latest available offset range. We will wrongly consider it as data loss case and fail the query or drop the record.

Does this PR introduce any user-facing change?

No

How was this patch tested?

This should pass existing unit tests.

This is hard to have unit test as the Kafka producer and the consumer is asynchronous. Further, we also need to make the offset out of new available offset range.

@viirya
Copy link
Member Author

viirya commented Jan 25, 2021

cc @dongjoon-hyun

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @viirya .
Merged to branch-3.0.

dongjoon-hyun pushed a commit that referenced this pull request Jan 25, 2021
…ling when checking offset validation

### What changes were proposed in this pull request?

This patch uses the available offset range obtained during polling Kafka records to do offset validation check.

### Why are the changes needed?

We support non-consecutive offsets for Kafka since 2.4.0. In `fetchRecord`, we do offset validation by checking if the offset is in available offset range. But currently we obtain latest available offset range to do the check. It looks not correct as the available offset range could be changed during the batch, so the available offset range is different than the one when we polling the records from Kafka.

It is possible that an offset is valid when polling, but at the time we do the above check, it is out of latest available offset range. We will wrongly consider it as data loss case and fail the query or drop the record.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

This should pass existing unit tests.

This is hard to have unit test as the Kafka producer and the consumer is asynchronous. Further, we also need to make the offset out of new available offset range.

Closes #31328 from viirya/SPARK-34187-3.0.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@SparkQA
Copy link

SparkQA commented Jan 25, 2021

Test build #134458 has finished for PR 31328 at commit 3cd8d78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya viirya deleted the SPARK-34187-3.0 branch December 27, 2023 18:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants