Skip to content

Commit

Permalink
Merge branch 'return-map-output-metadata' into register-map-output-me…
Browse files Browse the repository at this point in the history
…tadata
  • Loading branch information
mccheah committed Jun 3, 2020
2 parents 25e98e7 + 4fd056d commit 51df151
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
Expand Up @@ -61,8 +61,13 @@ public interface ShuffleMapOutputWriter {
* <p>
* This can also close any resources and clean up temporary state if necessary.
* <p>
* The returned array should contain, for each partition from (0) to (numPartitions - 1), the
* number of bytes written by the partition writer for that partition id.
* The returned commit message is a structure with two components:
* <p>
* 1) An array of longs, which should contain, for each partition from (0) to
* (numPartitions - 1), the number of bytes written by the partition writer
* for that partition id.
* <p>
* 2) An optional metadata blob that can be used by shuffle readers.
*/
MapOutputCommitMessage commitAllPartitions() throws IOException;

Expand Down
Expand Up @@ -39,7 +39,7 @@ public final class MapOutputCommitMessage {
private final long[] partitionLengths;
private final Optional<MapOutputMetadata> mapOutputMetadata;

public MapOutputCommitMessage(
private MapOutputCommitMessage(
long[] partitionLengths, Optional<MapOutputMetadata> mapOutputMetadata) {
this.partitionLengths = partitionLengths;
this.mapOutputMetadata = mapOutputMetadata;
Expand Down
Expand Up @@ -283,8 +283,9 @@ private MapOutputCommitMessage mergeSpills(SpillInfo[] spills) throws IOExceptio
Optional<MapOutputMetadata> maybeMetadata =
maybeSingleFileWriter.get().transferMapSpillFile(
spills[0].file, spills[0].partitionLengths);
mapOutputCommitMessage = new MapOutputCommitMessage(
spills[0].partitionLengths, maybeMetadata);
mapOutputCommitMessage = maybeMetadata.map(
metadata -> MapOutputCommitMessage.of(spills[0].partitionLengths, metadata))
.orElse(MapOutputCommitMessage.of(spills[0].partitionLengths));
} else {
mapOutputCommitMessage = mergeSpillsUsingStandardWriter(spills);
}
Expand Down

0 comments on commit 51df151

Please sign in to comment.