[SPARK-47518][CORE] Skip transfer the last spilled shuffle data #45661
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

```diff
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.zip.Checksum;

 import org.apache.spark.SparkException;
```
```diff
@@ -153,8 +154,10 @@ public long[] getChecksums() {
    * @param isFinalFile if true, this indicates that we're writing the final output file and that
    *                    the bytes written should be counted towards shuffle write metrics rather
    *                    than shuffle spill metrics.
+   * @param finalDataFileDir if present, the directory to write the final output file to. If not
+   *                         present, the file will be written to a temporary directory.
    */
-  private void writeSortedFile(boolean isFinalFile) {
+  private void writeSortedFile(boolean isFinalFile, Optional<File> finalDataFileDir) {
     // Only emit the log if this is an actual spilling.
     if (!isFinalFile) {
       logger.info(
```
```diff
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();
+      finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);
     final File file = spilledFileInfo._2();
     final TempShuffleBlockId blockId = spilledFileInfo._1();
     final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
```

**Review comment** (posted as a suggested change): We need this only when there is a single output file, else we want to spread it.

**Reply:** Committed this change, thanks.
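The `Optional` chain above is the heart of the change: when a final output directory is known, the temp block is created inside it; otherwise the old scattered temp block is used. Below is a minimal standalone sketch of the same dispatch pattern, using hypothetical stand-in factories rather than Spark's `DiskBlockManager` API:

```java
import java.io.File;
import java.util.Optional;
import java.util.UUID;

public class DispatchSketch {
  // Hypothetical stand-in for createTempShuffleBlockInDir: place the file
  // inside the requested directory.
  static File tempBlockInDir(File dir) {
    return new File(dir, "temp_shuffle_" + UUID.randomUUID());
  }

  // Hypothetical stand-in for createTempShuffleBlock: fall back to a default
  // temp location (real Spark spreads these across its local dirs).
  static File tempBlock() {
    return new File(System.getProperty("java.io.tmpdir"),
      "temp_shuffle_" + UUID.randomUUID());
  }

  public static void main(String[] args) {
    // Present => the last spill lands next to the final output file;
    // empty => regular spills keep the old behaviour.
    Optional<File> finalDataFileDir = Optional.of(new File("/tmp/final-out"));
    File spillFile = finalDataFileDir
      .map(DispatchSketch::tempBlockInDir)    // directory known: co-locate
      .orElseGet(DispatchSketch::tempBlock);  // directory absent: default
    System.out.println(spillFile);
  }
}
```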
```diff
@@ -292,7 +296,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
       return 0L;
     }

-    writeSortedFile(false);
+    writeSortedFile(false, Optional.empty());
     final long spillSize = freeMemory();
     inMemSorter.reset();
     // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
```
```diff
@@ -440,14 +444,16 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
   /**
    * Close the sorter, causing any buffered data to be sorted and written out to disk.
    *
+   * @param finalDataFileDir if present, the directory to write the final output file to. If not
+   *                         present, the file will be written to a temporary directory.
    * @return metadata for the spill files written by this sorter. If no records were ever inserted
    *         into this sorter, then this will return an empty array.
    */
-  public SpillInfo[] closeAndGetSpills() throws IOException {
+  public SpillInfo[] closeAndGetSpills(Optional<File> finalDataFileDir) throws IOException {
     if (inMemSorter != null) {
       // Here we are spilling the remaining data in the buffer. If there is no spill before, this
       // final spill file will be the final shuffle output file.
-      writeSortedFile(/* isFinalFile = */spills.isEmpty());
+      writeSortedFile(/* isFinalFile = */spills.isEmpty(), finalDataFileDir);
       freeMemory();
       inMemSorter.free();
       inMemSorter = null;
```
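For orientation, here is a hedged sketch of how a caller might thread the directory through the new parameter. `ShuffleExternalSorter`, `SpillInfo`, and `closeAndGetSpills` come from the diff above, but this caller shape is an assumption for illustration, not the PR's actual writer change:

```java
// Hypothetical caller fragment: derive the final data file's parent directory
// so that a sole remaining spill is written in place; passing Optional.empty()
// instead would preserve the old temp-directory behaviour.
private SpillInfo[] closeSorter(ShuffleExternalSorter sorter, File finalDataFile)
    throws IOException {
  Optional<File> finalDataFileDir =
    Optional.ofNullable(finalDataFile).map(File::getParentFile);
  return sorter.closeAndGetSpills(finalDataFileDir);
}
```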
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
```diff
@@ -226,6 +226,25 @@ private[spark] class DiskBlockManager(
     (blockId, getFile(blockId))
   }

+  /** Produces a unique block id and File suitable for storing shuffled intermediate results
+   * in the input directory.
+   */
+  def createTempShuffleBlockInDir(fileDir: File): (TempShuffleBlockId, File) = {
+    var blockId = TempShuffleBlockId(UUID.randomUUID())
+    var tmpFile = new File(fileDir, blockId.name)
+    while (tmpFile.exists()) {
+      blockId = TempShuffleBlockId(UUID.randomUUID())
+      tmpFile = new File(fileDir, blockId.name)
+    }
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    (blockId, tmpFile)
+  }
+
   /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
   def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
     var blockId = TempShuffleBlockId(UUID.randomUUID())
```

**Review comment:** If …

**Reply:** Added UT, if …
**Review comment (@cloud-fan):** one idea: if `isFinalFile` is true, then we call a special version of `createTempShuffleBlock` that takes the shuffle & map id and returns a file path under the same directory as the final shuffle file. Then we don't need to change other places?

**Reply:** Do you mean changing the parameter of `createTempShuffleBlockInDir` from `finalDataFileDir` to a `Tuple2<ShuffleId, MapId>`?

**Reply:** That would assume the final output has to go to `blockResolver.getDataFile(shuffleId, mapId)`, right @cloud-fan? Currently at this layer we do not make that assumption. I was initially toying with the idea of passing `mapId` and `shuffleId` as constructor params and doing something similar, until I realized this would make assumptions that the code currently does not make, which is why the base directory is being passed around instead. (And then of course I thought we could solve it in `LocalDiskSingleSpillMapOutputWriter`, but I was completely wrong :-( )
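To make the trade-off concrete, here is a hedged sketch of the alternative floated in this thread (not adopted in the PR). It would hard-wire the assumption that the final output resolves via `blockResolver.getDataFile(shuffleId, mapId)`, which is exactly what the reply objects to; `blockResolver` and the method below are illustrative assumptions:

```java
// Hypothetical variant keyed by (shuffleId, mapId) instead of an explicit
// directory; this is sketch code, not part of the PR.
Tuple2<TempShuffleBlockId, File> createTempShuffleBlockFor(int shuffleId, long mapId) {
  // Bakes in the layout assumption that the current layer deliberately avoids.
  File finalDir = blockResolver.getDataFile(shuffleId, mapId).getParentFile();
  return diskBlockManager.createTempShuffleBlockInDir(finalDir);
}
```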