From a5b52c94b9f7eaa293d7882bde0fb432ef3fa632 Mon Sep 17 00:00:00 2001 From: quentin Date: Mon, 30 Jul 2018 16:43:56 +0200 Subject: [PATCH 1/6] SPARK-24720 add option to align ranges with offset having records to support kafka transaction --- .../kafka010/DirectKafkaInputDStream.scala | 40 +++++- .../kafka010/OffsetWithRecordRewinder.scala | 133 ++++++++++++++++++ .../OffsetWithRecordRewinderSuite.scala | 118 ++++++++++++++++ 3 files changed, 288 insertions(+), 3 deletions(-) create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala create mode 100644 external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 0246006acf0bd..ce42bf72b56ea 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.{ util => ju } +import java.{util => ju} import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicReference @@ -59,6 +59,24 @@ private[spark] class DirectKafkaInputDStream[K, V]( private val initialRate = context.sparkContext.getConf.getLong( "spark.streaming.backpressure.initialRate", 0) + private val alignRangesToCommittedTransaction = context.sparkContext.getConf.getBoolean( + "spark.streaming.kafka.alignRangesToCommittedTransaction", false) + + private val offsetSearchRewind = context.sparkContext.getConf.getLong( + "spark.streaming.kafka.offsetSearchRewind", 10) + + @transient private var rw: OffsetWithRecordRewinder[K, V] = null + def rewinder(): OffsetWithRecordRewinder[K, V] = this.synchronized { + if (rw == null) { + val params = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) + params.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, offsetSearchRewind.toString) + params.remove(ConsumerConfig.GROUP_ID_CONFIG) + rw = new OffsetWithRecordRewinder(offsetSearchRewind, + consumerStrategy.executorKafkaParams) + } + rw + } + val executorKafkaParams = { val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) KafkaUtils.fixKafkaParams(ekp) @@ -223,9 +241,22 @@ private[spark] class DirectKafkaInputDStream[K, V]( }.getOrElse(offsets) } + protected def handleTransaction(offsets: Map[TopicPartition, Long]) = { + if (alignRangesToCommittedTransaction) { + val localRw = rewinder() + val localOffsets = currentOffsets + context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => { + tpos.map{case (tp, o) => localRw.rewind(localOffsets, tp, o)} + }).collect().toMap + } else { + offsets + } + } + override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { - val untilOffsets = clamp(latestOffsets()) - val offsetRanges = untilOffsets.map { case (tp, uo) => + val untilOffsets = handleTransaction(clamp(latestOffsets())) + + val offsetRanges = untilOffsets.map { case (tp, uo) => val fo = currentOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo) } @@ -268,6 +299,9 @@ private[spark] class DirectKafkaInputDStream[K, V]( if (kc != null) { kc.close() } + if(rw != null) { + rw.close() + } } protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange] diff 
--git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala new file mode 100644 index 0000000000000..d4684dd77b26a --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.internal.Logging + + +protected class KafkaConsumerHolder[K, V](private val kafkaParams: ju.Map[String, Object]) { + private var closed = false + private var consumers: mutable.Map[TopicPartition, Consumer[K, V]] = mutable.Map() + + /** + * Get a consumer from the pool. Creates one if needed. + * Creates 1 consmuer per TopicPartition to be able to fetch multiple partitions + * at the same time. + */ + def getConsumer(tp: TopicPartition): Consumer[K, V] = { + this.synchronized { + assert(!closed, "Consumers have been closed, can't open new.") + if (!consumers.contains(tp)) { + val c = new KafkaConsumer[K, V](kafkaParams) + c.assign(Set(tp).asJava) + consumers(tp) = c + } + consumers(tp) + } + } + + def close(): Unit = { + this.synchronized { + closed = true + consumers.foreach(_._2.close()) + } + } +} + +/** + * If we endup on an empty offset (transaction marker or abort transaction), + * a call to c.poll() won't return any data and we can't tell if it's because + * we missed data or the offset is empty. + * To prevent that, we change the offset range to always end on an offset with + * data. The range can be increased or reduced to the next living offset. + * @param rewind how many offset we'll try to rewind before seeking offset with data + * @param kafkaParams kafka params. Isolation level must be read_committed + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +class OffsetWithRecordRewinder[K, V]( + rewind: Long, + kafkaParams: ju.Map[String, Object] + ) extends Logging with Serializable { + + if (!kafkaParams.containsKey("isolation.level") || + kafkaParams.get("isolation.level") != "read_committed") { + throw new IllegalStateException("DirectStream only support read_committed." 
+ + "Please add isolation.level = read_committed to your kafka configuration") + } + + lazy val holder = new KafkaConsumerHolder[K, V](kafkaParams) + + def close(): Unit = { + holder.close() + } + + def rewind(currentOffsets: Map[TopicPartition, Long], tp: TopicPartition, + o: Long): (TopicPartition, Long) = { + val consumer = holder.getConsumer(tp) + val rewindOffset = rewindUntilDataExist(currentOffsets, tp, o, 1, rewind, consumer) + (tp, rewindOffset) + } + + // Try to poll the last message if any. + def rewindUntilDataExist(currentOffsets: Map[TopicPartition, Long], tp: TopicPartition, + toOffset: Long, iteration: Int, rewind: Long, + consumer: Consumer[K, V]): Long = { + if (toOffset <= currentOffsets(tp)) { + logDebug(s"can't rewind further: $toOffset <= ${currentOffsets(tp)}. Stop") + return toOffset + } + val nextStart = toOffset - Math.pow(rewind, iteration).toLong + val startOffsetScan = Math.max(currentOffsets(tp), nextStart) + logDebug(s"searching the next non-empty offset in ${tp} from offset $startOffsetScan") + val records: ConsumerRecords[K, V] = seekAndPoll(consumer, tp, startOffsetScan) + val smallestRecordOffset = records.iterator().asScala + .foldLeft(startOffsetScan)((maxOffset, r) => r.offset() match { + // we get an offset after our range, but our max is already after the range, we take the min + case recordOffset if recordOffset >= toOffset && maxOffset > toOffset => maxOffset + // we get an offset after our range, take it (it'll increase the range). + case recordOffset if recordOffset >= toOffset => recordOffset + // we get an offset bigger than the max but before our range, take it. + case recordOffset if recordOffset > maxOffset => recordOffset + // we get an offset smaller than the max, ignore it. + case _ => maxOffset + }) + // we have at least 1 offset with data, use this one as final range + if(smallestRecordOffset > startOffsetScan) { + smallestRecordOffset + } else { + // if we don't get any data, try to rewind faster + rewindUntilDataExist(currentOffsets, tp, startOffsetScan, iteration + 1, rewind, consumer) + } + } + + protected def seekAndPoll(c: Consumer[K, V], tp: TopicPartition, startOffsetScan: Long) = { + c.seek(tp, startOffsetScan) + // Try to get the last records. We want at least 1 living record + // Will be slow if all range data is aborted transactions + c.poll(1000) + } +} diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala new file mode 100644 index 0000000000000..a5aa91983ff78 --- /dev/null +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerRecords} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.Logging + +class OffsetWithRecordRewinderSuite + extends SparkFunSuite + with Logging { + + class OffsetWithRecordRewinderMock[K, V](records: Map[Long, ConsumerRecords[K, V]]) + extends OffsetWithRecordRewinder[K, V]( 10, + Map[String, Object]("isolation.level" -> "read_committed").asJava) { + + override protected def seekAndPoll(c: Consumer[K, V], tp: TopicPartition, offset: Long) = { + records(offset) + } + + } + + val emptyConsumerRecords = new ConsumerRecords[String, String](ju.Collections.emptyMap()) + val tp = new TopicPartition("topic", 0) + + test("Rewinder construction should fail if isolation level isn't set to committed") { + intercept[IllegalStateException] { + new OffsetWithRecordRewinder[String, String](10, + Map[String, Object]().asJava) + } + intercept[IllegalStateException] { + new OffsetWithRecordRewinder[String, String](10, + Map[String, Object]("isolation.level" -> "read_uncommitted").asJava) + } + } + + test("Rewind should find the last offset containing data. " + + "records are empty, should always rewind to 0") { + var rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(0L -> emptyConsumerRecords)) + assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 10, iteration = 1, 10, null)) + + rewinder = new OffsetWithRecordRewinderMock[String, String](Map(0L -> emptyConsumerRecords)) + assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 8, iteration = 1, 10, null)) + + rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(0L -> emptyConsumerRecords, 2L -> emptyConsumerRecords, 12L -> emptyConsumerRecords)) + assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 22, iteration = 1, 10, null)) + + rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(0L -> emptyConsumerRecords, 2L -> emptyConsumerRecords, 22L -> emptyConsumerRecords)) + assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 32, iteration = 1, 10, null)) + + rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(0L -> emptyConsumerRecords, 12L -> emptyConsumerRecords, 32L -> emptyConsumerRecords)) + assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 42, iteration = 1, 10, null)) + } + + test("Rewind should not do anything if start = last") { + var rewinder = new OffsetWithRecordRewinderMock[String, String](Map()) + assert(10 === rewinder.rewindUntilDataExist(Map(tp -> 10), tp, 10, iteration = 1, 10, null)) + } + + test("Rewind should find the first existing records") { + var records = List(new ConsumerRecord("topic", 0, 35, "k", "v"), + new ConsumerRecord("topic", 0, 36, "k", "v")).asJava + var rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(32L -> buildConsumerRecords(records))) + assert(36 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 42, iteration = 1, 10, null)) + + records = List(new ConsumerRecord("topic", 0, 35, "k", "v")).asJava + rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(32L -> buildConsumerRecords(records))) + assert(35 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 42, iteration = 1, 10, null)) + } + + test("Rewind should 
find the first existing records and retry") { + val records = List(new ConsumerRecord("topic", 0, 34, "k", "v"), + new ConsumerRecord("topic", 0, 35, "k", "v")).asJava + var rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(32L -> buildConsumerRecords(records), + 132L -> emptyConsumerRecords)) + assert(35 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 142, iteration = 1, 10, null)) + } + + test("Rewind should find the first existing records even the offset if higher") { + val records = List(new ConsumerRecord("topic", 0, 144, "k", "v"), + new ConsumerRecord("topic", 0, 145, "k", "v")).asJava + var rewinder = new OffsetWithRecordRewinderMock[String, String]( + Map(132L -> buildConsumerRecords(records))) + assert(144 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 142, iteration = 1, 10, null)) + } + + private def buildConsumerRecords(records: ju.List[ConsumerRecord[String, String]]) = { + new ConsumerRecords[String, String](Map(tp -> records).asJava) + } +} From 79d83db0f535fe1e9e5f534a6a0b4fe7c3d6257f Mon Sep 17 00:00:00 2001 From: quentin Date: Mon, 30 Jul 2018 16:47:33 +0200 Subject: [PATCH 2/6] correction indentation --- .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index ce42bf72b56ea..57fbedd652434 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.{util => ju} +import java.{ util => ju } import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicReference @@ -42,7 +42,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator * per second that each '''partition''' will accept. * @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]], * see [[LocationStrategy]] for more details. - * @param consumerStrategy In most cases, pass in [[ConsumerStrategies.Subscribe]], + * @param consumerStrategy In most caseslamp, pass in [[ConsumerStrategies.Subscribe]], * see [[ConsumerStrategy]] for more details * @param ppc configuration of settings such as max rate on a per-partition basis. * see [[PerPartitionConfig]] for more details. 
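
Note on usage: patch 1 above wires two new driver-side settings, spark.streaming.kafka.alignRangesToCommittedTransaction and spark.streaming.kafka.offsetSearchRewind, and the rewinder refuses to start unless isolation.level = read_committed is present in the Kafka parameters. The following minimal sketch is not part of the patch series; it only illustrates how an application would be expected to opt in, with the broker address, group id and topic name as placeholders.

// Illustrative driver setup assuming the configuration keys from patch 1.
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object TransactionalDirectStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("transactional-direct-stream")
      .setMaster("local[2]") // for local testing only
      // opt in to aligning ranges on offsets that really hold records (patch 1)
      .set("spark.streaming.kafka.alignRangesToCommittedTransaction", "true")
      // how far the rewinder steps back per iteration when searching for data (patch 1)
      .set("spark.streaming.kafka.offsetSearchRewind", "10")

    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "example-group",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      // required by the rewinder: only committed transactional data is visible
      "isolation.level" -> "read_committed")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("transactional-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Offset ranges now end on offsets that actually carry records.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      println(ranges.mkString(", "))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}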
@@ -256,7 +256,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { val untilOffsets = handleTransaction(clamp(latestOffsets())) - val offsetRanges = untilOffsets.map { case (tp, uo) => + val offsetRanges = untilOffsets.map { case (tp, uo) => val fo = currentOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo) } From 05c7e7fb96806c07bc9b0513ef59fbcdd5ae9118 Mon Sep 17 00:00:00 2001 From: quentin Date: Mon, 30 Jul 2018 16:53:45 +0200 Subject: [PATCH 3/6] remove wrong comment edit --- .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 57fbedd652434..f595f5738b6d1 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -42,7 +42,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator * per second that each '''partition''' will accept. * @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]], * see [[LocationStrategy]] for more details. - * @param consumerStrategy In most caseslamp, pass in [[ConsumerStrategies.Subscribe]], + * @param consumerStrategy In most cases, pass in [[ConsumerStrategies.Subscribe]], * see [[ConsumerStrategy]] for more details * @param ppc configuration of settings such as max rate on a per-partition basis. * see [[PerPartitionConfig]] for more details. From 70ecd38e05073c6713855413512fe6a9452f2994 Mon Sep 17 00:00:00 2001 From: quentin Date: Thu, 2 Aug 2018 21:52:47 +0200 Subject: [PATCH 4/6] add offset scan logic and conf --- .../streaming/kafka010/DirectKafkaConf.scala | 45 ++++++ .../kafka010/DirectKafkaInputDStream.scala | 77 +++++----- .../kafka010/KafkaDataConsumer.scala | 27 +++- .../spark/streaming/kafka010/KafkaRDD.scala | 43 ++---- .../streaming/kafka010/OffsetRange.scala | 39 +++-- .../kafka010/OffsetWithRecordRewinder.scala | 133 ------------------ .../kafka010/OffsetWithRecordScanner.scala | 89 ++++++++++++ .../streaming/kafka010/JavaKafkaRDDSuite.java | 4 +- .../streaming/kafka010/KafkaRDDSuite.scala | 20 +-- .../OffsetWithRecordRewinderSuite.scala | 118 ---------------- .../OffsetWithRecordScannerSuite.scala | 110 +++++++++++++++ 11 files changed, 360 insertions(+), 345 deletions(-) create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaConf.scala delete mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScanner.scala delete mode 100644 external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala create mode 100644 external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaConf.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaConf.scala new file mode 100644 index 0000000000000..8e17817dbf377 --- /dev/null +++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaConf.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import org.apache.spark.SparkConf + +private object DirectKafkaConf { + + def pollTimeout(conf: SparkConf): Long = conf + .getLong("spark.streaming.kafka.consumer.poll.ms", + conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L) + + def cacheInitialCapacity(conf: SparkConf): Int = conf + .getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) + + def cacheMaxCapacity(conf: SparkConf): Int = conf + .getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) + + def cacheLoadFactor(conf: SparkConf): Float = conf + .getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat + + def initialRate(conf: SparkConf): Long = conf + .getLong("spark.streaming.backpressure.initialRate", 0) + + def nonConsecutive(conf: SparkConf): Boolean = conf + .getBoolean("spark.streaming.kafka.allowNonConsecutiveOffsets", false) + + def useConsumerCache(conf: SparkConf): Boolean = conf + .getBoolean("spark.streaming.kafka.consumer.cache.enabled", true) +} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index f595f5738b6d1..26b7111944495 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -56,23 +56,22 @@ private[spark] class DirectKafkaInputDStream[K, V]( ppc: PerPartitionConfig ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets { - private val initialRate = context.sparkContext.getConf.getLong( - "spark.streaming.backpressure.initialRate", 0) + private val initialRate = DirectKafkaConf.initialRate(context.sparkContext.conf) + private val nonConsecutive = DirectKafkaConf.nonConsecutive(context.sparkContext.conf) - private val alignRangesToCommittedTransaction = context.sparkContext.getConf.getBoolean( - "spark.streaming.kafka.alignRangesToCommittedTransaction", false) - - private val offsetSearchRewind = context.sparkContext.getConf.getLong( - "spark.streaming.kafka.offsetSearchRewind", 10) - - @transient private var rw: OffsetWithRecordRewinder[K, V] = null - def rewinder(): OffsetWithRecordRewinder[K, V] = this.synchronized { + @transient private var rw: OffsetWithRecordScanner[K, V] = null + def rewinder(): OffsetWithRecordScanner[K, V] = this.synchronized { if (rw == null) { val params = new ju.HashMap[String, 
Object](consumerStrategy.executorKafkaParams) - params.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, offsetSearchRewind.toString) - params.remove(ConsumerConfig.GROUP_ID_CONFIG) - rw = new OffsetWithRecordRewinder(offsetSearchRewind, - consumerStrategy.executorKafkaParams) + val cacheInitialCapacity = DirectKafkaConf + .cacheInitialCapacity(context.sparkContext.conf) + val cacheMaxCapacity = DirectKafkaConf.cacheMaxCapacity(context.sparkContext.conf) + val cacheLoadFactor = DirectKafkaConf.cacheLoadFactor(context.sparkContext.conf) + val useConsumerCache = DirectKafkaConf.useConsumerCache(context.sparkContext.conf) + + rw = new OffsetWithRecordScanner( + consumerStrategy.executorKafkaParams, cacheInitialCapacity, + cacheMaxCapacity, cacheLoadFactor, useConsumerCache) } rw } @@ -241,30 +240,47 @@ private[spark] class DirectKafkaInputDStream[K, V]( }.getOrElse(offsets) } - protected def handleTransaction(offsets: Map[TopicPartition, Long]) = { - if (alignRangesToCommittedTransaction) { + /** + * Return the offset range. For non consecutive offset the last offset must have record. + * If offsets have missing data (transaction marker or abort), increases the + * range until we get the requested number of record or no more records. + * Because we have to iterate over all the records in this case, + * we also return the total number of records. + * @param offsets the target range we would like if offset were continue + * @return (totalNumberOfRecords, updated offset) + */ + private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = { + if (nonConsecutive) { val localRw = rewinder() val localOffsets = currentOffsets context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => { - tpos.map{case (tp, o) => localRw.rewind(localOffsets, tp, o)} - }).collect().toMap + tpos.map { case (tp, o) => + val offsetAndCount = localRw.getLastOffsetAndCount(localOffsets(tp), tp, o) + (tp, offsetAndCount) + } + }).collect() + .map { case (tp, (untilOffset, size)) => + val fromOffset = currentOffsets(tp) + OffsetRange(tp.topic(), tp.partition, fromOffset, untilOffset, size) + } } else { - offsets + offsets.map { case (tp, untilOffset) => + val size = untilOffset - currentOffsets(tp) + val offsetFrom = currentOffsets(tp) + OffsetRange(tp.topic(), tp.partition, offsetFrom, untilOffset, size) + } } } - override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { - val untilOffsets = handleTransaction(clamp(latestOffsets())) - val offsetRanges = untilOffsets.map { case (tp, uo) => - val fo = currentOffsets(tp) - OffsetRange(tp.topic, tp.partition, fo, uo) - } - val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled", - true) + override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { + val offsets = clamp(latestOffsets()) + val offsetRanges = alignRanges(offsets) + val useConsumerCache = DirectKafkaConf.useConsumerCache(context.conf) val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, useConsumerCache) + // Report the record number and metadata of this batch interval to InputInfoTracker. val description = offsetRanges.filter { offsetRange => // Don't display empty ranges. 
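
The alignRanges/getLastOffsetAndCount logic above can be hard to follow inside the diff, so here is a standalone sketch of the same tail-recursive scan with the Kafka consumer replaced by an in-memory source of offsets. It is illustrative only and makes no Spark or Kafka calls: given the desired range size, it walks records one by one, steps over gaps left by transaction markers or aborted records, and returns the exclusive end offset together with the number of records actually seen.

// Standalone sketch of the scan performed by OffsetWithRecordScanner.iterate (patch 4).
import scala.annotation.tailrec

object OffsetScanSketch {

  /** Walk records until `rangeSize` records are seen or the source is exhausted.
   *  Returns (exclusive end offset, number of records actually available). */
  def scan(fromOffset: Long, rangeSize: Long, next: () => Option[Long]): (Long, Long) = {
    @tailrec
    def iterate(lastOffset: Long, count: Long): (Long, Long) = next() match {
      case None => (lastOffset + 1, count)                         // nothing left: stop early
      case Some(o) if count + 1 >= rangeSize => (o + 1, count + 1) // enough records: stop here
      case Some(o) => iterate(o, count + 1)                        // keep scanning past gaps
    }
    // mirrors iterateUntilLastOrEmpty, which starts one before the first offset
    iterate(fromOffset - 1, 0L)
  }

  def main(args: Array[String]): Unit = {
    // Offsets 2 and 5 are "missing", e.g. commit markers of a transactional producer.
    val offsets = Iterator(0L, 1L, 3L, 4L, 6L)
    val source = () => if (offsets.hasNext) Some(offsets.next()) else None

    // Asking for 5 records starting at offset 0 stretches the range to offset 7,
    // the first exclusive end that really contains 5 records.
    println(scan(fromOffset = 0L, rangeSize = 5L, next = source)) // prints (7,5)
  }
}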
@@ -280,7 +296,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( val inputInfo = StreamInputInfo(id, rdd.count, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) - currentOffsets = untilOffsets + currentOffsets = offsetRanges.map(r => (r.topicPartition(), r.untilOffset)).toMap commitAll() Some(rdd) } @@ -299,9 +315,6 @@ private[spark] class DirectKafkaInputDStream[K, V]( if (kc != null) { kc.close() } - if(rw != null) { - rw.close() - } } protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange] @@ -342,7 +355,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( private[streaming] class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) { - def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = { + def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long, Long)]] = { data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]] } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 68c5fe9ab066a..81327e62287fe 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka010 import java.{util => ju} -import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.spark.TaskContext @@ -56,6 +56,10 @@ private[kafka010] sealed trait KafkaDataConsumer[K, V] { internalConsumer.compactedNext(pollTimeoutMs) } + def next(pollTimeoutMs: Long): Option[ConsumerRecord[K, V]] = { + internalConsumer.next(pollTimeoutMs) + } + /** * Rewind to previous record in the batch from a compacted topic. * @@ -183,6 +187,22 @@ private[kafka010] class InternalKafkaConsumer[K, V]( record } + /** + * Similar to compactedStart but will return None if poll doesn't + * return any value. Won't throw any exception. + */ + def next(pollTimeoutMs: Long): Option[ConsumerRecord[K, V]] = { + if (!buffer.hasNext()) { + poll(pollTimeoutMs) + } + if (!buffer.hasNext) { + return None + } + val record = buffer.next() + nextOffset = record.offset + 1 + Some(record) + } + /** * Rewind to previous record in the batch from a compacted topic. 
* @throws NoSuchElementException if no previous element @@ -191,6 +211,11 @@ private[kafka010] class InternalKafkaConsumer[K, V]( buffer.previous() } + def seekAndPoll(offset: Long, timeout: Long): ConsumerRecords[K, V] = { + seek(offset) + consumer.poll(timeout) + } + private def seek(offset: Long): Unit = { logDebug(s"Seeking to $topicPartition $offset") consumer.seek(topicPartition, offset) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 3efc90fe466b2..a7876f1332a82 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -64,16 +64,11 @@ private[spark] class KafkaRDD[K, V]( " must be set to false for executor kafka params, else offsets may commit before processing") // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time? - private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", - conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L) - private val cacheInitialCapacity = - conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) - private val cacheMaxCapacity = - conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) - private val cacheLoadFactor = - conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat - private val compacted = - conf.getBoolean("spark.streaming.kafka.allowNonConsecutiveOffsets", false) + private val pollTimeout = DirectKafkaConf.pollTimeout(conf) + private val cacheInitialCapacity = DirectKafkaConf.cacheInitialCapacity(conf) + private val cacheMaxCapacity = DirectKafkaConf.cacheMaxCapacity(conf) + private val cacheLoadFactor = DirectKafkaConf.cacheLoadFactor(conf) + private val compacted = DirectKafkaConf.nonConsecutive(conf) override def persist(newLevel: StorageLevel): this.type = { logError("Kafka ConsumerRecord is not serializable. 
" + @@ -87,30 +82,17 @@ private[spark] class KafkaRDD[K, V]( }.toArray } - override def count(): Long = - if (compacted) { - super.count() - } else { - offsetRanges.map(_.count).sum - } + override def count(): Long = offsetRanges.map(_.count).sum override def countApprox( timeout: Long, confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = - if (compacted) { - super.countApprox(timeout, confidence) - } else { + ): PartialResult[BoundedDouble] = { val c = count new PartialResult(new BoundedDouble(c, 1.0, c, c), true) } - override def isEmpty(): Boolean = - if (compacted) { - super.isEmpty() - } else { - count == 0L - } + override def isEmpty(): Boolean = count == 0L override def take(num: Int): Array[ConsumerRecord[K, V]] = if (compacted) { @@ -291,7 +273,6 @@ private class CompactedKafkaRDDIterator[K, V]( consumer.compactedStart(part.fromOffset, pollTimeout) - private var nextRecord = consumer.compactedNext(pollTimeout) private var okNext: Boolean = true @@ -301,15 +282,9 @@ private class CompactedKafkaRDDIterator[K, V]( if (!hasNext) { throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached") } - val r = nextRecord + val r = consumer.compactedNext(pollTimeout) if (r.offset + 1 >= part.untilOffset) { okNext = false - } else { - nextRecord = consumer.compactedNext(pollTimeout) - if (nextRecord.offset >= part.untilOffset) { - okNext = false - consumer.compactedPrevious() - } } r } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala index c66d3c9b8d229..400c2d9f272a0 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala @@ -90,21 +90,23 @@ final class OffsetRange private( val topic: String, val partition: Int, val fromOffset: Long, - val untilOffset: Long) extends Serializable { + val untilOffset: Long, + val recordNumber: Long) extends Serializable { import OffsetRange.OffsetRangeTuple /** Kafka TopicPartition object, for convenience */ def topicPartition(): TopicPartition = new TopicPartition(topic, partition) /** Number of messages this OffsetRange refers to */ - def count(): Long = untilOffset - fromOffset + def count(): Long = recordNumber override def equals(obj: Any): Boolean = obj match { case that: OffsetRange => this.topic == that.topic && this.partition == that.partition && this.fromOffset == that.fromOffset && - this.untilOffset == that.untilOffset + this.untilOffset == that.untilOffset && + this.recordNumber == that.recordNumber case _ => false } @@ -113,41 +115,48 @@ final class OffsetRange private( } override def toString(): String = { - s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])" + s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]," + + s" recordNumber: $recordNumber)" } /** this is to avoid ClassNotFoundException during checkpoint restore */ private[streaming] - def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset) + def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset, recordNumber) } /** * Companion object the provides methods to create instances of [[OffsetRange]]. 
*/ object OffsetRange { - def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = - new OffsetRange(topic, partition, fromOffset, untilOffset) + def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long, + recordNumber: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset, recordNumber) def create( topicPartition: TopicPartition, fromOffset: Long, - untilOffset: Long): OffsetRange = - new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset) + untilOffset: Long, + recordNumber: Long): OffsetRange = + new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset, + recordNumber) - def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = - new OffsetRange(topic, partition, fromOffset, untilOffset) + def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long, + recordNumber: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset, recordNumber) def apply( topicPartition: TopicPartition, fromOffset: Long, - untilOffset: Long): OffsetRange = - new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset) + untilOffset: Long, + recordNumber: Long): OffsetRange = + new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset, + recordNumber) /** this is to avoid ClassNotFoundException during checkpoint restore */ private[kafka010] - type OffsetRangeTuple = (String, Int, Long, Long) + type OffsetRangeTuple = (String, Int, Long, Long, Long) private[kafka010] def apply(t: OffsetRangeTuple) = - new OffsetRange(t._1, t._2, t._3, t._4) + new OffsetRange(t._1, t._2, t._3, t._4, t._4) } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala deleted file mode 100644 index d4684dd77b26a..0000000000000 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinder.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.kafka010 - -import java.{util => ju} - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer} -import org.apache.kafka.common.TopicPartition - -import org.apache.spark.internal.Logging - - -protected class KafkaConsumerHolder[K, V](private val kafkaParams: ju.Map[String, Object]) { - private var closed = false - private var consumers: mutable.Map[TopicPartition, Consumer[K, V]] = mutable.Map() - - /** - * Get a consumer from the pool. Creates one if needed. - * Creates 1 consmuer per TopicPartition to be able to fetch multiple partitions - * at the same time. - */ - def getConsumer(tp: TopicPartition): Consumer[K, V] = { - this.synchronized { - assert(!closed, "Consumers have been closed, can't open new.") - if (!consumers.contains(tp)) { - val c = new KafkaConsumer[K, V](kafkaParams) - c.assign(Set(tp).asJava) - consumers(tp) = c - } - consumers(tp) - } - } - - def close(): Unit = { - this.synchronized { - closed = true - consumers.foreach(_._2.close()) - } - } -} - -/** - * If we endup on an empty offset (transaction marker or abort transaction), - * a call to c.poll() won't return any data and we can't tell if it's because - * we missed data or the offset is empty. - * To prevent that, we change the offset range to always end on an offset with - * data. The range can be increased or reduced to the next living offset. - * @param rewind how many offset we'll try to rewind before seeking offset with data - * @param kafkaParams kafka params. Isolation level must be read_committed - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - */ -class OffsetWithRecordRewinder[K, V]( - rewind: Long, - kafkaParams: ju.Map[String, Object] - ) extends Logging with Serializable { - - if (!kafkaParams.containsKey("isolation.level") || - kafkaParams.get("isolation.level") != "read_committed") { - throw new IllegalStateException("DirectStream only support read_committed." + - "Please add isolation.level = read_committed to your kafka configuration") - } - - lazy val holder = new KafkaConsumerHolder[K, V](kafkaParams) - - def close(): Unit = { - holder.close() - } - - def rewind(currentOffsets: Map[TopicPartition, Long], tp: TopicPartition, - o: Long): (TopicPartition, Long) = { - val consumer = holder.getConsumer(tp) - val rewindOffset = rewindUntilDataExist(currentOffsets, tp, o, 1, rewind, consumer) - (tp, rewindOffset) - } - - // Try to poll the last message if any. - def rewindUntilDataExist(currentOffsets: Map[TopicPartition, Long], tp: TopicPartition, - toOffset: Long, iteration: Int, rewind: Long, - consumer: Consumer[K, V]): Long = { - if (toOffset <= currentOffsets(tp)) { - logDebug(s"can't rewind further: $toOffset <= ${currentOffsets(tp)}. 
Stop") - return toOffset - } - val nextStart = toOffset - Math.pow(rewind, iteration).toLong - val startOffsetScan = Math.max(currentOffsets(tp), nextStart) - logDebug(s"searching the next non-empty offset in ${tp} from offset $startOffsetScan") - val records: ConsumerRecords[K, V] = seekAndPoll(consumer, tp, startOffsetScan) - val smallestRecordOffset = records.iterator().asScala - .foldLeft(startOffsetScan)((maxOffset, r) => r.offset() match { - // we get an offset after our range, but our max is already after the range, we take the min - case recordOffset if recordOffset >= toOffset && maxOffset > toOffset => maxOffset - // we get an offset after our range, take it (it'll increase the range). - case recordOffset if recordOffset >= toOffset => recordOffset - // we get an offset bigger than the max but before our range, take it. - case recordOffset if recordOffset > maxOffset => recordOffset - // we get an offset smaller than the max, ignore it. - case _ => maxOffset - }) - // we have at least 1 offset with data, use this one as final range - if(smallestRecordOffset > startOffsetScan) { - smallestRecordOffset - } else { - // if we don't get any data, try to rewind faster - rewindUntilDataExist(currentOffsets, tp, startOffsetScan, iteration + 1, rewind, consumer) - } - } - - protected def seekAndPoll(c: Consumer[K, V], tp: TopicPartition, startOffsetScan: Long) = { - c.seek(tp, startOffsetScan) - // Try to get the last records. We want at least 1 living record - // Will be slow if all range data is aborted transactions - c.poll(1000) - } -} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScanner.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScanner.scala new file mode 100644 index 0000000000000..e342759cbf881 --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScanner.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging + + +/** + * If we endup on an empty offset (transaction marker or abort transaction), + * a call to c.poll() won't return any data and we can't tell if it's because + * we missed data or the offset is empty. + * To prevent that, we change the offset range to always end on an offset with + * data. The range can be increased or reduced to the next living offset. + * @param kafkaParams kafka params. 
Isolation level must be read_committed + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +class OffsetWithRecordScanner[K, V]( + kafkaParams: ju.Map[String, Object], + cacheInitialCapacity: Int, + cacheMaxCapacity: Int, + cacheLoadFactor: Float, + useConsumerCache: Boolean + ) extends Logging with Serializable { + + if (kafkaParams.containsKey("isolation.level") && + kafkaParams.get("isolation.level") != "read_committed") { + throw new IllegalStateException("DirectStream only support read_committed." + + "Please add isolation.level = read_committed to your kafka configuration") + } + + def getLastOffsetAndCount(fromOffset: Long, tp: TopicPartition, + toOffset: Long): (Long, Long) = { + if (toOffset <= fromOffset) { + return (toOffset, 0) + } + logDebug(s"Initial offsets for $tp: [$fromOffset, $toOffset]") + KafkaDataConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) + val c = KafkaDataConsumer.acquire[K, V](tp, kafkaParams, TaskContext.get, useConsumerCache) + val (o, size) = iterateUntilLastOrEmpty(fromOffset, 0, c, toOffset - fromOffset) + c.release() + logDebug(s"Rectified offset for $tp: [$fromOffset, $o]. Real size=$size") + (o, size) + } + + final def iterateUntilLastOrEmpty(lastOffset: Long, count: Long, + c: KafkaDataConsumer[K, V], rangeSize: Long): (Long, Long) = { + iterate(lastOffset - 1, count, c, rangeSize) + } + + @tailrec + final def iterate(lastOffset: Long, count: Long, + c: KafkaDataConsumer[K, V], rangeSize: Long): (Long, Long) = { + getNext(c) match { + // No more records or can't get new one. stop here + case None => (lastOffset + 1, count) + // We have enough records. stop here + case Some(r) if count + 1 >= rangeSize => (r.offset() + 1, count + 1) + case Some(r) => iterate(r.offset(), count + 1, c, rangeSize) + } + } + + protected def getNext(c: KafkaDataConsumer[K, V]) = { + c.next(1000) + } +} diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java index b20fad2291262..813f1877b83cb 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java @@ -79,8 +79,8 @@ public void testKafkaRDD() throws InterruptedException { "-" + System.currentTimeMillis()); OffsetRange[] offsetRanges = { - OffsetRange.create(topic1, 0, 0, 1), - OffsetRange.create(topic2, 0, 0, 1) + OffsetRange.create(topic1, 0, 0, 1, 1), + OffsetRange.create(topic2, 0, 0, 1, 1) }; Map leaders = new HashMap<>(); diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 271adea1df731..5f1e660e6c818 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -113,7 +113,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { val kafkaParams = getKafkaParams() - val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size, messages.size)) val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, preferredHosts) .map(_.value) @@ -130,12 +130,12 @@ class KafkaRDDSuite extends 
SparkFunSuite with BeforeAndAfterAll { assert(rdd.take(messages.size + 10).size === messages.size) val emptyRdd = KafkaUtils.createRDD[String, String]( - sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts) + sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0, 0)), preferredHosts) assert(emptyRdd.isEmpty) // invalid offset ranges throw exceptions - val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1)) + val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1, messages.size + 1)) intercept[SparkException] { val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts) .map(_.value) @@ -177,7 +177,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { val kafkaParams = getKafkaParams() - val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size, messages.size)) val rdd = KafkaUtils.createRDD[String, String]( sc, kafkaParams, offsetRanges, preferredHosts @@ -195,12 +195,12 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { assert(rdd.take(messages.size + 10).size === compactedMessages.size) val emptyRdd = KafkaUtils.createRDD[String, String]( - sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts) + sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0, 0)), preferredHosts) assert(emptyRdd.isEmpty) // invalid offset ranges throw exceptions - val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1)) + val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1, messages.size + 1)) intercept[SparkException] { val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts) .map(_.value) @@ -221,7 +221,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { var sentCount = sent.values.sum val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, - Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts) + Array(OffsetRange(topic, 0, 0, sentCount, sentCount)), preferredHosts) val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum @@ -232,7 +232,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { // this is the "0 messages" case val rdd2 = KafkaUtils.createRDD[String, String](sc, kafkaParams, - Array(OffsetRange(topic, 0, sentCount, sentCount)), preferredHosts) + Array(OffsetRange(topic, 0, sentCount, sentCount, 0)), preferredHosts) // shouldn't get anything, since message is sent after rdd was defined val sentOnlyOne = Map("d" -> 1) @@ -243,7 +243,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above val rdd3 = KafkaUtils.createRDD[String, String](sc, kafkaParams, - Array(OffsetRange(topic, 0, sentCount, sentCount + 1)), preferredHosts) + Array(OffsetRange(topic, 0, sentCount, sentCount + 1, 1)), preferredHosts) // send lots of messages after rdd was defined, they shouldn't show up kafkaTestUtils.sendMessages(topic, Map("extra" -> 22)) @@ -258,7 +258,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { val rdd = new KafkaRDD[String, String]( sc, kafkaParams, - Array(OffsetRange("unused", 0, 1, 2)), + Array(OffsetRange("unused", 0, 1, 2, 1)), ju.Collections.emptyMap[TopicPartition, String](), true) val a3 = ExecutorCacheTaskLocation("a", "3") diff --git 
a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala deleted file mode 100644 index a5aa91983ff78..0000000000000 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordRewinderSuite.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka010 - -import java.{util => ju} - -import scala.collection.JavaConverters._ - -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerRecords} -import org.apache.kafka.common.TopicPartition - -import org.apache.spark.SparkFunSuite -import org.apache.spark.internal.Logging - -class OffsetWithRecordRewinderSuite - extends SparkFunSuite - with Logging { - - class OffsetWithRecordRewinderMock[K, V](records: Map[Long, ConsumerRecords[K, V]]) - extends OffsetWithRecordRewinder[K, V]( 10, - Map[String, Object]("isolation.level" -> "read_committed").asJava) { - - override protected def seekAndPoll(c: Consumer[K, V], tp: TopicPartition, offset: Long) = { - records(offset) - } - - } - - val emptyConsumerRecords = new ConsumerRecords[String, String](ju.Collections.emptyMap()) - val tp = new TopicPartition("topic", 0) - - test("Rewinder construction should fail if isolation level isn't set to committed") { - intercept[IllegalStateException] { - new OffsetWithRecordRewinder[String, String](10, - Map[String, Object]().asJava) - } - intercept[IllegalStateException] { - new OffsetWithRecordRewinder[String, String](10, - Map[String, Object]("isolation.level" -> "read_uncommitted").asJava) - } - } - - test("Rewind should find the last offset containing data. 
" + - "records are empty, should always rewind to 0") { - var rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(0L -> emptyConsumerRecords)) - assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 10, iteration = 1, 10, null)) - - rewinder = new OffsetWithRecordRewinderMock[String, String](Map(0L -> emptyConsumerRecords)) - assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 8, iteration = 1, 10, null)) - - rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(0L -> emptyConsumerRecords, 2L -> emptyConsumerRecords, 12L -> emptyConsumerRecords)) - assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 22, iteration = 1, 10, null)) - - rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(0L -> emptyConsumerRecords, 2L -> emptyConsumerRecords, 22L -> emptyConsumerRecords)) - assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 32, iteration = 1, 10, null)) - - rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(0L -> emptyConsumerRecords, 12L -> emptyConsumerRecords, 32L -> emptyConsumerRecords)) - assert(0 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 42, iteration = 1, 10, null)) - } - - test("Rewind should not do anything if start = last") { - var rewinder = new OffsetWithRecordRewinderMock[String, String](Map()) - assert(10 === rewinder.rewindUntilDataExist(Map(tp -> 10), tp, 10, iteration = 1, 10, null)) - } - - test("Rewind should find the first existing records") { - var records = List(new ConsumerRecord("topic", 0, 35, "k", "v"), - new ConsumerRecord("topic", 0, 36, "k", "v")).asJava - var rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(32L -> buildConsumerRecords(records))) - assert(36 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 42, iteration = 1, 10, null)) - - records = List(new ConsumerRecord("topic", 0, 35, "k", "v")).asJava - rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(32L -> buildConsumerRecords(records))) - assert(35 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 42, iteration = 1, 10, null)) - } - - test("Rewind should find the first existing records and retry") { - val records = List(new ConsumerRecord("topic", 0, 34, "k", "v"), - new ConsumerRecord("topic", 0, 35, "k", "v")).asJava - var rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(32L -> buildConsumerRecords(records), - 132L -> emptyConsumerRecords)) - assert(35 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 142, iteration = 1, 10, null)) - } - - test("Rewind should find the first existing records even the offset if higher") { - val records = List(new ConsumerRecord("topic", 0, 144, "k", "v"), - new ConsumerRecord("topic", 0, 145, "k", "v")).asJava - var rewinder = new OffsetWithRecordRewinderMock[String, String]( - Map(132L -> buildConsumerRecords(records))) - assert(144 === rewinder.rewindUntilDataExist(Map(tp -> 0), tp, 142, iteration = 1, 10, null)) - } - - private def buildConsumerRecords(records: ju.List[ConsumerRecord[String, String]]) = { - new ConsumerRecords[String, String](Map(tp -> records).asJava) - } -} diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala new file mode 100644 index 0000000000000..262e8f36e80db --- /dev/null +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/OffsetWithRecordScannerSuite.scala @@ -0,0 +1,110 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerRecords} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.Logging + +class OffsetWithRecordScannerSuite + extends SparkFunSuite + with Logging { + + class OffsetWithRecordScannerMock[K, V](records: List[Option[ConsumerRecord[K, V]]]) + extends OffsetWithRecordScanner[K, V]( + Map[String, Object]("isolation.level" -> "read_committed").asJava, 1, 1, 0.75F, true) { + var i = -1 + override protected def getNext(c: KafkaDataConsumer[K, V]): Option[ConsumerRecord[K, V]] = { + i = i + 1 + records(i) + } + + } + + val emptyConsumerRecords = new ConsumerRecords[String, String](ju.Collections.emptyMap()) + val tp = new TopicPartition("topic", 0) + + test("Rewinder construction should fail if isolation level isn't set to read_committed") { + intercept[IllegalStateException] { + new OffsetWithRecordScanner[String, String]( + Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true) + } + } + + test("Rewinder construction shouldn't fail if isolation level isn't set") { + assert(new OffsetWithRecordScanner[String, String]( + Map[String, Object]().asJava, 1, 1, 0.75F, true) != null) + } + + test("Rewinder construction should fail if isolation level isn't set to committed") { + intercept[IllegalStateException] { + new OffsetWithRecordScanner[String, String]( + Map[String, Object]("isolation.level" -> "read_uncommitted").asJava, 1, 1, 0.75F, true) + } + } + + test("Rewind should return the proper count") { + var scanner = new OffsetWithRecordScannerMock[String, String]( + records(Some(0), Some(1), Some(2), Some(3))) + val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2) + assert(offset === 2) + assert(size === 2) + } + + test("Rewind should return the proper count with gap") { + var scanner = new OffsetWithRecordScannerMock[String, String]( + records(Some(0), Some(1), Some(3), Some(4), Some(5))) + val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3) + assert(offset === 4) + assert(size === 3) + } + + test("Rewind should return the proper count for the end of the iterator") { + var scanner = new OffsetWithRecordScannerMock[String, String]( + records(Some(0), Some(1), Some(2), None)) + val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 3) + assert(offset === 3) + assert(size === 3) + } + + test("Rewind should return the proper count with missing data") { + var scanner = new OffsetWithRecordScannerMock[String, String]( + records(Some(0), None)) + val (offset, size) =
scanner.iterateUntilLastOrEmpty(0, 0, null, 2) + assert(offset === 1) + assert(size === 1) + } + + test("Rewind should return the proper count without data") { + var scanner = new OffsetWithRecordScannerMock[String, String]( + records(None)) + val (offset, size) = scanner.iterateUntilLastOrEmpty(0, 0, null, 2) + assert(offset === 0) + assert(size === 0) + } + + private def records(offsets: Option[Long]*) = { + offsets.map(o => o.map(new ConsumerRecord("topic", 0, _, "k", "v"))).toList + } +} From 29c5406be32c112c11a6552b06925d3541c7d472 Mon Sep 17 00:00:00 2001 From: quentin Date: Thu, 2 Aug 2018 22:08:09 +0200 Subject: [PATCH 5/6] keep OffsetRange creation method for backward compatibility --- .../kafka010/DirectKafkaInputDStream.scala | 3 +- .../streaming/kafka010/OffsetRange.scala | 40 ++++++++++++++----- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 26b7111944495..a1d6e22821970 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -265,9 +265,8 @@ private[spark] class DirectKafkaInputDStream[K, V]( } } else { offsets.map { case (tp, untilOffset) => - val size = untilOffset - currentOffsets(tp) val offsetFrom = currentOffsets(tp) - OffsetRange(tp.topic(), tp.partition, offsetFrom, untilOffset, size) + OffsetRange(tp.topic(), tp.partition, offsetFrom, untilOffset) } } } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala index 400c2d9f272a0..326172dd8bfaa 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala @@ -132,25 +132,43 @@ object OffsetRange { recordNumber: Long): OffsetRange = new OffsetRange(topic, partition, fromOffset, untilOffset, recordNumber) - def create( - topicPartition: TopicPartition, - fromOffset: Long, - untilOffset: Long, - recordNumber: Long): OffsetRange = + def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset, untilOffset - fromOffset) + + def create(topicPartition: TopicPartition, + fromOffset: Long, + untilOffset: Long, + recordNumber: Long): OffsetRange = + new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset, + recordNumber) + + def create(topicPartition: TopicPartition, + fromOffset: Long, + untilOffset: Long): OffsetRange = new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset, - recordNumber) + untilOffset - fromOffset) def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long, recordNumber: Long): OffsetRange = new OffsetRange(topic, partition, fromOffset, untilOffset, recordNumber) def apply( - topicPartition: TopicPartition, - fromOffset: Long, - untilOffset: Long, - recordNumber: Long): OffsetRange = + topicPartition: TopicPartition, + fromOffset: Long, + untilOffset: Long, + recordNumber: Long): OffsetRange = + new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, 
untilOffset, + recordNumber) + + def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = + new OffsetRange(topic, partition, fromOffset, untilOffset, untilOffset - fromOffset) + + def apply( + topicPartition: TopicPartition, + fromOffset: Long, + untilOffset: Long): OffsetRange = new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset, - recordNumber) + untilOffset - fromOffset) /** this is to avoid ClassNotFoundException during checkpoint restore */ private[kafka010] From 69582f46cb8c7f7285c42b86d06bf475a43a3856 Mon Sep 17 00:00:00 2001 From: quentin Date: Thu, 2 Aug 2018 22:14:15 +0200 Subject: [PATCH 6/6] revert unnecessary modifications to OffsetRange in test suite --- .../streaming/kafka010/KafkaRDDSuite.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 5f1e660e6c818..271adea1df731 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -113,7 +113,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { val kafkaParams = getKafkaParams() - val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size, messages.size)) + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, preferredHosts) .map(_.value) @@ -130,12 +130,12 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { assert(rdd.take(messages.size + 10).size === messages.size) val emptyRdd = KafkaUtils.createRDD[String, String]( - sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0, 0)), preferredHosts) + sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts) assert(emptyRdd.isEmpty) // invalid offset ranges throw exceptions - val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1, messages.size + 1)) + val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1)) intercept[SparkException] { val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts) .map(_.value) @@ -177,7 +177,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { val kafkaParams = getKafkaParams() - val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size, messages.size)) + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) val rdd = KafkaUtils.createRDD[String, String]( sc, kafkaParams, offsetRanges, preferredHosts @@ -195,12 +195,12 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { assert(rdd.take(messages.size + 10).size === compactedMessages.size) val emptyRdd = KafkaUtils.createRDD[String, String]( - sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0, 0)), preferredHosts) + sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts) assert(emptyRdd.isEmpty) // invalid offset ranges throw exceptions - val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1, messages.size + 1)) + val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1)) intercept[SparkException] { val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts) .map(_.value) @@ -221,7 +221,7 @@ class KafkaRDDSuite extends SparkFunSuite with
BeforeAndAfterAll { var sentCount = sent.values.sum val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, - Array(OffsetRange(topic, 0, 0, sentCount, sentCount)), preferredHosts) + Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts) val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum @@ -232,7 +232,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { // this is the "0 messages" case val rdd2 = KafkaUtils.createRDD[String, String](sc, kafkaParams, - Array(OffsetRange(topic, 0, sentCount, sentCount, 0)), preferredHosts) + Array(OffsetRange(topic, 0, sentCount, sentCount)), preferredHosts) // shouldn't get anything, since message is sent after rdd was defined val sentOnlyOne = Map("d" -> 1) @@ -243,7 +243,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above val rdd3 = KafkaUtils.createRDD[String, String](sc, kafkaParams, - Array(OffsetRange(topic, 0, sentCount, sentCount + 1, 1)), preferredHosts) + Array(OffsetRange(topic, 0, sentCount, sentCount + 1)), preferredHosts) // send lots of messages after rdd was defined, they shouldn't show up kafkaTestUtils.sendMessages(topic, Map("extra" -> 22)) @@ -258,7 +258,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { val rdd = new KafkaRDD[String, String]( sc, kafkaParams, - Array(OffsetRange("unused", 0, 1, 2, 1)), + Array(OffsetRange("unused", 0, 1, 2)), ju.Collections.emptyMap[TopicPartition, String](), true) val a3 = ExecutorCacheTaskLocation("a", "3")
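Usage sketch (illustrative only, not part of the patches above): the snippet below shows how the OffsetRange factories kept and added by PATCH 5/6 would be called, assuming the signatures in the diff. The four-argument forms derive the record count as untilOffset - fromOffset, while the five-argument forms take an explicit count for ranges whose offset span includes transaction markers or aborted records. The topic name, offsets, and object name are made up for the example.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetRangeFactorySketch {
  def main(args: Array[String]): Unit = {
    // Backward-compatible four-argument form: the record count defaults to
    // untilOffset - fromOffset (here 100).
    val dense = OffsetRange("events", 0, 0L, 100L)

    // Five-argument form: an explicit record count, which can be smaller than
    // the offset span when commit/abort markers occupy offsets in the range.
    val sparse = OffsetRange("events", 0, 0L, 100L, 98L)

    // TopicPartition-based factory with the same derived-count behaviour.
    val tp = new TopicPartition("events", 0)
    val fromTp = OffsetRange.create(tp, 0L, 100L)

    Seq(dense, sparse, fromTp).foreach(r => println(r))
  }
}

Existing callers that pass only four arguments keep compiling unchanged, which is the point of PATCH 5/6; only code that needs an exact record count for transactional topics has to use the five-argument overloads.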