Skip to content

Commit

Permalink
[SPARK-3495][SPARK-3496] Backporting block replication fixes made in …
Browse files Browse the repository at this point in the history
…master to branch 1.1

The original PR was #2366

This backport was non-trivial because Spark 1.1 uses ConnectionManager instead of NioBlockTransferService, which required slight modification to unit tests. Other than that the code is exactly same as in the original PR. Please refer to discussion in the original PR if you have any thoughts.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3191 from tdas/replication-fix-branch-1.1-backport and squashes the following commits:

593214a [Tathagata Das] Merge remote-tracking branch 'apache-github/branch-1.1' into branch-1.1
2ed927f [Tathagata Das] Fixed error in unit test.
de4ff73 [Tathagata Das] [SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target
  • Loading branch information
tdas committed Nov 11, 2014
1 parent 3d889df commit be0cc99
Show file tree
Hide file tree
Showing 8 changed files with 535 additions and 44 deletions.
120 changes: 104 additions & 16 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ private[spark] class BlockManager(
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)

// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L

initialize()

/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
Expand Down Expand Up @@ -822,28 +827,111 @@ private[spark] class BlockManager(
}

/**
* Replicate block to another node.
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
peerFetchLock.synchronized {
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
if (cachedPeers == null || forceFetch || timeout) {
cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
lastPeerFetchTime = System.currentTimeMillis
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
cachedPeers
}
}

/**
* Replicate block to another node. Not that this is a blocking call that returns after
* the block has been replicated.
*/
@volatile var cachedPeers: Seq[BlockManagerId] = null
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
if (cachedPeers == null) {
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)

var replicationFailed = false
var failures = 0
var done = false

// Get cached list of peers
peersForReplication ++= getPeers(forceFetch = false)

// Get a random peer. Note that this selection of a peer is deterministic on the block id.
// So assuming the list of peers does not change and no replication failures,
// if there are multiple attempts in the same node to replicate the same block,
// the same set of peers will be selected.
def getRandomPeer(): Option[BlockManagerId] = {
// If replication had failed, then force update the cached list of peers and remove the peers
// that have been already used
if (replicationFailed) {
peersForReplication.clear()
peersForReplication ++= getPeers(forceFetch = true)
peersForReplication --= peersReplicatedTo
peersForReplication --= peersFailedToReplicateTo
}
if (!peersForReplication.isEmpty) {
Some(peersForReplication(random.nextInt(peersForReplication.size)))
} else {
None
}
}
for (peer: BlockManagerId <- cachedPeers) {
val start = System.nanoTime
data.rewind()
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
s"To node: $peer")
val putBlock = PutBlock(blockId, data, tLevel)
val cmId = new ConnectionManagerId(peer.host, peer.port)
val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
if (!syncPutBlockSuccess) {
logError(s"Failed to call syncPutBlock to $peer")

// One by one choose a random peer and try uploading the block to it
// If replication fails (e.g., target peer is down), force the list of cached peers
// to be re-fetched from driver and then pick another random peer for replication. Also
// temporarily black list the peer for which replication failed.
//
// This selection of a peer and replication is continued in a loop until one of the
// following 3 conditions is fulfilled:
// (i) specified number of peers have been replicated to
// (ii) too many failures in replicating to peers
// (iii) no peer left to replicate to
//
while (!done) {
getRandomPeer() match {
case Some(peer) =>
val onePeerStartTime = System.currentTimeMillis
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
val putBlock = PutBlock(blockId, data, tLevel)
val cmId = new ConnectionManagerId(peer.host, peer.port)
val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
if (syncPutBlockSuccess) {
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms"
.format((System.currentTimeMillis - onePeerStartTime)))
peersReplicatedTo += peer
peersForReplication -= peer
replicationFailed = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true // specified number of peers have been replicated to
}
} else {
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures")
failures += 1
replicationFailed = true
peersFailedToReplicateTo += peer
if (failures > maxReplicationFailures) { // too many failures in replicating to peers
done = true
}
}
case None => // no peer left to replicate to
done = true
}
logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
.format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logTrace(s"Replicating $blockId of ${data.limit()} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class BlockManagerId private (

def nettyPort: Int = nettyPort_

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
def isDriver: Boolean = (executorId == "<driver>")

override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}

/** Get ids of other nodes in the cluster from the driver */
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
if (result.length != numPeers) {
throw new SparkException(
"Error getting peers, only got " + result.size + " instead of " + numPeers)
}
result
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetLocationsMultipleBlockIds(blockIds) =>
sender ! getLocationsMultipleBlockIds(blockIds)

case GetPeers(blockManagerId, size) =>
sender ! getPeers(blockManagerId, size)
case GetPeers(blockManagerId) =>
sender ! getPeers(blockManagerId)

case GetMemoryStatus =>
sender ! memoryStatus
Expand Down Expand Up @@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
// TODO: Consolidate usages of <driver>
import context.dispatcher
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || info.blockManagerId.executorId != "<driver>"
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
Expand Down Expand Up @@ -213,7 +212,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
Expand All @@ -233,7 +232,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
blockManagerId.executorId == "<driver>" && !isLocal
blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
Expand Down Expand Up @@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
tachyonSize: Long) {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.executorId == "<driver>" && !isLocal) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
Expand Down Expand Up @@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockIds.map(blockId => getLocations(blockId))
}

private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray

val selfIndex = peers.indexOf(blockManagerId)
if (selfIndex == -1) {
throw new SparkException("Self index for " + blockManagerId + " not found")
/** Get the list of the peers of the given block manager */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
} else {
Seq.empty
}

// Note that this logic will select the same node multiple times if there aren't enough peers
Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private[spark] object BlockManagerMessages {

case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster

case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(bm.isDriver, "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")
Expand Down
Loading

0 comments on commit be0cc99

Please sign in to comment.