Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,20 @@ class BlockManagerMasterEndpoint(

case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
val response = updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)

response.foreach { isSuccess =>
@inline def handleResult(success: Boolean): Unit = {
// SPARK-30594: we should not post `SparkListenerBlockUpdated` when updateBlockInfo
// returns false since the block info would be updated again later.
if (isSuccess) {
if (success) {
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
}
context.reply(isSuccess)
context.reply(success)
}

if (blockId.isShuffle) {
updateShuffleBlockInfo(blockId, blockManagerId).foreach(handleResult)
} else {
handleResult(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
}

case GetLocations(blockId) =>
Expand Down Expand Up @@ -571,46 +576,54 @@ class BlockManagerMasterEndpoint(
id
}

/**
 * Handles a block-status update for a shuffle block.
 *
 * Only shuffle index blocks trigger a map-output location update; shuffle data blocks are
 * acknowledged without further work, and any other block type is reported as a failure.
 *
 * @param blockId the block being reported; expected to be a shuffle index or data block
 * @param blockManagerId the block manager that reported the block
 * @return a future that yields `true` for shuffle index blocks (once `updateMapOutput` has
 *         returned) and for shuffle data blocks (already completed), and `false` for any
 *         other block type. For index blocks the future fails if `updateMapOutput` throws.
 */
private def updateShuffleBlockInfo(blockId: BlockId, blockManagerId: BlockManagerId)
  : Future[Boolean] = {
  blockId match {
    case ShuffleIndexBlockId(shuffleId, mapId, _) =>
      // SPARK-36782: Invoking `MapOutputTracker.updateMapOutput` within the thread
      // `dispatcher-BlockManagerMaster` could lead to a deadlock when
      // `MapOutputTracker.serializeOutputStatuses` broadcasts the serialized map statuses under
      // the acquired write lock. The broadcast block would report its status to
      // `BlockManagerMasterEndpoint`, while the `BlockManagerMasterEndpoint` is occupied by
      // `updateMapOutput` since it's waiting for the write lock. Thus, we use `Future` to call
      // `updateMapOutput` in a separate thread to avoid the deadlock.
      Future {
        // The update is keyed on the index file because index-only blocks exist.
        logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
        mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
        true
      }
    case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
      // Data blocks need no tracker update; the matching index block drives it.
      logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.")
      Future.successful(true)
    case _ =>
      // Keep a separating space between the block id and "as" so the message stays readable.
      logDebug(s"Unexpected shuffle block type ${blockId} " +
        s"as ${blockId.getClass().getSimpleName()}")
      Future.successful(false)
  }
}

private def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): Future[Boolean] = {
diskSize: Long): Boolean = {
logDebug(s"Updating block info on master ${blockId} for ${blockManagerId}")

if (blockId.isShuffle) {
blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
// We need to update this at index file because there exists the index-only block
return Future {
logDebug(s"Received shuffle index block update for ${shuffleId} ${mapId}, updating.")
mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId)
true
}
case ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) =>
logDebug(s"Received shuffle data block update for ${shuffleId} ${mapId}, ignore.")
return Future.successful(true)
case _ =>
logDebug(s"Unexpected shuffle block type ${blockId}" +
s"as ${blockId.getClass().getSimpleName()}")
return Future.successful(false)
}
}

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
return Future.successful(true)
return true
} else {
return Future.successful(false)
return false
}
}

if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
return Future.successful(true)
return true
}

blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
Expand Down Expand Up @@ -642,7 +655,7 @@ class BlockManagerMasterEndpoint(
if (locations.size == 0) {
blockLocations.remove(blockId)
}
Future.successful(true)
true
}

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
Expand Down