Skip to content

Commit

Permalink
Add sleep + remove duplications
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi committed Jul 15, 2019
1 parent ad1863a commit f92c758
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 46 deletions.
Expand Up @@ -422,7 +422,7 @@ private[kafka010] class KafkaOffsetReader(
val startTimeMs = System.currentTimeMillis()
while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
// Poll to get the latest assigned partitions
consumer.poll(jt.Duration.ZERO)
consumer.poll(jt.Duration.ofMillis(100))
partitions = consumer.assignment()
}
require(!partitions.isEmpty)
Expand Down
Expand Up @@ -20,8 +20,7 @@ package org.apache.spark.sql.kafka010
import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.time.Duration
import java.util.{Collections, Map => JMap, Properties, Set => JSet, UUID}
import java.util.{Collections, Map => JMap, Properties, UUID}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
Expand All @@ -33,7 +32,6 @@ import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
Expand All @@ -44,6 +42,7 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.KafkaSourceProvider.kafkaParamsForDriver
import org.apache.spark.util.{ShutdownHookManager, Utils}

/**
Expand Down Expand Up @@ -284,43 +283,29 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
}

def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
val kc = new KafkaConsumer[String, String](consumerConfiguration)
logInfo("Created consumer to get earliest offsets")
kc.subscribe(topics.asJavaCollection)
val partitions = getPartitions(kc)
kc.pause(partitions)
kc.seekToBeginning(partitions)
val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
kc.close()
logInfo("Closed consumer to get earliest offsets")
offsets
val reader = getKafkaOffsetReader(topics)
try {
reader.fetchEarliestOffsets()
} finally {
reader.close()
}
}

def getLatestOffsets(topics: Set[String]): Map[TopicPartition, Long] = {
val kc = new KafkaConsumer[String, String](consumerConfiguration)
logInfo("Created consumer to get latest offsets")
kc.subscribe(topics.asJavaCollection)
val partitions = getPartitions(kc)
kc.pause(partitions)
kc.seekToEnd(partitions)
val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
kc.close()
logInfo("Closed consumer to get latest offsets")
offsets
val reader = getKafkaOffsetReader(topics)
try {
reader.fetchLatestOffsets(None)
} finally {
reader.close()
}
}

private def getPartitions(consumer: KafkaConsumer[String, String]): JSet[TopicPartition] = {
var partitions = Set.empty[TopicPartition].asJava
val startTimeMs = System.currentTimeMillis()
val timeoutMs = timeout(1.minute).value.toMillis
while (partitions.isEmpty&& System.currentTimeMillis() - startTimeMs < timeoutMs) {
// Poll to get the latest assigned partitions
consumer.poll(Duration.ZERO)
partitions = consumer.assignment()
}
require(!partitions.isEmpty)
logDebug(s"Partitions assigned to consumer: $partitions")
partitions
private def getKafkaOffsetReader(topics: Set[String]): KafkaOffsetReader = {
new KafkaOffsetReader(
SubscribeStrategy(topics.toSeq),
kafkaParamsForDriver(Map("bootstrap.servers" -> brokerAddress)),
Map.empty,
driverGroupIdPrefix = "group-KafkaTestUtils")
}

def listConsumerGroups(): ListConsumerGroupsResult = {
Expand Down Expand Up @@ -376,16 +361,6 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
}
}

private def consumerConfiguration: Properties = {
val props = new Properties()
props.put("bootstrap.servers", brokerAddress)
props.put("group.id", "group-KafkaTestUtils-" + Random.nextInt)
props.put("value.deserializer", classOf[StringDeserializer].getName)
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("enable.auto.commit", "false")
props
}

/** Verify topic is deleted in all places, e.g, brokers, zookeeper. */
private def verifyTopicDeletion(
topic: String,
Expand Down

0 comments on commit f92c758

Please sign in to comment.