refactor(metadata): Replace misused stream reduce with a plain for-loop #18532
voonhous wants to merge 1 commit into
Conversation
- HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords aggregated per-partition write stats via writeStats.stream().reduce(new HashMap<>(), accumulator, CollectionUtils::combine).
- The "identity" is a mutable HashMap that the accumulator mutates in place, which is a misuse of Stream.reduce.
- It only works because the stream is sequential and the method runs on the driver (HoodieMetadataWriteUtils then wraps the result via context.parallelize(..., 1)).
- A plain for-loop expresses the same aggregation directly and is idiomatic for mutable-accumulation sequential code.
- No behavior change and no measurable perf impact; this is a readability/idiom cleanup.
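To make the misuse concrete, here is a minimal sketch, not the Hudi code. The class MutableIdentityDemo and both method names are hypothetical, and the per-file counting is only a stand-in for the real size aggregation. It shows that, on a sequential stream, the map returned by reduce is the very same object that was passed in as the identity, which is why the pattern happens to work, and that a plain loop yields the same result.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; names are illustrative and not taken from Hudi.
class MutableIdentityDemo {

  // Anti-pattern: the accumulator mutates the identity map in place and returns it.
  static Map<String, Long> reduceWithMutableIdentity(List<String> files, Map<String, Long> identity) {
    return files.stream().reduce(
        identity,
        (map, file) -> {
          map.merge(file, 1L, Long::sum);
          return map;
        },
        (left, right) -> {
          // Only invoked for parallel streams; merges right into left.
          right.forEach((key, value) -> left.merge(key, value, Long::sum));
          return left;
        });
  }

  // Equivalent sequential aggregation written as a plain loop.
  static Map<String, Long> countWithLoop(List<String> files) {
    Map<String, Long> counts = new HashMap<>();
    for (String file : files) {
      counts.merge(file, 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Long> identity = new HashMap<>();
    Map<String, Long> result = reduceWithMutableIdentity(Arrays.asList("a", "b", "a"), identity);
    // The "identity" has been mutated and is the result itself.
    System.out.println(result == identity);
    System.out.println(result.equals(countWithLoop(Arrays.asList("a", "b", "a"))));
  }
}
```

If the same stream were made parallel, every worker would share and mutate that single map, so the reduce form is only safe under the sequential assumption stated above.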
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the cleanup! This refactor replaces a misused Stream.reduce (with a mutable HashMap identity) with an equivalent for-loop, preserving the same merge semantics. No issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.
cc @yihua
| return map; | ||
| }, | ||
| CollectionUtils::combine); | ||
| HashMap<String, Long> updatedFilesToSizesMapping = new HashMap<>(writeStats.size()); |
Is it possible to use the collect API instead? For example:
Map<String, String> result = instances.stream()
    .collect(Collectors.toMap(
        Instance::deriveKey,
        Instance::deriveValue,
        (existing, replacement) -> existing // Merge function if keys collide
    ));
You mean simplifying the entire thing to use Collectors#toMap?
I don't think it's possible:
- Each HoodieWriteStat can produce 1 main-file entry plus 0..N CDC entries. A bare #toMap can't express that; we would need #flatMap first to explode each stat into a stream of entries.
- The merge semantics are asymmetric: the main path uses Math::max (file sizes increase monotonically, so we keep the largest reported size), while CDC entries use a plain put (last write wins). #toMap takes a single merge function, so the two can't be expressed together cleanly if they ever share a key. In prod, IIUC, they do not collide, and the original code preserves that asymmetry. Changing to #toMap might override this.
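For reference, a #flatMap followed by #toMap could look roughly like the sketch below. This is an assumption-laden illustration and not the proposed change: Stat is a hypothetical stand-in for HoodieWriteStat with only a file name, a size, and a CDC path-to-size map, and FlatMapToMapSketch and entriesOf are made-up names. Note that the single merge function (Long::max here) is applied to main-file and CDC entries alike, so the "last write wins" behavior for CDC entries is lost whenever keys collide.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; Stat is a simplified stand-in for HoodieWriteStat.
class FlatMapToMapSketch {

  static final class Stat {
    final String file;
    final long size;
    final Map<String, Long> cdcStats;

    Stat(String file, long size, Map<String, Long> cdcStats) {
      this.file = file;
      this.size = size;
      this.cdcStats = cdcStats;
    }
  }

  // Explodes one stat into its main-file entry followed by its CDC entries.
  static Stream<Map.Entry<String, Long>> entriesOf(Stat stat) {
    Stream<Map.Entry<String, Long>> main =
        Stream.<Map.Entry<String, Long>>of(new SimpleEntry<String, Long>(stat.file, stat.size));
    return Stream.concat(main, stat.cdcStats.entrySet().stream());
  }

  // One merge function for every key: the largest value always wins.
  static Map<String, Long> aggregate(List<Stat> stats) {
    return stats.stream()
        .flatMap(FlatMapToMapSketch::entriesOf)
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            Map.Entry::getValue,
            Long::max,
            HashMap::new));
  }
}
```

Under the assumption that main-file and CDC keys never collide, this produces the same map as the loop; if they can collide, the two approaches diverge.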
A bare #toMap can't express that, we will need #flatMap first
flatMap sounds good to me, did you ever try this?
Will explore further and do this fix when I have time. Will ping you again for a follow-up fix.
I am focusing on the unstructured track for now.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18532 +/- ##
=========================================
Coverage 68.84% 68.84%
- Complexity 28323 28336 +13
=========================================
Files 2467 2467
Lines 135839 135840 +1
Branches 16483 16481 -2
=========================================
+ Hits 93518 93520 +2
+ Misses 34922 34921 -1
Partials 7399 7399
Describe the issue this Pull Request addresses
- HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords aggregated per-partition write stats via writeStats.stream().reduce(new HashMap<>(...), accumulator, CollectionUtils::combine).
- The "identity" is a mutable HashMap that the accumulator mutates and returns, which is a misuse of Stream.reduce, whose contract assumes the identity is safe to combine with any element as a no-op.
- The only reason this works is that the stream is sequential and the method runs on the driver (the caller, HoodieMetadataWriteUtils, then wraps the returned list via context.parallelize(..., 1)).
- A plain for-loop expresses the same aggregation directly and is the idiomatic shape for mutable-accumulation sequential code.
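The shape of the for-loop aggregation can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual patch: Stat is a hypothetical stand-in for HoodieWriteStat, and ForLoopAggregationSketch and aggregate are made-up names. It keeps the two merge rules discussed in the review separate: the largest reported size wins for the main file, and CDC entries are overlaid with a plain put.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; Stat is a simplified stand-in for HoodieWriteStat.
class ForLoopAggregationSketch {

  static final class Stat {
    final String file;
    final long size;
    final Map<String, Long> cdcStats;

    Stat(String file, long size, Map<String, Long> cdcStats) {
      this.file = file;
      this.size = size;
      this.cdcStats = cdcStats;
    }
  }

  static Map<String, Long> aggregate(List<Stat> writeStats) {
    Map<String, Long> filesToSizes = new HashMap<>(writeStats.size());
    for (Stat stat : writeStats) {
      // Main file: keep the largest size reported so far.
      filesToSizes.merge(stat.file, stat.size, Long::max);
      // CDC entries: plain put, so the last write wins.
      filesToSizes.putAll(stat.cdcStats);
    }
    return filesToSizes;
  }
}
```

Because the map is a local variable that only the loop touches, there is no shared "identity" object and no combiner to reason about.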
Summary and Changelog
Internal readability/idiom cleanup in the metadata-table write path. No behavior change.
- Replaced the writeStats.stream().reduce(...) call with an imperative for (HoodieWriteStat stat : writeStats) loop that builds the updatedFilesToSizesMapping HashMap directly.
- [...] Math::max on per-file size; CDC path/size entries overlaid).
- [...] partitionToWriteStats.entrySet().stream().map(...) pipeline.
Impact
None. Readability and idiom cleanup only; behavior and allocation shape are materially unchanged.
Risk Level
low
Documentation Update
none
Contributor's checklist