[SPARK-20950][CORE] add a new config to diskWriteBufferSize which is hard coded before

## What changes were proposed in this pull request?

This PR makes two improvements:
1. Add a new config, `spark.shuffle.spill.diskWriteBufferSize`, to configure the diskWriteBufferSize of ShuffleExternalSorter (a usage sketch follows this list).
    Varying diskWriteBufferSize while testing `forceSorterToSpill` gives the
    following average times over 10 runs (in ms):
```
diskWriteBufferSize:       1M    512K    256K    128K    64K    32K    16K    8K    4K
---------------------------------------------------------------------------------------
RecordSize = 2.5M          742   722     694     686     667    668    671    669   683
RecordSize = 1M            294   293     292     287     283    285    281    279   285
```

2. Lift outputBufferSizeInBytes and inputBufferSizeInBytes out of the mergeSpillsWithFileStream function and initialize them once in the UnsafeShuffleWriter constructor.
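
For illustration, here is a minimal sketch (values are hypothetical, not recommendations) of how an application could tune the new setting:

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning: shrink the spill disk-write buffer from its 1m default
// to 64k (the benchmark above suggests smaller buffers are not slower) and
// keep the shuffle file buffer at its 32k default.
val conf = new SparkConf()
  .set("spark.shuffle.spill.diskWriteBufferSize", "64k")
  .set("spark.shuffle.file.buffer", "32k")
```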

## How was this patch tested?
Existing unit tests.

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #18174 from heary-cao/buffersize.
heary-cao authored and cloud-fan committed Jul 6, 2017
1 parent d540dfb commit 565e7a8
Showing 4 changed files with 60 additions and 16 deletions.
@@ -43,6 +43,7 @@
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;
import org.apache.spark.internal.config.package$;

/**
* An external sorter that is specialized for sort-based shuffle.
@@ -82,6 +83,9 @@ final class ShuffleExternalSorter extends MemoryConsumer {
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSizeBytes;

/** The buffer size to use when writing the sorted records to an on-disk file */
private final int diskWriteBufferSize;

/**
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -116,13 +120,14 @@ final class ShuffleExternalSorter extends MemoryConsumer {
this.taskContext = taskContext;
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.fileBufferSizeBytes = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.numElementsForSpillThreshold =
conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
this.peakMemoryUsedBytes = getMemoryUsage();
this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
}

/**
@@ -155,7 +160,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array. This array does not need to be large enough to hold a single
// record;
final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
final byte[] writeBuffer = new byte[diskWriteBufferSize];

// Because this output will be read during shuffle, its compression codec must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -195,7 +200,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
long recordReadPosition = recordOffsetInPage + 4; // skip over record length
while (dataRemaining > 0) {
final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
Platform.copyMemory(
recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
writer.write(writeBuffer, 0, toTransfer);
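
The change above keeps the existing chunked-copy pattern and only swaps the hard-coded constant for the configured value. A loose standalone sketch of that pattern (simplified names, not the actual Spark code; `copyFromPage` and `writeToDisk` stand in for `Platform.copyMemory` and `DiskBlockObjectWriter.write`):

```scala
// Sketch only: stream a record that may be larger than the staging buffer
// to disk, at most diskWriteBufferSize bytes at a time.
def writeRecord(
    recordLength: Int,
    diskWriteBufferSize: Int,
    copyFromPage: (Long, Array[Byte], Int) => Unit,
    writeToDisk: (Array[Byte], Int) => Unit): Unit = {
  val writeBuffer = new Array[Byte](diskWriteBufferSize)
  var dataRemaining = recordLength
  var readPosition = 0L
  while (dataRemaining > 0) {
    val toTransfer = math.min(diskWriteBufferSize, dataRemaining)
    copyFromPage(readPosition, writeBuffer, toTransfer)
    writeToDisk(writeBuffer, toTransfer)
    readPosition += toTransfer
    dataRemaining -= toTransfer
  }
}
```
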
@@ -55,6 +55,7 @@
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
import org.apache.spark.internal.config.package$;

@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -65,6 +66,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {

@VisibleForTesting
static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;

private final BlockManager blockManager;
private final IndexShuffleBlockResolver shuffleBlockResolver;
@@ -78,6 +80,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final SparkConf sparkConf;
private final boolean transferToEnabled;
private final int initialSortBufferSize;
private final int inputBufferSizeInBytes;
private final int outputBufferSizeInBytes;

@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
@@ -140,6 +144,10 @@ public UnsafeShuffleWriter(
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize",
DEFAULT_INITIAL_SORT_BUFFER_SIZE);
this.inputBufferSizeInBytes =
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.outputBufferSizeInBytes =
(int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
open();
}

@@ -209,7 +217,7 @@ private void open() throws IOException {
partitioner.numPartitions(),
sparkConf,
writeMetrics);
serBuffer = new MyByteArrayOutputStream(1024 * 1024);
serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
serOutputStream = serializer.serializeStream(serBuffer);
}

@@ -360,12 +368,10 @@ private long[] mergeSpillsWithFileStream(

final OutputStream bos = new BufferedOutputStream(
new FileOutputStream(outputFile),
(int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024);
outputBufferSizeInBytes);
// Use a counting output stream to avoid having to close the underlying file and ask
// the file system for its size after each partition is written.
final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos);
final int inputBufferSizeInBytes =
(int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;

boolean threwException = true;
try {
@@ -20,16 +20,18 @@
import java.io.File;
import java.io.IOException;

import org.apache.spark.serializer.SerializerManager;
import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempLocalBlockId;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.internal.config.package$;

/**
* Spills a list of sorted records to disk. Spill files have the following format:
@@ -38,12 +40,16 @@
*/
public final class UnsafeSorterSpillWriter {

static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
private final SparkConf conf = new SparkConf();

/** The buffer size to use when writing the sorted records to an on-disk file */
private final int diskWriteBufferSize =
(int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());

// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array.
private byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
private byte[] writeBuffer = new byte[diskWriteBufferSize];

private final File file;
private final BlockId blockId;
@@ -114,23 +120,23 @@ public void write(
writeIntToBuffer(recordLength, 0);
writeLongToBuffer(keyPrefix, 4);
int dataRemaining = recordLength;
int freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE - 4 - 8; // space used by prefix + len
int freeSpaceInWriteBuffer = diskWriteBufferSize - 4 - 8; // space used by prefix + len
long recordReadPosition = baseOffset;
while (dataRemaining > 0) {
final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
Platform.copyMemory(
baseObject,
recordReadPosition,
writeBuffer,
Platform.BYTE_ARRAY_OFFSET + (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer),
Platform.BYTE_ARRAY_OFFSET + (diskWriteBufferSize - freeSpaceInWriteBuffer),
toTransfer);
writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer) + toTransfer);
writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer) + toTransfer);
recordReadPosition += toTransfer;
dataRemaining -= toTransfer;
freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE;
freeSpaceInWriteBuffer = diskWriteBufferSize;
}
if (freeSpaceInWriteBuffer < DISK_WRITE_BUFFER_SIZE) {
writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer));
if (freeSpaceInWriteBuffer < diskWriteBufferSize) {
writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer));
}
writer.recordWritten();
}
core/src/main/scala/org/apache/spark/internal/config/package.scala (27 additions, 0 deletions)
@@ -336,4 +336,31 @@ package object config {
"spark.")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.file.buffer")
.doc("Size of the in-memory buffer for each shuffle file output stream. " +
"These buffers reduce the number of disk seeks and system calls made " +
"in creating intermediate shuffle files.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in unsafe shuffle writer.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
.doc("The buffer size to use when writing the sorted records to an on-disk file.")
.bytesConf(ByteUnit.BYTE)
.checkValue(v => v > 0 && v <= Int.MaxValue,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
.createWithDefault(1024 * 1024)
}
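
For reference, a loose sketch (not part of this commit) of how the three entries above resolve to byte counts on the consumer side; it assumes Spark-internal visibility, since the entries and `SparkConf.get(ConfigEntry)` are `private[spark]`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf()

// The two KiB-denominated entries are multiplied by 1024 by their callers,
// mirroring the casts in ShuffleExternalSorter and UnsafeShuffleWriter above.
val fileBufferBytes: Int = (conf.get(SHUFFLE_FILE_BUFFER_SIZE) * 1024).toInt
val outputBufferBytes: Int = (conf.get(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE) * 1024).toInt

// The new entry is declared directly in bytes, so no unit conversion is needed.
val diskWriteBufferBytes: Int = conf.get(SHUFFLE_DISK_WRITE_BUFFER_SIZE).toInt
```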
