Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. #13275

Merged
merged 10 commits into from
Jul 11, 2023
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,6 @@ project(':core') {


implementation libs.argparse4j
implementation libs.caffeine
implementation libs.commonsValidator
implementation libs.jacksonDatabind
implementation libs.jacksonModuleScala
Expand Down Expand Up @@ -931,7 +930,8 @@ project(':core') {
testImplementation(libs.jfreechart) {
exclude group: 'junit', module: 'junit'
}

testImplementation libs.caffeine

generator project(':generator')
}

Expand Down Expand Up @@ -1704,6 +1704,7 @@ project(':storage') {
implementation project(':storage:api')
implementation project(':server-common')
implementation project(':clients')
implementation libs.caffeine
divijvaidya marked this conversation as resolved.
Show resolved Hide resolved
implementation libs.slf4jApi
implementation libs.jacksonDatabind

Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.common" />
<allow pkg="com.github.benmanes.caffeine.cache" />
</subpackage>

<subpackage name="shell">
Expand Down
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@
<suppress checks="CyclomaticComplexity"
files="(LogValidator|RemoteLogManagerConfig).java"/>
<suppress checks="NPathComplexity"
files="LogValidator.java"/>
files="(LogValidator|RemoteIndexCache).java"/>
<suppress checks="ParameterNumber"
files="(LogAppendInfo|RemoteLogManagerConfig).java"/>

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.OffsetPosition;
import org.apache.kafka.storage.internals.log.RemoteIndexCache;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
Expand Down Expand Up @@ -160,7 +162,7 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
String logDir,
String clusterId,
Time time,
Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
Function<TopicPartition, Optional<UnifiedLog>> fetchLog) throws IOException {
this.rlmConfig = rlmConfig;
this.brokerId = brokerId;
this.logDir = logDir;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LocalLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ class LocalLog(@volatile private var _dir: File,
object LocalLog extends Logging {

/** a file that is scheduled to be deleted */
private[log] val DeletedFileSuffix = ".deleted"
private[log] val DeletedFileSuffix = LogFileUtils.DELETED_FILE_SUFFIX

/** A temporary file that is being used for log cleaning */
private[log] val CleanedFileSuffix = ".cleaned"
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package kafka.log

import kafka.log.remote.RemoteIndexCache

import java.io._
import java.nio.file.Files
import java.util.concurrent._
Expand All @@ -43,7 +41,7 @@ import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig, RemoteIndexCache}

import scala.annotation.nowarn

Expand Down Expand Up @@ -397,7 +395,7 @@ class LogManager(logDirs: Seq[File],
logDir.isDirectory &&
// Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
// but not any topic-partition dir.
!logDir.getName.equals(RemoteIndexCache.DirName) &&
!logDir.getName.equals(RemoteIndexCache.DIR_NAME) &&
UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
numTotalLogs += logsToLoad.length
numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
Expand Down