From aa8eed44ad1b5e4bdee9c8c32a44fb07b1fa9f2e Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Thu, 1 Mar 2018 14:47:49 +0100
Subject: [PATCH] [SPARK-19185][SS] Make Kafka consumer cache configurable

---
 .../structured-streaming-kafka-integration.md      |  9 +++++
 .../sql/kafka010/KafkaMicroBatchReader.scala       | 37 ++++++++++++++-----
 .../kafka010/KafkaMicroBatchSourceSuite.scala      |  1 +
 3 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 5647ec6bc5797..6703a5efa5b6b 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -353,6 +353,13 @@ The following configurations are optional:
   <td>streaming and batch</td>
   <td>The timeout in milliseconds to poll data from Kafka in executors.</td>
 </tr>
+<tr>
+  <td>kafkaConsumer.useConsumerCache</td>
+  <td>true or false</td>
+  <td>true</td>
+  <td>streaming and batch</td>
+  <td>Whether to enable or disable caching for Kafka consumers. Disabling the cache may be needed to work around the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.</td>
+</tr>
 <tr>
   <td>fetchOffset.numRetries</td>
   <td>int</td>
@@ -376,6 +383,8 @@ The following configurations are optional:
 </tr>
 </table>
 
+If you would like to disable caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`. Disabling the cache may be needed to work around the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.
+
 ## Writing Data to Kafka
 
 Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
index fb647ca7e70dd..806389c6f5841 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
@@ -76,6 +76,10 @@ private[kafka010] class KafkaMicroBatchReader(
   private val maxOffsetsPerTrigger =
     Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
 
+  private val useConsumerCache = options.getBoolean(
+    "kafkaConsumer.useConsumerCache",
+    SparkEnv.get.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled", true))
+
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
    * called in StreamExecutionThread. Otherwise, interrupting a thread while running
@@ -157,8 +161,8 @@ private[kafka010] class KafkaMicroBatchReader(
         } else None
         val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
         Some(
-          new KafkaMicroBatchDataReaderFactory(
-            range, preferredLoc, executorKafkaParams, pollTimeoutMs, failOnDataLoss))
+          new KafkaMicroBatchDataReaderFactory(range, preferredLoc, executorKafkaParams,
+            pollTimeoutMs, failOnDataLoss, useConsumerCache))
       } else {
         reportDataLoss(
           s"Partition $tp's offset was changed from " +
@@ -325,12 +329,13 @@ private[kafka010] class KafkaMicroBatchDataReaderFactory(
     preferredLoc: Option[String],
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean,
+    useConsumerCache: Boolean) extends DataReaderFactory[UnsafeRow] {
 
   override def preferredLocations(): Array[String] = preferredLoc.toArray
 
   override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
-    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, useConsumerCache)
 }
 
 /** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
@@ -338,10 +343,16 @@ private[kafka010] class KafkaMicroBatchDataReader(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+    failOnDataLoss: Boolean,
+    useConsumerCache: Boolean) extends DataReader[UnsafeRow] with Logging {
 
-  private val consumer = CachedKafkaConsumer.getOrCreate(
-    offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+  private val consumer = if (useConsumerCache) {
+    CachedKafkaConsumer.getOrCreate(
+      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+  } else {
+    CachedKafkaConsumer.createUncached(
+      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+  }
 
   private val rangeToRead = resolveRange(offsetRange)
   private val converter = new KafkaRecordToUnsafeRowConverter
@@ -369,9 +380,15 @@ private[kafka010] class KafkaMicroBatchDataReader(
   }
 
   override def close(): Unit = {
-    // Indicate that we're no longer using this consumer
-    CachedKafkaConsumer.releaseKafkaConsumer(
-      offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    if (useConsumerCache) {
+      // Indicate that we're no longer using this consumer
+      CachedKafkaConsumer.releaseKafkaConsumer(
+        offsetRange.topicPartition.topic, offsetRange.topicPartition.partition, executorKafkaParams)
+    } else {
+      if (consumer != null) {
+        consumer.close()
+      }
+    }
   }
 
   private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 89c9ef4cc73b5..fc0e128633911 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -567,6 +567,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       .readStream
       .format("kafka")
       .option("subscribe", topic)
+      .option("kafkaConsumer.useConsumerCache", false)
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
       .load()
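
Note (not part of the patch): a minimal usage sketch of how a streaming query could opt out of the consumer cache with the option introduced above. The SparkSession setup, broker address, and topic name are placeholders; per the reader code, the per-query option falls back to spark.streaming.kafka.consumer.cache.enabled when it is not set.

    import org.apache.spark.sql.SparkSession

    // Sketch only: assumes a reachable Kafka broker and an existing topic.
    val spark = SparkSession.builder()
      .appName("kafka-without-consumer-cache")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")    // placeholder broker address
      .option("subscribe", "topic1")                       // placeholder topic
      .option("kafkaConsumer.useConsumerCache", "false")   // skip CachedKafkaConsumer reuse on executors
      .load()

    // The cluster-wide default can instead be changed with
    //   --conf spark.streaming.kafka.consumer.cache.enabled=false
    // as described in the docs change above.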