Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -479,31 +479,29 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
String partitionStatName = entry.getKey();
List<HoodieWriteStat> writeStats = entry.getValue();

HashMap<String, Long> updatedFilesToSizesMapping =
writeStats.stream().reduce(new HashMap<>(writeStats.size()),
(map, stat) -> {
String pathWithPartition = stat.getPath();
if (pathWithPartition == null) {
// Empty partition
log.warn("Unable to find path in write stat to update metadata table {}", stat);
return map;
}

String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);

// Since write-stats are coming in no particular order, if the same
// file have previously been appended to w/in the txn, we simply pick max
// of the sizes as reported after every write, since file-sizes are
// monotonically increasing (ie file-size never goes down, unless deleted)
map.merge(fileName, stat.getFileSizeInBytes(), Math::max);

Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
cdcPathAndSizes.forEach((key, value) -> map.put(FSUtils.getFileName(key, partitionStatName), value));
}
return map;
},
CollectionUtils::combine);
HashMap<String, Long> updatedFilesToSizesMapping = new HashMap<>(writeStats.size());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible we use the collect API instead:

Map<String, String> result = instances.stream()
    .collect(Collectors.toMap(
        Instance::deriveKey, 
        Instance::deriveValue,
        (existing, replacement) -> existing // Merge function if keys collide
    ));

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean to simplify the entire thing to use Collectors#toMap?

Don't think it's possible...

  1. Each HoodieWriteStat can produce 1 main-file entry plus 0..N CDC entries. A bare #toMap can't express that, we will need #flatMap first to explode each stat into a stream of entries.
  2. There is asymmetric merge semantics, the main path uses Math::max (file sizes monotonically increase, so we keep the largest reported size), while CDC entries use plain put (last write wins). #toMap takes a single merge function, so the two can't be expressed together cleanly if they ever share a key. In prod, IIUC they do not collide, the original code preserves that asymmetry. Changing toMap might override this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bare #toMap can't express that, we will need #flatMap first

flatMap sounds good to me, did you ever try this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will explore further and do this fix when i have time. Will ping you again for a followup fix.

Am focusing on the unstructured track for now.

for (HoodieWriteStat stat : writeStats) {
String pathWithPartition = stat.getPath();
if (pathWithPartition == null) {
// Empty partition
log.warn("Unable to find path in write stat to update metadata table {}", stat);
continue;
}

String fileName = FSUtils.getFileName(pathWithPartition, partitionStatName);

// Since write-stats are coming in no particular order, if the same
// file have previously been appended to w/in the txn, we simply pick max
// of the sizes as reported after every write, since file-sizes are
// monotonically increasing (ie file-size never goes down, unless deleted)
updatedFilesToSizesMapping.merge(fileName, stat.getFileSizeInBytes(), Math::max);

Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) {
cdcPathAndSizes.forEach((key, value) ->
updatedFilesToSizesMapping.put(FSUtils.getFileName(key, partitionStatName), value));
}
}

newFileCount.add(updatedFilesToSizesMapping.size());
return HoodieMetadataPayload.createPartitionFilesRecord(partitionStatName, updatedFilesToSizesMapping,
Expand Down
Loading