[KV] Kvscan Server Side #3151
Force-pushed from 4577f63 to 8fe21dc.
Pull request overview
Note: Copilot was unable to run its full agentic suite in this review.
Adds server-side support for KV full-scan sessions (FIP-17), including session lifecycle management, RPC handling, and operator-facing configuration.
Changes:
- Introduces `ScannerManager`/`ScannerContext` to manage server-side scan sessions with TTL eviction and concurrency limits.
- Implements `TabletService#scanKv` to open/continue/close scans and stream batched results.
- Wires scanner cleanup into replica leadership/stop paths and documents new configuration options.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| website/docs/maintenance/configuration.md | Documents new KV scanner TTL/limits configs. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds config keys for scanner TTL, eviction interval, and limits. |
| fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java | Adds openScan to create snapshot-backed scan contexts. |
| fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java | New scan session state holder (snapshot/iterator/lease). |
| fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java | New session manager with TTL eviction and limit enforcement. |
| fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java | Implements scanKv RPC using ScannerManager. |
| fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java | Instantiates/closes ScannerManager and injects into services. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java | Closes scanners on leadership change / stopReplica; adds leader KV accessor. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java | Safety-net close of scanners in dropKv(). |
| fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java | Adds unit tests for openScan behavior. |
| fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java | New tests for session creation, limits, TTL eviction, and shutdown. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java | Ensures scanners are closed when stopping replicas. |
```java
}

/** Injects the {@link ScannerManager} so that {@link #dropKv()} can close active scanners. */
public void setScannerManager(@Nullable ScannerManager scannerManager) {
```
This is a bit of a hack. The scannerManager should be immutable once the Replica is created; it should be part of the constructor.
```java
            conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis() / 2);
}

public void setScannerManager(ScannerManager scannerManager) {
```
This is a bit of a hack. The ScannerManager should never be null and should be immutable, so it should be set in the constructor.
```java
 * {@link #closeScannersForBucket(TableBucket)} must be called when a bucket loses leadership to
 * release all RocksDB snapshot/iterator resources for that bucket promptly.
 */
public class ScannerManager implements AutoCloseableAsync {
```
IIUC, this class is thread-safe so that it can be shared across worker threads. Please add the @ThreadSafe annotation to the class.
```java
 */
@Nullable
public ScannerContext createScanner(
        KvTablet kvTablet, TableBucket tableBucket, @Nullable Long limit) throws IOException {
```
We can derive the TableBucket from kvTablet.getTableBucket(), so we don't need the tableBucket parameter. Otherwise, we have to check the consistency between kvTablet.getTableBucket() and tableBucket, which is unnecessary overhead.
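A minimal sketch of the suggested signature change, using hypothetical stand-in types (not the real Fluss classes):

```java
// Hypothetical stand-ins for the real Fluss types, only to illustrate the
// suggested signature change.
class TableBucket {
    final int bucketId;
    TableBucket(int bucketId) { this.bucketId = bucketId; }
}

class KvTablet {
    private final TableBucket tableBucket;
    KvTablet(TableBucket tableBucket) { this.tableBucket = tableBucket; }
    TableBucket getTableBucket() { return tableBucket; }
}

class ScannerManagerSketch {
    // The bucket is derived from the tablet, so callers cannot pass an
    // inconsistent (kvTablet, tableBucket) pair and no check is needed.
    String createScanner(KvTablet kvTablet) {
        TableBucket bucket = kvTablet.getTableBucket();
        return "scanner-" + bucket.bucketId;
    }
}
```

Dropping the redundant parameter removes a whole class of caller mistakes instead of validating against them at runtime.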
```java
 * bucket loses leadership to prevent stale RocksDB snapshot/iterator leaks.
 */
public void closeScannersForBucket(TableBucket tableBucket) {
    List<String> toRemove = new ArrayList<>();
```
nit: we can track the ScannerContext rather than the id in the toRemove list, to avoid an additional scanners.get(key) concurrent-map lookup.
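A sketch of what this nit suggests, with illustrative names rather than the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative stand-in for the real ScannerContext.
class ScannerCtx {
    final String id;
    boolean closed;
    ScannerCtx(String id) { this.id = id; }
    void close() { closed = true; }
}

class BucketScannersSketch {
    final ConcurrentMap<String, ScannerCtx> scanners = new ConcurrentHashMap<>();

    void closeAllScanners() {
        // Collect the contexts themselves while iterating the entry set...
        List<ScannerCtx> toRemove = new ArrayList<>();
        for (Map.Entry<String, ScannerCtx> e : scanners.entrySet()) {
            toRemove.add(e.getValue());
        }
        // ...so closing needs no second scanners.get(id) lookup per entry.
        for (ScannerCtx ctx : toRemove) {
            scanners.remove(ctx.id);
            ctx.close();
        }
    }
}
```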
```java
while (context.isValid() && totalBytes < batchSizeBytes) {
    byte[] value = context.currentValue();
    builder.append(value);
    totalBytes += value.length;
```
We should use builder.sizeInBytes rather than totalBytes, which is more accurate for the final record batch size.
```java
response.setErrorCode(Errors.forException(e).code());
response.setErrorMessage(e.getMessage() != null ? e.getMessage() : "");
```
it's suggested to use:

```java
ApiError error = ApiError.fromThrowable(e);
response.setError(error.error().code(), error.message());
```
```java
String scannerId = generateScannerId();
ScannerContext context =
        kvTablet.openScan(scannerId, limit != null ? limit : -1L, clock.milliseconds());
```
Unlike lookups(), prefixLookup(), and limitKvScan(), replicaManager.getLeaderKvTablet(tableBucket) returns the raw KvTablet after only an unsynchronized isLeader() check. If leadership flips after getLeaderKvTablet() returns but before
createScanner() registers the session, makeFollowers() has already run closeScannersForBucket(), so the
new scanner survives on a follower. Subsequent scanKv(scanner_id=...) calls never re-check leadership,
which lets clients keep reading a stale snapshot from a bucket that no longer has the leader.
A safer way to open a scan would be an openScan method on the Replica that takes the RocksDB snapshot under the leaderIsrUpdateLock read lock, just like the other methods (lookups, limitScan, ...).
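The check-then-register race described above can be closed by holding the same read lock across both steps. A minimal sketch of that pattern — the lock name mirrors the comment, everything else is hypothetical:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReplicaSketch {
    private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
    private volatile boolean leader = true;

    // makeFollower() takes the write lock, so it cannot interleave between
    // the leadership check and the scanner registration in openScan().
    void makeFollower() {
        leaderIsrUpdateLock.writeLock().lock();
        try {
            leader = false;
            // ... closeScannersForBucket(...) would run here ...
        } finally {
            leaderIsrUpdateLock.writeLock().unlock();
        }
    }

    String openScan() {
        leaderIsrUpdateLock.readLock().lock();
        try {
            if (!leader) {
                throw new IllegalStateException("not the leader");
            }
            // Snapshot creation and scanner registration happen while the
            // read lock is still held, so leadership cannot flip in between.
            return "scanner-registered";
        } finally {
            leaderIsrUpdateLock.readLock().unlock();
        }
    }
}
```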
```java
private long remainingLimit;
// Initial value -1 so that the first client call_seq_id of 0 satisfies the server's
// in-order check: expectedSeqId = callSeqId + 1 = -1 + 1 = 0.
// callSeqId validation is only performed for continuation requests (those carrying a
// scanner_id), never for the initial open request (those carrying a bucket_scan_req).
private int callSeqId = -1;

/**
 * Wall-clock timestamp (ms) of the most recent request that touched this session. Used by
 * {@link ScannerManager} for TTL-based eviction.
 */
private long lastAccessTime;
```
ScannerManager updates lastAccessTime on RPC workers and reads it later from the background TTL evictor, but this state is stored in plain fields. Without volatile, the evictor can observe a stale timestamp and expire a scanner that was just used; callSeqId has the same visibility problem if a continuation is handled by a different worker after reconnect/retry. Since ScannerContext is intentionally shared across threads by the new manager, these fields need a happens-before edge.
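A minimal sketch of the fix being asked for, assuming plain volatile reads/writes suffice for these fields (names are illustrative):

```java
// Illustrative stand-in for the session state. Marking the field volatile
// gives the background TTL-evictor thread a happens-before edge on values
// written by RPC worker threads; the same reasoning applies to callSeqId.
class ScanSessionSketch {
    private volatile long lastAccessTime;

    // Called on the RPC worker thread handling a scan request.
    void touch(long nowMs) { lastAccessTime = nowMs; }

    // Called on the background evictor thread.
    boolean isExpired(long nowMs, long ttlMs) {
        return nowMs - lastAccessTime > ttlMs;
    }
}
```

Since each field is written whole (no read-modify-write across threads), volatile is enough here; compound updates would instead need an AtomicLong or a lock.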
```java
    }
}

return CompletableFuture.completedFuture(response);
```
The ScanKvResponse contract in the proto file says the initial batch must carry the log offset captured when the RocksDB snapshot was opened, but this implementation only fills scanner_id, has_more_results, and records.
Without that offset, a client cannot do a consistent snapshot-to-log handoff (and the empty-bucket fast path has no other way to return it), so scans can miss or duplicate updates that race with the snapshot.
We should initialize the log_offset in the ScannerContext when opening the scan on the replica.
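A sketch of what this suggests, with hypothetical names: the log offset is captured once, at snapshot-open time, and echoed back with the first batch:

```java
// Hypothetical stand-in: the scan context remembers the log end offset
// taken at snapshot-open time so the first ScanKvResponse can carry it.
class SnapshotScanSketch {
    private final long snapshotLogOffset;

    SnapshotScanSketch(long logEndOffsetAtSnapshot) {
        this.snapshotLogOffset = logEndOffsetAtSnapshot;
    }

    // Returned to the client in the initial batch (even for an empty
    // bucket), enabling a consistent snapshot-to-log handoff: the client
    // finishes the snapshot, then subscribes to the log from this offset.
    long snapshotLogOffset() { return snapshotLogOffset; }
}
```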
Force-pushed from e406f65 to 627f403.
@wuchong Thank you for your thorough and detailed review. I addressed all the comments, rebased on main, and added some improvements. Let me know if you think there is anything more that needs to be addressed.
This PR brings in the server-side implementation for FIP-17, introduced here: #2809