[SPARK-47518][CORE] Skip transferring the last spilled shuffle data #45661

Open · wants to merge 9 commits into base: master · changes shown from 1 commit
First changed file:
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Optional;
import java.util.zip.Checksum;

import org.apache.spark.SparkException;
@@ -153,8 +154,10 @@ public long[] getChecksums() {
* @param isFinalFile if true, this indicates that we're writing the final output file and that
* the bytes written should be counted towards shuffle write metrics rather
* than shuffle spill metrics.
* @param finalDataFileDir if present, the directory to write the final output file to. If not
* present, the file will be written to a temporary directory.
*/
private void writeSortedFile(boolean isFinalFile) {
private void writeSortedFile(boolean isFinalFile, Optional<File> finalDataFileDir) {
// Only emit the log if this is an actual spilling.
if (!isFinalFile) {
logger.info(
@@ -199,8 +202,9 @@ private void writeSortedFile(boolean isFinalFile) {
// createTempShuffleBlock here; see SPARK-3426 for more details.
final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
blockManager.diskBlockManager().createTempShuffleBlock();
Contributor:
One idea: if isFinalFile is true, we could call a special version of createTempShuffleBlock that takes the shuffle and map IDs and returns a file path under the same directory as the final shuffle file. Then we wouldn't need to change other places?
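A minimal sketch of this suggestion, assuming the sorter were handed the shuffle and map IDs (today it is not) and that DiskBlockManager grew such a variant; the method name below is hypothetical and does not exist in Spark:

// Hypothetical sketch only: createTempShuffleBlockForFinalOutput stands in for a
// DiskBlockManager variant that places the temp block in the directory of the eventual
// shuffle output for (shuffleId, mapId). The else-branch is the existing behavior.
final Tuple2<TempShuffleBlockId, File> spilledFileInfo = isFinalFile
    ? blockManager.diskBlockManager().createTempShuffleBlockForFinalOutput(shuffleId, mapId)
    : blockManager.diskBlockManager().createTempShuffleBlock();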

Contributor (author):
Do you mean changing the parameter of createTempShuffleBlockInDir from finalDataFileDir to Tuple2<ShuffleId, MapId>?

Contributor:
That would assume the final output has to go to blockResolver.getDataFile(shuffleId, mapId), right @cloud-fan? Currently, at this layer, we do not make that assumption.

I was initially toying with the idea of passing mapId and shuffleId as constructor params and doing something similar, when I realized this would make assumptions that the code currently does not make, which is why the base directory is being passed around instead.

(And then, of course, I thought we could solve it in LocalDiskSingleSpillMapOutputWriter, but I was completely wrong :-( ).

final File file = spilledFileInfo._2();
final TempShuffleBlockId blockId = spilledFileInfo._1();
final File file =
finalDataFileDir.map(d -> new File(d, blockId.name())).orElseGet(spilledFileInfo::_2);
final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

// Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
@@ -292,7 +296,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
return 0L;
}

writeSortedFile(false);
writeSortedFile(false, Optional.empty());
final long spillSize = freeMemory();
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
@@ -440,14 +444,16 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
/**
* Close the sorter, causing any buffered data to be sorted and written out to disk.
*
* @param finalDataFileDir if present, the directory to write the final output file to. If not
* present, the file will be written to a temporary directory.
* @return metadata for the spill files written by this sorter. If no records were ever inserted
* into this sorter, then this will return an empty array.
*/
public SpillInfo[] closeAndGetSpills() throws IOException {
public SpillInfo[] closeAndGetSpills(Optional<File> finalDataFileDir) throws IOException {
if (inMemSorter != null) {
// Here we are spilling the remaining data in the buffer. If there is no spill before, this
// final spill file will be the final shuffle output file.
writeSortedFile(/* isFinalFile = */spills.isEmpty());
writeSortedFile(/* isFinalFile = */spills.isEmpty(), finalDataFileDir);
freeMemory();
inMemSorter.free();
inMemSorter = null;
Second changed file:
@@ -52,12 +52,14 @@
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
@@ -219,7 +221,15 @@ void closeAndWriteOutput() throws IOException {
updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
final SpillInfo[] spills = sorter.closeAndGetSpills();
Optional<File> finalDataFileDir;
if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {
Member:
Hmm, it looks a bit hacky to handle local disk shuffle specially here.

Contributor (author):
Sorry, I'm not familiar with the block storage in KubernetesLocalDiskShuffleExecutorComponents, so I only handled LocalDiskShuffleExecutorComponents here.

Or should I add a new method getDataFile() to the ShuffleExecutorComponents trait?
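For illustration, a rough sketch of that alternative; the method name, signature, and default body below are hypothetical and not part of Spark's ShuffleExecutorComponents API:

// Hypothetical default method on ShuffleExecutorComponents (does not exist in Spark):
// a local-disk implementation would return the directory of the final data file for
// (shuffleId, mapId); other implementations keep the default Optional.empty(), so the
// sorter falls back to writing the last spill into a temporary location.
default Optional<File> finalDataFileDirectory(int shuffleId, long mapId) {
  return Optional.empty();
}

With something like this, UnsafeShuffleWriter could drop the instanceof check and simply ask shuffleExecutorComponents for the directory.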

File dataFile =
new IndexShuffleBlockResolver(sparkConf, blockManager).getDataFile(shuffleId, mapId);
Member:
Is this only used to invoke getParentFile?

Contributor (author):
Yes

finalDataFileDir = Optional.of(dataFile.getParentFile());
} else {
finalDataFileDir = Optional.empty();
}
final SpillInfo[] spills = sorter.closeAndGetSpills(finalDataFileDir);
try {
partitionLengths = mergeSpills(spills);
} finally {
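For context, the benefit of steering the last spill into the final data file's directory appears to be that committing a single-spill output becomes a same-directory rename rather than a cross-directory copy (the transfer the PR title wants to skip). A minimal sketch of that effect, using an illustrative helper that is not part of the PR:

// Illustrative only: when spillFile already lives in finalDataFile's directory, Files.move
// is a cheap rename on the same filesystem; when spillFile sits in a different local dir,
// the same call has to fall back to copying the bytes.
static void commitSingleSpill(java.io.File spillFile, java.io.File finalDataFile)
    throws java.io.IOException {
  java.nio.file.Files.move(spillFile.toPath(), finalDataFile.toPath());
}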