[HUDI-7503] Compaction execution should fail if another active writer is already executing the same plan#18012

Open
kbuci wants to merge 14 commits into apache:master from kbuci:OSS-7503

Conversation

@kbuci (Contributor) commented Jan 27, 2026

Describe the issue this Pull Request addresses

Updated the compact API to start a heartbeat (within a transaction) before attempting to execute a plan.

If multiple writers attempt to execute the same compact/log compact plan at the same time, only one of them will proceed; the rest will fail with an exception (upon seeing that a heartbeat has already been started) and abort.

Summary and Changelog

Without this change, if multiple jobs are launched at the same time that target executing the same compaction plan (due to a non-Hudi, user-side configuration/orchestration issue), then one job can execute the compaction plan and create an inflight/commit instant file while the other jobs can roll it back (and delete inflight instant files or data files). This can leave the dataset in a corrupted state.

With this change, the compact API (which executes an existing compaction plan) is updated to first take the table lock (by starting a transaction), start a heartbeat, and release the table lock (end the transaction). If a heartbeat already exists, an exception is thrown instead. The heartbeat is closed when the compaction plan execution succeeds. This means that if compaction execution fails but a Spark task/driver is still lingering and writing/deleting files, the heartbeat won't be immediately cleaned up.

(Diagram: heartbeat table service lifecycle)

If multiple concurrent jobs attempt to execute a compaction plan, then all jobs except the first one will fail (until the heartbeat of the first one expires). Note that the table lock mentioned above is needed to ensure that multiple writers don't independently check that the heartbeat is inactive and then each start it.
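For illustration, here is a minimal sketch of this guard in Java; the names txnManager, heartbeatClient, executeCompactionPlan, and completeCompaction are placeholders for this sketch, not the exact APIs touched by this PR:

// Illustrative sketch only: txnManager, heartbeatClient, executeCompactionPlan and
// completeCompaction are placeholder names, not the exact Hudi APIs changed here.
void compact(String compactionInstantTime) {
  txnManager.beginTransaction();                  // take the table lock
  try {
    if (heartbeatClient.isHeartbeatActive(compactionInstantTime)) {
      // another active writer has already claimed this plan
      throw new HoodieCompactionException(
          "An active heartbeat already exists for " + compactionInstantTime);
    }
    heartbeatClient.start(compactionInstantTime); // claim the plan
  } finally {
    txnManager.endTransaction();                  // release the table lock
  }

  Object writeMetadata = executeCompactionPlan(compactionInstantTime);
  completeCompaction(writeMetadata, compactionInstantTime);
  heartbeatClient.stop(compactionInstantTime);    // closed only after a successful commit
}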

(Diagram: table service flow)

Impact

Risk Level

Documentation Update

None

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Jan 27, 2026
@github-actions github-actions bot added size:M PR with lines of changes in (100, 300] and removed size:S PR with lines of changes in (10, 100] labels Jan 29, 2026
@nsivabalan (Contributor) left a comment

Can we create a follow-up to enable this for clustering as well?

Future<?> future1 = executors.submit(() -> {
  try {
    // Wait for both writers to be ready
    cyclicBarrier.await(60, TimeUnit.SECONDS);
Contributor

Can we use a CountDownLatch to be deterministic and not add a 60 sec wait?

Contributor Author

I removed the 60 sec wait, but my understanding is that this API is deterministic as well
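For reference, a minimal standalone sketch of the CountDownLatch-based coordination suggested above (illustrative only, not code from this PR; the writer bodies are stubs):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LatchCoordinationSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool(2);
    // Both writers block on this latch; the test releases them together with one countDown().
    CountDownLatch startSignal = new CountDownLatch(1);

    Future<?> future1 = executors.submit(() -> {
      try {
        startSignal.await();                 // no fixed timeout needed
        // writer1 would attempt to execute the compaction plan here
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    Future<?> future2 = executors.submit(() -> {
      try {
        startSignal.await();
        // writer2 would attempt to execute the same compaction plan here
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    startSignal.countDown();                 // release both writers at once
    future1.get();
    future2.get();
    executors.shutdown();
  }
}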

client1.close();
client2.close();
FileIOUtils.deleteDirectory(new File(basePath));
}
Contributor

Can we also test the case where writer1 starts compaction execution and fails mid-way, and after some time writer2 starts and is able to roll back and execute the failed compaction (after the heartbeat expires)?

Contributor Author

Sure, added another test case.

@kbuci (Contributor Author) commented Feb 4, 2026

@nsivabalan While checking failing UTs, I noticed a complication. In some places the client calls compact with auto commit as false, and then later calls one of the "commit compaction" APIs. At first I thought this was just for UTs, but I realized that this is actually a legitimate way users can execute a compaction.
In Spark I see some usages like:
org.apache.hudi.utilities.HoodieCompactor#doCompact
org.apache.spark.sql.hudi.command.procedures.RunCompactionProcedure#call
And for Flink specifically as well:
org.apache.hudi.sink.v2.compact.CompactionCommitSinkV2#commitIfNecessary

Because of this, for now I am thinking that we can just apply this heartbeat guard only if auto-complete/commit is enabled?
The alternative is that we change the heartbeating logic such that it starts and closes the heartbeat twice:

  • When calling the compact API, it starts a heartbeat (within a transaction) at the beginning, and then stops it before it exits the function (or auto-commits)
  • When calling any of the APIs to commit a compaction, we again start a heartbeat (within a transaction) and then stop it after completing the compaction commit

The issue though is that this adds some code complexity since there seem to be multiple APIs that users can directly call for committing a compaction:

  • org.apache.hudi.client.BaseHoodieWriteClient#completeCompaction (mainly used by Flink)
  • org.apache.hudi.client.BaseHoodieTableServiceClient#commitCompaction (technically engine agnostic, but based on the comment my hunch is that it's mainly intended for the Spark use case)
    We could add the checks to both of those APIs, but at first glance I'm not sure if that would actually catch all "concurrent compact attempt" edge cases. For example, if one of the Flink "ingestion sink" classes executes a compaction plan at the same time as a concurrent Spark writer.

What are your thoughts?

@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Feb 6, 2026
@kbuci kbuci requested a review from nsivabalan February 6, 2026 19:23
@nsivabalan (Contributor)

I feel we should keep it simple.

Start the heartbeat at the beginning of the compact() method in TableServiceClient (irrespective of whether shouldComplete is true or not).

Within completeCompaction, at the end, we end the heartbeat.

But let's ensure we have a try/catch block within compact(), so that within the catch block we can stop the heartbeat on failures.
The same applies to completeCompaction as well.
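A rough sketch of the flow being suggested here (placeholder method names like startHeartbeatUnderLock, runCompactionPlan, stopHeartbeat, and commitCompactionToTimeline, not the actual BaseHoodieTableServiceClient signatures):

// Illustrative only: these helper names are placeholders for this sketch.
void compact(String instantTime, boolean shouldComplete) {
  startHeartbeatUnderLock(instantTime);   // throws if another writer already holds a heartbeat
  try {
    Object writeMetadata = runCompactionPlan(instantTime);
    if (shouldComplete) {
      completeCompaction(writeMetadata, instantTime);
    }
    // if shouldComplete is false, the caller is expected to invoke the
    // compaction-commit API later in the same job, which ends the heartbeat
  } catch (RuntimeException e) {
    stopHeartbeat(instantTime);           // don't leave a stale heartbeat on failure
    throw e;
  }
}

void completeCompaction(Object writeMetadata, String instantTime) {
  try {
    commitCompactionToTimeline(writeMetadata, instantTime);
    stopHeartbeat(instantTime);           // end the heartbeat once the commit succeeds
  } catch (RuntimeException e) {
    stopHeartbeat(instantTime);           // also stop it if the commit itself fails
    throw e;
  }
}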

@kbuci (Contributor Author) commented Feb 13, 2026

I feel we should keep it simple.

Start the heartbeat at the beginning of the compact() method in TableServiceClient (irrespective of whether shouldComplete is true or not).

Within completeCompaction, at the end, we end the heartbeat.

That's what I originally tried. But the issue I saw is that there could be a case like #18012 (comment) where a user calls compact without committing, and then calls compact again. So in that scenario we would need to stop the heartbeat of that first compact call before exiting, even though auto commit wasn't enabled.

@kbuci (Contributor Author) commented Feb 17, 2026

@nsivabalan As discussed, updated PR under the assumption that:

  • Once a (Spark/Flink) user attempts to execute a compaction plan via compact, they must (attempt to) commit the compaction in the same job, either implicitly via enabling auto-commit like the Spark MDT writers, or by explicitly calling it later like the Flink ingestion wrappers seem to do.

@hudi-bot (Collaborator)

CI report:
