From 9083304b4b42357dc2717151db28882e01245838 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Wed, 16 Aug 2017 13:08:35 +0800
Subject: [PATCH 1/2] [SPARK][CORE] Add retry logic for new broadcast in BroadcastManager

---
 .../spark/broadcast/BroadcastManager.scala | 17 ++++++-
 .../spark/broadcast/BroadcastSuite.scala   | 44 ++++++++++++++++++-
 2 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988fe03b2e..a4465f6bac9dd 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.broadcast
 
+import java.io.IOException
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 
 private[spark] class BroadcastManager(
@@ -32,6 +33,7 @@ private[spark] class BroadcastManager(
 
   private var initialized = false
   private var broadcastFactory: BroadcastFactory = null
+  private val maxRetries = conf.getInt("spark.broadcast.maxRetries", 3)
 
   initialize()
 
@@ -53,7 +55,18 @@ private[spark] class BroadcastManager(
   private val nextBroadcastId = new AtomicLong(0)
 
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
-    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
+    var lastException: Exception = null
+    for (attempt <- 1 to maxRetries) {
+      try {
+        return broadcastFactory.newBroadcast[T](value_, isLocal,
+          nextBroadcastId.getAndIncrement())
+      } catch {
+        case e: IOException =>
+          lastException = e
+          logWarning(s"Failed to create a new broadcast on attempt $attempt of $maxRetries", e)
+      }
+    }
+    throw new SparkException("Error creating a new broadcast", lastException)
   }
 
   def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 46f9ac6b0273a..e339dbae1cb2c 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -17,18 +17,18 @@
 
 package org.apache.spark.broadcast
 
+import java.io.File
 import java.util.Locale
 
 import scala.util.Random
-
 import org.scalatest.Assertions
-
 import org.apache.spark._
 import org.apache.spark.io.SnappyCompressionCodec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.security.EncryptionFunSuite
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage._
+import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 // Dummy class that creates a broadcast variable but doesn't use it
@@ -49,6 +49,46 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
 
 class BroadcastSuite extends SparkFunSuite with LocalSparkContext with EncryptionFunSuite {
 
+  test("Creating TorrentBroadcast fails with a bad disk") {
+    val torrentConf = new SparkConf()
+      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
+    val tmp = System.getProperty("java.io.tmpdir")
+    val badDisk = tmp + "/BAD_DISK"
+    // Use only one local dir
+    torrentConf.set("spark.local.dir", badDisk)
+    sc = new SparkContext("local", "test", torrentConf)
+    val list = List[Int](1, 2, 3, 4)
+    // Delete this dir to simulate a bad disk
+    Utils.deleteRecursively(new File(badDisk))
+    try {
+      sc.broadcast(list)
+      fail("Expected broadcast creation to fail with a bad disk")
+    } catch {
+      case _: SparkException =>
+        // Expected: newBroadcast gave up after exhausting its retries
+    }
+  }
+
+  test("Creating TorrentBroadcast succeeds with a mix of bad and good disks") {
+    val torrentConf = new SparkConf()
+      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
+    val tmp = System.getProperty("java.io.tmpdir")
+    val badDisk = tmp + "/BAD_DISK"
+    torrentConf.set("spark.local.dir", s"${tmp}/GOOD_DISK1,${tmp}/GOOD_DISK2,${badDisk}")
+    val list = List[Int](1, 2, 3, 4)
+    sc = new SparkContext("local", "test", torrentConf)
+    // Delete this dir to simulate a bad disk
+    Utils.deleteRecursively(new File(badDisk))
+    try {
+      val broadcast = sc.broadcast(list)
+      val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
+      assert(results.collect().toSet === Set((1, 10), (2, 10)))
+    } catch {
+      case e: SparkException =>
+        fail("Broadcast creation should succeed while good disks remain", e)
+    }
+  }
+
   test("Using TorrentBroadcast locally") {
     sc = new SparkContext("local", "test")
     val list = List[Int](1, 2, 3, 4)

From 071eb237fa104bcdf5751c3fdcfd19e3cffbf600 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 17 Aug 2017 11:13:23 +0800
Subject: [PATCH 2/2] Check the number of local dirs first

---
 .../org/apache/spark/broadcast/BroadcastManager.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index a4465f6bac9dd..d581a4e9c400a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
 
 private[spark] class BroadcastManager(
     val isDriver: Boolean,
@@ -33,7 +34,13 @@ private[spark] class BroadcastManager(
 
   private var initialized = false
   private var broadcastFactory: BroadcastFactory = null
-  private val maxRetries = conf.getInt("spark.broadcast.maxRetries", 3)
+  private val maxRetries = {
+    if (Utils.getConfiguredLocalDirs(conf).length > 1) {
+      conf.getInt("spark.broadcast.maxRetries", 3)
+    } else {
+      1 // a single local dir leaves nothing to fall back on, so don't retry
+    }
+  }
 
   initialize()
 
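A note on the retry logic in patch 1/2, shown here in isolation as a minimal, self-contained Scala sketch: retry an IO-bound call a bounded number of times, remember the last failure, and surface it as the cause once every attempt has failed. The names RetrySketch, withRetries, and createBlock are hypothetical stand-ins for BroadcastManager.newBroadcast and broadcastFactory.newBroadcast, not Spark API.

import java.io.IOException

object RetrySketch {
  // Run createBlock up to maxRetries times, retrying only on IOException
  // (any other exception propagates immediately, as in the patch).
  def withRetries[T](maxRetries: Int)(createBlock: => T): T = {
    var lastException: Exception = null
    for (attempt <- 1 to maxRetries) {
      try {
        return createBlock
      } catch {
        case e: IOException =>
          lastException = e
          println(s"attempt $attempt of $maxRetries failed: ${e.getMessage}")
      }
    }
    // Every attempt failed; rethrow with the last failure as the cause.
    throw new RuntimeException("all attempts failed", lastException)
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Fails twice with a simulated bad disk, then succeeds on the third try.
    val value = withRetries(maxRetries = 3) {
      calls += 1
      if (calls < 3) throw new IOException(s"simulated bad disk #$calls")
      42
    }
    println(s"got $value after $calls calls")
  }
}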
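For completeness, a sketch of how a user would exercise the new setting once the series is applied: spark.broadcast.maxRetries (default 3, patch 1/2) only has an effect when spark.local.dir lists more than one directory, since patch 2/2 pins the retry count to 1 otherwise. The local-dir paths, object name, and app name below are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastRetryDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("broadcast-retry-demo")
      // Several local dirs give a failed attempt another disk to land on.
      .set("spark.local.dir", "/data1/tmp,/data2/tmp,/data3/tmp")
      // Allow up to 5 attempts before newBroadcast gives up.
      .set("spark.broadcast.maxRetries", "5")
    val sc = new SparkContext(conf)
    try {
      val b = sc.broadcast(Seq(1, 2, 3, 4))
      // Each task reads the broadcast value; prints "10,10".
      println(sc.parallelize(1 to 2).map(_ => b.value.sum).collect().mkString(","))
    } finally {
      sc.stop()
    }
  }
}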