From efebe3c7f21031352b5fe1d32245a087372cba69 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Mon, 17 Jan 2022 13:18:06 -0800 Subject: [PATCH 1/8] [SPARK-36649] - Support Trigger.AvailableNow on Kafka data source --- .../spark/sql/kafka010/KafkaSource.scala | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 87cef02d0d8f2..1a3a03b2a4b82 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -77,7 +77,10 @@ private[kafka010] class KafkaSource( metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) - extends SupportsAdmissionControl with Source with Logging { + extends SupportsAdmissionControl + with SupportsTriggerAvailableNow + with Source + with Logging { private val sc = sqlContext.sparkContext @@ -99,6 +102,8 @@ private[kafka010] class KafkaSource( private var lastTriggerMillis = 0L + private var allDataForTriggerAvailableNow: PartitionOffsetMap = _ + /** * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only * called in StreamExecutionThread. Otherwise, interrupting a thread while running @@ -159,7 +164,14 @@ private[kafka010] class KafkaSource( // Make sure initialPartitionOffsets is initialized initialPartitionOffsets val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets)) - val latest = kafkaReader.fetchLatestOffsets(currentOffsets) + + // Use the pre-fetched list of partition offsets when Trigger.AvailableNow is enabled. + val latest = if (allDataForTriggerAvailableNow != null) { + allDataForTriggerAvailableNow + } else { + kafkaReader.fetchLatestOffsets(currentOffsets) + } + latestPartitionOffsets = Some(latest) val limits: Seq[ReadLimit] = limit match { @@ -331,6 +343,10 @@ private[kafka010] class KafkaSource( logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") } } + + override def prepareForTriggerAvailableNow(): Unit = { + allDataForTriggerAvailableNow = kafkaReader.fetchLatestOffsets(Some(initialPartitionOffsets)) + } } /** Companion object for the [[KafkaSource]]. 
*/ From a13e3ea5428e5bdf7f597d35e7a2609a6820299e Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Mon, 17 Jan 2022 13:41:13 -0800 Subject: [PATCH 2/8] add test --- .../kafka010/KafkaMicroBatchSourceSuite.scala | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index f61696f6485e6..adfe9da97af19 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.streaming.{StreamingQuery, StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -195,6 +195,50 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { true } + test("Trigger.AvailableNow") { + + println("start") + + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + + testUtils.sendMessages(topic, (0 until 15).map(x => { + "foo-" + x + }).toArray, Some(0)) + + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("maxOffsetsPerTrigger", 10) + .option("subscribe", topic) + .option("startingOffsets", "earliest").load() + + def startTriggerAvailableNowQuery(): StreamingQuery = { + reader.writeStream + .foreachBatch((df: Dataset[Row], i: Long) => { + println("i: " + i) + df.printSchema() + println(df.getRows(5, 0)) + df.foreach((f: Row) => { + println("e: " + new String(f.getAs("value").asInstanceOf[Array[Byte]])) + }) + }) + .trigger(Trigger.AvailableNow) + .trigger(Trigger.AvailableNow()) + .start() + } + + val q2 = startTriggerAvailableNowQuery() + try { + assert(q2.awaitTermination(streamingTimeout.toMillis)) + } finally { + q2.stop() + } + } + test("(de)serialization of initial offsets") { val topic = newTopic() testUtils.createTopic(topic, partitions = 5) From e542b044501210fec5fd2f7f375e924d4e10f75f Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Tue, 18 Jan 2022 00:10:44 -0800 Subject: [PATCH 3/8] adding --- .../spark/sql/kafka010/KafkaSource.scala | 5 ++-- .../kafka010/KafkaMicroBatchSourceSuite.scala | 25 ++++++++----------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 1a3a03b2a4b82..839acd41fd40d 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -77,8 +77,7 @@ private[kafka010] class KafkaSource( metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) - extends SupportsAdmissionControl - with SupportsTriggerAvailableNow + extends 
SupportsTriggerAvailableNow with Source with Logging { @@ -135,7 +134,7 @@ private[kafka010] class KafkaSource( } else if (minOffsetPerTrigger.isDefined) { ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs) } else { - maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit) + maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable() ) } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index adfe9da97af19..5db99c9133db0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -196,9 +196,6 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } test("Trigger.AvailableNow") { - - println("start") - val topic = newTopic() testUtils.createTopic(topic, partitions = 5) @@ -206,37 +203,37 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { "foo-" + x }).toArray, Some(0)) - val reader = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") - .option("maxOffsetsPerTrigger", 10) + .option("maxOffsetsPerTrigger", 5) .option("subscribe", topic) - .option("startingOffsets", "earliest").load() + .option("startingOffsets", "earliest") + .load() + var index: Int = 0 def startTriggerAvailableNowQuery(): StreamingQuery = { reader.writeStream .foreachBatch((df: Dataset[Row], i: Long) => { - println("i: " + i) - df.printSchema() - println(df.getRows(5, 0)) + index+=1 df.foreach((f: Row) => { - println("e: " + new String(f.getAs("value").asInstanceOf[Array[Byte]])) }) }) .trigger(Trigger.AvailableNow) - .trigger(Trigger.AvailableNow()) .start() } - val q2 = startTriggerAvailableNowQuery() + val q1 = startTriggerAvailableNowQuery() try { - assert(q2.awaitTermination(streamingTimeout.toMillis)) + assert(q1.awaitTermination(streamingTimeout.toMillis)) } finally { - q2.stop() + q1.stop() } + + // should have 3 batches now i.e. 
15 / 5 = 3 + assert(index == 3) } test("(de)serialization of initial offsets") { From 6881d740aed426ffb04ce57fe52d0ba9a0b6aeed Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Tue, 18 Jan 2022 00:18:50 -0800 Subject: [PATCH 4/8] updating test --- .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 5db99c9133db0..c0e0ec11243ef 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -218,8 +218,6 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { reader.writeStream .foreachBatch((df: Dataset[Row], i: Long) => { index+=1 - df.foreach((f: Row) => { - }) }) .trigger(Trigger.AvailableNow) .start() From fc18b1bd371523ee1733371bbfbc8968c26c52ee Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Tue, 18 Jan 2022 12:15:03 -0800 Subject: [PATCH 5/8] fixing test --- .../sql/kafka010/KafkaMicroBatchStream.scala | 19 ++++++++++++++++--- .../spark/sql/kafka010/KafkaSource.scala | 2 +- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 3b73896d631c6..99fd7c656b60e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -57,7 +57,7 @@ private[kafka010] class KafkaMicroBatchStream( metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) - extends SupportsAdmissionControl with ReportsSourceMetrics with MicroBatchStream with Logging { + extends SupportsTriggerAvailableNow with ReportsSourceMetrics with MicroBatchStream with Logging { private[kafka010] val pollTimeoutMs = options.getLong( KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, @@ -81,6 +81,8 @@ private[kafka010] class KafkaMicroBatchStream( private var latestPartitionOffsets: PartitionOffsetMap = _ + private var allDataForTriggerAvailableNow: PartitionOffsetMap = _ + /** * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only * called in StreamExecutionThread. Otherwise, interrupting a thread while running @@ -98,7 +100,7 @@ private[kafka010] class KafkaMicroBatchStream( } else if (minOffsetPerTrigger.isDefined) { ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs) } else { - maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit) + maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) } } @@ -113,7 +115,13 @@ private[kafka010] class KafkaMicroBatchStream( override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = { val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets - latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) + + // Use the pre-fetched list of partition offsets when Trigger.AvailableNow is enabled. 
+ latestPartitionOffsets = if (allDataForTriggerAvailableNow != null) { + allDataForTriggerAvailableNow + } else { + kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) + } val limits: Seq[ReadLimit] = readLimit match { case rows: CompositeReadLimit => rows.getReadLimits @@ -298,6 +306,11 @@ private[kafka010] class KafkaMicroBatchStream( logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") } } + + override def prepareForTriggerAvailableNow(): Unit = { + allDataForTriggerAvailableNow = kafkaOffsetReader.fetchLatestOffsets( + Some(getOrCreateInitialPartitionOffsets())) + } } object KafkaMicroBatchStream extends Logging { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 839acd41fd40d..dba963b827c57 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -134,7 +134,7 @@ private[kafka010] class KafkaSource( } else if (minOffsetPerTrigger.isDefined) { ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs) } else { - maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable() ) + maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) } } From 1d05a0b122e91319449187cc7f145384255ceed7 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Tue, 18 Jan 2022 17:37:59 -0800 Subject: [PATCH 6/8] addressing comments --- .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 3 +++ .../org/apache/spark/sql/kafka010/KafkaSource.scala | 3 +++ .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 10 +++++----- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 99fd7c656b60e..2d1f516955442 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -100,6 +100,9 @@ private[kafka010] class KafkaMicroBatchStream( } else if (minOffsetPerTrigger.isDefined) { ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs) } else { + // Calling ReadLimit.allAvailable() explicitly instead of super.getDefaultReadLimit because + // scala compiler bug https://github.com/scala/bug/issues/12523 + // TODO revert to calling super.getDefaultReadLimit when scala complier bug is fixed maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index dba963b827c57..bb032ea3f8682 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -134,6 +134,9 @@ private[kafka010] class KafkaSource( } else if (minOffsetPerTrigger.isDefined) { ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs) } else { + // Calling ReadLimit.allAvailable() explicitly instead of super.getDefaultReadLimit because + // scala compiler bug https://github.com/scala/bug/issues/12523 + // TODO 
revert to calling super.getDefaultReadLimit when scala complier bug is fixed maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index c0e0ec11243ef..e599f13d0d7c0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -199,9 +199,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { val topic = newTopic() testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, (0 until 15).map(x => { - "foo-" + x - }).toArray, Some(0)) + testUtils.sendMessages(topic, (0 until 15).map{ case x => + s"foo-$x" + }.toArray, Some(0)) val reader = spark .readStream @@ -216,9 +216,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { var index: Int = 0 def startTriggerAvailableNowQuery(): StreamingQuery = { reader.writeStream - .foreachBatch((df: Dataset[Row], i: Long) => { + .foreachBatch{ case (_: Dataset[Row], _: Long) => index+=1 - }) + } .trigger(Trigger.AvailableNow) .start() } From 53e797d06f9c34de57fe22f9c99550fe26b0c164 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Wed, 19 Jan 2022 11:21:20 -0800 Subject: [PATCH 7/8] addressing comments --- .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index e599f13d0d7c0..0ee7f73da520d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -199,7 +199,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { val topic = newTopic() testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, (0 until 15).map{ case x => + testUtils.sendMessages(topic, (0 until 15).map { case x => s"foo-$x" }.toArray, Some(0)) @@ -216,9 +216,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { var index: Int = 0 def startTriggerAvailableNowQuery(): StreamingQuery = { reader.writeStream - .foreachBatch{ case (_: Dataset[Row], _: Long) => + .foreachBatch((_: Dataset[Row], _: Long) => { index+=1 - } + }) .trigger(Trigger.AvailableNow) .start() } From 1369e31d58ea7d89c713dbb946e970e8f1dec9e6 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Thu, 20 Jan 2022 11:22:54 -0800 Subject: [PATCH 8/8] addressing comments --- .../apache/spark/sql/kafka010/KafkaMicroBatchStream.scala | 4 +--- .../scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 8 ++------ .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 8 ++++---- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index 2d1f516955442..829ee15c13a3d 100644 --- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -100,9 +100,7 @@ private[kafka010] class KafkaMicroBatchStream( } else if (minOffsetPerTrigger.isDefined) { ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs) } else { - // Calling ReadLimit.allAvailable() explicitly instead of super.getDefaultReadLimit because - // scala compiler bug https://github.com/scala/bug/issues/12523 - // TODO revert to calling super.getDefaultReadLimit when scala complier bug is fixed + // TODO (SPARK-37973) Directly call super.getDefaultReadLimit when scala issue 12523 is fixed maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index bb032ea3f8682..09db0a7e82dfe 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -77,9 +77,7 @@ private[kafka010] class KafkaSource( metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) - extends SupportsTriggerAvailableNow - with Source - with Logging { + extends SupportsTriggerAvailableNow with Source with Logging { private val sc = sqlContext.sparkContext @@ -134,9 +132,7 @@ private[kafka010] class KafkaSource( } else if (minOffsetPerTrigger.isDefined) { ReadLimit.minRows(minOffsetPerTrigger.get, maxTriggerDelayMs) } else { - // Calling ReadLimit.allAvailable() explicitly instead of super.getDefaultReadLimit because - // scala compiler bug https://github.com/scala/bug/issues/12523 - // TODO revert to calling super.getDefaultReadLimit when scala complier bug is fixed + // TODO (SPARK-37973) Directly call super.getDefaultReadLimit when scala issue 12523 is fixed maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 0ee7f73da520d..61be7dd6cd8ef 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -217,17 +217,17 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { def startTriggerAvailableNowQuery(): StreamingQuery = { reader.writeStream .foreachBatch((_: Dataset[Row], _: Long) => { - index+=1 + index += 1 }) .trigger(Trigger.AvailableNow) .start() } - val q1 = startTriggerAvailableNowQuery() + val query = startTriggerAvailableNowQuery() try { - assert(q1.awaitTermination(streamingTimeout.toMillis)) + assert(query.awaitTermination(streamingTimeout.toMillis)) } finally { - q1.stop() + query.stop() } // should have 3 batches now i.e. 15 / 5 = 3
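
For reference, below is a minimal usage sketch of the behavior these patches enable: a Kafka streaming query run with Trigger.AvailableNow processes everything available at query start (split into rate-limited micro-batches when maxOffsetsPerTrigger is set) and then stops on its own. The broker address, topic name, and checkpoint path are placeholder values, not taken from the patches.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    // Requires the spark-sql-kafka-0-10 artifact (external/kafka-0-10-sql) on the classpath.
    val spark = SparkSession.builder().appName("kafka-available-now").getOrCreate()

    // Read from a Kafka topic; broker address and topic name are placeholders.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "1000") // cap each micro-batch at 1000 offsets
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Trigger.AvailableNow consumes all offsets present when the query starts,
    // honoring the read limit above, and terminates once they are processed.
    val query = df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/available-now-checkpoint") // placeholder path
      .trigger(Trigger.AvailableNow())
      .start()

    query.awaitTermination()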