From 3082de7e43e8c381dc2227005d1e0fc5bd2c3d29 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 8 Oct 2016 16:21:48 -0500 Subject: [PATCH 1/9] [SPARK-17147][STREAMING][KAFKA] failing test for compacted topics --- .../kafka010/CachedKafkaConsumer.scala | 2 + .../streaming/kafka010/KafkaTestUtils.scala | 56 ++++++++++- .../kafka010/mocks/MockScheduler.scala | 92 +++++++++++++++++++ .../streaming/kafka010/mocks/MockTime.scala | 50 ++++++++++ .../streaming/kafka010/KafkaRDDSuite.scala | 63 +++++++++++++ 5 files changed, 259 insertions(+), 4 deletions(-) create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala index fa3ea6131a507..f381255f48243 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala @@ -99,6 +99,8 @@ class CachedKafkaConsumer[K, V] private( val p = consumer.poll(timeout) val r = p.records(topicPartition) logDebug(s"Polled ${p.partitions()} ${r.size}") + import scala.collection.JavaConverters._ + r.asScala.foreach { x => System.err.println(s"${x.offset} ${x.key} ${x.value}") } buffer = r.iterator } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index e73823e89883b..4b11f034d5197 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.ZkUtils +import kafka.utils.{ Pool, ZkUtils } import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} @@ -38,6 +38,7 @@ import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.streaming.Time +import org.apache.spark.streaming.kafka010.mocks.MockTime import org.apache.spark.util.Utils /** @@ -73,6 +74,8 @@ private[kafka010] class KafkaTestUtils extends Logging { private var zkReady = false private var brokerReady = false + private val mockTime = new MockTime() + def zkAddress: String = { assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") s"$zkHost:$zkPort" @@ -148,20 +151,55 @@ private[kafka010] class KafkaTestUtils extends Logging { zookeeper.shutdown() zookeeper = null } + + mockTime.scheduler.shutdown() + } + + /** + * Compact logs for the given topic / partition until offset + */ + def compactLogs(topic: String, partition: Int, offset: Long) { + import kafka.log._ + import kafka.common.TopicAndPartition + + val cleaner = { + val logs = new Pool[TopicAndPartition, Log]() + val logDir = brokerConf.logDirs.head + val dir = new 
java.io.File(logDir, topic + "-" + partition) + val logProps = new Properties() + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + val log = new Log( + dir, + LogConfig(logProps), + 0L, + mockTime.scheduler, + mockTime + ) + logs.put(TopicAndPartition(topic, partition), log) + System.err.println(s"built cleaner for compacting logs for $dir") + new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs) + } + cleaner.awaitCleaned(topic, partition, offset) + System.err.println("finished cleaning") } /** Create a Kafka topic and wait until it is propagated to the whole cluster */ - def createTopic(topic: String, partitions: Int): Unit = { - AdminUtils.createTopic(zkUtils, topic, partitions, 1) + def createTopic(topic: String, partitions: Int, config: Properties): Unit = { + AdminUtils.createTopic(zkUtils, topic, partitions, 1, config) // wait until metadata is propagated (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) } } + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String, partitions: Int): Unit = { + createTopic(topic, partitions, new Properties) + } + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ def createTopic(topic: String): Unit = { - createTopic(topic, 1) + createTopic(topic, 1, new Properties) } /** Java-friendly function for sending messages to the Kafka broker */ @@ -185,6 +223,16 @@ private[kafka010] class KafkaTestUtils extends Logging { producer = null } + /** Send the array of (key, value) messages to the Kafka broker */ + def sendMessages(topic: String, messages: Array[(String, String)]): Unit = { + producer = new KafkaProducer[String, String](producerConfiguration) + messages.foreach { message => + producer.send(new ProducerRecord[String, String](topic, message._1, message._2)) + } + producer.close() + producer = null + } + private def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0") diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala new file mode 100644 index 0000000000000..ecf6287b858ac --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kafka010.mocks + +import scala.collection.mutable.PriorityQueue +import java.util.concurrent.TimeUnit +import kafka.utils.{Scheduler, Time} + +/** + * A mock scheduler that executes tasks synchronously using a mock time instance. 
Tasks are executed synchronously when + * the time is advanced. This class is meant to be used in conjunction with MockTime. + * + * Example usage + * + * val time = new MockTime + * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000) + * time.sleep(1001) // this should cause our scheduled task to fire + * + * + * Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time). + */ +private[kafka010] class MockScheduler(val time: Time) extends Scheduler { + + /* a priority queue of tasks ordered by next execution time */ + var tasks = new PriorityQueue[MockTask]() + + def isStarted = true + + def startup() {} + + def shutdown() { + this synchronized { + tasks.foreach(_.fun()) + tasks.clear() + } + } + + /** + * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs + * when this method is called and the execution happens synchronously in the calling thread. + * If you are using the scheduler associated with a MockTime instance this call be triggered automatically. + */ + def tick() { + this synchronized { + val now = time.milliseconds + while(!tasks.isEmpty && tasks.head.nextExecution <= now) { + /* pop and execute the task with the lowest next execution time */ + val curr = tasks.dequeue + curr.fun() + /* if the task is periodic, reschedule it and re-enqueue */ + if(curr.periodic) { + curr.nextExecution += curr.period + this.tasks += curr + } + } + } + } + + def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) { + this synchronized { + tasks += MockTask(name, fun, time.milliseconds + delay, period = period) + tick() + } + } + +} + +case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) extends Ordered[MockTask] { + def periodic = period >= 0 + def compare(t: MockTask): Int = { + if(t.nextExecution == nextExecution) + 0 + else if (t.nextExecution < nextExecution) + -1 + else + 1 + } +} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala new file mode 100644 index 0000000000000..de80b5c284aca --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010.mocks + +import java.util.concurrent._ +import kafka.utils.Time + +/** + * A class used for unit testing things which depend on the Time interface. 
+ * + * This class never manually advances the clock, it only does so when you call + * sleep(ms) + * + * It also comes with an associated scheduler instance for managing background tasks in + * a deterministic way. + */ +private[kafka010] class MockTime(@volatile private var currentMs: Long) extends Time { + + val scheduler = new MockScheduler(this) + + def this() = this(System.currentTimeMillis) + + def milliseconds: Long = currentMs + + def nanoseconds: Long = + TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS) + + def sleep(ms: Long) { + this.currentMs += ms + scheduler.tick() + } + + override def toString() = "MockTime(%d)".format(milliseconds) + +} diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index be373af0599cc..90c46688b0017 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -102,6 +102,69 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { } } + test("compacted topic") { + val topic = s"topiccompacted-${Random.nextInt}-${System.currentTimeMillis}" + val props = new ju.Properties() + props.put("cleanup.policy", "compact") + props.put("flush.messages", "1") + props.put("segment.ms", "1") + props.put("segment.bytes", "256") + kafkaTestUtils.createTopic(topic, 1, props) + val messages = Array( + ("a", "1"), + ("b", "1"), + ("b", "2"), + ("c", "1"), + ("b", "3"), + ("a", "2"), + ("c", "2") + ) + val compactedMessages = Array( + ("a", "2"), + ("b", "3"), + ("c", "2") + ) + + kafkaTestUtils.sendMessages(topic, messages) + // send some junk to fill a log segment + kafkaTestUtils.sendMessages(topic, Array.fill(100)("garbage" -> "1")) + // wait for log compaction + kafkaTestUtils.compactLogs(topic, 0, messages.size - 1) + + Thread.sleep(100000) + val kafkaParams = getKafkaParams() + + val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) + + val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, preferredHosts) + .map(m => m.key -> m.value) + .filter(_._1 != "garbage") + + val received = rdd.collect.toSet + assert(received === compactedMessages.toSet) + + // size-related method optimizations return sane results + assert(rdd.count === compactedMessages.size) + assert(rdd.countApprox(0).getFinalValue.mean === compactedMessages.size) + assert(!rdd.isEmpty) + assert(rdd.take(1).size === 1) + assert(rdd.take(1).head === compactedMessages.head) + assert(rdd.take(messages.size + 10).size === compactedMessages.size) + + val emptyRdd = KafkaUtils.createRDD[String, String]( + sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts) + + assert(emptyRdd.isEmpty) + + // invalid offset ranges throw exceptions + val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1)) + intercept[SparkException] { + val result = KafkaUtils.createRDD[String, String](sc, kafkaParams, badRanges, preferredHosts) + .map(_.value) + .collect() + } + } + test("iterator boundary conditions") { // the idea is to find e.g. 
off-by-one errors between what kafka has available and the rdd val topic = s"topicboundary-${Random.nextInt}-${System.currentTimeMillis}" From e8ea89ea10527c6723df4af2685004ea67d872cd Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 8 Oct 2016 23:59:39 -0500 Subject: [PATCH 2/9] [SPARK-17147][STREAMING][KAFKA] test passing for compacted topics --- .../kafka010/CachedKafkaConsumer.scala | 38 ++- .../kafka010/DirectKafkaInputDStream.scala | 8 +- .../spark/streaming/kafka010/KafkaRDD.scala | 235 +++++++++++++----- .../streaming/kafka010/KafkaTestUtils.scala | 39 +-- .../spark/streaming/kafka010/KafkaUtils.scala | 41 ++- .../streaming/kafka010/KafkaRDDSuite.scala | 81 ++++-- .../kafka010/mocks/MockScheduler.scala | 0 .../streaming/kafka010/mocks/MockTime.scala | 0 8 files changed, 314 insertions(+), 128 deletions(-) rename external/kafka-0-10/src/{main => test}/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala (100%) rename external/kafka-0-10/src/{main => test}/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala (100%) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala index f381255f48243..f5363641d0daa 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala @@ -53,7 +53,7 @@ class CachedKafkaConsumer[K, V] private( // TODO if the buffer was kept around as a random-access structure, // could possibly optimize re-calculating of an RDD in the same batch - protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() protected var nextOffset = -2L def close(): Unit = consumer.close() @@ -90,6 +90,40 @@ class CachedKafkaConsumer[K, V] private( record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { + logDebug(s"compacted start $groupId $topic $partition starting $offset") + // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics + if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) + } + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { + if (!buffer.hasNext()) { poll(timeout) } + assert(buffer.hasNext(), + s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout") + val record = buffer.next() + nextOffset = record.offset + 1 + record + } + + /** + * Rewind to previous record in the batch from a compacted topic. 
+ * Will throw NoSuchElementException if no previous element + */ + def compactedPrevious(): ConsumerRecord[K, V] = { + buffer.previous() + } + private def seek(offset: Long): Unit = { logDebug(s"Seeking to $topicPartition $offset") consumer.seek(topicPartition, offset) @@ -101,7 +135,7 @@ class CachedKafkaConsumer[K, V] private( logDebug(s"Polled ${p.partitions()} ${r.size}") import scala.collection.JavaConverters._ r.asScala.foreach { x => System.err.println(s"${x.offset} ${x.key} ${x.value}") } - buffer = r.iterator + buffer = r.listIterator } } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 13827f68f2cb5..c41ad903d4a8c 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -199,7 +199,12 @@ private[spark] class DirectKafkaInputDStream[K, V]( OffsetRange(tp.topic, tp.partition, fo, uo) } val rdd = new KafkaRDD[K, V]( - context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true) + context.sparkContext, + executorKafkaParams, + offsetRanges.toArray, + getPreferredHosts, + true, + false) // Report the record number and metadata of this batch interval to InputInfoTracker. val description = offsetRanges.filter { offsetRange => @@ -302,6 +307,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( getPreferredHosts, // during restore, it's possible same partition will be consumed from multiple // threads, so dont use cache + false, false ) } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 5b5a9ac48c7ca..034931f0d823e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -44,6 +44,7 @@ import org.apache.spark.storage.StorageLevel * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. 
* @param useConsumerCache whether to use a consumer from a per-jvm cache + * @param compacted whether any of the topics have log compaction enabled * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ @@ -52,7 +53,8 @@ private[spark] class KafkaRDD[K, V]( val kafkaParams: ju.Map[String, Object], val offsetRanges: Array[OffsetRange], val preferredHosts: ju.Map[TopicPartition, String], - useConsumerCache: Boolean + useConsumerCache: Boolean, + compacted: Boolean ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { assert("none" == @@ -86,47 +88,63 @@ private[spark] class KafkaRDD[K, V]( }.toArray } - override def count(): Long = offsetRanges.map(_.count).sum + override def count(): Long = + if (compacted) { + super.count() + } else { + offsetRanges.map(_.count).sum + } override def countApprox( timeout: Long, confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = { - val c = count - new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } + ): PartialResult[BoundedDouble] = + if (compacted) { + super.countApprox(timeout, confidence) + } else { + val c = count + new PartialResult(new BoundedDouble(c, 1.0, c, c), true) + } - override def isEmpty(): Boolean = count == 0L + override def isEmpty(): Boolean = + if (compacted) { + super.isEmpty() + } else { + count == 0L + } - override def take(num: Int): Array[ConsumerRecord[K, V]] = { - val nonEmptyPartitions = this.partitions - .map(_.asInstanceOf[KafkaRDDPartition]) - .filter(_.count > 0) + override def take(num: Int): Array[ConsumerRecord[K, V]] = + if (compacted) { + super.take(num) + } else { + val nonEmptyPartitions = this.partitions + .map(_.asInstanceOf[KafkaRDDPartition]) + .filter(_.count > 0) - if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[K, V]](0) - } + if (num < 1 || nonEmptyPartitions.isEmpty) { + return new Array[ConsumerRecord[K, V]](0) + } - // Determine in advance how many messages need to be taken from each partition - val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { - val taken = Math.min(remain, part.count) - result + (part.index -> taken.toInt) - } else { - result + // Determine in advance how many messages need to be taken from each partition + val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => + val remain = num - result.values.sum + if (remain > 0) { + val taken = Math.min(remain, part.count) + result + (part.index -> taken.toInt) + } else { + result + } } - } - val buf = new ArrayBuffer[ConsumerRecord[K, V]] - val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => - it.take(parts(tc.partitionId)).toArray, parts.keys.toArray - ) - res.foreach(buf ++= _) - buf.toArray - } + val buf = new ArrayBuffer[ConsumerRecord[K, V]] + val res = context.runJob( + this, + (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => + it.take(parts(tc.partitionId)).toArray, parts.keys.toArray + ) + res.foreach(buf ++= _) + buf.toArray + } private def executors(): Array[ExecutorCacheTaskLocation] = { val bm = sparkContext.env.blockManager @@ -182,51 +200,132 @@ private[spark] class KafkaRDD[K, V]( s"skipping ${part.topic} ${part.partition}") Iterator.empty } else { - new KafkaRDDIterator(part, context) + logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + + s"offsets ${part.fromOffset} -> ${part.untilOffset}") + if (compacted) { + new CompactedKafkaRDDIterator[K, V]( + part, 
+ context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor + ) + } else { + new KafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor + ) + } } } +} - /** - * An iterator that fetches messages directly from Kafka for the offsets in partition. - * Uses a cached consumer where possible to take advantage of prefetching - */ - private class KafkaRDDIterator( - part: KafkaRDDPartition, - context: TaskContext) extends Iterator[ConsumerRecord[K, V]] { - - logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") - - val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] - - context.addTaskCompletionListener{ context => closeIfNeeded() } - - val consumer = if (useConsumerCache) { - CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) - if (context.attemptNumber > 1) { - // just in case the prior attempt failures were cache related - CachedKafkaConsumer.remove(groupId, part.topic, part.partition) - } - CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) - } else { - CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) +/** + * An iterator that fetches messages directly from Kafka for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching + */ +private class KafkaRDDIterator[K, V]( + part: KafkaRDDPartition, + context: TaskContext, + kafkaParams: ju.Map[String, Object], + useConsumerCache: Boolean, + pollTimeout: Long, + initialCapacity: Int, + maxCapacity: Int, + loadFactor: Float +) extends Iterator[ConsumerRecord[K, V]] { + + context.addTaskCompletionListener{ context => closeIfNeeded() } + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + val consumer = if (useConsumerCache) { + CachedKafkaConsumer.init( + initialCapacity, + maxCapacity, + loadFactor + ) + if (context.attemptNumber > 1) { + // just in case the prior attempt failures were cache related + CachedKafkaConsumer.remove(groupId, part.topic, part.partition) } + CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) + } else { + CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) + } - var requestOffset = part.fromOffset + var requestOffset = part.fromOffset - def closeIfNeeded(): Unit = { - if (!useConsumerCache && consumer != null) { - consumer.close - } + def closeIfNeeded(): Unit = { + if (!useConsumerCache && consumer != null) { + consumer.close } + } + + override def hasNext(): Boolean = requestOffset < part.untilOffset - override def hasNext(): Boolean = requestOffset < part.untilOffset + override def next(): ConsumerRecord[K, V] = { + assert(hasNext(), "Can't call getNext() once untilOffset has been reached") + val r = consumer.get(requestOffset, pollTimeout) + requestOffset += 1 + r + } +} - override def next(): ConsumerRecord[K, V] = { - assert(hasNext(), "Can't call getNext() once untilOffset has been reached") - val r = consumer.get(requestOffset, pollTimeout) - requestOffset += 1 - r +/** + * An iterator that fetches messages directly from Kafka for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching. 
+ * Intended for use on compacted topics only + */ +private class CompactedKafkaRDDIterator[K, V]( + part: KafkaRDDPartition, + context: TaskContext, + kafkaParams: ju.Map[String, Object], + useConsumerCache: Boolean, + pollTimeout: Long, + initialCapacity: Int, + maxCapacity: Int, + loadFactor: Float + ) extends KafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + initialCapacity, + maxCapacity, + loadFactor + ) { + + consumer.compactedStart(part.fromOffset, pollTimeout) + + var nextRecord = consumer.compactedNext(pollTimeout) + + var okNext: Boolean = true + + override def hasNext(): Boolean = okNext + + override def next(): ConsumerRecord[K, V] = { + assert(hasNext, "Can't call getNext() once untilOffset has been reached") + val r = nextRecord + if (r.offset + 1 >= part.untilOffset) { + okNext = false + } else { + nextRecord = consumer.compactedNext(pollTimeout) + if (nextRecord.offset >= part.untilOffset) { + okNext = false + consumer.compactedPrevious() + } } + r } } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 4b11f034d5197..192c706eddc2c 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal import kafka.admin.AdminUtils import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{ Pool, ZkUtils } +import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} @@ -38,7 +38,6 @@ import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.streaming.Time -import org.apache.spark.streaming.kafka010.mocks.MockTime import org.apache.spark.util.Utils /** @@ -74,8 +73,6 @@ private[kafka010] class KafkaTestUtils extends Logging { private var zkReady = false private var brokerReady = false - private val mockTime = new MockTime() - def zkAddress: String = { assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address") s"$zkHost:$zkPort" @@ -151,36 +148,6 @@ private[kafka010] class KafkaTestUtils extends Logging { zookeeper.shutdown() zookeeper = null } - - mockTime.scheduler.shutdown() - } - - /** - * Compact logs for the given topic / partition until offset - */ - def compactLogs(topic: String, partition: Int, offset: Long) { - import kafka.log._ - import kafka.common.TopicAndPartition - - val cleaner = { - val logs = new Pool[TopicAndPartition, Log]() - val logDir = brokerConf.logDirs.head - val dir = new java.io.File(logDir, topic + "-" + partition) - val logProps = new Properties() - logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - val log = new Log( - dir, - LogConfig(logProps), - 0L, - mockTime.scheduler, - mockTime - ) - logs.put(TopicAndPartition(topic, partition), log) - System.err.println(s"built cleaner for compacting logs for $dir") - new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs) - } - cleaner.awaitCleaned(topic, partition, offset) - System.err.println("finished cleaning") } /** Create a Kafka topic and 
wait until it is propagated to the whole cluster */ @@ -233,12 +200,14 @@ private[kafka010] class KafkaTestUtils extends Logging { producer = null } + val brokerLogDir = Utils.createTempDir().getAbsolutePath + private def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") props.put("port", brokerPort.toString) - props.put("log.dir", Utils.createTempDir().getAbsolutePath) + props.put("log.dir", brokerLogDir) props.put("zookeeper.connect", zkAddress) props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index b2190bfa05a3a..d17ad3bdb1417 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -72,9 +72,48 @@ object KafkaUtils extends Logging { fixKafkaParams(kp) val osr = offsetRanges.clone() - new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true) + new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true, false) } + /** + * :: Experimental :: + * Scala constructor for a batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param kafkaParams Kafka + * + * configuration parameters. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, + * see [[LocationStrategies]] for more details. + * @param compacted whether any of the topics have log compaction enabled + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ + @Experimental + def createRDD[K, V]( + sc: SparkContext, + kafkaParams: ju.Map[String, Object], + offsetRanges: Array[OffsetRange], + locationStrategy: LocationStrategy, + compacted: Boolean + ): RDD[ConsumerRecord[K, V]] = { + val preferredHosts = locationStrategy match { + case PreferBrokers => + throw new AssertionError( + "If you want to prefer brokers, you must provide a mapping using PreferFixed " + + "A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.") + case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]() + case PreferFixed(hostMap) => hostMap + } + val kp = new ju.HashMap[String, Object](kafkaParams) + fixKafkaParams(kp) + val osr = offsetRanges.clone() + + new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true, compacted) + } + /** * :: Experimental :: * Java constructor for a batch-oriented interface for consuming from Kafka. 
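For illustration only (not part of the patch): a caller-side sketch of the compacted createRDD overload added in the hunk above. The topic name, offset range, SparkContext `sc`, and `kafkaParams` map are assumed placeholders, and later patches in this series drop the explicit flag in favor of the spark.streaming.kafka.allowNonConsecutiveOffsets configuration.

  // Hypothetical usage; topic, offsets, sc, and kafkaParams are placeholders
  val ranges = Array(OffsetRange("some-compacted-topic", 0, 0, 100))
  val rdd = KafkaUtils.createRDD[String, String](
    sc, kafkaParams, ranges, LocationStrategies.PreferConsistent,
    true) // compacted = true: offsets within the range may have gaps
  rdd.map(r => (r.key, r.value)).collect()
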
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 90c46688b0017..698b3087a9e56 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -22,12 +22,17 @@ import java.{ util => ju } import scala.collection.JavaConverters._ import scala.util.Random +import kafka.common.TopicAndPartition +import kafka.log._ +import kafka.message._ +import kafka.utils.Pool import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.scalatest.BeforeAndAfterAll import org.apache.spark._ import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.streaming.kafka010.mocks.MockTime class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { @@ -64,6 +69,40 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { + val mockTime = new MockTime() + val logs = new Pool[TopicAndPartition, Log]() + val logDir = kafkaTestUtils.brokerLogDir + val dir = new java.io.File(logDir, topic + "-" + partition) + dir.mkdirs() + val logProps = new ju.Properties() + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) + val log = new Log( + dir, + LogConfig(logProps), + 0L, + mockTime.scheduler, + mockTime + ) + messages.foreach { case (k, v) => + val msg = new ByteBufferMessageSet( + NoCompressionCodec, + new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue)) + log.append(msg) + } + log.roll() + logs.put(TopicAndPartition(topic, partition), log) + System.err.println(s"built cleaner for compacting logs for $dir") + val cleaner = new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs) + cleaner.startup() + cleaner.awaitCleaned(topic, partition, log.activeSegment.baseOffset, 1000) + System.err.println("finished cleaning") + cleaner.shutdown() + mockTime.scheduler.shutdown() + } + + test("basic usage") { val topic = s"topicbasic-${Random.nextInt}-${System.currentTimeMillis}" kafkaTestUtils.createTopic(topic) @@ -104,20 +143,15 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { test("compacted topic") { val topic = s"topiccompacted-${Random.nextInt}-${System.currentTimeMillis}" - val props = new ju.Properties() - props.put("cleanup.policy", "compact") - props.put("flush.messages", "1") - props.put("segment.ms", "1") - props.put("segment.bytes", "256") - kafkaTestUtils.createTopic(topic, 1, props) + val messages = Array( ("a", "1"), + ("a", "2"), ("b", "1"), - ("b", "2"), ("c", "1"), - ("b", "3"), - ("a", "2"), - ("c", "2") + ("c", "2"), + ("b", "2"), + ("b", "3") ) val compactedMessages = Array( ("a", "2"), @@ -125,20 +159,24 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { ("c", "2") ) - kafkaTestUtils.sendMessages(topic, messages) - // send some junk to fill a log segment - kafkaTestUtils.sendMessages(topic, Array.fill(100)("garbage" -> "1")) - // wait for log compaction - kafkaTestUtils.compactLogs(topic, 0, messages.size - 1) + compactLogs(topic, 0, messages) + + val props = new ju.Properties() + 
props.put("cleanup.policy", "compact") + props.put("flush.messages", "1") + props.put("segment.ms", "1") + props.put("segment.bytes", "256") + kafkaTestUtils.createTopic(topic, 1, props) + System.err.println(kafkaTestUtils.brokerLogDir + "/" + topic) + - Thread.sleep(100000) val kafkaParams = getKafkaParams() val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) - val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, preferredHosts) - .map(m => m.key -> m.value) - .filter(_._1 != "garbage") + val rdd = KafkaUtils.createRDD[String, String]( + sc, kafkaParams, offsetRanges, preferredHosts, true + ).map(m => m.key -> m.value) val received = rdd.collect.toSet assert(received === compactedMessages.toSet) @@ -152,7 +190,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { assert(rdd.take(messages.size + 10).size === compactedMessages.size) val emptyRdd = KafkaUtils.createRDD[String, String]( - sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts) + sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts, true) assert(emptyRdd.isEmpty) @@ -217,7 +255,8 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { kafkaParams, Array(OffsetRange("unused", 0, 1, 2)), ju.Collections.emptyMap[TopicPartition, String](), - true) + true, + false) val a3 = ExecutorCacheTaskLocation("a", "3") val a4 = ExecutorCacheTaskLocation("a", "4") val b1 = ExecutorCacheTaskLocation("b", "1") diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala similarity index 100% rename from external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala rename to external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala similarity index 100% rename from external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala rename to external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala From 182943e36f596d0cb5841a9c63471bea1dd9047b Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 10 Feb 2018 22:09:38 -0600 Subject: [PATCH 3/9] spark.streaming.kafka.allowNonConsecutiveOffsets --- .../kafka010/CachedKafkaConsumer.scala | 9 ++- .../kafka010/DirectKafkaInputDStream.scala | 8 +-- .../spark/streaming/kafka010/KafkaRDD.scala | 9 +-- .../spark/streaming/kafka010/KafkaUtils.scala | 41 +------------ .../streaming/kafka010/KafkaRDDSuite.scala | 16 ++--- .../kafka010/mocks/MockScheduler.scala | 60 ++++++++++++------- .../streaming/kafka010/mocks/MockTime.scala | 21 +++---- 7 files changed, 69 insertions(+), 95 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala index f5363641d0daa..f306681c3e2de 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala @@ -22,10 +22,8 @@ import java.{ util => ju } import org.apache.kafka.clients.consumer.{ 
ConsumerConfig, ConsumerRecord, KafkaConsumer } import org.apache.kafka.common.{ KafkaException, TopicPartition } -import org.apache.spark.SparkConf import org.apache.spark.internal.Logging - /** * Consumer of single topicpartition, intended for cached reuse. * Underlying consumer is not threadsafe, so neither is this, @@ -83,7 +81,10 @@ class CachedKafkaConsumer[K, V] private( s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() assert(record.offset == offset, - s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") + s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 @@ -133,8 +134,6 @@ class CachedKafkaConsumer[K, V] private( val p = consumer.poll(timeout) val r = p.records(topicPartition) logDebug(s"Polled ${p.partitions()} ${r.size}") - import scala.collection.JavaConverters._ - r.asScala.foreach { x => System.err.println(s"${x.offset} ${x.key} ${x.value}") } buffer = r.listIterator } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index c41ad903d4a8c..13827f68f2cb5 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -199,12 +199,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( OffsetRange(tp.topic, tp.partition, fo, uo) } val rdd = new KafkaRDD[K, V]( - context.sparkContext, - executorKafkaParams, - offsetRanges.toArray, - getPreferredHosts, - true, - false) + context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true) // Report the record number and metadata of this batch interval to InputInfoTracker. 
val description = offsetRanges.filter { offsetRange => @@ -307,7 +302,6 @@ private[spark] class DirectKafkaInputDStream[K, V]( getPreferredHosts, // during restore, it's possible same partition will be consumed from multiple // threads, so dont use cache - false, false ) } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 034931f0d823e..150f84e479c94 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -53,8 +53,7 @@ private[spark] class KafkaRDD[K, V]( val kafkaParams: ju.Map[String, Object], val offsetRanges: Array[OffsetRange], val preferredHosts: ju.Map[TopicPartition, String], - useConsumerCache: Boolean, - compacted: Boolean + useConsumerCache: Boolean ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { assert("none" == @@ -75,6 +74,8 @@ private[spark] class KafkaRDD[K, V]( conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) private val cacheLoadFactor = conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat + private val compacted = + conf.getBoolean("spark.streaming.kafka.allowNonConsecutiveOffsets", false) override def persist(newLevel: StorageLevel): this.type = { logError("Kafka ConsumerRecord is not serializable. " + @@ -284,7 +285,7 @@ private class KafkaRDDIterator[K, V]( /** * An iterator that fetches messages directly from Kafka for the offsets in partition. * Uses a cached consumer where possible to take advantage of prefetching. - * Intended for use on compacted topics only + * Intended for compacted topics, or other cases when non-consecutive offsets are ok. */ private class CompactedKafkaRDDIterator[K, V]( part: KafkaRDDPartition, @@ -294,7 +295,7 @@ private class CompactedKafkaRDDIterator[K, V]( pollTimeout: Long, initialCapacity: Int, maxCapacity: Int, - loadFactor: Float + loadFactor: Float ) extends KafkaRDDIterator[K, V]( part, context, diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index d17ad3bdb1417..b2190bfa05a3a 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -72,48 +72,9 @@ object KafkaUtils extends Logging { fixKafkaParams(kp) val osr = offsetRanges.clone() - new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true, false) + new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true) } - /** - * :: Experimental :: - * Scala constructor for a batch-oriented interface for consuming from Kafka. - * Starting and ending offsets are specified in advance, - * so that you can control exactly-once semantics. - * @param kafkaParams Kafka - * - * configuration parameters. Requires "bootstrap.servers" to be set - * with Kafka broker(s) specified in host1:port1,host2:port2 form. - * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD - * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent, - * see [[LocationStrategies]] for more details. 
- * @param compacted whether any of the topics have log compaction enabled - * @tparam K type of Kafka message key - * @tparam V type of Kafka message value - */ - @Experimental - def createRDD[K, V]( - sc: SparkContext, - kafkaParams: ju.Map[String, Object], - offsetRanges: Array[OffsetRange], - locationStrategy: LocationStrategy, - compacted: Boolean - ): RDD[ConsumerRecord[K, V]] = { - val preferredHosts = locationStrategy match { - case PreferBrokers => - throw new AssertionError( - "If you want to prefer brokers, you must provide a mapping using PreferFixed " + - "A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.") - case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]() - case PreferFixed(hostMap) => hostMap - } - val kp = new ju.HashMap[String, Object](kafkaParams) - fixKafkaParams(kp) - val osr = offsetRanges.clone() - - new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true, compacted) - } - /** * :: Experimental :: * Java constructor for a batch-oriented interface for consuming from Kafka. diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 698b3087a9e56..376265a997671 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -93,11 +93,11 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { } log.roll() logs.put(TopicAndPartition(topic, partition), log) - System.err.println(s"built cleaner for compacting logs for $dir") + val cleaner = new LogCleaner(CleanerConfig(), logDirs = Array(dir), logs = logs) cleaner.startup() cleaner.awaitCleaned(topic, partition, log.activeSegment.baseOffset, 1000) - System.err.println("finished cleaning") + cleaner.shutdown() mockTime.scheduler.shutdown() } @@ -142,6 +142,10 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { } test("compacted topic") { + val compactConf = sparkConf.clone() + compactConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true") + sc.stop() + sc = new SparkContext(compactConf) val topic = s"topiccompacted-${Random.nextInt}-${System.currentTimeMillis}" val messages = Array( @@ -167,7 +171,6 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { props.put("segment.ms", "1") props.put("segment.bytes", "256") kafkaTestUtils.createTopic(topic, 1, props) - System.err.println(kafkaTestUtils.brokerLogDir + "/" + topic) val kafkaParams = getKafkaParams() @@ -175,7 +178,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) val rdd = KafkaUtils.createRDD[String, String]( - sc, kafkaParams, offsetRanges, preferredHosts, true + sc, kafkaParams, offsetRanges, preferredHosts ).map(m => m.key -> m.value) val received = rdd.collect.toSet @@ -190,7 +193,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { assert(rdd.take(messages.size + 10).size === compactedMessages.size) val emptyRdd = KafkaUtils.createRDD[String, String]( - sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts, true) + sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)), preferredHosts) assert(emptyRdd.isEmpty) @@ -255,8 +258,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { kafkaParams, Array(OffsetRange("unused", 0, 1, 2)), 
ju.Collections.emptyMap[TopicPartition, String](), - true, - false) + true) val a3 = ExecutorCacheTaskLocation("a", "3") val a4 = ExecutorCacheTaskLocation("a", "4") val b1 = ExecutorCacheTaskLocation("b", "1") diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala index ecf6287b858ac..5bce4c33c6722 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala @@ -1,11 +1,11 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -14,45 +14,51 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.streaming.kafka010.mocks -import scala.collection.mutable.PriorityQueue import java.util.concurrent.TimeUnit + +import scala.collection.mutable.PriorityQueue + import kafka.utils.{Scheduler, Time} /** - * A mock scheduler that executes tasks synchronously using a mock time instance. Tasks are executed synchronously when - * the time is advanced. This class is meant to be used in conjunction with MockTime. - * + * A mock scheduler that executes tasks synchronously using a mock time instance. + * Tasks are executed synchronously when the time is advanced. + * This class is meant to be used in conjunction with MockTime. + * * Example usage * * val time = new MockTime * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000) * time.sleep(1001) // this should cause our scheduled task to fire * - * - * Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time). + * + * Incrementing the time to the exact next execution time of a task will result in that task + * executing (it as if execution itself takes no time). */ private[kafka010] class MockScheduler(val time: Time) extends Scheduler { - + /* a priority queue of tasks ordered by next execution time */ var tasks = new PriorityQueue[MockTask]() - - def isStarted = true + + def isStarted: Boolean = true def startup() {} - + def shutdown() { this synchronized { tasks.foreach(_.fun()) tasks.clear() } } - + /** * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs * when this method is called and the execution happens synchronously in the calling thread. - * If you are using the scheduler associated with a MockTime instance this call be triggered automatically. + * If you are using the scheduler associated with a MockTime instance this call + * will be triggered automatically. 
*/ def tick() { this synchronized { @@ -69,24 +75,34 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler { } } } - - def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) { + + def schedule( + name: String, + fun: () => Unit, + delay: Long = 0, + period: Long = -1, + unit: TimeUnit = TimeUnit.MILLISECONDS) { this synchronized { tasks += MockTask(name, fun, time.milliseconds + delay, period = period) tick() } } - + } -case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) extends Ordered[MockTask] { - def periodic = period >= 0 +case class MockTask( + val name: String, + val fun: () => Unit, + var nextExecution: Long, + val period: Long) extends Ordered[MockTask] { + def periodic: Boolean = period >= 0 def compare(t: MockTask): Int = { - if(t.nextExecution == nextExecution) + if (t.nextExecution == nextExecution) { 0 - else if (t.nextExecution < nextExecution) + } else if (t.nextExecution < nextExecution) { -1 - else + } else { 1 + } } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala index de80b5c284aca..0a8b6d2676dce 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala @@ -1,11 +1,11 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -18,33 +18,34 @@ package org.apache.spark.streaming.kafka010.mocks import java.util.concurrent._ + import kafka.utils.Time /** * A class used for unit testing things which depend on the Time interface. - * + * * This class never manually advances the clock, it only does so when you call * sleep(ms) - * + * * It also comes with an associated scheduler instance for managing background tasks in * a deterministic way. 
*/ private[kafka010] class MockTime(@volatile private var currentMs: Long) extends Time { - + val scheduler = new MockScheduler(this) - + def this() = this(System.currentTimeMillis) - + def milliseconds: Long = currentMs - def nanoseconds: Long = + def nanoseconds: Long = TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS) def sleep(ms: Long) { this.currentMs += ms scheduler.tick() } - - override def toString() = "MockTime(%d)".format(milliseconds) + + override def toString(): String = "MockTime(%d)".format(milliseconds) } From 89f4bc5f4de78cdcc22b5c9b26a27ee9263048c8 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 10 Feb 2018 22:13:49 -0600 Subject: [PATCH 4/9] [SPARK-17147][STREAMING][KAFKA] remove stray param doc --- .../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 150f84e479c94..209f6f77c3a0b 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -44,7 +44,6 @@ import org.apache.spark.storage.StorageLevel * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. * @param useConsumerCache whether to use a consumer from a per-jvm cache - * @param compacted whether any of the topics have log compaction enabled * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ From 12e65bedddbcd2407598e69fa3c6fcbcdfc67e5d Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Sat, 10 Feb 2018 22:28:22 -0600 Subject: [PATCH 5/9] [SPARK-17147][STREAMING][KAFKA] prepare for merge of master --- .../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 209f6f77c3a0b..fa345c1254b12 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -254,7 +254,7 @@ private class KafkaRDDIterator[K, V]( maxCapacity, loadFactor ) - if (context.attemptNumber > 1) { + if (context.attemptNumber >= 1) { // just in case the prior attempt failures were cache related CachedKafkaConsumer.remove(groupId, part.topic, part.partition) } From 224675046be2fbc38fea59c59394736e19042eb4 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 20 Feb 2018 23:05:43 -0600 Subject: [PATCH 6/9] [SPARK-17147][STREAMING][KAFKA] address srowen feedback --- .../kafka010/CachedKafkaConsumer.scala | 2 +- .../spark/streaming/kafka010/KafkaRDD.scala | 17 ++++++++--------- .../streaming/kafka010/KafkaRDDSuite.scala | 1 + .../kafka010/mocks/MockScheduler.scala | 18 +++++------------- .../streaming/kafka010/mocks/MockTime.scala | 2 +- 5 files changed, 16 insertions(+), 24 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala index f306681c3e2de..a8932dbdb0cbb 100644 --- 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala @@ -119,7 +119,7 @@ class CachedKafkaConsumer[K, V] private( /** * Rewind to previous record in the batch from a compacted topic. - * Will throw NoSuchElementException if no previous element + * @throws NoSuchElementException if no previous element */ def compactedPrevious(): ConsumerRecord[K, V] = { buffer.previous() diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index af82d5936e665..4851e48b4f393 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -137,14 +137,11 @@ private[spark] class KafkaRDD[K, V]( } } - val buf = new ArrayBuffer[ConsumerRecord[K, V]] - val res = context.runJob( + context.runJob( this, (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => it.take(parts(tc.partitionId)).toArray, parts.keys.toArray - ) - res.foreach(buf ++= _) - buf.toArray + ).flatten } private def executors(): Array[ExecutorCacheTaskLocation] = { @@ -259,14 +256,16 @@ private class KafkaRDDIterator[K, V]( def closeIfNeeded(): Unit = { if (!useConsumerCache && consumer != null) { - consumer.close + consumer.close() } } override def hasNext(): Boolean = requestOffset < part.untilOffset override def next(): ConsumerRecord[K, V] = { - assert(hasNext(), "Can't call getNext() once untilOffset has been reached") + if (!hasNext) { + throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached") + } val r = consumer.get(requestOffset, pollTimeout) requestOffset += 1 r @@ -300,9 +299,9 @@ private class CompactedKafkaRDDIterator[K, V]( consumer.compactedStart(part.fromOffset, pollTimeout) - var nextRecord = consumer.compactedNext(pollTimeout) + private var nextRecord = consumer.compactedNext(pollTimeout) - var okNext: Boolean = true + private var okNext: Boolean = true override def hasNext(): Boolean = okNext diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 376265a997671..0ffd2fbf636ff 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -71,6 +71,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { val mockTime = new MockTime() + // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api val logs = new Pool[TopicAndPartition, Log]() val logDir = kafkaTestUtils.brokerLogDir val dir = new java.io.File(logDir, topic + "-" + partition) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala index 5bce4c33c6722..88b97e1256e68 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala +++ 
b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala @@ -45,13 +45,11 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler { def isStarted: Boolean = true - def startup() {} + def startup(): Unit = {} - def shutdown() { - this synchronized { - tasks.foreach(_.fun()) - tasks.clear() - } + def shutdown(): Unit = synchronized { + tasks.foreach(_.fun()) + tasks.clear() } /** @@ -97,12 +95,6 @@ case class MockTask( val period: Long) extends Ordered[MockTask] { def periodic: Boolean = period >= 0 def compare(t: MockTask): Int = { - if (t.nextExecution == nextExecution) { - 0 - } else if (t.nextExecution < nextExecution) { - -1 - } else { - 1 - } + java.lang.Long.compare(t.nextExecution, nextExecution) } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala index 0a8b6d2676dce..a68f94db1f689 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala @@ -46,6 +46,6 @@ private[kafka010] class MockTime(@volatile private var currentMs: Long) extends scheduler.tick() } - override def toString(): String = "MockTime(%d)".format(milliseconds) + override def toString(): String = s"MockTime($milliseconds)" } From 08f3570c0491f96abcaa9a6dc0f4e3030cfea6c0 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Tue, 20 Feb 2018 23:25:36 -0600 Subject: [PATCH 7/9] [SPARK-17147][STREAMING][KAFKA] change another assert to NoSuchElement --- .../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 4851e48b4f393..9ce4dcc79c8c0 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -306,7 +306,9 @@ private class CompactedKafkaRDDIterator[K, V]( override def hasNext(): Boolean = okNext override def next(): ConsumerRecord[K, V] = { - assert(hasNext, "Can't call getNext() once untilOffset has been reached") + if (!hasNext) { + throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached") + } val r = nextRecord if (r.offset + 1 >= part.untilOffset) { okNext = false From 248b5111651da3d570768a0f4ffffb603193285c Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 21 Feb 2018 22:22:14 -0600 Subject: [PATCH 8/9] [SPARK-17147][STREAMING][KAFKA] remaining asserts are semantically more like require, since they are external things that could go wrong --- .../spark/streaming/kafka010/CachedKafkaConsumer.scala | 10 +++++----- .../org/apache/spark/streaming/kafka010/KafkaRDD.scala | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala index a8932dbdb0cbb..fcca16e1c6879 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala +++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala @@ -36,7 +36,7 @@ class CachedKafkaConsumer[K, V] private( val partition: Int, val kafkaParams: ju.Map[String, Object]) extends Logging { - assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), + require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), "groupId used for cache key must match the groupId in kafkaParams") val topicPartition = new TopicPartition(topic, partition) @@ -69,7 +69,7 @@ class CachedKafkaConsumer[K, V] private( } if (!buffer.hasNext()) { poll(timeout) } - assert(buffer.hasNext(), + require(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") var record = buffer.next() @@ -77,10 +77,10 @@ class CachedKafkaConsumer[K, V] private( logInfo(s"Buffer miss for $groupId $topic $partition $offset") seek(offset) poll(timeout) - assert(buffer.hasNext(), + require(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() - assert(record.offset == offset, + require(record.offset == offset, s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + "spark.streaming.kafka.allowNonConsecutiveOffsets" @@ -110,7 +110,7 @@ class CachedKafkaConsumer[K, V] private( */ def compactedNext(timeout: Long): ConsumerRecord[K, V] = { if (!buffer.hasNext()) { poll(timeout) } - assert(buffer.hasNext(), + require(buffer.hasNext(), s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout") val record = buffer.next() nextOffset = record.offset + 1 diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 9ce4dcc79c8c0..6d4d64547ae25 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -55,12 +55,12 @@ private[spark] class KafkaRDD[K, V]( useConsumerCache: Boolean ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { - assert("none" == + require("none" == kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String], ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " must be set to none for executor kafka params, else messages may not match offsetRange") - assert(false == + require(false == kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean], ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " must be set to false for executor kafka params, else offsets may commit before processing") @@ -187,7 +187,7 @@ private[spark] class KafkaRDD[K, V]( override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = { val part = thePart.asInstanceOf[KafkaRDDPartition] - assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) + require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) if (part.fromOffset == part.untilOffset) { logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + s"skipping ${part.topic} ${part.partition}") From e3ae84523621405d2f2b55ec92cb79921aaba961 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Mon, 26 Feb 2018 20:25:24 -0600 Subject: [PATCH 9/9] [SPARK-17147][STREAMING][KAFKA] 
address further feedback --- .../kafka010/CachedKafkaConsumer.scala | 4 +- .../spark/streaming/kafka010/KafkaRDD.scala | 40 ++++++++++--------- .../streaming/kafka010/KafkaRDDSuite.scala | 13 +++--- .../streaming/kafka010/KafkaTestUtils.scala | 4 +- .../kafka010/mocks/MockScheduler.scala | 30 ++++++-------- 5 files changed, 46 insertions(+), 45 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala index fcca16e1c6879..aeb8c1dc342b3 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala @@ -109,7 +109,9 @@ class CachedKafkaConsumer[K, V] private( * Assumes compactedStart has been called first, and ignores gaps. */ def compactedNext(timeout: Long): ConsumerRecord[K, V] = { - if (!buffer.hasNext()) { poll(timeout) } + if (!buffer.hasNext()) { + poll(timeout) + } require(buffer.hasNext(), s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout") val record = buffer.next() diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 6d4d64547ae25..07239eda64d2e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -117,31 +117,33 @@ private[spark] class KafkaRDD[K, V]( override def take(num: Int): Array[ConsumerRecord[K, V]] = if (compacted) { super.take(num) + } else if (num < 1) { + Array.empty[ConsumerRecord[K, V]] } else { val nonEmptyPartitions = this.partitions .map(_.asInstanceOf[KafkaRDDPartition]) .filter(_.count > 0) - if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[K, V]](0) - } - - // Determine in advance how many messages need to be taken from each partition - val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { - val taken = Math.min(remain, part.count) - result + (part.index -> taken.toInt) - } else { - result + if (nonEmptyPartitions.isEmpty) { + Array.empty[ConsumerRecord[K, V]] + } else { + // Determine in advance how many messages need to be taken from each partition + val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => + val remain = num - result.values.sum + if (remain > 0) { + val taken = Math.min(remain, part.count) + result + (part.index -> taken.toInt) + } else { + result + } } - } - context.runJob( - this, - (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => - it.take(parts(tc.partitionId)).toArray, parts.keys.toArray - ).flatten + context.runJob( + this, + (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => + it.take(parts(tc.partitionId)).toArray, parts.keys.toArray + ).flatten + } } private def executors(): Array[ExecutorCacheTaskLocation] = { @@ -239,7 +241,7 @@ private class KafkaRDDIterator[K, V]( val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] - context.addTaskCompletionListener{ context => closeIfNeeded() } + context.addTaskCompletionListener(_ => closeIfNeeded()) val consumer = if (useConsumerCache) { CachedKafkaConsumer.init(cacheInitialCapacity, 
cacheMaxCapacity, cacheLoadFactor) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index 0ffd2fbf636ff..271adea1df731 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kafka010 import java.{ util => ju } +import java.io.File import scala.collection.JavaConverters._ import scala.util.Random @@ -74,11 +75,11 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api val logs = new Pool[TopicAndPartition, Log]() val logDir = kafkaTestUtils.brokerLogDir - val dir = new java.io.File(logDir, topic + "-" + partition) + val dir = new File(logDir, topic + "-" + partition) dir.mkdirs() val logProps = new ju.Properties() logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f)) val log = new Log( dir, LogConfig(logProps), @@ -87,10 +88,10 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { mockTime ) messages.foreach { case (k, v) => - val msg = new ByteBufferMessageSet( - NoCompressionCodec, - new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue)) - log.append(msg) + val msg = new ByteBufferMessageSet( + NoCompressionCodec, + new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue)) + log.append(msg) } log.roll() logs.put(TopicAndPartition(topic, partition), log) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index a441ec9331444..70b579d96d692 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -172,12 +172,12 @@ private[kafka010] class KafkaTestUtils extends Logging { /** Create a Kafka topic and wait until it is propagated to the whole cluster */ def createTopic(topic: String, partitions: Int): Unit = { - createTopic(topic, partitions, new Properties) + createTopic(topic, partitions, new Properties()) } /** Create a Kafka topic and wait until it is propagated to the whole cluster */ def createTopic(topic: String): Unit = { - createTopic(topic, 1, new Properties) + createTopic(topic, 1, new Properties()) } /** Java-friendly function for sending messages to the Kafka broker */ diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala index 88b97e1256e68..928e1a6ef54b9 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala @@ -58,18 +58,16 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler { * If you are using the scheduler associated with a MockTime instance this call * will be triggered 
automatically. */ - def tick() { - this synchronized { - val now = time.milliseconds - while(!tasks.isEmpty && tasks.head.nextExecution <= now) { - /* pop and execute the task with the lowest next execution time */ - val curr = tasks.dequeue - curr.fun() - /* if the task is periodic, reschedule it and re-enqueue */ - if(curr.periodic) { - curr.nextExecution += curr.period - this.tasks += curr - } + def tick(): Unit = synchronized { + val now = time.milliseconds + while(!tasks.isEmpty && tasks.head.nextExecution <= now) { + /* pop and execute the task with the lowest next execution time */ + val curr = tasks.dequeue + curr.fun() + /* if the task is periodic, reschedule it and re-enqueue */ + if(curr.periodic) { + curr.nextExecution += curr.period + this.tasks += curr } } } @@ -79,11 +77,9 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler { fun: () => Unit, delay: Long = 0, period: Long = -1, - unit: TimeUnit = TimeUnit.MILLISECONDS) { - this synchronized { - tasks += MockTask(name, fun, time.milliseconds + delay, period = period) - tick() - } + unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = synchronized { + tasks += MockTask(name, fun, time.milliseconds + delay, period = period) + tick() } }
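
For reference, a minimal sketch (not part of the patches themselves) of how the MockTime / MockScheduler pair used above is meant to behave: the clock only advances when sleep() is called, and any tasks that have become due run synchronously on the calling thread, which is what makes the LogCleaner-driven compaction in KafkaTestUtils and KafkaRDDSuite deterministic. The object name MockTimeExample and the 100 ms delay/period are illustrative assumptions, not code from the patches; only the schedule(name, fun, delay, period, unit) and sleep(ms) signatures shown in the diffs are relied on.

package org.apache.spark.streaming.kafka010

import org.apache.spark.streaming.kafka010.mocks.MockTime

// Hypothetical test-only helper; MockTime and MockScheduler are private[kafka010],
// so this example lives in that package.
object MockTimeExample {
  def main(args: Array[String]): Unit = {
    val time = new MockTime()
    var runs = 0
    // Nothing runs at schedule time: the task is only due 100 ms in the mock future.
    time.scheduler.schedule("example", () => runs += 1, delay = 100, period = 100)
    assert(runs == 0)
    // Advancing the mock clock ticks the scheduler; the periodic task fires
    // deterministically at t0+100 and t0+200, so it has run exactly twice.
    time.sleep(250)
    assert(runs == 2)
  }
}

On patch 8/9's subject: Scala's assert throws AssertionError and can be elided at compile time, while require throws IllegalArgumentException and always runs, so checks on caller-supplied kafkaParams and on broker responses are better expressed as require.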