From 4cac9f941e2c13798c30acec810047a189be7b0d Mon Sep 17 00:00:00 2001
From: Rajini Sivaram
Date: Sun, 4 Feb 2018 14:33:13 +0000
Subject: [PATCH 1/4] KAFKA-6501: Dynamic broker config tests updates and metrics fix

---
 .../scala/kafka/network/SocketServer.scala    |   1 +
 .../kafka/server/AbstractFetcherManager.scala |   3 +-
 .../scala/kafka/server/MetadataCache.scala    |  11 +-
 .../DynamicBrokerReconfigurationTest.scala    | 103 +++++++++++++-----
 .../unit/kafka/server/MetadataCacheTest.scala |  13 +--
 5 files changed, 92 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index fef412b71989..578da8d51409 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -742,6 +742,7 @@ private[kafka] class Processor(val id: Int,
       close(channel.id)
     }
     selector.close()
+    removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
   }
 
   // `protected` to allow override for testing
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 312123c3ab69..aa085857560b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -91,7 +91,8 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
     }
   }
 
-  private def getFetcherId(topic: String, partitionId: Int) : Int = {
+  // Visible for testing
+  private[server] def getFetcherId(topic: String, partitionId: Int) : Int = {
     lock synchronized {
       Utils.abs(31 * topic.hashCode() + partitionId) % numFetchersPerBroker
     }
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index b4a015de5768..6096e6cf25db 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -103,12 +103,15 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
+  private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
    inReadLock(partitionMetadataLock) {
+      // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
+      // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
       aliveNodes.get(brokerId).map { nodeMap =>
-        nodeMap.getOrElse(listenerName,
-          throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
-      }
+        val node = nodeMap.get(listenerName)
+        warn(s"Broker endpoint not found for broker $brokerId listenerName $listenerName")
+        node
+      }.getOrElse(None)
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
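The MetadataCache change above replaces the thrown BrokerEndPointNotAvailableException with an Option, so a listener that has not yet been added on a peer broker degrades to a missing endpoint instead of failing the whole metadata request. A minimal standalone sketch of the resulting lookup semantics (illustrative only, not patch code; patch 2 below further collapses the map/getOrElse chain into a single flatMap and logs only when the listener is actually absent):

    import org.apache.kafka.common.Node
    import org.apache.kafka.common.network.ListenerName

    object EndpointLookupSketch {
      // A missing broker or listener now yields None, which callers can treat as a
      // transient condition while a dynamic listener update propagates.
      def getAliveEndpoint(aliveNodes: Map[Int, Map[ListenerName, Node]],
                           brokerId: Int,
                           listenerName: ListenerName): Option[Node] =
        aliveNodes.get(brokerId).flatMap(_.get(listenerName))
    }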
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index cb2ac5244a06..c3f643b89705 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -26,6 +26,7 @@ import java.util.{Collections, Properties}
 import java.util.concurrent._
 import javax.management.ObjectName
 
+import com.yammer.metrics.Metrics
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.coordinator.group.OffsetConfig
@@ -102,10 +103,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
     props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
     props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
-    props.put(KafkaConfig.LogSegmentBytesProp, "2000")
-    props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
+    props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update
+    props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
+    props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric
     props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
-    props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret")
+    props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret") // for testing secret rotation
     props ++= sslProperties1
     addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
 
@@ -261,9 +263,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2"))
 
-    // Verify cleaner config was updated
+    // Verify cleaner config was updated. Wait for one of the configs to be updated and verify
+    // that all others were updated at the same time since they are reconfigured together
-    val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
-    assertEquals(2, newCleanerConfig.numThreads)
+    def newCleanerConfig = servers.head.logManager.cleaner.currentConfig
+    TestUtils.waitUntilTrue(() => newCleanerConfig.numThreads == 2, "Log cleaner not reconfigured")
     assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
     assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
     assertEquals(300000, newCleanerConfig.ioBufferSize)
@@ -291,7 +294,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val (producerThread, consumerThread) = startProduceConsume(retries = 0)
 
     val props = new Properties
-    props.put(KafkaConfig.LogSegmentBytesProp, "10000")
+    props.put(KafkaConfig.LogSegmentBytesProp, "4000")
     props.put(KafkaConfig.LogRollTimeMillisProp, TimeUnit.HOURS.toMillis(2).toString)
     props.put(KafkaConfig.LogRollTimeJitterMillisProp, TimeUnit.HOURS.toMillis(1).toString)
     props.put(KafkaConfig.LogIndexSizeMaxBytesProp, "100000")
@@ -312,7 +315,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogPreAllocateProp, true.toString)
     props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString)
     props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
-    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "10000"))
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "4000"))
 
     // Verify that all broker defaults have been updated
     servers.foreach { server =>
@@ -325,7 +328,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val newLogConfig = LogConfig(KafkaServer.copyKafkaConfigToLog(servers.head.config))
     assertEquals(newLogConfig, servers.head.logManager.currentDefaultConfig)
     val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found"))
-    TestUtils.waitUntilTrue(() => log.config.segmentSize == 10000, "Existing topic config using defaults not updated")
+    TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated")
     props.asScala.foreach { case (k, v) =>
       val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
       val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]" else v
@@ -335,7 +338,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     consumerThread.waitForMatchingRecords(record => record.timestampType == TimestampType.LOG_APPEND_TIME)
 
     // Verify that the new config is actually used for new segments of existing logs
-    TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 9000), "Log segment size increase not applied")
+    TestUtils.waitUntilTrue(() => log.logSegments.exists(_.size > 3000), "Log segment size increase not applied")
 
     // Verify that overridden topic configs are not updated when broker default is updated
     val log2 = servers.head.logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
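The broker defaults above take effect cluster-wide without a restart. The test drives the update through ConfigCommand and ZooKeeper; outside the test harness the same change can be expressed with the AdminClient API. A sketch under the assumption that a broker is reachable at localhost:9092 (the address and the standalone object are illustrative, not part of the patch):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    object AlterDefaultSketch extends App {
      val props = new Properties
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      val admin = AdminClient.create(props)
      // An empty resource name addresses the cluster-wide broker default.
      val resource = new ConfigResource(ConfigResource.Type.BROKER, "")
      val config = new Config(Collections.singleton(new ConfigEntry("log.segment.bytes", "4000")))
      admin.alterConfigs(Collections.singletonMap(resource, config)).all.get()
      admin.close()
    }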
@@ -383,19 +386,20 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // For others, thread count should be configuredCount * threadMultiplier * numBrokers
     val threadMultiplier = Map(
       requestHandlerPrefix -> 1,
-      networkThreadPrefix -> 2, // 2 endpoints
+      networkThreadPrefix -> 2, // 2 endpoints
       fetcherThreadPrefix -> (servers.size - 1)
     )
 
     // Tolerate threads left over from previous tests
-    def leftOverThreadCount(prefix: String, perBrokerCount: Int) : Int = {
+    def leftOverThreadCount(prefix: String, perBrokerCount: Int): Int = {
       val count = matchingThreads(prefix).size - perBrokerCount * servers.size * threadMultiplier(prefix)
       if (count > 0) count else 0
     }
+
     val leftOverThreads = Map(
       requestHandlerPrefix -> leftOverThreadCount(requestHandlerPrefix, servers.head.config.numIoThreads),
-      networkThreadPrefix -> leftOverThreadCount(networkThreadPrefix, servers.head.config.numNetworkThreads),
-      fetcherThreadPrefix -> leftOverThreadCount(fetcherThreadPrefix, servers.head.config.numReplicaFetchers)
+      networkThreadPrefix -> leftOverThreadCount(networkThreadPrefix, servers.head.config.numNetworkThreads),
+      fetcherThreadPrefix -> leftOverThreadCount(fetcherThreadPrefix, servers.head.config.numReplicaFetchers)
     )
 
     def maybeVerifyThreadPoolSize(propName: String, size: Int, threadPrefix: String): Unit = {
@@ -404,21 +408,26 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       if (expectedCountPerBroker > 0)
         verifyThreads(threadPrefix, expectedCountPerBroker, ignoreCount)
     }
+
     def reducePoolSize(propName: String, currentSize: => Int, threadPrefix: String): Int = {
       val newSize = if (currentSize / 2 == 0) 1 else currentSize / 2
       resizeThreadPool(propName, newSize, threadPrefix)
       newSize
     }
+
     def increasePoolSize(propName: String, currentSize: => Int, threadPrefix: String): Int = {
-      resizeThreadPool(propName, currentSize * 2, threadPrefix)
-      currentSize * 2
+      val newSize = currentSize * 2 - 1
+      resizeThreadPool(propName, newSize, threadPrefix)
+      newSize
     }
+
     def resizeThreadPool(propName: String, newSize: Int, threadPrefix: String): Unit = {
       val props = new Properties
       props.put(propName, newSize.toString)
       reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString))
       maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
     }
+
     def verifyThreadPoolResize(propName: String, currentSize: => Int, threadPrefix: String, mayReceiveDuplicates: Boolean): Unit = {
       maybeVerifyThreadPoolSize(propName, currentSize, threadPrefix)
       val numRetries = if (mayReceiveDuplicates) 100 else 0
@@ -444,6 +453,47 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
         "", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
       networkThreadPrefix, mayReceiveDuplicates = true)
+
+    verifyProcessorMetrics()
+    verifyMarkPartitionsForTruncation()
+  }
+
+  // Verify that metrics from processors that were removed have been deleted.
+  // Since processor ids are not reused, it is sufficient to check metrics count
+  // based on the current number of processors
+  private def verifyProcessorMetrics(): Unit = {
+    val numProcessors = servers.head.config.numNetworkThreads * 2 // 2 listeners
+
+    val kafkaMetrics = servers.head.metrics.metrics().keySet.asScala
+      .filter(_.tags.containsKey("networkProcessor"))
+      .groupBy(_.tags.get("networkProcessor"))
+    assertEquals(numProcessors, kafkaMetrics.size)
+
+    Metrics.defaultRegistry.allMetrics.keySet.asScala
+      .filter(_.getMBeanName.contains("networkProcessor="))
+      .groupBy(_.getName)
+      .foreach { case (name, set) => assertEquals(s"Metrics not deleted $name", numProcessors, set.size) }
+  }
+
+  // Verify that replicaFetcherManager.markPartitionsForTruncation uses the current fetcher thread size
+  // to obtain partition assignment
+  private def verifyMarkPartitionsForTruncation(): Unit = {
+    val leaderId = 0
+    val partitions = (0 until numPartitions).map(i => new TopicPartition(topic, i)).filter { tp =>
+      zkClient.getLeaderForPartition(tp) == Some(leaderId)
+    }
+    assertTrue(s"Partitions not found with leader $leaderId", partitions.nonEmpty)
+    partitions.foreach { tp =>
+      (1 to 2).foreach { i =>
+        val replicaFetcherManager = servers(i).replicaManager.replicaFetcherManager
+        val truncationOffset = tp.partition
+        replicaFetcherManager.markPartitionsForTruncation(leaderId, tp, truncationOffset)
+        val fetcherThreads = replicaFetcherManager.fetcherThreadMap.filter(_._2.partitionStates.contains(tp))
+        assertEquals(1, fetcherThreads.size)
+        assertEquals(replicaFetcherManager.getFetcherId(tp.topic, tp.partition), fetcherThreads.head._1.fetcherId)
+        assertEquals(truncationOffset, fetcherThreads.head._2.partitionStates.stateValue(tp).fetchOffset)
+      }
+    }
   }
 
   @Test
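verifyMarkPartitionsForTruncation depends on the fetcher-id arithmetic that getFetcherId (made private[server] in this patch) exposes: a partition maps to a replica fetcher thread by hashing modulo the configured pool size, so resizing num.replica.fetchers reassigns partitions across threads. A standalone sketch of that mapping (illustrative only, not patch code):

    import org.apache.kafka.common.utils.Utils

    object FetcherIdSketch extends App {
      // Same arithmetic as AbstractFetcherManager.getFetcherId
      def fetcherId(topic: String, partitionId: Int, numFetchersPerBroker: Int): Int =
        Utils.abs(31 * topic.hashCode + partitionId) % numFetchersPerBroker

      println(fetcherId("test-topic", 3, numFetchersPerBroker = 2)) // assignment with 2 fetchers
      println(fetcherId("test-topic", 3, numFetchersPerBroker = 4)) // may differ after a resize
    }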
@@ -672,7 +722,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
                                    saslMechanisms: Seq[String]): Unit = {
     val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
-    val producer1 = createProducer(listenerName, securityProtocol, saslMechanism)
+    val producer1 = createProducer(listenerName, securityProtocol, saslMechanism, retries = 1000)
     val consumer1 = createConsumer(listenerName, securityProtocol, saslMechanism,
       s"remove-listener-group-$securityProtocol")
     verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
@@ -716,7 +766,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String]): Unit = {
     val mechanism = saslMechanism.getOrElse("")
-    val producer = createProducer(securityProtocol.name, securityProtocol, mechanism)
+    val retries = 1000 // since it may take time for metadata to be updated on all brokers
+    val producer = createProducer(securityProtocol.name, securityProtocol, mechanism, retries)
     val consumer = createConsumer(securityProtocol.name, securityProtocol, mechanism,
       s"add-listener-group-$securityProtocol-$mechanism")
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
@@ -785,11 +836,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props
   }
 
-  private def createProducer(listenerName: String, securityProtocol: SecurityProtocol,
-                             saslMechanism: String): KafkaProducer[String, String] = {
+  private def createProducer(listenerName: String,
+                             securityProtocol: SecurityProtocol,
+                             saslMechanism: String,
+                             retries: Int): KafkaProducer[String, String] = {
     val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
     val producer = TestUtils.createNewProducer(bootstrapServers,
-      acks = -1, retries = 0,
+      acks = -1, retries = retries,
       securityProtocol = securityProtocol,
       keySerializer = new StringSerializer,
       valueSerializer = new StringSerializer,
@@ -834,12 +887,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
                                     topic: String): Unit = {
     val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
     producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
-
-    val records = new ArrayBuffer[ConsumerRecord[String, String]]
+    var received = 0
     TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50).asScala
-      records.size == numRecords
-    }, s"Consumed ${records.size} records until timeout instead of the expected $numRecords records")
+      received += consumer.poll(50).count
+      received >= numRecords
+    }, s"Consumed $received records until timeout instead of the expected $numRecords records")
+    assertEquals(numRecords, received)
   }
 
   private def verifyAuthenticationFailure(producer: KafkaProducer[_, _]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 383c1e238c7a..0ee73659780e 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import java.util
 import util.Arrays.asList
 
-import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -289,14 +288,10 @@ class MetadataCacheTest {
       brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
-    try {
-      val result = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
-      fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead")
-    }
-    catch {
-      case _: BrokerEndPointNotAvailableException => //expected
-    }
-
+    val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+    assertEquals(1, topicMetadata.size)
+    assertEquals(1, topicMetadata.head.partitionMetadata.size)
+    assertEquals(-1, topicMetadata.head.partitionMetadata.get(0).leaderId)
   }
 
   @Test

From d4d97525dc198c9a67f859142a91714b6ec89cbc Mon Sep 17 00:00:00 2001
From: Rajini Sivaram
Date: Thu, 8 Feb 2018 10:26:01 +0000
Subject: [PATCH 2/4] Address review comments

---
 .../main/scala/kafka/network/SocketServer.scala  | 17 ++++++++++++-----
 .../main/scala/kafka/server/MetadataCache.scala  |  7 ++++---
 .../DynamicBrokerReconfigurationTest.scala       | 10 +++++++---
 3 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 578da8d51409..049ee33a4a6a 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -433,6 +433,12 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
 }
 
+private[kafka] object Processor {
+  val IdlePercentMetricName = "IdlePercent"
+  val NetworkProcessorMetricTag = "networkProcessor"
+  val ListenerMetricTag = "listener"
+}
+
 /**
  * Thread that processes all requests from a single connection. There are N of these running in parallel,
  * each of which has its own selector
@@ -451,6 +457,7 @@ private[kafka] class Processor(val id: Int,
                                memoryPool: MemoryPool,
                                logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
+  import Processor._
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
       case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
@@ -471,8 +478,8 @@ private[kafka] class Processor(val id: Int,
   private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
 
   private[kafka] val metricTags = mutable.LinkedHashMap(
-    "listener" -> listenerName.value,
-    "networkProcessor" -> id.toString
+    ListenerMetricTag -> listenerName.value,
+    NetworkProcessorMetricTag -> id.toString
  ).asJava
 
   newGauge("ResponseQueueSize",
@@ -482,7 +489,7 @@ private[kafka] class Processor(val id: Int,
     Map("processor" -> id.toString)
   )
 
-  newGauge("IdlePercent",
+  newGauge(IdlePercentMetricName,
     new Gauge[Double] {
       def value = {
         Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
@@ -490,7 +497,7 @@ private[kafka] class Processor(val id: Int,
     },
     // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
     // also includes the listener name)
-    Map("networkProcessor" -> id.toString)
+    Map(NetworkProcessorMetricTag -> id.toString)
   )
 
   private val selector = createSelector(
@@ -742,7 +749,7 @@ private[kafka] class Processor(val id: Int,
       close(channel.id)
     }
     selector.close()
-    removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+    removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString))
   }
 
   // `protected` to allow override for testing
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 6096e6cf25db..0e58de19cfed 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -107,11 +107,12 @@ class MetadataCache(brokerId: Int) extends Logging {
     inReadLock(partitionMetadataLock) {
       // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
       // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
-      aliveNodes.get(brokerId).map { nodeMap =>
+      aliveNodes.get(brokerId).flatMap { nodeMap =>
         val node = nodeMap.get(listenerName)
-        warn(s"Broker endpoint not found for broker $brokerId listenerName $listenerName")
+        if (node.isEmpty)
+          error(s"Broker endpoint not found for broker $brokerId listenerName $listenerName")
         node
-      }.getOrElse(None)
+      }
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
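Patch 2 tightens getAliveEndpoint in two ways: the log message (now an error) fires only when the listener is actually missing, and the map/getOrElse(None) chain collapses into flatMap. The two shapes are equivalent for any Option, as this small illustrative snippet shows (not patch code):

    // For any opt: Option[A] and f: A => Option[B]:
    //   opt.map(f).getOrElse(None) == opt.flatMap(f)
    val nodesByListener: Option[Map[String, Int]] = Some(Map("PLAINTEXT" -> 9092))
    val viaMapGetOrElse = nodesByListener.map(_.get("SSL")).getOrElse(None)
    val viaFlatMap = nodesByListener.flatMap(_.get("SSL"))
    assert(viaMapGetOrElse == viaFlatMap) // both None here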
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c3f643b89705..c43922d71c0c 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -32,6 +32,7 @@ import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.coordinator.group.OffsetConfig
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
+import kafka.network.Processor
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
@@ -465,12 +466,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val numProcessors = servers.head.config.numNetworkThreads * 2 // 2 listeners
 
     val kafkaMetrics = servers.head.metrics.metrics().keySet.asScala
-      .filter(_.tags.containsKey("networkProcessor"))
-      .groupBy(_.tags.get("networkProcessor"))
+      .filter(_.tags.containsKey(Processor.NetworkProcessorMetricTag))
+      .groupBy(_.tags.get(Processor.NetworkProcessorMetricTag))
     assertEquals(numProcessors, kafkaMetrics.size)
 
     Metrics.defaultRegistry.allMetrics.keySet.asScala
-      .filter(_.getMBeanName.contains("networkProcessor="))
+      .filter { metric =>
+        val mbeanName = metric.getMBeanName
+        mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains("processor=")
+      }
       .groupBy(_.getName)
       .foreach { case (name, set) => assertEquals(s"Metrics not deleted $name", numProcessors, set.size) }
   }

From d005c79ed46a0f23425e8521577312555bd57dda Mon Sep 17 00:00:00 2001
From: Rajini Sivaram
Date: Thu, 8 Feb 2018 19:53:25 +0000
Subject: [PATCH 3/4] Remove metrics left over from previous tests to avoid build failure

---
 .../DynamicBrokerReconfigurationTest.scala | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c43922d71c0c..4ed719d0e101 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -27,6 +27,7 @@ import java.util.concurrent._
 import javax.management.ObjectName
 
 import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.coordinator.group.OffsetConfig
@@ -94,6 +95,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
     super.setUp()
 
+    clearLeftOverProcessorMetrics() // clear metrics left over from other tests so that new ones can be tested
+
     (0 until numServers).foreach { brokerId =>
 
       val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile1))
@@ -459,6 +462,16 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyMarkPartitionsForTruncation()
   }
 
+  private def isProcessorMetric(metricName: MetricName): Boolean = {
+    val mbeanName = metricName.getMBeanName
+    mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains("processor=")
+  }
+
+  private def clearLeftOverProcessorMetrics(): Unit = {
+    val metricsFromOldTests = Metrics.defaultRegistry.allMetrics.keySet.asScala.filter(isProcessorMetric)
+    metricsFromOldTests.foreach(Metrics.defaultRegistry.removeMetric)
+  }
+
   // Verify that metrics from processors that were removed have been deleted.
   // Since processor ids are not reused, it is sufficient to check metrics count
   // based on the current number of processors
@@ -471,10 +484,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     assertEquals(numProcessors, kafkaMetrics.size)
 
     Metrics.defaultRegistry.allMetrics.keySet.asScala
-      .filter { metric =>
-        val mbeanName = metric.getMBeanName
-        mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains("processor=")
-      }
+      .filter(isProcessorMetric)
       .groupBy(_.getName)
       .foreach { case (name, set) => assertEquals(s"Metrics not deleted $name", numProcessors, set.size) }
   }

From 0c041868cd82ee8463e8d9f0eca07439227334fb Mon Sep 17 00:00:00 2001
From: Rajini Sivaram
Date: Thu, 8 Feb 2018 20:55:44 +0000
Subject: [PATCH 4/4] Address review comments

---
 .../scala/kafka/network/RequestChannel.scala  | 18 +++++++++++++++---
 .../scala/kafka/network/SocketServer.scala    |  8 --------
 .../scala/kafka/server/MetadataCache.scala    | 12 ++++++------
 .../DynamicBrokerReconfigurationTest.scala    |  4 ++--
 4 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 144632c6c5d4..8a17528bfb71 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -24,7 +24,6 @@ import java.util.concurrent._
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.metrics.KafkaMetricsGroup
-import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
@@ -40,6 +39,10 @@ import scala.reflect.ClassTag
 object RequestChannel extends Logging {
   private val requestLogger = Logger("kafka.request.logger")
 
+  val RequestQueueSizeMetric = "RequestQueueSize"
+  val ResponseQueueSizeMetric = "ResponseQueueSize"
+  val ProcessorMetricTag = "processor"
+
   def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
 
   sealed trait BaseRequest
@@ -241,15 +244,16 @@ object RequestChannel extends Logging {
 }
 
 class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+  import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
 
-  newGauge("RequestQueueSize", new Gauge[Int] {
+  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
       def value = requestQueue.size
   })
 
-  newGauge("ResponseQueueSize", new Gauge[Int]{
+  newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
     def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
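The constants above are put to use in the next hunk: the per-processor ResponseQueueSize gauge moves from Processor into RequestChannel, so it is registered when a processor is added and removed when the processor is retired during a dynamic resize. A minimal sketch of that register/remove pattern against the same Yammer registry (illustrative only; the MetricName strings are assumptions, not the patch's exact names):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.{Gauge, MetricName}

    object GaugeLifecycleSketch {
      private def metricName(id: Int): MetricName =
        new MetricName("kafka.network", "RequestChannel", "ResponseQueueSize", null,
          s"kafka.network:type=RequestChannel,name=ResponseQueueSize,processor=$id")

      // Register a tagged gauge when a processor is added...
      def addProcessorGauge(id: Int, queueSize: () => Int): Unit =
        Metrics.defaultRegistry.newGauge(metricName(id), new Gauge[Int] {
          def value: Int = queueSize()
        })

      // ...and remove it when the processor goes away, so resizes do not leak metrics.
      def removeProcessorGauge(id: Int): Unit =
        Metrics.defaultRegistry.removeMetric(metricName(id))
    }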
@@ -258,10 +262,18 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
   def addProcessor(processor: Processor): Unit = {
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
+
+    newGauge(ResponseQueueSizeMetric,
+      new Gauge[Int] {
+        def value = processor.responseQueueSize
+      },
+      Map(ProcessorMetricTag -> processor.id.toString)
+    )
   }
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
+    removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
   }
 
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 049ee33a4a6a..d37b52315940 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -482,13 +482,6 @@ private[kafka] class Processor(val id: Int,
     NetworkProcessorMetricTag -> id.toString
   ).asJava
 
-  newGauge("ResponseQueueSize",
-    new Gauge[Int] {
-      def value = responseQueue.size()
-    },
-    Map("processor" -> id.toString)
-  )
-
   newGauge(IdlePercentMetricName,
     new Gauge[Double] {
       def value = {
@@ -800,7 +793,6 @@ private[kafka] class Processor(val id: Int,
 
   override def shutdown(): Unit = {
     super.shutdown()
-    removeMetric("ResponseQueueSize", Map("processor" -> id.toString))
     removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
   }
 
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 0e58de19cfed..eb2d835d876a 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -107,12 +107,7 @@ class MetadataCache(brokerId: Int) extends Logging {
     inReadLock(partitionMetadataLock) {
       // Returns None if broker is not alive or if the broker does not have a listener named `listenerName`.
       // Since listeners can be added dynamically, a broker with a missing listener could be a transient error.
-      aliveNodes.get(brokerId).flatMap { nodeMap =>
-        val node = nodeMap.get(listenerName)
-        if (node.isEmpty)
-          error(s"Broker endpoint not found for broker $brokerId listenerName $listenerName")
-        node
-      }
+      aliveNodes.get(brokerId).flatMap(_.get(listenerName))
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
@@ -207,6 +202,11 @@ class MetadataCache(brokerId: Int) extends Logging {
         aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
         aliveNodes(broker.id) = nodes.asScala
       }
+      aliveNodes.get(brokerId).foreach { listenerMap =>
+        val listeners = listenerMap.keySet
+        if (!aliveNodes.values.forall(_.keySet == listeners))
+          error(s"Listeners are not identical across brokers: $aliveNodes")
+      }
 
       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 4ed719d0e101..4cdc98943272 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -33,7 +33,7 @@ import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.coordinator.group.OffsetConfig
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
-import kafka.network.Processor
+import kafka.network.{Processor, RequestChannel}
 import kafka.utils._
 import kafka.utils.Implicits._
 import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
@@ -464,7 +464,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   private def isProcessorMetric(metricName: MetricName): Boolean = {
     val mbeanName = metricName.getMBeanName
-    mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains("processor=")
+    mbeanName.contains(s"${Processor.NetworkProcessorMetricTag}=") || mbeanName.contains(s"${RequestChannel.ProcessorMetricTag}=")
   }
 
   private def clearLeftOverProcessorMetrics(): Unit = {
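Patch 4 also adds a consistency check to updateCache: it logs an error when alive brokers advertise different listener sets, which can legitimately happen for a short window while a dynamic listener addition rolls across the cluster. The invariant being checked reduces to the following (standalone illustrative sketch, not patch code):

    import org.apache.kafka.common.Node
    import org.apache.kafka.common.network.ListenerName

    object ListenerConsistencySketch {
      // True when every alive broker advertises exactly the same set of listener names.
      def listenersConsistent(aliveNodes: Map[Int, Map[ListenerName, Node]]): Boolean =
        aliveNodes.values.map(_.keySet).toSet.size <= 1
    }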