HDDS-15447. Persist bucket scanned key pointer periodically#10412
Draft
ChenSammi wants to merge 1 commit into
Draft
HDDS-15447. Persist bucket scanned key pointer periodically#10412ChenSammi wants to merge 1 commit into
ChenSammi wants to merge 1 commit into
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces persistent bucket scan pointers for the OM KeyLifecycleService, allowing lifecycle scans to be resumed across OM restarts/leader transfers rather than restarting from the beginning. It adds a new lifecycleScanStateTable, a SaveLifecycleScanState write request type, and piggybacks scan-state updates onto DeleteKeysRequest so progress is persisted atomically with deletions. FSO and OBS scan paths are taught to skip already-processed directories/keys via the resumed pointer.
Changes:
- New
OmLifecycleScanStatePOJO, protobuf message, andlifecycleScanStateTable(added toOMDBDefinition/OmMetadataManagerImpl/OMMetadataManager). - New
SaveLifecycleScanStateOM request/response, and an optionalLifecycleScanStatefield onDeleteKeysRequestso deletes carry forward scan progress in the same Ratis batch. KeyLifecycleServicerefactored: trackslastScannedKey/Dir/DirKey, restores state at task start, skips already-scanned directories (canSkipDir), seeks iterators to the resume point, and emits state save requests;SubDirectorySummaryrenamed toDirectoryListand now also carries the dir table keys.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| HDDS-8342-Evaluation-And-Design.md | Design/evaluation doc added at the repo root (should not be committed). |
| hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto | Adds LifecycleScanState, SaveLifecycleScanStateRequest/Response, and scan-state field on DeleteKeysRequest. |
| hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLifecycleScanState.java | New POJO with codec/builder/proto conversion. |
| hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmLifecycleScanState.java | Builder/proto round-trip tests. |
| hadoop-ozone/interface-storage/.../OMMetadataManager.java | New getLifecycleScanStateTable() API. |
| hadoop-ozone/ozone-manager/.../OmMetadataManagerImpl.java | Wires the new table. |
| hadoop-ozone/ozone-manager/.../codec/OMDBDefinition.java | Registers LIFECYCLE_SCAN_STATE_TABLE_DEF. |
| hadoop-ozone/ozone-manager/.../service/KeyLifecycleService.java | Core resumable-scan logic, state save/resume, dir-skip evaluation, debug logs. |
| hadoop-ozone/ozone-manager/.../request/key/OMKeysDeleteRequest.java | Reads piggybacked scan state and updates cache; adds debug logging. |
| hadoop-ozone/ozone-manager/.../request/key/OmKeysDeleteRequestWithFSO.java | Threads OmLifecycleScanState through to FSO response. |
| hadoop-ozone/ozone-manager/.../request/lifecycle/OMLifecycleSaveScanStateRequest.java | New request handler updating the scan-state cache. |
| hadoop-ozone/ozone-manager/.../response/key/OMKeysDeleteResponse(WithFSO).java | Persist scan state in same batch as deletes; cleanup-table info updated. |
| hadoop-ozone/ozone-manager/.../response/lifecycle/OMLifecycleSaveScanStateResponse.java | New response persists state to DB. |
| hadoop-ozone/ozone-manager/.../ratis/utils/OzoneManagerRatisUtils.java | Dispatches new SaveLifecycleScanState request type. |
| hadoop-ozone/ozone-manager/.../test/.../TestOMLifecycleSaveScanStateRequest.java | Unit test for save request. |
| hadoop-ozone/ozone-manager/.../test/.../TestOMLifecycleSaveScanStateResponse.java | Unit test for save response. |
| hadoop-ozone/ozone-manager/.../test/.../TestOMKeysDeleteResponse(WithFSO).java | Updated to pass the new scanState argument. |
| hadoop-ozone/ozone-manager/.../test/.../TestKeyLifecycleService.java | Adds resume/skip/seek/state-failure tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+1
to
+90
| # Evaluation of Branch HDDS-8342 & Design for Resumable Lifecycle Scans | ||
|
|
||
| ## 1. Evaluation of Current Implementation (HDDS-8342) | ||
|
|
||
| The `HDDS-8342` branch introduces the `KeyLifecycleService`, a background service running on the Ozone Manager (OM) Leader to enforce bucket lifecycle rules (expiration, moving to trash, and aborting incomplete multipart uploads). | ||
|
|
||
| ### Strengths: | ||
| - **Comprehensive Coverage:** Handles OBS/Legacy buckets via sequential key iteration and FSO buckets via a Depth-First Search (DFS) to safely delete empty directories. | ||
| - **Batching:** Efficiently batches delete requests and submits them via Ratis, respecting the `ratisByteLimit`. | ||
| - **Fault Injection & Metrics:** Good use of metrics and fault injectors for testing. | ||
|
|
||
| ### Identified Issues & Limitations: | ||
| 1. **Lack of Resumability:** The entire bucket is scanned in a single `call()` execution. If the OM restarts, crashes, or a leader transfer occurs, the scan state is lost. The new leader must restart the scan from the beginning. For buckets with billions of keys, the scan may never complete if leader transfers happen periodically. | ||
| 2. **Thread Starvation (No Yielding):** The `call()` method loops over the entire bucket. This blocks the background service thread for a very long time, preventing other buckets or background tasks from getting fair CPU time. | ||
| 3. **Redundant FSO Scanning:** FSO buckets are evaluated rule by rule (for rules with prefixes). If multiple rules have different prefixes, the DFS traverses overlapping paths, which is less efficient than a single unified pass. | ||
|
|
||
| --- | ||
|
|
||
| ## 2. Design: Persisting Bucket Scan Pointers | ||
|
|
||
| To solve the resumability and thread starvation issues, we need to persist the scan progress (the "pointer") to the OM DB. This ensures that a new OM leader can resume from where the previous leader left off, and allows the task to yield cooperatively. | ||
|
|
||
| ### 2.1 Data Structure for the Scan Pointer | ||
| We will define a new Protobuf message `LifecycleScanStateProto` to capture the exact position of the scan. | ||
|
|
||
| ```protobuf | ||
| message LifecycleScanStateProto { | ||
| optional string bucketKey = 1; // e.g., /volume/bucket | ||
| optional uint64 scanStartTime = 2; // Epoch time when this full scan started | ||
|
|
||
| // Phase indicator | ||
| enum ScanPhase { | ||
| OBS_KEY_SCAN = 1; | ||
| FSO_DFS_SCAN = 2; | ||
| MPU_SCAN = 3; | ||
| COMPLETED = 4; | ||
| } | ||
| optional ScanPhase phase = 3; | ||
|
|
||
| // For OBS/Legacy (Sequential Scan) | ||
| optional string lastScannedKey = 4; | ||
|
|
||
| // For FSO (DFS Scan) | ||
| optional uint64 currentRuleIndex = 5; // Which rule is currently being evaluated | ||
| optional uint64 lastScannedDirObjId = 6; // The directory currently being processed | ||
| optional string lastScannedFsoKey = 7; // The last key evaluated in the current directory | ||
| optional string lastScannedSubDir = 8; // The last subdirectory evaluated in the current directory | ||
|
|
||
| // For MPU Scan | ||
| optional string lastScannedMpuKey = 9; | ||
| } | ||
| ``` | ||
|
|
||
| ### 2.2 OM Database Schema Updates | ||
| Add a new table to `OMMetadataManager` to store the scan states: | ||
| - **Table Name:** `lifecycleStateTable` | ||
| - **Key:** `bucketKey` (String, e.g., `/volumeName/bucketName`) | ||
| - **Value:** `LifecycleScanState` (Parsed from `LifecycleScanStateProto`) | ||
|
|
||
| ### 2.3 When to Persist the Pointer (Checkpointing) | ||
| Persisting the pointer for every key would overwhelm Ratis and RocksDB. We should checkpoint periodically: | ||
|
|
||
| 1. **Piggybacking on Deletes:** Add an optional `LifecycleScanStateProto` field to `DeleteKeysRequest` and `RenameKeyRequest` (when moving to trash). When the OM state machine applies the deletion, it atomically updates the `lifecycleStateTable` with the new pointer. This guarantees exactly-once semantics for the scan pointer relative to deletions. | ||
| 2. **Periodic Standalone Checkpoints:** If no keys are expired (e.g., scanning millions of valid keys), we still need to save progress. We introduce a new OM request `SaveLifecycleScanStateRequest`. The `LifecycleActionTask` will send this request periodically (e.g., every 100,000 keys iterated, or every 1 minute of execution time). | ||
| 3. **End of Scan:** When the scan for a bucket finishes, a `SaveLifecycleScanStateRequest` is sent to mark the phase as `COMPLETED` and record the completion time. | ||
|
|
||
| ### 2.4 How to Resume the Scan | ||
| When `KeyLifecycleService` schedules a `LifecycleActionTask` for a bucket, it first reads the `LifecycleScanState` from the `lifecycleStateTable`. | ||
|
|
||
| - **OBS/Legacy Resumption:** | ||
| The iterator for `keyTable` is initialized to seek to `lastScannedKey` instead of the bucket prefix. | ||
| ```java | ||
| TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyTblItr = keyTable.iterator(bucketPrefix); | ||
| if (state.getLastScannedKey() != null) { | ||
| keyTblItr.seek(state.getLastScannedKey()); | ||
| // skip the exact match since it was already processed | ||
| } | ||
| ``` | ||
|
|
||
| - **FSO Resumption:** | ||
| Rebuilding the DFS stack is required. Using the `lastScannedDirObjId`, the task queries the `directoryTable` to find the directory, then traverses its parent pointers up to the bucket root to reconstruct the `LimitedSizeStack`. | ||
| Once the stack is rebuilt, the task resumes iterating keys in `lastScannedDirObjId` starting from `lastScannedFsoKey`, and subdirectories starting from `lastScannedSubDir`. | ||
|
|
||
| - **MPU Resumption:** | ||
| The `multipartInfoTable` iterator seeks to `lastScannedMpuKey` and continues. | ||
|
|
||
| ### 2.5 Task Yielding (Cooperative Multitasking) | ||
| With resumability in place, we can fix the thread starvation issue. | ||
| - The `LifecycleActionTask` should track its execution time. After running for a maximum time slice (e.g., 2 minutes) or processing a maximum number of keys, the task saves its state via `SaveLifecycleScanStateRequest` and returns `EmptyTaskResult`. | ||
| - The `KeyLifecycleService` will re-schedule the bucket in the next queue polling cycle, allowing other buckets or background tasks to get fair CPU time. |
| import org.apache.ozone.test.GenericTestUtils; | ||
| import org.apache.ozone.test.OzoneTestBase; | ||
| import org.apache.ratis.util.ExitUtils; | ||
| import org.eclipse.jetty.io.ByteBufferPool; |
Comment on lines
+556
to
+567
| do { | ||
| long parentID = currentDir.getParentObjectID(); | ||
| for (int i = 0; i < count; i++) { | ||
| OmDirectoryInfo dir = dirList.getSubDirList().get(i); | ||
| if (dir.getParentObjectID() == parentID) { | ||
| if (dirList.getSubDirKeyList().get(i).compareTo(currentDirTableKey) < 0) { | ||
| return true; | ||
| } | ||
| } | ||
| } | ||
| return false; | ||
| } while(true); |
Comment on lines
+330
to
+332
| if (scanState == null || (scanState != null && (scanState.getBucketObjID() != bucket.getObjectID() || | ||
| scanState.getLifecycleConfigurationUpdateID() != policy.getUpdateID() || | ||
| scanState.getScanEndTime() != null))) { |
Comment on lines
+581
to
+590
| public Stream<Arguments> parameters11() { | ||
| return Stream.of( | ||
| arguments(FILE_SYSTEM_OPTIMIZED, 2), | ||
| arguments(FILE_SYSTEM_OPTIMIZED, 3), | ||
| arguments(FILE_SYSTEM_OPTIMIZED, 7), | ||
| arguments(BucketLayout.OBJECT_STORE, 2), | ||
| arguments(BucketLayout.OBJECT_STORE, 3), | ||
| arguments(BucketLayout.OBJECT_STORE, 7) | ||
| ); | ||
| } |
Comment on lines
+586
to
+589
| } catch (IOException e) { | ||
| LOG.info("Failed to get DirList for lastScannedDirInState", e); | ||
| lastScannedDirInState = null; | ||
| } |
Comment on lines
+918
to
+932
| if (scanStateBuilder != null && scanStateBuilder.getLastScannedKey() != null) { | ||
| keyTblItr.seek(scanStateBuilder.getLastScannedKey()); | ||
| // Skip the exact match since it was already processed | ||
| if (keyTblItr.hasNext()) { | ||
| Table.KeyValue<String, OmKeyInfo> first = keyTblItr.next(); | ||
| if (!first.getKey().equals(scanStateBuilder.getLastScannedKey())) { | ||
| // We seeked past it, so we need to process this one. | ||
| // We can't easily "push back" in TableIterator, so we handle it here. | ||
| processKey(bucketInfo, first.getValue(), ruleList, expiredKeyList, scanStateBuilder); | ||
| numKeyIterated++; | ||
| lastScannedKey = first.getKey(); | ||
| LOG.info("lastScannedKey3 {}", lastScannedKey); | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Provide a one-liner summary of the changes in the PR Title field above.
It should be in the form of
HDDS-1234. Short summary of the change.Please describe your PR in detail:
perspective not just for the reviewer.
the Jira's description if the jira is well defined.
issue investigation, github discussion, etc.
Examples of well-written pull requests:
What is the link to the Apache JIRA
Please create an issue in ASF JIRA before opening a pull request, and you need to set the title of the pull
request which starts with the corresponding JIRA issue number. (e.g. HDDS-XXXX. Fix a typo in YYY.)
If you do not have an ASF Jira account yet, please follow the first-time contributor
instructions in the Jira guideline.
(Please replace this section with the link to the Apache JIRA)
How was this patch tested?
(Please explain how this patch was tested. Ex: unit tests, manual tests, workflow run on the fork git repo.)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this.)