From ec069b3d269b6ec2e6ca9edcc0f6981ccecdad3d Mon Sep 17 00:00:00 2001 From: Reza Safi Date: Mon, 3 Sep 2018 23:06:33 -0400 Subject: [PATCH 1/5] [SPARK-25318]. Add exception handling when wrapping the input stream during the the fetch or stage retry in response to a corrupted block --- .../storage/ShuffleBlockFetcherIterator.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 00d01dd28afb5..c4c67873bd4e3 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -444,10 +444,23 @@ final class ShuffleBlockFetcherIterator( throwFetchFailedException(blockId, address, e) } - input = streamWrapper(blockId, in) + var wrapCorruption: Boolean = false + try { + input = streamWrapper(blockId, in) + } catch { + case e: IOException => + buf.release() + logWarning(s"got a corrupted block $blockId from $address while wrapping it" + + s" locally, fetch again", e) + corruptedBlocks += blockId + fetchRequests += FetchRequest(address, Array((blockId, size))) + wrapCorruption = true + result = null + in.close + } // Only copy the stream if it's wrapped by compression or encryption, also the size of // block is small (the decompressed block is smaller than maxBytesInFlight) - if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) { + if (detectCorrupt && !wrapCorruption && !input.eq(in) && size < maxBytesInFlight / 3) { val originalInput = input val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate) try { From 7ec6de29c4813dea56031c398e034d49d474e976 Mon Sep 17 00:00:00 2001 From: Reza Safi Date: Wed, 19 Sep 2018 11:01:26 -0400 Subject: [PATCH 2/5] Merging the two try and removing the unnecessary val and updating the related test --- .../storage/ShuffleBlockFetcherIterator.scala | 53 +++++++------------ .../ShuffleBlockFetcherIteratorSuite.scala | 6 +-- 2 files changed, 22 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index c4c67873bd4e3..feb68f63ef898 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -444,49 +444,34 @@ final class ShuffleBlockFetcherIterator( throwFetchFailedException(blockId, address, e) } - var wrapCorruption: Boolean = false try { input = streamWrapper(blockId, in) - } catch { - case e: IOException => - buf.release() - logWarning(s"got a corrupted block $blockId from $address while wrapping it" + - s" locally, fetch again", e) - corruptedBlocks += blockId - fetchRequests += FetchRequest(address, Array((blockId, size))) - wrapCorruption = true - result = null - in.close - } - // Only copy the stream if it's wrapped by compression or encryption, also the size of - // block is small (the decompressed block is smaller than maxBytesInFlight) - if (detectCorrupt && !wrapCorruption && !input.eq(in) && size < maxBytesInFlight / 3) { - val originalInput = input - val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate) - try { + // Only copy the stream if it's wrapped by compression or encryption, also the size of + // block is small (the decompressed block is smaller than maxBytesInFlight) + if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) { + val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate) // Decompress the whole block at once to detect any corruption, which could increase // the memory usage tne potential increase the chance of OOM. // TODO: manage the memory used here, and spill it into disk in case of OOM. Utils.copyStream(input, out) out.close() input = out.toChunkedByteBuffer.toInputStream(dispose = true) - } catch { - case e: IOException => - buf.release() - if (buf.isInstanceOf[FileSegmentManagedBuffer] - || corruptedBlocks.contains(blockId)) { - throwFetchFailedException(blockId, address, e) - } else { - logWarning(s"got an corrupted block $blockId from $address, fetch again", e) - corruptedBlocks += blockId - fetchRequests += FetchRequest(address, Array((blockId, size))) - result = null - } - } finally { - // TODO: release the buf here to free memory earlier - originalInput.close() - in.close() } + } catch { + case e: IOException => + buf.release() + if (buf.isInstanceOf[FileSegmentManagedBuffer] + || corruptedBlocks.contains(blockId)) { + throwFetchFailedException(blockId, address, e) + } else { + logWarning(s"got an corrupted block $blockId from $address, fetch again", e) + corruptedBlocks += blockId + fetchRequests += FetchRequest(address, Array((blockId, size))) + result = null + } + } finally { + // TODO: release the buf here to free memory earlier + in.close() } case FailureFetchResult(blockId, address, e) => diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index a2997dbd1b1ac..cc6f4fbba6062 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -128,13 +128,13 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT verify(mockBuf, times(0)).release() val delegateAccess = PrivateMethod[InputStream]('delegate) - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(0)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() wrappedInputStream.close() verify(mockBuf, times(1)).release() - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(2)).close() wrappedInputStream.close() // close should be idempotent verify(mockBuf, times(1)).release() - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(2)).close() } // 3 local blocks, and 2 remote blocks From 4983d69abf48594e9876d7a40f3f531836333243 Mon Sep 17 00:00:00 2001 From: Reza Safi Date: Wed, 19 Sep 2018 16:01:05 -0400 Subject: [PATCH 3/5] Adding back the originalInput val to avoid data leaks with the adapting its scope --- .../apache/spark/storage/ShuffleBlockFetcherIterator.scala | 3 +++ .../spark/storage/ShuffleBlockFetcherIteratorSuite.scala | 6 +++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index feb68f63ef898..794867ce3e065 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -386,6 +386,7 @@ final class ShuffleBlockFetcherIterator( var result: FetchResult = null var input: InputStream = null + var originalInput: InputStream = null // Take the next fetched result and try to decompress it to detect data corruption, // then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch // is also corrupt, so the previous stage could be retried. @@ -446,6 +447,7 @@ final class ShuffleBlockFetcherIterator( try { input = streamWrapper(blockId, in) + originalInput = input // Only copy the stream if it's wrapped by compression or encryption, also the size of // block is small (the decompressed block is smaller than maxBytesInFlight) if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) { @@ -471,6 +473,7 @@ final class ShuffleBlockFetcherIterator( } } finally { // TODO: release the buf here to free memory earlier + originalInput.close() in.close() } diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index cc6f4fbba6062..8d54bb084ddb5 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -128,13 +128,13 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT verify(mockBuf, times(0)).release() val delegateAccess = PrivateMethod[InputStream]('delegate) - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(2)).close() wrappedInputStream.close() verify(mockBuf, times(1)).release() - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(2)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(3)).close() wrappedInputStream.close() // close should be idempotent verify(mockBuf, times(1)).release() - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(2)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(3)).close() } // 3 local blocks, and 2 remote blocks From 16caa5b2c9a3b2582ca2d8268251d10f58aa400a Mon Sep 17 00:00:00 2001 From: Reza Safi Date: Wed, 19 Sep 2018 22:56:09 -0400 Subject: [PATCH 4/5] Use closeStreams=true and close in only when the stream is copied --- .../spark/storage/ShuffleBlockFetcherIterator.scala | 12 ++++++------ .../storage/ShuffleBlockFetcherIteratorSuite.scala | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 794867ce3e065..67c6ce259252f 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -444,19 +444,18 @@ final class ShuffleBlockFetcherIterator( buf.release() throwFetchFailedException(blockId, address, e) } - + var isStreamCopied: Boolean = false try { input = streamWrapper(blockId, in) - originalInput = input // Only copy the stream if it's wrapped by compression or encryption, also the size of // block is small (the decompressed block is smaller than maxBytesInFlight) if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) { + isStreamCopied = true val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate) // Decompress the whole block at once to detect any corruption, which could increase // the memory usage tne potential increase the chance of OOM. // TODO: manage the memory used here, and spill it into disk in case of OOM. - Utils.copyStream(input, out) - out.close() + Utils.copyStream(input, out, closeStreams = true) input = out.toChunkedByteBuffer.toInputStream(dispose = true) } } catch { @@ -473,8 +472,9 @@ final class ShuffleBlockFetcherIterator( } } finally { // TODO: release the buf here to free memory earlier - originalInput.close() - in.close() + if (isStreamCopied) { + in.close() + } } case FailureFetchResult(blockId, address, e) => diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 8d54bb084ddb5..a2997dbd1b1ac 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -128,13 +128,13 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT verify(mockBuf, times(0)).release() val delegateAccess = PrivateMethod[InputStream]('delegate) - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(2)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(0)).close() wrappedInputStream.close() verify(mockBuf, times(1)).release() - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(3)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() wrappedInputStream.close() // close should be idempotent verify(mockBuf, times(1)).release() - verify(wrappedInputStream.invokePrivate(delegateAccess()), times(3)).close() + verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close() } // 3 local blocks, and 2 remote blocks From 296f65bea18f9ae436d8c34ce43fe8c12b46e834 Mon Sep 17 00:00:00 2001 From: Reza Safi Date: Thu, 20 Sep 2018 12:48:33 -0400 Subject: [PATCH 5/5] Removed the unused var --- .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 67c6ce259252f..9232e173ce973 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -386,7 +386,6 @@ final class ShuffleBlockFetcherIterator( var result: FetchResult = null var input: InputStream = null - var originalInput: InputStream = null // Take the next fetched result and try to decompress it to detect data corruption, // then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch // is also corrupt, so the previous stage could be retried.