@@ -57,7 +57,7 @@ private[kafka010] class KafkaMicroBatchStream(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
- extends SupportsAdmissionControl with ReportsSourceMetrics with MicroBatchStream with Logging {
+ extends SupportsTriggerAvailableNow with ReportsSourceMetrics with MicroBatchStream with Logging {
Contributor:

what about SupportsAdmissionControl - I see other APIs specifying both - https://livegrep.dev.databricks.com/view/databricks/runtime/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L57

even if one extends the other, is it better to list them explicitly?

Contributor Author:

I don't think it is necessary. It is redundant
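For context, a minimal, self-contained sketch of the relationship under discussion (not part of the diff; the trait bodies are stand-ins, not Spark's real definitions). The point is only that when SupportsTriggerAvailableNow extends SupportsAdmissionControl, as in Spark's DSv2 streaming API, naming both in the extends clause adds nothing:

// Not part of the diff: simplified stand-ins, only to illustrate the hierarchy.
object RedundantTraitSketch {
  trait SupportsAdmissionControl { def getDefaultReadLimit: String = "allAvailable" }
  trait SupportsTriggerAvailableNow extends SupportsAdmissionControl {
    def prepareForTriggerAvailableNow(): Unit
  }

  // Extending only the more specific trait already brings in the
  // admission-control members, so naming both would be redundant.
  class MyStream extends SupportsTriggerAvailableNow {
    override def prepareForTriggerAvailableNow(): Unit = ()
  }

  def main(args: Array[String]): Unit =
    println(new MyStream().getDefaultReadLimit) // prints "allAvailable"
}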

private[kafka010] val pollTimeoutMs = options.getLong(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
@@ -81,6 +81,8 @@ private[kafka010] class KafkaMicroBatchStream(

private var latestPartitionOffsets: PartitionOffsetMap = _

+ private var allDataForTriggerAvailableNow: PartitionOffsetMap = _

/**
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -98,7 +100,8 @@ private[kafka010] class KafkaMicroBatchStream(
} else if (minOffsetPerTrigger.isDefined) {
ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs)
} else {
- maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+ // TODO (SPARK-37973) Directly call super.getDefaultReadLimit when scala issue 12523 is fixed
+ maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable())
Contributor:

why this change? Isn't it better to delegate to the parent if you have no opinion - e.g. the parent could use some config default or whatever.

Contributor (@HeartSaVioR, Jan 18, 2022):

The previous comment seems to be out of date. It's to avoid a Scala compiler bug.

While we are here, it's probably good to leave a code comment explaining the reason, and tag it with // TODO: so it gets fixed eventually.

Contributor Author:

I will add a comment

Member:

How about filing a Spark JIRA with the detailed description? Then we just need a single-line comment here like:

TODO(SPARK-xxxx): directly call super.getDefaultReadLimit when ...

Contributor Author:

sure

}
}
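To make the workaround discussed above concrete: the PR inlines the value that the parent's default implementation would return instead of calling super. A rough, self-contained sketch of the equivalence follows; the types are stubs, it does not reproduce the compiler issue itself, and it assumes the interface default of getDefaultReadLimit in SupportsAdmissionControl is ReadLimit.allAvailable(), which is what the TODO relies on:

// Stub types standing in for Spark's ReadLimit hierarchy; illustration only.
object ReadLimitWorkaroundSketch {
  sealed trait ReadLimit
  case object AllAvailable extends ReadLimit
  final case class MaxRows(rows: Long) extends ReadLimit

  trait SupportsAdmissionControl {
    def getDefaultReadLimit: ReadLimit = AllAvailable
  }
  trait SupportsTriggerAvailableNow extends SupportsAdmissionControl

  class StreamLike(maxOffsetsPerTrigger: Option[Long]) extends SupportsTriggerAvailableNow {
    override def getDefaultReadLimit: ReadLimit = {
      // Preferred form: maxOffsetsPerTrigger.map(MaxRows(_)).getOrElse(super.getDefaultReadLimit)
      // The PR inlines the known default value to sidestep the `super` call
      // (see the TODO referencing scala issue 12523); behavior is unchanged as
      // long as the parent default remains "all available".
      maxOffsetsPerTrigger.map(MaxRows(_)).getOrElse(AllAvailable)
    }
  }

  def main(args: Array[String]): Unit = {
    println(new StreamLike(Some(5L)).getDefaultReadLimit) // MaxRows(5)
    println(new StreamLike(None).getDefaultReadLimit)     // AllAvailable
  }
}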

@@ -113,7 +116,13 @@ private[kafka010] class KafkaMicroBatchStream(

override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
- latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+
+ // Use the pre-fetched list of partition offsets when Trigger.AvailableNow is enabled.
+ latestPartitionOffsets = if (allDataForTriggerAvailableNow != null) {
+   allDataForTriggerAvailableNow
+ } else {
+   kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+ }

val limits: Seq[ReadLimit] = readLimit match {
case rows: CompositeReadLimit => rows.getReadLimits
@@ -298,6 +307,11 @@ private[kafka010] class KafkaMicroBatchStream(
logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
}
}

+ override def prepareForTriggerAvailableNow(): Unit = {
+   allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets(
+     Some(getOrCreateInitialPartitionOffsets()))
+ }
}

object KafkaMicroBatchStream extends Logging {
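Taken together, the changes to KafkaMicroBatchStream follow a simple pattern: capture the end offsets once in prepareForTriggerAvailableNow(), let each micro-batch advance toward that fixed target under the configured read limit, and stop when the target is reached. A minimal sketch of that loop (illustrative names and single-partition math only, not Spark's actual planner):

// Rough model of Trigger.AvailableNow consumption; illustration only.
object AvailableNowLoopSketch {
  type Offsets = Map[Int, Long] // partition -> next offset to read

  // Advance each partition by at most maxRowsPerPartition, never past the target.
  def nextBatchEnd(start: Offsets, target: Offsets, maxRowsPerPartition: Long): Offsets =
    start.map { case (p, o) => p -> math.min(o + maxRowsPerPartition, target(p)) }

  def main(args: Array[String]): Unit = {
    val start: Offsets  = Map(0 -> 0L)
    val target: Offsets = Map(0 -> 15L) // captured once, as in prepareForTriggerAvailableNow()
    var current = start
    var batches = 0
    while (current != target) {         // the query stops once the captured target is reached
      current = nextBatchEnd(current, target, maxRowsPerPartition = 5L)
      batches += 1
    }
    println(s"batches = $batches")      // 3, matching the 15 / 5 expectation in the new test
  }
}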
@@ -77,7 +77,7 @@ private[kafka010] class KafkaSource(
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
- extends SupportsAdmissionControl with Source with Logging {
+ extends SupportsTriggerAvailableNow with Source with Logging {

private val sc = sqlContext.sparkContext

@@ -99,6 +99,8 @@ private[kafka010] class KafkaSource(

private var lastTriggerMillis = 0L

+ private var allDataForTriggerAvailableNow: PartitionOffsetMap = _

/**
* Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
* called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -130,7 +132,8 @@ private[kafka010] class KafkaSource(
} else if (minOffsetPerTrigger.isDefined) {
ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs)
} else {
- maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit)
+ // TODO (SPARK-37973) Directly call super.getDefaultReadLimit when scala issue 12523 is fixed
+ maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable())
}
}

@@ -159,7 +162,14 @@ private[kafka010] class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets
val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets))
- val latest = kafkaReader.fetchLatestOffsets(currentOffsets)
+
+ // Use the pre-fetched list of partition offsets when Trigger.AvailableNow is enabled.
+ val latest = if (allDataForTriggerAvailableNow != null) {
+   allDataForTriggerAvailableNow
+ } else {
+   kafkaReader.fetchLatestOffsets(currentOffsets)
+ }

latestPartitionOffsets = Some(latest)

val limits: Seq[ReadLimit] = limit match {
@@ -331,6 +341,10 @@ private[kafka010] class KafkaSource(
logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
}
}

+ override def prepareForTriggerAvailableNow(): Unit = {
+   allDataForTriggerAvailableNow = kafkaReader.fetchLatestOffsets(Some(initialPartitionOffsets))
+ }
}

/** Companion object for the [[KafkaSource]]. */
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
- import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+ import org.apache.spark.sql.streaming.{StreamingQuery, StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -195,6 +195,45 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
true
}

test("Trigger.AvailableNow") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)

testUtils.sendMessages(topic, (0 until 15).map { case x =>
s"foo-$x"
}.toArray, Some(0))

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("maxOffsetsPerTrigger", 5)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()

var index: Int = 0
def startTriggerAvailableNowQuery(): StreamingQuery = {
reader.writeStream
.foreachBatch((_: Dataset[Row], _: Long) => {
index += 1
})
.trigger(Trigger.AvailableNow)
.start()
}

val query = startTriggerAvailableNowQuery()
try {
assert(query.awaitTermination(streamingTimeout.toMillis))
} finally {
query.stop()
}

// should have 3 batches now i.e. 15 / 5 = 3
assert(index == 3)
}

test("(de)serialization of initial offsets") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)