feat(table-services): Support hoodie.clustering.enable.expirations to allow cleanup of failed clustering plans (intended for PreferWriterConflictResolutionStrategy)#18302
Conversation
nsivabalan
left a comment
There was a problem hiding this comment.
Reviewed source code. Will review tests in next iteration
| Option<HoodieClusteringPlan> clusteringPlan = table | ||
| .scheduleClustering(context, instantTime, extraMetadata); | ||
| option = clusteringPlan.map(plan -> instantTime); | ||
| if (option.isPresent() && config.isRollbackFailedClustering()) { |
There was a problem hiding this comment.
using config.isRollbackFailedClustering() here does not sit well.
can we name something like
config.isExpirationOfClusteringEnabled()
this reads nicely and is understandable.
wdyt?
There was a problem hiding this comment.
Expiration is a good name , since its more broad than failed. Updated
| } | ||
| HoodieInstant instant = metaClient.getInstantGenerator() | ||
| .createNewInstant(HoodieInstant.State.INFLIGHT, action, instantToRollback); | ||
| if (!isClusteringInstantEligibleForRollback(metaClient, instant) |
There was a problem hiding this comment.
can we split this up to keep it simple as before. current code structure is bit confusing.
boolean isClusteringInstant = ClusteringUtils.isClusteringInstant(
metaClient.getActiveTimeline(), instant, metaClient.getInstantGenerator())
if (isClusteringInstant) {
if (!isClusteringInstantEligibleForRollback(metaClient, instant)) {
continue;
}
} else {
continue;
}
There was a problem hiding this comment.
actually instead of continue, can we add it to list w/n the if condition only.
There was a problem hiding this comment.
Sure, I refactored it to not need any continue and to be closer to original approach
| .markAdvanced() | ||
| .withDocumentation("Number of heartbeat misses, before a writer is deemed not alive and all pending writes are aborted."); | ||
|
|
||
| public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING = ConfigProperty |
There was a problem hiding this comment.
lets move this to HoodieClusteringConfig and name this
hoodie.clustering.enable.expirations
There was a problem hiding this comment.
Are there chances that we will enable expiration by clustering runners too or you folks purely needed this from ingestion writer standpoint?
There was a problem hiding this comment.
Sure I can use that config name.
In our clustering table service runner, we currently want to rollback any (potentially conflicting) clustering instant with an expired heartbeat, regardless of how recent it is. So currently with our internal setup/build, we in clustering runner we enable the config to allow rollback of failed clustering writes, but we set hoodie.clustering.expiration.time.mins to 0
| + "client must be used to schedule, execute, and commit the clustering instant."); | ||
|
|
||
| public static final ConfigProperty<Long> ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty | ||
| .key("hoodie.rollback.failed.clustering.wait.minutes") |
There was a problem hiding this comment.
how about hoodie.clustering.expiration.time.mins ?
There was a problem hiding this comment.
Sure we can use that name
| .markAdvanced() | ||
| .withDocumentation("When hoodie.rollback.failed.clustering is enabled, rollbackFailedWrites will not attempt to rollback " | ||
| + "a clustering instant unless it is at least this many minutes old. This is a temporary guardrail to reduce the chance " | ||
| + "of transient failures from concurrent rollback attempts until https://github.com/apache/hudi/issues/18050 is resolved."); |
There was a problem hiding this comment.
lets add talk about 18050.
we are going to get 18050 landed for 1.2 shortly.
So, why talk about it in the documentation.
There was a problem hiding this comment.
Oh ok if this change will anyway be released with the rest in 1.2 then makes sense let me remove that reference
| metaClient.getActiveTimeline(), instant, metaClient.getInstantGenerator()); | ||
| } | ||
|
|
||
| private static boolean isInstantOldEnough(String instantTime, long waitMinutes) { |
| } | ||
| } | ||
|
|
||
| public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient metaClient, HoodieInstant instant) { |
There was a problem hiding this comment.
where are we checking for heart beats?
There was a problem hiding this comment.
In getInstantsToRollback, we first call this function and then the subsequent call to getInstantsToRollbackForLazyCleanPolicy will filter out instants with an active heartbeat. Because getInstantsToRollbackForLazyCleanPolicy already takes care of this heartbeat check I decided to avoid doing a heartbeat check + timeline reload again beforehand in isClusteringInstantEligibleForRollback
| private static boolean isInstantOldEnough(String instantTime, long waitMinutes) { | ||
| try { | ||
| Date instantDate = TimelineUtils.parseDateFromInstantTime(instantTime); | ||
| long ageMs = System.currentTimeMillis() - instantDate.getTime(); |
There was a problem hiding this comment.
Lets get the time zone from hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone() and calculate based on that?
There was a problem hiding this comment.
Oh I see, good point unlike the timeline the client job might not be in UTC then I guess? Updated
|
|
||
| public static final ConfigProperty<Long> ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty | ||
| .key("hoodie.rollback.failed.clustering.wait.minutes") | ||
| .defaultValue(60L) |
There was a problem hiding this comment.
which interval this config dictates.
is it, after the heart beat expires, we need to wait until X mins to trigger rollback?
if yes, the documentation is not clear
There was a problem hiding this comment.
We wait x minutes until after the requested instant is created, updated doc to clarify that
|
@kbuci : can you update PR description based on the change in config key |
...nt/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
Outdated
Show resolved
Hide resolved
| } | ||
| } | ||
|
|
||
| public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient metaClient, HoodieInstant instant) { |
There was a problem hiding this comment.
I wanted HoodieClusteringJob to be able to use this API
There was a problem hiding this comment.
can we make it static then.
| HoodieInstantTimeGenerator.fixInstantTimeCompatibility(instantTime), | ||
| HoodieInstantTimeGenerator.MILLIS_INSTANT_TIME_FORMATTER); | ||
| long instantEpochMs = instantDateTime.atZone(zoneId).toInstant().toEpochMilli(); | ||
| long ageMs = System.currentTimeMillis() - instantEpochMs; |
There was a problem hiding this comment.
should we do something like
ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(java.time.Instant.now(), table.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
long currentTimeMs = latestDateTime.toInstant().toEpochMilli();
long replaceCommitInstantTime = HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).toInstant().toEpochMilli();
long ageMs = currentTimeMs - replaceCommitInstantTime;
| return ageMs >= TimeUnit.MINUTES.toMillis(expirationMins); | ||
| } catch (DateTimeParseException e) { | ||
| log.warn("Could not parse instant time {}, assuming it has expired", instantTime, e); | ||
| return true; |
There was a problem hiding this comment.
won't this unintentionally rollback an on-going or in progress clustering?
should we throw instead?
There was a problem hiding this comment.
Thanks for catching, fixing to make sure errors are re-thrown as needed
| .key("hoodie.clustering.updates.strategy") | ||
| .defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy") | ||
| .withInferFunction(cfg -> { | ||
| String strategy = cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME, ""); |
There was a problem hiding this comment.
can you write some UTs for this.
what does it mean to have a default value set(L245) and also infer func(L246).
shouldn't only one of them will take effect?
If yes, then, we should fix L247 to set org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy as default
There was a problem hiding this comment.
Added a UT and removed the default value to avoid confusion
| public static final ConfigProperty<Boolean> ENABLE_EXPIRATIONS = ConfigProperty | ||
| .key("hoodie.clustering.enable.expirations") | ||
| .defaultValue(false) | ||
| .withInferFunction(cfg -> { |
There was a problem hiding this comment.
same comment as above.
lets validate if setting both default value and infer func works
| .withInferFunction(cfg -> { | ||
| String strategy = cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME, ""); | ||
| if (PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) { | ||
| return Option.of(true); |
There was a problem hiding this comment.
can we leave this false for OOB users. this is slightly orthogonal feature right.
There was a problem hiding this comment.
Sure let me just make this default false then and not infer, since even though this is intended for preferred write conflict users, its technically orthogonal to that
| metaClient.getActiveTimeline(), instant, metaClient.getInstantGenerator()); | ||
| } | ||
|
|
||
| private static boolean hasInstantExpired(HoodieTableMetaClient metaClient, String instantTime, long expirationMins) { |
There was a problem hiding this comment.
are we not checking the heart beat expiry, but just purely based on the instant time?
There was a problem hiding this comment.
I wanted to avoid redundant heartbeat checks which is why I didn't do that here - https://github.com/apache/hudi/pull/18302/changes#r2943866147 .. Especially since the caller anyway needs to reload the timeline after to double-check that an inflight instant with expired heartbeat wasn't actually just a completed instant.
But since this is not intuitive when reading the code, I ended up abandoning this approach. And now this API also checks heartbeat status.
| getPendingClusteringInstantsForPartitions(metaClient, partitions).stream() | ||
| .filter(instantTime -> { | ||
| HoodieInstant instant = metaClient.getInstantGenerator() | ||
| .createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, instantTime); |
There was a problem hiding this comment.
is it not possible to reuse the writeClient to rollback an expired clustering.
I am looking to avoid having duplicate codes.
for eg, if we chance the way we deduce the expiration of a clustering instant, we have to fix in two places.
There was a problem hiding this comment.
Based on other changes, I avoided some redundant heartbeat related checks
| metaClient.getActiveTimeline(), instant, metaClient.getInstantGenerator()); | ||
| } | ||
|
|
||
| private static boolean hasInstantExpired(HoodieTableMetaClient metaClient, String instantTime, long expirationMins) { |
There was a problem hiding this comment.
should we check from the time the heart beat expired and it we have elapsed the expiration interval?
There was a problem hiding this comment.
I didn't want to go with this approach, since if the heartbeat file is cleaned up then I'm not sure if there is an easy way to infer when the heartbeat actually expired.
| } | ||
| } | ||
|
|
||
| public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient metaClient, HoodieInstant instant) { |
| } | ||
| } | ||
|
|
||
| public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient metaClient, HoodieInstant instant) { |
There was a problem hiding this comment.
can we make it static then.
| + "client must be used to schedule, execute, and commit the clustering instant. And a clustering plan cannot be " | ||
| + "re-attempted"); | ||
|
|
||
| public static final ConfigProperty<Long> EXPIRATION_TIME_MINS = ConfigProperty |
There was a problem hiding this comment.
sorry. my bad. hoodie.clustering.expiration.threshold.mins
time might give a wrong notion of absolute value. but here, we are referring to interval or threshold.
| metaClient.getActiveTimeline(), instant, metaClient.getInstantGenerator()); | ||
| } | ||
|
|
||
| private static boolean hasInstantExpired(HoodieTableMetaClient metaClient, String instantTime, long expirationMins) { |
| metaClient.getActiveTimeline(), instant, metaClient.getInstantGenerator()); | ||
| } | ||
|
|
||
| private static boolean hasInstantExpired(HoodieTableMetaClient metaClient, String instantTime, long expirationMins) { |
| // --- Tests for clustering expiration logic --- | ||
|
|
||
| @Test | ||
| void isClusteringInstantEligibleForRollback_returnsFalseWhenConfigDisabled() throws IOException { |
There was a problem hiding this comment.
can we parametrize these tests or both v6 and v9. bcoz, the requested and inflight have different actions in the timeline in these table versions.
| // --- Tests for clustering expiration logic --- | ||
|
|
||
| @Test | ||
| void isClusteringInstantEligibleForRollback_returnsFalseWhenConfigDisabled() throws IOException { |
There was a problem hiding this comment.
similarly for some of tests added here, can we parametrize for v6 and v9? lets be tactical. lets not parametrize all tests, but a good no of tests to give us good coverage
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18302 +/- ##
=============================================
- Coverage 69.26% 57.32% -11.95%
+ Complexity 27117 23484 -3633
=============================================
Files 2391 2433 +42
Lines 129572 133338 +3766
Branches 15366 16041 +675
=============================================
- Hits 89746 76431 -13315
- Misses 32969 50706 +17737
+ Partials 6857 6201 -656
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Describe the issue this Pull Request addresses
When using
PreferWriterConflictResolutionStrategyfor multi-writer setups, clustering jobs can fail and leave behind incompletereplacecommitinstants on the timeline. These stale clustering instants block future writes targeting the same file groups and require manual intervention to clean up. This PR introduces automatic rollback of failed clustering instants with expired heartbeats, gated behind a new configuration so it is opt-in for users who need it.Issue: #17879
Summary and Changelog
Adds opt-in support for automatically rolling back failed/stale clustering instants during the
rollbackFailedWritesflow (LAZY cleaning policy), and a utility for partition-targeted rollback of failed clustering.New Configurations:
hoodie.clustering.enable.expirations(default:false): Enables rollback of incomplete clustering instants with expired heartbeats. Can only be applied ifPreferWriterConflictResolutionStrategyis the configured conflict resolution strategy.hoodie.clustering.expiration.time.mins(default:60): Minimum age (in minutes) a clustering instant must have before it is eligible for rollback. Acts as a guardrail against ingestion jobs rolling back clustering operations that a table service platform is anyway immediately attempting to rollback.Behavioral Changes:
HoodieWriteConfig.autoAdjustConfigsForConcurrencyMode: WhenPreferWriterConflictResolutionStrategyis enabled, the clustering updates strategy is automatically set toSparkAllowUpdateStrategyso that ingestion writes can proceed even when there is inflight clustering targeting the same file groups.BaseHoodieTableServiceClient.getInstantsToRollback: Under the LAZY failed writes cleaning policy, eligible incomplete clustering instants (old enough, config enabled, confirmed as clustering action) are now included in the inflight stream before heartbeat-based expiry filtering.BaseHoodieTableServiceClient.getInstantsToRollbackForLazyCleanPolicy: The double-check after timeline reload now also considers the pending replace/clustering timeline when the config is enabled, so that expired clustering instants (if eligible) are not inadvertently filtered out.BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback: Encapsulates the check for whether an instant is a clustering instant (with expired heartbeat) that is old enough and the rollback config is enabled.BaseHoodieTableServiceClient.getPendingRollbackInfos: Uses the new helper to allow re-attempting pending rollback plans for eligible clustering instants.New Utilities in
HoodieClusteringJob:getPendingClusteringInstantsForPartitions(metaClient, partitions): Returns all pending clustering instant times that target any of the given partitions.rollbackFailedClusteringForPartitions(client, metaClient, partitions): Rolls back pending clustering instants targeting the given partitions, filtering for eligibility (config enabled, old enough, clustering action) and expired heartbeat. This allows users such as a table service platform to "clear out" any potentially conflicting clustering plans before attempting a new clustering plan (assuminghoodie.clustering.enable.expirationsis used). Since otherwise that new clustering plan would not include file groups from those other inflight clustering .requested plans.Tests:
TestHoodieWriteConfigfor new config defaults, explicit enable, inference from PreferWriterConflictResolutionStrategy, and auto-adjustment of clustering update strategy.TestBaseHoodieTableServiceClientforisClusteringInstantEligibleForRollbackandgetInstantsToRollbackbehavior with clustering instants under various conditions (config disabled, too recent, eligible, non-clustering, active vs expired heartbeat).TestHoodieClusteringJobforgetPendingClusteringInstantsForPartitionsandrollbackFailedClusteringForPartitions(expired heartbeat triggers rollback, active heartbeat skips rollback).Impact
hoodie.clustering.enable.expirationsandhoodie.clustering.enable.expirations.time.mins.PreferWriterConflictResolutionStrategyis used,hoodie.clustering.updates.strategyis now auto-set toSparkAllowUpdateStrategy.PreferWriterConflictResolutionStrategyare unaffected.Risk Level
Low. The rollback of failed clustering instants is gated behind a config that defaults to
falseand only activates for instants that are old enough (configurable wait time) with expired heartbeats. The auto-adjustment of the clustering update strategy only applies whenPreferWriterConflictResolutionStrategyis already in use. Unit and integration tests cover the key scenarios.Documentation Update
hoodie.clustering.enable.expirationsandhoodie.clustering.enable.expirations.time.minsis included in the config property definitions with inline documentation.hoodie.clustering.updates.strategywhen usingPreferWriterConflictResolutionStrategyis logged at INFO level.Contributor's checklist