From a87d9eab59eea4213eaf2f31687b9544b536318a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 2 May 2017 11:52:52 +0100 Subject: [PATCH] MINOR: Fix error logged if not enough alive brokers for transactions state topic --- .../main/scala/kafka/server/KafkaApis.scala | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1e1f0d5576729..3d821f78fb693 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -802,27 +802,35 @@ class KafkaApis(val requestChannel: RequestChannel, } private def createInternalTopic(topic: String): MetadataResponse.TopicMetadata = { - if (topic == null) throw new IllegalArgumentException("topic must not be null") + if (topic == null) + throw new IllegalArgumentException("topic must not be null") val aliveBrokers = metadataCache.getAliveBrokers - val requiredReplicas = if (topic == GroupMetadataTopicName) - config.offsetsTopicReplicationFactor - else - config.transactionTopicReplicationFactor - - if (aliveBrokers.size < requiredReplicas) { - error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + - s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " + - s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + - s"and not all brokers are up yet.") - new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList()) - } else { - if (topic == GroupMetadataTopicName) - createTopic(topic, config.offsetsTopicPartitions, - config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs) - else - createTopic(topic, config.transactionTopicPartitions, - config.transactionTopicReplicationFactor.toInt, txnCoordinator.transactionTopicConfigs) + + topic match { + case GroupMetadataTopicName => + if (aliveBrokers.size < config.offsetsTopicReplicationFactor) { + error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + + s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " + + s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + + s"and not all brokers are up yet.") + new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList()) + } else { + createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt, + groupCoordinator.offsetsTopicConfigs) + } + case TransactionStateTopicName => + if (aliveBrokers.size < config.transactionTopicReplicationFactor) { + error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + + s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " + + s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + + s"and not all brokers are up yet.") + new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, java.util.Collections.emptyList()) + } else { + createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt, + txnCoordinator.transactionTopicConfigs) + } + case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic") } }