From ff4a509902a888563a1dc39fd129eba7f0c53bb9 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Thu, 10 Mar 2016 13:57:46 -0800 Subject: [PATCH 1/2] KAFKA-3373 add 'log' prefix to configurations in KIP-31/32 --- core/src/main/scala/kafka/log/LogConfig.scala | 12 ++++---- .../scala/kafka/server/ConfigHandler.scala | 4 +-- .../main/scala/kafka/server/KafkaConfig.scala | 30 +++++++++---------- .../main/scala/kafka/server/KafkaServer.scala | 6 ++-- .../unit/kafka/server/KafkaConfigTest.scala | 4 +-- .../unit/kafka/server/LogOffsetTest.scala | 24 +++++++-------- docs/upgrade.html | 4 +-- .../services/kafka/config_property.py | 2 +- tests/kafkatest/tests/upgrade_test.py | 6 ++-- 9 files changed, 45 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index a76dce784b1d7..ffec85a95edea 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -47,9 +47,9 @@ object Defaults { val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable - val MessageFormatVersion = kafka.server.Defaults.MessageFormatVersion - val MessageTimestampType = kafka.server.Defaults.MessageTimestampType - val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.MessageTimestampDifferenceMaxMs + val MessageFormatVersion = kafka.server.Defaults.LogMessageFormatVersion + val MessageTimestampType = kafka.server.Defaults.LogMessageTimestampType + val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { @@ -110,9 +110,9 @@ object LogConfig { val MinInSyncReplicasProp = "min.insync.replicas" val CompressionTypeProp = "compression.type" val PreAllocateEnableProp = "preallocate" - val MessageFormatVersionProp = KafkaConfig.MessageFormatVersionProp - val MessageTimestampTypeProp = KafkaConfig.MessageTimestampTypeProp - val MessageTimestampDifferenceMaxMsProp = KafkaConfig.MessageTimestampDifferenceMaxMsProp + val MessageFormatVersionProp = "message.format.version" + val MessageTimestampTypeProp = "message.timestamp.type" + val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 4bdd308e2c03e..ab1d7825e1832 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -48,8 +48,8 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic` because `$versionString` " + s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`") Some(LogConfig.MessageFormatVersionProp) - } - else None + } else + None } val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d13c87229f84c..1afdc27cc5fc2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -95,12 +95,12 @@ object Defaults { val LogFlushOffsetCheckpointIntervalMs = 60000 val LogPreAllocateEnable = false // lazy val as `InterBrokerProtocolVersion` is defined later - lazy val MessageFormatVersion = InterBrokerProtocolVersion + lazy val LogMessageFormatVersion = InterBrokerProtocolVersion + val LogMessageTimestampType = "CreateTime" + val LogMessageTimestampDifferenceMaxMs = Long.MaxValue val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 - val MessageTimestampType = "CreateTime" - val MessageTimestampDifferenceMaxMs = Long.MaxValue /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMs = RequestTimeoutMs @@ -255,12 +255,12 @@ object KafkaConfig { val LogFlushIntervalMsProp = "log.flush.interval.ms" val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms" val LogPreAllocateProp = "log.preallocate" - val MessageFormatVersionProp = "message.format.version" + val LogMessageFormatVersionProp = "log.message.format.version" + val LogMessageTimestampTypeProp = "log.message.timestamp.type" + val LogMessageTimestampDifferenceMaxMsProp = "log.message.timestamp.difference.max.ms" val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" - val MessageTimestampTypeProp = "message.timestamp.type" - val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" val DefaultReplicationFactorProp = "default.replication.factor" @@ -607,9 +607,9 @@ object KafkaConfig { .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc) .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc) - .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc) - .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc) - .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc) + .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, MessageFormatVersionDoc) + .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc) + .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc) /** ********* Replication configuration ***********/ .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) @@ -802,10 +802,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0` // is passed, `0.10.0-IV0` may be picked) - val messageFormatVersionString = getString(KafkaConfig.MessageFormatVersionProp) - val messageFormatVersion = ApiVersion(messageFormatVersionString) - val messageTimestampType = TimestampType.forName(getString(KafkaConfig.MessageTimestampTypeProp)) - val messageTimestampDifferenceMaxMs = getLong(KafkaConfig.MessageTimestampDifferenceMaxMsProp) + val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp) + val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString) + val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp)) + val logMessageTimestampDifferenceMaxMs = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp) /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) @@ -986,7 +986,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " + s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}" ) - require(interBrokerProtocolVersion >= messageFormatVersion, - s"message.format.version $messageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") + require(interBrokerProtocolVersion >= logMessageFormatVersion, + s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2203df92071c1..2f5441ac12115 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -75,9 +75,9 @@ object KafkaServer { logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType) logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable) logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable) - logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.messageFormatVersion.version) - logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.messageTimestampType.name) - logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.messageTimestampDifferenceMaxMs) + logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version) + logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name) + logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs) logProps } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index c5a0079013417..7524e6a6f8726 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -283,14 +283,14 @@ class KafkaConfigTest { props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0") // We need to set the message format version to make the configuration valid. - props.put(KafkaConfig.MessageFormatVersionProp, "0.8.2.0") + props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0") val conf2 = KafkaConfig.fromProps(props) assertEquals(KAFKA_0_8_2, conf2.interBrokerProtocolVersion) // check that 0.8.2.0 is the same as 0.8.2.1 props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1") // We need to set the message format version to make the configuration valid - props.put(KafkaConfig.MessageFormatVersionProp, "0.8.2.1") + props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1") val conf3 = KafkaConfig.fromProps(props) assertEquals(KAFKA_0_8_2, conf3.interBrokerProtocolVersion) diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 5c2092c1af58b..8c86a7b5956af 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -18,21 +18,20 @@ package kafka.server import java.io.File +import java.util.{Properties, Random} + +import kafka.admin.AdminUtils +import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo} +import kafka.common.TopicAndPartition +import kafka.consumer.SimpleConsumer +import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec} +import kafka.utils.TestUtils._ import kafka.utils._ -import org.apache.kafka.common.protocol.Errors +import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors import org.junit.Assert._ -import java.util.{Random, Properties} -import kafka.consumer.SimpleConsumer -import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} -import kafka.zk.ZooKeeperTestHarness -import kafka.admin.AdminUtils -import kafka.api.{ApiVersion, PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} -import kafka.utils.TestUtils._ -import kafka.common.TopicAndPartition -import org.junit.After -import org.junit.Before -import org.junit.Test +import org.junit.{After, Before, Test} class LogOffsetTest extends ZooKeeperTestHarness { val random = new Random() @@ -206,7 +205,6 @@ class LogOffsetTest extends ZooKeeperTestHarness { props.put("log.retention.check.interval.ms", (5*1000*60).toString) props.put("log.segment.bytes", logSize.toString) props.put("zookeeper.connect", zkConnect.toString) - props.put("message.format.version", "0.10.0") props } diff --git a/docs/upgrade.html b/docs/upgrade.html index 863a6face5d9d..15ea3ae045cf1 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -26,7 +26,7 @@

Upgrading from 0.8.x or 0.9.x to 0.10.
  1. Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0). - We recommend that users set message.format.version=CURRENT_KAFKA_VERSION as well to avoid a performance regression + We recommend that users set log.message.format.version=CURRENT_KAFKA_VERSION as well to avoid a performance regression during upgrade. See potential performance impact during upgrade for the details.
  2. Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
  3. @@ -41,7 +41,7 @@

    Upgrading from 0.8.x or 0.9.x to 0.10.
    Potential performance impact during upgrade to 0.10.0.0

    The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. - The on disk message format can be configured through message.format.version in the server.properties file. + The on disk message format can be configured through log.message.format.version in the server.properties file. The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, it only understands message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case. diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index b2b1d05ba5316..8f30f1331ed23 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -40,7 +40,7 @@ ZOOKEEPER_CONNECT = "zookeeper.connect" ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms" INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version" -MESSAGE_FORMAT_VERSION = "message.format.version" +MESSAGE_FORMAT_VERSION = "log.message.format.version" diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py index bec4b3f025bd2..9926f11ee0e18 100644 --- a/tests/kafkatest/tests/upgrade_test.py +++ b/tests/kafkatest/tests/upgrade_test.py @@ -78,10 +78,10 @@ def test_upgrade(self, from_kafka_version, to_message_format_version, compressio - Start producer and consumer in the background - Perform two-phase rolling upgrade - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to - from_kafka_version and message.format.version set to from_kafka_version + from_kafka_version and log.message.format.version set to from_kafka_version - Second phase: remove inter.broker.protocol.version config with rolling bounce; if - to_message_format_version is set to 0.9, set message.format.version to - to_message_format_version, otherwise remove message.format.version config + to_message_format_version is set to 0.9, set log.message.format.version to + to_message_format_version, otherwise remove log.message.format.version config - Finally, validate that every message acked by the producer was consumed by the consumer """ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, From d83640a8d3ba65bd5d3a4a889539c6e688be208b Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 14 Mar 2016 18:48:32 -0700 Subject: [PATCH 2/2] Addressed Gwen's comments. --- core/src/main/scala/kafka/server/KafkaConfig.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1afdc27cc5fc2..8d14edd17f844 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -23,6 +23,7 @@ import kafka.api.ApiVersion import kafka.cluster.EndPoint import kafka.consumer.ConsumerConfig import kafka.coordinator.OffsetConfig +import kafka.log.LogConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs @@ -184,6 +185,8 @@ object Defaults { object KafkaConfig { + private val LogConfigPrefix = "log." + def main(args: Array[String]) { System.out.println(configDef.toHtmlTable) } @@ -255,9 +258,9 @@ object KafkaConfig { val LogFlushIntervalMsProp = "log.flush.interval.ms" val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms" val LogPreAllocateProp = "log.preallocate" - val LogMessageFormatVersionProp = "log.message.format.version" - val LogMessageTimestampTypeProp = "log.message.timestamp.type" - val LogMessageTimestampDifferenceMaxMsProp = "log.message.timestamp.difference.max.ms" + val LogMessageFormatVersionProp = LogConfigPrefix + LogConfig.MessageFormatVersionProp + val LogMessageTimestampTypeProp = LogConfigPrefix + LogConfig.MessageTimestampTypeProp + val LogMessageTimestampDifferenceMaxMsProp = LogConfigPrefix + LogConfig.MessageTimestampDifferenceMaxMsProp val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas"