Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files #24892

Closed
wants to merge 9 commits into from

Conversation

xuanyuanking
Copy link
Member

@xuanyuanking xuanyuanking commented Jun 17, 2019

After the newly added shuffle block fetching protocol in #24565, we can keep this work by extending the FetchShuffleBlocks message.

What changes were proposed in this pull request?

This is a follow-up work for #22112's future improvment[1]: Currently we can't rollback and rerun a shuffle map stage, and just fail.

Spark will rerun a finished shuffle write stage while meeting fetch failures, currently, the rerun shuffle map stage will only resubmit the task for missing partitions and reuse the output of other partitions. This logic is fine in most scenarios, but for indeterministic operations(like repartition), multiple shuffle write attempts may write different data, only rerun the missing partition will lead a correctness bug. So for the shuffle map stage of indeterministic operations, we need to support rolling back the shuffle map stage and re-generate the shuffle files. In this patch, we achieve this by adding the indeterministic tag in the stage, for the indeterministic stage, when the stage resubmits, we'll clear all existing map status and rerun all partitions. We also add properties in task LocalProperties to record the message for the retried indeterminate stage, so that the shuffle file which generated from retried indeterministic stage will keep the shuffle generation id in the file name. The corresponding reduce side will specify which shuffle generation id of shuffle it wants to read. The shuffle generation id marked the retried times of this stage, so we reuse the stage attempt id as the shuffle generation id while meeting the indeterminate stage reran.

All changes are summarized as follows:

  • Extend FetchShuffleBlock message with shuffleGenerationId.
  • Add corresponding support for ShuffleBlockResolver, if the shuffle file generated from the indeterminate stage, its name will contain the indeterminateAttemptId, otherwise the file name just as before.
  • Add the retried indeterminate stage info in TaskContext.localProperties and use it in Shuffle Reader and Writer.
  • Add the determinate flag in Stage and use it in DAGScheduler and the cleaning work for the intermediate stage.

How was this patch tested?

  • UT: Add UT for all changing code and newly added function.
  • Manual Test: Also providing a manual test to verify the effect.
import scala.sys.process._
import org.apache.spark.TaskContext

val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
val indeterminateStage1 = determinateStage0.repartition(200)
val indeterminateStage2 = indeterminateStage1.repartition(200)
val indeterminateStage3 = indeterminateStage2.repartition(100)
val indeterminateStage4 = indeterminateStage3.repartition(300)
val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 && 
  TaskContext.get.stageAttemptNumber == 0) {
  throw new Exception("pkill -f -n java".!!)
  }
  x
}
val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length

It's a simple job with multi indeterminate stage, it will get a wrong answer while using old Spark version like 2.2/2.3, and will be killed after #22112. With this fix, the job can retry all indeterminate stage as below screenshot and get the right result.
image

@SparkQA
Copy link

SparkQA commented Jun 17, 2019

Test build #106588 has finished for PR 24892 at commit 11301bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Copy link
Member Author

