[SPARK-47518][CORE] Skip transfer the last spilled shuffle data #45661

Status: Open · wants to merge 9 commits into base: master · Changes from 3 commits
ShuffleExternalSorter.java

@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.zip.Checksum;

 import org.apache.spark.SparkException;
@@ -153,8 +154,10 @@ public long[] getChecksums() {
    * @param isFinalFile if true, this indicates that we're writing the final output file and that
    *                    the bytes written should be counted towards shuffle write metrics rather
    *                    than shuffle spill metrics.
+   * @param finalDataFileDir if present, the directory to write the final output file to. If not
+   *                         present, the file will be written to a temporary directory.
    */
-  private void writeSortedFile(boolean isFinalFile) {
+  private void writeSortedFile(boolean isFinalFile, Optional<File> finalDataFileDir) {
     // Only emit the log if this is an actual spilling.
     if (!isFinalFile) {
       logger.info(
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();
Contributor: One idea: if isFinalFile is true, we could call a special version of createTempShuffleBlock that takes the shuffle and map ids and returns a file path under the same directory as the final shuffle file. Then we wouldn't need to change other places?

Contributor Author: Do you mean changing the parameter of createTempShuffleBlockInDir from finalDataFileDir to Tuple2<ShuffleId, MapId>?

Contributor: That would assume the final output has to go to blockResolver.getDataFile(shuffleId, mapId), right @cloud-fan? Currently, at this layer, we do not make that assumption.

I was initially toying with the idea of passing mapId and shuffleId as constructor params and doing something similar, when I realized this would introduce assumptions that the code currently does not make, which is why the base directory is being passed around instead.

(And then of course I thought we could solve it in LocalDiskSingleSpillMapOutputWriter here, but was completely wrong :-( ).

+      finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);
Contributor: Suggested change:

-      finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
-        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);
+      finalDataFileDir.filter(v -> spills.isEmpty()).map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);

We need this only when there is a single output file; otherwise we want to spread the spills across directories.

Contributor Author: Committed this change, thanks.
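For readers following the thread, here is a minimal, self-contained sketch of how the accepted Optional chain resolves. The wrapper method is illustrative only, not the PR's code: noPriorSpills stands in for spills.isEmpty(), dbm for blockManager.diskBlockManager(), and createTempShuffleBlockInDir is the method this PR adds.

// Illustrative sketch, not the PR's exact code: how the suggested Optional
// chain picks the spill location.
import java.io.File;
import java.util.Optional;

import scala.Tuple2;

import org.apache.spark.storage.DiskBlockManager;
import org.apache.spark.storage.TempShuffleBlockId;

final class SpillFileSelection {
  static Tuple2<TempShuffleBlockId, File> pickSpillFile(
      Optional<File> finalDataFileDir,
      boolean noPriorSpills,
      DiskBlockManager dbm) {
    // Write next to the final data file only when this write will be the sole
    // output file; otherwise fall back to an ordinary temp shuffle block so
    // that multiple spills stay spread across the configured local dirs.
    return finalDataFileDir
        .filter(dir -> noPriorSpills)
        .map(dbm::createTempShuffleBlockInDir)
        .orElseGet(dbm::createTempShuffleBlock);
  }
}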

     final File file = spilledFileInfo._2();
     final TempShuffleBlockId blockId = spilledFileInfo._1();
     final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
@@ -292,7 +296,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
       return 0L;
     }

-    writeSortedFile(false);
+    writeSortedFile(false, Optional.empty());
     final long spillSize = freeMemory();
     inMemSorter.reset();
     // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
@@ -440,14 +444,16 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
   /**
    * Close the sorter, causing any buffered data to be sorted and written out to disk.
    *
+   * @param finalDataFileDir if present, the directory to write the final output file to. If not
+   *                         present, the file will be written to a temporary directory.
    * @return metadata for the spill files written by this sorter. If no records were ever inserted
    *         into this sorter, then this will return an empty array.
    */
-  public SpillInfo[] closeAndGetSpills() throws IOException {
+  public SpillInfo[] closeAndGetSpills(Optional<File> finalDataFileDir) throws IOException {
     if (inMemSorter != null) {
       // Here we are spilling the remaining data in the buffer. If there is no spill before, this
       // final spill file will be the final shuffle output file.
-      writeSortedFile(/* isFinalFile = */spills.isEmpty());
+      writeSortedFile(/* isFinalFile = */ spills.isEmpty(), finalDataFileDir);
       freeMemory();
       inMemSorter.free();
       inMemSorter = null;
UnsafeShuffleWriter.java

@@ -52,12 +52,14 @@
 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
 import org.apache.spark.shuffle.api.ShufflePartitionWriter;
 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
 import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
+import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents;
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
@@ -219,7 +221,15 @@ void closeAndWriteOutput() throws IOException {
     updatePeakMemoryUsed();
     serBuffer = null;
     serOutputStream = null;
-    final SpillInfo[] spills = sorter.closeAndGetSpills();
+    Optional<File> finalDataFileDir;
+    if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {
Member: Hmm, it looks a bit hacky to handle local disk shuffle specially here.

Contributor Author: Sorry, I'm not familiar with the block storage in KubernetesLocalDiskShuffleExecutorComponents, so I only handled LocalDiskShuffleExecutorComponents here.

Or should I add a new method getDataFile() in the trait ShuffleExecutorComponents?
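As a rough sketch of the alternative the author floats here: such a hook might look like the following. This is purely hypothetical; no such method exists on Spark's ShuffleExecutorComponents, and the interface name, method name, and signature are invented for illustration.

// Hypothetical only: one possible shape for exposing the final data-file
// location through the components API instead of special-casing
// LocalDiskShuffleExecutorComponents in the writer. Not real Spark API.
import java.io.File;
import java.util.Optional;

public interface ShuffleExecutorComponentsDataFileHook {
  /**
   * Directory that will hold the final data file for the given shuffle map
   * task, if this implementation writes shuffle output to local disk; empty
   * otherwise (e.g. for remote-storage implementations).
   */
  default Optional<File> finalDataFileDir(int shuffleId, long mapId) {
    return Optional.empty();
  }
}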

+      File dataFile =
+        new IndexShuffleBlockResolver(sparkConf, blockManager).getDataFile(shuffleId, mapId);
Member: Is this only used to invoke getParentFile?

Contributor Author: Yes.

+      finalDataFileDir = Optional.of(dataFile.getParentFile());
+    } else {
+      finalDataFileDir = Optional.empty();
+    }
+    final SpillInfo[] spills = sorter.closeAndGetSpills(finalDataFileDir);
     try {
       partitionLengths = mergeSpills(spills);
     } finally {
DiskBlockManager.scala

@@ -226,6 +226,24 @@ private[spark] class DiskBlockManager(
     (blockId, getFile(blockId))
   }

+  /** Produces a unique block id and File suitable for storing shuffled intermediate results
+   * in the input directory. */
Member: Nit:

  /**
   * Produces a unique block id and File suitable for storing shuffled intermediate results
   * in the input directory.
   */

Contributor Author: Fixed.

+  def createTempShuffleBlockInDir(fileDir: File): (TempShuffleBlockId, File) = {
+    var blockId = TempShuffleBlockId(UUID.randomUUID())
+    var tmpFile = new File(fileDir, blockId.name)
+    while (tmpFile.exists()) {
+      blockId = TempShuffleBlockId(UUID.randomUUID())
+      tmpFile = new File(fileDir, blockId.name)
Member: If fileDir is invalid, we are in an infinite loop, aren't we? It seems that we need a safeguard to avoid the infinite loop, @wankunde. Also, please add a unit test case for an invalid fileDir (maybe one that does not exist).

Contributor Author: Added a UT. If fileDir is invalid, tmpFile will never exist, so we exit this loop immediately. A new block id is generated only when fileDir is valid and the file for the first TempShuffleBlockId was already created by some other task.
+    }
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    (blockId, tmpFile)
+  }

   /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
   def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
     var blockId = TempShuffleBlockId(UUID.randomUUID())
UnsafeShuffleWriterSuite.java

@@ -170,6 +170,21 @@ public void setUp() throws Exception {
       spillFilesCreated.add(file);
       return Tuple2$.MODULE$.apply(blockId, file);
     });
+    when(diskBlockManager.createTempShuffleBlockInDir(any(File.class))).thenAnswer(invocationOnMock -> {
+      File fileDir = (File) invocationOnMock.getArguments()[0];
+      TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
+      File file = spy(new File(fileDir, blockId.name()));
+      when(file.delete()).thenAnswer(inv -> {
+        totalSpilledDiskBytes += file.length();
+        return inv.callRealMethod();
+      });
+      spillFilesCreated.add(file);
+      return new Tuple2<>(blockId, file);
+    });
+    when(diskBlockManager.getFile(any(BlockId.class))).thenAnswer(invocationOnMock -> {
+      BlockId blockId = (BlockId) invocationOnMock.getArguments()[0];
+      return new File(tempDir, blockId.name());
+    });
     when(diskBlockManager.createTempFileWith(any(File.class))).thenAnswer(invocationOnMock -> {
       File file = (File) invocationOnMock.getArguments()[0];
       return Utils.tempFileWith(file);
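A note on the stub above: the new createTempShuffleBlockInDir answer appears to mirror the existing createTempShuffleBlock one. The returned File is a Mockito spy whose delete() first adds the file's length to totalSpilledDiskBytes and then deletes for real, so the suite can still account spilled bytes for files created next to the final output file.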