diff --git a/clients/src/main/java/org/apache/kafka/common/es/ElasticStreamSwitch.java b/clients/src/main/java/org/apache/kafka/common/es/ElasticStreamSwitch.java new file mode 100644 index 0000000000..ac522266c9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/es/ElasticStreamSwitch.java @@ -0,0 +1,14 @@ +package org.apache.kafka.common.es; + +import java.util.concurrent.atomic.AtomicBoolean; + +public final class ElasticStreamSwitch { + private static AtomicBoolean switchEnabled = new AtomicBoolean(false); + + public static void setSwitch(boolean enabled) { + switchEnabled.set(enabled); + } + public static boolean isEnabled() { + return switchEnabled.get(); + } +} diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties index e1c11385e2..c2f8921187 100644 --- a/config/kraft/broker.properties +++ b/config/kraft/broker.properties @@ -138,6 +138,8 @@ log.retention.check.interval.ms=300000 # offsets.topic.replication.factor=1 ############################# Settings for elastic stream ############################# +# enable store data in elastic stream layer +elasticstream.enable=true # The Elastic Stream endpoint, ex. es://hostname1:port1,hostname2:port2,hostname3:port3. # You could also PoC launch it in memory mode with endpoint like memory:// or redis mode with redis://. diff --git a/config/kraft/controller.properties b/config/kraft/controller.properties index 882fc1d37b..b31780d9f3 100644 --- a/config/kraft/controller.properties +++ b/config/kraft/controller.properties @@ -129,5 +129,7 @@ log.retention.check.interval.ms=300000 ############################# Settings for elastic stream ############################# +# enable store data in elastic stream layer +elasticstream.enable=true create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy diff --git a/config/kraft/server.properties b/config/kraft/server.properties index 2647eae5f3..d255cfbb97 100644 --- a/config/kraft/server.properties +++ b/config/kraft/server.properties @@ -144,6 +144,8 @@ log.retention.check.interval.ms=300000 create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy +# enable store data in elastic stream layer +elasticstream.enable=true # The Elastic Stream endpoint, ex. es://hostname1:port1,hostname2:port2,hostname3:port3. # You could also PoC launch it in memory mode with endpoint like memory:// or redis mode with redis://. # Note that in memory mode, this Kafka node can not work in a cluster. diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 0c529102ce..d1390ddc92 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -765,31 +765,32 @@ class Partition(val topicPartition: TopicPartition, ) // elastic stream inject start - // only create log when partition is leader -// try { -// createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) -// } catch { -// case e: ZooKeeperClientException => -// stateChangeLogger.error(s"A ZooKeeper client exception has occurred. 
makeFollower will be skipping the " + -// s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e) -// return false -// } -// -// val followerLog = localLogOrException - // elastic stream inject end val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch + if (!ElasticLogManager.enabled()) { + // only create log when partition is leader + try { + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId) + } catch { + case e: ZooKeeperClientException => + stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " + + s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e) + return false + } + + val followerLog = localLogOrException + + if (isNewLeaderEpoch) { + val leaderEpochEndOffset = followerLog.logEndOffset + stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " + + s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " + + s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " + + s"Previous leader epoch was $leaderEpoch.") + } else { + stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + + s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.") + } + } - // elastic stream inject start -// if (isNewLeaderEpoch) { -// val leaderEpochEndOffset = followerLog.logEndOffset -// stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " + -// s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " + -// s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. 
" + -// s"Previous leader epoch was $leaderEpoch.") -// } else { -// stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " + -// s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.") -// } // elastic stream inject end leaderReplicaIdOpt = Option(partitionState.leader) diff --git a/core/src/main/scala/kafka/log/LocalLog.scala b/core/src/main/scala/kafka/log/LocalLog.scala index d9a4bf7efa..11e8aff100 100644 --- a/core/src/main/scala/kafka/log/LocalLog.scala +++ b/core/src/main/scala/kafka/log/LocalLog.scala @@ -458,7 +458,6 @@ class LocalLog(@volatile private var _dir: File, val fetchSize = fetchInfo.records.sizeInBytes val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, fetchInfo.fetchOffsetMetadata.relativePositionInSegment) - // TODO: modify it don't need fetch upper bound val upperBoundOffset = upperBoundOffsetOpt match { case Some(x) => x case None => segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse { diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 78cc26c774..722677cc5b 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -279,7 +279,7 @@ object LogConfig { private[log] val ServerDefaultHeaderName = "Server Default Property" - val configsWithNoServerDefaults: Set[String] = Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, LocalLogRetentionBytesProp) + val configsWithNoServerDefaults: Set[String] = Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, LocalLogRetentionBytesProp, ReplicationFactorProp) // Package private for testing private[log] class LogConfigDef(base: ConfigDef) extends ConfigDef(base) { @@ -401,7 +401,7 @@ object LogConfig { // elastic stream inject start logConfigDef - .define(ReplicationFactorProp, INT, 1, HIGH, ReplicationFactorDoc) + .define(ReplicationFactorProp, INT, 1, atLeast(1), HIGH, ReplicationFactorDoc) // elastic stream inject end logConfigDef diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 3c2b23cf6c..9ffcfc2fa8 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -18,6 +18,7 @@ package kafka.log import kafka.log.LogConfig.MessageFormatVersion +import kafka.log.es.ElasticLogManager import java.io._ import java.nio.file.Files @@ -37,7 +38,6 @@ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} import kafka.utils.Implicits._ -import org.apache.kafka.common.internals.Topic import java.util.Properties import org.apache.kafka.server.common.MetadataVersion @@ -392,16 +392,18 @@ class LogManager(logDirs: Seq[File], // elastic stream inject start var logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir => logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) - try{ - logsToLoad.foreach(logDir => { - warn(s"Unexpected partition directory $logDir. It may be due to an unclean shutdown.") - Utils.delete(logDir) - }) - } catch { + if (ElasticLogManager.enabled()) { + try { + logsToLoad.foreach(logDir => { + warn(s"Unexpected partition directory $logDir. 
It may be due to an unclean shutdown.") + Utils.delete(logDir) + }) + } catch { case e: IOException => - warn(s"Error occurred while cleaning $logDirAbsolutePath", e) + warn(s"Error occurred while cleaning $logDirAbsolutePath", e) + } + logsToLoad = Array() } - logsToLoad = Array() // elastic stream inject end numTotalLogs += logsToLoad.length @@ -618,19 +620,20 @@ class LogManager(logDirs: Seq[File], if (waitForAllToComplete(dirJobs, e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) { // elastic stream inject start -// val logs = logsInDir(localLogsByDir, dir) - // No need for updating recovery points, updating log start offsets or writing clean shutdown marker Since they have all been done in log.close() + if (ElasticLogManager.enabled()) return + val logs = logsInDir(localLogsByDir, dir) + // update the last flush point -// debug(s"Updating recovery points at $dir") -// checkpointRecoveryOffsetsInDir(dir, logs) -// -// debug(s"Updating log start offsets at $dir") -// checkpointLogStartOffsetsInDir(dir, logs) -// -// // mark that the shutdown was clean by creating marker file -// debug(s"Writing clean shutdown marker at $dir") -// CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this) + debug(s"Updating recovery points at $dir") + checkpointRecoveryOffsetsInDir(dir, logs) + + debug(s"Updating log start offsets at $dir") + checkpointLogStartOffsetsInDir(dir, logs) + + // mark that the shutdown was clean by creating marker file + debug(s"Writing clean shutdown marker at $dir") + CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this) // elastic stream inject end } } @@ -1411,11 +1414,4 @@ object LogManager { interBrokerProtocolVersion = config.interBrokerProtocolVersion) } - // elastic stream inject start - def isClusterMetaPath(dirName: String): Boolean = { - // FIXME: check file path topic part - dirName.contains(Topic.CLUSTER_METADATA_TOPIC_NAME) - } - // elastic stream inject end - } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 793d51bc16..dc6a771a5f 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -742,7 +742,7 @@ object LogSegment { def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = { - if (!isClusterMetaLogSegment(dir) && ElasticLogManager.INSTANCE.isDefined) { + if (!isClusterMetaLogSegment(dir) && ElasticLogManager.enabled()) { return ElasticLogManager.newSegment(dir2TopicPartition(dir), baseOffset, time, fileSuffix) } val maxIndexSize = config.maxIndexSize @@ -771,7 +771,6 @@ object LogSegment { } def dir2TopicPartition(dir: File): TopicPartition = { - // TODO: impl, reuse LocalLog#parseTopicPartitionName LocalLog.parseTopicPartitionName(dir) } // elastic stream inject end diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 13522d0bc3..933013982f 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -583,7 +583,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel)) } - private def maybeFlushMetadataFile(): Unit = { + protected def maybeFlushMetadataFile(): 
Unit = { partitionMetadataFile.foreach(_.maybeFlush()) } @@ -692,7 +692,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, * The memory mapped buffer for index files of this log will be left open until the log is deleted. */ def close(): Unit = { - info("Closing log") lock synchronized { maybeFlushMetadataFile() localLog.checkIfMemoryMappedBufferClosed() @@ -703,8 +702,6 @@ class UnifiedLog(@volatile var logStartOffset: Long, // (the clean shutdown file is written after the logs are all closed). producerStateManager.takeSnapshot() } - // flush all inflight data/index - flush(true) localLog.close() } } @@ -1836,7 +1833,7 @@ object UnifiedLog extends Logging { val topicPartition = UnifiedLog.parseTopicPartitionName(dir) // elastic stream inject start - if (!isClusterMetaLogSegment(dir)) { + if (!isClusterMetaLogSegment(dir) && ElasticLogManager.enabled()) { return applyElasticUnifiedLog(topicPartition, dir, config, scheduler, brokerTopicStats, time, maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs, logDirFailureChannel, topicId, leaderEpoch) diff --git a/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala b/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala index 847ee8d240..e714aac92b 100644 --- a/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala +++ b/core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala @@ -23,18 +23,18 @@ import kafka.log.AbstractIndex.{debug, error} import kafka.log.{Index, IndexEntry, IndexSearchType} import kafka.utils.CoreUtils.inLock import kafka.utils.{CoreUtils, Logging} -import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem} +import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils} import java.io.{File, RandomAccessFile} import java.nio.MappedByteBuffer import java.nio.channels.FileChannel -import java.nio.file.Files +import java.nio.file.{Files, NoSuchFileException} import java.util.concurrent.locks.{Lock, ReentrantLock} /** * Implementation ref. AbstractIndex */ -abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamSliceSupplier, val baseOffset: Long, val maxIndexSize: Int = -1) extends Index { +abstract class AbstractStreamIndex(@volatile private var _file: File, val streamSliceSupplier: StreamSliceSupplier, val baseOffset: Long, val maxIndexSize: Int = -1) extends Index { var stream: ElasticStreamSlice = streamSliceSupplier.get() @@ -77,7 +77,6 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS */ def isFull: Boolean = _entries >= _maxEntries - // TODO: check def file: File = _file def maxEntries: Int = _maxEntries @@ -86,8 +85,9 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS def length: Long = adjustedMaxIndexSize - // TODO: - def updateParentDir(parentDir: File): Unit = {} + def updateParentDir(parentDir: File): Unit = { + _file = new File(parentDir, file.getName) + } /** * Note that stream index actually does not need to resize. Here we only change the maxEntries in memory to be @@ -108,10 +108,15 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS } } - // TODO: - def renameTo(f: File): Unit = {} + def renameTo(f: File): Unit = { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) + catch { + case _: NoSuchFileException if !file.exists => () + } + finally _file = f + } - // TODO: + // Deleting index is actually implemented in ElasticLogSegment.deleteIfExists. We implement it here for tests. 
def deleteIfExists(): Boolean = { close() true diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java index 1c229ca12b..acd1e72aa7 100644 --- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java @@ -28,7 +28,9 @@ import com.automq.elasticstream.client.api.StreamClient; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiConsumer; import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.apache.kafka.common.utils.ThreadUtils; import org.slf4j.Logger; @@ -43,11 +45,6 @@ @SuppressWarnings("uncheck") public class AlwaysSuccessClient implements Client { private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSuccessClient.class); - - // cause of rust frontend is single thread, so we use thread executor free callback overhead. - // caution: it should call another stream method in one method callback to avoid deadlock, if these method callback - // executor is same. - // TODO: change some api to sync call to avoid deadlock. private static final ScheduledExecutorService STREAM_MANAGER_RETRY_SCHEDULER = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("stream-manager-retry-%d", true)); private static final ExecutorService STREAM_MANAGER_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(1, @@ -58,6 +55,8 @@ public class AlwaysSuccessClient implements Client { ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true)); private static final ExecutorService FETCH_CALLBACK_EXECUTORS = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true)); + private static final ScheduledExecutorService DELAY_FETCH_SCHEDULER = Executors.newScheduledThreadPool(1, + ThreadUtils.createThreadFactory("fetch-delayer-%d", true)); private final StreamClient streamClient; private final KVClient kvClient; @@ -128,8 +127,8 @@ private void openStream0(long streamId, OpenStreamOptions options, CompletableFu static class StreamImpl implements Stream { private final Stream stream; private volatile boolean closed = false; - private final Map slowFetchingOffsetMap = new ConcurrentHashMap<>(); - private static final long SLOW_FETCH_TIMEOUT_MILLIS = 10; + private final Map> holdUpFetchingFutureMap = new ConcurrentHashMap<>(); + private final long SLOW_FETCH_TIMEOUT_MILLIS = 10; public StreamImpl(Stream stream) { this.stream = stream; @@ -164,50 +163,62 @@ public CompletableFuture append(RecordBatch recordBatch) { return cf; } + /** + * Get a new CompletableFuture with + * a {@link SlowFetchHintException} if not otherwise completed + * before the given timeout. 
+ * @param id the id of rawFuture in holdUpFetchingFutureMap + * @param rawFuture the raw future + * @param timeout how long to wait before completing exceptionally + * with a SlowFetchHintException, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the + * {@code timeout} parameter + * @return a new CompletableFuture with completed results of the rawFuture if the raw future is done before timeout, + * otherwise a new CompletableFuture with a {@link SlowFetchHintException} + */ + private CompletableFuture timeoutAndStoreFuture(String id, CompletableFuture rawFuture, long timeout, + TimeUnit unit) { + if (unit == null) { + throw new NullPointerException(); + } - @Override - public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint) { - String slowFetchKey = startOffset + "-" + endOffset; - CompletableFuture cf = new CompletableFuture<>(); - // If it is recorded as slowFetching, then skip timeout check. - if (slowFetchingOffsetMap.containsKey(slowFetchKey)) { - fetch0(startOffset, endOffset, maxBytesHint, cf, slowFetchKey); - } else { - // Try to have a quick stream. If fetching is timeout, then complete with SlowFetchHintException. - stream.fetch(startOffset, endOffset, maxBytesHint) - .orTimeout(SLOW_FETCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) - .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { - if (ex != null) { - if (closed) { - cf.completeExceptionally(new IllegalStateException("stream already closed")); - } else if (ex instanceof TimeoutException) { - LOGGER.info("Fetch stream[{}] [{},{}) timeout for {} ms, retry with slow fetching", streamId(), startOffset, endOffset, SLOW_FETCH_TIMEOUT_MILLIS); - cf.completeExceptionally(new SlowFetchHintException("fetch data too slowly, retry with slow fetching")); - slowFetchingOffsetMap.put(slowFetchKey, true); - } else { - cf.completeExceptionally(ex); - } + if (!rawFuture.isDone()) { + final CompletableFuture cf = new CompletableFuture<>(); + rawFuture.whenComplete(new Canceller(Delayer.delay(() -> { + if (rawFuture == null) { + return; + } + if (rawFuture.isDone()) { + rawFuture.thenAccept(cf::complete); } else { - slowFetchingOffsetMap.remove(slowFetchKey); - cf.complete(rst); + holdUpFetchingFutureMap.putIfAbsent(id, rawFuture); + cf.completeExceptionally(new SlowFetchHintException()); } - }, LOGGER)); + }, + timeout, unit))); + return cf; } + return rawFuture; + } + + @Override + public CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint) { + CompletableFuture cf = new CompletableFuture<>(); + fetch0(startOffset, endOffset, maxBytesHint, cf); return cf; } - private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture cf, String slowFetchKey) { + private void fetch0(long startOffset, long endOffset, int maxBytesHint, CompletableFuture cf) { stream.fetch(startOffset, endOffset, maxBytesHint).whenCompleteAsync((rst, ex) -> { FutureUtil.suppress(() -> { if (ex != null) { LOGGER.error("Fetch stream[{}] [{},{}) fail, retry later", streamId(), startOffset, endOffset); if (!closed) { - FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, maxBytesHint, cf, slowFetchKey), 3, TimeUnit.SECONDS); + FETCH_RETRY_SCHEDULER.schedule(() -> fetch0(startOffset, endOffset, maxBytesHint, cf), 3, TimeUnit.SECONDS); } else { cf.completeExceptionally(new IllegalStateException("stream already closed")); } } else { - slowFetchingOffsetMap.remove(slowFetchKey); cf.complete(rst); } }, LOGGER); @@ -260,4 +271,24 @@ public 
CompletableFuture destroy() { // return cf; } } + + static final class Delayer { + static ScheduledFuture delay(Runnable command, long delay, + TimeUnit unit) { + return DELAY_FETCH_SCHEDULER.schedule(command, delay, unit); + } + } + + static final class Canceller implements BiConsumer { + final Future f; + + Canceller(Future f) { + this.f = f; + } + + public void accept(Object ignore, Throwable ex) { + if (ex == null && f != null && !f.isDone()) + f.cancel(false); + } + } } diff --git a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java index 935f0c1624..269d10a068 100644 --- a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java @@ -108,11 +108,6 @@ public SliceRange sliceRange() { } } - @Override - public void destroy() { - // TODO: update ElasticLogMeta and persist meta - } - @Override public void seal() { this.sealed = true; @@ -125,10 +120,9 @@ public Stream stream() { @Override public String toString() { - // TODO: stream info return "DefaultElasticStreamSlice{" + "startOffsetInStream=" + startOffsetInStream + - ", stream=" + stream + + ", stream=[id=" + stream.streamId() + ", startOffset=" + stream.startOffset() + ", nextOffset=" + stream.nextOffset() + "]" + ", nextOffset=" + nextOffset + ", sealed=" + sealed + '}'; diff --git a/core/src/main/scala/kafka/log/es/ElasticLog.scala b/core/src/main/scala/kafka/log/es/ElasticLog.scala index 8a87d8fe7b..d6b9afb17f 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLog.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLog.scala @@ -383,7 +383,7 @@ object ElasticLog extends Logging { def persistProducerSnapshotMeta(meta: ElasticPartitionProducerSnapshotMeta): Unit = { val key = MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX + meta.getOffset if (meta.isEmpty) { - // TODO: delete the snapshot + // The real deletion of this snapshot is done in metaStream's compaction. producerSnapshotsMeta.remove(meta.getOffset) } else { producerSnapshotsMeta.add(meta.getOffset) diff --git a/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java index 7996ff7c6a..73c8d6f81e 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java @@ -59,7 +59,7 @@ public class ElasticLogFileRecords { protected final AtomicInteger size; protected final Iterable batches; private final ElasticStreamSlice streamSegment; - // logic offset instead of physical offset + // This is The base offset of the corresponding segment. private final long baseOffset; private final AtomicLong nextOffset; private final AtomicLong committedOffset; @@ -71,8 +71,13 @@ public class ElasticLogFileRecords { public ElasticLogFileRecords(ElasticStreamSlice streamSegment, long baseOffset, int size) { this.baseOffset = baseOffset; this.streamSegment = streamSegment; - // TODO: init size when recover, all is size matter anymore? long nextOffset = streamSegment.nextOffset(); + // Note that size is generally used to + // 1) show the physical size of a segment. In these cases, size is refered to decide whether to roll a new + // segment, or calculate the cleaned size in a cleaning task, etc. If size is not correctly recorded for any + // reason, the worst thing will be just a bigger segment than configured. + // 2) show whether this segment is empty, i.e., size == 0. 
+ // Therefore, it is fine to use the nextOffset as a backoff value. this.size = new AtomicInteger(size == 0 ? (int) nextOffset : size); this.nextOffset = new AtomicLong(baseOffset + nextOffset); this.committedOffset = new AtomicLong(baseOffset + nextOffset); @@ -105,6 +110,7 @@ public Records read(long startOffset, long maxOffset, int maxSize) throws SlowFe private Records readAll0(long startOffset, long maxOffset, int maxSize) throws SlowFetchHintException { int readSize = 0; + // calculate the relative offset in the segment, which may start from 0. long nextFetchOffset = startOffset - baseOffset; long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset; List fetchResults = new LinkedList<>(); @@ -199,9 +205,26 @@ private Optional maybeLeaderEpoch(int leaderEpoch) { Optional.empty() : Optional.of(leaderEpoch); } + /** + * Return the largest timestamp of the messages after a given offset + * @param startOffset The starting offset. + * @return The largest timestamp of the messages after the given position. + */ public FileRecords.TimestampAndOffset largestTimestampAfter(long startOffset) { - // TODO: implement - return new FileRecords.TimestampAndOffset(0, 0, Optional.empty()); + long maxTimestamp = RecordBatch.NO_TIMESTAMP; + long offsetOfMaxTimestamp = -1L; + int leaderEpochOfMaxTimestamp = RecordBatch.NO_PARTITION_LEADER_EPOCH; + + for (RecordBatch batch : batchesFrom(startOffset)) { + long timestamp = batch.maxTimestamp(); + if (timestamp > maxTimestamp) { + maxTimestamp = timestamp; + offsetOfMaxTimestamp = batch.lastOffset(); + leaderEpochOfMaxTimestamp = batch.partitionLeaderEpoch(); + } + } + return new FileRecords.TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp, + maybeLeaderEpoch(leaderEpochOfMaxTimestamp)); } public ElasticStreamSlice streamSegment() { @@ -333,6 +356,7 @@ public RecordBatch nextBatch() throws IOException { public static class BatchIteratorRecordsAdaptor extends AbstractRecords { private final ElasticLogFileRecords elasticLogFileRecords; + // This is the offset in Kafka layer. 
private final long startOffset; private final long maxOffset; private final int fetchSize; @@ -391,11 +415,11 @@ private void ensureAllLoaded() { if (sizeInBytes != -1) { return; } - // TODO: direct fetch and composite to a large memoryRecords + Records records = elasticLogFileRecords.readAll0(startOffset, maxOffset, fetchSize); sizeInBytes = 0; CompositeByteBuf allRecordsBuf = Unpooled.compositeBuffer(); RecordBatch lastBatch = null; - for (RecordBatch batch : batches()) { + for (RecordBatch batch : records.batches()) { sizeInBytes += batch.sizeInBytes(); ByteBuffer buffer = ((DefaultRecordBatch) batch).buffer().duplicate(); allRecordsBuf.addComponent(true, Unpooled.wrappedBuffer(buffer)); diff --git a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala index f8d94ded05..48ab36b8f7 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala @@ -43,7 +43,6 @@ class ElasticLogManager(val client: Client) { maxTransactionTimeoutMs: Int, producerStateManagerConfig: ProducerStateManagerConfig, leaderEpoch: Long): ElasticLog = { - // TODO: add log close hook, remove closed elastic log elasticLogs.computeIfAbsent(topicPartition, _ => ElasticLog(client, NAMESPACE, dir, config, scheduler, time, topicPartition, logDirFailureChannel, numRemainingSegments, maxTransactionTimeoutMs, producerStateManagerConfig, leaderEpoch)) } @@ -88,10 +87,13 @@ object ElasticLogManager { var INSTANCE: Option[ElasticLogManager] = None var NAMESPACE = "" - def init(config: KafkaConfig, clusterId: String): Unit = { + def init(config: KafkaConfig, clusterId: String): Boolean = { + if (!config.elasticStreamEnabled) { + return false + } val endpoint = config.elasticStreamEndpoint if (endpoint == null) { - throw new IllegalArgumentException(s"Unsupported elastic stream endpoint: $endpoint") + return false } if (endpoint.startsWith(ES_ENDPOINT_PREFIX)) { val kvEndpoint = config.elasticStreamKvEndpoint; @@ -109,7 +111,7 @@ object ElasticLogManager { } else if (endpoint.startsWith(REDIS_ENDPOINT_PREFIX)) { INSTANCE = Some(new ElasticLogManager(new ElasticRedisClient(endpoint.substring(REDIS_ENDPOINT_PREFIX.length)))) } else { - throw new IllegalArgumentException(s"Unsupported elastic stream endpoint: $endpoint") + return false } val namespace = config.elasticStreamNamespace @@ -118,8 +120,11 @@ object ElasticLogManager { } else { namespace } + true } + def enabled(): Boolean = INSTANCE.isDefined + def removeLog(topicPartition: TopicPartition): Unit = { INSTANCE.get.removeLog(topicPartition) } diff --git a/core/src/main/scala/kafka/log/es/ElasticLogMeta.java b/core/src/main/scala/kafka/log/es/ElasticLogMeta.java index 8f25b4e76c..eba0b844a3 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogMeta.java +++ b/core/src/main/scala/kafka/log/es/ElasticLogMeta.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Map; -// TODO: save meta to stream periodically. save meta when broker shutdown. - /** * logical meta data for a Kafka topicPartition. 
*/ diff --git a/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala b/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala index cb34cffa43..fbf86af117 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala @@ -63,7 +63,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, } def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = { - // TODO: check LogLoader logic + // do nothing since it will not be called. } private val created = time.milliseconds @@ -270,7 +270,8 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, } override def updateParentDir(dir: File): Unit = { - // TODO: check + timeIdx.updateParentDir(dir) + txnIndex.updateParentDir(dir) } // Do nothing here. @@ -330,7 +331,9 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, * Close this log segment */ override def close(): Unit = { - // TODO: timestamp insert + if (_maxTimestampAndOffsetSoFar != TimestampOffset.Unknown) + CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, + skipFullCheck = true), this) CoreUtils.swallow(timeIdx.close(), this) CoreUtils.swallow(_log.close(), this) CoreUtils.swallow(txnIndex.close(), this) diff --git a/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java b/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java index edfa54b359..579cd81b17 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java +++ b/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java @@ -79,9 +79,8 @@ public void setListener(ElasticStreamEventListener listener) { this.outerListener = listener; } - public void close() { - // TODO: close stream recycle resource. - } + // no operation. + public void close() {} class LazyStreamStreamEventListener implements ElasticStreamEventListener { @Override diff --git a/core/src/main/scala/kafka/log/es/ElasticPartitionProducerSnapshotMeta.java b/core/src/main/scala/kafka/log/es/ElasticPartitionProducerSnapshotMeta.java index 2100812bcf..b327fcd0e0 100644 --- a/core/src/main/scala/kafka/log/es/ElasticPartitionProducerSnapshotMeta.java +++ b/core/src/main/scala/kafka/log/es/ElasticPartitionProducerSnapshotMeta.java @@ -24,7 +24,6 @@ * refers to one snapshot file */ public class ElasticPartitionProducerSnapshotMeta { - // TODO: lazy load /** * raw data of the snapshot */ diff --git a/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala b/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala index 7e59993fb5..06aca2b7e3 100644 --- a/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala @@ -114,8 +114,8 @@ class ElasticProducerStateManager( } } - // TODO: check. 
maybe need to implement this method -// override def updateParentDir(parentDir: File): Unit = {} + // do nothing + override def updateParentDir(parentDir: File): Unit = {} override protected def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = { deleteSnapshot(snapshotOffset) diff --git a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java index ab4d6ba2d7..b2ad8f77ce 100644 --- a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.errors.es.SlowFetchHintException; /** - * Elastic stream slice is a slice from elastic stream, the offset of slice starts from 0. + * Elastic stream slice is a slice from elastic stream, the startOffset of a slice is 0. * In the same time, there is only one writable slice in a stream, and the writable slice is always the last slice. */ public interface ElasticStreamSlice { @@ -43,6 +43,7 @@ public interface ElasticStreamSlice { * Fetch record batch from stream slice. * * @param startOffset start offset. + * @param endOffset end offset. * @param maxBytesHint max fetch data size hint, the real return data size may be larger than maxBytesHint. * @return {@link FetchResult} */ @@ -70,11 +71,6 @@ default FetchResult fetch(long startOffset, long endOffset) throws SlowFetchHint */ SliceRange sliceRange(); - /** - * Destroy stream slice. - */ - void destroy(); - /** * Seal slice, forbid future append. */ diff --git a/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java b/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java index e324e9fbdf..15989469ba 100644 --- a/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java +++ b/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java @@ -57,8 +57,8 @@ public ElasticStreamSlice newSlice(String streamName) { * - when startOffset != NOOP_OFFSET, then load slice. 
*/ public ElasticStreamSlice loadOrCreateSlice(String streamName, SliceRange sliceRange) { - check(sliceRange.start() >= Offsets.NOOP_OFFSET, "startOffset must be >= 0 or == NOOP_OFFSET -1"); - check(sliceRange.end() >= Offsets.NOOP_OFFSET, "endOffset must be >= 0 or == NOOP_OFFSET -1"); + check(sliceRange.start() >= Offsets.NOOP_OFFSET, "startOffset must be >= NOOP_OFFSET"); + check(sliceRange.end() >= Offsets.NOOP_OFFSET, "endOffset must be >= NOOP_OFFSET"); if (sliceRange.start() == Offsets.NOOP_OFFSET) { return newSlice(streamName); } diff --git a/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala b/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala index 83cbb2a6d9..af755e2e2e 100644 --- a/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala +++ b/core/src/main/scala/kafka/log/es/ElasticTimeIndex.scala @@ -25,11 +25,13 @@ import org.apache.kafka.common.record.RecordBatch import java.io.{File, IOException} import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier, baseOffset: Long, maxIndexSize: Int = -1) extends AbstractStreamIndex(_file, streamSegmentSupplier, baseOffset, maxIndexSize) with TimeIndex { @volatile private var _lastEntry = lastEntryFromIndexFile + @volatile private var lastAppend: CompletableFuture[_] = CompletableFuture.completedFuture(null) private var closed = false protected def entrySize: Int = 12 @@ -111,7 +113,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier, val relatedOffset = relativeOffset(offset) buffer.putInt(relatedOffset) buffer.flip() - stream.append(RawPayloadRecordBatch.of(buffer)) + lastAppend = stream.append(RawPayloadRecordBatch.of(buffer)) // put time index to cache cache.putLong(_entries * entrySize, timestamp) @@ -141,7 +143,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier, } def truncate(): Unit = { - //TODO: + throw new UnsupportedOperationException("truncate() is not supported in ElasticTimeIndex") } override def close(): Unit = { @@ -150,7 +152,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier, } def truncateTo(offset: Long): Unit = { - // TODO: + throw new UnsupportedOperationException("truncateTo() is not supported in ElasticTimeIndex") } def sanityCheck(): Unit = { @@ -158,7 +160,7 @@ class ElasticTimeIndex(_file: File, streamSegmentSupplier: StreamSliceSupplier, } override def flush(): Unit = { - // TODO: wait all in-flight append complete + lastAppend.get() } def seal(): Unit = { diff --git a/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala b/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala index ada2ed15be..7805717012 100644 --- a/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala +++ b/core/src/main/scala/kafka/log/es/ElasticTransactionIndex.scala @@ -23,13 +23,15 @@ import org.apache.kafka.common.KafkaException import java.io.{File, IOException} import java.nio.ByteBuffer +import java.util.concurrent.CompletableFuture import scala.collection.mutable -class ElasticTransactionIndex(_file: File, streamSliceSupplier: StreamSliceSupplier, startOffset: Long) +class ElasticTransactionIndex(@volatile private var _file: File, streamSliceSupplier: StreamSliceSupplier, startOffset: Long) extends TransactionIndex(startOffset, _file) { var stream: ElasticStreamSlice = streamSliceSupplier.get() + @volatile private var lastAppend: CompletableFuture[_] = CompletableFuture.completedFuture(null) private 
var closed = false override def append(abortedTxn: AbortedTxn): Unit = { @@ -41,23 +43,22 @@ class ElasticTransactionIndex(_file: File, streamSliceSupplier: StreamSliceSuppl s"${abortedTxn.lastOffset} is not greater than current last offset $offset of index ${file.getAbsolutePath}") } lastOffset = Some(abortedTxn.lastOffset) - stream.append(RawPayloadRecordBatch.of(abortedTxn.buffer.duplicate())) + lastAppend = stream.append(RawPayloadRecordBatch.of(abortedTxn.buffer.duplicate())) } override def flush(): Unit = { - // TODO: await all inflight + lastAppend.get() } override def file: File = new File("mock") - // TODO: override def updateParentDir(parentDir: File): Unit = { - throw new UnsupportedOperationException() + _file = new File(parentDir, file.getName) } + // Deleting index is actually implemented in ElasticLogSegment.deleteIfExists. We implement it here for tests. override def deleteIfExists(): Boolean = { close() - // TODO: delete true } @@ -67,7 +68,6 @@ class ElasticTransactionIndex(_file: File, streamSliceSupplier: StreamSliceSuppl } override def close(): Unit = { - // TODO: recycle resource closed = true } @@ -76,7 +76,6 @@ class ElasticTransactionIndex(_file: File, streamSliceSupplier: StreamSliceSuppl } override def truncateTo(offset: Long): Unit = { - // TODO: check throw new UnsupportedOperationException() } diff --git a/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala index ca077ae6cf..078a4fb521 100644 --- a/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala +++ b/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala @@ -74,7 +74,21 @@ class ElasticUnifiedLog(_logStartOffset: Long, } override def close(): Unit = { - super.close() + info("Closing log") + lock synchronized { + maybeFlushMetadataFile() + elasticLog.checkIfMemoryMappedBufferClosed() + producerExpireCheck.cancel(true) + maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") { + // We take a snapshot at the last written offset to hopefully avoid the need to scan the log + // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization + // (the clean shutdown file is written after the logs are all closed). + producerStateManager.takeSnapshot() + } + // flush all inflight data/index + flush(true) + elasticLog.close() + } elasticLog.segments.clear() elasticLog.isMemoryMappedBufferClosed = true elasticLog.deleteEmptyDir() diff --git a/core/src/main/scala/kafka/log/es/LazyStream.java b/core/src/main/scala/kafka/log/es/LazyStream.java index ebe0470d77..e83fcd4464 100644 --- a/core/src/main/scala/kafka/log/es/LazyStream.java +++ b/core/src/main/scala/kafka/log/es/LazyStream.java @@ -30,11 +30,14 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Lazy stream, create stream when append record. 
*/ public class LazyStream implements Stream { + private static final Logger LOGGER = LoggerFactory.getLogger(LazyStream.class); public static final long NOOP_STREAM_ID = -1L; private static final Stream NOOP_STREAM = new NoopStream(); private final String name; @@ -118,7 +121,7 @@ public void notifyListener(ElasticStreamMetaEvent event) { try { Optional.ofNullable(eventListener).ifPresent(listener -> listener.onEvent(inner.streamId(), event)); } catch (Throwable e) { - //TODO: log unexpected exception + LOGGER.error("got notify listener error", e); } } diff --git a/core/src/main/scala/kafka/log/es/MetaStream.java b/core/src/main/scala/kafka/log/es/MetaStream.java index d454363486..f409d68af3 100644 --- a/core/src/main/scala/kafka/log/es/MetaStream.java +++ b/core/src/main/scala/kafka/log/es/MetaStream.java @@ -22,6 +22,7 @@ import com.automq.elasticstream.client.api.RecordBatch; import com.automq.elasticstream.client.api.RecordBatchWithContext; import com.automq.elasticstream.client.api.Stream; +import com.fasterxml.jackson.core.JsonProcessingException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; diff --git a/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java b/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java new file mode 100644 index 0000000000..03b19cdee1 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java @@ -0,0 +1,29 @@ +package kafka.log.es; + +import io.netty.util.concurrent.FastThreadLocal; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class is used to mark if it is needed to diff quick fetch from slow fetch in current thread. + * If marked, data should be fetched within a short time, otherwise, the request should be satisfied in a separated slow-fetch thread pool. + */ +public class SeparateSlowAndQuickFetchHint { + private static final FastThreadLocal MANUAL_RELEASE = new FastThreadLocal<>() { + @Override + protected AtomicBoolean initialValue() { + return new AtomicBoolean(false); + } + }; + + public static boolean isMarked() { + return MANUAL_RELEASE.get().get(); + } + + public static void mark() { + MANUAL_RELEASE.get().set(true); + } + + public static void reset() { + MANUAL_RELEASE.get().set(false); + } +} diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 8d4bb38241..7e66bb83fd 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -200,7 +200,13 @@ class BrokerServer( metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId) // elastic stream inject start - ElasticLogManager.init(config, clusterId) + if (config.elasticStreamEnabled) { + if (!ElasticLogManager.init(config, clusterId)) { + throw new UnsupportedOperationException("Elastic stream client failed to be configured. Please check your configuration.") + } + } else { + warn("Elastic stream is disabled. 
This node will store data locally.") + } // elastic stream inject end // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ad4b4972c9..b252b17916 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -24,7 +24,7 @@ import kafka.controller.ReplicaAssignment import kafka.coordinator.group._ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.AppendOrigin -import kafka.log.es.{ElasticLogManager, ReadManualReleaseHint} +import kafka.log.es.{ElasticLogManager, ReadManualReleaseHint, SeparateSlowAndQuickFetchHint} import kafka.message.ZStdCompressionCodec import kafka.network.RequestChannel import kafka.server.KafkaApis.{LAST_RECORD_TIMESTAMP, PRODUCE_ACK_TIMER, PRODUCE_CALLBACK_TIMER, PRODUCE_TIMER} @@ -747,26 +747,34 @@ class KafkaApis(val requestChannel: RequestChannel, val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId // elastic stream inject start - // The appending is done is a separate thread pool to avoid blocking io thread - appendingExecutors.submit(new Runnable { - override def run(): Unit = { - // call the replica manager to append messages to the replicas - replicaManager.appendRecords( - timeout = produceRequest.timeout.toLong, - requiredAcks = produceRequest.acks, - internalTopicsAllowed = internalTopicsAllowed, - origin = AppendOrigin.Client, - entriesPerPartition = authorizedRequestInfo, - requestLocal = requestLocal, - responseCallback = sendResponseCallback, - recordConversionStatsCallback = processingStatsCallback) - - // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; - // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log - produceRequest.clearPartitionRecords() - PRODUCE_TIMER.update(System.nanoTime() - startNanos) - } - }) + def doAppendRecords(): Unit = { + // call the replica manager to append messages to the replicas + replicaManager.appendRecords( + timeout = produceRequest.timeout.toLong, + requiredAcks = produceRequest.acks, + internalTopicsAllowed = internalTopicsAllowed, + origin = AppendOrigin.Client, + entriesPerPartition = authorizedRequestInfo, + requestLocal = requestLocal, + responseCallback = sendResponseCallback, + recordConversionStatsCallback = processingStatsCallback) + + // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; + // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log + produceRequest.clearPartitionRecords() + PRODUCE_TIMER.update(System.nanoTime() - startNanos) + } + + if (ElasticLogManager.enabled()) { + // The appending is done is a separate thread pool to avoid blocking io thread + appendingExecutors.submit(new Runnable { + override def run(): Unit = { + doAppendRecords() + } + }) + } else { + doAppendRecords() + } // elastic stream inject end } } @@ -1068,20 +1076,31 @@ class KafkaApis(val requestChannel: RequestChannel, ) // elastic stream inject start - // The fetching is done is a separate thread pool to avoid blocking io thread. 
- fetchingExecutors.submit(new Runnable { - override def run(): Unit = { - ReadManualReleaseHint.mark() - // call the replica manager to fetch messages from the local replica - replicaManager.fetchMessages( - params = params, - fetchInfos = interesting, - quota = replicationQuota(fetchRequest), - responseCallback = processResponseCallback, - ) - ReadManualReleaseHint.reset() - } - }) + def doFetchingRecords(): Unit = { + // call the replica manager to fetch messages from the local replica + replicaManager.fetchMessages( + params = params, + fetchInfos = interesting, + quota = replicationQuota(fetchRequest), + responseCallback = processResponseCallback, + ) + } + + if (ElasticLogManager.enabled()) { + // The fetching is done is a separate thread pool to avoid blocking io thread. + fetchingExecutors.submit(new Runnable { + override def run(): Unit = { + ReadManualReleaseHint.mark() + SeparateSlowAndQuickFetchHint.mark() + doFetchingRecords() + SeparateSlowAndQuickFetchHint.reset() + ReadManualReleaseHint.reset() + } + }) + } else { + doFetchingRecords() + } + // elastic stream inject end } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0d0ca18629..d732d583ff 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -658,10 +658,12 @@ object KafkaConfig { // elastic stream inject start /** ********* Elastic stream config *********/ + val ElasticStreamEnableProp = "elasticstream.enable" val ElasticStreamEndpointProp = "elasticstream.endpoint" val ElasticStreamKvEndpointProp = "elasticstream.kv.endpoint" val ElasticStreamNamespaceProp = "elasticstream.namespace" + val ElasticStreamEnableDoc = "Specifies whether to store events in elastic streams" val ElasticStreamEndpointDoc = "Specifies the Elastic Stream endpoint, ex. es://hostname1:port1,hostname2:port2,hostname3:port3.\n" + "You could also PoC launch it in memory mode with endpoint memory::// or redis mode with redis://." val ElasticStreamKvEndpointDoc = "Specifies the Elastic Stream KV endpoint, ex. 
es://hostname1:port1,hostname2:port2,hostname3:port3.\n" + @@ -1465,6 +1467,7 @@ object KafkaConfig { // elastic stream inject start /** ********* Elastic stream Configuration *********/ + .define(ElasticStreamEnableProp, BOOLEAN, false, HIGH, ElasticStreamEnableDoc) .define(ElasticStreamEndpointProp, STRING, null, HIGH, ElasticStreamEndpointDoc) .define(ElasticStreamKvEndpointProp, STRING, null, HIGH, ElasticStreamKvEndpointDoc) .define(ElasticStreamNamespaceProp, STRING, null, MEDIUM, ElasticStreamNamespaceDoc) @@ -2000,6 +2003,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // elastic stream inject start /** ********* Elastic stream Configuration *********/ + val elasticStreamEnabled = getBoolean(KafkaConfig.ElasticStreamEnableProp) val elasticStreamEndpoint = getString(KafkaConfig.ElasticStreamEndpointProp) val elasticStreamKvEndpoint = getString(KafkaConfig.ElasticStreamKvEndpointProp) val elasticStreamNamespace = getString(KafkaConfig.ElasticStreamNamespaceProp) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6582c9b341..b4e9aab50e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -22,7 +22,7 @@ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.common.RecordValidationException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ -import kafka.log.es.ReadManualReleaseHint +import kafka.log.es.{ElasticLogManager, ReadManualReleaseHint} import kafka.metrics.KafkaMetricsGroup import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers @@ -465,13 +465,17 @@ class ReplicaManager(val config: KafkaConfig, if (allPartitions.remove(topicPartition, hostedPartition)) { maybeRemoveTopicMetrics(topicPartition.topic) // elastic stream inject start - if (logManager.cleaner != null) { - logManager.cleaner.abortCleaning(topicPartition) + if (ElasticLogManager.enabled()) { + if (logManager.cleaner != null) { + logManager.cleaner.abortCleaning(topicPartition) + } + // For elastic stream, partition leader alter is triggered by setting isr/replicas. + // When broker is not response for the partition, we need to close the partition + // instead of delete the partition. + hostedPartition.partition.close() + } else { + hostedPartition.partition.delete() } - // For elastic stream, partition leader alter is triggered by setting isr/replicas. - // When broker is not response for the partition, we need to close the partition - // instead of delete the partition. - hostedPartition.partition.close() // elastic stream inject end } @@ -489,7 +493,9 @@ class ReplicaManager(val config: KafkaConfig, if (partitionsToDelete.nonEmpty) { // Delete the logs and checkpoint. 
// elastic stream inject start -// logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e)) + if (!ElasticLogManager.enabled()) { + logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, e)) + } // elastic stream inject end } errorMap @@ -1047,7 +1053,7 @@ class ReplicaManager(val config: KafkaConfig, logReadResults.foreach { case (topicIdPartition, logReadResult) => brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark() brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark() - if (logReadResult.exception.isDefined && logReadResult.exception.get.isInstanceOf[KafkaStorageException]) { + if (logReadResult.exception.isDefined && logReadResult.exception.get.isInstanceOf[SlowFetchHintException]) { containsSlowFetchHint = true } if (logReadResult.error != Errors.NONE) @@ -2137,20 +2143,30 @@ class ReplicaManager(val config: KafkaConfig, // create new partitions with the same names as the ones we are deleting here. if (!localChanges.deletes.isEmpty) { val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap - partitionOpenCloseExecutors.submit(new Runnable { - override def run(): Unit = { - stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") - stopPartitions(deletes).forKeyValue { (topicPartition, e) => - if (e.isInstanceOf[KafkaStorageException]) { - stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + - "the local replica for the partition is in an offline log directory") - } else { - stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + - s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}") - } + + def doPartitionDeletion(): Unit = { + stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).") + stopPartitions(deletes).forKeyValue { (topicPartition, e) => + if (e.isInstanceOf[KafkaStorageException]) { + stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + + "the local replica for the partition is in an offline log directory") + } else { + stateChangeLogger.error(s"Unable to delete replica $topicPartition because " + + s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}") } } - }) + } + + if (ElasticLogManager.enabled()) { + partitionOpenCloseExecutors.submit(new Runnable { + override def run(): Unit = { + doPartitionDeletion() + } + }) + } else { + doPartitionDeletion() + } + } // Handle partitions which we are now the leader or follower for. 
@@ -2158,11 +2174,20 @@ class ReplicaManager(val config: KafkaConfig, val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints) val changedPartitions = new mutable.HashSet[Partition] if (!localChanges.leaders.isEmpty) { - partitionOpenCloseExecutors.submit(new Runnable { - override def run(): Unit = { - applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala) - } - }) + def doPartitionLeading(): Unit = { + applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala) + } + + if (ElasticLogManager.enabled()) { + partitionOpenCloseExecutors.submit(new Runnable { + override def run(): Unit = { + doPartitionLeading() + } + }) + } else { + doPartitionLeading() + } + } if (!localChanges.followers.isEmpty) { applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 6a7cf8a830..220582fb3d 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -22,6 +22,7 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole} import kafka.server.Server.MetricsPrefix import kafka.server.metadata.BrokerServerMetrics import kafka.utils.{CoreUtils, Logging} +import org.apache.kafka.common.es.ElasticStreamSwitch import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} import org.apache.kafka.controller.QuorumControllerMetrics @@ -97,6 +98,9 @@ class SharedServer( private var usedByController: Boolean = false val brokerConfig = new KafkaConfig(sharedServerConfig.props, false, None) val controllerConfig = new KafkaConfig(sharedServerConfig.props, false, None) + // elastic stream injection start + ElasticStreamSwitch.setSwitch(sharedServerConfig.elasticStreamEnabled) + // elastic stream injection end @volatile var metrics: Metrics = _metrics @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _ @volatile var brokerMetrics: BrokerServerMetrics = _ diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala index 42235759b4..2b2a407aad 100644 --- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala +++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala @@ -17,7 +17,7 @@ package kafka.server.checkpoints import kafka.log.LogManager.{LogStartOffsetCheckpointFile, RecoveryPointCheckpointFile} -import kafka.log.es.{CleanerOffsetCheckpoint, ElasticCheckoutPointFileWithHandler, LogStartOffsetCheckpoint, RecoveryPointCheckpoint, ReplicationOffsetCheckpoint} +import kafka.log.es.{CleanerOffsetCheckpoint, ElasticCheckoutPointFileWithHandler, ElasticLogManager, LogStartOffsetCheckpoint, RawKafkaMeta, RecoveryPointCheckpoint, ReplicationOffsetCheckpoint} import kafka.server.LogDirFailureChannel import kafka.server.ReplicaManager.HighWatermarkFilename import kafka.server.epoch.EpochEntry @@ -34,11 +34,16 @@ object OffsetCheckpointFile { private[checkpoints] val CurrentVersion = 0 private val offsetCheckpointFile = "cleaner-offset-checkpoint" - private val moveToMetaMap = Map( - offsetCheckpointFile -> CleanerOffsetCheckpoint, - LogStartOffsetCheckpointFile -> LogStartOffsetCheckpoint, - RecoveryPointCheckpointFile -> RecoveryPointCheckpoint, - 
HighWatermarkFilename -> ReplicationOffsetCheckpoint) + private def moveToMetaMap = { + if (ElasticLogManager.enabled()) + Map( + offsetCheckpointFile -> CleanerOffsetCheckpoint, + LogStartOffsetCheckpointFile -> LogStartOffsetCheckpoint, + RecoveryPointCheckpointFile -> RecoveryPointCheckpoint, + HighWatermarkFilename -> ReplicationOffsetCheckpoint) + else + Map.empty[String, RawKafkaMeta] + } object Formatter extends EntryFormatter[(TopicPartition, Long)] { override def toString(entry: (TopicPartition, Long)): String = { diff --git a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java index 2b5208764b..d3e8b1745d 100644 --- a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java +++ b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java @@ -29,12 +29,14 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutionException; +@Tag("esUnit") public class CruiseControlMetricsReporterAutoCreateTopicTest extends CCKafkaClientsIntegrationTestHarness { protected static final String TOPIC = "CruiseControlMetricsReporterTest"; diff --git a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java index 04f2d723fb..ba070047ae 100644 --- a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java +++ b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.time.Duration; @@ -70,6 +71,7 @@ import static kafka.metrics.cruisecontrol.metricsreporter.metric.RawMetricType.TOPIC_PARTITION_BYTES_IN; import static kafka.metrics.cruisecontrol.metricsreporter.metric.RawMetricType.TOPIC_PARTITION_BYTES_OUT; +@Tag("esUnit") public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness { protected static final String TOPIC = "CruiseControlMetricsReporterTest"; diff --git a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/metric/MetricSerdeTest.java b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/metric/MetricSerdeTest.java index 64ac357bc9..2ea54457e4 100644 --- a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/metric/MetricSerdeTest.java +++ b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/metric/MetricSerdeTest.java @@ -21,10 +21,12 @@ package kafka.metrics.cruisecontrol.metricsreporter.metric; import kafka.metrics.cruisecontrol.metricsreporter.exception.UnknownVersionException; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +@Tag("esUnit") public class MetricSerdeTest { private static final long TIME = 123L; private static final int BROKER_ID = 0; diff --git 
a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java index fd064e9e0d..cde2241292 100644 --- a/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java +++ b/core/src/test/java/kafka/metrics/cruisecontrol/metricsreporter/utils/CCKafkaIntegrationTestHarness.java @@ -55,6 +55,7 @@ public void setUp() { int[] port = CCKafkaTestUtils.findLocalPorts(1); propOverrides.put(KafkaConfig.ListenersProp(), "EXTERNAL://" + HOST + ":" + port[0]); propOverrides.put(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":" + port[0]); + propOverrides.put(KafkaConfig.ElasticStreamEnableProp(), "true"); propOverrides.put(KafkaConfig.ElasticStreamEndpointProp(), "memory://"); for (Map.Entry entry : overridingProps().entrySet()) { diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 838c043ff8..b96aa8b3d9 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -90,6 +90,7 @@ class LogConfigTest { case LogConfig.RemoteLogStorageEnableProp => assertPropertyInvalid(name, "not_a_boolean") case LogConfig.LocalLogRetentionMsProp => assertPropertyInvalid(name, "not_a_number", "-3") case LogConfig.LocalLogRetentionBytesProp => assertPropertyInvalid(name, "not_a_number", "-3") + case LogConfig.ReplicationFactorProp => assertPropertyInvalid(name, "not_a_number", "0") case _ => assertPropertyInvalid(name, "not_a_number", "-1") }) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index f0ac47bad8..ad5c519c34 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -949,6 +949,12 @@ class KafkaConfigTest { case KafkaConfig.SaslServerMaxReceiveSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + // elastic stream inject start + case KafkaConfig.ElasticStreamEndpointProp => // ignore string + case KafkaConfig.ElasticStreamKvEndpointProp => // ignore string + case KafkaConfig.ElasticStreamNamespaceProp => // ignore string + // elastic stream inject end + // Raft Quorum Configs case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 37a85bda62..c45ec85932 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2015,7 +2015,7 @@ class ReplicaManagerTest { val topicPartitionObj = new TopicPartition(topic, topicPartition) val mockLogMgr: LogManager = mock(classOf[LogManager]) when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new File(_).getAbsoluteFile)) - when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any())).thenReturn(mockLog) + when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any(), any())).thenReturn(mockLog) 
when(mockLogMgr.getLog(topicPartitionObj, isFuture = false)).thenReturn(Some(mockLog)) when(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).thenReturn(None) val allLogs = new Pool[TopicPartition, UnifiedLog]() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 066cc376f0..b4a0e0f23e 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -266,6 +266,7 @@ object TestUtils extends Logging { kvEndpoint: String = "", namespace: String = "__esk_test__"): Properties = { val props = createBrokerConfig(nodeId, "") + props.put(KafkaConfig.ElasticStreamEnableProp, true) props.put(KafkaConfig.ElasticStreamEndpointProp, endpoint) props.put(KafkaConfig.ElasticStreamKvEndpointProp, kvEndpoint) props.put(KafkaConfig.ElasticStreamNamespaceProp, namespace) diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java index ac87935c41..a99f1a8512 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java @@ -303,7 +303,6 @@ private void completeReassignmentIfNeeded() { } public Optional<ApiMessageAndVersion> build() { - // TODO: enforce validation + adapt the outer-layer logic; forbid multiple ISR members and multiple replicas PartitionChangeRecord record = new PartitionChangeRecord(). setTopicId(topicId). setPartitionId(partitionId); diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 3567135516..f6d00529a6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.es.ElasticStreamSwitch; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.AlterPartitionRequestData; import org.apache.kafka.common.message.AlterPartitionResponseData; @@ -602,16 +603,18 @@ public void replay(RemoveTopicRecord record) { Map<String, Map.Entry<OpType, String>> keyToOps = configChanges.computeIfAbsent(configResource, key -> new HashMap<>()); // First, we pass topic replication factor through log config. int replicationFactor = topic.replicationFactor() == -1 ? - defaultReplicationFactor : topic.replicationFactor(); - // MIN_IN_SYNC_REPLICAS should be forced to 1 since replication factor is 1 (see createTopic method). - keyToOps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, new AbstractMap.SimpleEntry<>(OpType.SET, String.valueOf(1))); - // We reuse the passed replicationFactor to config elastic stream settings. - keyToOps.put(TopicConfig.REPLICATION_FACTOR_CONFIG, new AbstractMap.SimpleEntry<>(OpType.SET, String.valueOf(replicationFactor))); - - // Then, we force replication factor to 1 here since "replicas" on broker layer can only be 1.
- if (replicationFactor != 1) { - topic.setReplicationFactor((short) 1); - log.info("force replication factor to 1 for create topic {}, the real replication factor is decided by elastic stream", topic.name()); + defaultReplicationFactor : topic.replicationFactor(); + if (ElasticStreamSwitch.isEnabled()) { + // MIN_IN_SYNC_REPLICAS should be forced to 1 since replication factor is 1 (see createTopic method). + keyToOps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, new AbstractMap.SimpleEntry<>(OpType.SET, String.valueOf(1))); + // We reuse the passed replicationFactor to config elastic stream settings. + keyToOps.put(TopicConfig.REPLICATION_FACTOR_CONFIG, new AbstractMap.SimpleEntry<>(OpType.SET, String.valueOf(replicationFactor))); + + // Then, we force replication factor to 1 here since "replicas" on broker layer can only be 1. + if (replicationFactor != 1) { + topic.setReplicationFactor((short) 1); + log.info("force replication factor to 1 for create topic {}, the real replication factor is decided by elastic stream", topic.name()); + } } // elastic stream inject end @@ -1031,11 +1034,15 @@ ControllerResult alterPartition( } // elastic stream inject start - List newIsr = partitionData.newIsr(); - if (newIsr.size() != 1) { - throw new InvalidReplicaAssignmentException("elastic stream not support isr != 1"); + if (ElasticStreamSwitch.isEnabled()) { + List newIsr = partitionData.newIsr(); + if (newIsr.size() != 1) { + throw new InvalidReplicaAssignmentException("elastic stream not support isr != 1"); + } + builder.setTargetNode(newIsr.get(0)); + } else { + builder.setTargetIsr(partitionData.newIsr()); } - builder.setTargetNode(newIsr.get(0)); // elastic stream inject end builder.setTargetLeaderRecoveryState( @@ -1320,7 +1327,7 @@ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List electLeaders(ElectLeadersRequestData request) { @@ -1645,7 +1652,9 @@ void createPartitions(CreatePartitionsTopic topic, // elastic stream inject start // Generally, validateManualPartitionAssignment has checked to some extent. We still check here for defensive programming. - maybeCheckCreatePartitionPolicy(new CreatePartitionPolicy.RequestMetadata(replicas, isr)); + if (ElasticStreamSwitch.isEnabled()) { + maybeCheckCreatePartitionPolicy(new CreatePartitionPolicy.RequestMetadata(replicas, isr)); + } // elastic stream inject end records.add(new ApiMessageAndVersion(new PartitionRecord(). @@ -1784,24 +1793,25 @@ void generateLeaderAndIsrUpdates(String context, // target ISR will be the same as the old one). 
// elastic stream inject start - // builder.setTargetIsr(Replicas.toList( - // Replicas.copyWithout(partition.isr, brokerToRemove))); - // TODO: change builder args set and build func logic => the current logic matches expectations; add unit tests to make sure it stays consistent - if (brokerToAdd != -1) { - // new broker is unfenced(available), then the broker take no leader partition - builder.setTargetNode(brokerToAdd); - } else { - if (partitionLeaderSelector == null) { - partitionLeaderSelector = new RandomPartitionLeaderSelector(clusterControl.getActiveBrokers()); - } - partitionLeaderSelector + if (ElasticStreamSwitch.isEnabled()) { + if (brokerToAdd != -1) { + // new broker is unfenced(available), then the broker take no leader partition + builder.setTargetNode(brokerToAdd); + } else { + if (partitionLeaderSelector == null) { + partitionLeaderSelector = new RandomPartitionLeaderSelector(clusterControl.getActiveBrokers()); + } + partitionLeaderSelector .select(partition, br -> br.id() != brokerToRemove) .ifPresent(broker -> builder.setTargetNode(broker.id())); - } - - if (fencing) { - TopicPartition topicPartition = new TopicPartition(topic.name(), topicIdPart.partitionId()); - addPartitionToReElectTimeouts(topicPartition); + } + if (fencing) { + TopicPartition topicPartition = new TopicPartition(topic.name(), topicIdPart.partitionId()); + addPartitionToReElectTimeouts(topicPartition); + } + } else { + builder.setTargetIsr(Replicas.toList( + Replicas.copyWithout(partition.isr, brokerToRemove))); } // elastic stream inject end @@ -1881,8 +1891,10 @@ void alterPartitionReassignment(String topicName, Optional<ApiMessageAndVersion> record; if (target.replicas() == null) { record = cancelPartitionReassignment(topicName, tp, part); - } else { + } else if (ElasticStreamSwitch.isEnabled()) { record = changePartitionReassignment(tp, part, target); + } else { + record = changePartitionReassignment0(tp, part, target); } record.ifPresent(records::add); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java new file mode 100644 index 0000000000..57a6deb944 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderESTest.java @@ -0,0 +1,68 @@ +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.es.ElasticStreamSwitch; +import org.apache.kafka.metadata.LeaderRecoveryState; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Tag("esUnit") +public class PartitionChangeBuilderESTest { + @BeforeEach + public void setUp() { + ElasticStreamSwitch.setSwitch(true); + } + + @AfterEach + public void tearDown() { + ElasticStreamSwitch.setSwitch(false); + } + + @Test + public void testElectLeader() { + // elect the targetNode regardless of the election type + assertElectLeaderEquals(createFooBuilder().setElection(PartitionChangeBuilder.Election.PREFERRED).setTargetNode(100), 100, false); + assertElectLeaderEquals(createFooBuilder().setTargetNode(101), 101, false); + assertElectLeaderEquals(createFooBuilder().setElection(PartitionChangeBuilder.Election.UNCLEAN).setTargetNode(102), 102, false); + + // There should not be recovering state for leaders since unclean elections will never be touched.
However, we + // still test these cases in case of odd situations. + assertElectLeaderEquals(createRecoveringFOOBuilder().setElection(PartitionChangeBuilder.Election.PREFERRED).setTargetNode(100), 100, false); + assertElectLeaderEquals(createRecoveringFOOBuilder().setTargetNode(101), 101, false); + assertElectLeaderEquals(createRecoveringFOOBuilder().setElection(PartitionChangeBuilder.Election.UNCLEAN).setTargetNode(102), 102, false); + } + + private final static PartitionRegistration FOO = new PartitionRegistration( + new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, + 1, LeaderRecoveryState.RECOVERED, 100, 200); + + private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); + + private static PartitionChangeBuilder createFooBuilder() { + return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, true); + } + + private final static PartitionRegistration RECOVERING_FOO = new PartitionRegistration( + new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, + 1, LeaderRecoveryState.RECOVERING, 100, 200); + + private final static Uuid RECOVERING_FOO_ID = Uuid.fromString("KbrrdcfiR-KC2CPSTHaJrh"); + + private static PartitionChangeBuilder createRecoveringFOOBuilder() { + return new PartitionChangeBuilder(RECOVERING_FOO, RECOVERING_FOO_ID, 0, r -> r != 3, true); + } + + private static void assertElectLeaderEquals(PartitionChangeBuilder builder, + int expectedNode, + boolean expectedUnclean) { + PartitionChangeBuilder.ElectionResult electionResult = builder.electLeader(); + assertEquals(expectedNode, electionResult.node); + assertEquals(expectedUnclean, electionResult.unclean); + } +}
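
A minimal sketch of the replication-factor gating that the ReplicationControlManager changes above follow, assuming the patch's org.apache.kafka.common.es.ElasticStreamSwitch is on the classpath; the class name ReplicationFactorGateExample and the literal config-key strings are illustrative stand-ins for the TopicConfig constants used in the patch, not code from it:

import org.apache.kafka.common.es.ElasticStreamSwitch;

import java.util.HashMap;
import java.util.Map;

public class ReplicationFactorGateExample {

    // Decide the broker-level replication factor for a new topic. When the
    // elastic stream switch is on, the stream layer handles replication, so
    // the broker layer keeps a single replica and the requested factor is
    // recorded as a topic config instead of creating broker-side replicas.
    static short effectiveReplicationFactor(short requested, Map<String, String> topicConfigs) {
        if (!ElasticStreamSwitch.isEnabled()) {
            return requested; // vanilla path: keep the requested factor
        }
        topicConfigs.put("min.insync.replicas", "1");                        // forced to 1, as in the patch
        topicConfigs.put("replication.factor", String.valueOf(requested));   // consumed by the stream layer
        return (short) 1;
    }

    public static void main(String[] args) {
        ElasticStreamSwitch.setSwitch(true);
        Map<String, String> configs = new HashMap<>();
        short brokerRf = effectiveReplicationFactor((short) 3, configs);
        System.out.println("broker-level rf = " + brokerRf + ", topic configs = " + configs);
    }
}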
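
The ReplicaManager changes above pull the deletion and leadership work into local doPartitionDeletion / doPartitionLeading helpers so the same body can run either on the partitionOpenCloseExecutors pool or inline on the calling thread. A self-contained sketch of that submit-or-run-inline pattern, using a hypothetical GatedAsyncExample class and a plain boolean in place of ElasticLogManager.enabled(), rather than the patched Scala code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GatedAsyncExample {
    private final ExecutorService partitionOpenCloseExecutor = Executors.newSingleThreadExecutor();
    private final boolean elasticLogEnabled; // stands in for ElasticLogManager.enabled()

    GatedAsyncExample(boolean elasticLogEnabled) {
        this.elasticLogEnabled = elasticLogEnabled;
    }

    // Submit the work to the dedicated executor when the elastic log layer is
    // enabled; otherwise fall back to the original inline execution.
    void runGated(Runnable work) {
        if (elasticLogEnabled) {
            partitionOpenCloseExecutor.submit(work);
        } else {
            work.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GatedAsyncExample example = new GatedAsyncExample(true);
        example.runGated(() -> System.out.println("deleting partitions..."));
        example.partitionOpenCloseExecutor.shutdown();
        example.partitionOpenCloseExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }
}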