Skip to content

Commit

Permalink
KAFKA-5344; set message.timestamp.difference.max.ms back to Long.MaxV…
Browse files Browse the repository at this point in the history
…alue

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3163 from becketqin/KAFKA-5344
  • Loading branch information
becketqin authored and ijuma committed May 30, 2017
1 parent 6f5930d commit 6b03497
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 15 deletions.
16 changes: 4 additions & 12 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -106,7 +106,7 @@ object Defaults {
// lazy val as `InterBrokerProtocolVersion` is defined later
lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
val LogMessageTimestampType = "CreateTime"
val LogMessageTimestampDifferenceMaxMs = LogRetentionHours * 60 * 60 * 1000L
val LogMessageTimestampDifferenceMaxMs = Long.MaxValue
val NumRecoveryThreadsPerDataDir = 1
val AutoCreateTopicsEnable = true
val MinInSyncReplicas = 1
Expand Down Expand Up @@ -514,8 +514,7 @@ object KafkaConfig {
val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
"a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " +
"if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." +
"The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling. For " +
"this reason, the default is the value of log.retention.ms."
"The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling."
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
Expand Down Expand Up @@ -747,7 +746,7 @@ object KafkaConfig {
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
.define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
.define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, null, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)

/** ********* Replication configuration ***********/
Expand Down Expand Up @@ -959,7 +958,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)
val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
val logMessageTimestampDifferenceMaxMs = getMessageTimestampDifferenceMaxMs
val logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)

/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
Expand Down Expand Up @@ -1086,13 +1085,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
millis
}

private def getMessageTimestampDifferenceMaxMs: Long = {
Option(getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)) match {
case Some(value) => value
case None => getLogRetentionTimeMillis
}
}

private def getMap(propName: String, propValue: String): Map[String, String] = {
try {
CoreUtils.parseCsvMap(propValue)
Expand Down
Expand Up @@ -712,7 +712,6 @@ class KafkaConfigTest {
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
assertEquals(config.logRetentionTimeMillis, config.logMessageTimestampDifferenceMaxMs)
assertEquals(123L, config.logFlushIntervalMs)
assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
Expand Down
2 changes: 0 additions & 2 deletions docs/upgrade.html
Expand Up @@ -60,8 +60,6 @@ <h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes in
<li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
replication factor requirement.</li>
<li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of
<code>Long.MAX_VALUE</code>.</li>
<li>The broker configuration <code>max.message.bytes</code> now applies to the total size of a batch of messages.
Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. In practice,
the change is minor since a message batch may consist of only a single message, so the limitation on the size of
Expand Down

0 comments on commit 6b03497

Please sign in to comment.