From bb6841b3a7a160e252fe35dab82f4ddeb0032591 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 2 Aug 2018 00:15:09 +0800 Subject: [PATCH] [SPARK-24989][Core] Add retry support for OutOfDirectMemoryError --- .../network/shuffle/RetryingBlockFetcher.java | 14 ++++++-- .../shuffle/RetryingBlockFetcherSuite.java | 35 +++++++++++++++++-- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java index f309dda8afca6..d77377a1947db 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java @@ -26,6 +26,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import io.netty.util.internal.OutOfDirectMemoryError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -171,14 +172,21 @@ private synchronized void initiateRetry() { } /** - * Returns true if we should retry due a block fetch failure. We will retry if and only if - * the exception was an IOException and we haven't retried 'maxRetries' times already. + * Returns true if we should retry due a block fetch failure. We will retry if the + * exception was an IOException or {@link io.netty.util.internal.OutOfDirectMemoryError}, + * and we haven't retried 'maxRetries' times already. */ private synchronized boolean shouldRetry(Throwable e) { boolean isIOException = e instanceof IOException || (e.getCause() != null && e.getCause() instanceof IOException); + boolean isOutOfDirectMemoryError = e instanceof OutOfDirectMemoryError; + if (isOutOfDirectMemoryError) { + logger.warn("Got an io.netty.util.internal.OutOfDirectMemoryError, you could consider " + + "about bumping up java option io.netty.maxDirectMemory or trying lager " + + "spark.shuffle.io.retryWait, see more detail in SPARK-24989."); + } boolean hasRemainingRetries = retryCount < maxRetries; - return isIOException && hasRemainingRetries; + return (isOutOfDirectMemoryError || isIOException) && hasRemainingRetries; } /** diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index a530e16734db4..877fd32692526 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -19,6 +19,7 @@ import java.io.IOException; +import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.LinkedHashSet; @@ -27,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; +import io.netty.util.internal.OutOfDirectMemoryError; import org.junit.Test; import org.mockito.stubbing.Answer; import org.mockito.stubbing.Stubber; @@ -161,6 +163,33 @@ public void testTwoIOExceptions() throws IOException, InterruptedException { verifyNoMoreInteractions(listener); } + @Test + public void testOutOfDirectMemoryError() throws Exception { + BlockFetchingListener listener = mock(BlockFetchingListener.class); + Constructor constructor = OutOfDirectMemoryError.class.getDeclaredConstructor(String.class); + constructor.setAccessible(true); + OutOfDirectMemoryError err = (OutOfDirectMemoryError) constructor.newInstance( + "failed to allocate x byte(s) of direct memory"); + + List> interactions = Arrays.asList( + // IOException will cause a retry. Since b0 fails, we will retry both. + ImmutableMap.builder() + .put("b0", err) + .put("b1", block1) + .build(), + ImmutableMap.builder() + .put("b0", block0) + .put("b1", block1) + .build() + ); + + performInteractions(interactions, listener); + + verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0); + verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1); + verifyNoMoreInteractions(listener); + } + @Test public void testThreeIOExceptions() throws IOException, InterruptedException { BlockFetchingListener listener = mock(BlockFetchingListener.class); @@ -269,10 +298,10 @@ private static void performInteractions(List> inte if (blockValue instanceof ManagedBuffer) { retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue); - } else if (blockValue instanceof Exception) { - retryListener.onBlockFetchFailure(blockId, (Exception) blockValue); + } else if (blockValue instanceof Throwable) { + retryListener.onBlockFetchFailure(blockId, (Throwable) blockValue); } else { - fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue); + fail("Can only handle ManagedBuffers and Throwable, got " + blockValue); } } return null;