Skip to content

Commit

Permalink
When a fetch fails because of a connection exception, retry fetching the blocks' result
Browse files Browse the repository at this point in the history
  • Loading branch information
lianhuiwang committed Nov 1, 2014
1 parent ee29ef3 commit dcfef7d
Showing 1 changed file with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.spark.network.nio

import java.nio.ByteBuffer
import java.io.IOException

import org.apache.spark.network._
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}

import scala.collection.mutable.HashMap
import scala.concurrent.Future


Expand All @@ -39,6 +41,10 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa

private var blockDataManager: BlockDataManager = _

// Failure counter per fetch request, keyed by the request's sequence of block IDs.
// The visible code increments it when a fetch fails with an IOException and removes the
// entry when the fetch succeeds; those accesses are wrapped in `blockFailedCounts.synchronized`.
// NOTE(review): the key does not include host/port, so two requests for the same block IDs
// appear to share one counter -- confirm this is intended.
private val blockFailedCounts = new HashMap[Seq[String], Int]

// Failure-count threshold read from "spark.shuffle.fetch.maxRetryNumber" (default 3).
// A failed fetch is retried only while its failure count is below this value.
val maxRetryNum = conf.getInt("spark.shuffle.fetch.maxRetryNumber", 3)

/**
* Port number the service is listening on, available only after [[init]] is invoked.
*/
Expand Down Expand Up @@ -96,6 +102,11 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
future.onSuccess { case message =>
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
blockFailedCounts.synchronized {
if(blockFailedCounts.contains(blockIds)){
blockFailedCounts -= blockIds
}
}

// SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty.
if (blockMessageArray.isEmpty) {
Expand All @@ -121,8 +132,34 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
}(cm.futureExecContext)

// Failure path of the fetch future.
//
// IOException: the fetch is re-issued via fetchBlocks while the request's failure count is
// below maxRetryNum (so at most maxRetryNum - 1 retries); once the threshold is reached the
// listener is notified for every block ID in the request.
// Any other Throwable: the listener is notified immediately, without retrying.
//
// On every terminal outcome the request's counter is removed from blockFailedCounts, so the
// map does not grow without bound and a later fetch of the same block IDs starts from zero.
future.onFailure {
  case connectExcpt: IOException =>
    logWarning("Failed to connect to " + hostName + ":" + port)
    // Read, update and decide under one lock so concurrent failures of the same request
    // cannot interleave between the lookup and the write.
    val shouldRetry = blockFailedCounts.synchronized {
      val failedCount = blockFailedCounts.get(blockIds).map(_ + 1).getOrElse(1)
      if (failedCount < maxRetryNum) {
        blockFailedCounts(blockIds) = failedCount
        true
      } else {
        // Retries exhausted: forget the counter instead of leaving a stale entry behind.
        blockFailedCounts -= blockIds
        false
      }
    }
    // The retry and the listener callbacks run outside the lock.
    if (shouldRetry) {
      fetchBlocks(hostName, port, blockIds, listener)
    } else {
      blockIds.foreach { blockId =>
        listener.onBlockFetchFailure(blockId, connectExcpt)
      }
    }
  case t: Throwable =>
    // Non-retryable failure: clear any count left over from earlier IOException retries.
    blockFailedCounts.synchronized {
      blockFailedCounts -= blockIds
    }
    blockIds.foreach { blockId =>
      listener.onBlockFetchFailure(blockId, t)
    }
}(cm.futureExecContext)
}
Expand Down

0 comments on commit dcfef7d

Please sign in to comment.