From 5e205ea74205961528c58305dd6ff0c68856aa8c Mon Sep 17 00:00:00 2001
From: Grant Henke
Date: Tue, 12 Jan 2016 16:59:55 -0600
Subject: [PATCH] KAFKA-3012: Avoid reserved.broker.max.id collisions on upgrade

Provides a configuration to opt out of broker id generation.
---
 .../main/scala/kafka/server/KafkaConfig.scala | 11 ++++++++++-
 .../main/scala/kafka/server/KafkaServer.scala |  6 +++---
 .../unit/kafka/server/KafkaConfigTest.scala   |  4 +++-
 .../server/ServerGenerateBrokerIdTest.scala   | 16 ++++++++++++++++
 docs/upgrade.html                             |  8 +++++++-
 5 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 54b79aba8a95..4911809631e8 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -42,6 +42,7 @@ object Defaults {
   val ZkEnableSecureAcls = false
 
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnable = true
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
   val MessageMaxBytes = 1000000 + MessageSet.LogOverhead
@@ -190,6 +191,7 @@ object KafkaConfig {
   val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
   val ZkEnableSecureAclsProp = "zookeeper.set.acl"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
   val MaxReservedBrokerIdProp = "reserved.broker.max.id"
   val BrokerIdProp = "broker.id"
   val MessageMaxBytesProp = "message.max.bytes"
@@ -338,6 +340,7 @@ object KafkaConfig {
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
   "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids" +
@@ -522,6 +525,7 @@ object KafkaConfig {
       .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
 
       /** ********* General Configuration ***********/
+      .define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc)
       .define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, atLeast(0), MEDIUM, MaxReservedBrokerIdDoc)
       .define(BrokerIdProp, INT, Defaults.BrokerId, HIGH, BrokerIdDoc)
       .define(MessageMaxBytesProp, INT, Defaults.MessageMaxBytes, atLeast(0), HIGH, MessageMaxBytesDoc)
@@ -718,6 +722,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
 
   /** ********* General Configuration ***********/
+  val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
@@ -927,7 +932,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     validateValues()
 
   private def validateValues() {
-    require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
+    if(brokerIdGenerationEnable) {
+      require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
+    } else {
+      require(brokerId >= 0, "broker.id must be equal or greater than 0")
+    }
     require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
     require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index aaa6ea9e38d3..454633eb9f89 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -609,9 +609,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   }
 
   /**
-   * Generates new brokerId or reads from meta.properties based on following conditions
+   * Generates new brokerId if enabled or reads from meta.properties based on following conditions
    * <ol>
-   * <li> config has no broker.id provided , generates a broker.id based on Zookeeper's sequence
+   * <li> config has no broker.id provided and broker id generation is enabled, generates a broker.id based on Zookeeper's sequence
    * <li> stored broker.id in meta.properties doesn't match in all the log.dirs throws InconsistentBrokerIdException
    * <li> config has broker.id and meta.properties contains broker.id if they don't match throws InconsistentBrokerIdException
    * <li> config has broker.id and there is no meta.properties file, creates new meta.properties and stores broker.id
@@ -637,7 +637,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
       throw new InconsistentBrokerIdException("Failed to match brokerId across logDirs")
     else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last != brokerId)
       throw new InconsistentBrokerIdException("Configured brokerId %s doesn't match stored brokerId %s in meta.properties".format(brokerId, brokerIdSet.last))
-    else if(brokerIdSet.size == 0 && brokerId < 0)  // generate a new brokerId from Zookeeper
+    else if(brokerIdSet.size == 0 && brokerId < 0 && config.brokerIdGenerationEnable)  // generate a new brokerId from Zookeeper
      brokerId = generateBrokerId
    else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
      brokerId = brokerIdSet.last
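The getBrokerId change is easiest to read as a decision table. Below is a condensed, hypothetical restatement of the resolution order; resolveBrokerId and its parameters are illustrative names, not Kafka API:

    // Condensed restatement of the resolution order sketched above; illustrative only.
    // storedIds holds the broker.id values found in meta.properties across log.dirs.
    def resolveBrokerId(configuredId: Int,
                        storedIds: Set[Int],
                        generationEnabled: Boolean,
                        generateFromZk: () => Int): Int =
      if (storedIds.size > 1)
        throw new RuntimeException("brokerId differs across log.dirs")      // InconsistentBrokerIdException in Kafka
      else if (configuredId >= 0 && storedIds.size == 1 && storedIds.head != configuredId)
        throw new RuntimeException("configured vs stored brokerId mismatch") // InconsistentBrokerIdException in Kafka
      else if (storedIds.isEmpty && configuredId < 0 && generationEnabled)
        generateFromZk() // the only branch this patch gates on the new flag
      else if (storedIds.size == 1)
        storedIds.head   // reuse the persisted id
      else
        configuredId

The -1 fallthrough in the last branch cannot be reached when generation is disabled, because the validateValues() change above already requires broker.id >= 0 in that mode.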
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 8a5038f3e7a0..9ddc2c1e426d 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -408,7 +408,7 @@ class KafkaConfigTest {
       case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.AuthorizerClassNameProp => //ignore string
-
+
       case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
       case KafkaConfig.HostNameProp => // ignore string
       case KafkaConfig.AdvertisedHostNameProp => //ignore string
@@ -526,6 +526,7 @@ class KafkaConfigTest {
     defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
     // For ZkConnectionTimeoutMs
     defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
     defaults.put(KafkaConfig.BrokerIdProp, "1")
     defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
@@ -542,6 +543,7 @@ class KafkaConfigTest {
     val config = KafkaConfig.fromProps(defaults)
     Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
     Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
+    Assert.assertEquals(false, config.brokerIdGenerationEnable)
     Assert.assertEquals(1, config.maxReservedBrokerId)
     Assert.assertEquals(1, config.brokerId)
     Assert.assertEquals("127.0.0.1", config.hostName)
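The test above pins the explicit opt-out; nothing in the patch flips the default, which stays true, so existing deployments keep the current behavior. A REPL-style sketch of the default (placeholder ZooKeeper address; not part of the patch):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // placeholder
    // broker.id.generation.enable unset: the default of true applies, and
    // broker.id defaults to -1, meaning "generate an id on startup".
    val config = KafkaConfig.fromProps(props)
    assert(config.brokerIdGenerationEnable)
    assert(config.brokerId == -1)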
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 9afb2caee27a..60ec56192009 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -81,6 +81,22 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
+  @Test
+  def testDisableGeneratedBrokerId() {
+    val props3 = TestUtils.createBrokerConfig(3, zkConnect)
+    props3.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    // Set reserved broker ids to cause a collision and ensure disabling broker id generation ignores the setting
+    props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0")
+    val config3 = KafkaConfig.fromProps(props3)
+    val server3 = new KafkaServer(config3)
+    server3.startup()
+    assertEquals(server3.config.brokerId, 3)
+    server3.shutdown()
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs, 3))
+    CoreUtils.rm(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
   @Test
   def testMultipleLogDirsMetaProps() {
     // add multiple logDirs and check if the generated brokerId is stored in all of them
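The new test also exercises the meta.properties round trip: on first startup the broker persists its id into each log.dir, and verifyBrokerMetadata reads it back. A REPL-style sketch of that check; the path is a placeholder log.dir and the values are illustrative:

    import java.io.FileInputStream
    import java.util.Properties

    // After a startup with broker.id=3, each log.dir holds a checkpoint
    // roughly of the form:
    //   version=0
    //   broker.id=3
    val meta = new Properties()
    val in = new FileInputStream("/tmp/kafka-logs/meta.properties") // placeholder log.dir
    try meta.load(in) finally in.close()
    assert(meta.getProperty("broker.id").toInt == 3)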
diff --git a/docs/upgrade.html b/docs/upgrade.html
index eccd626a7ad3..c72395770ede 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -39,7 +39,6 @@ Potential breaking changes in 0.9.0.0
     <li> Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly. </li>
     <li> Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync. </li>
     <li> Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync. </li>
-    <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
     <li> MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration. </li>
     <li> Tools packaged under org.apache.kafka.clients.tools.* have been moved to org.apache.kafka.tools.*. All included scripts will still function as usual, only custom code directly importing these classes will be affected. </li>
     <li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
@@ -49,6 +48,13 @@ Potential breaking changes in 0.9.0.0
     <li> By default all command line tools will print all logging messages to stderr instead of stdout. </li>
 </ul>
 
+<h5>Notable changes in 0.9.0.1</h5>
+
+<ul>
+    <li> The new broker id generation feature can be disabled by setting broker.id.generation.enable to false. </li>
+    <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
+</ul>
+
 <h5>Deprecations in 0.9.0.0</h5>
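For operators, the upgrade note boils down to two remedies for a pre-existing broker.id above the reserved range. A REPL-style sketch with illustrative values (placeholder ZooKeeper address; not part of the patch):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // placeholder
    props.put("broker.id", "2000")                   // pre-existing id above the default reserved range

    // Remedy 1: raise the reserved range so validation still passes.
    props.put("reserved.broker.max.id", "2000")
    assert(KafkaConfig.fromProps(props).brokerId == 2000)

    // Remedy 2: opt out of id generation; the upper bound then no longer applies.
    props.remove("reserved.broker.max.id")
    props.put("broker.id.generation.enable", "false")
    assert(KafkaConfig.fromProps(props).brokerId == 2000)

Either setting keeps an upgraded broker's existing id valid; which one to choose depends on whether automatic id assignment is wanted for future brokers in the cluster.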