From 95a0c96f040530f5eec270a2b53b46d195a94b8c Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Fri, 7 Oct 2016 16:47:15 -0700
Subject: [PATCH 1/3] Fetch the earliest offsets manually in KafkaSource
 instead of counting on KafkaConsumer

---
 .../spark/sql/kafka010/KafkaSource.scala      | 41 ++++++++++++++-----
 .../sql/kafka010/KafkaSourceProvider.scala    | 19 ++++++---
 2 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1be70db87497e..1b22a5fd19c35 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -82,6 +82,7 @@ private[kafka010] case class KafkaSource(
     executorKafkaParams: ju.Map[String, Object],
     sourceOptions: Map[String, String],
     metadataPath: String,
+    startFromEarliestOffset: Boolean,
     failOnDataLoss: Boolean)
   extends Source with Logging {
 
@@ -109,7 +110,11 @@ private[kafka010] case class KafkaSource(
   private lazy val initialPartitionOffsets = {
     val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
-      val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
+      val offsets = if (startFromEarliestOffset) {
+        KafkaSourceOffset(fetchEarliestOffsets())
+      } else {
+        KafkaSourceOffset(fetchLatestOffsets())
+      }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
       offsets
@@ -123,7 +128,7 @@ private[kafka010] case class KafkaSource(
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
+    val offset = KafkaSourceOffset(fetchLatestOffsets())
     logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
     Some(offset)
   }
@@ -227,11 +232,9 @@ private[kafka010] case class KafkaSource(
   override def toString(): String = s"KafkaSource[$consumerStrategy]"
 
   /**
-   * Fetch the offset of a partition, either seek to the latest offsets or use the current offsets
-   * in the consumer.
+   * Fetch the earliest offsets of partitions.
    */
-  private def fetchPartitionOffsets(
-      seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+  private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
     // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
     assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
@@ -240,11 +243,27 @@ private[kafka010] case class KafkaSource(
     consumer.pause(partitions)
     logDebug(s"Partitioned assigned to consumer: $partitions")
 
-    // Get the current or latest offset of each partition
-    if (seekToEnd) {
-      consumer.seekToEnd(partitions)
-      logDebug("Seeked to the end")
-    }
+    logDebug("Seeked to the beginning")
+    consumer.seekToBeginning(partitions)
+    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
+    logDebug(s"Got offsets for partition : $partitionOffsets")
+    partitionOffsets
+  }
+
+  /**
+   * Fetch the latest offset of partitions.
+   */
+  private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
+    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+    // Poll to get the latest assigned partitions
+    consumer.poll(0)
+    val partitions = consumer.assignment()
+    consumer.pause(partitions)
+    logDebug(s"Partitioned assigned to consumer: $partitions")
+
+    logDebug("Seeked to the end")
+    consumer.seekToEnd(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
     logDebug(s"Got offsets for partition : $partitionOffsets")
     partitionOffsets
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 1b0a2fe955d03..23b1b60f3bcaa 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -77,10 +77,15 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
     // id. Hence, we should generate a unique id for each query.
     val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
 
-    val autoOffsetResetValue = caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY) match {
-      case Some(value) => value.trim()  // same values as those supported by auto.offset.reset
-      case None => "latest"
-    }
+    val startFromEarliestOffset =
+      caseInsensitiveParams.get(STARTING_OFFSET_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => false
+        case Some("earliest") => true
+        case Some(pos) =>
+          // This should not happen since we have already checked the options.
+          throw new IllegalStateException(s"Invalid $STARTING_OFFSET_OPTION_KEY: $pos")
+        case None => false
+      }
 
     val kafkaParamsForStrategy =
       ConfigUpdater("source", specifiedKafkaParams)
@@ -90,8 +95,9 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         // So that consumers in Kafka source do not mess with any existing group id
         .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-driver")
 
-        // So that consumers can start from earliest or latest
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetValue)
+        // Set to "latest" to avoid exceptions. However, KafkaSource will fetch the initial offsets
+        // by itself instead of counting on KafkaConsumer.
+        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
 
         // So that consumers in the driver does not commit offsets unnecessarily
         .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -147,6 +153,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
       kafkaParamsForExecutors,
       parameters,
       metadataPath,
+      startFromEarliestOffset,
       failOnDataLoss)
   }
 

From 957855512083741e0421c3124c1904479fe5f9ce Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 12 Oct 2016 14:56:57 -0700
Subject: [PATCH 2/3] clean up

---
 .../spark/sql/kafka010/KafkaSource.scala | 21 ++++++++-------------
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 1b22a5fd19c35..a7efdf7e872d0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -235,18 +235,15 @@ private[kafka010] case class KafkaSource(
    * Fetch the earliest offsets of partitions.
    */
   private def fetchEarliestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
     consumer.pause(partitions)
-    logDebug(s"Partitioned assigned to consumer: $partitions")
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")
 
-    logDebug("Seeked to the beginning")
     consumer.seekToBeginning(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for partition : $partitionOffsets")
+    logDebug(s"Got earliest offsets for partition : $partitionOffsets")
     partitionOffsets
   }
 
@@ -254,18 +251,15 @@ private[kafka010] case class KafkaSource(
    * Fetch the latest offset of partitions.
    */
   private def fetchLatestOffsets(): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
     consumer.pause(partitions)
-    logDebug(s"Partitioned assigned to consumer: $partitions")
+    logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
 
-    logDebug("Seeked to the end")
     consumer.seekToEnd(partitions)
     val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for partition : $partitionOffsets")
+    logDebug(s"Got latest offsets for partition : $partitionOffsets")
     partitionOffsets
   }
 
@@ -275,8 +269,6 @@ private[kafka010] case class KafkaSource(
    */
   private def fetchNewPartitionEarliestOffsets(
       newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
-    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
-    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
@@ -289,7 +281,7 @@ private[kafka010] case class KafkaSource(
       // So we need to ignore them
       partitions.contains(p)
     }.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got offsets for new partitions: $partitionToOffsets")
+    logDebug(s"Got earliest offsets for new partitions: $partitionToOffsets")
     partitionToOffsets
   }
 
@@ -303,6 +295,9 @@ private[kafka010] case class KafkaSource(
    */
   private def withRetriesWithoutInterrupt(
       body: => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
+    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
+
     synchronized {
       var result: Option[Map[TopicPartition, Long]] = None
       var attempt = 1

From 7986f185649ad474b7f22bb0930e0d4a9e992a91 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 13 Oct 2016 12:27:30 -0700
Subject: [PATCH 3/3] Address review comments

---
 .../scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index a7efdf7e872d0..4b0bb0a0f725c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -272,17 +272,18 @@ private[kafka010] case class KafkaSource(
     // Poll to get the latest assigned partitions
     consumer.poll(0)
     val partitions = consumer.assignment()
+    consumer.pause(partitions)
     logDebug(s"\tPartitioned assigned to consumer: $partitions")
 
     // Get the earliest offset of each partition
     consumer.seekToBeginning(partitions)
-    val partitionToOffsets = newPartitions.filter { p =>
+    val partitionOffsets = newPartitions.filter { p =>
       // When deleting topics happen at the same time, some partitions may not be in `partitions`.
       // So we need to ignore them
       partitions.contains(p)
     }.map(p => p -> consumer.position(p)).toMap
-    logDebug(s"Got earliest offsets for new partitions: $partitionToOffsets")
-    partitionToOffsets
+    logDebug(s"Got earliest offsets for new partitions: $partitionOffsets")
+    partitionOffsets
   }
 
   /**
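
For reference, the offset lookup that these patches wire into KafkaSource can be exercised against a plain KafkaConsumer outside Spark. The standalone sketch below is illustrative only and not part of the patch series: it assumes kafka-clients 0.10.x on the classpath, a hypothetical broker at localhost:9092, and a hypothetical topic named "test". Like the patched KafkaSource, it resolves earliest and latest offsets explicitly through seekToBeginning/seekToEnd plus position, instead of counting on auto.offset.reset.

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object OffsetProbe {
  def main(args: Array[String]): Unit = {
    val props = new ju.Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // hypothetical broker
    // Fresh group id so the probe never touches an existing group's committed offsets
    props.put(ConsumerConfig.GROUP_ID_CONFIG, s"offset-probe-${ju.UUID.randomUUID}")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer].getName)
    // Mirrors the patch: "latest" is only there to avoid exceptions; the actual
    // starting position is decided by the explicit seeks below.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      consumer.subscribe(ju.Arrays.asList("test"))  // hypothetical topic

      // poll(0) forces partition assignment without consuming records;
      // pause() keeps any later poll from fetching data.
      consumer.poll(0)
      val partitions = consumer.assignment()
      consumer.pause(partitions)

      // Earliest offsets: seek to the beginning, then read back each position.
      consumer.seekToBeginning(partitions)
      val earliest = partitions.asScala.map(p => p -> consumer.position(p)).toMap

      // Latest offsets: same pattern, seeking to the end.
      consumer.seekToEnd(partitions)
      val latest = partitions.asScala.map(p => p -> consumer.position(p)).toMap

      println(s"Earliest offsets: $earliest")
      println(s"Latest offsets: $latest")
    } finally {
      consumer.close()
    }
  }
}

The subscribe/poll(0)/pause sequence is the same dance the driver-side KafkaSource performs; a production version would also need the retry and interrupt handling that withRetriesWithoutInterrupt adds around these calls (see KAFKA-1894).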