From f92c75859aed2c6d56e54e1d6ed663b6d52e4771 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 15 Jul 2019 12:51:00 +0200
Subject: [PATCH] Add sleep + remove duplications

---
 .../sql/kafka010/KafkaOffsetReader.scala      |  2 +-
 .../spark/sql/kafka010/KafkaTestUtils.scala   | 65 ++++++-------------
 2 files changed, 21 insertions(+), 46 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 147c427cef8da..6663d55d9ea49 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -422,7 +422,7 @@ private[kafka010] class KafkaOffsetReader(
     val startTimeMs = System.currentTimeMillis()
     while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
       // Poll to get the latest assigned partitions
-      consumer.poll(jt.Duration.ZERO)
+      consumer.poll(jt.Duration.ofMillis(100))
       partitions = consumer.assignment()
     }
     require(!partitions.isEmpty)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index b30463b161c87..6036a65938fc6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
-import java.time.Duration
-import java.util.{Collections, Map => JMap, Properties, Set => JSet, UUID}
+import java.util.{Collections, Map => JMap, Properties, UUID}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
@@ -33,7 +32,6 @@ import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
-import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
@@ -44,6 +42,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.kafkaParamsForDriver
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
@@ -284,43 +283,29 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   }
 
   def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
-    val kc = new KafkaConsumer[String, String](consumerConfiguration)
-    logInfo("Created consumer to get earliest offsets")
-    kc.subscribe(topics.asJavaCollection)
-    val partitions = getPartitions(kc)
-    kc.pause(partitions)
-    kc.seekToBeginning(partitions)
-    val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
-    kc.close()
-    logInfo("Closed consumer to get earliest offsets")
-    offsets
+    val reader = getKafkaOffsetReader(topics)
+    try {
+      reader.fetchEarliestOffsets()
+    } finally {
+      reader.close()
+    }
   }
 
   def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
-    val kc = new KafkaConsumer[String, String](consumerConfiguration)
-    logInfo("Created consumer to get latest offsets")
-    kc.subscribe(topics.asJavaCollection)
-    val partitions = getPartitions(kc)
-    kc.pause(partitions)
-    kc.seekToEnd(partitions)
-    val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
-    kc.close()
-    logInfo("Closed consumer to get latest offsets")
-    offsets
+    val reader = getKafkaOffsetReader(topics)
+    try {
+      reader.fetchLatestOffsets(None)
+    } finally {
+      reader.close()
+    }
   }
 
-  private def getPartitions(consumer: KafkaConsumer[String, String]): JSet[TopicPartition] = {
-    var partitions = Set.empty[TopicPartition].asJava
-    val startTimeMs = System.currentTimeMillis()
-    val timeoutMs = timeout(1.minute).value.toMillis
-    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < timeoutMs) {
-      // Poll to get the latest assigned partitions
-      consumer.poll(Duration.ZERO)
-      partitions = consumer.assignment()
-    }
-    require(!partitions.isEmpty)
-    logDebug(s"Partitions assigned to consumer: $partitions")
-    partitions
+  private def getKafkaOffsetReader(topics: Set[String]): KafkaOffsetReader = {
+    new KafkaOffsetReader(
+      SubscribeStrategy(topics.toSeq),
+      kafkaParamsForDriver(Map("bootstrap.servers" -> brokerAddress)),
+      Map.empty,
+      driverGroupIdPrefix = "group-KafkaTestUtils")
   }
 
   def listConsumerGroups(): ListConsumerGroupsResult = {
@@ -376,16 +361,6 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     }
   }
 
-  private def consumerConfiguration: Properties = {
-    val props = new Properties()
-    props.put("bootstrap.servers", brokerAddress)
-    props.put("group.id", "group-KafkaTestUtils-" + Random.nextInt)
-    props.put("value.deserializer", classOf[StringDeserializer].getName)
-    props.put("key.deserializer", classOf[StringDeserializer].getName)
-    props.put("enable.auto.commit", "false")
-    props
-  }
-
   /** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
   private def verifyTopicDeletion(
       topic: String,
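
Note on the first hunk: consumer.poll(jt.Duration.ZERO) returns immediately, so the
assignment-wait loop busy-spins and can exhaust pollTimeoutMs before the consumer has
had a chance to complete its group assignment; polling with a small positive timeout
(the "sleep" in the subject line) lets each iteration block briefly while the
assignment progresses. The KafkaTestUtils hunks are the "remove duplications" half:
both offset helpers now delegate to a short-lived KafkaOffsetReader, which already
contains this loop, instead of each driving its own hand-rolled KafkaConsumer. Below
is a minimal standalone Scala sketch of the same wait-for-assignment pattern, outside
Spark; the object/method names and the placeholder broker address, group id and topic
are illustrative assumptions, not part of the patch.

import java.{time => jt}
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object AssignmentWaitSketch {

  // Wait until the group coordinator assigns partitions, or give up after
  // pollTimeoutMs. Mirrors the loop in KafkaOffsetReader after the patch.
  def waitForAssignment(
      consumer: KafkaConsumer[String, String],
      pollTimeoutMs: Long): java.util.Set[TopicPartition] = {
    var partitions = Collections.emptySet[TopicPartition]()
    val startTimeMs = System.currentTimeMillis()
    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
      // poll(Duration.ZERO) would return immediately and busy-spin this loop;
      // a small positive timeout blocks each iteration long enough for the
      // group assignment to make progress.
      consumer.poll(jt.Duration.ofMillis(100))
      partitions = consumer.assignment()
    }
    require(!partitions.isEmpty, s"No partitions assigned within ${pollTimeoutMs}ms")
    partitions
  }

  // Hypothetical usage; broker address, group id and topic are placeholders.
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "assignment-wait-sketch")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList("demo-topic"))
      println(waitForAssignment(consumer, pollTimeoutMs = 60000L))
    } finally {
      consumer.close()
    }
  }
}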