From 9f693b227be83717fcc76067c9fa5b8849362a5b Mon Sep 17 00:00:00 2001
From: Yangyang Liu
Date: Sun, 24 Jul 2016 12:16:40 -0700
Subject: [PATCH] Make parameters configurable in BlockManager

---
 .../apache/spark/storage/BlockManager.scala   | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 83a9cbd63d391..3fbc1a7aeff1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -132,6 +132,14 @@ private[spark] class BlockManager(
   private val maxFailuresBeforeLocationRefresh =
     conf.getInt("spark.block.failures.beforeLocationRefresh", 5)
 
+  // Maximum number of attempts to register with the external shuffle server (default 3).
+  private val maxAttemptsRegisterWithShuffleServer =
+    conf.getInt("spark.block.attempts.registerWithShuffleServer", 3)
+
+  // Wait between registration attempts, in milliseconds (default "5s").
+  private val retryIntervalRegisterWithShuffleServer =
+    conf.getTimeAsMs("spark.block.retry.interval.registerWithShuffleServer", "5s")
+
   private val slaveEndpoint = rpcEnv.setupEndpoint(
     "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
     new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
@@ -184,20 +192,18 @@ private[spark] class BlockManager(
       diskBlockManager.subDirsPerLocalDir,
       shuffleManager.getClass.getName)
 
-    val MAX_ATTEMPTS = 3
-    val SLEEP_TIME_SECS = 5
-
-    for (i <- 1 to MAX_ATTEMPTS) {
+    for (i <- 1 to maxAttemptsRegisterWithShuffleServer) {
       try {
         // Synchronous and will throw an exception if we cannot connect.
         shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
           shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
         return
       } catch {
-        case e: Exception if i < MAX_ATTEMPTS =>
-          logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
-            + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
-          Thread.sleep(SLEEP_TIME_SECS * 1000)
+        case e: Exception if i < maxAttemptsRegisterWithShuffleServer =>
+          logError("Failed to connect to external shuffle server, " +
+            s"will retry ${maxAttemptsRegisterWithShuffleServer - i} more times " +
+            s"after waiting $retryIntervalRegisterWithShuffleServer ms...", e)
+          Thread.sleep(retryIntervalRegisterWithShuffleServer)
       }
     }
   }