
[SPARK-41375][SS] Avoid empty latest KafkaSourceOffset #38898

Closed

wecharyu wants to merge 2 commits into apache:master from wecharyu:SPARK-41375


Conversation

@wecharyu
Contributor

@wecharyu wecharyu commented Dec 3, 2022

What changes were proposed in this pull request?

Add an empty-offset filter in latestOffset() for the Kafka source, so that the offset remains unchanged if Kafka returns no topic partitions during the fetch.
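
For illustration, a minimal standalone sketch of this kind of guard might look like the following (the names and signature are placeholders for this explanation, not the exact code in the patch):

    import org.apache.kafka.common.TopicPartition

    // Sketch only: if the fetch transiently returns no topic partitions,
    // keep the previously known offsets instead of recording an empty map.
    def resolveLatestOffsets(
        current: Map[TopicPartition, Long],
        fetched: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
      if (fetched.isEmpty) current else fetched
    }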

Why are the changes needed?

KafkaOffsetReader may fetch an empty set of partitions in some extreme cases, such as querying partitions while the Kafka cluster is reassigning them. This produces an empty PartitionOffsetMap (even though the topic partitions themselves are unchanged), which is stored in committedOffsets after runBatch().

In the next batch, partitions are fetched normally and the actual offsets are obtained, but when the batch's data is fetched in KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets(), every partition in endOffsets is treated as a new partition because startOffsets is empty. These "new partitions" then start from the earliest offsets, which causes data duplication.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add a unit test.

@AmplabJenkins

Can one of the admins verify this patch?

Contributor

@jerrypeng jerrypeng left a comment


Can you write a unit test for this?

@wecharyu
Contributor Author

wecharyu commented Dec 6, 2022

Can you write a unit test for this?

It seems a bit difficult to write a unit test that covers the case where empty partitions are fetched from the Kafka cluster; any ideas would be appreciated.

@jerrypeng
Contributor

@wecharyu can you run one batch and then delete all the partitions?


Contributor

@jerrypeng jerrypeng left a comment


To avoid data duplication in the extreme cases where Spark fetches an empty latest Kafka source offset.

@wecharyu how does an empty latest Kafka source offset cause data duplication?

@wecharyu
Contributor Author

wecharyu commented Dec 8, 2022

@jerrypeng the empty offset will be stored in committedOffsets. When the next batch runs, the following code records an empty map as the startOffset in newBatchesPlan:

r.copy(startOffset = Some(start), endOffset = Some(end))

Then, while fetching partitions, all of them are considered "new partitions" and start from the earliest offsets, which produces duplicate data.
  override def getOffsetRangesFromResolvedOffsets(
      fromPartitionOffsets: PartitionOffsetMap,
      untilPartitionOffsets: PartitionOffsetMap,
      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
    // Find the new partitions, and get their earliest offsets
    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
    if (newPartitionInitialOffsets.keySet != newPartitions) {
      // We cannot get from offsets for some partitions. It means they got deleted.
      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
      reportDataLoss(
        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    }
    logInfo(s"Partitions added: $newPartitionInitialOffsets")
    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
      reportDataLoss(
        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    }

@HeartSaVioR
Contributor

Could you please summarize the JIRA description in the PR template, especially the "Root Cause" part? Also, is it a "known" issue for the Kafka consumer?

Also, please note that we changed the default offset fetching mechanism from consumer group assignment in Kafka to an active fetch via AdminClient, which does not have this issue.
https://github.com/apache/spark/blob/master/docs/ss-migration-guide.md#upgrading-from-structured-streaming-33-to-34

That said, your test case should set spark.sql.streaming.kafka.useDeprecatedOffsetFetching to true so that it does not run with the default config.

@HeartSaVioR
Contributor

HeartSaVioR commented Dec 8, 2022

I'm trying to understand the case. If my understanding is correct, the new test just triggers the same behavior rather than reproducing the actual problem, right? In the new test, recognizing all topic partitions as new ones and processing all records in the next microbatch is arguably NOT wrong behavior to me, hence I really would like to understand the actual problem.

According to the JIRA description, the actual problem is that Kafka can "transiently" give no topic partitions as the assignment when it performs reassignment among consumers, specifically here:

      consumer.poll(0)
      val partitions = consumer.assignment()

where we expect Kafka to assign topic partitions to this consumer after calling poll.

Do I understand correctly?

(If I'm on the right track, the fix helps more for queries where failOnDataLoss is turned "on". Previously the query would just fail with a surprising and incorrect error message (correct from Spark's point of view, though), and after this fix the query won't fail.)

  }

  test("SPARK-41375: empty partitions should not record to latest offset") {
    val topicPrefix = newTopic()
Contributor


Please set spark.sql.streaming.kafka.useDeprecatedOffsetFetching to true. You can do this by leveraging withSQLConf(...map of explicit config here...) { ...test code here... }
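
As a rough sketch, assuming the suite mixes in Spark's SQL test helper that provides withSQLConf, the wrapping could look like:

      // Force the deprecated (consumer-based) offset fetching path for this test.
      withSQLConf("spark.sql.streaming.kafka.useDeprecatedOffsetFetching" -> "true") {
        // existing test body for SPARK-41375 goes here
      }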

Contributor


Well, never mind. We're not reproducing the actual problem here, so this seems sufficient.

@wecharyu
Contributor Author

wecharyu commented Dec 8, 2022

@HeartSaVioR yes, you are right: the actual problem is that we may unexpectedly fetch empty partitions in one batch, and in the next batch we fetch the real partitions again. The new test is just used to mock the empty partitions, but it also makes sense not to record an empty offset for the empty partitions.

Contributor

@HeartSaVioR HeartSaVioR left a comment


+1 Nice fix!

@HeartSaVioR
Contributor

HeartSaVioR commented Dec 8, 2022

Thanks! Merging to master/3.3.

HeartSaVioR pushed a commit that referenced this pull request Dec 8, 2022
### What changes were proposed in this pull request?

Add an empty-offset filter in `latestOffset()` for the Kafka source, so that the offset remains unchanged if Kafka returns no topic partitions during the fetch.

### Why are the changes needed?

KafkaOffsetReader may fetch an empty set of partitions in some extreme cases, such as querying partitions while the Kafka cluster is reassigning them. This produces an empty `PartitionOffsetMap` (even though the topic partitions themselves are unchanged), which is stored in `committedOffsets` after `runBatch()`.

In the next batch, partitions are fetched normally and the actual offsets are obtained, but when the batch's data is fetched in `KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()`, every partition in endOffsets is treated as a new partition because startOffsets is empty. These "new partitions" then start from the earliest offsets, which causes data duplication.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a unit test.

Closes #38898 from wecharyu/SPARK-41375.

Authored-by: wecharyu <yuwq1996@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 043475a)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@wecharyu
Contributor Author

wecharyu commented Dec 8, 2022

@jerrypeng @HeartSaVioR thanks for your review!

beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022