From 91c925795e4ccbe6c9ccf68f99ed8994ebd92a4b Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Thu, 14 Dec 2017 23:38:29 -0800
Subject: [PATCH 1/6] [SPARK-22827][CORE] Avoid throwing OutOfMemoryError in
 case of exception in spill

---
 .../org/apache/spark/memory/MemoryConsumer.java    |  6 +++---
 .../spark/memory/SparkOutOfMemoryError.java        | 17 +++++++++++++++++
 .../apache/spark/memory/TaskMemoryManager.java     |  4 ++--
 .../shuffle/sort/ShuffleExternalSorter.java        |  3 ++-
 .../unsafe/sort/UnsafeInMemorySorter.java          |  3 ++-
 .../org/apache/spark/executor/Executor.scala       |  5 ++---
 .../aggregate/TungstenAggregationIterator.scala    |  3 ++-
 7 files changed, 30 insertions(+), 11 deletions(-)
 create mode 100644 core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java

diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 2dff241900e82..73f5b619c602a 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -17,11 +17,11 @@
 
 package org.apache.spark.memory;
 
-import java.io.IOException;
-
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
+import java.io.IOException;
+
 /**
  * A memory consumer of {@link TaskMemoryManager} that supports spilling.
  *
@@ -154,6 +154,6 @@ private void throwOom(final MemoryBlock page, final long required) {
       taskMemoryManager.freePage(page, this);
     }
     taskMemoryManager.showMemoryUsage();
-    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+    throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
   }
 }
diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
new file mode 100644
index 0000000000000..7e86128e2077a
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -0,0 +1,17 @@
+package org.apache.spark.memory;
+
+/**
+ * This exception is thrown when a task cannot acquire memory from the memory manager.
+ * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
+ * we should throw this exception, which will just kill the current task.
+ */
+public final class SparkOutOfMemoryError extends OutOfMemoryError {
+
+    public SparkOutOfMemoryError(String s) {
+        super(s);
+    }
+
+    public SparkOutOfMemoryError(OutOfMemoryError e) {
+        super(e.getMessage());
+    }
+}
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index f6b5ea3c0ad26..e8d3730daa7a4 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -192,7 +192,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
             throw new RuntimeException(e.getMessage());
           } catch (IOException e) {
             logger.error("error while calling spill() on " + c, e);
-            throw new OutOfMemoryError("error while calling spill() on " + c + " : "
+            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
               + e.getMessage());
           }
         }
@@ -213,7 +213,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
         throw new RuntimeException(e.getMessage());
       } catch (IOException e) {
         logger.error("error while calling spill() on " + consumer, e);
-        throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
+        throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
           + e.getMessage());
       }
     }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index e80f9734ecf7b..47f73d0d271e2 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -33,6 +33,7 @@
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TooLargePageException;
 import org.apache.spark.serializer.DummySerializerInstance;
@@ -341,7 +342,7 @@ private void growPointerArrayIfNecessary() throws IOException {
       // should have trigger spilling
       if (!inMemSorter.hasSpaceForAnotherRecord()) {
         logger.error("Unable to grow the pointer array");
-        throw e;
+        throw new SparkOutOfMemoryError(e);
       }
       return;
     }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 3bb87a6ed653d..951d076420ee6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -24,6 +24,7 @@
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
@@ -212,7 +213,7 @@ public boolean hasSpaceForAnotherRecord() {
 
   public void expandPointerArray(LongArray newArray) {
     if (newArray.size() < array.size()) {
-      throw new OutOfMemoryError("Not enough memory to grow pointer array");
+      throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
     }
     Platform.copyMemory(
       array.getBaseObject(),
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index af0a0ab656564..2c3a8ef74800b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -35,7 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
 import org.apache.spark.shuffle.FetchFailedException
@@ -553,10 +553,9 @@ private[spark] class Executor(
 
         // Don't forcibly exit unless the exception was inherently fatal, to avoid
         // stopping other tasks unnecessarily.
-        if (Utils.isFatalError(t)) {
+        if (!t.isInstanceOf[SparkOutOfMemoryError] && Utils.isFatalError(t)) {
           uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
         }
-
     } finally {
       runningTasks.remove(taskId)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 756eeb642e2d0..9dc334c1ead3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -205,7 +206,7 @@ class TungstenAggregationIterator(
         buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
         if (buffer == null) {
           // failed to allocate the first page
-          throw new OutOfMemoryError("No enough memory for aggregation")
+          throw new SparkOutOfMemoryError("Not enough memory for aggregation")
         }
       }
       processRow(buffer, newInput)

From 4254d3e35bf6d8c34435c7d7456d18e4b86a7e59 Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Mon, 18 Dec 2017 16:48:02 -0800
Subject: [PATCH 2/6] add license

---
 .../spark/memory/SparkOutOfMemoryError.java | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
index 7e86128e2077a..731cbeb3b7efd 100644
--- a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.spark.memory;
 
 /**

From 089ab45487d37225f9fa27704fe9f7e110750ba9 Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Tue, 19 Dec 2017 00:43:06 -0800
Subject: [PATCH 3/6] Add private annotation

---
 .../java/org/apache/spark/memory/SparkOutOfMemoryError.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
index 731cbeb3b7efd..778af311ce52a 100644
--- a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -16,11 +16,14 @@
  */
 package org.apache.spark.memory;
 
+import org.apache.spark.annotation.Private;
+
 /**
  * This exception is thrown when a task cannot acquire memory from the memory manager.
  * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
  * we should throw this exception, which will just kill the current task.
  */
+@Private
 public final class SparkOutOfMemoryError extends OutOfMemoryError {
 
     public SparkOutOfMemoryError(String s) {

From 13777de861b22965471b6b0474dff12b525450aa Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Tue, 19 Dec 2017 08:35:19 -0800
Subject: [PATCH 4/6] minor nit

---
 .../java/org/apache/spark/memory/SparkOutOfMemoryError.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
index 778af311ce52a..ca00ca58e9713 100644
--- a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -21,7 +21,7 @@
 /**
  * This exception is thrown when a task cannot acquire memory from the memory manager.
  * Instead of throwing {@link OutOfMemoryError}, which kills the executor,
- * we should throw this exception, which will just kill the current task.
+ * we should throw this exception, which just kills the current task.
  */
 @Private
 public final class SparkOutOfMemoryError extends OutOfMemoryError {

From 2465e81dcb6e5c5a169fb7a97a6fa8b428802c5d Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Tue, 19 Dec 2017 10:42:39 -0800
Subject: [PATCH 5/6] review comments

---
 .../src/main/java/org/apache/spark/memory/MemoryConsumer.java | 2 +-
 .../org/apache/spark/shuffle/sort/ShuffleExternalSorter.java  | 4 ++--
 .../util/collection/unsafe/sort/UnsafeExternalSorter.java     | 3 ++-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 73f5b619c602a..0140d37a67c8d 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -88,7 +88,7 @@ public void spill() throws IOException {
    * `LongArray` is too large to fit in a single page. The caller side should take care of these
   * two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
   *
-   * @throws OutOfMemoryError
+   * @throws SparkOutOfMemoryError
    * @throws TooLargePageException
    */
   public LongArray allocateArray(long size) {
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 47f73d0d271e2..c3a07b2abf896 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -338,11 +338,11 @@ private void growPointerArrayIfNecessary() throws IOException {
       // The pointer array is too big to fix in a single page, spill.
       spill();
       return;
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       // should have trigger spilling
       if (!inMemSorter.hasSpaceForAnotherRecord()) {
         logger.error("Unable to grow the pointer array");
-        throw new SparkOutOfMemoryError(e);
+        throw e;
       }
       return;
     }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8b8e15e3f78ed..66118f454159b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -25,6 +25,7 @@
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -349,7 +350,7 @@ private void growPointerArrayIfNecessary() throws IOException {
       // The pointer array is too big to fix in a single page, spill.
       spill();
       return;
-    } catch (OutOfMemoryError e) {
+    } catch (SparkOutOfMemoryError e) {
       // should have trigger spilling
       if (!inMemSorter.hasSpaceForAnotherRecord()) {
         logger.error("Unable to grow the pointer array");

From 5682416cba8643eeb556d0c90d052acf6b964d8b Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Tue, 19 Dec 2017 10:44:03 -0800
Subject: [PATCH 6/6] Review comments

---
 .../src/main/java/org/apache/spark/memory/MemoryConsumer.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 0140d37a67c8d..a7bd4b3799a25 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -17,11 +17,11 @@
 
 package org.apache.spark.memory;
 
+import java.io.IOException;
+
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
-import java.io.IOException;
-
 /**
  * A memory consumer of {@link TaskMemoryManager} that supports spilling.
  *
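
Illustration (not part of the patch series): a minimal, self-contained Java sketch of the control flow these commits introduce. SparkOutOfMemoryError extends OutOfMemoryError, so existing catch blocks (such as the ones in growPointerArrayIfNecessary()) keep working, while the executor's fatal-error check can single it out and fail only the offending task instead of shutting the JVM down. All names below other than SparkOutOfMemoryError are hypothetical stand-ins for the real code paths in MemoryConsumer, TaskMemoryManager, and Executor.scala.

public class SparkOomHandlingSketch {

    /** Simplified stand-in for org.apache.spark.memory.SparkOutOfMemoryError. */
    static final class SparkOutOfMemoryError extends OutOfMemoryError {
        SparkOutOfMemoryError(String s) { super(s); }
    }

    /** Stand-in for an allocation path such as MemoryConsumer.throwOom(). */
    static long acquireExecutionMemory(long required, long available) {
        if (required > available) {
            // After this series, allocation failures raise the Spark-specific
            // subclass instead of a plain OutOfMemoryError.
            throw new SparkOutOfMemoryError(
                "Unable to acquire " + required + " bytes of memory, got " + available);
        }
        return required;
    }

    /** Mirrors the Executor.scala change: SparkOutOfMemoryError fails only the task. */
    static void runTask(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            // Crude stand-in for Utils.isFatalError(t).
            boolean fatal = t instanceof Error;
            if (!(t instanceof SparkOutOfMemoryError) && fatal) {
                // Pre-patch, any OutOfMemoryError took this branch and the
                // uncaught-exception handler shut the whole executor down.
                System.err.println("fatal error, executor would exit: " + t);
            } else {
                System.err.println("task failed, executor keeps running: " + t.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        // Prints: task failed, executor keeps running: Unable to acquire 1024 bytes of memory, got 512
        runTask(() -> acquireExecutionMemory(1024, 512));
    }
}

With this sketch, the allocation failure surfaces as an ordinary task failure; a plain OutOfMemoryError (or any other Error) would instead take the fatal branch and, in a real executor, reach the uncaught-exception handler.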