
[SPARK-47518][CORE] Skip transfer the last spilled shuffle data #45661

Open · wants to merge 9 commits into base: master

Conversation

@wankunde (Contributor) commented Mar 22, 2024

What changes were proposed in this pull request?

If there is only one spill data file, Spark will transfer that spill file directly to the final data file.

@Override
  public void transferMapSpillFile(
      File mapSpillFile,
      long[] partitionLengths,
      long[] checksums) throws IOException {
    // The map spill file already has the proper format, and it contains all of the partition data.
    // So just transfer it directly to the destination without any merging.
    File outputFile = blockResolver.getDataFile(shuffleId, mapId);
    File tempFile = Utils.tempFileWith(outputFile);
    Files.move(mapSpillFile.toPath(), tempFile.toPath());
    blockResolver
      .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
  }

But if that spill file and the final data file are on different disks, there will still be a heavy data transfer.

sun.nio.fs.UnixCopyFile.transfer(Native Method)
sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:251)
sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:471)
sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
java.nio.file.Files.move(Files.java:1395)
org.apache.spark.shuffle.sort.io.LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile(LocalDiskSingleSpillMapOutputWriter.java:52)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:280)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180)
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
org.apache.spark.scheduler.Task.run(Task.scala:131)
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
org.apache.spark.executor.Executor$TaskRunner$$Lambda$453/980524593.apply(Unknown Source)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

We can determine the disk where the final shuffle data file will be located, and spill the final temp shuffle block to that disk to avoid the unnecessary data copy.
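Concretely, the change resolves the directory of the final shuffle data file in UnsafeShuffleWriter and creates the last spill's temp block inside that directory, so the later Files.move() becomes a same-disk rename. Condensed from the diff discussed below (createTempShuffleBlockInDir is the helper this PR adds to DiskBlockManager), the new flow is roughly:

// Condensed sketch of the new flow in UnsafeShuffleWriter (not the verbatim
// patch): resolve where the final data file will live, then create the last
// spill's temp block in that same directory.
File dataFile =
    new IndexShuffleBlockResolver(sparkConf, blockManager).getDataFile(shuffleId, mapId);
Optional<File> finalDataFileDir = Optional.of(dataFile.getParentFile());
// createTempShuffleBlockInDir is added by this PR; the fallback keeps the
// old hash-based placement when no target directory is known.
final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);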

Why are the changes needed?

Optimize Spark shuffle performance.

Test: mv a 2.6 GB file between HDD disks on a node with high iowait:

$ time mv /hadoop/2/yarn/local/yarn-yarn-nodemanager.log.10 /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.10

real	2m19.694s
user	0m0.029s
sys	0m6.120s
$ time mv /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.10 /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.11

real	0m0.003s
user	0m0.001s
sys	0m0.001s

Moving a file to another disk with high iowait is very slow, while moving a file within the same disk is quite fast.
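The asymmetry follows from rename semantics: within one file store a move is a metadata-only operation, while across file stores java.nio falls back to a byte-for-byte copy plus delete. A minimal standalone illustration (hypothetical paths):

import java.nio.file.*;

public class MoveDemo {
  public static void main(String[] args) throws Exception {
    Path src = Paths.get("/hadoop/2/yarn/local/spill.data");   // hypothetical path on disk 2
    Path dst = Paths.get("/hadoop/3/yarn/local/shuffle.data"); // hypothetical path on disk 3
    try {
      // ATOMIC_MOVE only succeeds as a pure rename and cannot cross file stores.
      Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // Without ATOMIC_MOVE, Files.move() silently degrades to copy + delete
      // across file stores: the slow path in the stack trace above.
      Files.move(src, dst);
    }
  }
}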

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UTs.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions bot added the CORE label Mar 22, 2024
@dongjoon-hyun (Member) left a comment

According to the PR description, are you suggesting the Linux file move is slow, @wankunde? Could you elaborate your experience with numbers?

there will still be a heavy data transfer.

java.nio.file.Files.move(Files.java:1395)

@wankunde changed the title from [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data to [SPARK-47518][CORE] Skip transfer the last spilled shuffle data on Mar 22, 2024
@wankunde changed the title from [SPARK-47518][CORE] Skip transfer the last spilled shuffle data to [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data on Mar 22, 2024
@wankunde (Contributor, Author) commented Mar 22, 2024

Test: mv a 2.6 GB file on a node with high iowait:

$ time mv /hadoop/2/yarn/local/yarn-yarn-nodemanager.log.10 /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.10

real	2m19.694s
user	0m0.029s
sys	0m6.120s
$ time mv /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.10 /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.11

real	0m0.003s
user	0m0.001s
sys	0m0.001s

Moving a file to another disk with high iowait is very slow, while moving a file within the same disk is quite fast.

@dongjoon-hyun (Member):

Thanks. Could you put that into the PR description, @wankunde?

@dongjoon-hyun (Member) left a comment

    Optional<File> finalDataFileDir;
    if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {
      File dataFile =
        new IndexShuffleBlockResolver(sparkConf, blockManager).getDataFile(shuffleId, mapId);
Member:

Is this only used to invoke getParentFile?

Contributor Author:

Yes

@@ -226,6 +226,24 @@ private[spark] class DiskBlockManager(
    (blockId, getFile(blockId))
  }

/** Produces a unique block id and File suitable for storing shuffled intermediate results
* in the input directory. */
Member:

nit.

  /**
   * Produces a unique block id and File suitable for storing shuffled intermediate results
   * in the input directory.
   */

Contributor Author:

Fixed

    var tmpFile = new File(fileDir, blockId.name)
    while (tmpFile.exists()) {
      blockId = TempShuffleBlockId(UUID.randomUUID())
      tmpFile = new File(fileDir, blockId.name)
Member:

If fileDir is invalid, we are in an infinite loop, aren't we? It seems that we need a safeguard to avoid the infinite loop, @wankunde. Also, please add a unit test case for an invalid fileDir (maybe a nonexistent one).

Contributor Author:

Added a UT. If fileDir is invalid, tmpFile will not exist, so we exit this loop.
A new block id is generated only when fileDir is valid and the first TempShuffleBlockId file has already been created by some other task.

@@ -219,7 +221,15 @@ void closeAndWriteOutput() throws IOException {
    updatePeakMemoryUsed();
    serBuffer = null;
    serOutputStream = null;
    final SpillInfo[] spills = sorter.closeAndGetSpills();
    Optional<File> finalDataFileDir;
    if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {
Member:

Hmm, it looks a bit hacky to handle local disk shuffle specially here.

Contributor Author:

Sorry, I'm not familiar with the block storage in KubernetesLocalDiskShuffleExecutorComponents, so I only handle LocalDiskShuffleExecutorComponents here.

Or should I add a new method getDataFile() to the ShuffleExecutorComponents trait?

@wankunde (Contributor, Author):

UT org.apache.spark.util.collection.SorterSuite failed:

2024-03-23T03:43:57.7101735Z [info] - SPARK-5984 TimSort bug (32 seconds, 966 milliseconds)
2024-03-23T03:43:58.5733341Z [info] org.apache.spark.util.collection.SorterSuite *** ABORTED *** (33 seconds, 954 milliseconds)
2024-03-23T03:43:58.5736160Z [info]   java.lang.OutOfMemoryError: Java heap space
2024-03-23T03:43:58.5739209Z [info]   at org.apache.spark.util.collection.SorterSuite.$anonfun$new$11(SorterSuite.scala:145)
2024-03-23T03:43:58.5747591Z [info]   at org.apache.spark.util.collection.SorterSuite$$Lambda$17217/0x00007f618e8d2610.apply$mcV$sp(Unknown Source)
2024-03-23T03:43:58.5751054Z [info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)

@wankunde (Contributor, Author):

Retest this please

@dongjoon-hyun (Member) left a comment

To @wankunde , to be clear, the below two comments are critical to this PR. You had better answer to resolve them.

@wankunde (Contributor, Author):

To @wankunde , to be clear, the below two comments are critical to this PR. You had better answer to resolve them.

My replies were not submitted; sorry about that.

@wankunde changed the title from [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data to [SPARK-47518][CORE] Skip transfer the last spilled shuffle data on Mar 25, 2024
@cloud-fan (Contributor):

I'm a bit worried about this change, as it changes the assumption of always having two phases during shuffle: the first phase only writes to temp files, and the second phase "commits" them to the final destination in a short time. The shuffle and task scheduling process is quite convoluted in Spark, and I can't be 100% sure that this is a safe change.

@wankunde (Contributor, Author) commented Mar 25, 2024

I'm a bit worried about this change, as it changes the assumption of always having two phases during shuffle: the first phase only writes to temp files, and the second phase "commits" them to the final destination in a short time. The shuffle and task scheduling process is quite convoluted in Spark, and I can't be 100% sure that this is a safe change.

Hi @cloud-fan, the shuffle in this PR still needs two phases:

  • Stage 1 writes to a TempShuffleBlockId file in the final shuffle data file directory, and makes sure there is no conflict.
  • Stage 2 is the "commit" phase: it just renames the TempShuffleBlockId file to the final shuffle data file if there is only one shuffle spill file.

@ulysses-you (Contributor):

Test: mv a 2.6 GB file on a node with high iowait:

@wankunde what kind of disk did you test? Have you tried SSDs?

@wankunde (Contributor, Author):

Test: mv a 2.6 GB file on a node with high iowait:

@wankunde what kind of disk did you test? Have you tried SSDs?

This test was on HDD disks; moving a 2.6 GB file on NVMe disks is about 10x faster than on HDDs.

@cloud-fan (Contributor):

Before this PR, will Spark write any temp files to the final shuffle file directory?

@wankunde (Contributor, Author):

Before this PR, will Spark write any temp files to the final shuffle file directory?

In most cases, Spark manages its internal files as block files, including ShuffleDataBlockId, ShuffleIndexBlockId, TempShuffleBlockId, and some other blocks. These block files are located by hash(block name), and the full block file path is localDirs(hash % localDirs.length) / (hash / localDirs.length) % subDirsPerLocalDir / filename.

If two block files have the same hash code, they will end up in the same directory.
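A minimal sketch of that layout rule (simplified; the real code in ExecutorDiskUtils.getFilePath / DiskBlockManager.getFile uses a non-negative hash of the block file name and "%02x"-named subdirectories):

import java.io.File;

public class BlockLayout {
  // Simplified version of the hash-based placement described above.
  static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = filename.hashCode() & Integer.MAX_VALUE;            // non-negative hash of the block name
    String localDir = localDirs[hash % localDirs.length];          // picks the disk
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; // picks the subdirectory
    return new File(localDir + File.separator
        + String.format("%02x", subDirId) + File.separator + filename);
  }

  public static void main(String[] args) {
    String[] dirs = {"/hadoop/2/yarn/local", "/hadoop/3/yarn/local"};
    // Different block names hash independently, so a temp shuffle block and
    // the final shuffle data file can land on different disks.
    System.out.println(getFile(dirs, 64, "shuffle_0_0_0.data"));
    System.out.println(getFile(dirs, 64, "temp_shuffle_1234"));  // hypothetical temp block name
  }
}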

@cloud-fan (Contributor):

To confirm: is the goal to write the last spill file to the same directory as the final shuffle file, so that the following transfer operation can be cheap?

@mridulm (Contributor) commented Mar 26, 2024

A move between disks requires an actual data copy, while a mv within the same disk is simply a metadata operation.
Having said that, if the disk is doing < 20 MBps transfer rates, there are some severe infra issues to be sorted out.

I understand the motivation for the proposed fix here, but I agree with @cloud-fan - I am not in favor of complicating Spark code to handle extremely degenerate cases like this.

@wankunde (Contributor, Author):

To confirm: is the goal to write the last spill file to the same directory as the final shuffle file, so that the following transfer operation can be cheap?

Yes

@cloud-fan (Contributor) commented Mar 26, 2024

I think a more general approach is: DiskBlockManager#createTempShuffleBlock should co-locate temp shuffle files and final shuffle files (with the same shuffle id and map id). This benefits cases with more than one spill file as well.
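For illustration, a rough sketch of what such a co-locating variant could look like (hypothetical signature and body, not code from this PR; it assumes access to an IndexShuffleBlockResolver to locate the final data file, and is shown in Java although the real DiskBlockManager is Scala):

// Hypothetical co-locating variant of DiskBlockManager#createTempShuffleBlock:
// place the temp block in the directory that will hold the final shuffle data
// file for (shuffleId, mapId), so every spill lands on the final disk.
public Tuple2<TempShuffleBlockId, File> createTempShuffleBlock(int shuffleId, long mapId) {
  // Directory that will hold the final shuffle data file for this map task.
  File finalDir = blockResolver.getDataFile(shuffleId, mapId).getParentFile();
  TempShuffleBlockId blockId = new TempShuffleBlockId(UUID.randomUUID());
  File tmpFile = new File(finalDir, blockId.name());
  while (tmpFile.exists()) {
    // Regenerate the id on the (unlikely) name collision with another task.
    blockId = new TempShuffleBlockId(UUID.randomUUID());
    tmpFile = new File(finalDir, blockId.name());
  }
  return new Tuple2<>(blockId, tmpFile);
}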

@wankunde (Contributor, Author) commented Mar 26, 2024

I think a more general approach is: DiskBlockManager#createTempShuffleBlock should co-locate temp shuffle files and final shuffle files (with the same shuffle id and map id). This benefits cases with more than one spill file as well.

Spark will rename the TempShuffleBlock file to the final data file when there is only one TempShuffleBlock file.

} else if (spills.length == 1) {
  Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
      shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
  if (maybeSingleFileWriter.isPresent()) {
    // Here, we don't need to perform any metrics updates because the bytes written to this
    // output file would have already been counted as shuffle bytes written.
    partitionLengths = spills[0].partitionLengths;
    logger.debug("Merge shuffle spills for mapId {} with length {}", mapId,
      partitionLengths.length);
    maybeSingleFileWriter.get()
      .transferMapSpillFile(spills[0].file, partitionLengths, sorter.getChecksums());
  } else {
    partitionLengths = mergeSpillsUsingStandardWriter(spills);
  }

If there are multiple TempShuffleBlock files, Spark will always read all the shuffle data and write it into the final shuffle data file. In that case, the read and write workload is unavoidable.

@cloud-fan (Contributor):

I think renaming and reading/writing should be faster if the files are co-located on the same disk?

@mridulm (Contributor) commented Mar 26, 2024

I think renaming and reading/writing should be faster if the files are co-located on the same disk?

At these disk speeds, anything other than a rename will be slower if the files are on the same disk.
In general, though, we can run out of space on a single disk as well - which is why we spread files across disks.

@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    // createTempShuffleBlock here; see SPARK-3426 for more details.
    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
      blockManager.diskBlockManager().createTempShuffleBlock();
Contributor:

one idea: if isFinalFile is true, then we call a special version of createTempShuffleBlock that takes shuffle & map id, and returns a file path under the same directory of the final shuffle file. Then we don't need to change other places?

Contributor Author:

Do you mean changing the parameter of createTempShuffleBlockInDir from finalDataFileDir to Tuple2<ShuffleId, MapId>?

Contributor:

That would assume the final output has to go to blockResolver.getDataFile(shuffleId, mapId), right @cloud-fan?
Currently, at this layer, we do not make that assumption ...

I was initially toying with the idea of passing mapId and shuffleId as constructor params and doing something similar, when I realized this would make assumptions that the code currently does not make - which is why the base directory is being passed around.

(And then of course I thought we could solve it in LocalDiskSingleSpillMapOutputWriter here, but was completely wrong :-( ).

@mridulm (Contributor) commented Mar 26, 2024

ExecutorDiskUtils.getFilePath and DiskBlockManager.getFile determine the layout of the files ... and given this is specific to an optimization in LocalDiskShuffleExecutorComponents (which determines the output file), why not simply modify LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile to leverage this layout information and 'host' the final output on the same disk?

KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore, for example, does something similar (using the layout info to recover - not this specific layout).


  public void transferMapSpillFile(
      File mapSpillFile,
      long[] partitionLengths,
      long[] checksums) throws IOException {
    // The map spill file already has the proper format, and it contains all of the partition data.
    // So just transfer it directly to the destination without any merging.
    File parent = ExecutorDiskUtils.getLocalDir(mapSpillFile);
    File outputFile = blockResolver.getDataFile(shuffleId, mapId, Some(Array(parent.getAbsolutePath)));
    File tempFile = Utils.tempFileWith(outputFile);
    Files.move(mapSpillFile.toPath(), tempFile.toPath());
    blockResolver
      .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
  }

This is only going to marginally alleviate the issue, though; the slow disks are the RC here, and their impact will manifest in a lot of other ways as well. (We built push-based shuffle when we started seeing very high disk issues of this sort - though unfortunately it is only supported on YARN right now.)

@wankunde (Contributor, Author):

ExecutorDiskUtils.getFilePath and DiskBlockManager.getFile determine the layout of the files ... and given this is specific to an optimization in LocalDiskShuffleExecutorComponents (which determines the output file), why not simply modify LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile to leverage this layout information and 'host' the final output on the same disk?

Thanks @mridulm.
Do you mean to change the location of the final shuffle data file? I'd prefer to keep the current shuffle file layout: the executors write shuffle files, and the external shuffle service can then read them without any additional work.
KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, blockManager) performs a heavy disk scan, but only in the KubernetesLocalDiskShuffleExecutorComponents.initializeExecutor() method; the internal layout of shuffle data files is the same as on YARN.

@mridulm (Contributor) commented Mar 27, 2024

Actually, thinking more, what I suggested will not work - since the local directory is chosen based on the hash of the block id - which is different, sigh.

@mridulm (Contributor) left a comment

I am still very hesitant about this change ... it is trying to work around overloaded disks for a specific corner case observed.
You will have pervasive slowdowns and sporadic failures whenever disks exhibit this sort of characteristics, and there is unfortunately only so much we can do to make Spark robust in this context - without looking at other options to mitigate the underlying RC.

Comment on lines 204 to 205
finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
    .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);
Contributor:

Suggested change:
- finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
-     .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);
+ finalDataFileDir.filter(v -> spills.isEmpty())
+     .map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+     .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);

We need this only when there is a single output file, else we want to spread it.

Contributor Author:

Committed this change, thanks.

