[SPARK-42252][CORE] Add `spark.shuffle.localDisk.file.output.buffer` and deprecate `spark.shuffle.unsafe.file.output.buffer`

### What changes were proposed in this pull request?
Deprecate `spark.shuffle.unsafe.file.output.buffer` and add a new config `spark.shuffle.localDisk.file.output.buffer` instead.

### Why are the changes needed?
The old config was designed for UnsafeShuffleWriter, but it is now used by all local shuffle writers through LocalDiskShuffleMapOutputWriter, introduced by #25007.

### Does this PR introduce _any_ user-facing change?
The old config still works, but users are advised to switch to the new one.
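
As a rough illustration (the buffer value below is arbitrary and not part of this patch), switching over only means renaming the key:

```scala
import org.apache.spark.SparkConf

// Minimal sketch: from Spark 4.0 the new key is preferred; the deprecated key
// is still honored and serves as a fallback for the new one.
val conf = new SparkConf()
  .set("spark.shuffle.localDisk.file.output.buffer", "64k") // new key (4.0.0+)
// .set("spark.shuffle.unsafe.file.output.buffer", "64k")   // deprecated since 4.0.0
```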

### How was this patch tested?
Passed existing tests.

Closes #39819 from wayneguow/shuffle_output_buffer.

Authored-by: wayneguow <guow93@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
wayneguow authored and yaooqinn committed Jun 14, 2024
1 parent 878de00 commit dd8b05f
Showing 6 changed files with 25 additions and 7 deletions.
@@ -74,7 +74,7 @@ public LocalDiskShuffleMapOutputWriter(
this.blockResolver = blockResolver;
this.bufferSize =
(int) (long) sparkConf.get(
package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
package$.MODULE$.SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
this.partitionLengths = new long[numPartitions];
this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
this.outputTempFile = null;
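For context, the config value is resolved in KiB and multiplied by 1024 before being used as the buffer size in bytes; a tiny sketch of that arithmetic (illustrative numbers only, not part of the patch):

```scala
// Illustrative only: a "32k" setting resolves to 32 KiB and becomes a 32 * 1024-byte buffer.
val configuredKiB: Long = 32L
val bufferSizeBytes: Int = (configuredKiB * 1024).toInt
assert(bufferSizeBytes == 32768)
```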
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -647,7 +647,9 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
"Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
DeprecatedConfig("spark.network.remoteReadNioBufferConversion", "3.5.2",
"Please open a JIRA ticket to report it if you need to use this configuration.")
"Please open a JIRA ticket to report it if you need to use this configuration."),
DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "4.0.0",
"Please use spark.shuffle.localDisk.file.output.buffer")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
@@ -1463,15 +1463,21 @@ package object config {

private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in unsafe shuffle writer. In KiB unless otherwise specified.")
.doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
.version("2.3.0")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
s"The buffer size must be positive and less than or equal to" +
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in all local disk shuffle writers. In KiB unless otherwise specified.")
.version("4.0.0")
.fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)

private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
.doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
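Because the new entry is declared with `fallbackConf`, reads of the new key fall back to the old key's value when the new one is unset; a rough sketch of the resulting precedence (illustrative values, not taken from this patch):

```scala
import org.apache.spark.SparkConf

// Expected precedence when Spark resolves the buffer size:
//   1. spark.shuffle.localDisk.file.output.buffer, if set, wins.
//   2. Otherwise the deprecated spark.shuffle.unsafe.file.output.buffer is consulted.
//   3. Otherwise the inherited default of 32k applies.
val conf = new SparkConf()
  .set("spark.shuffle.unsafe.file.output.buffer", "16k")
// Here the new key is unset, so internally SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE
// should resolve to 16 KiB via the fallback to the deprecated key.
```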
@@ -71,7 +71,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite {
partitionSizesInMergedFile = null
conf = new SparkConf()
.set("spark.app.id", "example.spark.app")
.set("spark.shuffle.unsafe.file.output.buffer", "16k")
.set("spark.shuffle.localDisk.file.output.buffer", "16k")
when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile)
when(blockResolver.createTempFile(any(classOf[File])))
.thenAnswer { invocationOnMock =>
12 changes: 10 additions & 2 deletions docs/configuration.md
@@ -1033,11 +1033,19 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
<td>32k</td>
<td>
The file system for this buffer size after each partition is written in unsafe shuffle writer.
In KiB unless otherwise specified.
Deprecated since Spark 4.0, please use <code>spark.shuffle.localDisk.file.output.buffer</code>.
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.shuffle.localDisk.file.output.buffer</code></td>
<td>32k</td>
<td>
The file system for this buffer size after each partition is written in all local disk shuffle writers.
In KiB unless otherwise specified.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.shuffle.spill.diskWriteBufferSize</code></td>
<td>1024 * 1024</td>
2 changes: 2 additions & 0 deletions docs/core-migration-guide.md
@@ -50,6 +50,8 @@ license: |

- Since Spark 4.0, Spark performs speculative executions less aggressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.

- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.

## Upgrading from Core 3.4 to 3.5

- Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.