@@ -85,8 +85,6 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
 
-  private var endPartitionOffsets: KafkaSourceOffset = _
-
   private var latestPartitionOffsets: PartitionOffsetMap = _
 
   private var allDataForTriggerAvailableNow: PartitionOffsetMap = _
@@ -114,7 +112,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   override def reportLatestOffset(): Offset = {
-    KafkaSourceOffset(latestPartitionOffsets)
+    Option(KafkaSourceOffset(latestPartitionOffsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   override def latestOffset(): Offset = {
@@ -163,8 +161,7 @@ private[kafka010] class KafkaMicroBatchStream(
       }.getOrElse(latestPartitionOffsets)
     }
 
-    endPartitionOffsets = KafkaSourceOffset(offsets)
-    endPartitionOffsets
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
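The Option(KafkaSourceOffset(...)).filterNot(_.partitionToOffsets.isEmpty).orNull idiom used in both methods above returns null instead of an offset whose partition map is empty; per the test added below, this keeps an empty "{}" offset out of the offset log. A minimal, self-contained sketch of the same idiom, using a hypothetical FakeOffset stand-in rather than the real KafkaSourceOffset class:

object EmptyOffsetIdiomSketch extends App {
  // Simplified stand-in for KafkaSourceOffset: only the partition -> offset map matters here.
  case class FakeOffset(partitionToOffsets: Map[String, Long])

  // Wrap the offset, drop it when its partition map is empty, and fall back to null,
  // mirroring the pattern introduced in this PR.
  def toOffsetOrNull(offsets: Map[String, Long]): FakeOffset =
    Option(FakeOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull

  assert(toOffsetOrNull(Map("topic-0" -> 5L)) != null)    // non-empty map: an offset is returned
  assert(toOffsetOrNull(Map.empty[String, Long]) == null) // empty map: null, never an empty offset
}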
@@ -177,7 +177,7 @@ private[kafka010] class KafkaSource(
       kafkaReader.fetchLatestOffsets(currentOffsets)
     }
 
-    latestPartitionOffsets = Some(latest)
+    latestPartitionOffsets = if (latest.isEmpty) None else Some(latest)
 
     val limits: Seq[ReadLimit] = limit match {
       case rows: CompositeReadLimit => rows.getReadLimits
@@ -213,7 +213,7 @@ private[kafka010] class KafkaSource(
     }
     currentPartitionOffsets = Some(offsets)
     logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-    KafkaSourceOffset(offsets)
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
@@ -627,6 +627,45 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("SPARK-41375: empty partitions should not record to latest offset") {
+    val topicPrefix = newTopic()

Review comment (Contributor), on the line above:
Please set spark.sql.streaming.kafka.useDeprecatedOffsetFetching to true. You can do this by leveraging withSQLConf(...map of explicit config here...) { ...test code here... }
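For illustration only, a sketch of the suggested wrapping (hypothetical, not part of the test as committed; withSQLConf is the config-scoping helper available in Spark's SQL test suites):

withSQLConf("spark.sql.streaming.kafka.useDeprecatedOffsetFetching" -> "true") {
  // Hypothetical: run the reader setup and testStream(...) body below with the
  // deprecated offset fetching path enabled; the previous value is restored afterwards.
}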

Review comment (Contributor), follow-up:
Well, never mind. We're not reproducing the actual problem here, so it seems sufficient.

+    val topic = topicPrefix + "-good"
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.request.timeout.ms", "3000")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("subscribePattern", s"$topicPrefix-.*")
+      .option("failOnDataLoss", "false")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      Assert {
+        testUtils.deleteTopic(topic)
+        true
+      },
+      AssertOnQuery { q =>
+        val latestOffset: Option[(Long, OffsetSeq)] = q.offsetLog.getLatest
+        latestOffset.exists { offset =>
+          !offset._2.offsets.exists(_.exists(_.json == "{}"))
+        }
+      }
+    )
+  }
+
   test("subscribe topic by pattern with topic recreation between batches") {
     val topicPrefix = newTopic()
     val topic = topicPrefix + "-good"