[HUDI-XXXXX] fix: Parallelize cloud object existence checks in S3EventsHoodieIncrSource#18252
Open
vinishjail97 wants to merge 2 commits intoapache:masterfrom
Open
[HUDI-XXXXX] fix: Parallelize cloud object existence checks in S3EventsHoodieIncrSource#18252vinishjail97 wants to merge 2 commits intoapache:masterfrom
vinishjail97 wants to merge 2 commits intoapache:masterfrom
Conversation
…dieIncrSource Repartition by totalExecutorCores and run a per-task thread pool for concurrent HEAD requests during file existence checks. Previously all checks ran sequentially in a single partition due to upstream Window coalescing, causing ~5hr latency for 176K+ files. - Add EXISTS_CHECK_PARALLELISM config (default 32 threads per task) - Repartition distinct() output by totalCores for cluster utilization - Add thread pool in getCloudObjectMetadataPerPartition using CompletableFuture.supplyAsync with explicit ExecutorService - Extract processRow helper, fix per-file INFO log to DEBUG - Add tests for parallel and sequential exists check paths
…e in parallel exists check Replace sequential per-future join() with CompletableFuture.allOf().join() to wait for all futures concurrently inside the try block. This ensures shutdownNow() only fires after all work is done, preventing interrupted threads when the caller consumes results. Also adds proper CompletionException handling on the fan-in join.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18252 +/- ##
============================================
- Coverage 57.30% 57.23% -0.07%
- Complexity 18561 18602 +41
============================================
Files 1945 1948 +3
Lines 106256 106732 +476
Branches 13131 13199 +68
============================================
+ Hits 60885 61086 +201
- Misses 39648 39888 +240
- Partials 5723 5758 +35
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Collaborator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
EXISTS_CHECK_PARALLELISMconfig (default 32 threads per Spark task) for concurrent HEAD requests during file existence checksdistinct()output bytotalExecutorCoresinstead of using a single partition (caused by upstreamWindow.orderBy()+ AQE coalescing)getCloudObjectMetadataPerPartitionusingCompletableFuture.supplyAsyncwith explicitExecutorServiceCompletableFuture.allOf().join()to wait for all futures concurrently before executor shutdown, preventingSdkInterruptedExceptionon interrupted threadsprocessRowhelper, fix per-file INFO log to DEBUGContext
The upstream unpartitioned
Window.orderBy()forces all data into a single partition. AQE then coalesces the downstreamdistinct()output to 1 partition. This means all file exists checks run sequentially in a single thread — for 176K+ files at ~100ms per HEAD request, that's ~5 hours.New approach: repartition by
totalExecutorCoresand run a 32-thread pool within each partition'smapPartitionslambda. This givestotalCores × 32concurrent HEAD requests across the cluster.Bug fix: executor shutdown race
The original implementation returned a lazy
iterator()from inside thetryblock, causingshutdownNow()infinallyto fire before any.join()calls executed. This interrupted in-flight S3getFileStatusthreads, leading toSdkInterruptedException→AbortedException→InterruptedIOException→ task failure after 4 retries.Fix: use
CompletableFuture.allOf().join()to wait for all futures at a single fan-in point inside thetryblock, then eagerly collect results beforefinallyfires.Test plan
TestCloudObjectsSelectorCommon— new tests for parallel exists check, sequential fallback, size validationTestS3EventsHoodieIncrSource— verify no regressions