From ebb9b51c51a4411811a7e0e09fff8f8608faa017 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 28 Feb 2018 17:28:32 -0800 Subject: [PATCH 1/5] Implemented --- .../sql/kafka010/KafkaMicroBatchReader.scala | 110 ++++++++-------- .../kafka010/KafkaOffsetRangeCalculator.scala | 105 +++++++++++++++ .../sql/kafka010/KafkaSourceProvider.scala | 7 + .../apache/spark/sql/kafka010/package.scala | 24 ++++ .../kafka010/KafkaMicroBatchSourceSuite.scala | 56 +++++++- .../KafkaOffsetRangeCalculatorSuite.scala | 122 ++++++++++++++++++ 6 files changed, 364 insertions(+), 60 deletions(-) create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala index fb647ca7e70dd..980f46c11c016 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala @@ -24,7 +24,6 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import org.apache.commons.io.IOUtils -import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging @@ -64,8 +63,6 @@ private[kafka010] class KafkaMicroBatchReader( failOnDataLoss: Boolean) extends MicroBatchReader with SupportsScanUnsafeRow with Logging { - type PartitionOffsetMap = Map[TopicPartition, Long] - private var startPartitionOffsets: PartitionOffsetMap = _ private var endPartitionOffsets: PartitionOffsetMap = _ @@ -76,6 +73,7 @@ private[kafka010] class KafkaMicroBatchReader( private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong) + private val rangeCalculator = KafkaOffsetRangeCalculator(options) /** * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only * called in StreamExecutionThread. Otherwise, interrupting a thread while running @@ -106,15 +104,15 @@ private[kafka010] class KafkaMicroBatchReader( override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = { // Find the new partitions, and get their earliest offsets val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet) - val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq) - if (newPartitionOffsets.keySet != newPartitions) { + val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq) + if (newPartitionInitialOffsets.keySet != newPartitions) { // We cannot get from offsets for some partitions. It means they got deleted. - val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet) + val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet) reportDataLoss( s"Cannot find earliest offsets of ${deletedPartitions}. 
Some data may have been missed") } - logInfo(s"Partitions added: $newPartitionOffsets") - newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) => + logInfo(s"Partitions added: $newPartitionInitialOffsets") + newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) => reportDataLoss( s"Added partition $p starts from $o instead of 0. Some data may have been missed") } @@ -125,46 +123,28 @@ private[kafka010] class KafkaMicroBatchReader( reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed") } - // Use the until partitions to calculate offset ranges to ignore partitions that have + // Use the end partitions to calculate offset ranges to ignore partitions that have // been deleted val topicPartitions = endPartitionOffsets.keySet.filter { tp => // Ignore partitions that we don't know the from offsets. - newPartitionOffsets.contains(tp) || startPartitionOffsets.contains(tp) + newPartitionInitialOffsets.contains(tp) || startPartitionOffsets.contains(tp) }.toSeq logDebug("TopicPartitions: " + topicPartitions.mkString(", ")) - val sortedExecutors = getSortedExecutorList() - val numExecutors = sortedExecutors.length - logDebug("Sorted executors: " + sortedExecutors.mkString(", ")) - // Calculate offset ranges - val factories = topicPartitions.flatMap { tp => - val fromOffset = startPartitionOffsets.get(tp).getOrElse { - newPartitionOffsets.getOrElse( - tp, { - // This should not happen since newPartitionOffsets contains all partitions not in - // fromPartitionOffsets - throw new IllegalStateException(s"$tp doesn't have a from offset") - }) - } - val untilOffset = endPartitionOffsets(tp) - - if (untilOffset >= fromOffset) { - // This allows cached KafkaConsumers in the executors to be re-used to read the same - // partition in every batch. - val preferredLoc = if (numExecutors > 0) { - Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors))) - } else None - val range = KafkaOffsetRange(tp, fromOffset, untilOffset) - Some( - new KafkaMicroBatchDataReaderFactory( - range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss)) - } else { - reportDataLoss( - s"Partition $tp's offset was changed from " + - s"$fromOffset to $untilOffset, some data may have been missed") - None - } + val offsetRanges = rangeCalculator.getRanges( + fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets, + untilOffsets = endPartitionOffsets, + executorLocations = getSortedExecutorList()) + + // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions, + // that is, concurrent tasks will not read the same TopicPartitions. + val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == offsetRanges.size + + // Generate factories based on the offset ranges + val factories = offsetRanges.map { range => + new KafkaMicroBatchDataReaderFactory( + range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer) } factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava } @@ -199,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader( // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread. // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever // (KAFKA-1894). 
- assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) + require(Thread.currentThread().isInstanceOf[UninterruptibleThread]) // SparkSession is required for getting Hadoop configuration for writing to checkpoints - assert(SparkSession.getActiveSession.nonEmpty) + require(SparkSession.getActiveSession.nonEmpty) val metadataLog = new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath) @@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader( } /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */ -private[kafka010] class KafkaMicroBatchDataReaderFactory( - range: KafkaOffsetRange, - preferredLoc: Option[String], +private[kafka010] case class KafkaMicroBatchDataReaderFactory( + offsetRange: KafkaOffsetRange, executorKafkaParams: ju.Map[String, Object], pollTimeoutMs: Long, - failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] { + failOnDataLoss: Boolean, + reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] { - override def preferredLocations(): Array[String] = preferredLoc.toArray + override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader( - range, executorKafkaParams, pollTimeoutMs, failOnDataLoss) + offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer) } /** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */ -private[kafka010] class KafkaMicroBatchDataReader( +private[kafka010] case class KafkaMicroBatchDataReader( offsetRange: KafkaOffsetRange, executorKafkaParams: ju.Map[String, Object], pollTimeoutMs: Long, - failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging { + failOnDataLoss: Boolean, + reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging { + + private val consumer = { + if (!reuseKafkaConsumer) { + // If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we + // uses `assign`, we don't need to worry about the "group.id" conflicts. + CachedKafkaConsumer.createUncached( + offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams) + } else { + CachedKafkaConsumer.getOrCreate( + offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams) + } + } - private val consumer = CachedKafkaConsumer.getOrCreate( - offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams) private val rangeToRead = resolveRange(offsetRange) private val converter = new KafkaRecordToUnsafeRowConverter @@ -370,8 +361,14 @@ private[kafka010] class KafkaMicroBatchDataReader( override def close(): Unit = { // Indicate that we're no longer using this consumer - CachedKafkaConsumer.releaseKafkaConsumer( - offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams) + if (!reuseKafkaConsumer) { + // Don't forget to close non-reuse KafkaConsumers. You may take down your cluster! 
+ consumer.close() + } else { + // Indicate that we're no longer using this consumer + CachedKafkaConsumer.releaseKafkaConsumer( + offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams) + } } private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = { @@ -398,6 +395,3 @@ private[kafka010] class KafkaMicroBatchDataReader( } } } - -private[kafka010] case class KafkaOffsetRange( - topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala new file mode 100644 index 0000000000000..9989eeb998866 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.sql.sources.v2.DataSourceOptions + + +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { + require(minPartitions >= 0) + + import KafkaOffsetRangeCalculator._ + /** + * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * is not set or is set less than or equal the number of `topicPartitions` that we're going to + * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If + * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up + * the read tasks of the skewed partitions to multiple Spark tasks. + * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more + * depending on rounding errors or Kafka partitions that didn't receive any new data. 
+ */ + def getRanges( + fromOffsets: PartitionOffsetMap, + untilOffsets: PartitionOffsetMap, + executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange] = { + val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) + + val offsetRanges = partitionsToRead.toSeq.map { tp => + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp)) + } + + // If minPartitions not set or there are enough partitions to satisfy minPartitions + if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) { + // Assign preferred executor locations to each range such that the same topic-partition is + // always read from the same executor and the KafkaConsumer can be reused + offsetRanges.map { range => + range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) + } + } else { + + // Splits offset ranges with relatively large amount of data to smaller ones. + val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + offsetRanges.flatMap { offsetRange => + val tp = offsetRange.topicPartition + val size = offsetRange.untilOffset - offsetRange.fromOffset + // number of partitions to divvy up this topic partition to + val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt + var remaining = size + var startOffset = offsetRange.fromOffset + (0 until parts).map { part => + // Fine to do integer division. Last partition will consume all the round off errors + val thisPartition = remaining / (parts - part) + remaining -= thisPartition + val endOffset = startOffset + thisPartition + val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, preferredLoc = None) + startOffset = endOffset + offsetRange + } + } + } + } + + private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = { + def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b + + val numExecutors = executorLocations.length + if (numExecutors > 0) { + // This allows cached KafkaConsumers in the executors to be re-used to read the same + // partition in every batch. 
+ Some(executorLocations(floorMod(tp.hashCode, numExecutors))) + } else None + } +} + +private[kafka010] object KafkaOffsetRangeCalculator { + + private val DEFAULT_MIN_PARTITIONS = 0 + + def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = { + new KafkaOffsetRangeCalculator(options.getInt("minPartitions", DEFAULT_MIN_PARTITIONS)) + } +} + + +private[kafka010] case class KafkaOffsetRange( + topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long, + preferredLoc: Option[String] = None) + + diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 0aa64a6a9cf90..36b9f0466566b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -348,6 +348,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister throw new IllegalArgumentException("Unknown option") } + // Validate minPartitions value if present + if (caseInsensitiveParams.contains(MIN_PARTITIONS_OPTION_KEY)) { + val p = caseInsensitiveParams(MIN_PARTITIONS_OPTION_KEY).toInt + if (p <= 0) throw new IllegalArgumentException("minPartitions must be positive") + } + // Validate user-specified Kafka options if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) { @@ -455,6 +461,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets" private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets" private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss" + private val MIN_PARTITIONS_OPTION_KEY = "minpartitions" val TOPIC_OPTION_KEY = "topic" diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala new file mode 100644 index 0000000000000..43acd6a8d9473 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql + +import org.apache.kafka.common.TopicPartition + +package object kafka010 { // scalastyle:ignore + // ^^ scalastyle:ignore is for ignoring warnings about digits in package name + type PartitionOffsetMap = Map[TopicPartition, Long] +} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 89c9ef4cc73b5..f2b3ff7615e74 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.kafka010 import java.io._ import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.{Files, Paths} -import java.util.{Locale, Properties} +import java.util.{Locale, Optional, Properties} import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.io.Source import scala.util.Random @@ -34,15 +35,19 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkContext -import org.apache.spark.sql.{Dataset, ForeachWriter} +import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession} +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ +import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2} import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} +import org.apache.spark.sql.types.StructType abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { @@ -642,6 +647,53 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { } ) } + + testWithUninterruptibleThread("minPartitions is supported") { + import testImplicits._ + + val topic = newTopic() + val tp = new TopicPartition(topic, 0) + testUtils.createTopic(topic, partitions = 1) + + def test( + minPartitions: String, + numPartitionsGenerated: Int, + reusesConsumers: Boolean): Unit = { + + SparkSession.setActiveSession(spark) + withTempDir { dir => + val provider = new KafkaSourceProvider() + val options = Map( + "kafka.bootstrap.servers" -> testUtils.brokerAddress, + "subscribe" -> topic + ) ++ Option(minPartitions).map { p => "minPartitions" -> p} + val reader = provider.createMicroBatchReader( + Optional.empty[StructType], dir.getAbsolutePath, new DataSourceOptions(options.asJava)) + reader.setOffsetRange( + Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))), + Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L))) + ) + val factories = reader.createUnsafeRowReaderFactories().asScala + .map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory]) + withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") { + assert(factories.size == 
numPartitionsGenerated) + factories.foreach { f => assert(f.reuseKafkaConsumer == reusesConsumers) } + } + } + } + + // Test cases when minPartitions is used and not used + test(minPartitions = null, numPartitionsGenerated = 1, reusesConsumers = true) + test(minPartitions = "1", numPartitionsGenerated = 1, reusesConsumers = true) + test(minPartitions = "4", numPartitionsGenerated = 4, reusesConsumers = false) + + // Test illegal minPartitions values + intercept[IllegalArgumentException] { test(minPartitions = "a", 1, true) } + intercept[IllegalArgumentException] { test(minPartitions = "1.0", 1, true) } + intercept[IllegalArgumentException] { test(minPartitions = "0", 1, true) } + intercept[IllegalArgumentException] { test(minPartitions = "-1", 1, true) } + } + } abstract class KafkaSourceSuiteBase extends KafkaSourceTest { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala new file mode 100644 index 0000000000000..8704c69a0a0c1 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import scala.collection.JavaConverters._ + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.sources.v2.DataSourceOptions + +class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { + + def testWithMinPartitions(name: String, minPartition: Int) + (f: KafkaOffsetRangeCalculator => Unit): Unit = { + val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava) + test(s"with minPartition = $minPartition: $name") { + f(KafkaOffsetRangeCalculator(options)) + } + } + + + test("with no minPartition: N TopicPartitions to N offset ranges") { + val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty()) + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1), + untilOffsets = Map(tp1 -> 2)) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1, tp2 -> 1), + untilOffsets = Map(tp1 -> 2, tp2 -> 1)) == + Seq(KafkaOffsetRange(tp1, 1, 2, None), KafkaOffsetRange(tp2, 1, 1, None))) + + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1), + untilOffsets = Map(tp1 -> 2, tp2 -> 1), Seq.empty) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1, tp2 -> 1), + untilOffsets = Map(tp1 -> 2)) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1, tp2 -> 1), + untilOffsets = Map(tp1 -> 2), + executorLocations = Seq("location")) == + Seq(KafkaOffsetRange(tp1, 1, 2, Some("location")))) + } + + testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc => + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1), + untilOffsets = Map(tp1 -> 2, tp2 -> 2, tp3 -> 2)) == + Seq( + KafkaOffsetRange(tp1, 1, 2, None), + KafkaOffsetRange(tp2, 1, 2, None), + KafkaOffsetRange(tp3, 1, 2, None))) + } + + testWithMinPartitions("1 TopicPartition to N offset ranges", 4) { calc => + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1), + untilOffsets = Map(tp1 -> 5)) == + Seq( + KafkaOffsetRange(tp1, 1, 2, None), + KafkaOffsetRange(tp1, 2, 3, None), + KafkaOffsetRange(tp1, 3, 4, None), + KafkaOffsetRange(tp1, 4, 5, None))) + + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1), + untilOffsets = Map(tp1 -> 5), + executorLocations = Seq("location")) == + Seq( + KafkaOffsetRange(tp1, 1, 2, None), + KafkaOffsetRange(tp1, 2, 3, None), + KafkaOffsetRange(tp1, 3, 4, None), + KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set when minPartition is set + } + + testWithMinPartitions("N skewed TopicPartition to M offset ranges", 3) { calc => + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1, tp2 -> 1), + untilOffsets = Map(tp1 -> 5, tp2 -> 21)) == + Seq( + KafkaOffsetRange(tp1, 1, 5, None), + KafkaOffsetRange(tp2, 1, 7, None), // 1 + 20 / 3 => 1 + 6 = 7 + KafkaOffsetRange(tp2, 7, 14, None), // 7 + 14 / 2 => 7 + 7 = 14 + KafkaOffsetRange(tp2, 14, 21, None))) // 14 + 7 / 1 => 14 + 7 = 21 + } + + private val tp1 = new TopicPartition("t1", 1) + private val tp2 = new TopicPartition("t2", 1) + private val tp3 = new TopicPartition("t3", 1) + +} From 5f066a058f685a394397244cb46b022483f7e892 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 28 Feb 2018 17:37:39 -0800 Subject: [PATCH 2/5] Minor changes --- .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 2 +- .../sql/kafka010/KafkaOffsetRangeCalculator.scala | 12 ++++++------ .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 1 - 3 
files changed, 7 insertions(+), 8 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala index 980f46c11c016..e50b90c03ad31 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala @@ -389,7 +389,7 @@ private[kafka010] case class KafkaMicroBatchDataReader( } else { range.untilOffset } - KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset) + KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None) } else { range } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index 9989eeb998866..20e97221a01bb 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -22,6 +22,10 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.sql.sources.v2.DataSourceOptions +/** + * Class to calculate offset ranges to process based on the the from and until offsets, and + * the configured `minPartitions`. + */ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { require(minPartitions >= 0) @@ -42,7 +46,7 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { val partitionsToRead = untilOffsets.keySet.intersect(fromOffsets.keySet) val offsetRanges = partitionsToRead.toSeq.map { tp => - KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp)) + KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) } // If minPartitions not set or there are enough partitions to satisfy minPartitions @@ -97,9 +101,5 @@ private[kafka010] object KafkaOffsetRangeCalculator { } } - private[kafka010] case class KafkaOffsetRange( - topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long, - preferredLoc: Option[String] = None) - - + topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long, preferredLoc: Option[String]) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala index 8704c69a0a0c1..deb3e1505b51d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala @@ -118,5 +118,4 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { private val tp1 = new TopicPartition("t1", 1) private val tp2 = new TopicPartition("t2", 1) private val tp3 = new TopicPartition("t3", 1) - } From 3eae3f188deaf82d6384ebf698f09c9bf0d0b735 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 1 Mar 2018 18:17:30 -0800 Subject: [PATCH 3/5] Addressed comments --- .../sql/kafka010/KafkaMicroBatchReader.scala | 9 ++-- .../kafka010/KafkaOffsetRangeCalculator.scala | 43 +++++++++---------- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala index e50b90c03ad31..8a5f3a249b11c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala @@ -179,10 +179,10 @@ private[kafka010] class KafkaMicroBatchReader( // Make sure that `KafkaConsumer.poll` is only called in StreamExecutionThread. // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may hang forever // (KAFKA-1894). - require(Thread.currentThread().isInstanceOf[UninterruptibleThread]) + assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) // SparkSession is required for getting Hadoop configuration for writing to checkpoints - require(SparkSession.getActiveSession.nonEmpty) + assert(SparkSession.getActiveSession.nonEmpty) val metadataLog = new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath) @@ -323,8 +323,8 @@ private[kafka010] case class KafkaMicroBatchDataReader( private val consumer = { if (!reuseKafkaConsumer) { - // If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we - // uses `assign`, we don't need to worry about the "group.id" conflicts. + // If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. We + // uses `assign` here, hence we don't need to worry about the "group.id" conflicts. CachedKafkaConsumer.createUncached( offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams) } else { @@ -360,7 +360,6 @@ private[kafka010] case class KafkaMicroBatchDataReader( } override def close(): Unit = { - // Indicate that we're no longer using this consumer if (!reuseKafkaConsumer) { // Don't forget to close non-reuse KafkaConsumers. You may take down your cluster! consumer.close() diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index 20e97221a01bb..ff2505fb7422e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -26,12 +26,12 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions * Class to calculate offset ranges to process based on the the from and until offsets, and * the configured `minPartitions`. */ -private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { - require(minPartitions >= 0) +private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) { + require(minPartitions.isEmpty || minPartitions.get > 0) import KafkaOffsetRangeCalculator._ /** - * Calculate the offset ranges that we are going to process this batch. If `numPartitions` + * Calculate the offset ranges that we are going to process this batch. If `minPartitions` * is not set or is set less than or equal the number of `topicPartitions` that we're going to * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. 
If * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up @@ -50,9 +50,9 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { } // If minPartitions not set or there are enough partitions to satisfy minPartitions - if (minPartitions == DEFAULT_MIN_PARTITIONS || offsetRanges.size > minPartitions) { + if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { // Assign preferred executor locations to each range such that the same topic-partition is - // always read from the same executor and the KafkaConsumer can be reused + // preferentially read from the same executor and the KafkaConsumer can be reused. offsetRanges.map { range => range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations)) } @@ -60,22 +60,20 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { // Splits offset ranges with relatively large amount of data to smaller ones. val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum - offsetRanges.flatMap { offsetRange => - val tp = offsetRange.topicPartition - val size = offsetRange.untilOffset - offsetRange.fromOffset - // number of partitions to divvy up this topic partition to - val parts = math.max(math.round(size * 1.0 / totalSize * minPartitions), 1).toInt - var remaining = size - var startOffset = offsetRange.fromOffset - (0 until parts).map { part => - // Fine to do integer division. Last partition will consume all the round off errors - val thisPartition = remaining / (parts - part) - remaining -= thisPartition - val endOffset = startOffset + thisPartition - val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, preferredLoc = None) - startOffset = endOffset - offsetRange + val idealRangeSize = totalSize.toDouble / minPartitions.get + + offsetRanges.flatMap { range => + // Split the current range into subranges as close to the ideal range size + val rangeSize = range.untilOffset - range.fromOffset + val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt + + (0 until numSplitsInRange).map { i => + val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange) + val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange) + KafkaOffsetRange( + range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None) } + } } } @@ -94,10 +92,9 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Int) { private[kafka010] object KafkaOffsetRangeCalculator { - private val DEFAULT_MIN_PARTITIONS = 0 - def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = { - new KafkaOffsetRangeCalculator(options.getInt("minPartitions", DEFAULT_MIN_PARTITIONS)) + val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt) + new KafkaOffsetRangeCalculator(optionalValue) } } From 1e244e0d484d13d08b5afeed45bcbd9805254fe1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Mar 2018 01:01:54 -0800 Subject: [PATCH 4/5] Added 1 more test --- .../kafka010/KafkaOffsetRangeCalculator.scala | 10 +++++-- .../KafkaOffsetRangeCalculatorSuite.scala | 29 ++++++++++++++----- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index ff2505fb7422e..9bc8fe2841890 100644 --- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -47,7 +47,7 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int val offsetRanges = partitionsToRead.toSeq.map { tp => KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None) - } + }.filter(_.size > 0) // If minPartitions not set or there are enough partitions to satisfy minPartitions if (minPartitions.isEmpty || offsetRanges.size > minPartitions.get) { @@ -73,7 +73,6 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int KafkaOffsetRange( range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None) } - } } } @@ -99,4 +98,9 @@ private[kafka010] object KafkaOffsetRangeCalculator { } private[kafka010] case class KafkaOffsetRange( - topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long, preferredLoc: Option[String]) + topicPartition: TopicPartition, + fromOffset: Long, + untilOffset: Long, + preferredLoc: Option[String]) { + def size: Long = untilOffset - fromOffset +} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala index deb3e1505b51d..46250499a4f51 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala @@ -43,12 +43,6 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { untilOffsets = Map(tp1 -> 2)) == Seq(KafkaOffsetRange(tp1, 1, 2, None))) - assert( - calc.getRanges( - fromOffsets = Map(tp1 -> 1, tp2 -> 1), - untilOffsets = Map(tp1 -> 2, tp2 -> 1)) == - Seq(KafkaOffsetRange(tp1, 1, 2, None), KafkaOffsetRange(tp2, 1, 1, None))) - assert( calc.getRanges( fromOffsets = Map(tp1 -> 1), @@ -69,6 +63,15 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { Seq(KafkaOffsetRange(tp1, 1, 2, Some("location")))) } + test("with no minPartition: empty ranges ignored") { + val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty()) + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1, tp2 -> 1), + untilOffsets = Map(tp1 -> 2, tp2 -> 1)) == + Seq(KafkaOffsetRange(tp1, 1, 2, None))) + } + testWithMinPartitions("N TopicPartitions to N offset ranges", 3) { calc => assert( calc.getRanges( @@ -103,7 +106,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set when minPartition is set } - testWithMinPartitions("N skewed TopicPartition to M offset ranges", 3) { calc => + testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc => assert( calc.getRanges( fromOffsets = Map(tp1 -> 1, tp2 -> 1), @@ -115,6 +118,18 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { KafkaOffsetRange(tp2, 14, 21, None))) // 14 + 7 / 1 => 14 + 7 = 21 } + testWithMinPartitions("empty ranges ignored", 3) { calc => + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 1, tp2 -> 1, tp3 -> 1), + untilOffsets = Map(tp1 -> 5, tp2 -> 21, tp3 -> 1)) == + Seq( + KafkaOffsetRange(tp1, 1, 5, None), + KafkaOffsetRange(tp2, 1, 7, None), // 1 + 20 / 3 => 1 + 6 = 7 + KafkaOffsetRange(tp2, 7, 14, None), // 7 + 14 / 2 => 7 + 7 = 14 + 
KafkaOffsetRange(tp2, 14, 21, None))) // 14 + 7 / 1 => 14 + 7 = 21 + } + private val tp1 = new TopicPartition("t1", 1) private val tp2 = new TopicPartition("t2", 1) private val tp3 = new TopicPartition("t3", 1) From 602ab36490a692080682867f98a8a5d8f7b2390d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Mar 2018 15:49:25 -0800 Subject: [PATCH 5/5] Addressed comments --- .../kafka010/KafkaOffsetRangeCalculator.scala | 11 ++++----- .../KafkaOffsetRangeCalculatorSuite.scala | 23 ++++++++++++++----- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index 9bc8fe2841890..6631ae84167c8 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -59,17 +59,16 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int } else { // Splits offset ranges with relatively large amount of data to smaller ones. - val totalSize = offsetRanges.map(o => o.untilOffset - o.fromOffset).sum + val totalSize = offsetRanges.map(_.size).sum val idealRangeSize = totalSize.toDouble / minPartitions.get offsetRanges.flatMap { range => // Split the current range into subranges as close to the ideal range size - val rangeSize = range.untilOffset - range.fromOffset - val numSplitsInRange = math.round(rangeSize.toDouble / idealRangeSize).toInt + val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt (0 until numSplitsInRange).map { i => - val splitStart = range.fromOffset + rangeSize * (i.toDouble / numSplitsInRange) - val splitEnd = range.fromOffset + rangeSize * ((i.toDouble + 1) / numSplitsInRange) + val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange) + val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange) KafkaOffsetRange( range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None) } @@ -102,5 +101,5 @@ private[kafka010] case class KafkaOffsetRange( fromOffset: Long, untilOffset: Long, preferredLoc: Option[String]) { - def size: Long = untilOffset - fromOffset + lazy val size: Long = untilOffset - fromOffset } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala index 46250499a4f51..2ccf3e291bea7 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala @@ -113,9 +113,20 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { untilOffsets = Map(tp1 -> 5, tp2 -> 21)) == Seq( KafkaOffsetRange(tp1, 1, 5, None), - KafkaOffsetRange(tp2, 1, 7, None), // 1 + 20 / 3 => 1 + 6 = 7 - KafkaOffsetRange(tp2, 7, 14, None), // 7 + 14 / 2 => 7 + 7 = 14 - KafkaOffsetRange(tp2, 14, 21, None))) // 14 + 7 / 1 => 14 + 7 = 21 + KafkaOffsetRange(tp2, 1, 7, None), + KafkaOffsetRange(tp2, 7, 14, None), + KafkaOffsetRange(tp2, 14, 21, None))) + } + + testWithMinPartitions("range inexact multiple of minPartitions", 3) { calc => + assert( + calc.getRanges( + fromOffsets = Map(tp1 -> 
1), + untilOffsets = Map(tp1 -> 11)) == + Seq( + KafkaOffsetRange(tp1, 1, 4, None), + KafkaOffsetRange(tp1, 4, 7, None), + KafkaOffsetRange(tp1, 7, 11, None))) } testWithMinPartitions("empty ranges ignored", 3) { calc => @@ -125,9 +136,9 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { untilOffsets = Map(tp1 -> 5, tp2 -> 21, tp3 -> 1)) == Seq( KafkaOffsetRange(tp1, 1, 5, None), - KafkaOffsetRange(tp2, 1, 7, None), // 1 + 20 / 3 => 1 + 6 = 7 - KafkaOffsetRange(tp2, 7, 14, None), // 7 + 14 / 2 => 7 + 7 = 14 - KafkaOffsetRange(tp2, 14, 21, None))) // 14 + 7 / 1 => 14 + 7 = 21 + KafkaOffsetRange(tp2, 1, 7, None), + KafkaOffsetRange(tp2, 7, 14, None), + KafkaOffsetRange(tp2, 14, 21, None))) } private val tp1 = new TopicPartition("t1", 1)
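
Usage sketch (not part of the patches above): the option the series introduces is exposed to users as the case-insensitive DataSource option "minPartitions", validated in KafkaSourceProvider and consumed by KafkaOffsetRangeCalculator. A streaming query could request more Spark tasks than the topic has Kafka partitions roughly as follows; the broker address, topic name, and the value 12 are illustrative assumptions, not values from the patches.

    // Minimal sketch, assuming a broker at localhost:9092 and a topic named "events".
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker address
      .option("subscribe", "events")                        // illustrative topic name
      .option("minPartitions", "12")                        // ask for roughly 12 Spark tasks
      .load()

When minPartitions exceeds the number of matched topic-partitions, the calculator splits the larger offset ranges so the task count is approximately minPartitions; because the same topic-partition may then be read by concurrent tasks, cached Kafka consumers are not reused in that case and each reader creates an uncached consumer instead.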