[server] Implement JBOD Phase 1 Local Multi-Directory Support#3030
hanliu0830 wants to merge 3 commits into apache:main
Pull request overview
Implements Phase 1 of JBOD (multi local disk) support in the Fluss TabletServer by introducing a LocalDiskManager that validates/locks multiple local directories, persists per-disk identity, and enables per-directory checkpointing + placement decisions across Log/KV/Replica/RemoteLog components.
Changes:
- Add a `data.dirs` config option and a new `LocalDiskManager` that validates/locks directories and manages per-disk metadata + simple load counters.
- Plumb multi-directory awareness through `TabletServer`, `LogManager`, `KvManager`, `ReplicaManager`, `Replica`, and `RemoteLogManager` (per-dir checkpoints, placement, cache).
- Extend/adjust unit tests to cover multi-directory placement and per-directory checkpoint/cache behavior.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fluss-server/src/test/java/org/apache/fluss/server/storage/LocalDiskManagerTest.java | New unit tests for directory validation, locking, disk.properties, and placement counters. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java | Test base now initializes LocalDiskManager and supports tagged JBOD multi-dir tests. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java | Adds JBOD multi-dir tests for distribution and per-dir high-watermark checkpointing. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java | Updates replica-manager construction in tests to pass LocalDiskManager. |
| fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java | Adds JBOD multi-dir test verifying index cache is tied to replica directory. |
| fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogIndexCacheTest.java | Uses per-dir RemoteLogIndexCache accessor. |
| fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java | Adds JBOD tests for per-dir recovery checkpoints and clean shutdown markers. |
| fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java | Updates tests to pass explicit data dir and close/recreate LocalDiskManager. |
| fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java | Updates KV tests to pass LocalDiskManager and explicit data dir for tablet creation. |
| fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java | Refactors to operate on a list of data dirs; lists tablets per dir; adds per-dir executor helper. |
| fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java | Creates/closes LocalDiskManager and passes it into log/kv/replica managers. |
| fluss-server/src/main/java/org/apache/fluss/server/storage/LocalDiskManager.java | New component that owns multi-dir validation, locking, disk identity, and placement counters. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java | Per-dir high-watermark checkpoints; placement uses LocalDiskManager; remote log manager ctor updated. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java | Replica now carries dataDir to ensure Log/KV tablets and paths are created in the chosen directory. |
| fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java | Maintains per-dir remote index caches; resolves cache based on bucket/log location. |
| fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java | Multi-dir recovery/shutdown, per-dir recovery-point checkpoints, and explicit data-dir log creation. |
| fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java | Multi-dir tablet creation APIs; plumbs selected data dir into KV tablet creation. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds new data.dirs configuration option. |
Comments suppressed due to low confidence (1)
fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java:134
- With multiple `data.dirs`, log recovery/shutdown now creates per-directory thread pools. Because `recoveryThreadsPerDataDir` is sourced from `netty.server.num-worker-threads` (default 8), the total thread count scales as `numDataDirs * netty.server.num-worker-threads` (plus extra per-dir executors), which can be much higher than before. Consider introducing a dedicated recovery thread config (total vs per-dir) or otherwise capping threads to avoid startup/shutdown thread explosion on hosts with many disks.
    return new LogManager(
            localDiskManager,
            conf,
            zkClient,
            conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
            scheduler,
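One possible way to cap the thread count, as a minimal sketch only: share a single total budget across directories instead of multiplying a per-dir count. `RecoveryThreadBudget`, `threadsPerDataDir`, and the notion of a total-budget setting are hypothetical and not part of this PR.

```java
/** Hypothetical helper, not part of the PR: shares one recovery-thread budget across data dirs. */
final class RecoveryThreadBudget {

    private RecoveryThreadBudget() {}

    /**
     * Returns how many recovery threads each data dir gets when a total budget is
     * shared across all dirs. Every dir gets at least one thread, so the real total
     * can exceed the budget when there are more dirs than budgeted threads.
     */
    static int threadsPerDataDir(int totalRecoveryThreads, int numDataDirs) {
        if (totalRecoveryThreads < 1 || numDataDirs < 1) {
            throw new IllegalArgumentException("thread budget and dir count must be positive");
        }
        return Math.max(1, totalRecoveryThreads / numDataDirs);
    }

    public static void main(String[] args) {
        // A budget of 8 shared by 4 dirs yields 2 threads per dir (8 total),
        // instead of 8 per dir (32 total) under the multiplied scheme.
        System.out.println(threadsPerDataDir(8, 4));
    }
}
```

With this shape the total stays close to the configured budget as disks are added; the trade-off is slower per-directory recovery on hosts with many disks.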
 public int lookupPositionForOffset(RemoteLogSegment remoteLogSegment, long offset) {
-    return remoteLogIndexCache.lookupPosition(remoteLogSegment, offset);
+    RemoteLogIndexCache remoteLogIndexCache =
+            remoteLogIndexCacheForBucket(remoteLogSegment.tableBucket());
+    return remoteLogIndexCache == null
+            ? -1
lookupPositionForOffset returns -1 when remoteLogIndexCacheForBucket(...) cannot resolve a cache. That firstStartPos is propagated into RemoteLogFetchInfo and the client uses it as the starting position within the first remote segment; -1 effectively causes the client to read from the beginning of the segment (wrong offset / duplicates). Consider failing fast (throw) or providing a deterministic fallback cache so a valid position is always computed for buckets that have remote segments.
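A minimal sketch of the fail-fast option, assuming a simplified model in which each bucket maps to an in-memory offset-to-position index. `PerDirCacheLookup` and its methods are hypothetical stand-ins rather than the PR's classes, and `IllegalStateException` is only a placeholder for whatever exception type the server would actually use.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the per-directory cache lookup; not the PR's actual classes. */
final class PerDirCacheLookup {

    // Bucket id -> index of that bucket (record offset -> byte position in the segment).
    private final Map<Integer, Map<Long, Integer>> indexByBucket = new HashMap<>();

    void register(int bucketId, Map<Long, Integer> index) {
        indexByBucket.put(bucketId, index);
    }

    /**
     * Throws when no cache can be resolved for the bucket, and reserves -1 for the
     * case where the cache exists but holds no entry for the requested offset.
     */
    int lookupPosition(int bucketId, long offset) {
        Map<Long, Integer> index = indexByBucket.get(bucketId);
        if (index == null) {
            throw new IllegalStateException("No index cache resolved for bucket " + bucketId);
        }
        return index.getOrDefault(offset, -1);
    }

    public static void main(String[] args) {
        PerDirCacheLookup lookup = new PerDirCacheLookup();
        Map<Long, Integer> index = new HashMap<>();
        index.put(100L, 4096);
        lookup.register(1, index);
        System.out.println(lookup.lookupPosition(1, 100L));
    }
}
```

Throwing keeps an unresolved cache from turning into a start position that the client would silently read from.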
for (File dataDir : localDiskManager.dataDirs()) {
    remoteLogIndexCachesByDir.put(
            dataDir,
            new RemoteLogIndexCache(
                    (int)
                            conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE)
                                    .getBytes(),
REMOTE_LOG_INDEX_FILE_CACHE_SIZE is applied per configured dataDir by creating one RemoteLogIndexCache per directory, which multiplies the effective on-disk cache budget by #data.dirs (and changes the meaning of the config option, whose description implies a total budget). Consider splitting the configured cache size across directories or updating the configuration semantics/documentation accordingly.
Suggested change:
- for (File dataDir : localDiskManager.dataDirs()) {
-     remoteLogIndexCachesByDir.put(
-             dataDir,
-             new RemoteLogIndexCache(
-                     (int)
-                             conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE)
-                                     .getBytes(),
+ List<File> dataDirs = localDiskManager.dataDirs();
+ long totalRemoteLogIndexFileCacheSizeBytes =
+         conf.get(ConfigOptions.REMOTE_LOG_INDEX_FILE_CACHE_SIZE).getBytes();
+ int perDataDirRemoteLogIndexFileCacheSizeBytes =
+         dataDirs.isEmpty()
+                 ? (int) totalRemoteLogIndexFileCacheSizeBytes
+                 : (int) (totalRemoteLogIndexFileCacheSizeBytes / dataDirs.size());
+ for (File dataDir : dataDirs) {
+     remoteLogIndexCachesByDir.put(
+             dataDir,
+             new RemoteLogIndexCache(
+                     perDataDirRemoteLogIndexFileCacheSizeBytes,
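A side note on the split, as a sketch only: a plain `(int)` cast of a `long` silently truncates values above `Integer.MAX_VALUE`, so a checked conversion may be safer for large budgets. `CacheBudgetSplitter` and `perDirBytes` are hypothetical names, not part of the PR.

```java
/** Hypothetical helper, not part of the PR: divides a total cache budget across data dirs. */
final class CacheBudgetSplitter {

    private CacheBudgetSplitter() {}

    /**
     * Splits totalBytes evenly across numDataDirs using integer division (the
     * remainder is dropped). With no dirs, the whole budget is returned, mirroring
     * the suggestion above. Math.toIntExact throws ArithmeticException instead of
     * truncating when the share does not fit in an int.
     */
    static int perDirBytes(long totalBytes, int numDataDirs) {
        if (totalBytes < 0) {
            throw new IllegalArgumentException("cache budget must not be negative");
        }
        long share = numDataDirs <= 0 ? totalBytes : totalBytes / numDataDirs;
        return Math.toIntExact(share);
    }

    public static void main(String[] args) {
        // 1 GiB shared by 4 dirs gives 256 MiB per dir.
        System.out.println(perDirBytes(1L << 30, 4));
    }
}
```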
    String serverId = properties.getProperty(SERVER_ID_KEY);
    return new DiskProperties(parsedVersion, diskId, serverId);
} catch (NoSuchFileException e) {
    LOG.warn("No disk.properties file under dir {}", file.getAbsolutePath());
Missing disk.properties appears to be an expected state on first startup for a fresh directory, but this is logged at WARN. This can create noisy logs during normal bootstraps; consider lowering this to INFO/DEBUG (or only warn when the directory is non-empty / previously initialized).
Suggested change:
-    LOG.warn("No disk.properties file under dir {}", file.getAbsolutePath());
+    LOG.info("No disk.properties file under dir {}", file.getAbsolutePath());
/** Test for {@link LogManager}. */
final class LogManagerTest extends LogTestBase {

    private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir";
JBOD_MULTI_DIR_TAG duplicates the same tag constant already defined in ReplicaTestBase (ReplicaTestBase.JBOD_MULTI_DIR_TAG). Reusing the shared constant would avoid accidental drift if the tag string changes.
Suggested change:
-    private static final String JBOD_MULTI_DIR_TAG = "jbod-multidir";
+    private static final String JBOD_MULTI_DIR_TAG =
+            org.apache.fluss.server.replica.ReplicaTestBase.JBOD_MULTI_DIR_TAG;
        }
    }
} catch (LogStorageException e) {
    throw e;
This exception is already thrown inside the loop, so the outer try-catch can be removed.
Addressed in 6999b9e. I removed the redundant outer try-catch since the exception is already handled inside the loop.
-    return remoteLogIndexCache.lookupOffsetForTimestamp(segment, timestamp);
+    RemoteLogIndexCache remoteLogIndexCache = remoteLogIndexCacheForBucket(tableBucket);
+    return remoteLogIndexCache == null
+            ? -1L
When remoteLogIndexCache is null, would it be better to throw an exception instead of returning -1?
Returning the same value -1 conflates two cases: (1) failing to find the offset in the index, and (2) the index cache being null.
Addressed in 6999b9e. I changed this branch to throw NotLeaderOrFollowerException instead of returning -1, so we no longer mix up “index lookup miss” with “failed to resolve the index cache”.
public synchronized File resolveDataDir(File path) {
    Path pathToResolve = path.toPath();
    for (File dataDir : dataDirs) {
        if (pathToResolve.startsWith(dataDir.toPath())) {
We should ensure that pathToResolve is an absolute, normalized path, since we check it with the startsWith function: pathToResolve.toAbsolutePath().normalize()
Thanks. Addressed in 6999b9e.
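To illustrate why normalization matters for the `startsWith` check, here is a minimal, self-contained sketch. `DataDirResolver` is a hypothetical stand-in and not the PR's `LocalDiskManager`; it only uses `java.nio.file.Path` calls from the JDK.

```java
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the data-dir resolution logic; not the PR's LocalDiskManager. */
final class DataDirResolver {

    // Data dirs stored as absolute, normalized paths so comparisons are component-wise.
    private final List<Path> dataDirs = new ArrayList<>();

    DataDirResolver(List<File> dirs) {
        for (File dir : dirs) {
            dataDirs.add(dir.toPath().toAbsolutePath().normalize());
        }
    }

    /** Returns the data dir that contains the given path, or null if none does. */
    File resolveDataDir(File path) {
        // Normalizing first collapses ".." and "." segments; without it,
        // "/data/d1/../d3/x" would wrongly match the data dir "/data/d1".
        Path pathToResolve = path.toPath().toAbsolutePath().normalize();
        for (Path dataDir : dataDirs) {
            if (pathToResolve.startsWith(dataDir)) {
                return dataDir.toFile();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        DataDirResolver resolver =
                new DataDirResolver(Arrays.asList(new File("/data/d1"), new File("/data/d2")));
        System.out.println(resolver.resolveDataDir(new File("/data/d1/db/../db/table-1")));
    }
}
```

Because `Path.startsWith` compares whole name elements, a sibling such as `/data/d10` does not match `/data/d1` either.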
}

try {
    return Integer.parseInt(serverId.trim());
There are many trim() calls in one function. The serverId is already trimmed in the constructor; otherwise, just call trim() once in the function.
Thanks. Addressed in 6999b9e.
}

private int serverIdAsInt(File dataDir) throws IOException {
    if (serverId == null || serverId.trim().isEmpty()) {
serverId is already trimmed in constructor.
Thanks. Addressed in 6999b9e.
private String diskId(File dataDir) throws IOException {
    if (diskId == null || diskId.trim().isEmpty()) {
        throw new IOException(
Same as above: diskId was also trimmed in the constructor.
Thanks. Addressed in 6999b9e.
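The trim-once pattern discussed in the threads above can be sketched as follows. `DiskIdentity` is a hypothetical class, not the PR's `DiskProperties`, and mapping blank values to null is an assumption made here to keep the accessors simple.

```java
import java.io.IOException;

/** Hypothetical sketch of trimming once at construction; not the PR's DiskProperties. */
final class DiskIdentity {

    private final String diskId;
    private final String serverId;

    DiskIdentity(String diskId, String serverId) {
        // Normalize once here so accessors never need to call trim() again.
        this.diskId = normalize(diskId);
        this.serverId = normalize(serverId);
    }

    // Assumption for this sketch: blank values are treated the same as missing ones.
    private static String normalize(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    String diskId() throws IOException {
        if (diskId == null) {
            throw new IOException("Missing disk id");
        }
        return diskId;
    }

    int serverIdAsInt() throws IOException {
        if (serverId == null) {
            throw new IOException("Missing server id");
        }
        try {
            return Integer.parseInt(serverId);
        } catch (NumberFormatException e) {
            throw new IOException("Invalid server id: " + serverId, e);
        }
    }

    public static void main(String[] args) throws IOException {
        DiskIdentity identity = new DiskIdentity(" disk-a ", " 7 ");
        System.out.println(identity.diskId() + " / " + identity.serverIdAsInt());
    }
}
```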
6999b9e to 8c7375b
Purpose
Linked issue: close #145
Brief change log
Tests
API and Format
Documentation