From 987f15ccb01b6c0351fbfdd49d6930b929c50a74 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 30 Jan 2018 21:54:47 +0100 Subject: [PATCH 01/11] locking tweak --- .../apache/spark/io/ReadAheadInputStream.java | 79 ++++++++++--------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 5b45d268ace8d..e5413e55de581 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -246,8 +246,13 @@ private void waitForAsyncReadComplete() throws IOException { @Override public int read() throws IOException { - byte[] oneByteArray = oneByte.get(); - return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; + if (activeBuffer.remaining() > readAheadThresholdInBytes) { + // short path - just get one byte. + return activeBuffer.get() & 0xFF; + } else { + byte[] oneByteArray = oneByte.get(); + return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF; + } } @Override @@ -258,54 +263,54 @@ public int read(byte[] b, int offset, int len) throws IOException { if (len == 0) { return 0; } - stateChangeLock.lock(); - try { - return readInternal(b, offset, len); - } finally { - stateChangeLock.unlock(); - } - } - /** - * flip the active and read ahead buffer - */ - private void swapBuffers() { - ByteBuffer temp = activeBuffer; - activeBuffer = readAheadBuffer; - readAheadBuffer = temp; - } - - /** - * Internal read function which should be called only from read() api. The assumption is that - * the stateChangeLock is already acquired in the caller before calling this function. - */ - private int readInternal(byte[] b, int offset, int len) throws IOException { - assert (stateChangeLock.isLocked()); if (!activeBuffer.hasRemaining()) { - waitForAsyncReadComplete(); - if (readAheadBuffer.hasRemaining()) { - swapBuffers(); - } else { - // The first read or activeBuffer is skipped. - readAsync(); + // No remaining in active buffer - lock and switch to write ahead buffer. + stateChangeLock.lock(); + try { waitForAsyncReadComplete(); - if (isEndOfStream()) { - return -1; + if (readAheadBuffer.hasRemaining()) { + swapBuffers(); + } else { + // The first read or activeBuffer is skipped. + readAsync(); + waitForAsyncReadComplete(); + if (isEndOfStream()) { + return -1; + } + swapBuffers(); } - swapBuffers(); + } finally { + stateChangeLock.unlock(); } - } else { - checkReadException(); } len = Math.min(len, activeBuffer.remaining()); activeBuffer.get(b, offset, len); - if (activeBuffer.remaining() <= readAheadThresholdInBytes && !readAheadBuffer.hasRemaining()) { - readAsync(); + if (activeBuffer.remaining() <= readAheadThresholdInBytes) { + // low remaining in active buffer - trigger async read ahead. + stateChangeLock.lock(); + try { + if (!readAheadBuffer.hasRemaining()) { + readAsync(); + } + } finally { + stateChangeLock.unlock(); + } } + return len; } + /** + * flip the active and read ahead buffer + */ + private void swapBuffers() { + ByteBuffer temp = activeBuffer; + activeBuffer = readAheadBuffer; + readAheadBuffer = temp; + } + @Override public int available() throws IOException { stateChangeLock.lock(); From b26ffce6780078dbc38bff658e1ef7e9c56c3dd8 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Thu, 1 Feb 2018 15:27:09 +0100 Subject: [PATCH 02/11] fill the read ahead buffer --- .../apache/spark/io/ReadAheadInputStream.java | 61 +++++++------------ .../unsafe/sort/UnsafeSorterSpillReader.java | 10 +-- .../spark/io/ReadAheadInputStreamSuite.java | 2 +- 3 files changed, 26 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index e5413e55de581..719d31c36f8f4 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -78,9 +79,8 @@ public class ReadAheadInputStream extends InputStream { // whether there is a read ahead task running, private boolean isReading; - // If the remaining data size in the current buffer is below this threshold, - // we issue an async read from the underlying input stream. - private final int readAheadThresholdInBytes; + // whether there is a reader waiting for data. + private AtomicBoolean isWaiting = new AtomicBoolean(false); private final InputStream underlyingInputStream; @@ -97,20 +97,13 @@ public class ReadAheadInputStream extends InputStream { * * @param inputStream The underlying input stream. * @param bufferSizeInBytes The buffer size. - * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead - * threshold, an async read is triggered. */ public ReadAheadInputStream( - InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { + InputStream inputStream, int bufferSizeInBytes) { Preconditions.checkArgument(bufferSizeInBytes > 0, "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes); - Preconditions.checkArgument(readAheadThresholdInBytes > 0 && - readAheadThresholdInBytes < bufferSizeInBytes, - "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, " + - "but the value is " + readAheadThresholdInBytes); activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); - this.readAheadThresholdInBytes = readAheadThresholdInBytes; this.underlyingInputStream = inputStream; activeBuffer.flip(); readAheadBuffer.flip(); @@ -166,12 +159,17 @@ public void run() { // in that case the reader waits for this async read to complete. // So there is no race condition in both the situations. int read = 0; + int off = 0, len = arr.length; Throwable exception = null; try { - while (true) { - read = underlyingInputStream.read(arr); - if (0 != read) break; - } + // try to fill the read ahead buffer. + // if a reader is waiting, possibly return early. + do { + read = underlyingInputStream.read(arr, off, len); + if (read <= 0) break; + off += read; + len -= read; + } while (len > 0 && !isWaiting.get()); } catch (Throwable ex) { exception = ex; if (ex instanceof Error) { @@ -181,13 +179,12 @@ public void run() { } } finally { stateChangeLock.lock(); + readAheadBuffer.limit(off); if (read < 0 || (exception instanceof EOFException)) { endOfStream = true; } else if (exception != null) { readAborted = true; readException = exception; - } else { - readAheadBuffer.limit(read); } readInProgress = false; signalAsyncReadComplete(); @@ -232,7 +229,9 @@ private void waitForAsyncReadComplete() throws IOException { stateChangeLock.lock(); try { while (readInProgress) { + isWaiting.set(true); asyncReadComplete.await(); + isWaiting.set(false); } } catch (InterruptedException e) { InterruptedIOException iio = new InterruptedIOException(e.getMessage()); @@ -246,7 +245,7 @@ private void waitForAsyncReadComplete() throws IOException { @Override public int read() throws IOException { - if (activeBuffer.remaining() > readAheadThresholdInBytes) { + if (activeBuffer.hasRemaining()) { // short path - just get one byte. return activeBuffer.get() & 0xFF; } else { @@ -269,17 +268,18 @@ public int read(byte[] b, int offset, int len) throws IOException { stateChangeLock.lock(); try { waitForAsyncReadComplete(); - if (readAheadBuffer.hasRemaining()) { - swapBuffers(); - } else { + if (!readAheadBuffer.hasRemaining()) { // The first read or activeBuffer is skipped. readAsync(); waitForAsyncReadComplete(); if (isEndOfStream()) { return -1; } - swapBuffers(); } + // Swap the newly read read ahead buffer in place of empty active buffer. + swapBuffers(); + // After swapping buffers, trigger another async read for read ahead buffer. + readAsync(); } finally { stateChangeLock.unlock(); } @@ -287,18 +287,6 @@ public int read(byte[] b, int offset, int len) throws IOException { len = Math.min(len, activeBuffer.remaining()); activeBuffer.get(b, offset, len); - if (activeBuffer.remaining() <= readAheadThresholdInBytes) { - // low remaining in active buffer - trigger async read ahead. - stateChangeLock.lock(); - try { - if (!readAheadBuffer.hasRemaining()) { - readAsync(); - } - } finally { - stateChangeLock.unlock(); - } - } - return len; } @@ -354,10 +342,6 @@ private long skipInternal(long n) throws IOException { if (toSkip <= activeBuffer.remaining()) { // Only skipping from active buffer is sufficient activeBuffer.position(toSkip + activeBuffer.position()); - if (activeBuffer.remaining() <= readAheadThresholdInBytes - && !readAheadBuffer.hasRemaining()) { - readAsync(); - } return n; } // We need to skip from both active buffer and read ahead buffer @@ -366,6 +350,7 @@ private long skipInternal(long n) throws IOException { activeBuffer.flip(); readAheadBuffer.position(toSkip + readAheadBuffer.position()); swapBuffers(); + // Trigger async read to emptied read ahead buffer. readAsync(); return n; } else { diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index 2c53c8d809d2e..fb179d07edebc 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -72,21 +72,15 @@ public UnsafeSorterSpillReader( bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; } - final double readAheadFraction = - SparkEnv.get() == null ? 0.5 : - SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5); - - // SPARK-23310: Disable read-ahead input stream, because it is causing lock contention and perf - // regression for TPC-DS queries. final boolean readAheadEnabled = SparkEnv.get() != null && - SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", false); + SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true); final InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes); try { if (readAheadEnabled) { this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs), - (int) bufferSizeBytes, (int) (bufferSizeBytes * readAheadFraction)); + (int) bufferSizeBytes); } else { this.in = serializerManager.wrapStream(blockId, bs); } diff --git a/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java index 918ddc4517ec4..c20f0d460a6dc 100644 --- a/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java +++ b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java @@ -29,6 +29,6 @@ public class ReadAheadInputStreamSuite extends GenericFileInputStreamSuite { public void setUp() throws IOException { super.setUp(); inputStream = new ReadAheadInputStream( - new NioBufferedFileInputStream(inputFile), 8 * 1024, 4 * 1024); + new NioBufferedFileInputStream(inputFile), 8 * 1024); } } From ca45a8887ad05e1a4ec4e690ebd7dfa2ca3a3968 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 12 Feb 2018 10:35:08 -0800 Subject: [PATCH 03/11] reset isWaiting after exception --- core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 719d31c36f8f4..4a97be873c85b 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -239,6 +239,7 @@ private void waitForAsyncReadComplete() throws IOException { throw iio; } finally { stateChangeLock.unlock(); + isWaiting.set(false); } checkReadException(); } From eaa6b4e8de8415f01724d0341825f7aa9a63f81e Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 12 Feb 2018 10:42:48 -0800 Subject: [PATCH 04/11] remove waiting loop --- .../java/org/apache/spark/io/ReadAheadInputStream.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 4a97be873c85b..107c303886027 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -227,19 +227,19 @@ private void signalAsyncReadComplete() { private void waitForAsyncReadComplete() throws IOException { stateChangeLock.lock(); + isWaiting.set(true); try { - while (readInProgress) { - isWaiting.set(true); + if (readInProgress) { asyncReadComplete.await(); - isWaiting.set(false); } + assert(!readInProgress) } catch (InterruptedException e) { InterruptedIOException iio = new InterruptedIOException(e.getMessage()); iio.initCause(e); throw iio; } finally { - stateChangeLock.unlock(); isWaiting.set(false); + stateChangeLock.unlock(); } checkReadException(); } From 723818102cb84556598e69eaf12b255bcf9ada6a Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 12 Feb 2018 10:47:03 -0800 Subject: [PATCH 05/11] add short path for skip --- .../org/apache/spark/io/ReadAheadInputStream.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 107c303886027..930612016fa74 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -317,6 +317,11 @@ public long skip(long n) throws IOException { if (n <= 0L) { return 0L; } + if (n <= activeBuffer.remaining()) { + // Only skipping from active buffer is sufficient + activeBuffer.position(toSkip + activeBuffer.position()); + return n; + } stateChangeLock.lock(); long skipped; try { @@ -340,13 +345,9 @@ private long skipInternal(long n) throws IOException { if (available() >= n) { // we can skip from the internal buffers int toSkip = (int) n; - if (toSkip <= activeBuffer.remaining()) { - // Only skipping from active buffer is sufficient - activeBuffer.position(toSkip + activeBuffer.position()); - return n; - } // We need to skip from both active buffer and read ahead buffer toSkip -= activeBuffer.remaining(); + assert(toSkip > 0); // skipping from activeBuffer already handled. activeBuffer.position(0); activeBuffer.flip(); readAheadBuffer.position(toSkip + readAheadBuffer.position()); From d6d44fc3007205437a58e2eb9333bb055598dba1 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 12 Feb 2018 11:33:00 -0800 Subject: [PATCH 06/11] fix compilation --- .../main/java/org/apache/spark/io/ReadAheadInputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 930612016fa74..6694dd80a0321 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -232,7 +232,7 @@ private void waitForAsyncReadComplete() throws IOException { if (readInProgress) { asyncReadComplete.await(); } - assert(!readInProgress) + assert(!readInProgress); } catch (InterruptedException e) { InterruptedIOException iio = new InterruptedIOException(e.getMessage()); iio.initCause(e); @@ -319,7 +319,7 @@ public long skip(long n) throws IOException { } if (n <= activeBuffer.remaining()) { // Only skipping from active buffer is sufficient - activeBuffer.position(toSkip + activeBuffer.position()); + activeBuffer.position((int) n + activeBuffer.position()); return n; } stateChangeLock.lock(); From 5273176abf20c8a87776df21fa7458b33f11a990 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 13 Feb 2018 11:16:48 -0800 Subject: [PATCH 07/11] update comment --- .../src/main/java/org/apache/spark/io/ReadAheadInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 6694dd80a0321..abef90cf36d1b 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -270,7 +270,7 @@ public int read(byte[] b, int offset, int len) throws IOException { try { waitForAsyncReadComplete(); if (!readAheadBuffer.hasRemaining()) { - // The first read or activeBuffer is skipped. + // The first read. readAsync(); waitForAsyncReadComplete(); if (isEndOfStream()) { From 62cefcd6e97e5826713b6690f62b542a82d9e2aa Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 13 Feb 2018 11:22:03 -0800 Subject: [PATCH 08/11] update test suite with uneven buffer sizes --- .../java/org/apache/spark/io/ReadAheadInputStreamSuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java index c20f0d460a6dc..4b457b5370043 100644 --- a/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java +++ b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java @@ -29,6 +29,6 @@ public class ReadAheadInputStreamSuite extends GenericFileInputStreamSuite { public void setUp() throws IOException { super.setUp(); inputStream = new ReadAheadInputStream( - new NioBufferedFileInputStream(inputFile), 8 * 1024); + new NioBufferedFileInputStream(inputFile, 123), 321); } } From 1b3e9709db730b2cb5caa155ecb4562c18044f73 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 13 Feb 2018 11:53:11 -0800 Subject: [PATCH 09/11] more testing combinations --- .../spark/io/GenericFileInputStreamSuite.java | 98 +++++++++++-------- .../spark/io/NioBufferedInputStreamSuite.java | 6 +- .../spark/io/ReadAheadInputStreamSuite.java | 17 +++- 3 files changed, 75 insertions(+), 46 deletions(-) diff --git a/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java index 3440e1aea2f46..22db3592ecc96 100644 --- a/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java +++ b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java @@ -37,7 +37,7 @@ public abstract class GenericFileInputStreamSuite { protected File inputFile; - protected InputStream inputStream; + protected InputStream[] inputStreams; @Before public void setUp() throws IOException { @@ -54,77 +54,91 @@ public void tearDown() { @Test public void testReadOneByte() throws IOException { - for (int i = 0; i < randomBytes.length; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); + for (InputStream inputStream: inputStreams) { + for (int i = 0; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } } } @Test public void testReadMultipleBytes() throws IOException { - byte[] readBytes = new byte[8 * 1024]; - int i = 0; - while (i < randomBytes.length) { - int read = inputStream.read(readBytes, 0, 8 * 1024); - for (int j = 0; j < read; j++) { - assertEquals(randomBytes[i], readBytes[j]); - i++; + for (InputStream inputStream: inputStreams) { + byte[] readBytes = new byte[8 * 1024]; + int i = 0; + while (i < randomBytes.length) { + int read = inputStream.read(readBytes, 0, 8 * 1024); + for (int j = 0; j < read; j++) { + assertEquals(randomBytes[i], readBytes[j]); + i++; + } } } } @Test public void testBytesSkipped() throws IOException { - assertEquals(1024, inputStream.skip(1024)); - for (int i = 1024; i < randomBytes.length; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); + for (InputStream inputStream: inputStreams) { + assertEquals(1024, inputStream.skip(1024)); + for (int i = 1024; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } } } @Test public void testBytesSkippedAfterRead() throws IOException { - for (int i = 0; i < 1024; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); - } - assertEquals(1024, inputStream.skip(1024)); - for (int i = 2048; i < randomBytes.length; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); + for (InputStream inputStream: inputStreams) { + for (int i = 0; i < 1024; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + assertEquals(1024, inputStream.skip(1024)); + for (int i = 2048; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } } } @Test public void testNegativeBytesSkippedAfterRead() throws IOException { - for (int i = 0; i < 1024; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); - } - // Skipping negative bytes should essential be a no-op - assertEquals(0, inputStream.skip(-1)); - assertEquals(0, inputStream.skip(-1024)); - assertEquals(0, inputStream.skip(Long.MIN_VALUE)); - assertEquals(1024, inputStream.skip(1024)); - for (int i = 2048; i < randomBytes.length; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); + for (InputStream inputStream: inputStreams) { + for (int i = 0; i < 1024; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + // Skipping negative bytes should essential be a no-op + assertEquals(0, inputStream.skip(-1)); + assertEquals(0, inputStream.skip(-1024)); + assertEquals(0, inputStream.skip(Long.MIN_VALUE)); + assertEquals(1024, inputStream.skip(1024)); + for (int i = 2048; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } } } @Test public void testSkipFromFileChannel() throws IOException { - // Since the buffer is smaller than the skipped bytes, this will guarantee - // we skip from underlying file channel. - assertEquals(1024, inputStream.skip(1024)); - for (int i = 1024; i < 2048; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); - } - assertEquals(256, inputStream.skip(256)); - assertEquals(256, inputStream.skip(256)); - assertEquals(512, inputStream.skip(512)); - for (int i = 3072; i < randomBytes.length; i++) { - assertEquals(randomBytes[i], (byte) inputStream.read()); + for (InputStream inputStream: inputStreams) { + // Since the buffer is smaller than the skipped bytes, this will guarantee + // we skip from underlying file channel. + assertEquals(1024, inputStream.skip(1024)); + for (int i = 1024; i < 2048; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } + assertEquals(256, inputStream.skip(256)); + assertEquals(256, inputStream.skip(256)); + assertEquals(512, inputStream.skip(512)); + for (int i = 3072; i < randomBytes.length; i++) { + assertEquals(randomBytes[i], (byte) inputStream.read()); + } } } @Test public void testBytesSkippedAfterEOF() throws IOException { - assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1)); - assertEquals(-1, inputStream.read()); + for (InputStream inputStream: inputStreams) { + assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1)); + assertEquals(-1, inputStream.read()); + } } } diff --git a/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java index 211b33a1a9fb0..a320f8662f707 100644 --- a/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java +++ b/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java @@ -18,6 +18,7 @@ import org.junit.Before; +import java.io.InputStream; import java.io.IOException; /** @@ -28,6 +29,9 @@ public class NioBufferedInputStreamSuite extends GenericFileInputStreamSuite { @Before public void setUp() throws IOException { super.setUp(); - inputStream = new NioBufferedFileInputStream(inputFile); + inputStreams = new InputStream[] { + new NioBufferedFileInputStream(inputFile), // default + new NioBufferedFileInputStream(inputFile, 123) // small, unaligned buffer + }; } } diff --git a/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java index 4b457b5370043..bfa1e0b908824 100644 --- a/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java +++ b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java @@ -19,16 +19,27 @@ import org.junit.Before; import java.io.IOException; +import java.io.InputStream; /** - * Tests functionality of {@link NioBufferedFileInputStream} + * Tests functionality of {@link ReadAheadInputStreamSuite} */ public class ReadAheadInputStreamSuite extends GenericFileInputStreamSuite { @Before public void setUp() throws IOException { super.setUp(); - inputStream = new ReadAheadInputStream( - new NioBufferedFileInputStream(inputFile, 123), 321); + inputStreams = new InputStream[] { + // Tests equal and aligned buffers of wrapped an outer stream. + new ReadAheadInputStream(new NioBufferedFileInputStream(inputFile, 8 * 1024), 8 * 1024), + // Tests aligned buffers, wrapped bigger than outer. + new ReadAheadInputStream(new NioBufferedFileInputStream(inputFile, 3 * 1024), 2 * 1024), + // Tests aligned buffers, wrapped smaller than outer. + new ReadAheadInputStream(new NioBufferedFileInputStream(inputFile, 2 * 1024), 3 * 1024), + // Tests unaligned buffers, wrapped bigger than outer. + new ReadAheadInputStream(new NioBufferedFileInputStream(inputFile, 321), 123), + // Tests unaligned buffers, wrapped smaller than outer. + new ReadAheadInputStream(new NioBufferedFileInputStream(inputFile, 123), 321) + }; } } From 52f4a7c3e97c475b4464b82c7d8e00dcd9d889b3 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 13 Feb 2018 16:41:03 -0800 Subject: [PATCH 10/11] while loop against spurious wakeups --- .../main/java/org/apache/spark/io/ReadAheadInputStream.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index abef90cf36d1b..fe355fe2e226a 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -229,10 +229,9 @@ private void waitForAsyncReadComplete() throws IOException { stateChangeLock.lock(); isWaiting.set(true); try { - if (readInProgress) { + while (readInProgress) { asyncReadComplete.await(); } - assert(!readInProgress); } catch (InterruptedException e) { InterruptedIOException iio = new InterruptedIOException(e.getMessage()); iio.initCause(e); From b6852aa8a7f0e053985d5b89ba4020792d648f82 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 13 Feb 2018 19:08:07 -0800 Subject: [PATCH 11/11] add comment about spuriour wakeups --- .../src/main/java/org/apache/spark/io/ReadAheadInputStream.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index fe355fe2e226a..0cced9e222952 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -229,6 +229,8 @@ private void waitForAsyncReadComplete() throws IOException { stateChangeLock.lock(); isWaiting.set(true); try { + // There is only one reader, and one writer, so the writer should signal only once, + // but a while loop checking the wake up condition is still needed to avoid spurious wakeups. while (readInProgress) { asyncReadComplete.await(); }