[SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache #39459
Conversation
Can one of the admins verify this patch?
Please ping me when it is ready for review, and not WIP. Thanks!
```scala
test("SPARK-41497: accumulators should be reported in the case of task retry with rdd cache") {
  // Set up a cluster with 2 executors
  val conf = new SparkConf()
    .setMaster("local-cluster[2, 1, 1024]").setAppName("TaskSchedulerImplSuite")
```
The app name is not appropriate here.
Thanks, will update the app name.
Done.
Took a quick pass through it.
```diff
 /**
  * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
  * to compute the block, persist it, and return its values.
  *
  * @return either a BlockResult if the block was successfully cached, or an iterator if the block
  *         could not be cached.
  */
-def getOrElseUpdate[T](
+private[spark] def getOrElseUpdate[T](
```
Make this private instead? And fix the tests which depend on this directly to use `getOrElseUpdateRDDBlock` instead?
Sounds more reasonable. Will make the change. Thanks.
Done.
```scala
if (!computed) {
  // Loaded from cache, re-compute to update accumulators.
  makeIterator()
}
```
If a block is not visible, and it was not computed in this task, we should not be using that block's cached data at all - even if it is cached here or elsewhere.
Particularly for unordered or indeterminate computation, this can impact the accumulators generated - and it won't match the result iterator returned.
So we can't simply call `makeIterator` here - as @Ngone51 suggested, we should pass the flag to `getOrElseUpdate` and decide there based on visibility.
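The flow being suggested can be sketched as a standalone model (hypothetical names such as `VisibilitySketch` and the `isCacheVisible` flag follow the discussion, not Spark's actual API): the caller resolves cache visibility first, and `getOrElseUpdate` decides whether a cache hit may be served as-is or the closure must be re-run so that accumulator updates are replayed.

```scala
import scala.collection.mutable

// Simplified model: a visible cache hit is served directly; an invisible one
// triggers recomputation so the task's accumulator updates are reported.
object VisibilitySketch {
  final case class BlockId(rddId: Int, partition: Int)

  private val cache = mutable.Map.empty[BlockId, Seq[Int]]

  def getOrElseUpdate(
      id: BlockId,
      isCacheVisible: Boolean,
      makeIterator: () => Iterator[Int]): Iterator[Int] = {
    cache.get(id) match {
      case Some(data) if isCacheVisible =>
        data.iterator // safe: some task producing this block already succeeded
      case Some(_) =>
        // Cached but not yet visible: recompute so accumulator updates are
        // reported (the existing cached block is left as-is).
        makeIterator()
      case None =>
        val data = makeIterator().toSeq
        cache(id) = data
        data.iterator
    }
  }
}
```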
Thanks, will make the change.
> Particularly for unordered or indeterminate computation, this can impact the accumulators generated - and it won't match the result iterator returned.

There could be some other issues with such scenarios: if we have blocks generated from different task attempts or cache re-computation, each cache may also hold different data. Would this also be a problem?
This is a good point - a couple of issues come to mind:
- We do not currently disable speculative execution for indeterminate computation - and data generated from two task attempts can vary (which could be cached, and so differ for the same partition).
- We do not invalidate previously cached data when there is a stage re-execution due to failures for an indeterminate computation.

This is something we should revisit @Ngone51, @jiangxb1987 - thoughts?
> We do not disable speculative execution for indeterminate computation currently - and data generated from two task attempts can vary (which could be cached, and so differ for same partition)

Right... in this case, the rdd block locations for different data can be attached to the same rdd block id. So a reader could get different data for the same rdd block, which makes the rdd block data also indeterminate.

> We do not invalidate previously cached data, when there is a stage re-execution due to failures for an indeterminate computation

This seems to be a missing piece in the indeterminate computation framework. @cloud-fan could you help confirm?
> Right... in this case, the rdd block locations for different data can be attached to the same rdd block id. So a reader could get different data for the same rdd block, which makes the rdd block data also indeterminate.

@Ngone51 do you think we should solve this issue in this PR? It looks like a more general issue with indeterminate computation.
No. We should fix that issue separately.
Btw, can you add a todo for this here @ivoson ? Thx
Updated.
```scala
if (blockInfoManager.isRDDBlockVisible(blockId)) {
  return true
}
master.isRDDBlockVisible(blockId)
```
QQ: Should we be caching this instead of hitting driver each time ?
Yes, I've thought about this. Then we would need to add a mechanism to clean the cache. Let me re-think about this.
One way in my mind: we cache the results (for visible rdd blocks) in the block manager. Once the rdd is removed, a broadcast message is sent to each BlockManager to clean the cache.
I wonder whether it's worth doing this. Since there is locality scheduling, if tasks get scheduled to the executor where the cached block exists, there will be no calls to the master.
Let me know your thoughts about this, thanks. cc @mridulm @Ngone51
> Once the rdd is removed, a broadcast message will be sent to each BlockManager to clean the cache.

I think the problem with this approach is that it introduces a race condition between the visibility cache and the broadcast message.
`master.isRDDBlockVisible(blockId)` shouldn't be frequently called, right? I thought `blockInfoManager.isRDDBlockVisible(blockId)` would cover most cases for us, no?
What I meant was, cache visibility - once a block becomes visible, we do not revert it back to being invisible.
I'm not trying to talk about the cache stuff, just to highlight this: this is somewhat of a behavior change, right? If T1 generates B1 early but turns out to be a long-running task, it can be terrible for tasks like T2 which read B1.
Do we actually have use cases where some other stage/task depends on an earlier task generating a block as a prefix to its computation while the task itself has not completed?
Looks like a fairly brittle assumption, no? (Unless I misunderstood the use case here!)
You are right, the current PR is handling it on a second read ... since we are already checking `blockInfoManager.isRDDBlockVisible(blockId)` first.
This should cover the case of (1) - and we will always query in case the block is available, and we have to distinguish (2).
(2.1) would be an optimization we can attempt later on.
Thanks @mridulm. Just want to clarify: are you suggesting that we can also cache the state obtained from the driver when it's already visible?
I think we can update the cached state if the block exists in the executor after getting the results from the driver/master.
Yes, `blockInfoManager.isRDDBlockVisible(blockId)` should check the current executor-side state of the block to see if it is visible or not.
If visible, immediately return true.
Else, query the driver for visibility status - if the driver returns true, cache that.
Also, when a block becomes visible at the driver, it will update the block managers hosting the block about the visibility status - which is then maintained at the executor.
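This check-and-cache protocol can be modeled standalone (hypothetical names like `DriverStub` and `ExecutorVisibilityCache`, not the real BlockManager API). The key property is that visibility only ever moves invisible -> visible, so a positive answer from the driver can be cached on the executor without any invalidation logic.

```scala
import scala.collection.mutable

// Driver side: the authoritative visibility state, with a call counter so the
// caching behavior is observable.
class DriverStub {
  var calls = 0
  private val visibleOnDriver = mutable.Set.empty[String]
  def markVisible(id: String): Unit = visibleOnDriver += id
  def isRDDBlockVisible(id: String): Boolean = { calls += 1; visibleOnDriver(id) }
}

// Executor side: check local state first, fall back to the driver, and cache
// only a positive answer (visibility never reverts).
class ExecutorVisibilityCache(driver: DriverStub) {
  private val localVisible = mutable.Set.empty[String]

  def isRDDBlockVisible(id: String): Boolean = {
    if (localVisible(id)) {
      true                                // fast path: executor-local state
    } else {
      val visible = driver.isRDDBlockVisible(id)
      if (visible) localVisible += id     // cache only the positive answer
      visible
    }
  }
}
```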
> Do we actually have usecases where some other stage/task is depending on an earlier task generating a block as a prefix to its computation and the task itself has not completed?

@mridulm Right... on second thought, it's not a valid concern.
Updated, please take a look. Thanks.
```scala
// Mapping from task id to the set of rdd blocks which are generated from the task.
private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
// Record the visible RDD blocks which have been generated at least from one successful task.
private val visibleRDDBlocks = new mutable.HashSet[RDDBlockId]
```
Instead of keeping track of visible blocks, should we be tracking those which are not yet visible? The size would be much smaller - blocks by default get added to this `Set`, and get promoted to visible explicitly.
That would be better. Thanks.
Just found one problem if we track the invisible RDD blocks.
If we track the invisible RDD blocks, then we would mark an RDD block as visible (cache can be used) only when it exists in `blockLocations` and does not exist in `invisibleRDDBlocks`. When `blockLocations` removes the block (which could be caused by executor loss), we will lose the information. Then the newly cached data won't be leveraged as soon as possible (right after the cache is generated/reported to the master).
Not sure I follow - we cannot leverage the block until it is explicitly marked visible (unless within the same task).
If the executor is lost before that happens, then we can never make it visible - and so it is fine to clean it up from the invisible blocks, right? (Within the task's executor, if the check is for a task, the block can be used if visible, or if tid == current task for invisible.)
Let me explain this further. If we track visible blocks, it's clear that we always know which blocks are visible.
If we track invisible blocks, the way we consider a block visible is that at least one copy of the block exists and it's not in the invisible list. So if the existing blocks get lost, we lose the information. The next time the cache is re-computed, we do this again (first put it into the invisible list, then promote it to visible by removing it from the invisible list once a task finishes successfully). Only after doing the whole process again would the cache be visible.
If existing blocks are lost - why would you need that information, as they are gone? In other words, how is it different from today's situation (without visibility) - if a block is lost, it is no longer in the system.
Here is an example of the scenario I am trying to describe:
- we have a cached block rdd_1_1 which has been successfully cached and marked as visible;
- the cached block got lost due to executor loss;
- another task on rdd1 got submitted, and the 1st attempt failed after putting the cached block rdd_1_1. For the 2nd attempt, things could be different here:
  a. if we still have the visibility status, the 2nd attempt can use the cached block directly;
  b. otherwise, we still need to do the computing.
A bit unclear on this actually ... why would a block's status be cached on an executor (other than the driver) which is not hosting the block?
If an executor is lost, the driver will update its state as well due to the loss (evict the block) ... if there are other copies of the block, it continues to be visible (since the block exists). If a task fails before the block transitions to visible (or on executor loss), the driver will evict all blocks.
Note, when an executor is fetching a block for computation from a remote executor - it should be 'seeing' a block only if it is visible.
(If the block is locally available due to replication - then it is invisible until marked visible. And so the same logic as above would work.)
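The invisible-set lifecycle being agreed on here can be sketched as a standalone driver-side model (hypothetical `MasterModel`, not the real `BlockManagerMasterEndpoint`): blocks start invisible, are promoted on task success, and all state is dropped when the last hosting executor is lost, so a re-computed block has to be promoted again.

```scala
import scala.collection.mutable

// Driver-side model: track only the *invisible* blocks. A block is visible
// iff it has at least one location and is not in the invisible set.
class MasterModel {
  private val locations = mutable.Map.empty[String, mutable.Set[String]] // block -> executors
  private val invisible = mutable.Set.empty[String]

  def reportBlockPut(block: String, exec: String): Unit = {
    val known = locations.get(block).exists(_.nonEmpty)
    locations.getOrElseUpdate(block, mutable.Set.empty) += exec
    if (!known || invisible(block)) invisible += block // new blocks start invisible
  }

  def taskSucceeded(blocks: Set[String]): Unit = invisible --= blocks

  def executorLost(exec: String): Unit = {
    for ((_, execs) <- locations) execs -= exec
    val gone = locations.collect { case (b, e) if e.isEmpty => b }.toList
    gone.foreach { b => locations -= b; invisible -= b } // lost blocks: drop all state
  }

  def isVisible(block: String): Boolean =
    locations.get(block).exists(_.nonEmpty) && !invisible(block)
}
```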
> If executor is lost, driver will update its state as well due to the loss (evict the block) ... if there are other copies of the block, continues to be visible (since block exists). If task fails before block transitions to visible (or executor loss), driver will evict all blocks.

+1
Will try to make the change, the scenario I mentioned above would not be a common case.
@ivoson could you add a feature flag for the whole change?
```scala
case None =>
  // doPut() didn't hand work back to us, so the block already existed or was successfully
  // stored. Therefore, we now hold a read lock on the block.
  if (!isCacheVisible && !computed) {
    // Force compute to report accumulator updates.
```
Seems like the result is only computed but not cached, no?
Could we force put the result iterator even if the block exists in this case?
Yes, if the block already exists on the node, here we just compute it again without putting the result.

> Could we force put the result iterator even if the block exists in this case?

This would need to modify the locking mechanism a little bit. This is also an issue with indeterminate operations; for determinate operations there is no need to replace the cache here. I am wondering whether the indeterminate computation framework can cover this case with some other solution?
> Yes, if the block already exists in the node, here just computed it again but not putting the result.

Shouldn't we replace the block with the newly computed result?
Oh... the recomputation is only for accumulator updates? And the result should be the same, so no need to replace?
Yes, the recomputation is only for updating accumulators. The result should be the same unless the computation is indeterminate.
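The undercount being fixed here can be reproduced in miniature (a toy model with hypothetical names, not Spark code): accumulator updates from a failed attempt are discarded, and a retry served from the cache never re-runs the closure that would replay them.

```scala
// Toy model of a task body: on a cache hit the compute closure (and thus the
// accumulator update) is skipped entirely.
object AccumulatorSketch {
  def runTask(cached: Option[Seq[Int]], acc: Int => Unit): Seq[Int] =
    cached.getOrElse {
      val data = Seq(1, 2, 3)
      data.foreach(acc) // accumulator is updated only when computing
      data
    }
}
```

Attempt 1 computes and caches but fails, so its accumulator updates are dropped; attempt 2 hits the cache and the committed accumulator stays at zero - the undercount.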
```scala
.doc("Set to true to enable RDD cache block's visibility status. Once it's enabled," +
  " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
  " marked as visible only when one of the tasks generating the cache block finished" +
  " successfully.")
```
Additionally, add that this is relevant in the context of consistent accumulator status.
Thanks, updated.
```scala
if (master.isRDDBlockVisible(blockId)) {
  // Cache the visibility status if block exists.
  blockInfoManager.tryAddVisibleBlock(blockId)
  true
```
Should we still return `true` if `blockInfoManager` doesn't contain the block?
Yes, I think so. Even though current executor doesn't have the cached block, we still can read the cache from a remote executor.
```scala
@@ -77,6 +78,11 @@ class BlockManagerMasterEndpoint(
  // Mapping from block id to the set of block managers that have the block.
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

  // Mapping from task id to the set of rdd blocks which are generated from the task.
  private val tidToRddBlockIds = new mutable.HashMap[Long, mutable.HashSet[RDDBlockId]]
  // Record the visible RDD blocks which have been generated at least from one successful task.
```
visible? but the variable name is invisible.
Thanks, fixed the comments.
```scala
case UpdateRDDBlockTaskInfo(blockId, taskId) =>
  // This is to report the information that a rdd block(with `blockId`) is computed
  // and cached by task(with `taskId`). The happens right after the task finished
```
"The" -> "This"?
Thanks, updated.
```scala
case GetRDDBlockVisibility(blockId) =>
  // Get the visibility status of a specific rdd block.
  if (!trackingCacheVisibility) {
```
Could you push down the flag check into `isRDDBlockVisible` as well?
Sure, updated.
```scala
/**
 * Record visible rdd blocks stored in the block manager, entries will be removed
 * by [[removeBlock()]]
 */
private[spark] val visibleRDDBlocks = ConcurrentHashMap.newKeySet[RDDBlockId]
```
It's weird to me that we use the opposite visibility tracking logic between the driver and executor. Couldn't we use the same "invisible" logic at the executor too?
Sounds good, will make the change. Thanks.
```scala
verify(master, times(2)).updateRDDBlockTaskInfo(blockId, 1)
}

test("add block rdd visibility status") {
```
Please add the JIRA number for the new tests.
Updated
```scala
val res = blockInfoWrappers.putIfAbsent(blockId, wrapper)
if (res == null && trackingCacheVisibility) {
  // Added to invisible blocks if it doesn't exist before.
  blockId.asRDDId.foreach(invisibleRDDBlocks.add)
```
I'd suggest updating `invisibleRDDBlocks` within `lockForWriting()` when the write lock is acquired.
Currently the way we check whether a block is visible is to check the information from both `blockInfoWrappers` and `invisibleRDDBlocks`, so we need the synchronized block to modify the state and avoid concurrency issues (`invisibleRDDBlocks` should be updated once a new item is put into `blockInfoWrappers`, within the synchronized block). It will be easier to have the synchronized block here.

```scala
private[spark] def isRDDBlockVisible(blockId: RDDBlockId): Boolean = {
  if (trackingCacheVisibility) {
    invisibleRDDBlocks.synchronized {
      blockInfoWrappers.containsKey(blockId) && !invisibleRDDBlocks.contains(blockId)
    }
  } else {
    // Always be visible if the feature flag is disabled.
    true
  }
}
```
Ok... but it seems the access to `blockInfoWrappers` is not always protected by `invisibleRDDBlocks.synchronized`? E.g., `BlockInfoManager#blockInfo`, `BlockInfoManager#acquireLock`. It doesn't make sense if we only protect it when it's accessed together with `invisibleRDDBlocks`.
Currently we synchronize all the write operations, and also the read operations which need to check both variables. For other read operations, since the variables won't change underneath them, I think a synchronized block may not be necessary.
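The locking discipline being discussed can be sketched as a standalone model (hypothetical `BlockInfoModel`, not the real `BlockInfoManager`): every write that touches both structures shares one monitor (the invisible set's), and the visibility read also takes it because the answer is derived from both structures together.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Visibility is derived from two structures, so writes touching both happen
// under a single lock; reads of one structure alone may skip it.
class BlockInfoModel {
  private val blockInfo = new ConcurrentHashMap[String, String]()
  private val invisible = mutable.Set.empty[String]

  def putBlock(id: String, info: String): Unit = invisible.synchronized {
    // Newly registered blocks start out invisible.
    if (blockInfo.putIfAbsent(id, info) == null) invisible += id
  }

  def markVisible(id: String): Unit = invisible.synchronized { invisible -= id }

  def removeBlock(id: String): Unit = invisible.synchronized {
    blockInfo.remove(id)
    invisible -= id
  }

  def isVisible(id: String): Boolean = invisible.synchronized {
    blockInfo.containsKey(id) && !invisible.contains(id)
  }
}
```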
```diff
@@ -502,7 +536,10 @@ private[storage] class BlockInfoManager extends Logging {
       throw new IllegalStateException(
         s"Task $taskAttemptId called remove() on block $blockId without a write lock")
     } else {
-      blockInfoWrappers.remove(blockId)
+      invisibleRDDBlocks.synchronized {
+        blockInfoWrappers.remove(blockId)
```
Is it necessary for `blockInfoWrappers` to be protected by `invisibleRDDBlocks.synchronized` here?
Just want to make sure that state changes for both `blockInfoWrappers` and `invisibleRDDBlocks` are processed in the same synchronized block to avoid potential concurrency issues, since we depend on the state of the two variables to decide whether a block is visible.
Mostly looks good - just a few minor comments.
Thanks for working on this @ivoson !
Hi @mridulm, comments addressed. Please take a look when you have time. Thanks.
```scala
  " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
  " marked as visible only when one of the tasks generating the cache block finished" +
  " successfully. This is relevant in context of consistent accumulator status.")
  .version("3.4.0")
```
According to the commit history, it seems this PR landed on the master branch (3.5.0). Should this be backported to 3.4.0, or should the version be updated?
No, we should mark this as 3.5.0 instead ... it slipped through review (the PR was created well before 3.4 was cut).
Thanks for catching this @ulysses-you!
Can you create a follow-up to change this to 3.5 @ivoson? Thx
Sure, will do.
…ityTracking.enabled` support version to 3.5.0

### What changes were proposed in this pull request?
In #39459 we introduced a new config entry `spark.rdd.cache.visibilityTracking.enabled` and marked the supported version as 3.4.0. Based on the discussion in #39459 (comment) we won't backport this to 3.4, so modify the supported version for the newly added config entry to 3.5.0.

### Why are the changes needed?
Fixing the config entry support version.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT.

Closes #40281 from ivoson/SPARK-41497-followup.

Authored-by: Tengfei Huang <tengfei.h@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
As described in SPARK-41497, when a task with an rdd cache fails after caching the block data successfully, the retried task will load from the cache. But since the first task attempt failed, the registered accumulators won't get updated.
The general idea to fix the issue in this PR is to add a visibility status for RDDBlocks: an RDDBlock will be visible only when one of the tasks generating the RDDBlock succeeds, to guarantee that accumulators have been updated.
The following changes are made to do this:
- `BlockManagerMasterEndpoint`: add `visibleRDDBlocks` to record the RDDBlocks which are visible, and `tidToRddBlockIds` to track the RDDBlocks generated by each task id, so that we can update the visibility status based on task status;
- `BlockInfoManager`: add `visibleRDDBlocks` to track the visible RDDBlocks in the block manager; once an RDDBlock is visible, the master will ask the BlockManagers having the block to update the visibility status;
- `RDD` getOrCompute: re-compute the partition to update accumulators if the cached RDDBlock is not visible even if the cached data exists, and report the taskId and RDDBlock relationship to `BlockManagerMasterEndpoint`;
- Ask `BlockManagerMasterEndpoint` to update the blocks to be visible, and broadcast the visibility status to the `BlockManagers` having the cached data.

Why are the changes needed?
Bug fix.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Adding new UT.