[SPARK-32658][CORE] Fix PartitionWriterStream partition length overflow

### What changes were proposed in this pull request?

The `count` in `PartitionWriterStream` should be a long value instead of an int. The issue was introduced by apache/spark@abef84a. When the overflow happens, the shuffle index file records a wrong offset for a reduceId, thus leading to a `FetchFailedException: Stream is corrupted` error.
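For illustration, here is a minimal standalone sketch (a hypothetical demo class, not the Spark code itself) of the failure mode: an `int` byte counter silently wraps negative once a single partition exceeds `Integer.MAX_VALUE` (about 2 GiB) bytes, while a `long` counter stays correct.

```java
// Hypothetical demo, not Spark code: accumulate 3 GiB of simulated writes
// into an int counter and a long counter.
public class CounterOverflowDemo {
  public static void main(String[] args) {
    final long bytesInPartition = 3L * 1024 * 1024 * 1024; // 3 GiB in one partition
    final int chunk = 1024 * 1024;                         // simulate 1 MiB writes

    int intCount = 0;
    long longCount = 0L;
    for (long written = 0; written < bytesInPartition; written += chunk) {
      intCount += chunk;  // silently wraps past 2^31 - 1
      longCount += chunk; // correct
    }

    System.out.println("int count:  " + intCount);  // prints -1073741824
    System.out.println("long count: " + longCount); // prints 3221225472
  }
}
```

The wrapped (negative) count is then recorded as a partition length, which is what ultimately corrupts the shuffle index file.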

Besides the fix, I also added some debug logs, so that similar issues will be easier to debug in the future.

### Why are the changes needed?

This is a regression and bug fix.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A Spark user reported this issue when migrating their workload to 3.0. One of their jobs fails deterministically on Spark 3.0 without the patch, and the job succeeds after the fix is applied.

Closes #29474 from jiangxb1987/fixPartitionWriteStream.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit f793977)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jiangxb1987 authored and cloud-fan committed Aug 20, 2020
1 parent c4a12f2 commit b87ec5d
Showing 3 changed files with 20 additions and 2 deletions.
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -18,6 +18,7 @@
package org.apache.spark.shuffle.sort;

import java.nio.channels.Channels;
+import java.util.Arrays;
import java.util.Optional;
import javax.annotation.Nullable;
import java.io.*;
@@ -274,6 +275,8 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
// Here, we don't need to perform any metrics updates because the bytes written to this
// output file would have already been counted as shuffle bytes written.
partitionLengths = spills[0].partitionLengths;
logger.debug("Merge shuffle spills for mapId {} with length {}", mapId,
partitionLengths.length);
maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
} else {
partitionLengths = mergeSpillsUsingStandardWriter(spills);
@@ -360,6 +363,7 @@ private void mergeSpillsWithFileStream(
SpillInfo[] spills,
ShuffleMapOutputWriter mapWriter,
@Nullable CompressionCodec compressionCodec) throws IOException {
logger.debug("Merge shuffle spills with FileStream for mapId {}", mapId);
final int numPartitions = partitioner.numPartitions();
final InputStream[] spillInputStreams = new InputStream[spills.length];

@@ -369,6 +373,11 @@ private void mergeSpillsWithFileStream(
spillInputStreams[i] = new NioBufferedFileInputStream(
spills[i].file,
inputBufferSizeInBytes);
+// Only convert the partitionLengths when debug level is enabled.
+if (logger.isDebugEnabled()) {
+logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+Arrays.toString(spills[i].partitionLengths));
+}
}
for (int partition = 0; partition < numPartitions; partition++) {
boolean copyThrewException = true;
@@ -431,6 +440,7 @@ private void mergeSpillsWithFileStream(
private void mergeSpillsWithTransferTo(
SpillInfo[] spills,
ShuffleMapOutputWriter mapWriter) throws IOException {
logger.debug("Merge shuffle spills with TransferTo for mapId {}", mapId);
final int numPartitions = partitioner.numPartitions();
final FileChannel[] spillInputChannels = new FileChannel[spills.length];
final long[] spillInputChannelPositions = new long[spills.length];
@@ -439,6 +449,11 @@ private void mergeSpillsWithTransferTo(
try {
for (int i = 0; i < spills.length; i++) {
spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+// Only convert the partitionLengths when debug level is enabled.
+if (logger.isDebugEnabled()) {
+logger.debug("Partition lengths for mapId {} in Spill {}: {}", mapId, i,
+Arrays.toString(spills[i].partitionLengths));
+}
}
for (int partition = 0; partition < numPartitions; partition++) {
boolean copyThrewException = true;
core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -112,6 +112,8 @@ public long[] commitAllPartitions() throws IOException {
}
cleanUp();
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
log.debug("Writing shuffle index file for mapId {} with length {}", mapId,
partitionLengths.length);
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
return partitionLengths;
}
@@ -210,14 +212,14 @@ public long getNumBytesWritten() {

private class PartitionWriterStream extends OutputStream {
private final int partitionId;
-private int count = 0;
+private long count = 0;
private boolean isClosed = false;

PartitionWriterStream(int partitionId) {
this.partitionId = partitionId;
}

-public int getCount() {
+public long getCount() {
return count;
}

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -209,6 +209,7 @@ private[spark] class IndexShuffleBlockResolver(
}
}
} finally {
logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
if (indexTmp.exists() && !indexTmp.delete()) {
logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
}
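For context on the debug logs above: `writeIndexFileAndCommit` turns the per-partition lengths into cumulative byte offsets, and reducer `i` later fetches the range `[offsets(i), offsets(i + 1))` from the map output file, so a single overflowed length shifts every offset after it. A simplified sketch of that accumulation (a hypothetical helper; the real method also deals with temp files, commits, and concurrent attempts):

```java
import java.util.Arrays;

// Hypothetical sketch of how per-partition lengths become cumulative
// offsets in the shuffle index file.
public class IndexOffsetsDemo {
  static long[] toOffsets(long[] lengths) {
    long[] offsets = new long[lengths.length + 1];
    long offset = 0L;
    offsets[0] = offset;
    for (int i = 0; i < lengths.length; i++) {
      offset += lengths[i];    // one wrapped (negative) length here...
      offsets[i + 1] = offset; // ...shifts every following offset
    }
    return offsets;
  }

  public static void main(String[] args) {
    long trueLength = 3L * 1024 * 1024 * 1024;  // actual partition size: 3 GiB
    int wrapped = (int) trueLength;             // what the int counter reported
    System.out.println(Arrays.toString(toOffsets(new long[]{100, trueLength, 200})));
    System.out.println(Arrays.toString(toOffsets(new long[]{100, wrapped, 200})));
  }
}
```

With the wrapped length, every subsequent offset is negative, so the reducer's fetch reads misaligned bytes and fails with `FetchFailedException: Stream is corrupted`.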
