
[WIP][SPARK-29998][CORE] Retry getFile() until all folder failed then exit #26643

Closed

Conversation

AngersZhuuuu
Contributor

What changes were proposed in this pull request?

If one of a NodeManager's disks is broken, then when a task begins to run and reads the jobConf via broadcast, the executor's BlockManager fails to create a local file and throws an IOException:

19/11/22 15:14:36 INFO org.apache.spark.scheduler.DAGScheduler: ShuffleMapStage 342 (run at AccessController.java:0) failed in 0.400 s due to Job aborted due to stage failure: Task 21 in stage 343.0 failed 4 times, most recent failure: Lost task 21.3 in stage 343.0 (TID 34968, hostname, executor 104): java.io.IOException: Failed to create local dir in /disk11/yarn/local/usercache/username/appcache/application_1573542949548_2889852/blockmgr-a70777d8-5159-48e7-a47e-848df01a831e/3b.
        at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70)
        at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:129)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:605)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:214)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:228)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Since in TaskSetManager.handleFailedTask(), this kind of failure is retried on the same executor until the failure count exceeds the maximum allowed task failures, the stage fails and then the whole job fails.

In this PR, I want to make getFile() try all local folders, and only exit the executor once every folder has failed, roughly as sketched below.
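
For illustration, here is a minimal Scala sketch of the idea, assuming a DiskBlockManager-style layout (localDirs, subDirsPerLocalDir); the helper name retryGetFile and the candidate-set bookkeeping are hypothetical and only approximate the actual patch:

    import java.io.{File, IOException}
    import scala.collection.mutable

    // Hypothetical sketch: try every configured local dir before giving up.
    // localDirs / subDirsPerLocalDir mirror DiskBlockManager's fields; the
    // candidate-set bookkeeping only approximates the actual diff.
    def retryGetFile(filename: String,
                     localDirs: Array[File],
                     subDirsPerLocalDir: Int): File = {
      require(localDirs.nonEmpty, "no local dirs configured")
      val hash = math.abs(filename.hashCode)   // stand-in for Utils.nonNegativeHash
      val candidates = mutable.Set(localDirs.indices: _*)
      var lastError: IOException = null
      var result: File = null
      while (result == null && candidates.nonEmpty) {
        // Prefer the hashed dir; fall back to any remaining (hopefully healthy) dir.
        val hashedDirId = hash % localDirs.length
        val dirId = if (candidates.contains(hashedDirId)) hashedDirId else candidates.head
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
        val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
        try {
          if (!subDir.exists() && !subDir.mkdirs()) {
            throw new IOException(s"Failed to create local dir in $subDir.")
          }
          result = new File(subDir, filename)
        } catch {
          case e: IOException =>
            lastError = e
            candidates -= dirId   // this local dir looks broken, drop it and retry
        }
      }
      if (result == null) {
        // Every local dir failed: rethrow so the caller can decide to exit the executor.
        throw lastError
      }
      result
    }

Whether it is safe to let a block land in a different directory than its hash implies, especially for shuffle files served by the external shuffle service, is exactly what the review comments below discuss.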

Why are the changes needed?

This problem makes the whole job fail; we can fix it by retrying on the other local directories.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

WIP

@AngersZhuuuu
Contributor Author

@cloud-fan @dongjoon-hyun @HyukjinKwon @srowen
To fix this problem, WDYT?
Hoping for advice.

@AmplabJenkins

Can one of the admins verify this patch?

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-29998][CORE]Retry getFile() until all folder failed then exit [SPARK-29998][CORE] Retry getFile() until all folder failed then exit Nov 24, 2019
@dongjoon-hyun
Member

Hi, @AngersZhuuuu .
This is possible, but I'm not sure it's a good idea to live with the node.
We already handle executor failure in the upper layers, don't we?

@AngersZhuuuu
Contributor Author

Hi, @AngersZhuuuu .
This is possible, but I'm not sure it's a good idea to live with the node.
We already handle executor failure in the upper layers, don't we?

I know we handle executor failure, but when this situation happens the executor itself won't fail.
With getFile()'s current logic, the task will always retry against the same localDir and subDir on this executor,
since the hash value is the same, so every task retry fails and finally the job fails.
And this kind of problem won't trigger a stage retry.
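
For context, the existing lookup is roughly the following (simplified from DiskBlockManager.getFile in Spark 2.x; fields are passed as parameters here to keep the sketch self-contained). Because the mapping from filename to (dirId, subDirId) is a pure hash, every retry of the task computes the same path and hits the same broken disk:

    import java.io.{File, IOException}

    // Simplified from org.apache.spark.storage.DiskBlockManager.getFile (Spark 2.x).
    def getFile(filename: String,
                localDirs: Array[File],
                subDirs: Array[Array[File]],
                subDirsPerLocalDir: Int): File = {
      val hash = math.abs(filename.hashCode)   // Spark uses Utils.nonNegativeHash
      val dirId = hash % localDirs.length
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

      // Create the sub-directory if it doesn't already exist.
      val subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old                                  // reuse the cached sub-directory
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          if (!newDir.exists() && !newDir.mkdir()) {
            // A broken disk fails here, and every retry fails here again.
            throw new IOException(s"Failed to create local dir in $newDir.")
          }
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
      new File(subDir, filename)
    }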

@dongjoon-hyun
Member

Can we have a reproducible test case for your claim?

@AngersZhuuuu
Contributor Author

Can we have a reproducible test case for your claim?

Yea, I will try to reproduce this in a UT.

@dongjoon-hyun
Member

Sorry, but this is not okay to trigger Jenkins yet.

@AngersZhuuuu AngersZhuuuu changed the title [SPARK-29998][CORE] Retry getFile() until all folder failed then exit [WIP][SPARK-29998][CORE] Retry getFile() until all folder failed then exit Nov 24, 2019
@AngersZhuuuu
Contributor Author

Sorry, but this is not okay to trigger Jenkins yet.

No need to trigger it yet. I made this PR to show more clearly where the problem is, and I'll then work based on it. I've added WIP to the title. Thanks for your rigorous work.

@AngersZhuuuu
Contributor Author

Can we have a reproducible test case for your claim?

It's hard to reproduce since it happens at job start, before HadoopRDD.compute(), and then shows an error message like the one above; I can't reproduce it in a UT.
But when testing, I can hit many kinds of errors caused by DiskBlockManager.getFile() that can destroy a stage or job.

case e: IOException =>
logError(s"Failed to create local dir in $newDir.", e)
count = count + 1
localDirIndex.remove(hashIndex)
Member

Methinks this change corrupts the shuffle writer and reader's dependency

Contributor Author

Methinks this change corrupts the shuffle writer and reader's dependency

The retry only happens for a new sub-directory, so it won't destroy the original dependency.
If a blockId already has a corresponding folder, the original one is returned.
Only a newly arriving blockId retries when mkdir fails, and the File that is finally returned is put into subDirs:

subDirs(dirId)(subDirId) = newDir

When a request for this block comes again, the old directory is returned, since subDirs(dirId)(subDirId) != null

@yaooqinn
Member

yaooqinn commented Nov 24, 2019

The dirId seems unstable; I guess it probably goes wrong for the external shuffle service

Contributor Author

The dirId seems unstable; I guess it probably goes wrong for the external shuffle service

Got your point. If the disk problem gets fixed later, the dirId may not be the same

Contributor Author

@yaooqinn I added a method to control this problem, but it doesn't look very elegant. As @srowen mentioned, we can use the blacklist to prevent this bad case, but I don't think it can handle every executor disk problem

@srowen
Member

srowen commented Nov 24, 2019

Does the executor not eventually blacklist in this case or am I missing the idea here?

@AngersZhuuuu
Contributor Author

AngersZhuuuu commented Nov 24, 2019

Does the executor not eventually blacklist in this case or am I missing the idea here?

Blacklisting can be useful, but sometimes the stage fails right as it starts running tasks.
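
For reference, the blacklisting being discussed is controlled by settings like the following (Spark 2.x configuration keys; the values are only illustrative):

    import org.apache.spark.SparkConf

    // Illustrative only: enable task/executor blacklisting (Spark 2.x config keys).
    val conf = new SparkConf()
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
      .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
      .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")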

@cloud-fan
Contributor

If the shuffle final path is in a broken disk, we have the same problem, right?

Currently we have a deterministic mapping from a filename to its path. The benefit is that the path calculation is stateless and cheap to do; we can even do it in different JVMs. But I'm not sure whether we leverage this property in Spark. cc @vanzin @squito
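
A toy illustration of that property, assuming a hypothetical resolvePath helper that mirrors the hash in DiskBlockManager (and in the external shuffle service's block resolver): two independent processes can compute the same path from nothing but the directory list and the filename.

    import java.io.File

    object DeterministicPathDemo extends App {
      // Hypothetical helper mirroring the filename -> (dirId, subDirId) hash.
      def resolvePath(filename: String, localDirs: Array[String], subDirsPerLocalDir: Int): File = {
        val hash = math.abs(filename.hashCode)
        val dirId = hash % localDirs.length
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
        new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
      }

      val localDirs = Array("/disk1/blockmgr-xxx", "/disk2/blockmgr-xxx")
      // The executor and an external shuffle server resolve the same file independently:
      val fromExecutor      = resolvePath("shuffle_0_21_0.data", localDirs, 64)
      val fromShuffleServer = resolvePath("shuffle_0_21_0.data", localDirs, 64)
      assert(fromExecutor == fromShuffleServer)
      // If getFile() silently moved a block to a different dir on retry, readers that
      // recompute the path this way would no longer find the file.
    }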

@AngersZhuuuu
Contributor Author

If the shuffle final path is in a broken disk, we have the same problem, right?

Currently we have a deterministic mapping from a filename to its path. The benefit is that the path calculation is stateless and cheap to do; we can even do it in different JVMs. But I'm not sure whether we leverage this property in Spark. cc @vanzin @squito

Yes, maybe adding the blacklist is enough for this problem. I will check how the ExternalShuffleService uses the deterministic mapping from filename to path.

@srowen srowen closed this Nov 30, 2019