feat(utilities): Add HoodieStashPartitionsTool for backing up and removing partitions#18526
feat(utilities): Add HoodieStashPartitionsTool for backing up and removing partitions#18526nsivabalan wants to merge 3 commits into
Conversation
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — a few minor readability nits: one FQN-instead-of-import in DefaultStashPartitionRenameHelper, another in HoodieStashPartitionsTool's doDryRun, and a log message in preCheckAndRecoverPartialStash that contradicts its own Javadoc by saying "non-MDT table" when the method runs for both.
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This is a useful operational tool, and the decision to couple the file move with the replace commit via a pre-commit validator is well-reasoned and documented. A few correctness/edge-case concerns worth a closer look in the inline comments, mostly around failure-handling in the CLI entry point and the semantics of rollback_stash / pre-check after a commit has already landed. Once the inline comments are addressed, this should be ready for a Hudi committer or PMC member to take it from here.
|
|
||
| preCheckAndRecoverPartialStash(partitionsToStash); | ||
|
|
||
| // Set up props with stash-specific configs for the pre-commit validator |
There was a problem hiding this comment.
🤖 The pre-check unconditionally moves files back from stash to source whenever it sees data in the stash dir. But if a prior stash run actually succeeded (replace commit landed) and the user re-invokes stash with the same partitions, this will move data back into the source dirs even though the timeline still marks those file groups as replaced — leaving the table in an inconsistent state where files physically exist under partitions that Hudi considers deleted (and the cleaner will eventually remove them). Could the pre-check first verify that no completed replace commit exists for these partitions at the last instant (or at least error out with a clear message) before moving data back?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
There was a problem hiding this comment.
sure. fixing it.
| continue; | ||
| } | ||
| // Files exist in stash — copy them back to source to ensure deletePartitions | ||
| // sees all files and creates a complete replace commit. |
There was a problem hiding this comment.
🤖 rollback_stash copies data from stash back to source with no check against the Hudi timeline. If the stash commit already landed (successful stash), running rollback_stash repopulates the source directory but the replace commit still marks those file groups as deleted — readers won't see the restored data, and the cleaner will eventually delete these files. Could rollback_stash fail fast (or at least warn loudly) when a completed replace commit for these partitions exists? The javadoc says it's meant for 'aborting in-progress stashing', but nothing enforces that.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| return; | ||
| } | ||
|
|
||
| // Determine which files already exist in target (from a prior partial attempt) |
There was a problem hiding this comment.
🤖 listDirectEntries returns both files and subdirectories. If a partition directory happens to contain a subdir (or e.g. a stray .hoodie_partition_metadata-like nested path in some storage layouts), the copy / deleteFile loop below would fail or skip silently. Is it worth asserting sourceFileInfo.isFile() (or filtering out directories) before the copy, so we fail loudly on unexpected layouts rather than producing a partial move?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| LOG.info("Copying file {} to {}", sourceFileInfo.getPath(), destFilePath); | ||
| FileIOUtils.copy(storage, sourceFileInfo.getPath(), destFilePath); | ||
| } | ||
| } |
There was a problem hiding this comment.
🤖 The 'already in target' check only matches by file name. If a previous attempt was killed mid-copy and left a truncated file in the target, this will treat it as fully copied and skip, then delete the source copy — silently losing the complete data. Given that this helper is explicitly documented as needing idempotency across partial failures, comparing file size (and/or using a tmp-name + atomic rename in the copy) would make recovery safer.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
There was a problem hiding this comment.
preCheckAndRecoverPartialStash takes care of copying back any partially copied/moved to stash location back to the source location
yihua
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: Introduces partition stashing functionality to Hudi: a new interface and default implementation for moving partition files to a stash location, a pre-commit validator to automate stashing during delete-partition operations, and a CLI tool supporting stash, rollback, and dry-run modes for partition management.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Client as SparkRDDWriteClient
participant Validator as StashPartitionsPreCommitValidator
participant Helper as StashPartitionRenameHelper
participant Storage as HoodieStorage
participant Timeline as HoodieTimeline
Client->>Validator: validate(writeResult, partitionsAffected)
Validator->>Storage: listDirectEntries(sourcePartition)
Storage-->>Validator: source entries
alt Source is non-empty
Validator->>Helper: movePartitionFiles(storage, sourcePath, stashPath)
Helper->>Storage: listDirectEntries(sourcePath)
Storage-->>Helper: source file entries
Helper->>Storage: exists(stashPath)
Storage-->>Helper: false
Helper->>Storage: makeDirectories(stashPath)
Storage-->>Helper: ack
loop For each source file
Helper->>Storage: copy(sourceFile, stashPath)
Storage-->>Helper: ack
end
loop For each source file
Helper->>Storage: delete(sourceFile)
Storage-->>Helper: ack
end
Helper-->>Validator: movePartitionFiles complete
else Source is empty
Validator->>Validator: skip partition
end
Validator-->>Client: validate complete
Client->>Client: deletePartitions(partitions)
Client->>Timeline: commitReplace(replaceInstant)
Timeline-->>Client: commit ack
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Tool as HoodieStashPartitionsTool
participant Helper as StashPartitionRenameHelper
participant Storage as HoodieStorage
participant Timeline as HoodieTimeline
Tool->>Tool: rollback_stash mode
loop For each requested partition
Tool->>Storage: listDirectEntries(stashPath/partition)
Storage-->>Tool: stash entries
alt Stash partition is non-empty
Tool->>Helper: movePartitionFiles(storage, stashPath, sourcePath)
Helper->>Storage: copy(stashFile, sourcePath)
Storage-->>Helper: ack
Helper->>Storage: delete(stashFile)
Storage-->>Helper: ack
Helper-->>Tool: movePartitionFiles complete
else Stash partition is empty
Tool->>Tool: skip partition
end
end
Tool-->>Tool: rollback_stash complete
CodeRabbit: yihua#51 (review)
| // Delete all source files after successful copy | ||
| for (StoragePathInfo sourceFileInfo : sourceFiles) { | ||
| LOG.info("Deleting source file {}", sourceFileInfo.getPath()); | ||
| storage.deleteFile(sourceFileInfo.getPath()); |
There was a problem hiding this comment.
Handle directory entries explicitly before moving.
listDirectEntries can return directories, but Lines 73 and 80 apply file operations unconditionally. That can fail on nested paths and leave partitions partially moved. Please recurse for directories (or reject them explicitly) instead of treating every entry as a file.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/DefaultStashPartitionRenameHelper.java`
around lines 56 - 80, DefaultStashPartitionRenameHelper currently treats every
entry returned by listDirectEntries as a file (iterating sourceFiles and calling
FileIOUtils.copy and storage.deleteFile) which will fail for directory entries;
update the loop that processes sourceFiles to explicitly check StoragePathInfo
(or its Path) for being a directory and either recurse into sub-entries to
copy/delete recursively or skip/reject directories with a clear error; ensure
the same check is applied when building existingTargetFileNames
(getExistingFileNames) so directories are handled consistently, and preserve
existing logging (e.g., the "Copying file ..." and "Deleting source file ..."
messages) only for actual files.
— CodeRabbit (original) (source:comment#3105547965)
| // Delete all source files after successful copy | ||
| for (StoragePathInfo sourceFileInfo : sourceFiles) { | ||
| LOG.info("Deleting source file {}", sourceFileInfo.getPath()); | ||
| storage.deleteFile(sourceFileInfo.getPath()); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify helper logic: skip-existing + copy + delete sequence
rg -n -C3 'existingTargetFileNames|FileIOUtils.copy|deleteFile|listDirectEntries' \
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/DefaultStashPartitionRenameHelper.java
# Verify copy semantics in FileIOUtils
rg -n -C25 'public static void copy\(' \
hudi-io/src/main/java/org/apache/hudi/io/util/FileIOUtils.javaRepository: yihua/hudi
Length of output: 4358
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find listDirectEntries definition to understand what it returns
rg -n 'listDirectEntries' \
hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java \
-A 5 | head -50
# Also check if there's any filtering for files only
rg -n 'isDirectory|isFile' \
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/DefaultStashPartitionRenameHelper.javaRepository: yihua/hudi
Length of output: 1278
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find concrete implementations of HoodieStorage to see what listDirectEntries returns
fd -e java -path '*/storage/*' | head -20 | xargs -I {} grep -l 'class.*HoodieStorage' {} | head -5
# Search for implementations more broadly
rg 'class.*HoodieStorage' -t java | grep -v abstract | head -10Repository: yihua/hudi
Length of output: 222
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find concrete implementations of HoodieStorage
rg 'class.*extends.*HoodieStorage|class.*implements.*HoodieStorage' --type java | head -10
# Also check what StoragePathInfo contains
rg -A 20 'class StoragePathInfo' --type java | head -40Repository: yihua/hudi
Length of output: 3068
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check HoodieHadoopStorage implementation of deleteFile
rg -n 'public.*deleteFile' \
hudi-hadoop-common/src/main/java/org/apache/hudi/storage/hadoop/HoodieHadoopStorage.java \
-A 10 | head -40
# Also check if copy operation would fail on directories
rg -n 'public.*void copy' \
hudi-io/src/main/java/org/apache/hudi/io/util/FileIOUtils.java \
-B 2 -A 15 | head -50Repository: yihua/hudi
Length of output: 1813
Prevent data loss when retrying after interrupted copies.
At line 68, existence-by-name is treated as success. With line 73 using non-atomic streaming copy (1024-byte chunks), a crash during copy leaves a partial destination file. On retry, the existence check treats this incomplete file as fully copied and line 80 deletes the source without validation, causing data loss.
Require completion validation before skipping recopy and deleting source—for example, temp-path copy followed by atomic rename, or verified size/checksum comparison.
Additionally, listDirectEntries() on line 56 returns both files and directories (indicated by the isDirectory flag in StoragePathInfo), but the code does not filter directories. Attempting to copy or delete directories will throw exceptions at lines 73 and 80.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/DefaultStashPartitionRenameHelper.java`
around lines 63 - 80, The current DefaultStashPartitionRenameHelper logic treats
existence-by-name as a successful copy and iterates over all entries from
listDirectEntries() without filtering directories, which can leave partial
destination files on interrupted copies and attempt to copy/delete directories.
Fix by filtering sourceFiles to only regular files (StoragePathInfo.isDirectory
== false) before processing, perform copies to a temporary path and then
atomically rename to the final StoragePath (replace direct FileIOUtils.copy
calls), and validate completion (e.g., compare sizes or checksums via
getExistingFileNames or an added helper) before skipping recopy or calling
storage.deleteFile; update getExistingFileNames usage to check validated
completion rather than name-only presence.
— CodeRabbit (original) (source:comment#3105547968)
| // Fall back to write stats if partitionToReplaceFileIds is not populated | ||
| partitionsAffected = getPartitionsModified(writeResult); | ||
| } | ||
| validateRecordsBeforeAndAfter(before, after, partitionsAffected); |
There was a problem hiding this comment.
Do not fallback to write-stats partitions for this destructive validator.
At Line 113, falling back to getPartitionsModified(...) allows this validator to move files for non-DELETE_PARTITION writes if configured incorrectly. This class should fail fast unless partitionToReplaceFileIds is populated.
Proposed fix
- Set<String> partitionsAffected = new HashSet<>(writeResult.getPartitionToReplaceFileIds().keySet());
- if (partitionsAffected.isEmpty()) {
- // Fall back to write stats if partitionToReplaceFileIds is not populated
- partitionsAffected = getPartitionsModified(writeResult);
- }
+ Set<String> partitionsAffected = new HashSet<>(writeResult.getPartitionToReplaceFileIds().keySet());
+ ValidationUtils.checkState(!partitionsAffected.isEmpty(),
+ "StashPartitionsPreCommitValidator must be used only with DELETE_PARTITION replace commits.");
validateRecordsBeforeAndAfter(before, after, partitionsAffected);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| validateRecordsBeforeAndAfter(before, after, partitionsAffected); | |
| Set<String> partitionsAffected = new HashSet<>(writeResult.getPartitionToReplaceFileIds().keySet()); | |
| ValidationUtils.checkState(!partitionsAffected.isEmpty(), | |
| "StashPartitionsPreCommitValidator must be used only with DELETE_PARTITION replace commits."); | |
| validateRecordsBeforeAndAfter(before, after, partitionsAffected); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/StashPartitionsPreCommitValidator.java`
around lines 112 - 117, The validator must not fall back to write-stats; in
StashPartitionsPreCommitValidator replace the current fallback logic that sets
partitionsAffected from getPartitionsModified(writeResult) with a fail-fast
behavior: when writeResult.getPartitionToReplaceFileIds() is empty, throw a
clear IllegalStateException (or HoodieCommitException) indicating
partitionToReplaceFileIds is required for this destructive validator, otherwise
proceed to call validateRecordsBeforeAndAfter(before, after, partitionsAffected)
using the set from writeResult.getPartitionToReplaceFileIds().keySet().
— CodeRabbit (original) (source:comment#3105547971)
| LOG.error("Fail to run stash partitions for " + cfg, throwable); | ||
| } finally { | ||
| jsc.stop(); | ||
| } |
There was a problem hiding this comment.
Exit with non-zero status when execution fails.
At Line 195, failures are logged but the process still exits successfully. This breaks automation and monitoring around this CLI.
Proposed fix
- try {
+ int exitCode = 0;
+ try {
tool.run();
} catch (Throwable throwable) {
LOG.error("Fail to run stash partitions for " + cfg, throwable);
+ exitCode = 1;
} finally {
jsc.stop();
}
+ if (exitCode != 0) {
+ System.exit(exitCode);
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| } | |
| int exitCode = 0; | |
| try { | |
| tool.run(); | |
| } catch (Throwable throwable) { | |
| LOG.error("Fail to run stash partitions for " + cfg, throwable); | |
| exitCode = 1; | |
| } finally { | |
| jsc.stop(); | |
| } | |
| if (exitCode != 0) { | |
| System.exit(exitCode); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieStashPartitionsTool.java`
around lines 193 - 199, The catch in HoodieStashPartitionsTool around tool.run()
logs failures but never signals an error exit; update the catch block in the
main execution path (where tool.run() is invoked in HoodieStashPartitionsTool)
to either rethrow the exception or call System.exit with a non-zero status
(e.g., System.exit(1)) after logging the error so the process exits with
failure; ensure jsc.stop() still runs in the finally block to clean up
SparkContext.
— CodeRabbit (original) (source:comment#3105547976)
| * sees the complete partition state. | ||
| */ | ||
| private void doStash() { | ||
| List<String> partitionsToStash = Arrays.asList(cfg.partitions.split(",")); |
There was a problem hiding this comment.
Normalize and validate partition input before use.
Raw split(",") allows empty/whitespace tokens. An empty token can resolve to an unintended path scope. Parse once (trim + filter empties) and fail fast if the result is empty.
Proposed fix
- List<String> partitionsToStash = Arrays.asList(cfg.partitions.split(","));
+ List<String> partitionsToStash = parsePartitions(cfg.partitions);
@@
- List<String> partitions = Arrays.asList(cfg.partitions.split(","));
+ List<String> partitions = parsePartitions(cfg.partitions);
@@
- List<String> parts = Arrays.asList(cfg.partitions.split(","));
+ List<String> parts = parsePartitions(cfg.partitions);+ private List<String> parsePartitions(String rawPartitions) {
+ List<String> parsed = Arrays.stream(rawPartitions.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+ if (parsed.isEmpty()) {
+ throw new HoodieException("No valid partitions were provided.");
+ }
+ return parsed;
+ }Also applies to: 303-303, 336-336
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieStashPartitionsTool.java`
at line 239, Normalize and validate the cfg.partitions string before using it:
replace direct Arrays.asList(cfg.partitions.split(",")) with a single parsed
list that trims each token, filters out empty/whitespace tokens, and collects
the results into partitionsToStash (and the other occurrences referenced by
variable partitionsToStash); if the resulting list is empty, throw/exit fast
with a clear error explaining that no valid partitions were provided. Ensure
this logic is used wherever partitions are parsed (e.g., in
HoodieStashPartitionsTool methods that assign partitionsToStash).
— CodeRabbit (original) (source:comment#3105547979)
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the added documentation — the new javadoc block clearly explains why the file move is bound to the pre-commit validator rather than done as a separate step in the tool, which is valuable context for future maintainers. This incremental change is doc-only and doesn't modify any code paths, so the correctness concerns from the prior review (main() swallowing Throwable and exiting 0, preCheckAndRecoverPartialStash and rollback_stash not checking the timeline for an already-landed replace commit, the name-only idempotency check in DefaultStashPartitionRenameHelper, partial-failure on-disk state in StashPartitionsPreCommitValidator, and the subdirectory filter question) all remain open. No new issues flagged from this pass — please take a look at the outstanding inline comments from the prior round, and this should be ready for a Hudi committer or PMC member to take it from here.
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: A new partition stashing feature is introduced with interfaces, implementations, validators, and a CLI tool to move partition files from source locations to a configured stash directory during delete operations, supporting recovery and rollback scenarios.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant WriteClient
participant Validator as PreCommitValidator
participant Helper as RenameHelper
participant Storage as HoodieStorage
participant Table
WriteClient->>Validator: validate(writeResult, ...)
Validator->>Table: getPartitionsModified()/getPartitionToReplaceFileIds()
Validator->>Storage: check RLI enabled?
Validator->>Validator: instantiate StashPartitionRenameHelper
loop For each affected partition
Validator->>Storage: check source partition exists & non-empty
alt Source is empty/missing
Validator->>Validator: skip (already stashed)
else Source has files
Validator->>Helper: movePartitionFiles(source, stash)
Helper->>Storage: ensure target directory exists
Helper->>Storage: list source entries
Helper->>Storage: list existing files in target
Helper->>Storage: copy source files (skip if present in target)
Helper->>Storage: delete source files
end
end
Validator-->>WriteClient: success / HoodieValidationException
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant CLI as HoodieStashPartitionsTool
participant Client as SparkRDDWriteClient
participant Helper as RenameHelper
participant Storage as HoodieStorage
CLI->>CLI: parse Config (mode, partitions, stash path)
alt Mode == STASH
CLI->>CLI: pre-check recovery (reverse partial prior stashes)
CLI->>Client: startCommitAction(REPLACE_COMMIT_ACTION)
CLI->>Client: deletePartitions(partitions, instantTime)
Note over Client: Pre-commit Validator stashes files<br/>(via movePartitionFiles)
Client-->>CLI: write statuses & partition→fileId mappings
CLI->>Client: commit(statuses, mappings)
else Mode == ROLLBACK_STASH
loop For each partition
CLI->>Helper: movePartitionFiles(stash, source)
Helper->>Storage: copy stash files to source
Helper->>Storage: delete stash files
end
else Mode == DRY_RUN
CLI->>CLI: compute affected file groups per partition
CLI->>Storage: check source/stash path states
CLI->>CLI: log planned impacts (no modifications)
end
CodeRabbit: hudi-agent#8 (review)
| throw new HoodieException("Failed to recover partition " + partition + " from stash during pre-check.", e); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Fix misleading log message.
Line 289 says "(non-MDT table)" but the method's Javadoc (lines 285-286) explicitly states this runs "for both MDT and non-MDT tables." The log message is incorrect and could confuse operators reviewing logs.
🔧 Proposed fix
private void preCheckAndRecoverPartialStash(List<String> partitions) {
- LOG.info("Running pre-check for partial stash recovery (non-MDT table).");
+ LOG.info("Running pre-check for partial stash recovery.");
HoodieStorage storage = HoodieStorageUtils.getStorage(📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| } | |
| private void preCheckAndRecoverPartialStash(List<String> partitions) { | |
| LOG.info("Running pre-check for partial stash recovery."); | |
| HoodieStorage storage = HoodieStorageUtils.getStorage( | |
| cfg.basePath, metaClient.getStorageConf()); | |
| StashPartitionRenameHelper renameHelper = createRenameHelper(); | |
| StoragePath basePath = metaClient.getBasePath(); | |
| for (String partition : partitions) { | |
| StoragePath stashPartitionPath = new StoragePath(cfg.stashPath, partition); | |
| StoragePath sourcePartitionPath = new StoragePath(basePath, partition); | |
| try { | |
| if (!storage.exists(stashPartitionPath) || isDirectoryEmpty(storage, stashPartitionPath)) { | |
| LOG.info("Partition {} has no stashed data from prior attempt, skipping pre-check.", partition); | |
| continue; | |
| } | |
| // Files exist in stash — copy them back to source to ensure deletePartitions | |
| // sees all files and creates a complete replace commit. | |
| LOG.info("Found stashed files for partition {} from prior attempt. Moving back to source.", partition); | |
| renameHelper.movePartitionFiles(storage, stashPartitionPath, sourcePartitionPath); | |
| LOG.info("Recovered partition {} back to source.", partition); | |
| } catch (IOException e) { | |
| throw new HoodieException("Failed to recover partition " + partition + " from stash during pre-check.", e); | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieStashPartitionsTool.java`
around lines 288 - 313, Update the misleading log message in
preCheckAndRecoverPartialStash: replace the current LOG.info("Running pre-check
for partial stash recovery (non-MDT table).") with a message that reflects this
runs for both MDT and non-MDT tables (e.g., "Running pre-check for partial stash
recovery (MDT and non-MDT tables).") so it matches the method Javadoc; ensure
you change the LOG.info call in the
HoodieStashPartitionsTool.preCheckAndRecoverPartialStash method and nothing
else.
— CodeRabbit (original) (source:comment#3113644201)
kbuci
left a comment
There was a problem hiding this comment.
Thanks for adding this tool! Had some initial questions/suggestions
| return entries.isEmpty(); | ||
| } | ||
|
|
||
| private StashPartitionRenameHelper createRenameHelper() { |
There was a problem hiding this comment.
can we re-use same helper in StashPartitionsPreCommitValidator ?
There was a problem hiding this comment.
sorry, not sure what do you mean by this. the pre commit validator gets instantiated using reflection. So, we cannot instantiate a renameHelper in our tool and make the preCommitValidator use the same instance.
or did you mean something else
|
hey @kbuci : all feedback has been addressed. |
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the thorough iteration — most of the prior feedback has been addressed: main() now rethrows, pre-check and rollback both classify partition state via HoodieTableMetadata (distinguishing ALREADY_STASHED / PARTIAL_STASH / NOT_STASHED / NO_DATA), the rename helper has separate stash/restore directions with idempotency tests, SI is now disallowed, and there's a new validateFileGroupsForStash enforcing exactly-one-slice + base-file-only + no-log-files. A couple of items worth double-checking in the inline comments: whether the retry-after-commit-crash test accurately reproduces the intended scenario when MDT is enabled, the mixed "some already stashed, some not" flow, and whether other MDT index partitions beyond RLI/SI should also be disallowed. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.
| assertSourcePartitionEmpty(storage, new StoragePath(basePath(), partition)); | ||
| } | ||
| metaClient = HoodieTableMetaClient.reload(metaClient); | ||
| assertFalse(metaClient.getActiveTimeline().getCompletedReplaceTimeline().empty()); |
There was a problem hiding this comment.
🤖 The new simulation runs a real stash (which lands the replace commit and updates MDT) and then deletes only the DT timeline's replace instant. In the real "validator succeeded but commit crashed" scenario, neither DT nor MDT would have a committed replace — so MDT still "knows" the partition was replaced. With enableMetadata=true, does evaluatePartitionStates then see partitionHasFiles=false and classify all partitions as ALREADY_STASHED, causing the retry to short-circuit without re-running deletePartitions? If so, the assertion that a replace commit exists after retry may be passing for a different reason than intended (e.g. the stale DT instant state leaves a lingering commit), or may only hold because MDT is also implicitly reset. Could you confirm what path is actually being exercised here?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| return false; | ||
| } | ||
| return true; | ||
| } |
There was a problem hiding this comment.
🤖 If a subset of the provided partitions come back as ALREADY_STASHED and the rest as NOT_STASHED, this proceeds with the full partition list into deletePartitions and validatePartitionsForStash. For the already-stashed ones the partition is already gone on disk (and from MDT), so validateFileGroups vacuously passes and the replace commit attempts to delete a partition that's not present. Is that the intended behavior, or should already-stashed partitions be filtered out of the list before the next stages so the commit operates only on partitions that actually have data? A mid-run partial success today could put the user into this state (some partitions stashed + committed, others failed before reaching the commit).
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
| !metadataConfig.isRecordLevelIndexEnabled() && !metadataConfig.isGlobalRecordLevelIndexEnabled(), | ||
| "StashPartitionsPreCommitValidator does not support tables with Record Level Index (RLI) enabled. " | ||
| + "Disable RLI before using the stash partitions tool."); | ||
| ValidationUtils.checkState( |
There was a problem hiding this comment.
🤖 RLI/Global-RLI/SI are now blocked, but other MDT indexes that carry partition-level state (column_stats, partition_stats, bloom_filters, expression_index) aren't checked. Is that intentional? If any of those can end up with stale entries after a raw file move that bypasses normal write plumbing, it might be worth either disallowing them here too, or clarifying in javadoc/docs which index combinations are supported.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18526 +/- ##
============================================
- Coverage 68.88% 68.68% -0.21%
- Complexity 28279 28473 +194
============================================
Files 2464 2478 +14
Lines 135568 136972 +1404
Branches 16446 16653 +207
============================================
+ Hits 93385 94077 +692
- Misses 34801 35324 +523
- Partials 7382 7571 +189
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
Closes #17866
Users need the ability to back up certain partitions to a separate location and remove them from the original Hudi table, with the option to restore them later. This is driven by use cases like partition TTL where stashed data needs to be retained for a grace period before permanent deletion.
Summary and Changelog
Adds a new spark-submit tool HoodieStashPartitionsTool that moves partition data to a stash location and issues a deletePartitions operation. The tool uses a custom SparkPreCommitValidator to atomically move files before the replace commit completes.
Stash operation:
Why a pre-commit validator instead of moving files in the tool?
A simpler approach would be to first call deletePartitions from the tool, then move data files to the stash location as a separate step. However, this is unsafe: if the tool crashes after the deletePartitions replace commit succeeds but before the data is moved, the data is at risk. Once the replace commit lands on the timeline, the Hudi cleaner will eventually delete the physical data files for the replaced file groups based on its configured retention policy. If the cleaner runs before the tool is retried, the data intended for stashing is permanently lost.
By using a StashPartitionsPreCommitValidator, the file move happens before the commit is finalized. If the move fails, the commit does not land, and the table remains in its original state — the cleaner has nothing to clean. This ensures data is never in a state where it has been marked as deleted but has not been safely copied to the stash location.
Rollback stash (rollback_stash mode):
Evaluates partition states using the same HoodieTableMetadata-based classification. Only restores files for partitions in PARTIAL_STASH state (active files exist + stash has data — the commit never landed). For partitions where the stash fully completed (ALREADY_STASHED — replace commit landed), rollback is skipped with a warning directing the user to use insert_overwrite from the stash location instead, since simply moving files back does not undo the replace commit on the timeline.
Restore: Users use the standard insert_overwrite write operation reading from the stash location (no new tooling).
Changelog:
(stash→table). Separate APIs allow custom implementations to use different storage strategies per direction (e.g., fast rename for stashing, verified copy for restoring).
FS-level rename APIs.
Impact
Risk Level
Low.
Documentation Update
Test Plan
Contributor's checklist