diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index bfc16dbf55294..3cb02c0a2dae9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -479,31 +479,29 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo String partitionStatName = entry.getKey(); List writeStats = entry.getValue(); - HashMap updatedFilesToSizesMapping = - writeStats.stream().reduce(new HashMap<>(writeStats.size()), - (map, stat) -> { - String pathWithPartition = stat.getPath(); - if (pathWithPartition == null) { - // Empty partition - log.warn("Unable to find path in write stat to update metadata table {}", stat); - return map; - } - - String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName); - - // Since write-stats are coming in no particular order, if the same - // file have previously been appended to w/in the txn, we simply pick max - // of the sizes as reported after every write, since file-sizes are - // monotonically increasing (ie file-size never goes down, unless deleted) - map.merge(fileName, stat.getFileSizeInBytes(), Math::max); - - Map cdcPathAndSizes = stat.getCdcStats(); - if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) { - cdcPathAndSizes.forEach((key, value) -> map.put(FSUtils.getFileName(key, partitionStatName), value)); - } - return map; - }, - CollectionUtils::combine); + HashMap updatedFilesToSizesMapping = new HashMap<>(writeStats.size()); + for (HoodieWriteStat stat : writeStats) { + String pathWithPartition = stat.getPath(); + if (pathWithPartition == null) { + // Empty partition + log.warn("Unable to find path in write stat to update metadata table {}", stat); + continue; + } + + String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName); + + // Since write-stats are coming in no particular order, if the same + // file have previously been appended to w/in the txn, we simply pick max + // of the sizes as reported after every write, since file-sizes are + // monotonically increasing (ie file-size never goes down, unless deleted) + updatedFilesToSizesMapping.merge(fileName, stat.getFileSizeInBytes(), Math::max); + + Map cdcPathAndSizes = stat.getCdcStats(); + if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) { + cdcPathAndSizes.forEach((key, value) -> + updatedFilesToSizesMapping.put(FSUtils.getFileName(key, partitionStatName), value)); + } + } newFileCount.add(updatedFilesToSizesMapping.size()); return HoodieMetadataPayload.createPartitionFilesRecord(partitionStatName, updatedFilesToSizesMapping,