From 66abe9839a34ad693ed479068e3bd8579785a02a Mon Sep 17 00:00:00 2001 From: fangjun Date: Thu, 26 Mar 2015 10:56:52 +0800 Subject: [PATCH] [SPARK-6396][Core] Add broadcast timeout --- .../apache/spark/network/BlockTransferService.scala | 11 ++++++++--- .../scala/org/apache/spark/storage/BlockManager.scala | 3 ++- .../scala/org/apache/spark/DistributedSuite.scala | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index dcbda5a8515dd..15540b1fa09c2 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -21,7 +21,7 @@ import java.io.Closeable import java.nio.ByteBuffer import scala.concurrent.{Promise, Await, Future} -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import org.apache.spark.Logging import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer} @@ -83,7 +83,12 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo * * It is also only available after [[init]] is invoked. */ - def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { + def fetchBlockSync( + host: String, + port: Int, + execId: String, + blockId: String, + timeout: Int): ManagedBuffer = { // A monitor for the thread to wait on. 
val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), @@ -99,7 +104,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo } }) - Await.result(result.future, Duration.Inf) + Await.result(result.future, timeout.seconds) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 80d66e59132da..ca3f5b4e06332 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -589,10 +589,11 @@ private[spark] class BlockManager( private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) + val timeout = conf.getInt("spark.storage.fetchBlockTimeout", 1000) // seconds for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") val data = blockTransferService.fetchBlockSync( - loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer() + loc.host, loc.port, loc.executorId, blockId.toString, timeout).nioByteBuffer() if (data != null) { if (asBlockResult) { diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 97ea3578aa8ba..3005c01e13c5a 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -194,7 +194,7 @@ class DistributedSuite extends FunSuite with Matchers with LocalSparkContext { val blockTransfer = SparkEnv.get.blockTransferService blockManager.master.getLocations(blockId).foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, - blockId.toString) + blockId.toString, 1000) val deserialized = blockManager.dataDeserialize(blockId, bytes.nioByteBuffer()) 
.asInstanceOf[Iterator[Int]].toList assert(deserialized === (1 to 100).toList)