From e5072a3317e631317762b49b847bd009c6cc2b70 Mon Sep 17 00:00:00 2001 From: Han Liu Date: Wed, 11 Mar 2026 16:12:59 +0800 Subject: [PATCH 1/3] [server] Implement JBOD Phase 1 Local Multi-Directory Support --- .../apache/fluss/config/ConfigOptions.java | 14 + .../fluss/server/TabletManagerBase.java | 43 +- .../org/apache/fluss/server/kv/KvManager.java | 22 +- .../apache/fluss/server/log/LogManager.java | 298 ++++--- .../server/log/remote/RemoteLogManager.java | 72 +- .../apache/fluss/server/replica/Replica.java | 13 +- .../fluss/server/replica/ReplicaManager.java | 80 +- .../server/storage/LocalDiskManager.java | 744 ++++++++++++++++++ .../fluss/server/tablet/TabletServer.java | 24 +- .../apache/fluss/server/kv/KvManagerTest.java | 35 +- .../server/log/DroppedTableRecoveryTest.java | 95 ++- .../fluss/server/log/LogManagerTest.java | 122 ++- .../log/remote/RemoteLogIndexCacheTest.java | 2 +- .../log/remote/RemoteLogManagerTest.java | 50 ++ .../server/replica/ReplicaManagerTest.java | 70 ++ .../fluss/server/replica/ReplicaTestBase.java | 44 +- .../fetcher/ReplicaFetcherThreadTest.java | 38 +- .../server/storage/LocalDiskManagerTest.java | 400 ++++++++++ 18 files changed, 1949 insertions(+), 217 deletions(-) create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/storage/LocalDiskManagerTest.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index df21129db6..b42b8aaab4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -498,6 +498,20 @@ public class ConfigOptions { "This configuration controls the directory where fluss will store its data. " + "The default value is /tmp/fluss-data"); + public static final ConfigOption> DATA_DIRS = + key("data.dirs") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "A comma-separated list of local directories used by TabletServer to store " + + "local log, kv, checkpoints, and other node-local files. " + + "If configured, this option takes precedence over `" + + DATA_DIR.key() + + "`. 
If not configured, `" + + DATA_DIR.key() + + "` is used as the only local data directory."); + public static final ConfigOption WRITER_ID_EXPIRATION_TIME = key("server.writer-id.expiration-time") .durationType() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java b/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java index 6525df1c45..0bb2cbee33 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java @@ -43,7 +43,9 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -71,7 +73,7 @@ public enum TabletType { KV } - protected final File dataDir; + protected final List dataDirs; protected final Configuration conf; @@ -83,22 +85,31 @@ public enum TabletType { private final String tabletDirPrefix; public TabletManagerBase( - TabletType tabletType, File dataDir, Configuration conf, int recoveryThreads) { + TabletType tabletType, List dataDirs, Configuration conf, int recoveryThreads) { this.tabletType = tabletType; this.tabletDirPrefix = getTabletDirPrefix(tabletType); - this.dataDir = dataDir; + this.dataDirs = new ArrayList<>(dataDirs); this.conf = conf; this.recoveryThreads = recoveryThreads; } /** - * Return the directories of the tablets to be loaded. + * Return the directories of the tablets to be loaded, grouped by configured data directory. * *
See more about the local directory contracts: {@link FlussPaths#logTabletDir(File, * PhysicalTablePath, TableBucket)} and {@link FlussPaths#kvTabletDir(File, PhysicalTablePath, * TableBucket)}. */ - protected List listTabletsToLoad() { + protected Map> listTabletsToLoad() { + Map> tabletsToLoadByDataDir = new LinkedHashMap<>(); + for (File dataDir : dataDirs) { + tabletsToLoadByDataDir.put(dataDir, listTabletsToLoad(dataDir)); + } + return tabletsToLoadByDataDir; + } + + /** Returns the tablet directories to be loaded from a single configured data directory. */ + protected List listTabletsToLoad(File dataDir) { List tabletsToLoad = new ArrayList<>(); // Get all database directory. File[] dbDirs = FileUtils.listDirectories(dataDir); @@ -140,6 +151,11 @@ protected ExecutorService createThreadPool(String poolName) { return Executors.newFixedThreadPool(recoveryThreads, new ExecutorThreadFactory(poolName)); } + protected ExecutorService createThreadPoolByDir(String poolName, File dataDir) { + return Executors.newSingleThreadExecutor( + new ExecutorThreadFactory(poolName + "-" + dataDir.getAbsolutePath())); + } + /** Running a series of jobs in a thread pool, and return the count of the successful job. */ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws Throwable { List> jobsForTabletDir = new ArrayList<>(); @@ -164,17 +180,20 @@ protected int runInThreadPool(Runnable[] runnableJobs, String poolName) throws T } /** - * Get the tablet directory with given directory name for the given table path and table bucket. + * Get the tablet directory with given directory name for the given data directory, table path + * and table bucket. * *
When the parent directory of the tablet directory is missing, it will create the * directory. * + * @param dataDir the local data directory chosen for this tablet * @param tablePath the table path of the bucket * @param tableBucket the table bucket * @return the tablet directory */ - protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) { - File tabletDir = getTabletDir(tablePath, tableBucket); + protected File getOrCreateTabletDir( + File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) { + File tabletDir = getTabletDir(dataDir, tablePath, tableBucket); if (tabletDir.exists()) { return tabletDir; } @@ -182,11 +201,13 @@ protected File getOrCreateTabletDir(PhysicalTablePath tablePath, TableBucket tab return tabletDir; } - public Path getTabletParentDir(PhysicalTablePath tablePath, TableBucket tableBucket) { - return getTabletDir(tablePath, tableBucket).toPath().getParent(); + public Path getTabletParentDir( + File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) { + return getTabletDir(dataDir, tablePath, tableBucket).toPath().getParent(); } - protected File getTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) { + protected File getTabletDir( + File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) { switch (tabletType) { case LOG: return FlussPaths.logTabletDir(dataDir, tablePath, tableBucket); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 14617ae1ca..ea87986c6b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -42,6 +42,7 @@ import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil; @@ -110,6 +111,7 @@ public static RateLimiter getDefaultRateLimiter() { } private final LogManager logManager; + private final LocalDiskManager localDiskManager; private final TabletServerMetricGroup serverMetricGroup; @@ -141,14 +143,15 @@ public static RateLimiter getDefaultRateLimiter() { private volatile boolean isShutdown = false; private KvManager( - File dataDir, + LocalDiskManager localDiskManager, Configuration conf, ZooKeeperClient zkClient, int recoveryThreadsPerDataDir, LogManager logManager, TabletServerMetricGroup tabletServerMetricGroup) throws IOException { - super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir); + super(TabletType.KV, localDiskManager.dataDirs(), conf, recoveryThreadsPerDataDir); + this.localDiskManager = localDiskManager; this.logManager = logManager; this.arrowBufferAllocator = BufferAllocatorUtil.createBufferAllocator(null); this.memorySegmentPool = LazyMemorySegmentPool.createServerBufferPool(conf); @@ -183,12 +186,11 @@ public static KvManager create( Configuration conf, ZooKeeperClient zkClient, LogManager logManager, - TabletServerMetricGroup tabletServerMetricGroup) + TabletServerMetricGroup tabletServerMetricGroup, + LocalDiskManager localDiskManager) throws IOException { - String dataDirString = conf.getString(ConfigOptions.DATA_DIR); - File dataDir = new 
File(dataDirString).getAbsoluteFile(); return new KvManager( - dataDir, + localDiskManager, conf, zkClient, conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS), @@ -233,6 +235,7 @@ public void shutdown() { * @param kvFormat the kv format */ public KvTablet getOrCreateKv( + File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket, LogTablet logTablet, @@ -248,7 +251,7 @@ public KvTablet getOrCreateKv( return currentKvs.get(tableBucket); } - File tabletDir = getOrCreateTabletDir(tablePath, tableBucket); + File tabletDir = getOrCreateTabletDir(dataDir, tablePath, tableBucket); RowMerger merger = RowMerger.create(tableConfig, kvFormat, schemaGetter); AutoIncrementManager autoIncrementManager = new AutoIncrementManager( @@ -294,8 +297,9 @@ public KvTablet getOrCreateKv( * @param tableBucket the table bucket * @return the tablet directory */ - public File createTabletDir(PhysicalTablePath tablePath, TableBucket tableBucket) { - File tabletDir = getTabletDir(tablePath, tableBucket); + public File createTabletDir( + File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket) { + File tabletDir = getTabletDir(dataDir, tablePath, tableBucket); // delete the tablet dir if exists FileUtils.deleteDirectoryQuietly(tabletDir); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index f7f35f4618..368e856099 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -31,6 +31,7 @@ import org.apache.fluss.server.TabletManagerBase; import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.FlussPaths; @@ -46,9 +47,9 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -90,15 +91,16 @@ public final class LogManager extends TabletManagerBase { private final Scheduler scheduler; private final Clock clock; private final TabletServerMetricGroup serverMetricGroup; + private final LocalDiskManager localDiskManager; private final ReentrantLock logCreationOrDeletionLock = new ReentrantLock(); private final Map currentLogs = new ConcurrentHashMap<>(); - private volatile OffsetCheckpointFile recoveryPointCheckpoint; + private volatile Map recoveryPointCheckpoints; private boolean loadLogsCompletedFlag = false; private LogManager( - File dataDir, + LocalDiskManager localDiskManager, Configuration conf, ZooKeeperClient zkClient, int recoveryThreadsPerDataDir, @@ -106,12 +108,12 @@ private LogManager( Clock clock, TabletServerMetricGroup serverMetricGroup) throws Exception { - super(TabletType.LOG, dataDir, conf, recoveryThreadsPerDataDir); + super(TabletType.LOG, localDiskManager.dataDirs(), conf, recoveryThreadsPerDataDir); this.zkClient = zkClient; this.scheduler = scheduler; this.clock = clock; this.serverMetricGroup = serverMetricGroup; - createAndValidateDataDir(dataDir); + this.localDiskManager = localDiskManager; initializeCheckpointMaps(); } @@ -121,12 +123,11 @@ public static LogManager create( ZooKeeperClient 
zkClient, Scheduler scheduler, Clock clock, - TabletServerMetricGroup serverMetricGroup) + TabletServerMetricGroup serverMetricGroup, + LocalDiskManager localDiskManager) throws Exception { - String dataDirString = conf.getString(ConfigOptions.DATA_DIR); - File dataDir = new File(dataDirString).getAbsoluteFile(); return new LogManager( - dataDir, + localDiskManager, conf, zkClient, conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS), @@ -141,71 +142,106 @@ public void startup() { // TODO add more scheduler, like log-flusher etc. } - public File getDataDir() { - return dataDir; - } - private void initializeCheckpointMaps() throws IOException { - recoveryPointCheckpoint = - new OffsetCheckpointFile(new File(dataDir, RECOVERY_POINT_CHECKPOINT_FILE)); + recoveryPointCheckpoints = new HashMap<>(); + for (File dataDir : dataDirs) { + recoveryPointCheckpoints.put( + dataDir, + new OffsetCheckpointFile(new File(dataDir, RECOVERY_POINT_CHECKPOINT_FILE))); + } } /** Recover and load all logs in the given data directories. */ private void loadLogs() { - LOG.info("Loading logs from dir {}", dataDir); - - String dataDirAbsolutePath = dataDir.getAbsolutePath(); try { - boolean isCleanShutdown = false; - File cleanShutdownFile = new File(dataDir, CLEAN_SHUTDOWN_FILE); - if (cleanShutdownFile.exists()) { - // Cache the clean shutdown status marker and use that for rest of log loading - // workflow. Delete the CleanShutdownFile so that if tabletServer crashes while - // loading the log, it is considered hard shutdown during the next boot up. - Files.deleteIfExists(cleanShutdownFile.toPath()); - isCleanShutdown = true; + Map> tabletsToLoadByDataDir = listTabletsToLoad(); + LOG.info("Loading logs from {} dirs", tabletsToLoadByDataDir.size()); + Map poolsByDataDir = new LinkedHashMap<>(); + List> jobsForDataDir = new ArrayList<>(); + for (Map.Entry> entry : tabletsToLoadByDataDir.entrySet()) { + File dataDir = entry.getKey(); + List tabletsToLoad = entry.getValue(); + Runnable runnable = + () -> { + try { + loadLogs(dataDir, tabletsToLoad); + } catch (Throwable t) { + throw new FlussRuntimeException( + "Failed to recover logs from " + dataDir.getAbsolutePath(), + t); + } + }; + ExecutorService pool = createThreadPoolByDir("log-recovery-data-dir", dataDir); + poolsByDataDir.put(dataDir, pool); + jobsForDataDir.add(pool.submit(runnable)); } - - Map recoveryPoints = new HashMap<>(); try { - recoveryPoints = recoveryPointCheckpoint.read(); - } catch (Exception e) { - LOG.warn( - "Error occurred while reading recovery-point-offset-checkpoint file of directory {}, " - + "resetting the recovery checkpoint to 0", - dataDirAbsolutePath, - e); - } - - List tabletsToLoad = listTabletsToLoad(); - if (tabletsToLoad.isEmpty()) { - LOG.info("No logs found to be loaded in {}", dataDirAbsolutePath); - } else if (isCleanShutdown) { - LOG.info("Skipping some recovery log process since clean shutdown file was found"); - } else { - LOG.info("Recovering all local logs since no clean shutdown file was found"); + for (Future future : jobsForDataDir) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw e.getCause(); + } + } + loadLogsCompletedFlag = true; + LOG.info("Log loader complete."); + } finally { + for (ExecutorService pool : poolsByDataDir.values()) { + pool.shutdown(); + } } + } catch (Throwable e) { + throw new FlussRuntimeException("Failed to recover logs", e); + } + } - final Map finalRecoveryPoints = recoveryPoints; - final boolean cleanShutdown = isCleanShutdown; - // set 
runnable job. - Runnable[] jobsForDir = - createLogLoadingJobs( - tabletsToLoad, cleanShutdown, finalRecoveryPoints, conf, clock); + private void loadLogs(File dataDir, List tabletsToLoad) throws Throwable { + LOG.info("Loading logs from dir {}", dataDir); - long startTime = System.currentTimeMillis(); + String dataDirAbsolutePath = dataDir.getAbsolutePath(); + boolean isCleanShutdown = false; + File cleanShutdownFile = new File(dataDir, CLEAN_SHUTDOWN_FILE); + if (cleanShutdownFile.exists()) { + // Cache the clean shutdown status marker and use that for rest of log loading + // workflow. Delete the CleanShutdownFile so that if tabletServer crashes while + // loading the log, it is considered hard shutdown during the next boot up. + Files.deleteIfExists(cleanShutdownFile.toPath()); + isCleanShutdown = true; + } - int successLoadCount = - runInThreadPool(jobsForDir, "log-recovery-" + dataDirAbsolutePath); + Map recoveryPoints = new HashMap<>(); + try { + recoveryPoints = recoveryPointCheckpoints.get(dataDir).read(); + } catch (Exception e) { + LOG.warn( + "Error occurred while reading recovery-point-offset-checkpoint file of directory {}, " + + "resetting the recovery checkpoint to 0", + dataDirAbsolutePath, + e); + } - loadLogsCompletedFlag = true; - LOG.info( - "Log loader complete. Total success loaded log count is {}, Take {} ms", - successLoadCount, - System.currentTimeMillis() - startTime); - } catch (Throwable e) { - throw new FlussRuntimeException("Failed to recovery log", e); + if (tabletsToLoad.isEmpty()) { + LOG.info("No logs found to be loaded in {}", dataDirAbsolutePath); + } else if (isCleanShutdown) { + LOG.info("Skipping some recovery log process since clean shutdown file was found"); + } else { + LOG.info("Recovering all local logs since no clean shutdown file was found"); } + + final Map finalRecoveryPoints = recoveryPoints; + final boolean cleanShutdown = isCleanShutdown; + // set runnable job. + Runnable[] jobsForDir = + createLogLoadingJobs( + dataDir, tabletsToLoad, cleanShutdown, finalRecoveryPoints, conf, clock); + + long startTime = System.currentTimeMillis(); + int successLoadCount = runInThreadPool(jobsForDir, "log-recovery-" + dataDirAbsolutePath); + LOG.info( + "Log loader complete for {}. Total success loaded log count is {}, Take {} ms", + dataDirAbsolutePath, + successLoadCount, + System.currentTimeMillis() - startTime); } /** @@ -213,6 +249,7 @@ private void loadLogs() { * return a copy of the existing log. Otherwise, create a log for the given table and the given * bucket. * + * @param dataDir the local data directory chosen for the bucket * @param tablePath the table path of the bucket belongs to * @param tableBucket the table bucket * @param logFormat the log format @@ -220,6 +257,7 @@ private void loadLogs() { * @param isChangelog whether the log is a changelog of primary key table */ public LogTablet getOrCreateLog( + File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket, LogFormat logFormat, @@ -233,7 +271,7 @@ public LogTablet getOrCreateLog( return currentLogs.get(tableBucket); } - File tabletDir = getOrCreateTabletDir(tablePath, tableBucket); + File tabletDir = getOrCreateTabletDir(dataDir, tablePath, tableBucket); LogTablet logTablet = LogTablet.create( @@ -311,7 +349,7 @@ public void truncateTo(TableBucket tableBucket, long offset) throws LogStorageEx LogTablet logTablet = currentLogs.get(tableBucket); // If the log tablet does not exist, skip it. 
if (logTablet != null && logTablet.truncateTo(offset)) { - checkpointRecoveryOffsets(); + checkpointRecoveryOffsets(localDiskManager.resolveDataDir(logTablet.getLogDir())); } } @@ -320,11 +358,12 @@ public void truncateFullyAndStartAt(TableBucket tableBucket, long newOffset) { // If the log tablet does not exist, skip it. if (logTablet != null) { logTablet.truncateFullyAndStartAt(newOffset); - checkpointRecoveryOffsets(); + checkpointRecoveryOffsets(localDiskManager.resolveDataDir(logTablet.getLogDir())); } } private LogTablet loadLog( + File dataDir, File tabletDir, boolean isCleanShutdown, Map recoveryPoints, @@ -367,49 +406,64 @@ private LogTablet loadLog( currentLogs.get(tableBucket).getLogDir().getAbsolutePath())); } currentLogs.put(tableBucket, logTablet); + localDiskManager.recordReplicaLoad(dataDir, tableInfo.hasPrimaryKey()); return logTablet; } - private void createAndValidateDataDir(File dataDir) { - try { - inLock( - logCreationOrDeletionLock, - () -> { - if (!dataDir.exists()) { - LOG.info( - "Data directory {} not found, creating it.", - dataDir.getAbsolutePath()); - boolean created = dataDir.mkdirs(); - if (!created) { - throw new IOException( - "Failed to create data directory " - + dataDir.getAbsolutePath()); - } - Path parentPath = - dataDir.toPath().toAbsolutePath().normalize().getParent(); - FileUtils.flushDir(parentPath); - } - if (!dataDir.isDirectory() || !dataDir.canRead()) { - throw new IOException( - dataDir.getAbsolutePath() - + " is not a readable data directory."); - } - }); - } catch (IOException e) { - throw new FlussRuntimeException( - "Failed to create or validate data directory " + dataDir.getAbsolutePath(), e); - } - } - /** Close all the logs. */ public void shutdown() { LOG.info("Shutting down LogManager."); + Map> logsByDataDir = new LinkedHashMap<>(); + for (File dataDir : dataDirs) { + logsByDataDir.put(dataDir, new ArrayList<>()); + } + for (LogTablet logTablet : currentLogs.values()) { + File dataDir = localDiskManager.resolveDataDir(logTablet.getLogDir()); + logsByDataDir.computeIfAbsent(dataDir, ignored -> new ArrayList<>()).add(logTablet); + } + + Map poolsByDataDir = new LinkedHashMap<>(); + List> jobsForDataDir = new ArrayList<>(); + for (Map.Entry> entry : logsByDataDir.entrySet()) { + File dataDir = entry.getKey(); + List logs = entry.getValue(); + Runnable runnable = () -> shutdown(dataDir, logs); + ExecutorService pool = createThreadPoolByDir("log-tablet-closing-data-dir", dataDir); + poolsByDataDir.put(dataDir, pool); + jobsForDataDir.add(pool.submit(runnable)); + } + + try { + if (loadLogsCompletedFlag) { + LOG.debug("Writing clean shutdown marker."); + } + for (Future future : jobsForDataDir) { + try { + future.get(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while shutting down LogManager."); + } catch (ExecutionException e) { + LOG.warn( + "There was an error in one of the threads during LogManager shutdown", + e); + } + } + } finally { + for (ExecutorService pool : poolsByDataDir.values()) { + pool.shutdown(); + } + } + + LOG.info("Shut down LogManager complete."); + } + + private void shutdown(File dataDir, List logs) { String dataDirAbsolutePath = dataDir.getAbsolutePath(); + LOG.info("Shutting down {} logs in dir {}", logs.size(), dataDirAbsolutePath); ExecutorService pool = createThreadPool("log-tablet-closing-" + dataDirAbsolutePath); - List logs = new ArrayList<>(currentLogs.values()); List> jobsForTabletDir = new ArrayList<>(); for (LogTablet logTablet : logs) { Runnable runnable = @@ -437,28 +491,26 @@ 
public void shutdown() { } } - // update the last flush point. - checkpointRecoveryOffsets(); + checkpointRecoveryOffsets(dataDir, logs); // mark that the shutdown was clean by creating marker file for log dirs that all logs // have been recovered at startup time. if (loadLogsCompletedFlag) { - LOG.debug("Writing clean shutdown marker."); try { + LOG.debug("Writing clean shutdown marker for directory {}.", dataDir); Files.createFile(new File(dataDir, CLEAN_SHUTDOWN_FILE).toPath()); } catch (IOException e) { - LOG.warn("Failed to write clean shutdown marker.", e); + LOG.warn("Failed to write clean shutdown marker for directory {}.", dataDir, e); } } } finally { pool.shutdown(); } - - LOG.info("Shut down LogManager complete."); } /** Create runnable jobs for loading logs from tablet directories. */ private Runnable[] createLogLoadingJobs( + File dataDir, List tabletsToLoad, boolean cleanShutdown, Map recoveryPoints, @@ -467,13 +519,16 @@ private Runnable[] createLogLoadingJobs( Runnable[] jobs = new Runnable[tabletsToLoad.size()]; for (int i = 0; i < tabletsToLoad.size(); i++) { final File tabletDir = tabletsToLoad.get(i); - jobs[i] = createLogLoadingJob(tabletDir, cleanShutdown, recoveryPoints, conf, clock); + jobs[i] = + createLogLoadingJob( + dataDir, tabletDir, cleanShutdown, recoveryPoints, conf, clock); } return jobs; } /** Create a runnable job for loading log from a single tablet directory. */ private Runnable createLogLoadingJob( + File dataDir, File tabletDir, boolean cleanShutdown, Map recoveryPoints, @@ -484,7 +539,7 @@ private Runnable createLogLoadingJob( public void run() { LOG.debug("Loading log {}", tabletDir); try { - loadLog(tabletDir, cleanShutdown, recoveryPoints, conf, clock); + loadLog(dataDir, tabletDir, cleanShutdown, recoveryPoints, conf, clock); } catch (Exception e) { LOG.error("Fail to loadLog from {}", tabletDir, e); if (e instanceof SchemaNotExistException) { @@ -522,23 +577,32 @@ public void run() { } @VisibleForTesting - void checkpointRecoveryOffsets() { - // Assuming TableBucket and LogTablet are actual types used in your application - if (recoveryPointCheckpoint != null) { - try { - Map recoveryOffsets = new HashMap<>(); - for (Map.Entry entry : currentLogs.entrySet()) { + void checkpointRecoveryOffsets(File dataDir) { + try { + Map recoveryOffsets = new HashMap<>(); + for (Map.Entry entry : currentLogs.entrySet()) { + if (localDiskManager.resolveDataDir(entry.getValue().getLogDir()).equals(dataDir)) { recoveryOffsets.put(entry.getKey(), entry.getValue().getRecoveryPoint()); } - recoveryPointCheckpoint.write(recoveryOffsets); - } catch (Exception e) { - throw new LogStorageException( - "Disk error while writing recovery offsets checkpoint in directory " - + dataDir - + ": " - + e.getMessage(), - e); } + recoveryPointCheckpoints.get(dataDir).write(recoveryOffsets); + } catch (Exception e) { + throw new LogStorageException( + "Disk error while writing recovery offsets checkpoint", e); + } + } + + private void checkpointRecoveryOffsets(File dataDir, List logs) { + try { + Map recoveryOffsets = new HashMap<>(); + for (LogTablet logTablet : logs) { + recoveryOffsets.put(logTablet.getTableBucket(), logTablet.getRecoveryPoint()); + } + LOG.debug("Writing recovery offsets checkpoint for directory {}.", dataDir); + recoveryPointCheckpoints.get(dataDir).write(recoveryOffsets); + } catch (Exception e) { + throw new LogStorageException( + "Disk error while writing recovery offsets checkpoint", e); } } } diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java index 282ae8d9ee..d7485aa652 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java @@ -26,8 +26,10 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.remote.RemoteLogSegment; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.replica.Replica; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; import org.apache.fluss.utils.IOUtils; @@ -71,12 +73,14 @@ public class RemoteLogManager implements Closeable { private final long taskInterval; private final int maxUploadSegmentsPerTask; - private final RemoteLogIndexCache remoteLogIndexCache; + private final Map remoteLogIndexCachesByDir; private final RemoteLogStorage remoteLogStorage; private final CoordinatorGateway coordinatorGateway; private final ScheduledExecutorService rlManagerScheduledThreadPool; private final Clock clock; private final ZooKeeperClient zkClient; + private final LogManager logManager; + private final LocalDiskManager localDiskManager; private final Map rlmTasks = new ConcurrentHashMap<>(); private final Map remoteLogs = new ConcurrentHashMap<>(); @@ -85,6 +89,8 @@ public RemoteLogManager( Configuration conf, ZooKeeperClient zkClient, CoordinatorGateway coordinatorGateway, + LocalDiskManager localDiskManager, + LogManager logManager, Clock clock, ExecutorService ioExecutor) throws IOException { @@ -92,6 +98,8 @@ public RemoteLogManager( conf, zkClient, coordinatorGateway, + localDiskManager, + logManager, new DefaultRemoteLogStorage(conf, ioExecutor), Executors.newScheduledThreadPool( conf.getInt(ConfigOptions.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE), @@ -104,6 +112,8 @@ public RemoteLogManager( Configuration conf, ZooKeeperClient zkClient, CoordinatorGateway coordinatorGateway, + LocalDiskManager localDiskManager, + LogManager logManager, RemoteLogStorage remoteLogStorage, ScheduledExecutorService scheduledExecutor, Clock clock) @@ -111,13 +121,19 @@ public RemoteLogManager( this.remoteLogStorage = remoteLogStorage; this.zkClient = zkClient; this.coordinatorGateway = coordinatorGateway; - - File dataDir = new File(conf.getString(ConfigOptions.DATA_DIR)); - this.remoteLogIndexCache = - new RemoteLogIndexCache( - (int) conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE).getBytes(), - remoteLogStorage, - dataDir); + this.logManager = logManager; + this.localDiskManager = localDiskManager; + this.remoteLogIndexCachesByDir = MapUtils.newConcurrentHashMap(); + for (File dataDir : localDiskManager.dataDirs()) { + remoteLogIndexCachesByDir.put( + dataDir, + new RemoteLogIndexCache( + (int) + conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE) + .getBytes(), + remoteLogStorage, + dataDir)); + } this.taskInterval = conf.get(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION).toMillis(); this.maxUploadSegmentsPerTask = conf.getInt(ConfigOptions.REMOTE_LOG_TASK_MAX_UPLOAD_SEGMENTS); @@ -191,7 +207,7 @@ public void stopLogTiering(Replica replica) { .map(RemoteLogSegment::remoteLogSegmentId) .collect(Collectors.toList()); // remove cache. 
- remoteLogIndexCache.removeAll(remoteLogSegmentIdList); + remoteLogIndexCache(replica.getDataDir()).removeAll(remoteLogSegmentIdList); // unregister the remote log metrics, only leader needs to report remoteLog.unregisterMetrics(); } @@ -228,7 +244,11 @@ public void stopReplica(Replica replica, boolean deleteRemote) { /** Get the position of the given offset in the remote log segment. */ public int lookupPositionForOffset(RemoteLogSegment remoteLogSegment, long offset) { - return remoteLogIndexCache.lookupPosition(remoteLogSegment, offset); + RemoteLogIndexCache remoteLogIndexCache = + remoteLogIndexCacheForBucket(remoteLogSegment.tableBucket()); + return remoteLogIndexCache == null + ? -1 + : remoteLogIndexCache.lookupPosition(remoteLogSegment, offset); } /** @@ -249,7 +269,10 @@ public long lookupOffsetForTimestamp(TableBucket tableBucket, long timestamp) { if (segment == null) { return -1L; } else { - return remoteLogIndexCache.lookupOffsetForTimestamp(segment, timestamp); + RemoteLogIndexCache remoteLogIndexCache = remoteLogIndexCacheForBucket(tableBucket); + return remoteLogIndexCache == null + ? -1L + : remoteLogIndexCache.lookupOffsetForTimestamp(segment, timestamp); } } @@ -332,7 +355,9 @@ public RemoteLogTablet remoteLogTablet(TableBucket tableBucket) { public void close() throws IOException { rlmTasks.values().forEach(TaskWithFuture::cancel); IOUtils.closeQuietly(remoteLogStorage, "RemoteLogStorageManager"); - IOUtils.closeQuietly(remoteLogIndexCache, "RemoteIndexCache"); + remoteLogIndexCachesByDir.forEach( + (dataDir, cache) -> + IOUtils.closeQuietly(cache, "RemoteIndexCache-" + dataDir.getName())); shutdownAndAwaitTermination( rlManagerScheduledThreadPool, "RLMScheduledThreadPool", 10, TimeUnit.SECONDS); @@ -398,8 +423,8 @@ private static void shutdownAndAwaitTermination( } @VisibleForTesting - public RemoteLogIndexCache getRemoteLogIndexCache() { - return remoteLogIndexCache; + public RemoteLogIndexCache getRemoteLogIndexCache(File dataDir) { + return remoteLogIndexCache(dataDir); } @VisibleForTesting @@ -407,4 +432,23 @@ public RemoteLogIndexCache getRemoteLogIndexCache() { TaskWithFuture getTaskWithFuture(TableBucket tableBucket) { return rlmTasks.get(tableBucket); } + + private @Nullable RemoteLogIndexCache remoteLogIndexCacheForBucket(TableBucket tableBucket) { + Optional logTabletOpt = logManager.getLog(tableBucket); + if (!logTabletOpt.isPresent()) { + LOG.error( + "Can't resolve remote log index cache for bucket {} because no local log exists.", + tableBucket); + return null; + } + return remoteLogIndexCache(localDiskManager.resolveDataDir(logTabletOpt.get().getLogDir())); + } + + private RemoteLogIndexCache remoteLogIndexCache(File dataDir) { + RemoteLogIndexCache remoteLogIndexCache = remoteLogIndexCachesByDir.get(dataDir); + if (remoteLogIndexCache == null) { + throw new IllegalArgumentException("Unknown local data directory: " + dataDir); + } + return remoteLogIndexCache; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 7a48813237..67d2959ea7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -155,6 +155,7 @@ public final class Replica { private static final Logger LOG = LoggerFactory.getLogger(Replica.class); + private final File dataDir; private final PhysicalTablePath physicalPath; private final TableBucket tableBucket; @@ 
-217,6 +218,7 @@ public final class Replica { private MetricGroup lakeTieringMetricGroup; public Replica( + File dataDir, PhysicalTablePath physicalPath, TableBucket tableBucket, LogManager logManager, @@ -236,6 +238,7 @@ public Replica( Clock clock, RemoteLogManager remoteLogManager) throws Exception { + this.dataDir = dataDir; this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logManager = logManager; @@ -369,7 +372,11 @@ public int writerIdCount() { } public Path getTabletParentDir() { - return logManager.getTabletParentDir(physicalPath, tableBucket); + return logManager.getTabletParentDir(dataDir, physicalPath, tableBucket); + } + + public File getDataDir() { + return dataDir; } public @Nullable KvTablet getKvTablet() { @@ -730,7 +737,7 @@ private Optional initKvTablet() { physicalPath); CompletedSnapshot completedSnapshot = optCompletedSnapshot.get(); // always create a new dir for the kv tablet - File tabletDir = kvManager.createTabletDir(physicalPath, tableBucket); + File tabletDir = kvManager.createTabletDir(dataDir, physicalPath, tableBucket); // down the snapshot to target tablet dir downloadKvSnapshots(completedSnapshot, tabletDir.toPath()); @@ -752,6 +759,7 @@ private Optional initKvTablet() { // if it exists before init kv tablet kvTablet = kvManager.getOrCreateKv( + dataDir, physicalPath, tableBucket, logTablet, @@ -2039,6 +2047,7 @@ private LogTablet createLog( throws Exception { LogTablet log = logManager.getOrCreateLog( + dataDir, physicalPath, tableBucket, tableConfig.getLogFormat(), diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 3b847266eb..52ddcbc78f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -105,6 +105,7 @@ import org.apache.fluss.server.replica.delay.DelayedWrite; import org.apache.fluss.server.replica.fetcher.InitialFetchStatus; import org.apache.fluss.server.replica.fetcher.ReplicaFetcherManager; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.utils.FatalErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; @@ -160,7 +161,8 @@ public class ReplicaManager implements ServerReconfigurable { private final ZooKeeperClient zkClient; protected final int serverId; private final AtomicBoolean highWatermarkCheckPointThreadStarted = new AtomicBoolean(false); - private final OffsetCheckpointFile highWatermarkCheckpoint; + private final Map highWatermarkCheckpoints; + private final LocalDiskManager localDiskManager; @GuardedBy("replicaStateChangeLock") private final Map allReplicas = new ConcurrentHashMap<>(); @@ -224,7 +226,8 @@ public ReplicaManager( TabletServerMetricGroup serverMetricGroup, UserMetrics userMetrics, Clock clock, - ExecutorService ioExecutor) + ExecutorService ioExecutor, + LocalDiskManager localDiskManager) throws IOException { this( conf, @@ -240,9 +243,17 @@ public ReplicaManager( fatalErrorHandler, serverMetricGroup, userMetrics, - new RemoteLogManager(conf, zkClient, coordinatorGateway, clock, ioExecutor), + new RemoteLogManager( + conf, + zkClient, + coordinatorGateway, + localDiskManager, + logManager, + clock, + ioExecutor), clock, - ioExecutor); + ioExecutor, + localDiskManager); } @VisibleForTesting @@ -262,21 +273,25 @@ public ReplicaManager( 
UserMetrics userMetrics, RemoteLogManager remoteLogManager, Clock clock, - ExecutorService ioExecutor) + ExecutorService ioExecutor, + LocalDiskManager localDiskManager) throws IOException { this.conf = conf; this.zkClient = zkClient; this.scheduler = scheduler; + this.localDiskManager = localDiskManager; this.logManager = logManager; this.kvManager = kvManager; this.serverId = serverId; this.metadataCache = metadataCache; - this.highWatermarkCheckpoint = - new OffsetCheckpointFile( - new File( - logManager.getDataDir().getAbsolutePath(), - HIGH_WATERMARK_CHECKPOINT_FILE_NAME)); + this.highWatermarkCheckpoints = new HashMap<>(); + for (File dataDir : localDiskManager.dataDirs()) { + highWatermarkCheckpoints.put( + dataDir, + new OffsetCheckpointFile( + new File(dataDir, HIGH_WATERMARK_CHECKPOINT_FILE_NAME))); + } this.delayedWriteManager = new DelayedOperationManager<>( "delay write", @@ -1568,18 +1583,31 @@ private void startHighWatermarkCheckPointThread() { @VisibleForTesting void checkpointHighWatermarks() { List onlineReplicasList = getOnlineReplicaList(); - Map highWatermarks = new HashMap<>(); + if (onlineReplicasList.isEmpty()) { + return; + } + + Map> highWatermarksByDir = new HashMap<>(); for (Replica replica : onlineReplicasList) { LogTablet logTablet = replica.getLogTablet(); - highWatermarks.put(logTablet.getTableBucket(), logTablet.getHighWatermark()); + highWatermarksByDir + .computeIfAbsent(replica.getDataDir(), ignored -> new HashMap<>()) + .put(logTablet.getTableBucket(), logTablet.getHighWatermark()); } - if (!highWatermarks.isEmpty()) { - try { - highWatermarkCheckpoint.write(highWatermarks); - } catch (Exception e) { - throw new LogStorageException("Error while writing to high watermark file", e); + try { + for (Map.Entry> entry : highWatermarksByDir.entrySet()) { + if (!entry.getValue().isEmpty()) { + try { + highWatermarkCheckpoints.get(entry.getKey()).write(entry.getValue()); + } catch (Exception e) { + throw new LogStorageException( + "Error while writing to high watermark file", e); + } + } } + } catch (LogStorageException e) { + throw e; } } @@ -1841,6 +1869,8 @@ private StopReplicaResultForBucket stopReplica( serverMetricGroup.removeTableBucketMetricGroup( replicaToDelete.getPhysicalTablePath().getTablePath(), tb); replicaToDelete.delete(); + localDiskManager.recordReplicaDelete( + replicaToDelete.getDataDir(), replicaToDelete.isKvTable()); Path tabletParentDir = replicaToDelete.getTabletParentDir(); if (tb.getPartitionId() != null) { deletedPartitionIds.put(tb.getPartitionId(), tabletParentDir); @@ -1922,11 +1952,22 @@ protected Optional maybeCreateReplica(NotifyLeaderAndIsrData data) { TableInfo tableInfo = getTableInfo(zkClient, tablePath); boolean isKvTable = tableInfo.hasPrimaryKey(); + Optional dataDirOpt = + logManager + .getLog(tb) + .map( + logTablet -> + localDiskManager.resolveDataDir( + logTablet.getLogDir())); + File localDataDir = + dataDirOpt.orElseGet( + () -> localDiskManager.selectDataDirForNewBucket(isKvTable)); BucketMetricGroup bucketMetricGroup = serverMetricGroup.addTableBucketMetricGroup( physicalTablePath, tb, isKvTable); Replica replica = new Replica( + localDataDir, physicalTablePath, tb, logManager, @@ -1935,7 +1976,7 @@ protected Optional maybeCreateReplica(NotifyLeaderAndIsrData data) { this::getMinInSyncReplicas, serverId, new OffsetCheckpointFile.LazyOffsetCheckpoints( - highWatermarkCheckpoint), + highWatermarkCheckpoints.get(localDataDir)), delayedWriteManager, delayedFetchLogManager, adjustIsrManager, @@ -1946,6 +1987,9 @@ 
protected Optional maybeCreateReplica(NotifyLeaderAndIsrData data) { tableInfo, clock, remoteLogManager); + if (!dataDirOpt.isPresent()) { + localDiskManager.recordReplicaLoad(localDataDir, isKvTable); + } allReplicas.put(tb, new OnlineReplica(replica)); replicaOpt = Optional.of(replica); } else if (hostedReplica instanceof OnlineReplica) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java b/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java new file mode 100644 index 0000000000..b4eab07868 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.storage; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.IllegalConfigurationException; +import org.apache.fluss.exception.LogStorageException; +import org.apache.fluss.utils.FileUtils; +import org.apache.fluss.utils.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +/** + * Manages the local data directories used by a TabletServer. + * + *
The manager has two responsibilities:
+ *
+ *   1. During startup, validate configured data directories, recover or create {@code
+ *      disk.properties}, and hold a {@code .lock} file for every live directory.
+ *   2. After startup, provide runtime helpers for path resolution and simple per-disk load
+ *      accounting used by replica placement.
+ *
+ *
Thread-safety: initialization happens before the manager is published to other components. + * After that, the mutable state is limited to in-memory bucket counters and shutdown cleanup. The + * public runtime APIs synchronize on the manager instance so directory lookups, counter updates, + * and {@link #close()} observe a consistent view. + */ +public final class LocalDiskManager implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(LocalDiskManager.class); + + public static final String DISK_PROPERTIES_FILE_NAME = "disk.properties"; + public static final String LOCK_FILE_NAME = ".lock"; + private static final int DISK_PROPERTIES_VERSION = 1; + private static final String VERSION_KEY = "version"; + private static final String DISK_ID_KEY = "disk.id"; + private static final String SERVER_ID_KEY = "server.id"; + + private final int serverId; + private final List dataDirs; + private final Map diskInfosByDir = new LinkedHashMap<>(); + private final Map directoryLocks = new LinkedHashMap<>(); + + // ------------------------------------------------------------------------ + // Initialization + // ------------------------------------------------------------------------ + + private LocalDiskManager(Configuration conf) throws IOException { + this.serverId = conf.getInt(ConfigOptions.TABLET_SERVER_ID); + List configuredDataDirs = resolveConfiguredDataDirs(conf); + List initializedDataDirs; + try { + initializedDataDirs = initializeDataDirs(configuredDataDirs); + } catch (Throwable throwable) { + close(); + throw throwable; + } + this.dataDirs = Collections.unmodifiableList(initializedDataDirs); + } + + public static LocalDiskManager create(Configuration conf) throws IOException { + LocalDiskManager localDiskManager = new LocalDiskManager(conf); + LOG.info( + "Initialized LocalDiskManager with local data directories {}.", + localDiskManager.dataDirs); + return localDiskManager; + } + + /** + * Resolves {@code data.dirs} or falls back to {@code data.dir}, then normalizes every path + * exactly once. + * + *
The method rejects:
+ *
+ *   • empty values, for example {@code ["/data-0", " "]}
+ *   • duplicate directories, for example {@code ["/data-0", "/data-0"]}
+ *   • overlapping directories, for example {@code ["/data", "/data/table-1"]}
+ *
+ *
Overlapping directories are rejected because they do not describe independent storage + * roots and would make placement and cleanup ambiguous. + */ + private static List resolveConfiguredDataDirs(Configuration conf) { + List configuredDataDirs = conf.get(ConfigOptions.DATA_DIRS); + if (configuredDataDirs == null || configuredDataDirs.isEmpty()) { + configuredDataDirs = Collections.singletonList(conf.get(ConfigOptions.DATA_DIR)); + } + + List dataDirs = new ArrayList<>(configuredDataDirs.size()); + for (String configuredDataDir : configuredDataDirs) { + if (configuredDataDir == null || configuredDataDir.trim().isEmpty()) { + throw new IllegalConfigurationException( + "Configured local data directories must not contain empty values."); + } + File dataDir = + new File(configuredDataDir).toPath().toAbsolutePath().normalize().toFile(); + for (File existingDataDir : dataDirs) { + if (existingDataDir.equals(dataDir)) { + throw new IllegalConfigurationException( + "Duplicate local data directory configured: " + dataDir); + } + + Path existingPath = existingDataDir.toPath(); + Path candidatePath = dataDir.toPath(); + if (existingPath.startsWith(candidatePath) + || candidatePath.startsWith(existingPath)) { + throw new IllegalConfigurationException( + "Configured local data directories must not overlap: " + + existingPath + + " and " + + candidatePath); + } + } + dataDirs.add(dataDir); + } + return dataDirs; + } + + /** + * Initializes all configured data directories and returns the directories that remain online + * after validation. + * + *
The initialization pipeline keeps dropping bad directories from {@code onlineDataDirs}. A + * directory can be removed at any step if it fails metadata loading, local directory + * validation, locking, or checkpoint writing. The remaining directories become the runtime + * storage roots. + */ + private List initializeDataDirs(List configuredDataDirs) throws IOException { + Set onlineDataDirs = new LinkedHashSet<>(configuredDataDirs); + Set offlineDataDirs = new LinkedHashSet<>(); + + // 1. Create disk.properties checkpoints for all configured data directories. + Map diskPropertiesCheckpoints = + newDiskPropertiesCheckpoints(configuredDataDirs); + // 2. Load persisted disk metadata from disk.properties. + Map diskPropertiesMap = + loadExistingDiskProperties( + onlineDataDirs, offlineDataDirs, diskPropertiesCheckpoints); + // 3. Create and validate the configured data directories. + createAndValidateDataDirs(onlineDataDirs, offlineDataDirs); + // 4. Acquire .lock for all directories that are still online. + lockDataDirs(onlineDataDirs, offlineDataDirs); + // 5. Assign disk ids and write disk.properties for directories that do not have it yet. + initializeMissingDiskProperties( + onlineDataDirs, offlineDataDirs, diskPropertiesMap, diskPropertiesCheckpoints); + // 6. Fail fast if every configured directory has been dropped during initialization. + validateNonEmptyDataDirs(configuredDataDirs, onlineDataDirs); + // 7. Publish the runtime disk state used by placement and path resolution. + finishDiskInfos(onlineDataDirs, diskPropertiesMap); + + return new ArrayList<>(onlineDataDirs); + } + + private static Map newDiskPropertiesCheckpoints( + List configuredDataDirs) { + Map checkpoints = new LinkedHashMap<>(); + for (File dataDir : configuredDataDirs) { + checkpoints.put( + dataDir, + new DiskPropertiesCheckpoint(new File(dataDir, DISK_PROPERTIES_FILE_NAME))); + } + return checkpoints; + } + + private Map loadExistingDiskProperties( + Set onlineDataDirs, + Set offlineDataDirs, + Map diskPropertiesCheckpoints) { + Map diskPropertiesMap = new LinkedHashMap<>(); + Set persistedServerIdSet = new LinkedHashSet<>(); + Map diskIdToDataDir = new LinkedHashMap<>(); + + for (File dataDir : new ArrayList<>(onlineDataDirs)) { + try { + DiskProperties diskProperties = diskPropertiesCheckpoints.get(dataDir).read(); + if (diskProperties != null) { + int persistedServerId = diskProperties.serverIdAsInt(dataDir); + String persistedDiskId = diskProperties.diskId(dataDir); + diskPropertiesMap.put(dataDir, diskProperties); + persistedServerIdSet.add(persistedServerId); + + File previousDataDir = diskIdToDataDir.putIfAbsent(persistedDiskId, dataDir); + if (previousDataDir != null) { + throw new IllegalConfigurationException( + "Configured local data directories contain duplicate disk.id " + + persistedDiskId + + " for " + + previousDataDir + + " and " + + dataDir); + } + } + } catch (IOException e) { + markDataDirOffline( + dataDir, + onlineDataDirs, + offlineDataDirs, + "Failed to read disk.properties for data directory " + dataDir, + e); + } + } + + if (persistedServerIdSet.size() > 1) { + StringBuilder builder = new StringBuilder(); + for (Map.Entry entry : diskPropertiesMap.entrySet()) { + builder.append("- ") + .append(entry.getKey()) + .append(" -> ") + .append(entry.getValue()) + .append('\n'); + } + throw new IllegalConfigurationException( + "disk.properties is not consistent across data.dirs. 
This could happen if " + + "multiple tablet servers shared a data directory (data.dirs) or partial data was " + + "manually copied from another server. Found:\n" + + builder); + } else if (persistedServerIdSet.size() == 1 && !persistedServerIdSet.contains(serverId)) { + int persistedServerId = persistedServerIdSet.iterator().next(); + throw new IllegalConfigurationException( + "Configured tablet server id " + + serverId + + " does not match server.id " + + persistedServerId + + " in disk.properties. If you moved your data, make sure the " + + "configured tablet server id matches. If you intend to create a " + + "new tablet server, remove all data under your data directories " + + "(data.dirs)."); + } + + return diskPropertiesMap; + } + + private void createAndValidateDataDirs(Set onlineDataDirs, Set offlineDataDirs) { + for (File dataDir : new ArrayList<>(onlineDataDirs)) { + try { + if (!dataDir.exists()) { + LOG.info( + "Data directory {} not found, creating it.", dataDir.getAbsolutePath()); + if (!dataDir.mkdirs()) { + throw new IOException( + "Failed to create data directory " + dataDir.getAbsolutePath()); + } + Path parentPath = dataDir.toPath().toAbsolutePath().normalize().getParent(); + FileUtils.flushDir(parentPath); + } + if (!dataDir.isDirectory() || !dataDir.canRead()) { + throw new IOException( + dataDir.getAbsolutePath() + " is not a readable data directory."); + } + } catch (IOException e) { + markDataDirOffline( + dataDir, + onlineDataDirs, + offlineDataDirs, + "Failed to create or validate data directory " + dataDir, + e); + } + } + } + + private void lockDataDirs(Set onlineDataDirs, Set offlineDataDirs) { + for (File dataDir : new ArrayList<>(onlineDataDirs)) { + try { + DirectoryLock directoryLock = new DirectoryLock(new File(dataDir, LOCK_FILE_NAME)); + if (!directoryLock.tryLock()) { + closeDirectoryLock(directoryLock); + throw new IllegalConfigurationException( + "Failed to acquire lock on file " + + LOCK_FILE_NAME + + " in " + + directoryLock.file.getParent() + + ". A Fluss tablet server instance in another process or " + + "thread is using this directory."); + } + directoryLocks.put(dataDir, directoryLock); + } catch (IOException e) { + markDataDirOffline( + dataDir, + onlineDataDirs, + offlineDataDirs, + "Failed to acquire lock for data directory " + dataDir, + e); + } + } + } + + private void initializeMissingDiskProperties( + Set onlineDataDirs, + Set offlineDataDirs, + Map diskPropertiesMap, + Map diskPropertiesCheckpoints) { + + // 1. Find directories that are live but do not have persisted local metadata yet. + Set dataDirsWithoutDiskProperties = new LinkedHashSet<>(); + for (File dataDir : onlineDataDirs) { + if (!diskPropertiesMap.containsKey(dataDir)) { + dataDirsWithoutDiskProperties.add(dataDir); + } + } + // 2. Collect disk ids that are already persisted so we do not assign duplicates locally. + Set usedDiskIds = new LinkedHashSet<>(); + for (DiskProperties diskProperties : diskPropertiesMap.values()) { + usedDiskIds.add(diskProperties.diskId); + } + // 3. Create the missing metadata in memory first. + for (File dataDir : dataDirsWithoutDiskProperties) { + diskPropertiesMap.put( + dataDir, + new DiskProperties( + DISK_PROPERTIES_VERSION, + newDiskId(usedDiskIds), + String.valueOf(serverId))); + } + // 4. Persist the missing metadata. A write failure makes the directory unusable for this + // startup, so the directory is unlocked and moved to offlineDataDirs. 
+ for (File dataDir : new ArrayList<>(dataDirsWithoutDiskProperties)) { + DiskProperties diskProperties = diskPropertiesMap.get(dataDir); + try { + diskPropertiesCheckpoints.get(dataDir).write(diskProperties); + } catch (IOException e) { + closeDirectoryLock(dataDir); + diskPropertiesMap.remove(dataDir); + markDataDirOffline( + dataDir, + onlineDataDirs, + offlineDataDirs, + "Failed to checkpoint disk.properties for data directory " + dataDir, + e); + } + } + } + + private void validateNonEmptyDataDirs(List configuredDataDirs, Set onlineDataDirs) { + if (onlineDataDirs.isEmpty()) { + throw new LogStorageException( + "None of the specified data dirs from " + + configuredDataDirs + + " can be created, locked or checkpointed"); + } + } + + private void finishDiskInfos( + Set onlineDataDirs, Map diskPropertiesMap) { + for (File dataDir : onlineDataDirs) { + DiskProperties diskProperties = diskPropertiesMap.get(dataDir); + diskInfosByDir.put(dataDir, new DiskInfo(dataDir, diskProperties.diskId)); + } + } + + // ------------------------------------------------------------------------ + // Public APIs + // ------------------------------------------------------------------------ + + public List dataDirs() { + return dataDirs; + } + + /** + * Resolves a local path back to the configured data directory that owns it. + * + *
Example: if {@code /data-0} is configured and the path is {@code /data-0/db/tablet-1}, the + * returned directory is {@code /data-0}. + */ + public synchronized File resolveDataDir(File path) { + Path pathToResolve = path.toPath(); + for (File dataDir : dataDirs) { + if (pathToResolve.startsWith(dataDir.toPath())) { + return dataDir; + } + } + + throw new IllegalArgumentException( + "Path " + path + " does not belong to any configured data directory."); + } + + /** + * Selects a target directory for a new bucket replica. + * + *
Log-table buckets use {@code logBucketCount}. Primary-key table buckets use {@code + * kvBucketCount} so KV-heavy workloads are balanced independently from pure log workloads. + */ + public synchronized File selectDataDirForNewBucket(boolean hasPrimaryKey) { + DiskInfo selectedDisk = null; + Comparator comparator = + hasPrimaryKey + ? Comparator.comparingInt((DiskInfo diskInfo) -> diskInfo.kvBucketCount) + : Comparator.comparingInt((DiskInfo diskInfo) -> diskInfo.logBucketCount); + for (DiskInfo candidate : diskInfosByDir.values()) { + if (selectedDisk == null || comparator.compare(candidate, selectedDisk) < 0) { + selectedDisk = candidate; + } + } + if (selectedDisk == null) { + throw new IllegalStateException("No configured data directory is available."); + } + return selectedDisk.dataDir; + } + + public synchronized void recordReplicaLoad(File dataDir, boolean hasPrimaryKey) { + DiskInfo diskInfo = diskInfo(dataDir); + diskInfo.logBucketCount++; + if (hasPrimaryKey) { + diskInfo.kvBucketCount++; + } + } + + public synchronized void recordReplicaDelete(File dataDir, boolean hasPrimaryKey) { + DiskInfo diskInfo = diskInfo(dataDir); + diskInfo.logBucketCount = Math.max(0, diskInfo.logBucketCount - 1); + if (hasPrimaryKey) { + diskInfo.kvBucketCount = Math.max(0, diskInfo.kvBucketCount - 1); + } + } + + @VisibleForTesting + synchronized String diskId(File dataDir) { + return diskInfo(dataDir).diskId; + } + + @VisibleForTesting + synchronized int logBucketCount(File dataDir) { + return diskInfo(dataDir).logBucketCount; + } + + @VisibleForTesting + synchronized int kvBucketCount(File dataDir) { + return diskInfo(dataDir).kvBucketCount; + } + + @Override + public synchronized void close() { + for (DirectoryLock directoryLock : directoryLocks.values()) { + closeDirectoryLock(directoryLock); + } + directoryLocks.clear(); + diskInfosByDir.clear(); + } + + // ------------------------------------------------------------------------ + // Internal helpers + // ------------------------------------------------------------------------ + + private void markDataDirOffline( + File dataDir, + Set onlineDataDirs, + Set offlineDataDirs, + String message, + Exception exception) { + onlineDataDirs.remove(dataDir); + offlineDataDirs.add(dataDir); + if (exception == null) { + LOG.error(message); + } else { + LOG.error(message, exception); + } + } + + private static String newDiskId(Set usedDiskIds) { + String diskId; + do { + diskId = UUID.randomUUID().toString(); + } while (!usedDiskIds.add(diskId)); + return diskId; + } + + private DiskInfo diskInfo(File dataDir) { + DiskInfo diskInfo = diskInfosByDir.get(dataDir); + if (diskInfo == null) { + throw new IllegalArgumentException("Unknown data directory: " + dataDir); + } + return diskInfo; + } + + private void closeDirectoryLock(File dataDir) { + DirectoryLock directoryLock = directoryLocks.remove(dataDir); + if (directoryLock != null) { + closeDirectoryLock(directoryLock); + } + } + + private void closeDirectoryLock(DirectoryLock directoryLock) { + try { + directoryLock.destroy(); + } catch (IOException e) { + LOG.warn( + "Failed to close directory lock for {}", directoryLock.file.getParentFile(), e); + } + } + + // ------------------------------------------------------------------------ + // Core data objects + // ------------------------------------------------------------------------ + + /** Parsed contents of a {@code disk.properties} file. 
*/ + private static final class DiskProperties { + private final int version; + private final String diskId; + private final String serverId; + + private DiskProperties(int version, String diskId, String serverId) { + this.version = version; + this.diskId = diskId == null ? null : diskId.trim(); + this.serverId = serverId == null ? null : serverId.trim(); + } + + private int serverIdAsInt(File dataDir) throws IOException { + if (serverId == null || serverId.trim().isEmpty()) { + throw new IOException( + "Missing server.id in disk.properties under " + dataDir.getAbsolutePath()); + } + + try { + return Integer.parseInt(serverId.trim()); + } catch (NumberFormatException e) { + throw new IOException( + "Invalid server.id in disk.properties under " + + dataDir.getAbsolutePath() + + ": " + + serverId, + e); + } + } + + private String diskId(File dataDir) throws IOException { + if (diskId == null || diskId.trim().isEmpty()) { + throw new IOException( + "Missing disk.id in disk.properties under " + dataDir.getAbsolutePath()); + } + return diskId.trim(); + } + + @Override + public String toString() { + return "DiskProperties(version=" + + version + + ", diskId=" + + diskId + + ", serverId=" + + serverId + + ")"; + } + } + + /** Read/write helper for a single {@code disk.properties} file. */ + private static final class DiskPropertiesCheckpoint { + private final File file; + private final Object lock = new Object(); + + private DiskPropertiesCheckpoint(File file) { + this.file = file; + } + + private void write(DiskProperties diskProperties) throws IOException { + synchronized (lock) { + try { + File tempFile = new File(file.getAbsolutePath() + ".tmp"); + FileOutputStream outputStream = new FileOutputStream(tempFile); + try { + Properties properties = new Properties(); + properties.setProperty(VERSION_KEY, String.valueOf(diskProperties.version)); + properties.setProperty(DISK_ID_KEY, diskProperties.diskId); + properties.setProperty(SERVER_ID_KEY, diskProperties.serverId); + properties.store(outputStream, ""); + outputStream.flush(); + outputStream.getFD().sync(); + } finally { + IOUtils.closeQuietly(outputStream, tempFile.getName()); + } + FileUtils.atomicMoveWithFallback(tempFile.toPath(), file.toPath(), false); + } catch (IOException e) { + LOG.error("Failed to write disk.properties due to", e); + throw e; + } + } + } + + private DiskProperties read() throws IOException { + Files.deleteIfExists(new File(file.getPath() + ".tmp").toPath()); + + synchronized (lock) { + InputStream inputStream = null; + try { + inputStream = Files.newInputStream(file.toPath()); + Properties properties = new Properties(); + properties.load(inputStream); + + String version = properties.getProperty(VERSION_KEY); + if (version == null) { + throw new IOException( + "Unrecognized version of disk.properties file under " + + file.getAbsolutePath() + + ": null"); + } + + int parsedVersion; + try { + parsedVersion = Integer.parseInt(version); + } catch (NumberFormatException e) { + throw new IOException( + "Unrecognized version of disk.properties file under " + + file.getAbsolutePath() + + ": " + + version, + e); + } + + if (parsedVersion != DISK_PROPERTIES_VERSION) { + throw new IOException( + "Unrecognized version of disk.properties file under " + + file.getAbsolutePath() + + ": " + + version); + } + + String diskId = properties.getProperty(DISK_ID_KEY); + String serverId = properties.getProperty(SERVER_ID_KEY); + return new DiskProperties(parsedVersion, diskId, serverId); + } catch (NoSuchFileException e) { + LOG.warn("No 
disk.properties file under dir {}", file.getAbsolutePath()); + return null; + } catch (Exception e) { + LOG.error( + "Failed to read disk.properties file under dir {} due to {}", + file.getAbsolutePath(), + e.getMessage()); + if (e instanceof IOException) { + throw (IOException) e; + } + throw new IOException( + "Failed to read disk.properties file under dir " + + file.getAbsolutePath(), + e); + } finally { + IOUtils.closeQuietly(inputStream, file.getName()); + } + } + } + } + + /** Holds the {@code .lock} file for one live data directory. */ + private static final class DirectoryLock { + private final File file; + private final FileChannel channel; + private FileLock flock; + + private DirectoryLock(File file) throws IOException { + this( + file, + FileChannel.open( + file.toPath(), + StandardOpenOption.CREATE, + StandardOpenOption.READ, + StandardOpenOption.WRITE), + null); + } + + private DirectoryLock(File file, FileChannel channel, FileLock flock) { + this.file = file; + this.channel = channel; + this.flock = flock; + } + + private synchronized void lock() throws IOException { + flock = channel.lock(); + } + + private synchronized boolean tryLock() throws IOException { + try { + flock = channel.tryLock(); + return flock != null; + } catch (OverlappingFileLockException e) { + return false; + } + } + + private synchronized void unlock() throws IOException { + if (flock != null) { + flock.release(); + } + } + + private synchronized void destroy() throws IOException { + unlock(); + channel.close(); + } + } + + /** Runtime state for one online data directory. */ + private static final class DiskInfo { + private final File dataDir; + private final String diskId; + private int logBucketCount; + private int kvBucketCount; + + private DiskInfo(File dataDir, String diskId) { + this.dataDir = dataDir; + this.diskId = diskId; + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 988f7565ac..40a543fb25 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -48,6 +48,7 @@ import org.apache.fluss.server.metrics.UserMetrics; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.replica.ReplicaManager; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperUtils; import org.apache.fluss.server.zk.data.TabletServerRegistration; @@ -142,6 +143,9 @@ public class TabletServer extends ServerBase { @GuardedBy("lock") private KvManager kvManager; + @GuardedBy("lock") + private LocalDiskManager localDiskManager; + @GuardedBy("lock") private ReplicaManager replicaManager; @@ -223,11 +227,20 @@ protected void startServices() throws Exception { this.metadataCache = new TabletServerMetadataCache(metadataManager); + this.localDiskManager = LocalDiskManager.create(conf); this.logManager = - LogManager.create(conf, zkClient, scheduler, clock, tabletServerMetricGroup); + LogManager.create( + conf, + zkClient, + scheduler, + clock, + tabletServerMetricGroup, + localDiskManager); logManager.startup(); - this.kvManager = KvManager.create(conf, zkClient, logManager, tabletServerMetricGroup); + this.kvManager = + KvManager.create( + conf, zkClient, logManager, tabletServerMetricGroup, localDiskManager); kvManager.startup(); // Register 
kvManager to dynamicConfigManager for dynamic reconfiguration @@ -271,7 +284,8 @@ protected void startServices() throws Exception { tabletServerMetricGroup, userMetrics, clock, - ioExecutor); + ioExecutor, + localDiskManager); replicaManager.startup(); // Register DefaultSnapshotContext for dynamic kv.snapshot.interval @@ -449,6 +463,10 @@ CompletableFuture stopServices() { replicaManager.shutdown(); } + if (localDiskManager != null) { + localDiskManager.close(); + } + if (authorizer != null) { authorizer.close(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java index cecff0f8ff..51a979923c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java @@ -36,6 +36,7 @@ import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -91,6 +92,7 @@ final class KvManagerTest { private TableBucket tableBucket1; private TableBucket tableBucket2; + private LocalDiskManager localDiskManager; private LogManager logManager; private KvManager kvManager; private Configuration conf; @@ -107,34 +109,43 @@ static void baseBeforeAll() { void setup() throws Exception { conf = new Configuration(); conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath()); + conf.set(ConfigOptions.TABLET_SERVER_ID, 1); String dbName = "db1"; tablePath1 = TablePath.of(dbName, "t1"); tablePath2 = TablePath.of(dbName, "t2"); // we need a log manager for kv manager - + localDiskManager = LocalDiskManager.create(conf); logManager = LogManager.create( conf, zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); kvManager = KvManager.create( - conf, zkClient, logManager, TestingMetricGroups.TABLET_SERVER_METRICS); + conf, + zkClient, + logManager, + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); kvManager.startup(); } @AfterEach - void tearDown() { + void tearDown() throws Exception { if (kvManager != null) { kvManager.shutdown(); } if (logManager != null) { logManager.shutdown(); } + if (localDiskManager != null) { + localDiskManager.close(); + } } static List partitionProvider() { @@ -185,7 +196,11 @@ void testRecoveryAfterKvManagerShutDown(String partitionName) throws Exception { kvManager.shutdown(); kvManager = KvManager.create( - conf, zkClient, logManager, TestingMetricGroups.TABLET_SERVER_METRICS); + conf, + zkClient, + logManager, + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); kvManager.startup(); kv1 = getOrCreateKv(tablePath1, partitionName, tableBucket1); kv2 = getOrCreateKv(tablePath2, partitionName, tableBucket2); @@ -230,7 +245,11 @@ void testRecoveryWithSchemaChange(String partitionName) throws Exception { testingSchemaGetter.updateLatestSchemaInfo(new SchemaInfo(DATA2_SCHEMA, newSchemaId)); kvManager = KvManager.create( - conf, zkClient, logManager, TestingMetricGroups.TABLET_SERVER_METRICS); + conf, + zkClient, + logManager, + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); kvManager.startup(); // insert again after 
restart. @@ -351,8 +370,10 @@ private KvTablet getOrCreateKv( PhysicalTablePath.of( tablePath.getDatabaseName(), tablePath.getTableName(), partitionName); LogTablet logTablet = - logManager.getOrCreateLog(physicalTablePath, tableBucket, LogFormat.ARROW, 1, true); + logManager.getOrCreateLog( + tempDir, physicalTablePath, tableBucket, LogFormat.ARROW, 1, true); return kvManager.getOrCreateKv( + tempDir, physicalTablePath, tableBucket, logTablet, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java index 021e9d1b04..ba0ccad737 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java @@ -32,6 +32,7 @@ import org.apache.fluss.server.kv.KvManager; import org.apache.fluss.server.kv.KvTablet; import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -70,6 +71,7 @@ final class DroppedTableRecoveryTest extends LogTestBase { private @TempDir File tempDir; private TablePath tablePath; private TableBucket tableBucket; + private LocalDiskManager localDiskManager; private LogManager logManager; private KvManager kvManager; @@ -85,24 +87,31 @@ static void baseBeforeAll() { public void setup() throws Exception { super.before(); conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath()); + conf.set(ConfigOptions.TABLET_SERVER_ID, 1); String dbName = "test_db"; tablePath = TablePath.of(dbName, "dropped_table"); tableBucket = new TableBucket(DATA1_TABLE_ID, 1); registerTableInZkClient(); + localDiskManager = LocalDiskManager.create(conf); logManager = LogManager.create( conf, zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); logManager.startup(); kvManager = KvManager.create( - conf, zkClient, logManager, TestingMetricGroups.TABLET_SERVER_METRICS); + conf, + zkClient, + logManager, + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); kvManager.startup(); } @@ -116,13 +125,16 @@ private void registerTableInZkClient() throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws Exception { if (kvManager != null) { kvManager.shutdown(); } if (logManager != null) { logManager.shutdown(); } + if (localDiskManager != null) { + localDiskManager.close(); + } } @Test @@ -133,10 +145,20 @@ void testMultipleLogTabletResidualDataDirectoriesCleanup() throws Exception { LogTablet log1 = logManager.getOrCreateLog( - PhysicalTablePath.of(tablePath), tableBucket1, LogFormat.ARROW, 1, false); + tempDir, + PhysicalTablePath.of(tablePath), + tableBucket1, + LogFormat.ARROW, + 1, + false); LogTablet log2 = logManager.getOrCreateLog( - PhysicalTablePath.of(tablePath), tableBucket2, LogFormat.ARROW, 1, false); + tempDir, + PhysicalTablePath.of(tablePath), + tableBucket2, + LogFormat.ARROW, + 1, + false); // Write some data to both logs MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); @@ -151,18 +173,21 @@ void testMultipleLogTabletResidualDataDirectoriesCleanup() throws Exception { // Shutdown log manager first logManager.shutdown(); + localDiskManager.close(); // 
Simulate table drop: remove metadata from ZooKeeper zkClient.deleteTable(tablePath); // Start LogManager again + localDiskManager = LocalDiskManager.create(conf); LogManager newLogManager = LogManager.create( conf, zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); newLogManager.startup(); // Verify that both residual data directories were cleaned up @@ -181,7 +206,12 @@ void testLogTabletResidualDataCleanupWithPartitionedTable() throws Exception { LogTablet log = logManager.getOrCreateLog( - partitionedTablePath, partitionedTableBucket, LogFormat.ARROW, 1, false); + tempDir, + partitionedTablePath, + partitionedTableBucket, + LogFormat.ARROW, + 1, + false); // Write some data to the log MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); @@ -192,18 +222,21 @@ void testLogTabletResidualDataCleanupWithPartitionedTable() throws Exception { // Shutdown log manager first logManager.shutdown(); + localDiskManager.close(); // Simulate table drop: remove metadata from ZooKeeper zkClient.deleteTable(tablePath); // Start LogManager again + localDiskManager = LocalDiskManager.create(conf); LogManager newLogManager = LogManager.create( conf, zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); newLogManager.startup(); // Verify that the residual data directory was cleaned up @@ -220,10 +253,20 @@ void testMultipleKvTabletResidualDataDirectoriesCleanup() throws Exception { LogTablet log1 = logManager.getOrCreateLog( - PhysicalTablePath.of(tablePath), tableBucket1, LogFormat.ARROW, 1, false); + tempDir, + PhysicalTablePath.of(tablePath), + tableBucket1, + LogFormat.ARROW, + 1, + false); LogTablet log2 = logManager.getOrCreateLog( - PhysicalTablePath.of(tablePath), tableBucket2, LogFormat.ARROW, 1, false); + tempDir, + PhysicalTablePath.of(tablePath), + tableBucket2, + LogFormat.ARROW, + 1, + false); // Write some data to both logs MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); @@ -237,6 +280,7 @@ void testMultipleKvTabletResidualDataDirectoriesCleanup() throws Exception { new TableConfig(Configuration.fromMap(DATA1_TABLE_DESCRIPTOR.getProperties())); KvTablet kvTablet1 = kvManager.getOrCreateKv( + tempDir, PhysicalTablePath.of(tablePath), tableBucket1, log1, @@ -246,6 +290,7 @@ void testMultipleKvTabletResidualDataDirectoriesCleanup() throws Exception { DEFAULT_COMPRESSION); KvTablet kvTablet2 = kvManager.getOrCreateKv( + tempDir, PhysicalTablePath.of(tablePath), tableBucket2, log2, @@ -263,23 +308,30 @@ void testMultipleKvTabletResidualDataDirectoriesCleanup() throws Exception { // Shutdown managers first kvManager.shutdown(); logManager.shutdown(); + localDiskManager.close(); // Simulate table drop: remove metadata from ZooKeeper zkClient.deleteTable(tablePath); // Start managers again + localDiskManager = LocalDiskManager.create(conf); LogManager newLogManager = LogManager.create( conf, zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); newLogManager.startup(); // Should clean up log directories KvManager newKvManager = KvManager.create( - conf, zkClient, newLogManager, TestingMetricGroups.TABLET_SERVER_METRICS); + conf, + zkClient, + newLogManager, + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); 
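+        // The restarted managers share the recreated LocalDiskManager, so recovered
+        // log and kv tablets resolve against the same data directories.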
newKvManager.startup(); // KV tablet directories should be cleaned up by LogManager automatically @@ -303,7 +355,12 @@ void testKvTabletResidualDataCleanupWithPartitionedTable() throws Exception { LogTablet log = logManager.getOrCreateLog( - partitionedTablePath, partitionedTableBucket, LogFormat.ARROW, 1, false); + tempDir, + partitionedTablePath, + partitionedTableBucket, + LogFormat.ARROW, + 1, + false); // Write some data to the log MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); @@ -315,6 +372,7 @@ void testKvTabletResidualDataCleanupWithPartitionedTable() throws Exception { new TableConfig(Configuration.fromMap(DATA1_TABLE_DESCRIPTOR.getProperties())); KvTablet kvTablet = kvManager.getOrCreateKv( + tempDir, partitionedTablePath, partitionedTableBucket, log, @@ -329,23 +387,30 @@ void testKvTabletResidualDataCleanupWithPartitionedTable() throws Exception { // Shutdown managers first kvManager.shutdown(); logManager.shutdown(); + localDiskManager.close(); // Simulate table drop: remove metadata from ZooKeeper zkClient.deleteTable(tablePath); // Start managers again + localDiskManager = LocalDiskManager.create(conf); LogManager newLogManager = LogManager.create( conf, zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); newLogManager.startup(); KvManager newKvManager = KvManager.create( - conf, zkClient, newLogManager, TestingMetricGroups.TABLET_SERVER_METRICS); + conf, + zkClient, + newLogManager, + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); newKvManager.startup(); // KV tablet directory should be cleaned up by LogManager automatically diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java index 981685a3d1..a70e32e3b9 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java @@ -26,6 +26,7 @@ import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -37,7 +38,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -68,6 +71,9 @@ /** Test for {@link LogManager}. */ final class LogManagerTest extends LogTestBase { + + private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; + @RegisterExtension public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = new AllCallbackWrapper<>(new ZooKeeperExtension()); @@ -78,6 +84,7 @@ final class LogManagerTest extends LogTestBase { private TablePath tablePath2; private TableBucket tableBucket1; private TableBucket tableBucket2; + private LocalDiskManager localDiskManager; private LogManager logManager; // TODO add more tests refer to kafka's LogManagerTest. 
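    // Tests tagged "jbod-multidir" are configured with two data directories
    // (data-1 and data-2 under tempDir) in setup(TestInfo); untagged tests keep the
    // single data.dir setup.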
@@ -91,23 +98,34 @@ static void baseBeforeAll() { } @BeforeEach - public void setup() throws Exception { + public void setup(TestInfo testInfo) throws Exception { super.before(); - conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath()); + if (testInfo.getTags().contains(JBOD_MULTI_DIR_TAG)) { + conf.set( + ConfigOptions.DATA_DIRS, + Arrays.asList( + new File(tempDir, "data-1").getAbsolutePath(), + new File(tempDir, "data-2").getAbsolutePath())); + } else { + conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath()); + } conf.setString(ConfigOptions.COORDINATOR_HOST, "localhost"); + conf.set(ConfigOptions.TABLET_SERVER_ID, 1); String dbName = "db1"; tablePath1 = TablePath.of(dbName, "t1"); tablePath2 = TablePath.of(dbName, "t2"); registerTableInZkClient(); + localDiskManager = LocalDiskManager.create(conf); logManager = LogManager.create( conf, zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); logManager.startup(); } @@ -176,7 +194,7 @@ void testCheckpointRecoveryPoints(String partitionName) throws Exception { } log2.flush(false); - logManager.checkpointRecoveryOffsets(); + logManager.checkpointRecoveryOffsets(tempDir); Map checkpoints = new OffsetCheckpointFile( new File(tempDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)) @@ -210,7 +228,8 @@ void testRecoveryAfterLogManagerShutdown() throws Exception { zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); newLogManager.startup(); logManager = newLogManager; log1 = getOrCreateLog(tablePath1, null, tableBucket1); @@ -255,7 +274,8 @@ void testHasCleanShutdownMarkerAfterLogManagerShutdown(String partitionName) thr zkClient, new FlussScheduler(1), SystemClock.getInstance(), - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); assertThat(new File(dataDir, CLEAN_SHUTDOWN_FILE).exists()).isTrue(); newLogManager.startup(); logManager = newLogManager; @@ -300,9 +320,88 @@ void testDeleteLog(String partitionName) throws Exception { assertThat(logManager.getLog(log1.getTableBucket()).isPresent()).isTrue(); } + @Test + @Tag(JBOD_MULTI_DIR_TAG) + void testCheckpointRecoveryPointsAreWrittenPerDirectory() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + initTableBuckets(null); + + LogTablet log1 = createLog(tablePath1, tableBucket1, dataDir1); + LogTablet log2 = createLog(tablePath2, tableBucket2, dataDir2); + + MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); + log1.appendAsLeader(records); + log2.appendAsLeader(records); + log1.flush(false); + log2.flush(false); + + logManager.checkpointRecoveryOffsets(dataDir1); + logManager.checkpointRecoveryOffsets(dataDir2); + + Map dir1Checkpoints = + new OffsetCheckpointFile( + new File(dataDir1, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)) + .read(); + Map dir2Checkpoints = + new OffsetCheckpointFile( + new File(dataDir2, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)) + .read(); + + assertThat(dir1Checkpoints).containsOnlyKeys(tableBucket1); + assertThat(dir1Checkpoints.get(tableBucket1)).isEqualTo(log1.getRecoveryPoint()); + assertThat(dir2Checkpoints).containsOnlyKeys(tableBucket2); + assertThat(dir2Checkpoints.get(tableBucket2)).isEqualTo(log2.getRecoveryPoint()); + } + + @Test + @Tag(JBOD_MULTI_DIR_TAG) + void 
testPerDirectoryCleanShutdownAndRecovery() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + initTableBuckets(null); + + createLog(tablePath1, tableBucket1, dataDir1) + .appendAsLeader(genMemoryLogRecordsByObject(DATA1)); + createLog(tablePath2, tableBucket2, dataDir2) + .appendAsLeader(genMemoryLogRecordsByObject(DATA1)); + + logManager.shutdown(); + logManager = null; + localDiskManager.close(); + + assertThat(new File(dataDir1, LogManager.CLEAN_SHUTDOWN_FILE)).exists(); + assertThat(new File(dataDir2, LogManager.CLEAN_SHUTDOWN_FILE)).exists(); + + localDiskManager = LocalDiskManager.create(conf); + logManager = + LogManager.create( + conf, + zkClient, + new FlussScheduler(1), + SystemClock.getInstance(), + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); + logManager.startup(); + + assertThat(new File(dataDir1, LogManager.CLEAN_SHUTDOWN_FILE)).doesNotExist(); + assertThat(new File(dataDir2, LogManager.CLEAN_SHUTDOWN_FILE)).doesNotExist(); + assertThat(logManager.getLog(tableBucket1)).isPresent(); + assertThat( + localDiskManager.resolveDataDir( + logManager.getLog(tableBucket1).get().getLogDir())) + .isEqualTo(dataDir1.getAbsoluteFile()); + assertThat(logManager.getLog(tableBucket2)).isPresent(); + assertThat( + localDiskManager.resolveDataDir( + logManager.getLog(tableBucket2).get().getLogDir())) + .isEqualTo(dataDir2.getAbsoluteFile()); + } + private LogTablet getOrCreateLog( TablePath tablePath, String partitionName, TableBucket tableBucket) throws Exception { return logManager.getOrCreateLog( + tempDir, PhysicalTablePath.of( tablePath.getDatabaseName(), tablePath.getTableName(), partitionName), tableBucket, @@ -311,6 +410,12 @@ private LogTablet getOrCreateLog( false); } + private LogTablet createLog(TablePath tablePath, TableBucket tableBucket, File dataDir) + throws Exception { + return logManager.getOrCreateLog( + dataDir, PhysicalTablePath.of(tablePath), tableBucket, LogFormat.ARROW, 1, false); + } + private void initTableBuckets(@Nullable String partitionName) { if (partitionName == null) { tableBucket1 = new TableBucket(DATA1_TABLE_ID, 1); @@ -326,9 +431,12 @@ private FetchDataInfo readLog(LogTablet log) throws Exception { } @AfterEach - public void tearDown() { + public void tearDown() throws Exception { if (logManager != null) { logManager.shutdown(); } + if (localDiskManager != null) { + localDiskManager.close(); + } } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogIndexCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogIndexCacheTest.java index 841e58a13a..d6788c4958 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogIndexCacheTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogIndexCacheTest.java @@ -49,7 +49,7 @@ class RemoteLogIndexCacheTest extends RemoteLogTestBase { @BeforeEach public void setup() throws Exception { super.setup(); - rlIndexCache = remoteLogManager.getRemoteLogIndexCache(); + rlIndexCache = remoteLogManager.getRemoteLogIndexCache(localDiskManager.dataDirs().get(0)); } @ParameterizedTest diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java index 2ac4e20967..eac1e22efa 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java +++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java @@ -32,14 +32,19 @@ import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; +import org.apache.fluss.server.replica.ReplicaTestBase; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import java.io.File; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -711,4 +716,49 @@ private static Stream stopArgs() { Arguments.of(true, false), Arguments.of(true, true)); } + + // ---- JBOD multi-directory tests ---- + + @Test + @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + void testRemoteIndexCacheFollowsReplicaDirectory() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + makeLogTableAsLeader(0); + makeLogTableAsLeader(1); + + TableBucket tableBucket1 = new TableBucket(DATA1_TABLE_ID, 0); + TableBucket tableBucket2 = new TableBucket(DATA1_TABLE_ID, 1); + Replica replica1 = replicaManager.getReplicaOrException(tableBucket1); + Replica replica2 = replicaManager.getReplicaOrException(tableBucket2); + assertThat(replica1.getDataDir()).isEqualTo(dataDir1.getAbsoluteFile()); + assertThat(replica2.getDataDir()).isEqualTo(dataDir2.getAbsoluteFile()); + + addMultiSegmentsToLogTablet(replica1.getLogTablet(), 5); + addMultiSegmentsToLogTablet(replica2.getLogTablet(), 5); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + RemoteLogSegment segment1 = + remoteLogManager.relevantRemoteLogSegments(tableBucket1, 0L).get(0); + RemoteLogSegment segment2 = + remoteLogManager.relevantRemoteLogSegments(tableBucket2, 0L).get(0); + remoteLogManager.lookupPositionForOffset(segment1, 2L); + remoteLogManager.lookupPositionForOffset(segment2, 2L); + + List dir1Files = + Arrays.stream(remoteLogManager.getRemoteLogIndexCache(dataDir1).cacheDir().list()) + .collect(Collectors.toList()); + List dir2Files = + Arrays.stream(remoteLogManager.getRemoteLogIndexCache(dataDir2).cacheDir().list()) + .collect(Collectors.toList()); + + assertThat(dir1Files) + .anyMatch(name -> name.contains(segment1.remoteLogSegmentId().toString())); + assertThat(dir1Files) + .noneMatch(name -> name.contains(segment2.remoteLogSegmentId().toString())); + assertThat(dir2Files) + .anyMatch(name -> name.contains(segment2.remoteLogSegmentId().toString())); + assertThat(dir2Files) + .noneMatch(name -> name.contains(segment1.remoteLogSegmentId().toString())); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index 0d9f5bc565..bdbb6fa195 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -66,6 +66,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.ListOffsetsParam; +import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.metadata.BucketMetadata; import 
org.apache.fluss.server.metadata.ClusterMetadata; import org.apache.fluss.server.metadata.PartitionMetadata; @@ -81,12 +82,14 @@ import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.types.Tuple2; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -129,6 +132,7 @@ import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; import static org.apache.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID; import static org.apache.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID; +import static org.apache.fluss.server.replica.ReplicaManager.HIGH_WATERMARK_CHECKPOINT_FILE_NAME; import static org.apache.fluss.server.testutils.PartitionMetadataAssert.assertPartitionMetadata; import static org.apache.fluss.server.testutils.TableMetadataAssert.assertTableMetadata; import static org.apache.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH; @@ -2370,4 +2374,70 @@ private void assertUpdateMetadataEquals( } }); } + + // ---- JBOD multi-directory tests ---- + + @Test + @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + void testNewBucketsDistributedAcrossDataDirs() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + makeLogTableAsLeader(0); + makeLogTableAsLeader(1); + + Replica logReplica1 = + replicaManager.getReplicaOrException(new TableBucket(DATA1_TABLE_ID, 0)); + Replica logReplica2 = + replicaManager.getReplicaOrException(new TableBucket(DATA1_TABLE_ID, 1)); + assertThat(logReplica1.getDataDir()).isEqualTo(dataDir1.getAbsoluteFile()); + assertThat(logReplica2.getDataDir()).isEqualTo(dataDir2.getAbsoluteFile()); + } + + @Test + @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + void testLogAndKvCoLocatedForPrimaryKeyTable() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, 0); + makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, 1); + + Replica kvReplica1 = + replicaManager.getReplicaOrException(new TableBucket(DATA1_TABLE_ID_PK, 0)); + Replica kvReplica2 = + replicaManager.getReplicaOrException(new TableBucket(DATA1_TABLE_ID_PK, 1)); + assertThat(kvReplica1.getLogTablet().getLogDir().toPath()).startsWith(dataDir1.toPath()); + assertThat(kvReplica1.getKvTablet().getKvTabletDir().toPath()) + .startsWith(dataDir1.toPath()); + assertThat(kvReplica2.getLogTablet().getLogDir().toPath()).startsWith(dataDir2.toPath()); + assertThat(kvReplica2.getKvTablet().getKvTabletDir().toPath()) + .startsWith(dataDir2.toPath()); + } + + @Test + @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + void testHighWatermarkCheckpointIsWrittenPerDirectory() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + makeLogTableAsLeader(0); + makeLogTableAsLeader(1); + + Replica replica1 = replicaManager.getReplicaOrException(new TableBucket(DATA1_TABLE_ID, 0)); + Replica replica2 = replicaManager.getReplicaOrException(new TableBucket(DATA1_TABLE_ID, 1)); + replica1.appendRecordsToLeader(genMemoryLogRecordsByObject(DATA1), 1); + replica2.appendRecordsToLeader(genMemoryLogRecordsByObject(DATA1), 1); + + replicaManager.checkpointHighWatermarks(); + + Map checkpoint1 = + 
new OffsetCheckpointFile(new File(dataDir1, HIGH_WATERMARK_CHECKPOINT_FILE_NAME)) + .read(); + Map checkpoint2 = + new OffsetCheckpointFile(new File(dataDir2, HIGH_WATERMARK_CHECKPOINT_FILE_NAME)) + .read(); + + assertThat(checkpoint1).containsOnlyKeys(new TableBucket(DATA1_TABLE_ID, 0)); + assertThat(checkpoint1.get(new TableBucket(DATA1_TABLE_ID, 0))).isEqualTo(10L); + assertThat(checkpoint2).containsOnlyKeys(new TableBucket(DATA1_TABLE_ID, 1)); + assertThat(checkpoint2.get(new TableBucket(DATA1_TABLE_ID, 1))).isEqualTo(10L); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index 8c2f772c3e..e544f4dd2f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -53,6 +53,7 @@ import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metrics.group.BucketMetricGroup; import org.apache.fluss.server.metrics.group.TestingMetricGroups; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -69,6 +70,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; @@ -124,6 +126,8 @@ * function managed by {@link ReplicaManager}. */ public class ReplicaTestBase { + public static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; + @RegisterExtension public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = new AllCallbackWrapper<>(new ZooKeeperExtension()); @@ -137,6 +141,7 @@ public class ReplicaTestBase { protected @TempDir File tempDir; protected ManualClock manualClock; + protected LocalDiskManager localDiskManager; protected LogManager logManager; protected KvManager kvManager; protected ReplicaManager replicaManager; @@ -168,9 +173,18 @@ static void baseBeforeAll() { } @BeforeEach - public void setup() throws Exception { + public void setup(TestInfo testInfo) throws Exception { conf = getServerConf(); - conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath()); + conf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); + if (testInfo != null && testInfo.getTags().contains(JBOD_MULTI_DIR_TAG)) { + conf.set( + ConfigOptions.DATA_DIRS, + Arrays.asList( + new File(tempDir, "data-1").getAbsolutePath(), + new File(tempDir, "data-2").getAbsolutePath())); + } else { + conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath()); + } conf.setString(ConfigOptions.COORDINATOR_HOST, "localhost"); conf.set(ConfigOptions.REMOTE_DATA_DIR, tempDir.getAbsolutePath() + "/remote_data_dir"); conf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 2); @@ -188,18 +202,24 @@ public void setup() throws Exception { ioExecutor = Executors.newSingleThreadExecutor(); manualClock = new ManualClock(System.currentTimeMillis()); + localDiskManager = LocalDiskManager.create(conf); logManager = LogManager.create( conf, zkClient, scheduler, manualClock, - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); logManager.startup(); kvManager = KvManager.create( - conf, zkClient, logManager, 
TestingMetricGroups.TABLET_SERVER_METRICS); + conf, + zkClient, + logManager, + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); kvManager.startup(); serverMetadataCache = @@ -225,6 +245,10 @@ public void setup() throws Exception { registerTableInZkClient(); } + // Kept for subclasses that still define @BeforeEach setup() and call super.setup(). + // The actual initialization already happens in setup(TestInfo). + protected void setup() throws Exception {} + private void initMetadataCache(TabletServerMetadataCache metadataCache) { metadataCache.updateClusterMetadata( new ClusterMetadata( @@ -324,7 +348,8 @@ protected ReplicaManager buildReplicaManager(CoordinatorGateway coordinatorGatew TestingMetricGroups.USER_METRICS, remoteLogManager, manualClock, - ioExecutor); + ioExecutor, + localDiskManager); } @AfterEach @@ -359,6 +384,10 @@ void tearDown() throws Exception { scheduler.shutdown(); } + if (localDiskManager != null) { + localDiskManager.close(); + } + if (ioExecutor != null) { ioExecutor.shutdown(); } @@ -479,6 +508,7 @@ private Replica makeReplica( .getServerMetricGroup() .addTableBucketMetricGroup(physicalTablePath, tableBucket, isPkTable); return new Replica( + localDiskManager.selectDataDirForNewBucket(isPkTable), physicalTablePath, tableBucket, logManager, @@ -489,7 +519,7 @@ private Replica makeReplica( new OffsetCheckpointFile.LazyOffsetCheckpoints( new OffsetCheckpointFile( new File( - conf.getString(ConfigOptions.DATA_DIR), + localDiskManager.dataDirs().get(0), HIGH_WATERMARK_CHECKPOINT_FILE_NAME))), replicaManager.getDelayedWriteManager(), replicaManager.getDelayedFetchLogManager(), @@ -511,6 +541,8 @@ private void initRemoteLogEnv() throws Exception { conf, zkClient, testCoordinatorGateway, + localDiskManager, + logManager, remoteLogStorage, remoteLogTaskScheduler, manualClock); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java index 556c5bea4c..8d62ea72da 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java @@ -40,6 +40,7 @@ import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; +import org.apache.fluss.server.storage.LocalDiskManager; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -104,6 +105,8 @@ public class ReplicaFetcherThreadTest { private ReplicaManager followerRM; private ReplicaFetcherThread followerFetcher; private ExecutorService ioExecutor; + private LocalDiskManager leaderLocalDiskManager; + private LocalDiskManager followerLocalDiskManager; @BeforeAll static void baseBeforeAll() { @@ -119,8 +122,10 @@ public void setup() throws Exception { manualClock = new ManualClock(System.currentTimeMillis()); Configuration conf = new Configuration(); tb = new TableBucket(DATA1_TABLE_ID, 0); - leaderRM = createReplicaManager(leaderServerId); - followerRM = createReplicaManager(followerServerId); + leaderLocalDiskManager = createLocalDiskManager(leaderServerId); + leaderRM = createReplicaManager(leaderServerId, leaderLocalDiskManager); + followerLocalDiskManager = createLocalDiskManager(followerServerId); + followerRM = 
createReplicaManager(followerServerId, followerLocalDiskManager); // with local test leader end point. leader = new ServerNode( @@ -143,6 +148,12 @@ public void setup() throws Exception { @AfterEach public void tearDown() throws Exception { + if (leaderLocalDiskManager != null) { + leaderLocalDiskManager.close(); + } + if (followerLocalDiskManager != null) { + followerLocalDiskManager.close(); + } if (ioExecutor != null) { ioExecutor.shutdownNow(); } @@ -496,8 +507,17 @@ private void makeLeaderAndFollower() { result -> {}); } - private ReplicaManager createReplicaManager(int serverId) throws Exception { + private LocalDiskManager createLocalDiskManager(int serverId) throws Exception { Configuration conf = new Configuration(); + conf.set(ConfigOptions.TABLET_SERVER_ID, serverId); + conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath() + "/server-" + serverId); + return LocalDiskManager.create(conf); + } + + private ReplicaManager createReplicaManager(int serverId, LocalDiskManager localDiskManager) + throws Exception { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.TABLET_SERVER_ID, serverId); conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath() + "/server-" + serverId); conf.set(ConfigOptions.WRITER_ID_EXPIRATION_TIME, Duration.ofHours(12)); Scheduler scheduler = new FlussScheduler(2); @@ -509,7 +529,8 @@ private ReplicaManager createReplicaManager(int serverId) throws Exception { zkClient, scheduler, manualClock, - TestingMetricGroups.TABLET_SERVER_METRICS); + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); logManager.startup(); ReplicaManager replicaManager = new TestingReplicaManager( @@ -527,7 +548,8 @@ private ReplicaManager createReplicaManager(int serverId) throws Exception { RpcClient.create(conf, TestingClientMetricGroup.newInstance()), TestingMetricGroups.TABLET_SERVER_METRICS, manualClock, - ioExecutor); + ioExecutor, + localDiskManager); replicaManager.startup(); return replicaManager; } @@ -548,7 +570,8 @@ public TestingReplicaManager( RpcClient rpcClient, TabletServerMetricGroup serverMetricGroup, Clock clock, - ExecutorService ioExecutor) + ExecutorService ioExecutor, + LocalDiskManager localDiskManager) throws IOException { super( conf, @@ -565,7 +588,8 @@ public TestingReplicaManager( serverMetricGroup, USER_METRICS, clock, - ioExecutor); + ioExecutor, + localDiskManager); } @Override diff --git a/fluss-server/src/test/java/org/apache/fluss/server/storage/LocalDiskManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/storage/LocalDiskManagerTest.java new file mode 100644 index 0000000000..78c5bcfd5a --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/storage/LocalDiskManagerTest.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.storage; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.IllegalConfigurationException; +import org.apache.fluss.exception.LogStorageException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.Arrays; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link LocalDiskManager}. */ +class LocalDiskManagerTest { + + private static final int TABLET_SERVER_ID = 1; + + @TempDir private File tempDir; + + @Test + void testCreateDiskPropertiesAndReuseDiskId() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + Configuration conf = createConf(dataDir1, dataDir2); + + String diskId1; + String diskId2; + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + diskId1 = diskId(dataDir1); + diskId2 = diskId(dataDir2); + assertThat(new File(dataDir1, LocalDiskManager.DISK_PROPERTIES_FILE_NAME)).exists(); + assertThat(new File(dataDir2, LocalDiskManager.DISK_PROPERTIES_FILE_NAME)).exists(); + assertThat(new File(dataDir1, LocalDiskManager.LOCK_FILE_NAME)).exists(); + assertThat(new File(dataDir2, LocalDiskManager.LOCK_FILE_NAME)).exists(); + assertThat(localDiskManager.diskId(dataDir1)).isEqualTo(diskId1); + assertThat(localDiskManager.diskId(dataDir2)).isEqualTo(diskId2); + } + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + assertThat(localDiskManager.diskId(dataDir1)).isEqualTo(diskId1); + assertThat(localDiskManager.diskId(dataDir2)).isEqualTo(diskId2); + } + } + + @Test + void testFallbackToDataDirWhenDataDirsIsNotConfigured() throws Exception { + File expectedDataDir = new File(tempDir, "nested/../data-single"); + Configuration conf = new Configuration(); + conf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); + conf.setString(ConfigOptions.DATA_DIR, expectedDataDir.getPath()); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + assertThat(localDiskManager.dataDirs()) + .containsExactly( + expectedDataDir.getAbsoluteFile().toPath().normalize().toFile()); + } + } + + @Test + void testDataDirsTakePrecedenceOverDataDir() throws Exception { + File fallbackDataDir = new File(tempDir, "fallback-single"); + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + + Configuration conf = new Configuration(); + conf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); + conf.setString(ConfigOptions.DATA_DIR, fallbackDataDir.getAbsolutePath()); + conf.set( + ConfigOptions.DATA_DIRS, + Arrays.asList(dataDir1.getAbsolutePath(), dataDir2.getAbsolutePath())); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + assertThat(localDiskManager.dataDirs()) + .containsExactly(dataDir1.getAbsoluteFile(), dataDir2.getAbsoluteFile()); + assertThat(fallbackDataDir).doesNotExist(); + } + } + + @Test + void testNormalizeConfiguredDataDirPaths() throws Exception { + File normalizedDataDir = new File(tempDir, "data-1"); + Configuration normalizedConf = + createConfFromPaths(new File(tempDir, 
"nested/../data-1").getPath()); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(normalizedConf)) { + assertThat(localDiskManager.dataDirs()) + .containsExactly(normalizedDataDir.getAbsoluteFile()); + } + } + + @Test + void testRejectDuplicateAndOverlappingDirs() { + // exact duplicate + Configuration duplicateConf = new Configuration(); + duplicateConf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); + String sameDir = new File(tempDir, "same").getAbsolutePath(); + duplicateConf.set(ConfigOptions.DATA_DIRS, Arrays.asList(sameDir, sameDir)); + + assertThatThrownBy(() -> LocalDiskManager.create(duplicateConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("Duplicate local data directory"); + + // normalized duplicate + File normalizedDataDir = new File(tempDir, "data-1"); + Configuration normalizedDupConf = + createConfFromPaths( + normalizedDataDir.getAbsolutePath(), + new File(tempDir, "nested/../data-1").getPath()); + assertThatThrownBy(() -> LocalDiskManager.create(normalizedDupConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("Duplicate local data directory"); + + // parent-child overlap + File parent = new File(tempDir, "parent"); + Configuration overlapConf = new Configuration(); + overlapConf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); + overlapConf.set( + ConfigOptions.DATA_DIRS, + Arrays.asList( + parent.getAbsolutePath(), new File(parent, "child").getAbsolutePath())); + + assertThatThrownBy(() -> LocalDiskManager.create(overlapConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("must not overlap"); + + // normalized overlap + Configuration normalizedOverlapConf = + createConfFromPaths( + new File(tempDir, "root").getAbsolutePath(), + new File(tempDir, "nested/../root/child").getPath()); + assertThatThrownBy(() -> LocalDiskManager.create(normalizedOverlapConf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("must not overlap"); + } + + @Test + void testDirectoryLocksPreventDoubleOwnership() throws Exception { + File dataDir = new File(tempDir, "locked"); + Configuration conf = createConf(dataDir); + + try (LocalDiskManager ignored = LocalDiskManager.create(conf)) { + assertThatThrownBy(() -> LocalDiskManager.create(conf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("Failed to acquire lock on file .lock") + .hasMessageContaining("another process or thread is using this directory"); + } + } + + @Test + void testRejectDuplicateDiskIdsAcrossDataDirs() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + assertThat(dataDir1.mkdirs()).isTrue(); + assertThat(dataDir2.mkdirs()).isTrue(); + writeDiskProperties(dataDir1, "shared-disk-id", TABLET_SERVER_ID); + writeDiskProperties(dataDir2, "shared-disk-id", TABLET_SERVER_ID); + + Configuration conf = createConf(dataDir1, dataDir2); + + assertThatThrownBy(() -> LocalDiskManager.create(conf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("duplicate disk.id"); + assertThat(new File(dataDir1, LocalDiskManager.LOCK_FILE_NAME)).doesNotExist(); + assertThat(new File(dataDir2, LocalDiskManager.LOCK_FILE_NAME)).doesNotExist(); + } + + @Test + void testKeepHealthyDirWhenAnotherDirHasBrokenDiskProperties() throws Exception { + File brokenDataDir = new File(tempDir, "broken-data"); + File healthyDataDir = new File(tempDir, "healthy-data"); + 
assertThat(brokenDataDir.mkdirs()).isTrue(); + assertThat(healthyDataDir.mkdirs()).isTrue(); + + Properties properties = new Properties(); + properties.setProperty("version", "999"); + properties.setProperty("disk.id", "broken-disk-id"); + properties.setProperty("server.id", String.valueOf(TABLET_SERVER_ID)); + try (FileOutputStream outputStream = + new FileOutputStream( + new File(brokenDataDir, LocalDiskManager.DISK_PROPERTIES_FILE_NAME))) { + properties.store(outputStream, null); + } + + Configuration conf = createConf(brokenDataDir, healthyDataDir); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + assertThat(localDiskManager.dataDirs()) + .containsExactly(healthyDataDir.getAbsoluteFile()); + assertThat(localDiskManager.diskId(healthyDataDir)).isNotBlank(); + assertThatThrownBy(() -> localDiskManager.diskId(brokenDataDir)) + .isInstanceOf(IllegalArgumentException.class); + } + } + + @Test + void testKeepHealthyDirWhenAnotherDirCannotBeValidated() throws Exception { + File badPath = new File(tempDir, "not-a-dir"); + assertThat(badPath.createNewFile()).isTrue(); + File healthyDataDir = new File(tempDir, "healthy-data"); + + Configuration conf = createConf(badPath, healthyDataDir); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + assertThat(localDiskManager.dataDirs()) + .containsExactly(healthyDataDir.getAbsoluteFile()); + assertThat(new File(healthyDataDir, LocalDiskManager.DISK_PROPERTIES_FILE_NAME)) + .exists(); + assertThat(new File(healthyDataDir, LocalDiskManager.LOCK_FILE_NAME)).exists(); + } + } + + @Test + void testSelectDataDirUsesConfiguredOrderAsTieBreaker() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + Configuration conf = createConf(dataDir2, dataDir1); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + assertThat(localDiskManager.selectDataDirForNewBucket(false)) + .isEqualTo(dataDir2.getAbsoluteFile()); + assertThat(localDiskManager.selectDataDirForNewBucket(true)) + .isEqualTo(dataDir2.getAbsoluteFile()); + } + } + + @Test + void testSelectDataDirUsesSeparatedLogAndKvCounters() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + Configuration conf = createConf(dataDir1, dataDir2); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + File logDataDir1 = localDiskManager.selectDataDirForNewBucket(false); + assertThat(logDataDir1).isEqualTo(dataDir1.getAbsoluteFile()); + localDiskManager.recordReplicaLoad(logDataDir1, false); + + File logDataDir2 = localDiskManager.selectDataDirForNewBucket(false); + assertThat(logDataDir2).isEqualTo(dataDir2.getAbsoluteFile()); + localDiskManager.recordReplicaLoad(logDataDir2, false); + + File kvDataDir1 = localDiskManager.selectDataDirForNewBucket(true); + assertThat(kvDataDir1).isEqualTo(dataDir1.getAbsoluteFile()); + localDiskManager.recordReplicaLoad(kvDataDir1, true); + + File kvDataDir2 = localDiskManager.selectDataDirForNewBucket(true); + assertThat(kvDataDir2).isEqualTo(dataDir2.getAbsoluteFile()); + + assertThat(localDiskManager.logBucketCount(dataDir1)).isEqualTo(2); + assertThat(localDiskManager.logBucketCount(dataDir2)).isEqualTo(1); + assertThat(localDiskManager.kvBucketCount(dataDir1)).isEqualTo(1); + assertThat(localDiskManager.kvBucketCount(dataDir2)).isEqualTo(0); + } + } + + @Test + void testRejectMismatchedServerIdInDiskProperties() throws Exception { + File dataDir = new 
File(tempDir, "data-1"); + assertThat(dataDir.mkdirs()).isTrue(); + writeDiskProperties(dataDir, "disk-1", TABLET_SERVER_ID + 1); + + assertThatThrownBy(() -> LocalDiskManager.create(createConf(dataDir))) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining("does not match server.id"); + } + + @Test + void testResolveDataDir() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + Configuration conf = createConf(dataDir1, dataDir2); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + File childOfDir1 = new File(dataDir1, "db/table/bucket-0"); + assertThat(localDiskManager.resolveDataDir(childOfDir1)) + .isEqualTo(dataDir1.getAbsoluteFile()); + + File childOfDir2 = new File(dataDir2, "db/table/bucket-1"); + assertThat(localDiskManager.resolveDataDir(childOfDir2)) + .isEqualTo(dataDir2.getAbsoluteFile()); + + File unknownPath = new File(tempDir, "unknown/path"); + assertThatThrownBy(() -> localDiskManager.resolveDataDir(unknownPath)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("does not belong to any configured data directory"); + } + } + + @Test + void testRecordReplicaDeleteDecrementsCounters() throws Exception { + File dataDir1 = new File(tempDir, "data-1"); + File dataDir2 = new File(tempDir, "data-2"); + Configuration conf = createConf(dataDir1, dataDir2); + + try (LocalDiskManager localDiskManager = LocalDiskManager.create(conf)) { + // load some replicas + localDiskManager.recordReplicaLoad(dataDir1.getAbsoluteFile(), false); + localDiskManager.recordReplicaLoad(dataDir1.getAbsoluteFile(), true); + localDiskManager.recordReplicaLoad(dataDir2.getAbsoluteFile(), false); + + assertThat(localDiskManager.logBucketCount(dataDir1)).isEqualTo(2); + assertThat(localDiskManager.kvBucketCount(dataDir1)).isEqualTo(1); + assertThat(localDiskManager.logBucketCount(dataDir2)).isEqualTo(1); + + // delete log-only replica + localDiskManager.recordReplicaDelete(dataDir2.getAbsoluteFile(), false); + assertThat(localDiskManager.logBucketCount(dataDir2)).isEqualTo(0); + assertThat(localDiskManager.kvBucketCount(dataDir2)).isEqualTo(0); + + // delete PK replica + localDiskManager.recordReplicaDelete(dataDir1.getAbsoluteFile(), true); + assertThat(localDiskManager.logBucketCount(dataDir1)).isEqualTo(1); + assertThat(localDiskManager.kvBucketCount(dataDir1)).isEqualTo(0); + + // ensure counts do not go below zero + localDiskManager.recordReplicaDelete(dataDir2.getAbsoluteFile(), true); + assertThat(localDiskManager.logBucketCount(dataDir2)).isEqualTo(0); + assertThat(localDiskManager.kvBucketCount(dataDir2)).isEqualTo(0); + } + } + + @Test + void testAllDirectoriesUnusableFailsStartup() throws Exception { + // create two paths that are regular files, not directories + File badPath1 = new File(tempDir, "bad-1"); + File badPath2 = new File(tempDir, "bad-2"); + assertThat(badPath1.createNewFile()).isTrue(); + assertThat(badPath2.createNewFile()).isTrue(); + + Configuration conf = createConf(badPath1, badPath2); + + assertThatThrownBy(() -> LocalDiskManager.create(conf)) + .isInstanceOf(LogStorageException.class) + .hasMessageContaining("None of the specified data dirs"); + } + + private Configuration createConf(File... 
dataDirs) { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); + conf.set( + ConfigOptions.DATA_DIRS, + Arrays.stream(dataDirs).map(File::getAbsolutePath).collect(Collectors.toList())); + return conf; + } + + private Configuration createConfFromPaths(String... dataDirs) { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); + conf.set(ConfigOptions.DATA_DIRS, Arrays.asList(dataDirs)); + return conf; + } + + private String diskId(File dataDir) throws Exception { + Properties properties = new Properties(); + try (FileInputStream inputStream = + new FileInputStream( + new File(dataDir, LocalDiskManager.DISK_PROPERTIES_FILE_NAME))) { + properties.load(inputStream); + } + assertThat(properties.getProperty("version")).isEqualTo("1"); + assertThat(properties.getProperty("server.id")).isEqualTo(String.valueOf(TABLET_SERVER_ID)); + return properties.getProperty("disk.id"); + } + + private void writeDiskProperties(File dataDir, String diskId, int serverId) throws Exception { + Properties properties = new Properties(); + properties.setProperty("version", "1"); + properties.setProperty("disk.id", diskId); + properties.setProperty("server.id", String.valueOf(serverId)); + try (FileOutputStream outputStream = + new FileOutputStream( + new File(dataDir, LocalDiskManager.DISK_PROPERTIES_FILE_NAME))) { + properties.store(outputStream, null); + } + } +} From 8c7375b5517b4d646515a57957680afbfbe3b275 Mon Sep 17 00:00:00 2001 From: Han Liu Date: Thu, 23 Apr 2026 20:58:43 +0800 Subject: [PATCH 2/3] fix comment --- .../apache/fluss/config/ConfigOptions.java | 4 +- .../server/log/remote/RemoteLogManager.java | 24 +++++----- .../fluss/server/replica/ReplicaManager.java | 44 +++++++++---------- .../server/storage/LocalDiskManager.java | 12 ++--- .../fluss/server/log/LogManagerTest.java | 9 ++-- .../log/remote/RemoteLogManagerTest.java | 27 +++++++++++- .../server/replica/ReplicaManagerTest.java | 7 +-- .../fluss/server/replica/ReplicaTestBase.java | 4 +- .../server/testutils/ServerTestTags.java | 26 +++++++++++ 9 files changed, 100 insertions(+), 57 deletions(-) create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/testutils/ServerTestTags.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index b42b8aaab4..fad275d6f9 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -878,8 +878,8 @@ public class ConfigOptions { .memoryType() .defaultValue(MemorySize.parse("1gb")) .withDescription( - "The total size of the space allocated to store index files fetched " - + "from remote storage in the local storage."); + "The size of the space allocated per local data directory to store " + + "index files fetched from remote storage in the local storage."); public static final ConfigOption REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = key("remote.log-manager.thread-pool-size") diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java index d7485aa652..9a01a0fe62 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java @@ -20,6 +20,7 @@ import 
org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.exception.RemoteStorageException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.PhysicalTablePath; @@ -244,11 +245,8 @@ public void stopReplica(Replica replica, boolean deleteRemote) { /** Get the position of the given offset in the remote log segment. */ public int lookupPositionForOffset(RemoteLogSegment remoteLogSegment, long offset) { - RemoteLogIndexCache remoteLogIndexCache = - remoteLogIndexCacheForBucket(remoteLogSegment.tableBucket()); - return remoteLogIndexCache == null - ? -1 - : remoteLogIndexCache.lookupPosition(remoteLogSegment, offset); + return remoteLogIndexCacheForBucket(remoteLogSegment.tableBucket()) + .lookupPosition(remoteLogSegment, offset); } /** @@ -269,10 +267,8 @@ public long lookupOffsetForTimestamp(TableBucket tableBucket, long timestamp) { if (segment == null) { return -1L; } else { - RemoteLogIndexCache remoteLogIndexCache = remoteLogIndexCacheForBucket(tableBucket); - return remoteLogIndexCache == null - ? -1L - : remoteLogIndexCache.lookupOffsetForTimestamp(segment, timestamp); + return remoteLogIndexCacheForBucket(tableBucket) + .lookupOffsetForTimestamp(segment, timestamp); } } @@ -433,13 +429,13 @@ TaskWithFuture getTaskWithFuture(TableBucket tableBucket) { return rlmTasks.get(tableBucket); } - private @Nullable RemoteLogIndexCache remoteLogIndexCacheForBucket(TableBucket tableBucket) { + private RemoteLogIndexCache remoteLogIndexCacheForBucket(TableBucket tableBucket) { Optional logTabletOpt = logManager.getLog(tableBucket); if (!logTabletOpt.isPresent()) { - LOG.error( - "Can't resolve remote log index cache for bucket {} because no local log exists.", - tableBucket); - return null; + throw new NotLeaderOrFollowerException( + String.format( + "Can't resolve remote log index cache for bucket %s because no local log exists.", + tableBucket)); } return remoteLogIndexCache(localDiskManager.resolveDataDir(logTabletOpt.get().getLogDir())); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 52ddcbc78f..7810dc7d93 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -1503,19 +1503,22 @@ private FetchLogResultForBucket handleFetchOutOfRangeException( // of RemoteLogSegment. For client fetcher, it will fetch the log from remote in client. // For follower, it can update its local metadata to adjust the next fetch offset. 
else if (canFetchFromRemoteLog(replica, fetchOffset)) { - RemoteLogFetchInfo remoteLogFetchInfo = fetchLogFromRemote(replica, fetchOffset); - if (remoteLogFetchInfo != null) { - return new FetchLogResultForBucket( - tb, remoteLogFetchInfo, replica.getLogHighWatermark()); - } else { - return new FetchLogResultForBucket( - tb, - ApiError.fromThrowable( - new LogOffsetOutOfRangeException( - String.format( - "The fetch offset %s is out of range for table bucket %s", - fetchOffset, tb)))); + try { + RemoteLogFetchInfo remoteLogFetchInfo = fetchLogFromRemote(replica, fetchOffset); + if (remoteLogFetchInfo != null) { + return new FetchLogResultForBucket( + tb, remoteLogFetchInfo, replica.getLogHighWatermark()); + } + } catch (Exception ex) { + return new FetchLogResultForBucket(tb, ApiError.fromThrowable(ex)); } + return new FetchLogResultForBucket( + tb, + ApiError.fromThrowable( + new LogOffsetOutOfRangeException( + String.format( + "The fetch offset %s is out of range for table bucket %s", + fetchOffset, tb)))); } else { return new FetchLogResultForBucket(tb, ApiError.fromThrowable(e)); } @@ -1595,19 +1598,14 @@ void checkpointHighWatermarks() { .put(logTablet.getTableBucket(), logTablet.getHighWatermark()); } - try { - for (Map.Entry> entry : highWatermarksByDir.entrySet()) { - if (!entry.getValue().isEmpty()) { - try { - highWatermarkCheckpoints.get(entry.getKey()).write(entry.getValue()); - } catch (Exception e) { - throw new LogStorageException( - "Error while writing to high watermark file", e); - } + for (Map.Entry> entry : highWatermarksByDir.entrySet()) { + if (!entry.getValue().isEmpty()) { + try { + highWatermarkCheckpoints.get(entry.getKey()).write(entry.getValue()); + } catch (Exception e) { + throw new LogStorageException("Error while writing to high watermark file", e); } } - } catch (LogStorageException e) { - throw e; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java b/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java index b4eab07868..a89a86d76c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java @@ -406,7 +406,7 @@ public List dataDirs() { * returned directory is {@code /data-0}. 
*/ public synchronized File resolveDataDir(File path) { - Path pathToResolve = path.toPath(); + Path pathToResolve = path.toPath().toAbsolutePath().normalize(); for (File dataDir : dataDirs) { if (pathToResolve.startsWith(dataDir.toPath())) { return dataDir; @@ -548,13 +548,13 @@ private DiskProperties(int version, String diskId, String serverId) { } private int serverIdAsInt(File dataDir) throws IOException { - if (serverId == null || serverId.trim().isEmpty()) { + if (serverId == null || serverId.isEmpty()) { throw new IOException( "Missing server.id in disk.properties under " + dataDir.getAbsolutePath()); } try { - return Integer.parseInt(serverId.trim()); + return Integer.parseInt(serverId); } catch (NumberFormatException e) { throw new IOException( "Invalid server.id in disk.properties under " @@ -566,11 +566,11 @@ private int serverIdAsInt(File dataDir) throws IOException { } private String diskId(File dataDir) throws IOException { - if (diskId == null || diskId.trim().isEmpty()) { + if (diskId == null || diskId.isEmpty()) { throw new IOException( "Missing disk.id in disk.properties under " + dataDir.getAbsolutePath()); } - return diskId.trim(); + return diskId; } @Override @@ -660,7 +660,7 @@ private DiskProperties read() throws IOException { String serverId = properties.getProperty(SERVER_ID_KEY); return new DiskProperties(parsedVersion, diskId, serverId); } catch (NoSuchFileException e) { - LOG.warn("No disk.properties file under dir {}", file.getAbsolutePath()); + LOG.info("No disk.properties file under dir {}", file.getAbsolutePath()); return null; } catch (Exception e) { LOG.error( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java index a70e32e3b9..a644413c75 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java @@ -27,6 +27,7 @@ import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.storage.LocalDiskManager; +import org.apache.fluss.server.testutils.ServerTestTags; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -72,8 +73,6 @@ /** Test for {@link LogManager}. 
*/ final class LogManagerTest extends LogTestBase { - private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; - @RegisterExtension public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = new AllCallbackWrapper<>(new ZooKeeperExtension()); @@ -100,7 +99,7 @@ static void baseBeforeAll() { @BeforeEach public void setup(TestInfo testInfo) throws Exception { super.before(); - if (testInfo.getTags().contains(JBOD_MULTI_DIR_TAG)) { + if (testInfo.getTags().contains(ServerTestTags.JBOD_MULTI_DIR_TAG)) { conf.set( ConfigOptions.DATA_DIRS, Arrays.asList( @@ -321,7 +320,7 @@ void testDeleteLog(String partitionName) throws Exception { } @Test - @Tag(JBOD_MULTI_DIR_TAG) + @Tag(ServerTestTags.JBOD_MULTI_DIR_TAG) void testCheckpointRecoveryPointsAreWrittenPerDirectory() throws Exception { File dataDir1 = new File(tempDir, "data-1"); File dataDir2 = new File(tempDir, "data-2"); @@ -355,7 +354,7 @@ void testCheckpointRecoveryPointsAreWrittenPerDirectory() throws Exception { } @Test - @Tag(JBOD_MULTI_DIR_TAG) + @Tag(ServerTestTags.JBOD_MULTI_DIR_TAG) void testPerDirectoryCleanShutdownAndRecovery() throws Exception { File dataDir1 = new File(tempDir, "data-1"); File dataDir2 = new File(tempDir, "data-2"); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java index eac1e22efa..301bc08765 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.log.remote; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.remote.RemoteLogFetchInfo; @@ -32,7 +33,7 @@ import org.apache.fluss.server.log.LogTablet; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; -import org.apache.fluss.server.replica.ReplicaTestBase; +import org.apache.fluss.server.testutils.ServerTestTags; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -610,6 +611,28 @@ void testLookupOffsetForTimestamp(boolean partitionTable) throws Exception { .isEqualTo(-1L); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testLookupRemoteIndexThrowsWhenLocalLogMissing(boolean partitionTable) throws Exception { + TableBucket tb = makeTableBucket(partitionTable); + makeLogTableAsLeader(tb, partitionTable); + LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet(); + long startTimestamp = manualClock.milliseconds(); + addMultiSegmentsToLogTablet(logTablet, 5); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + RemoteLogSegment remoteLogSegment = + remoteLogManager.relevantRemoteLogSegments(tb, 0L).get(0); + logManager.dropLog(tb); + + assertThatThrownBy(() -> remoteLogManager.lookupPositionForOffset(remoteLogSegment, 2L)) + .isInstanceOf(NotLeaderOrFollowerException.class) + .hasMessageContaining("Can't resolve remote log index cache for bucket " + tb); + assertThatThrownBy(() -> remoteLogManager.lookupOffsetForTimestamp(tb, startTimestamp)) + .isInstanceOf(NotLeaderOrFollowerException.class) + .hasMessageContaining("Can't resolve remote log index cache for bucket " + tb); + } + @ParameterizedTest @ValueSource(booleans = {true, 
false}) void testAlterTableTieredLogLocalSegments(boolean partitionedTable) throws Exception { @@ -720,7 +743,7 @@ private static Stream stopArgs() { // ---- JBOD multi-directory tests ---- @Test - @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + @Tag(ServerTestTags.JBOD_MULTI_DIR_TAG) void testRemoteIndexCacheFollowsReplicaDirectory() throws Exception { File dataDir1 = new File(tempDir, "data-1"); File dataDir2 = new File(tempDir, "data-2"); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java index bdbb6fa195..617b46b444 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java @@ -73,6 +73,7 @@ import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.metadata.TableMetadata; import org.apache.fluss.server.testutils.KvTestUtils; +import org.apache.fluss.server.testutils.ServerTestTags; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.testutils.DataTestUtils; @@ -2378,7 +2379,7 @@ private void assertUpdateMetadataEquals( // ---- JBOD multi-directory tests ---- @Test - @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + @Tag(ServerTestTags.JBOD_MULTI_DIR_TAG) void testNewBucketsDistributedAcrossDataDirs() throws Exception { File dataDir1 = new File(tempDir, "data-1"); File dataDir2 = new File(tempDir, "data-2"); @@ -2394,7 +2395,7 @@ void testNewBucketsDistributedAcrossDataDirs() throws Exception { } @Test - @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + @Tag(ServerTestTags.JBOD_MULTI_DIR_TAG) void testLogAndKvCoLocatedForPrimaryKeyTable() throws Exception { File dataDir1 = new File(tempDir, "data-1"); File dataDir2 = new File(tempDir, "data-2"); @@ -2414,7 +2415,7 @@ void testLogAndKvCoLocatedForPrimaryKeyTable() throws Exception { } @Test - @Tag(ReplicaTestBase.JBOD_MULTI_DIR_TAG) + @Tag(ServerTestTags.JBOD_MULTI_DIR_TAG) void testHighWatermarkCheckpointIsWrittenPerDirectory() throws Exception { File dataDir1 = new File(tempDir, "data-1"); File dataDir2 = new File(tempDir, "data-2"); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index e544f4dd2f..e8cd440b48 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -54,6 +54,7 @@ import org.apache.fluss.server.metrics.group.BucketMetricGroup; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.storage.LocalDiskManager; +import org.apache.fluss.server.testutils.ServerTestTags; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; @@ -126,7 +127,6 @@ * function managed by {@link ReplicaManager}. 
*/ public class ReplicaTestBase { - public static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; @RegisterExtension public static final AllCallbackWrapper ZOO_KEEPER_EXTENSION_WRAPPER = @@ -176,7 +176,7 @@ static void baseBeforeAll() { public void setup(TestInfo testInfo) throws Exception { conf = getServerConf(); conf.set(ConfigOptions.TABLET_SERVER_ID, TABLET_SERVER_ID); - if (testInfo != null && testInfo.getTags().contains(JBOD_MULTI_DIR_TAG)) { + if (testInfo != null && testInfo.getTags().contains(ServerTestTags.JBOD_MULTI_DIR_TAG)) { conf.set( ConfigOptions.DATA_DIRS, Arrays.asList( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/ServerTestTags.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/ServerTestTags.java new file mode 100644 index 0000000000..167229d5b1 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/ServerTestTags.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.testutils; + +/** Common JUnit tags for server tests. */ +public final class ServerTestTags { + + public static final String JBOD_MULTI_DIR_TAG = "jbod-multidir"; + + private ServerTestTags() {} +} From 9bc453cc9a43706cf81a2d82d461866292fe4cd7 Mon Sep 17 00:00:00 2001 From: Han Liu Date: Fri, 24 Apr 2026 16:08:56 +0800 Subject: [PATCH 3/3] [rebase] fix MapUtils --- .../org/apache/fluss/server/log/remote/RemoteLogManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java index 9a01a0fe62..060b0e6be2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java @@ -124,7 +124,7 @@ public RemoteLogManager( this.coordinatorGateway = coordinatorGateway; this.logManager = logManager; this.localDiskManager = localDiskManager; - this.remoteLogIndexCachesByDir = MapUtils.newConcurrentHashMap(); + this.remoteLogIndexCachesByDir = new ConcurrentHashMap<>(); for (File dataDir : localDiskManager.dataDirs()) { remoteLogIndexCachesByDir.put( dataDir,