[SPARK-44635][CORE] Handle shuffle fetch failures in decommissions #42296
Conversation
currentLocationOpt = getMapOutputLocation(shuffleId, mapId)
}
if (currentLocationOpt.isEmpty) {
  throw new MetadataUpdateFailedException(shuffleId, mapId,
Could you reuse MetadataFetchFailedException? We can use the message field to distinguish the error case.
+CC @otterc
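A minimal sketch of that suggestion (the message text here is illustrative; the PR later adopts this shape, as quoted further down):

throw new MetadataFetchFailedException(shuffleId, -1,
  s"Failed to refresh map output location for shuffleId $shuffleId, mapId $mapId")

Passing -1 as the reduceId signals that no specific reduce partition is involved.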
// Try to get the cached location first in case other concurrent tasks
// fetched the fresh location already
var currentLocationOpt = getMapOutputLocation(shuffleId, mapId)
if (currentLocationOpt.isDefined && currentLocationOpt.get == prevLocation) {
nit:
- if (currentLocationOpt.isDefined && currentLocationOpt.get == prevLocation) {
+ if (currentLocationOpt.exists(_ == prevLocation)) {
Will change to currentLocationOpt.contains(prevLocation).
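For reference, the two forms agree on Option; a tiny self-contained example:

val loc = Option("exec-1")
loc.exists(_ == "exec-1")               // true
loc.contains("exec-1")                  // true: same result, reads better
Option.empty[String].contains("exec-1") // false: safe on an empty option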
if (currentLocationOpt.isDefined && currentLocationOpt.get == prevLocation) {
  // Address in the cache unchanged. Try to clean cache and get a fresh location
  unregisterShuffle(shuffleId)
  currentLocationOpt = getMapOutputLocation(shuffleId, mapId)
Note: we end up removing both the map and merge statuses here. For this second call, pass canFetchMergeResult = true in getMapOutputLocation.
Good catch. Will do.
throw new MetadataFetchFailedException(shuffleId, -1,
  message = s"Failed to get map output location for shuffleId $shuffleId, mapId $mapId")
}
currentLocationOpt.get
nit: currentLocationOpt.getOrElse(throw ...)
When shuffle fallback storage is enabled, this currentLocationOpt can be the FALLBACK_BLOCK_MANAGER_ID, and DeferFetchRequestResult below doesn't handle this special case. So either 1) check the FetchRequest for the fallback storage special ID, or 2) rewrite the RPC address to localhost so we get the blocks from inside the fallback storage.
Let us filter it out here, and add support for fetching from fallback storage in a separate PR.
+CC @dongjoon-hyun as well.
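A sketch of the filtering approach, assuming FallbackStorage.FALLBACK_BLOCK_MANAGER_ID is the special ID in question:

// Treat a fallback-storage location as "no refreshed location" for now;
// actually fetching from fallback storage is left to the follow-up PR.
currentLocationOpt = currentLocationOpt
  .filterNot(_ == FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)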
.doc("If true, executors will try to refresh the cached locations for the shuffle blocks" + | ||
"when fetch failures happens (and decommission shuffle block migration is enabled), " + | ||
"and retry fetching when the location changes.") | ||
.version("3.5.0") |
Change to 4.0.0
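For context, the entry would then look roughly like this (the config key comes from the PR description; the constant name and default value are assumed):

private[spark] val STORAGE_DECOMMISSION_SHUFFLE_REFRESH_LOCATIONS_ENABLED =
  ConfigBuilder("spark.storage.decommission.shuffleBlocks.refreshLocationsEnabled")
    .doc("If true, executors will try to refresh the cached locations for the shuffle blocks " +
      "when fetch failures happen (and decommission shuffle block migration is enabled), " +
      "and retry fetching when the location changes.")
    .version("4.0.0")
    .booleanConf
    .createWithDefault(false)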
@@ -264,18 +272,22 @@ final class ShuffleBlockFetcherIterator(
    case FetchBlockInfo(blockId, size, mapIndex) => (blockId.toString, (size, mapIndex))
  }.toMap
  val remainingBlocks = new HashSet[String]() ++= infoMap.keys
- val deferredBlocks = new ArrayBuffer[String]()
+ val deferredBlocks = new HashMap[BlockManagerId, Queue[String]]()
nit:
- val deferredBlocks = new HashMap[BlockManagerId, Queue[String]]()
+ val deferredBlocks = new HashMap[BlockManagerId, ArrayBuffer[String]]()
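With ArrayBuffer the per-location append stays a one-liner; a sketch mirroring the usage quoted later in this thread (currentAddress and blockId as in the surrounding code):

deferredBlocks.getOrElseUpdate(currentAddress, new ArrayBuffer[String]()) += blockId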
@@ -1288,6 +1288,30 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
  mapSizesByExecutorId.iter
}

def getMapOutputLocationWithRefresh(
Maybe return Option[BlockManagerId]?:
def getMapOutputLocationWithRefresh(
shuffleId: Int,
mapId: Long,
prevLocation: BlockManagerId): Option[BlockManagerId] = {
// Try to get the cached location first in case other concurrent tasks
// fetched the fresh location already
getMapOutputLocation(shuffleId, mapId) match {
case Some(location) =>
if (location == prevLocation) {
unregisterShuffle(shuffleId)
getMapOutputLocation(shuffleId, mapId)
} else {
Some(location)
}
case _ =>
None
}
}
We still want to throw a MetadataFetchFailedException when failing to get a refreshed location here. So I would prefer returning a BlockManagerId and making it specific.
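Putting the two points together, the non-Option shape would look roughly like this (a sketch assembled from the snippets quoted elsewhere in this review):

def getMapOutputLocationWithRefresh(
    shuffleId: Int,
    mapId: Long,
    prevLocation: BlockManagerId): BlockManagerId = {
  // Try the cached location first in case other concurrent tasks
  // already fetched the fresh location.
  var currentLocationOpt = getMapOutputLocation(shuffleId, mapId)
  if (currentLocationOpt.contains(prevLocation)) {
    // Cache unchanged: clear it and fetch a fresh location.
    unregisterShuffle(shuffleId)
    currentLocationOpt = getMapOutputLocation(shuffleId, mapId)
  }
  currentLocationOpt.getOrElse(throw new MetadataFetchFailedException(shuffleId, -1,
    s"Failed to get map output location for shuffleId $shuffleId, mapId $mapId"))
}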
We can do the following with Option:
val currentAddressOpt = mapOutputTrackerWorker
.getMapOutputLocationWithRefresh(shuffleId, mapId, address)
currentAddressOpt match {
case Some(currentAddress) =>
if (currentAddress != address) {
logInfo(s"Map status location for block $blockId changed from $address " +
s"to $currentAddress")
remainingBlocks -= blockId
deferredBlocks.getOrElseUpdate(currentAddress, new ArrayBuffer[String]())
.append(blockId)
enqueueDeferredFetchRequestIfNecessary()
} else {
results.put(FailureFetchResult(block, infoMap(blockId)._2, address, e))
}
case None =>
results.put(FailureFetchResult(block, infoMap(blockId)._2, address, e))
}
It is also consistent with other function signatures, such as getMapOutputLocation.
Sorry for the late reply. Will work on this more actively from now on.
Also CC @jiangxb1987
LGTM
val conf = SparkEnv.get.conf
val (keys, values) = confPairs.unzip
val currentValues = keys.map { key =>
  if (conf.contains(key)) {
    Some(conf.get(key))
  } else {
    None
  }
}
(keys, values).zipped.foreach { (key, value) =>
  conf.set(key, value)
}
try f finally {
  keys.zip(currentValues).foreach {
    case (key, Some(value)) => conf.set(key, value)
    case (key, None) => conf.remove(key)
  }
}
}
nit:
- val conf = SparkEnv.get.conf
- val (keys, values) = confPairs.unzip
- val currentValues = keys.map { key =>
-   if (conf.contains(key)) {
-     Some(conf.get(key))
-   } else {
-     None
-   }
- }
- (keys, values).zipped.foreach { (key, value) =>
-   conf.set(key, value)
- }
- try f finally {
-   keys.zip(currentValues).foreach {
-     case (key, Some(value)) => conf.set(key, value)
-     case (key, None) => conf.remove(key)
-   }
- }
- }
+ def withConf[T](confPairs: (String, String)*)(f: => T): T = {
+   val conf = SparkEnv.get.conf
+   val inputConfMap = confPairs.toMap
+   val modifiedValues = conf.getAll.filter(kv => inputConfMap.contains(kv._1)).toMap
+   inputConfMap.foreach { kv =>
+     conf.set(kv._1, kv._2)
+   }
+   try f finally {
+     inputConfMap.keys.foreach { key =>
+       if (modifiedValues.contains(key)) {
+         conf.set(key, modifiedValues(key))
+       } else {
+         conf.remove(key)
+       }
+     }
+   }
+ }
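Hypothetical usage of the suggested helper in a test body (the two config keys are the existing decommission flags referenced elsewhere in this thread):

TestUtils.withConf(
    "spark.storage.decommission.enabled" -> "true",
    "spark.storage.decommission.shuffleBlocks.enabled" -> "true") {
  // test logic runs with the overrides; prior values are restored afterwards
}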
if (currentLocationOpt.contains(prevLocation)) {
  // Address in the cache unchanged. Try to clean cache and get a fresh location
  unregisterShuffle(shuffleId)
  currentLocationOpt = getMapOutputLocation(shuffleId, mapId, canFetchMergeResult = true)
- currentLocationOpt = getMapOutputLocation(shuffleId, mapId, canFetchMergeResult = true)
+ currentLocationOpt = getMapOutputLocation(shuffleId, mapId, fetchMergeResult)
  results.put(FailureFetchResult(block, infoMap(blockId)._2, address, e))
} else {
  val (shuffleId, mapId) = BlockId.getShuffleIdAndMapId(block)
Should we move the getShuffleIdAndMapId into the Try? We would effectively block the shuffle indefinitely in case getShuffleIdAndMapId throws an exception (it should not currently, but the code could evolve). Something like:
Try {
val (shuffleId, mapId) = BlockId.getShuffleIdAndMapId(block)
mapOutputTrackerWorker
.getMapOutputLocationWithRefresh(shuffleId, mapId, address)
} match {
  }
}
when(mapOutputTracker.getMapOutputLocationWithRefresh(any(), any(), any()))
  .thenAnswer(_ => throw new MetadataFetchFailedException(0, 0, ""))
super nit: set mapId to -1.
}

test("metadata fetch failure in handle map output location change")
Can you also add a simple test to ensure existing behavior is preserved when no migration has happened?
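One possible shape for such a test, reusing the mocking style already quoted above; the key point is that an unchanged location must fall through to the old failure path:

test("fetch failure is surfaced as before when the location is unchanged") {
  // Hypothetical stub: return the previous address unchanged.
  when(mapOutputTracker.getMapOutputLocationWithRefresh(any(), any(), any()))
    .thenAnswer(invocation => invocation.getArgument[BlockManagerId](2))
  // ... the iterator should then emit a FailureFetchResult, exactly as before this patch
}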
@bozhang2820, the test failure might be resolved by updating your branch to the latest master.
@bozhang2820 Could you rebase the PR?
val (shuffleId, mapId) = BlockId.getShuffleIdAndMapId(block)
val mapOutputTrackerWorker = mapOutputTracker.asInstanceOf[MapOutputTrackerWorker]
Try(mapOutputTrackerWorker
  .getMapOutputLocationWithRefresh(shuffleId, mapId, address)) match {
Refreshing map output locations in a Netty callback thread can cause a potential deadlock. Here is the reason:
1. Some map output locations are stored via broadcast variables.
2. This code has a synchronization block.
3. The Netty response to fetch broadcast variables might be blocked by other handlers, like the shuffle success handler.
4. In that case, because the shuffle success handler also requires the same lock from 2), this is a deadlock.
The above situation happened while I was testing this patch.
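The cycle is the classic hold-and-wait pattern. A generic, self-contained Scala illustration (not the actual Spark code):

import java.util.concurrent.CountDownLatch

object DeadlockSketch {
  private val lock = new Object
  def main(args: Array[String]): Unit = {
    val lockHeld = new CountDownLatch(1)
    val broadcastServed = new CountDownLatch(1)
    // Like the Netty callback: holds the tracker lock while waiting for a
    // broadcast fetch that only the other handler can serve.
    val fetcher = new Thread(() => lock.synchronized {
      lockHeld.countDown()
      broadcastServed.await()
    })
    // Like the shuffle success handler: needs the same lock before it can
    // let the broadcast fetch complete.
    val successHandler = new Thread(() => {
      lockHeld.await()
      lock.synchronized { broadcastServed.countDown() }
    })
    fetcher.start(); successHandler.start() // neither thread ever finishes
  }
}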
@bozhang2820 Not sure if you still have time to work on this PR. I opened another PR to address some of the issues mentioned above.
SparkEnv.get.conf.get(config.STORAGE_DECOMMISSION_ENABLED) &&
  SparkEnv.get.conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)

private val shouldPerformShuffleLocationRefresh =
What about making this one of the constructor arguments? One benefit is that you wouldn't need to write tests with TestUtils.withConf.
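A minimal illustration of the testability win (class and parameter names hypothetical):

// Injecting the flag avoids reading SparkEnv.get.conf inside the class,
// so tests can construct the object directly instead of mutating global conf.
final class FetcherUnderTest(locationRefreshEnabled: Boolean) {
  def shouldRefresh: Boolean = locationRefreshEnabled
}

assert(new FetcherUnderTest(locationRefreshEnabled = true).shouldRefresh)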
We have encountered some performance issues during our tests with this change, and will have to address those before moving forward.
Do you have details that can be shared, @bozhang2820? Thanks
Any updates on this, @bozhang2820? Thanks
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
This change tries to handle shuffle fetch failures due to decommissions.
When encountering a fetch failure for a block, ShuffleBlockFetcherIterator will check the updated map output location (BlockManagerId) for it and see whether the location has changed. When it has, the fetch failure will be ignored, and the updated location and the block id will be recorded. At the end of the fetch request (when all blocks in the request are processed), new requests will be assembled and enqueued to retry the fetches from the updated locations for the corresponding blocks.
Why are the changes needed?
This is to improve stability when decommission is enabled.
Does this PR introduce any user-facing change?
This change comes with a feature flag, spark.storage.decommission.shuffleBlocks.refreshLocationsEnabled.
How was this patch tested?
Added a new unit test.