KAFKA-14307; Controller time-based snapshots #12761

Merged
merged 14 commits on Nov 22, 2022
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -201,6 +201,7 @@ class ControllerServer(
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
TimeUnit.MILLISECONDS)).
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
setMaxSnapshotIntervalMs(config.metadataMaxSnapshotIntervalMs).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
setMaxIdleIntervalNs(maxIdleIntervalNs).
setMetrics(controllerMetrics).
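
As a hedged aside on what consumes this interval: the PR's actual time-based scheduling lives in QuorumController, which is outside this diff. The sketch below is hypothetical — the SnapshotScheduler name, its startup/shutdown methods, and the executor choice are all assumptions — but it shows the shape of a periodic check driven by maxSnapshotIntervalMs.

    import java.util.concurrent.{Executors, TimeUnit}

    // Hypothetical sketch (not QuorumController code): run a periodic task
    // that asks the snapshot generator to produce a snapshot every interval.
    class SnapshotScheduler(maxSnapshotIntervalMs: Long)(generateSnapshot: () => Unit) {
      private val executor = Executors.newSingleThreadScheduledExecutor()

      def startup(): Unit = {
        // Matching the config documentation in this PR: a value of zero
        // disables time-based snapshot generation.
        if (maxSnapshotIntervalMs > 0) {
          executor.scheduleAtFixedRate(
            () => generateSnapshot(),
            maxSnapshotIntervalMs,
            maxSnapshotIntervalMs,
            TimeUnit.MILLISECONDS)
        }
      }

      def shutdown(): Unit = executor.shutdown()
    }
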
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -81,6 +81,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
val MetadataMaxSnapshotIntervalMs = TimeUnit.HOURS.toMillis(1);
val MetadataMaxIdleIntervalMs = 500

/** KRaft mode configs */
@@ -400,6 +401,7 @@ object KafkaConfig {
val NodeIdProp = "node.id"
val MetadataLogDirProp = "metadata.log.dir"
val MetadataSnapshotMaxNewRecordBytesProp = "metadata.log.max.record.bytes.between.snapshots"
val MetadataMaxSnapshotIntervalMsProp = "metadata.log.max.snapshot.interval.ms"
val ControllerListenerNamesProp = "controller.listener.names"
val SaslMechanismControllerProtocolProp = "sasl.mechanism.controller.protocol"
val MetadataLogSegmentMinBytesProp = "metadata.log.segment.min.bytes"
@@ -725,7 +727,12 @@ object KafkaConfig {
"This is required configuration when running in KRaft mode."
val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters in KRaft mode. " +
"If it is not set, the metadata log is placed in the first log directory from log.dirs."
val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest snapshot and the high-watermark needed before generating a new snapshot."
val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of bytes in the log between the latest " +
"snapshot and the high-watermark needed before generating a new snapshot. The default value is " +
Defaults.MetadataSnapshotMaxNewRecordBytes
val MetadataMaxSnapshotIntervalMsDoc = "This is the maximum number of milliseconds to wait to generate a snapshot " +
"if there are records in the log that are not included in the lastest snapshot. A value of zero disables time based " +
s"snapshot generation. The default value is ${Defaults.MetadataMaxSnapshotIntervalMs}."
val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often the active " +
"controller should write no-op records to the metadata partition. If the value is 0, no-op records " +
s"are not appended to the metadata partition. The default value is ${Defaults.MetadataMaxIdleIntervalMs}";
@@ -1157,6 +1164,7 @@ object KafkaConfig {
* KRaft mode configs.
*/
.define(MetadataSnapshotMaxNewRecordBytesProp, LONG, Defaults.MetadataSnapshotMaxNewRecordBytes, atLeast(1), HIGH, MetadataSnapshotMaxNewRecordBytesDoc)
.define(MetadataMaxSnapshotIntervalMsProp, LONG, Defaults.MetadataMaxSnapshotIntervalMs, atLeast(0), HIGH, MetadataMaxSnapshotIntervalMsDoc)

/*
* KRaft mode private configs. Note that these configs are defined as internal. We will make them public in the 3.0.0 release.
@@ -1697,6 +1705,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami

/************* Metadata Configuration ***********/
val metadataSnapshotMaxNewRecordBytes = getLong(KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp)
val metadataMaxSnapshotIntervalMs = getLong(KafkaConfig.MetadataMaxSnapshotIntervalMsProp)
val metadataMaxIdleIntervalNs: Option[Long] = {
val value = TimeUnit.NANOSECONDS.convert(getInt(KafkaConfig.MetadataMaxIdleIntervalMsProp).toLong, TimeUnit.MILLISECONDS)
if (value > 0) Some(value) else None
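
For an end-to-end view of the new property, here is a hedged configuration sketch. The KRaft bootstrap settings are assumed minimal values for illustration (the KafkaConfigTest change below leans on a kraftProps() helper for the same reason); only the last property exercises this PR.

    import java.util.Properties
    import kafka.server.KafkaConfig

    // Illustrative bootstrap settings; a real controller config may need more.
    val props = new Properties()
    props.setProperty("process.roles", "controller")
    props.setProperty("node.id", "1")
    props.setProperty("controller.quorum.voters", "1@localhost:9093")
    props.setProperty("controller.listener.names", "CONTROLLER")
    props.setProperty("listeners", "CONTROLLER://localhost:9093")
    // Snapshot at most 30 minutes after new records appear; "0" disables
    // time-based snapshot generation.
    props.setProperty(KafkaConfig.MetadataMaxSnapshotIntervalMsProp, "1800000")

    val config = KafkaConfig.fromProps(props)
    assert(config.metadataMaxSnapshotIntervalMs == 30 * 60 * 1000L)
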
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -17,19 +17,19 @@
package kafka.server.metadata

import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CompletableFuture, TimeUnit}
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.snapshot.SnapshotReader

import java.util.concurrent.atomic.AtomicBoolean
import scala.compat.java8.OptionConverters._


object BrokerMetadataListener {
@@ -153,25 +153,27 @@ class BrokerMetadataListener(
}

private def shouldSnapshot(): Set[SnapshotReason] = {
val metadataVersionHasChanged = metadataVersionChanged()
val maxBytesHaveExceeded = (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots)

if (maxBytesHaveExceeded && metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged, SnapshotReason.MaxBytesExceeded)
} else if (maxBytesHaveExceeded) {
Set(SnapshotReason.MaxBytesExceeded)
} else if (metadataVersionHasChanged) {
Set(SnapshotReason.MetadataVersionChanged)
val maybeMetadataVersionChanged = metadataVersionChanged.toSet

if (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
maybeMetadataVersionChanged + SnapshotReason.maxBytesExceeded(_bytesSinceLastSnapshot, maxBytesBetweenSnapshots)
} else {
Set()
maybeMetadataVersionChanged
}
}

private def metadataVersionChanged(): Boolean = {
private def metadataVersionChanged: Option[SnapshotReason] = {
// The _publisher is empty before we start publishing, and we won't compute the feature delta
// until we start publishing
_publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta =>
featuresDelta.metadataVersionChange().isPresent
if (_publisher.nonEmpty) {
Option(_delta.featuresDelta()).flatMap { featuresDelta =>
featuresDelta
.metadataVersionChange()
.map(SnapshotReason.metadataVersionChanged)
.asScala
}
} else {
None
}
}

@@ -306,9 +308,8 @@ class BrokerMetadataListener(
_publisher = Some(publisher)
log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
try {
if (metadataVersionChanged()) {
maybeStartSnapshot(Set(SnapshotReason.MetadataVersionChanged))
}
// Generate a snapshot if the metadata version changed
metadataVersionChanged.foreach(reason => maybeStartSnapshot(Set(reason)))
publish(publisher)
future.complete(null)
} catch {
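
The shouldSnapshot refactor above leans on Option#toSet: an absent reason contributes an empty set, so the two conditions compose without the old four-branch if/else. Here is a standalone illustration with stand-in values — the maxBytesExceeded signature is inferred from its call site in this diff:

    import org.apache.kafka.metadata.util.SnapshotReason

    // Stand-in values, assumed for illustration only.
    val metadataVersionChanged: Option[SnapshotReason] = None
    val bytesSinceLastSnapshot = 25L * 1024 * 1024
    val maxBytesBetweenSnapshots = 20L * 1024 * 1024

    val reasons: Set[SnapshotReason] =
      if (bytesSinceLastSnapshot >= maxBytesBetweenSnapshots)
        metadataVersionChanged.toSet +
          SnapshotReason.maxBytesExceeded(bytesSinceLastSnapshot, maxBytesBetweenSnapshots)
      else
        metadataVersionChanged.toSet
    // Here reasons holds just the max-bytes reason; it is empty only when
    // neither condition applies.
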
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -18,13 +18,14 @@ package kafka.server.metadata

import java.util.concurrent.RejectedExecutionException
import kafka.utils.Logging
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.image.writer.{ImageWriterOptions, RaftSnapshotWriter}
import org.apache.kafka.metadata.util.SnapshotReason
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.SnapshotWriter
import scala.jdk.CollectionConverters._

trait SnapshotWriterBuilder {
def build(committedOffset: Long,
@@ -62,9 +63,13 @@ class BrokerMetadataSnapshotter(
*/
val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse(""))

override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage, snapshotReasons: Set[SnapshotReason]): Boolean = synchronized {
override def maybeStartSnapshot(
lastContainedLogTime: Long,
image: MetadataImage,
snapshotReasons: Set[SnapshotReason]
): Boolean = synchronized {
if (_currentSnapshotOffset != -1) {
info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " +
info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch} because " +
s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}")
false
} else {
@@ -74,9 +79,10 @@ class BrokerMetadataSnapshotter(
lastContainedLogTime
)
if (writer.nonEmpty) {
_currentSnapshotOffset = image.highestOffsetAndEpoch().offset
_currentSnapshotOffset = image.highestOffsetAndEpoch.offset

info(s"Creating a new snapshot at offset ${_currentSnapshotOffset} because, ${snapshotReasons.mkString(" and ")}")
val snapshotReasonsMessage = SnapshotReason.stringFromReasons(snapshotReasons.asJava)
info(s"Creating a new snapshot at ${image.highestOffsetAndEpoch} because: $snapshotReasonsMessage")
eventQueue.append(new CreateSnapshotEvent(image, writer.get))
true
} else {
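
The snapshotter's log line now delegates formatting to SnapshotReason.stringFromReasons, which takes a Java collection. A small sketch of the call shape — the rendered string is an assumption, since the diff shows only the call site:

    import org.apache.kafka.metadata.util.SnapshotReason
    import scala.jdk.CollectionConverters._

    val snapshotReasons: Set[SnapshotReason] = Set(SnapshotReason.UNKNOWN)
    // stringFromReasons joins the reasons into one human-readable message.
    val snapshotReasonsMessage = SnapshotReason.stringFromReasons(snapshotReasons.asJava)
    println(s"Creating a new snapshot because: $snapshotReasonsMessage")
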
19 changes: 19 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1616,4 +1616,23 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
}

@Test
def testMetadataMaxSnapshotInterval(): Unit = {
val validValue = 100
val props = new Properties()
props.putAll(kraftProps())
props.setProperty(KafkaConfig.MetadataMaxSnapshotIntervalMsProp, validValue.toString)

val config = KafkaConfig.fromProps(props)
assertEquals(validValue, config.metadataMaxSnapshotIntervalMs)

props.setProperty(KafkaConfig.MetadataMaxSnapshotIntervalMsProp, "-1")
val errorMessage = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage

assertEquals(
"Invalid value -1 for configuration metadata.log.max.snapshot.interval.ms: Value must be at least 0",
errorMessage
)
}
}
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -95,29 +95,10 @@ class BrokerMetadataSnapshotterTest {
def testCreateSnapshot(): Unit = {
val writerBuilder = new MockSnapshotWriterBuilder()
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)

try {
val blockingEvent = new BlockingEvent()
val reasons = Set(SnapshotReason.UnknownReason)

snapshotter.eventQueue.append(blockingEvent)
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))
assertFalse(snapshotter.maybeStartSnapshot(11000L, MetadataImageTest.IMAGE2, reasons))
blockingEvent.latch.countDown()
assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
} finally {
snapshotter.close()
}
}

@Test
def testCreateSnapshotMultipleReasons(): Unit = {
val writerBuilder = new MockSnapshotWriterBuilder()
val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, writerBuilder)

try {
val blockingEvent = new BlockingEvent()
val reasons = Set(SnapshotReason.MaxBytesExceeded, SnapshotReason.MetadataVersionChanged)
val reasons = Set(SnapshotReason.UNKNOWN)

snapshotter.eventQueue.append(blockingEvent)
assertTrue(snapshotter.maybeStartSnapshot(10000L, MetadataImageTest.IMAGE1, reasons))