@@ -0,0 +1,14 @@
package org.apache.kafka.common.es;

import java.util.concurrent.atomic.AtomicBoolean;

public final class ElasticStreamSwitch {
    private static AtomicBoolean switchEnabled = new AtomicBoolean(false);

    public static void setSwitch(boolean enabled) {
        switchEnabled.set(enabled);
    }
    public static boolean isEnabled() {
        return switchEnabled.get();
    }
}
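ElasticStreamSwitch is a process-wide flag backed by an AtomicBoolean, so it can be flipped once at startup and read from any thread afterwards. A minimal usage sketch, not part of this PR (the startup wiring shown is an assumption):

import org.apache.kafka.common.es.ElasticStreamSwitch

object ElasticStreamSwitchSketch {
  def main(args: Array[String]): Unit = {
    // Assumed to be driven by the elasticstream.enable config value at broker startup.
    ElasticStreamSwitch.setSwitch(true)
    if (ElasticStreamSwitch.isEnabled()) {
      println("elastic stream storage layer is enabled")
    }
  }
}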
2 changes: 2 additions & 0 deletions config/kraft/broker.properties
@@ -138,6 +138,8 @@ log.retention.check.interval.ms=300000
# offsets.topic.replication.factor=1

############################# Settings for elastic stream #############################
# enable storing data in the elastic stream layer
elasticstream.enable=true

# The Elastic Stream endpoint, e.g. es://hostname1:port1,hostname2:port2,hostname3:port3.
# For a quick PoC you can also launch it in memory mode with an endpoint like memory://, or in redis mode with redis://.
2 changes: 2 additions & 0 deletions config/kraft/controller.properties
@@ -129,5 +129,7 @@ log.retention.check.interval.ms=300000


############################# Settings for elastic stream #############################
# enable storing data in the elastic stream layer
elasticstream.enable=true

create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
2 changes: 2 additions & 0 deletions config/kraft/server.properties
@@ -144,6 +144,8 @@ log.retention.check.interval.ms=300000

create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy

# enable storing data in the elastic stream layer
elasticstream.enable=true
# The Elastic Stream endpoint, e.g. es://hostname1:port1,hostname2:port2,hostname3:port3.
# For a quick PoC you can also launch it in memory mode with an endpoint like memory://, or in redis mode with redis://.
# Note that in memory mode, this Kafka node cannot work in a cluster.
47 changes: 24 additions & 23 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -765,31 +765,32 @@ class Partition(val topicPartition: TopicPartition,
)

// elastic stream inject start
// only create log when partition is leader
// try {
// createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
// } catch {
// case e: ZooKeeperClientException =>
// stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
// s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
// return false
// }
//
// val followerLog = localLogOrException
// elastic stream inject end
    val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
    if (!ElasticLogManager.enabled()) {
      // only create log when partition is leader
      try {
        createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
      } catch {
        case e: ZooKeeperClientException =>
          stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
            s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
          return false
      }

      val followerLog = localLogOrException

      if (isNewLeaderEpoch) {
        val leaderEpochEndOffset = followerLog.logEndOffset
        stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
          s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " +
          s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " +
          s"Previous leader epoch was $leaderEpoch.")
      } else {
        stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
          s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.")
      }
    }

// elastic stream inject start
// if (isNewLeaderEpoch) {
// val leaderEpochEndOffset = followerLog.logEndOffset
// stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
// s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " +
// s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " +
// s"Previous leader epoch was $leaderEpoch.")
// } else {
// stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
// s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.")
// }
// elastic stream inject end

leaderReplicaIdOpt = Option(partitionState.leader)
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/log/LocalLog.scala
@@ -458,7 +458,6 @@ class LocalLog(@volatile private var _dir: File,
val fetchSize = fetchInfo.records.sizeInBytes
val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
// TODO: modify it don't need fetch upper bound
val upperBoundOffset = upperBoundOffsetOpt match {
case Some(x) => x
case None => segment.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogConfig.scala
@@ -279,7 +279,7 @@ object LogConfig {

private[log] val ServerDefaultHeaderName = "Server Default Property"

val configsWithNoServerDefaults: Set[String] = Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, LocalLogRetentionBytesProp)
val configsWithNoServerDefaults: Set[String] = Set(RemoteLogStorageEnableProp, LocalLogRetentionMsProp, LocalLogRetentionBytesProp, ReplicationFactorProp)

// Package private for testing
private[log] class LogConfigDef(base: ConfigDef) extends ConfigDef(base) {
@@ -401,7 +401,7 @@ object LogConfig {

// elastic stream inject start
logConfigDef
.define(ReplicationFactorProp, INT, 1, HIGH, ReplicationFactorDoc)
.define(ReplicationFactorProp, INT, 1, atLeast(1), HIGH, ReplicationFactorDoc)
// elastic stream inject end

logConfigDef
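The second LogConfig change adds a lower-bound validator, so a per-topic replication factor below 1 is now rejected at config-parse time instead of being accepted silently. A stand-alone sketch of the same ConfigDef pattern (the property name here is illustrative, not the constant used by the PR):

import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Range, Type}

object ReplicationFactorValidationSketch {
  // Illustrative definition; LogConfig wires ReplicationFactorProp the same way.
  val demoDef: ConfigDef = new ConfigDef()
    .define("replication.factor.demo", Type.INT, 1, Range.atLeast(1), Importance.HIGH,
      "Demo replication factor; values below 1 fail validation")

  def main(args: Array[String]): Unit = {
    // Passes validation: 3 >= 1.
    demoDef.parse(java.util.Collections.singletonMap("replication.factor.demo", "3"))
    // Throws ConfigException: 0 violates atLeast(1).
    demoDef.parse(java.util.Collections.singletonMap("replication.factor.demo", "0"))
  }
}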
50 changes: 23 additions & 27 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -18,6 +18,7 @@
package kafka.log

import kafka.log.LogConfig.MessageFormatVersion
import kafka.log.es.ElasticLogManager

import java.io._
import java.nio.file.Files
@@ -37,7 +38,6 @@ import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import kafka.utils.Implicits._
import org.apache.kafka.common.internals.Topic

import java.util.Properties
import org.apache.kafka.server.common.MetadataVersion
@@ -392,16 +392,18 @@ class LogManager(logDirs: Seq[File],
// elastic stream inject start
var logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
try{
logsToLoad.foreach(logDir => {
warn(s"Unexpected partition directory $logDir. It may be due to an unclean shutdown.")
Utils.delete(logDir)
})
} catch {
if (ElasticLogManager.enabled()) {
try {
logsToLoad.foreach(logDir => {
warn(s"Unexpected partition directory $logDir. It may be due to an unclean shutdown.")
Utils.delete(logDir)
})
} catch {
case e: IOException =>
warn(s"Error occurred while cleaning $logDirAbsolutePath", e)
warn(s"Error occurred while cleaning $logDirAbsolutePath", e)
}
logsToLoad = Array()
}
logsToLoad = Array()
// elastic stream inject end

numTotalLogs += logsToLoad.length
@@ -618,19 +620,20 @@ class LogManager(logDirs: Seq[File],
if (waitForAllToComplete(dirJobs,
e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) {
// elastic stream inject start
// val logs = logsInDir(localLogsByDir, dir)

// No need to update recovery points, update log start offsets, or write the clean shutdown marker, since they have all been done in log.close()
if (ElasticLogManager.enabled()) return
val logs = logsInDir(localLogsByDir, dir)

// update the last flush point
// debug(s"Updating recovery points at $dir")
// checkpointRecoveryOffsetsInDir(dir, logs)
//
// debug(s"Updating log start offsets at $dir")
// checkpointLogStartOffsetsInDir(dir, logs)
//
// // mark that the shutdown was clean by creating marker file
// debug(s"Writing clean shutdown marker at $dir")
// CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
debug(s"Updating recovery points at $dir")
checkpointRecoveryOffsetsInDir(dir, logs)

debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir, logs)

// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
// elastic stream inject end
}
}
@@ -1411,11 +1414,4 @@ object LogManager {
interBrokerProtocolVersion = config.interBrokerProtocolVersion)
}

// elastic stream inject start
def isClusterMetaPath(dirName: String): Boolean = {
// FIXME: check file path topic part
dirName.contains(Topic.CLUSTER_METADATA_TOPIC_NAME)
}
// elastic stream inject end

}
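The restored shutdown path above checkpoints recovery points and log start offsets and then drops a clean-shutdown marker file; when ElasticLogManager is enabled the method returns early because log.close() already persists that state. A minimal sketch of the marker-file idea in isolation (the helper functions are illustrative; the file name is intended to match upstream Kafka's LogLoader.CleanShutdownFile):

import java.io.File
import java.nio.file.Files

object CleanShutdownMarkerSketch {
  // Upstream Kafka's clean-shutdown marker file name.
  val CleanShutdownFile = ".kafka_cleanshutdown"

  // Written after all logs in the dir are closed, so the next start can skip recovery.
  def markCleanShutdown(logDir: File): Unit =
    Files.createFile(new File(logDir, CleanShutdownFile).toPath)

  // Checked on startup; a missing marker means an unclean shutdown and triggers recovery.
  def wasCleanShutdown(logDir: File): Boolean =
    new File(logDir, CleanShutdownFile).exists()
}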
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -742,7 +742,7 @@

def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
if (!isClusterMetaLogSegment(dir) && ElasticLogManager.INSTANCE.isDefined) {
if (!isClusterMetaLogSegment(dir) && ElasticLogManager.enabled()) {
return ElasticLogManager.newSegment(dir2TopicPartition(dir), baseOffset, time, fileSuffix)
}
val maxIndexSize = config.maxIndexSize
@@ -771,7 +771,6 @@
}

def dir2TopicPartition(dir: File): TopicPartition = {
// TODO: impl, reuse LocalLog#parseTopicPartitionName
LocalLog.parseTopicPartitionName(dir)
}
// elastic stream inject end
7 changes: 2 additions & 5 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -583,7 +583,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
partitionMetadataFile = Some(new PartitionMetadataFile(partitionMetadata, logDirFailureChannel))
}

private def maybeFlushMetadataFile(): Unit = {
protected def maybeFlushMetadataFile(): Unit = {
partitionMetadataFile.foreach(_.maybeFlush())
}

@@ -692,7 +692,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* The memory mapped buffer for index files of this log will be left open until the log is deleted.
*/
def close(): Unit = {
info("Closing log")
lock synchronized {
maybeFlushMetadataFile()
localLog.checkIfMemoryMappedBufferClosed()
@@ -703,8 +702,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// (the clean shutdown file is written after the logs are all closed).
producerStateManager.takeSnapshot()
}
// flush all inflight data/index
flush(true)
localLog.close()
}
}
@@ -1836,7 +1833,7 @@ object UnifiedLog extends Logging {
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)

// elastic stream inject start
if (!isClusterMetaLogSegment(dir)) {
if (!isClusterMetaLogSegment(dir) && ElasticLogManager.enabled()) {
return applyElasticUnifiedLog(topicPartition, dir, config, scheduler, brokerTopicStats, time,
maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs, logDirFailureChannel,
topicId, leaderEpoch)
23 changes: 14 additions & 9 deletions core/src/main/scala/kafka/log/es/AbstractStreamIndex.scala
@@ -23,18 +23,18 @@ import kafka.log.AbstractIndex.{debug, error}
import kafka.log.{Index, IndexEntry, IndexSearchType}
import kafka.utils.CoreUtils.inLock
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem}
import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils}

import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Files
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.locks.{Lock, ReentrantLock}

/**
* Implementation ref. AbstractIndex
*/
abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamSliceSupplier, val baseOffset: Long, val maxIndexSize: Int = -1) extends Index {
abstract class AbstractStreamIndex(@volatile private var _file: File, val streamSliceSupplier: StreamSliceSupplier, val baseOffset: Long, val maxIndexSize: Int = -1) extends Index {

var stream: ElasticStreamSlice = streamSliceSupplier.get()

@@ -77,7 +77,6 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS
*/
def isFull: Boolean = _entries >= _maxEntries

// TODO: check
def file: File = _file

def maxEntries: Int = _maxEntries
@@ -86,8 +85,9 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS

def length: Long = adjustedMaxIndexSize

// TODO:
def updateParentDir(parentDir: File): Unit = {}
  def updateParentDir(parentDir: File): Unit = {
    _file = new File(parentDir, file.getName)
  }

/**
* Note that stream index actually does not need to resize. Here we only change the maxEntries in memory to be
@@ -108,10 +108,15 @@ abstract class AbstractStreamIndex(_file: File, val streamSliceSupplier: StreamS
}
}

// TODO:
def renameTo(f: File): Unit = {}
  def renameTo(f: File): Unit = {
    try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
    catch {
      case _: NoSuchFileException if !file.exists => ()
    }
    finally _file = f
  }

// TODO:
// Deleting the index is actually implemented in ElasticLogSegment.deleteIfExists; it is implemented here only for tests.
def deleteIfExists(): Boolean = {
close()
true
Expand Down
Loading