From fbd2497dca4bed8724cffeac5fc4b1ee51f7cb36 Mon Sep 17 00:00:00 2001 From: Brad Kaiser Date: Mon, 27 Nov 2017 14:29:52 -0500 Subject: [PATCH 1/2] [SPARK-22618][CORE] Catch exception in removeRDD to stop jobs from dying --- .../storage/BlockManagerMasterEndpoint.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 56d0266b8edad..0a295be48b7b2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -17,6 +17,7 @@ package org.apache.spark.storage +import java.io.IOException import java.util.{HashMap => JHashMap} import scala.collection.JavaConverters._ @@ -159,11 +160,18 @@ class BlockManagerMasterEndpoint( // Ask the slaves to remove the RDD, and put the result in a sequence of Futures. // The dispatcher is used as an implicit argument into the Future sequence construction. val removeMsg = RemoveRdd(rddId) - Future.sequence( - blockManagerInfo.values.map { bm => - bm.slaveEndpoint.ask[Int](removeMsg) - }.toSeq - ) + + val handleRemoveRddException: PartialFunction[Throwable, Int] = { + case e: IOException => + logWarning(s"Error trying to remove RDD $rddId", e) + 0 // zero blocks were removed + } + + val futures = blockManagerInfo.values.map { bm => + bm.slaveEndpoint.ask[Int](removeMsg).recover(handleRemoveRddException) + }.toSeq + + Future.sequence(futures) } private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { From e2ad8c3a58984d0cca0cc3c4d4894bd2f25c2eed Mon Sep 17 00:00:00 2001 From: Brad Kaiser Date: Wed, 6 Dec 2017 09:40:09 -0500 Subject: [PATCH 2/2] [SPARK-22618][CORE] inlined error handling function --- .../spark/storage/BlockManagerMasterEndpoint.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 0a295be48b7b2..89a6a71a589a1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -161,14 +161,12 @@ class BlockManagerMasterEndpoint( // The dispatcher is used as an implicit argument into the Future sequence construction. val removeMsg = RemoveRdd(rddId) - val handleRemoveRddException: PartialFunction[Throwable, Int] = { - case e: IOException => - logWarning(s"Error trying to remove RDD $rddId", e) - 0 // zero blocks were removed - } - val futures = blockManagerInfo.values.map { bm => - bm.slaveEndpoint.ask[Int](removeMsg).recover(handleRemoveRddException) + bm.slaveEndpoint.ask[Int](removeMsg).recover { + case e: IOException => + logWarning(s"Error trying to remove RDD $rddId", e) + 0 // zero blocks were removed + } }.toSeq Future.sequence(futures)