ping @cloud-fan @gatorsmile

ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
ExecutorShuffleInfo executor, int shuffleId,
int mapId, int reduceId, int stageAttemptId) {
String baseFileName = "shuffle_" + shuffleId + "_" + mapId + "_0";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to your change, but do you know what _0 means here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, it's the IndexShuffleBlockResolver.NOOP_REDUCE_ID, as described in the comment

// No-op reduce ID used in interactions with disk store.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.

After all blocks consolidate in single file, we didn't use reduceId in the shuffle file name, just use the offsite reading from index file to find the block in the shuffle data file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it was from the hash shuffle algorithm from long ago -- that last id was the reducePartitionId. But now we always merge all reducePartitions into one.

@cloud-fan
Copy link
Contributor

Hi @xuanyuanking thanks for your great work! can you extend the PR description to explain what is shuffle generation id and why we can use stage attempt id to represent it?

@SparkQA
Copy link

SparkQA commented Jun 20, 2019

Test build #106719 has finished for PR 24892 at commit 5a4b8b8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Copy link
Member Author

@cloud-fan Thanks Wenchen for your review and advice, comment address and test fix done.

@SparkQA
Copy link

SparkQA commented Jun 20, 2019

Test build #106724 has finished for PR 24892 at commit b8e3133.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -30,7 +30,7 @@ trait BlockDataManager {
* Interface to get local block data. Throws an exception if the block cannot be found or
* cannot be read successfully.
*/
def getBlockData(blockId: BlockId): ManagedBuffer
def getBlockData(blockId: BlockId, shuffleGenerationId: Int): ManagedBuffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, it's a little weird to have a shuffleGenerationId parameter in a general getBlockData method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully agree, actually in the original patch, I add a new function named getShuffleBlockData, which want only to address getting shuffle block data. But I found that for support old version of fetching blocks by OpenBlocks, we still keep the logic of fetching shuffle block data which has no shuffle generation id in getBlockData function, so I revert back and implement as the current version. Does Wenchen have any suggestion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep getBlockData unchanged, and add a new getShuffleBlockData method.

For old shuffle protocol, we still call getBlockData. Otherwise, call getShuffleBlockData.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, done this in 96bab1e.

// First figure out the indexes of partition ids to compute.
// Before find missing partition, do the intermediate state clean work first.
stage match {
case sms: ShuffleMapStage if stage.isIndeterminate =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add if stage.latestInfo.attemptNumber() > 0 as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause the correctness problem because we didn't call makeNewStageAttempt so far here.

@SparkQA
Copy link

SparkQA commented Jun 23, 2019

Test build #106807 has finished for PR 24892 at commit 4c4af33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
if (indeterminateBlock) blockId += "_0";
Copy link
Contributor

@cloud-fan cloud-fan Aug 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_0 is a little confusing here. Maybe we should have a shuffleGenerationId parameter (by default -1), instead of
indeterminateBlock.

@@ -168,7 +180,9 @@ private FetchResult fetchBlocks(
String execId,
String[] blockIds,
TransportConf clientConf,
int port) throws Exception {
int port,
int shuffleGenerationId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just an Integer shuffleGenerationId, and use null to indicate fetching data block.

@@ -157,9 +158,20 @@ public void releaseBuffers() {
}
}

// Fetch a set of blocks from a pre-registered executor.
// Fetch a set of shuffle blocks with default generation id -1 from a pre-registered executor.
private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we rename it to fetchShuffleBlocks?

}

// Fetch a set of shuffle blocks from a pre-registered executor.
private FetchResult fetchBlocks(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

TransportConf transportConf) {
TransportConf transportConf,
boolean useShuffleBlockFetcher,
int shuffleGenerationId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, shall we use a single Integer shuffleGenerationId parameter?

@@ -39,16 +39,20 @@
/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param shuffleId Unique identifier for the shuffle the map task is a part of
* @param shuffleGenerationId The shuffle generation ID of the stage that this task belongs to,
Copy link
Contributor

@cloud-fan cloud-fan Aug 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about

The generation ID of the shuffle, which is always -1 if the shuffle map stage is deterministic.
Otherwise, it starts at 0 and increases when the shuffle map stage gets retied.

abortStage(mapStage, reason, None)
} else {
logInfo(s"The indeterminate stage $mapStage will be resubmitted," +
" the stage self and all indeterminate parent stage will be" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the stage itself

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all indeterminate parent stage => all parent stages. We need to rerun the parent stages even if they are deterministic.

@@ -28,7 +28,8 @@ private[spark] class TaskSet(
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
val properties: Properties) {
val properties: Properties,
val isIndeterminateRerun: Boolean = false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about allowSpeculativeTask: Boolean = true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and add some comments to explain when we don't allow speculative tasks.

}

test("SPARK-25341: continuous indeterminate stage roll back") {
// shuffleMapRdd1/2/3 are all indeterminate.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we make the shuffleMapRdd3 deterministic? To prove that all parent stages need to rerun even if they are deterministic.

val mapId = 2
val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
private def testWithIndexShuffleBlockResolver(
shuffleId: Int, mapId: Int, idxName: String, generationId: Int): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shall we put generationId right after shuffleId?

*/
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int shuffleGenerationId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shuffleGenerationId is a bit long, how about just generationId?

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except a few minor comments, thanks for fixing it!

@squito
Copy link
Contributor

squito commented Aug 12, 2019

Sorry I don't understand -- why don't you want to support speculative execution?

Correctness before performance, yes.

On the assumptions about task independence -- yes, that was the assumption, before this whole thread of issues related to non-determinstic tasks and stage retry.

Fetch failures are more likely on large clusters & large workloads, precisely where speculative execution is important too. If we couldn't get it to work together, then I would totally agree we should go for correctness. But I think using the global TID would give us the behavior we want.

I also think the global TID is simpler. For one, debugging is simpler -- the shuffle id is actually the shuffle id; there isn't some other state that is tracked separately to know which block to get.

If you're really just concerned about the overhead of an additional field in the shuffle block, I think you could even swap out the original map partition id for the TID of the map task (though that would be more complex in other ways, the MapStatus would need to track the TID since its position in the array would no longer be sufficient).

@cloud-fan
Copy link
Contributor

After another look, I think speculative task is OK. When we run an indeterminate shuffle map stage, it's always a fresh run (either the first run, or a retry that reruns all the downstream stages). Sorry about missing it before.

It's fine to write shuffle files with speculative tasks. The shuffle map task writes to a temp file first, and then try to rename the temp file to the formal shuffle file name(shuffleId-mapId-reduceId). If a file with the formal shuffle file name already exists, give up and delete the temp file.

I think it's a good idea to use TID instead of partition ID to represent mapId. There is no more file name conflict anymore. We can keep the shuffle protocol unchanged, but there will be a little overhead in ShuffleStatus, which I think is acceptable.

One concern is, it will be hard to test. Now we need to query MapOutputTracker to get mapId, instead of writing mapId(0, 1, 2, 3, ...) directly in test.

I think this worth a discussion, cc @vanzin @tgravescs @jiangxb1987

@squito
Copy link
Contributor

squito commented Aug 12, 2019

Thanks for taking another look.

Another thing to keep in mind w/ speculative shuffle tasks -- the scheduler never puts speculative tasks on the same host as the original task. That ensures the speculative task isn't trying to write to the same disk with local shuffle storage. But, that doesn't help at all with distributed shuffle storage (and doesn't help deal w/ zombie tasks etc.)

@vanzin
Copy link
Contributor

vanzin commented Aug 15, 2019

So I'm trying to page in enough context about all this, but I can't shake this feeling that I'm missing something about speculative tasks in non-deterministic stages being safe.

The code that triggers me is this, in DAGScheduler.handleTaskCompletion:

          case smt: ShuffleMapTask =>
            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
            shuffleStage.pendingPartitions -= task.partitionId
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
            } else {
              // The epoch of the task is acceptable (i.e., the task was launched after the most
              // recent failure we're aware of for the executor), so mark the task's output as
              // available.
              mapOutputTracker.registerMapOutput(
                shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
            }

That seems to be blindly overwriting an existing task's output with the new one. Wouldn't that mean that a speculative task could replace the output of another task after the stage has finished (and thus after the next stage started running)?

The stage is marked as finished as soon as there are output blocks for all the partitions, and at that point there may still be speculative tasks that haven't reported back. I believe in that case the driver makes an effort to kill them, but what if that task result arrives first?

(There's a check at the very next line where if checks if (runningStages.contains(shuffleStage)...), which feels like it should also be applied to the mapOutputTracker call I mention above.)

Sorry if I'm missing something obvious. Need to spend more time to fully understand this. (I also realize that what I'm commenting on isn't necessarily caused by this particular PR or would change by the latest suggestions, but rather is an existing thing.)

@cloud-fan
Copy link
Contributor

@vanzin I think your concern is valid. Seems the shuffle writing policy is contradictory to itself: if a partition has multiple shuffle write tasks, 1) if they are on the same node (e.g. a stage becomes zombie and rerun), first write wins. 2) if they are on different nodes (speculative tasks), last write wins.

I think we should stick with "first write wins". As you said we should only update the map status if the task is the first one that completes.

@cloud-fan
Copy link
Contributor

@vanzin I checked the code:

if (!stageIdToStage.contains(task.stageId)) {
// The stage may have already finished when we get this event -- eg. maybe it was a
// speculative task. It is important that we send the TaskEnd event in any case, so listeners
// are properly notified and can chose to handle it. For instance, some listeners are
// doing their own accounting and if they don't get the task end event they think
// tasks are still running when they really aren't.
postTaskEnd(event)
// Skip all the actions if the stage has been cancelled.
return
}

Spark will ignore late speculative tasks that are completed after the stage is completed. So at least the worst case won't happen: a speculative task could replace the output of another task after the stage has finished (and thus after the next stage started running)

But we still have the contradiction: executor side first shuffle write wins, driver side last shuffle write wins. When we run an indeterminate stage, the downstream stages are always fresh (when we rerun an indeterminate stage, the scheduler rolls back all downstream stages). So it doesn't matter which shuffle write wins, as long as the shuffle write is atomic. We'd better fix this contradiction, but it doesn't cause any real problems.

@squito
Copy link
Contributor

squito commented Aug 15, 2019

thanks for the looking into this @vanzin and @cloud-fan . I agree its unnecessarily confusing, would be good to make this consistent.

@vanzin
Copy link
Contributor

vanzin commented Aug 20, 2019

Thanks for looking at the speculation-related code. it's a little confusing at times.

Another idea: instead of introducing a shuffleGenerationId, couldn't we use the MapOutputTracker epoch for this same purpose?

e.g. when an indeterminate stage fails, you call into the MapOutputTracker to invalidate that stage (a new call that would be basically unregisterShuffle + incrementEpoch).

The DAGScheduler might need some adjustments, but other than that, wouldn't this approach also solve the problem? And without needing to change the way block IDs are used anywhere.

@vanzin
Copy link
Contributor

vanzin commented Aug 20, 2019

Hmm, nevermind, that doesn't account for the existing files from the old shuffle, which is the main change in this PR (changing the file names so that they contain the shuffle generation id).

@vanzin
Copy link
Contributor

vanzin commented Aug 20, 2019

@cloud-fan could you clarify this comment?

One concern is, it will be hard to test. Now we need to query MapOutputTracker to get mapId, instead of writing mapId(0, 1, 2, 3, ...) directly in test.

What tests are you thinking about? The way I see Imran's suggestion (encode the task ID in the MapStatus, which would take care of most of the book-keeping), it should be mostly transparent to test, wouldn't it?

Maybe there are some tests that make assumptions about how the MapOutputTracker works internally, but it seems worthy to fix that in any case, so that tests don't assume that the mapId in the shuffle block ID is the same as the partition ID.

On an unrelated note, just a note about my comments above: it seems that this could also be fixed by using a "last write wins" approach on the executor side (invalidate all outputs from previous stage attempt in the tracker, and tasks from the new stage attempt would overwrite files generated by the previous attempt). But that sounds dangerous (and hard to make sure all is working as it should), so the unique file name based on task ID sounds like a good balance between complexity and size of changes. (It would shift a lot of the changes to the test code, if what Wenchen says really is a problem, but I prefer that to more changes in the main code.)

@vanzin
Copy link
Contributor

vanzin commented Aug 20, 2019

Oh, one more thing: changing from partition ID to task ID in ShuffleBlockId & friends would still qualify as a change in the shuffle service protocol, since there's a type change from int to long, and a lot of code in the shuffle service assumes that the id will be an integer.

It should still work in a lot of cases with an old shuffle service, but would break when task IDs exceed Int.MaxValue.

@xuanyuanking
Copy link
Member Author

xuanyuanking commented Aug 29, 2019

Great thanks @squito for the idea of reusing the task attempt id as map id, this significantly reduces the code changes. I reimplement the task in #25620. Beside of the tests changes and map status should add the map task attempt id, I found maybe the last overhead during this work, it's about the SortShuffleManager, we need to record all the map task id while only keep the map numbers before, let's discuss this here.

Oh, one more thing: changing from partition ID to task ID in ShuffleBlockId & friends would still qualify as a change in the shuffle service protocol, since there's a type change from int to long, and a lot of code in the shuffle service assumes that the id will be an integer.

Thanks @vanzin for the reminding, the compatibility for external shuffle service is definitely an important consideration. We'll only do this extension for the new shuffle protocol, thanks for the work in #24565, we can compatible with old external shuffle service by using the old protocol, you can see the corresponding implement here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants