KAFKA-15037: pass remoteLogEnabled to unifiedLog #13779

Merged: 3 commits, merged on Jun 5, 2023
Changes from 2 commits
core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -54,6 +54,7 @@ public class LogManagerBuilder {
private LogDirFailureChannel logDirFailureChannel = null;
private Time time = Time.SYSTEM;
private boolean keepPartitionMetadataFile = true;
+ private boolean remoteStorageSystemEnable = false;

public LogManagerBuilder setLogDirs(List<File> logDirs) {
this.logDirs = logDirs;
@@ -145,6 +146,11 @@ public LogManagerBuilder setKeepPartitionMetadataFile(boolean keepPartitionMetadataFile) {
return this;
}

+ public LogManagerBuilder setRemoteStorageSystemEnable(boolean remoteStorageSystemEnable) {
+     this.remoteStorageSystemEnable = remoteStorageSystemEnable;
+     return this;
+ }

public LogManager build() {
if (logDirs == null) throw new RuntimeException("you must set logDirs");
if (configRepository == null) throw new RuntimeException("you must set configRepository");
@@ -172,6 +178,7 @@ public LogManager build() {
brokerTopicStats,
logDirFailureChannel,
time,
- keepPartitionMetadataFile);
+ keepPartitionMetadataFile,
+ remoteStorageSystemEnable);
}
}
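
For orientation, here is a minimal sketch of the new hook from a caller's perspective. This is not code from the PR: the package path follows Kafka's source layout, the elided setters are implied by the null checks in build(), and config is an assumed in-scope KafkaConfig.

import java.io.File
import java.util.Collections
import kafka.server.builders.LogManagerBuilder

// Sketch: thread the broker-level remote storage flag into the LogManager.
val logManager = new LogManagerBuilder()
  .setLogDirs(Collections.singletonList(new File("/tmp/kafka-logs")))
  // ... the other required setters (configRepository, scheduler, ...) elided
  .setRemoteStorageSystemEnable(config.remoteLogManagerConfig.enableRemoteStorageSystem())
  .build()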
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,6 +23,7 @@ import kafka.api.LeaderAndIsr
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
+ import kafka.log.remote.RemoteLogManager
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
@@ -1478,7 +1479,8 @@ class Partition(val topicPartition: TopicPartition,
def fetchOffsetForTimestamp(timestamp: Long,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
- fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
+ fetchOnlyFromLeader: Boolean,
+ remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
// decide whether to only fetch from leader
val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader)

@@ -1504,7 +1506,7 @@ class Partition(val topicPartition: TopicPartition,
s"start offset from the beginning of this epoch ($epochStart)."))

def getOffsetByTimestamp: Option[TimestampAndOffset] = {
- logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp))
+ logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp, remoteLogManager))
}

// If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
13 changes: 9 additions & 4 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.config.TopicConfig

import java.util.Properties
import org.apache.kafka.server.common.MetadataVersion
+ import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.Scheduler
@@ -76,7 +77,8 @@ class LogManager(logDirs: Seq[File],
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
time: Time,
- val keepPartitionMetadataFile: Boolean) extends Logging {
+ val keepPartitionMetadataFile: Boolean,
+ remoteStorageSystemEnable: Boolean) extends Logging {

import LogManager._

@@ -290,7 +292,8 @@ class LogManager(logDirs: Seq[File],
lastShutdownClean = hadCleanShutdown,
topicId = None,
keepPartitionMetadataFile = keepPartitionMetadataFile,
- numRemainingSegments = numRemainingSegments)
+ numRemainingSegments = numRemainingSegments,
+ remoteStorageSystemEnable = remoteStorageSystemEnable)

if (logDir.getName.endsWith(UnifiedLog.DeleteDirSuffix)) {
addLogToBeDeleted(log)
@@ -971,7 +974,8 @@ class LogManager(logDirs: Seq[File],
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel,
topicId = topicId,
- keepPartitionMetadataFile = keepPartitionMetadataFile)
+ keepPartitionMetadataFile = keepPartitionMetadataFile,
+ remoteStorageSystemEnable = remoteStorageSystemEnable)

if (isFuture)
futureLogs.put(topicPartition, log)
@@ -1398,7 +1402,8 @@ object LogManager {
logDirFailureChannel = logDirFailureChannel,
time = time,
keepPartitionMetadataFile = keepPartitionMetadataFile,
- interBrokerProtocolVersion = config.interBrokerProtocolVersion)
+ interBrokerProtocolVersion = config.interBrokerProtocolVersion,
+ remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem())
}

}
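
For reference, a sketch of where this value originates: remote.log.storage.system.enable is the broker property that RemoteLogManagerConfig exposes through enableRemoteStorageSystem(). The Properties wiring below is illustrative, not from the PR.

import java.util.Properties

// Illustrative broker configuration: this property is what
// config.remoteLogManagerConfig.enableRemoteStorageSystem() reports above.
val brokerProps = new Properties()
brokerProps.put("remote.log.storage.system.enable", "true")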
7 changes: 2 additions & 5 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -93,7 +93,6 @@ import scala.jdk.CollectionConverters._
* If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
* will be deleted to avoid ID conflicts upon re-upgrade.
* @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
- * @param remoteLogManager Optional RemoteLogManager instance if it exists.
*/
@threadsafe
class UnifiedLog(@volatile var logStartOffset: Long,
@@ -105,7 +104,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
@volatile private var _topicId: Option[Uuid],
val keepPartitionMetadataFile: Boolean,
val remoteStorageSystemEnable: Boolean = false,
- remoteLogManager: Option[RemoteLogManager] = None,
@volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {

import kafka.log.UnifiedLog._
@@ -1169,11 +1167,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* , i.e. it only gives back the timestamp based on the last modification time of the log segments.
*
* @param targetTimestamp The given timestamp for offset fetching.
+ * @param remoteLogManager Optional RemoteLogManager instance if it exists.
* @return The offset of the first message whose timestamp is greater than or equals to the given timestamp.
* None if no such message is found.
*/
@nowarn("cat=deprecation")
- def fetchOffsetByTimestamp(targetTimestamp: Long): Option[TimestampAndOffset] = {
+ def fetchOffsetByTimestamp(targetTimestamp: Long, remoteLogManager: Option[RemoteLogManager] = None): Option[TimestampAndOffset] = {
maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
debug(s"Searching offset for timestamp $targetTimestamp")

@@ -1779,7 +1778,6 @@ object UnifiedLog extends Logging {
keepPartitionMetadataFile: Boolean,
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
remoteStorageSystemEnable: Boolean = false,
- remoteLogManager: Option[RemoteLogManager] = None,
logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
@@ -1819,7 +1817,6 @@ object UnifiedLog extends Logging {
topicId,
keepPartitionMetadataFile,
remoteStorageSystemEnable,
- remoteLogManager,
logOffsetsListener)
}

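The net effect of this file's changes, sketched below: UnifiedLog no longer stores a RemoteLogManager reference; a caller supplies one per lookup. Here log, targetTimestamp, and remoteLogManager are assumed in-scope values, not code from the PR.

// After this change the remote log manager is an explicit per-call argument.
val remoteAware = log.fetchOffsetByTimestamp(targetTimestamp, Some(remoteLogManager))
// Existing call sites keep compiling because the parameter defaults to None.
val localOnly = log.fetchOffsetByTimestamp(targetTimestamp)

This keeps the lookup remote-aware only where ReplicaManager actually threads its RemoteLogManager down through Partition.fetchOffsetForTimestamp.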
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1142,7 +1142,7 @@ class ReplicaManager(val config: KafkaConfig,
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = {
val partition = getPartitionOrException(topicPartition)
- partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader)
+ partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader, remoteLogManager)
}

def legacyFetchOffsetsForTimestamp(topicPartition: TopicPartition,
3 changes: 2 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -120,7 +120,8 @@ class LogLoaderTest {
brokerTopicStats = new BrokerTopicStats(),
logDirFailureChannel = logDirFailureChannel,
time = time,
- keepPartitionMetadataFile = config.usesTopicId) {
+ keepPartitionMetadataFile = config.usesTopicId,
+ remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem()) {

override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
1 change: 0 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -113,7 +113,6 @@ object LogTestUtils {
keepPartitionMetadataFile = keepPartitionMetadataFile,
numRemainingSegments = numRemainingSegments,
remoteStorageSystemEnable = remoteStorageSystemEnable,
- remoteLogManager = remoteLogManager,
logOffsetsListener = logOffsetsListener
)
}
3 changes: 2 additions & 1 deletion core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1380,7 +1380,8 @@ object TestUtils extends Logging {
brokerTopicStats = new BrokerTopicStats,
logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
keepPartitionMetadataFile = true,
- interBrokerProtocolVersion = interBrokerProtocolVersion)
+ interBrokerProtocolVersion = interBrokerProtocolVersion,
+ remoteStorageSystemEnable = false)
}

class MockAlterPartitionManager extends AlterPartitionManager {
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -159,7 +159,7 @@ public void run() {
while (!closing) {
maybeWaitForPartitionsAssignment();

- log.info("Polling consumer to receive remote log metadata topic records");
+ log.trace("Polling consumer to receive remote log metadata topic records");
Review comment from the PR author (Contributor):
Logging this line on each consumer poll would flood the log, so this change lowers it to trace level.

ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
processConsumerRecord(record);
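
The trade-off the comment describes, sketched as a minimal slf4j-style example (the object and logger names are illustrative, not from the Kafka codebase): a message emitted once per poll iteration belongs at trace, optionally guarded so it costs nothing when disabled.

import org.slf4j.LoggerFactory

object PollLoggingSketch extends App {
  private val log = LoggerFactory.getLogger(getClass)
  for (_ <- 1 to 3) { // stand-in for the consumer poll loop
    // At info this would print on every poll and flood the broker log;
    // at trace it stays silent under the default log configuration.
    if (log.isTraceEnabled)
      log.trace("Polling consumer to receive remote log metadata topic records")
  }
}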