diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 2bbd0e9bb44a..0e53c214d66e 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -54,6 +54,7 @@ public class LogManagerBuilder {
     private LogDirFailureChannel logDirFailureChannel = null;
     private Time time = Time.SYSTEM;
     private boolean keepPartitionMetadataFile = true;
+    private boolean remoteStorageSystemEnable = false;
 
     public LogManagerBuilder setLogDirs(List<File> logDirs) {
         this.logDirs = logDirs;
@@ -145,6 +146,11 @@ public LogManagerBuilder setKeepPartitionMetadataFile(boolean keepPartitionMetad
         return this;
     }
 
+    public LogManagerBuilder setRemoteStorageSystemEnable(boolean remoteStorageSystemEnable) {
+        this.remoteStorageSystemEnable = remoteStorageSystemEnable;
+        return this;
+    }
+
     public LogManager build() {
         if (logDirs == null) throw new RuntimeException("you must set logDirs");
         if (configRepository == null) throw new RuntimeException("you must set configRepository");
@@ -172,6 +178,7 @@ public LogManager build() {
                               brokerTopicStats,
                               logDirFailureChannel,
                               time,
-                              keepPartitionMetadataFile);
+                              keepPartitionMetadataFile,
+                              remoteStorageSystemEnable);
     }
 }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b28641965f87..1efc8bf75007 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,6 +23,7 @@ import kafka.api.LeaderAndIsr
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
+import kafka.log.remote.RemoteLogManager
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
@@ -1478,7 +1479,8 @@ class Partition(val topicPartition: TopicPartition,
   def fetchOffsetForTimestamp(timestamp: Long,
                               isolationLevel: Option[IsolationLevel],
                               currentLeaderEpoch: Optional[Integer],
-                              fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
+                              fetchOnlyFromLeader: Boolean,
+                              remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
     val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)
 
@@ -1504,7 +1506,7 @@ class Partition(val topicPartition: TopicPartition,
           s"start offset from the beginning of this epoch ($epochStart)."))
 
     def getOffsetByTimestamp: Option[TimestampAndOffset] = {
-      logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp))
+      logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
     }
 
     // If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
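Note on the Partition.scala hunk above: the broker's optional RemoteLogManager is now threaded through fetchOffsetForTimestamp per call, rather than the log holding a reference to it. A minimal sketch of the new call shape; `partition` and `rlmOpt` are illustrative names assumed to be in the caller's scope (as they are in ReplicaManager), not part of this patch:

    import java.util.Optional
    import kafka.log.remote.RemoteLogManager
    import org.apache.kafka.common.record.FileRecords.TimestampAndOffset

    // Sketch only: pass the optional RemoteLogManager down per lookup.
    val rlmOpt: Option[RemoteLogManager] = None // Some(rlm) when tiered storage is enabled
    val result: Option[TimestampAndOffset] =
      partition.fetchOffsetForTimestamp(
        timestamp = 0L,
        isolationLevel = None,
        currentLeaderEpoch = Optional.empty[Integer](),
        fetchOnlyFromLeader = true,
        remoteLogManager = rlmOpt)

Because the new parameter defaults to None, existing call sites that never touch tiered storage stay source-compatible.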
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 381872acd2f2..40681bdb7ffa 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -76,7 +76,8 @@ class LogManager(logDirs: Seq[File],
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time,
-                 val keepPartitionMetadataFile: Boolean) extends Logging {
+                 val keepPartitionMetadataFile: Boolean,
+                 remoteStorageSystemEnable: Boolean) extends Logging {
 
   import LogManager._
 
@@ -290,7 +291,8 @@ class LogManager(logDirs: Seq[File],
        lastShutdownClean = hadCleanShutdown,
        topicId = None,
        keepPartitionMetadataFile = keepPartitionMetadataFile,
-       numRemainingSegments = numRemainingSegments)
+       numRemainingSegments = numRemainingSegments,
+       remoteStorageSystemEnable = remoteStorageSystemEnable)
 
      if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
        addLogToBeDeleted(log)
@@ -971,7 +973,8 @@ class LogManager(logDirs: Seq[File],
         brokerTopicStats = brokerTopicStats,
         logDirFailureChannel = logDirFailureChannel,
         topicId = topicId,
-        keepPartitionMetadataFile = keepPartitionMetadataFile)
+        keepPartitionMetadataFile = keepPartitionMetadataFile,
+        remoteStorageSystemEnable = remoteStorageSystemEnable)
 
       if (isFuture)
         futureLogs.put(topicPartition, log)
@@ -1398,7 +1401,8 @@ object LogManager {
       logDirFailureChannel = logDirFailureChannel,
       time = time,
       keepPartitionMetadataFile = keepPartitionMetadataFile,
-      interBrokerProtocolVersion = config.interBrokerProtocolVersion)
+      interBrokerProtocolVersion = config.interBrokerProtocolVersion,
+      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem())
   }
 }
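Note on the LogManagerBuilder.java and LogManager.scala hunks: the flag is read once from broker config and threaded down at construction time. A hedged usage sketch; `logDirs`, `configRepository`, and `config` are assumed bindings, and the remaining builder setters are elided:

    // Sketch, not from this patch: wire the new flag through the builder.
    // build() throws if logDirs or configRepository were never set.
    val logManager: LogManager = new LogManagerBuilder()
      .setLogDirs(logDirs)                   // java.util.List<File>
      .setConfigRepository(configRepository) // assumed setter, implied by build()'s null check
      .setRemoteStorageSystemEnable(config.remoteLogManagerConfig.enableRemoteStorageSystem())
      .build()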
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index f4529516e350..a2aedf0a2903 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -93,7 +93,6 @@ import scala.jdk.CollectionConverters._
  * If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
  * will be deleted to avoid ID conflicts upon re-upgrade.
  * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
- * @param remoteLogManager Optional RemoteLogManager instance if it exists.
  */
 @threadsafe
 class UnifiedLog(@volatile var logStartOffset: Long,
@@ -105,7 +104,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  @volatile private var _topicId: Option[Uuid],
                  val keepPartitionMetadataFile: Boolean,
                  val remoteStorageSystemEnable: Boolean = false,
-                 remoteLogManager: Option[RemoteLogManager] = None,
                  @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
 
   import kafka.log.UnifiedLog._
@@ -1169,11 +1167,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * , i.e. it only gives back the timestamp based on the last modification time of the log segments.
    *
    * @param targetTimestamp The given timestamp for offset fetching.
+   * @param remoteLogManager Optional RemoteLogManager instance if it exists.
    * @return The offset of the first message whose timestamp is greater than or equals to the given timestamp.
    *         None if no such message is found.
    */
   @nowarn("cat=deprecation")
-  def fetchOffsetByTimestamp(targetTimestamp: Long): Option[TimestampAndOffset] = {
+  def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
     maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
       debug(s"Searching offset for timestamp $targetTimestamp")
 
@@ -1779,7 +1778,6 @@ object UnifiedLog extends Logging {
             keepPartitionMetadataFile: Boolean,
             numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
             remoteStorageSystemEnable: Boolean = false,
-            remoteLogManager: Option[RemoteLogManager] = None,
             logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
@@ -1819,7 +1817,6 @@ object UnifiedLog extends Logging {
       topicId,
       keepPartitionMetadataFile,
       remoteStorageSystemEnable,
-      remoteLogManager,
       logOffsetsListener)
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0027fe77afe1..cf9e557aaac9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1142,7 +1142,7 @@ class ReplicaManager(val config: KafkaConfig,
                                 currentLeaderEpoch: Optional[Integer],
                                 fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = {
     val partition = getPartitionOrException(topicPartition)
-    partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader)
+    partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
   }
 
   def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 52dfa760cd8f..f36673e4a9f6 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -120,7 +120,8 @@ class LogLoaderTest {
       brokerTopicStats = new BrokerTopicStats(),
       logDirFailureChannel = logDirFailureChannel,
       time = time,
-      keepPartitionMetadataFile = config.usesTopicId) {
+      keepPartitionMetadataFile = config.usesTopicId,
+      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
       override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                            logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 1c7cce3ac992..1be51bfc2497 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -113,7 +113,6 @@ object LogTestUtils {
       keepPartitionMetadataFile = keepPartitionMetadataFile,
       numRemainingSegments = numRemainingSegments,
       remoteStorageSystemEnable = remoteStorageSystemEnable,
-      remoteLogManager = remoteLogManager,
       logOffsetsListener = logOffsetsListener
     )
   }
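Note on the UnifiedLog hunks: fetchOffsetByTimestamp now receives the RemoteLogManager as a defaulted parameter instead of UnifiedLog capturing it at construction, so the log no longer holds a broker-level component. A small sketch of both call styles, assuming `log: UnifiedLog`, `rlm: RemoteLogManager`, and `targetTimestamp: Long` are in scope:

    // Old-style callers compile unchanged thanks to the default argument:
    val localOnly = log.fetchOffsetByTimestamp(targetTimestamp)            // remoteLogManager = None
    // Remote-aware callers (e.g. the Partition path) pass it explicitly:
    val maybeRemote = log.fetchOffsetByTimestamp(targetTimestamp, Some(rlm))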
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 7de7288a1c18..0190e3c9462d 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -2036,7 +2036,7 @@ class UnifiedLogTest {
     val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
     when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get))
       .thenReturn(Optional.empty[TimestampAndOffset]())
-    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+    assertEquals(None, log.fetchOffsetByTimestamp(0L, Some(remoteLogManager)))
 
     val firstTimestamp = mockTime.milliseconds
     val firstLeaderEpoch = 0
@@ -2063,23 +2063,23 @@ class UnifiedLogTest {
     log._localLogStartOffset = 1
 
     assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
-      log.fetchOffsetByTimestamp(firstTimestamp))
+      log.fetchOffsetByTimestamp(firstTimestamp, Some(remoteLogManager)))
     assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
-      log.fetchOffsetByTimestamp(secondTimestamp))
+      log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager)))
 
     assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP, Some(remoteLogManager)))
     assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(remoteLogManager)))
     assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
 
     // The cache can be updated directly after a leader change.
     // The new latest offset should reflect the updated epoch.
     log.maybeAssignEpochStartOffset(2, 2L)
     assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
-      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager)))
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index fcbd49072164..db55550b7143 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1380,7 +1380,8 @@ object TestUtils extends Logging {
       brokerTopicStats = new BrokerTopicStats,
       logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
       keepPartitionMetadataFile = true,
-      interBrokerProtocolVersion = interBrokerProtocolVersion)
+      interBrokerProtocolVersion = interBrokerProtocolVersion,
+      remoteStorageSystemEnable = false)
   }
 
 class MockAlterPartitionManager extends AlterPartitionManager {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 2509a448a58d..8e0b52d71dd3 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -159,7 +159,7 @@ public void run() {
         while (!closing) {
             maybeWaitForPartitionsAssignment();
 
-            log.info("Polling consumer to receive remote log metadata topic records");
+            log.trace("Polling consumer to receive remote log metadata topic records");
             ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
             for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                 processConsumerRecord(record);