
[SPARK-29976][CORE] Trigger speculation for stages with too few tasks #26614

Closed
wants to merge 13 commits

Conversation

@yuchenhuo (Contributor) commented Nov 21, 2019

What changes were proposed in this pull request?

This PR adds an optional Spark conf for speculation to allow speculative runs for stages where there are only a few tasks.

spark.speculation.task.duration.threshold

If provided, tasks will be speculatively run when the TaskSet contains fewer tasks than the number of slots on a single executor and a task is taking longer than the threshold.
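
For illustration, a minimal sketch of setting the new conf alongside the existing speculation flag (the values below are arbitrary examples, not recommendations):

    import org.apache.spark.SparkConf

    // Enable speculation and the new duration threshold (example values only).
    // "30min" is arbitrary; per the config definition discussed below, a bare
    // number with no unit would be interpreted as milliseconds.
    val conf = new SparkConf()
      .set("spark.speculation", "true")
      .set("spark.speculation.task.duration.threshold", "30min")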

Why are the changes needed?

This change helps avoid the scenario where a single executor hangs forever due to a disk issue, the single task in a TaskSet is unfortunately assigned to that executor, and the whole job therefore hangs forever.

Does this PR introduce any user-facing change?

Yes. If the new config spark.speculation.task.duration.threshold is provided, the TaskSet contains fewer tasks than the number of slots on a single executor, and a task is taking longer than the threshold, then speculative tasks will be submitted for the running tasks in the TaskSet.

How was this patch tested?

Unit tests are added to TaskSetManagerSuite.

@jiangxb1987 (Contributor) commented Nov 21, 2019

test this please

@jiangxb1987 (Contributor) left a comment

Looks good, only some minor comments.

private def testSingleTaskSpeculation(singleTaskEnabled: Boolean): Unit = {
  sc = new SparkContext("local", "test")
  // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
  sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)

@jiangxb1987 (Contributor) Nov 21, 2019

Why do we need to set this config here?

@yuchenhuo (Author, Contributor) Nov 21, 2019

yeah, this is not really useful. I can remove it

  sc.conf.set(config.SPECULATION_ENABLED, true)
  sc.conf.set(config.SPECULATION_SINGLETASKSTAGE_ENABLED, singleTaskEnabled)
  // Set the threshold to be 60 minutes
  sc.conf.set(config.SPECULATION_SINGLETASKSTAGE_DURATION_THRESHOLD.key, "60min")

@jiangxb1987 (Contributor) Nov 21, 2019

nit: why not keep it the same as the default value?

@yuchenhuo (Author, Contributor) Nov 21, 2019

This would validate that setting the conf actually works. IMO it's better than testing the default value. I can add another test to test the default value is 30 if that's preferred.

@jiangxb1987 (Contributor) Nov 21, 2019

sounds good, let's keep the current way


@SparkQA commented Nov 21, 2019

Test build #114180 has finished for PR 26614 at commit bc0f7e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@squito (Contributor) commented Nov 22, 2019

I understand the case you're trying to address, but special-casing one task doesn't feel right to me. If there are two tasks, couldn't you end up in the same situation, with two tasks running on the same bad executor? Would it make sense to have this as just a global min time for speculation?

OTOH, I do know that 1 task is common, especially when doing a final aggregation. It's not a bad solution, I just don't know if I want to be stuck w/ this conf.

@yuchenhuo (Author, Contributor) commented Nov 23, 2019

@squito Good point! If I understand correctly, the better way to do this is to have a flag like spark.speculation.aggressive and maybe check the elapsed time of all the running tasks and, if it exceeds the threshold, speculatively run the task. Do you think it's necessary to additionally check whether the tasks are running on the same executor and only speculate if so?

@cloud-fan (Contributor) commented Nov 25, 2019

We need finished tasks to have an estimation of task duration. I agree with @squito that the only missing thing is a user-supplied task duration. Can we only have one config to specify the task duration for speculation?
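
For context, a simplified sketch of the existing heuristic this refers to (illustrative names and values; the real logic lives in TaskSetManager.checkSpeculatableTasks): the threshold comes from the median duration of successful tasks, which is why finished tasks are needed.

    // Illustrative only: how a duration estimate is derived from finished tasks.
    val successfulDurations = Seq(120000L, 90000L, 150000L)   // ms, from finished tasks
    val speculationMultiplier = 1.5
    val medianDuration = successfulDurations.sorted.apply(successfulDurations.size / 2)
    val threshold = speculationMultiplier * medianDuration    // speculate tasks running longer than this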

yuchenhuo added 3 commits Nov 25, 2019
        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
      }
    }
    if (speculationTaskDurationThresOpt.isDefined) {

@yuchenhuo (Author, Contributor) Nov 25, 2019

There should be a way to combine the two if conditions here, but it might make the logic a bit more complicated. Not sure if it's worth doing.

@yuchenhuo (Author, Contributor) commented Nov 25, 2019

@cloud-fan @squito I've updated the PR according to your comments. Though I guess the "ideal" way to do this is to have a mechanism to always speculatively run if the threshold is exceeded && there are some empty slots, but I'm not sure that's necessary, since if there are no empty slots the speculatable tasks wouldn't be run anyway.

private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
  ConfigBuilder("spark.speculation.task.duration.threshold")
    .timeConf(TimeUnit.MILLISECONDS)
    .createOptional

@cloud-fan (Contributor) Nov 25, 2019

can we add a simple .doc to explain what this config does?

@cloud-fan (Contributor) Nov 25, 2019

yes please

if (speculationTaskDurationThresOpt.isDefined) {
  val time = clock.getTimeMillis()
  val threshold = speculationTaskDurationThresOpt.get
  logDebug("Checking tasks taking long time than provided speculation threshold: " + threshold)

@viirya (Member) Nov 25, 2019

taking long time -> taking longer time

@viirya (Member) commented Nov 25, 2019

Please also update the title and description accordingly.

private[spark] val SPECULATION_TASK_DURATION_THRESHOLD =
  ConfigBuilder("spark.speculation.task.duration.threshold")
    .doc("Task duration after which scheduler would try to speculative run the task. If " +
      "provided, tasks would be rerun as long as they exceed the threshold no matter whether" +

@tgravescs (Contributor) Nov 25, 2019

should change rerun to something like speculatively run.

@tgravescs (Contributor) commented Nov 25, 2019

So while I agree that this could easily happen for 2 tasks instead of 1, in cases like when both are put on the same executor, if you make this apply all the time (or to more than 1 task) then you have to estimate your worst-case timeout across the entire application. So if you have stages with 10000 tasks that take a long time, and then another stage with 1 task that takes a shorter time, you have to set the config to the longer time. The 10000-task stage I would think you would want the normal speculation configs to apply to. Perhaps we either want a config for the max number of tasks to apply it to, or we make it smarter and say apply it when you only have a single executor or when tasks <= what a single executor can fit.

@tgravescs (Contributor) commented Nov 25, 2019

Note I realize there are a bunch of corner cases here, but I take back my suggestion of the number of executors: you could have 1 executor but run 10000 tasks on it, and you still might want to use the regular logic. I think our best bet is to either leave it at 1, add a config, or use tasks <= the number that can fit on a single executor; the last option makes the most sense to me.

@squito (Contributor) commented Nov 25, 2019

Yeah, I am also torn like @tgravescs. There are a ton of corner cases. I really don't like special-casing one task, but I'm not sure of a clean way to configure this. Even with 10 tasks, if you've got 4 cores per executor you've easily got 4 tasks stuck on your one bad executor, and with a default speculation quantile of 0.75 you wouldn't finish 8 tasks successfully to start speculation. If you add in the fact that the poor performance may be across an entire node, and 64 cores per node is not uncommon, the limit goes way higher.

Speculative execution is always a heuristic, we know it's not going to be perfect. I feel like when you enable speculation, you are saying you're willing to accept some wasted resources, so it's more acceptable to run some speculative tasks when you don't really need to. But how much waste is OK? In Tom's example, say you had 10k tasks that each took an hour, but all are actually running fine -- the waste is pretty serious, you'll launch a speculative version of each task so it's 10k cpu-hours wasted.

One alternative might be to only have this kick in when all tasks are running on the same host (the TSM already knows the hosts of the running tasks, it's in TaskInfo, it would be easy to see if there is just one host used across all tasks).
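
As a rough sketch of that alternative (not adopted in this PR), assuming the TaskSetManager's existing runningTasksSet and taskInfos members and written here as a standalone helper:

    import scala.collection.mutable
    import org.apache.spark.scheduler.TaskInfo

    // Hypothetical helper: true when every currently running task in the TaskSet
    // has been scheduled on the same host (via TaskInfo.host).
    def allRunningTasksOnSameHost(
        runningTasksSet: mutable.HashSet[Long],
        taskInfos: mutable.HashMap[Long, TaskInfo]): Boolean = {
      val hosts = runningTasksSet.flatMap(tid => taskInfos.get(tid).map(_.host))
      hosts.nonEmpty && hosts.size == 1
    }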

@yuchenhuo yuchenhuo changed the title [SPARK-29976] New conf for single task stage speculation [SPARK-29976][CORE] New conf for single task stage speculation Nov 25, 2019
@yuchenhuo (Author, Contributor) commented Nov 25, 2019

@tgravescs @squito Looks like there are two potential solutions: (1) we check if all of the tasks are running on the same executor first, and if so do the time-threshold speculative run; (2) add another conf specifying a cap on the number of total tasks in the stage under which we would trigger the time-threshold check.

I think both of the solutions solve the problem I'm hitting, but option (2) seems more configurable and handles slightly more corner cases (e.g. multiple problematic nodes).

@jiangxb1987 (Contributor) commented Nov 25, 2019

+1 for restricting the total running tasks <= the number of slots in one executor. This ensures that when there are suspicious tasks that didn't finish after a while (most likely hung) we could start speculative runs; on the other hand, it ensures the speculative tasks started won't waste more resources than one executor's worth.

@tgravescs (Contributor) commented Nov 25, 2019

So with the all-tasks-running-on-the-same-host solution, the behavior might change partway through the stage. It might start out with all tasks on the same executor and use this new timeout config, but then you get more executors and it changes to the other speculative configs. This might be confusing to users.

@tgravescs (Contributor) commented Nov 26, 2019

It seems like either the number of slots on 1 executor or a config might be best. I think there are corner cases for all of these; it's just picking the one that seems to cover the most.

Just a side note, Tez has a similar config but it only applies when a single task is run. It is obviously different but did solve a problem we saw at Yahoo. So based on what we saw there I'm kind of leaning towards tasks <= number of slots on 1 executor. That doesn't add another config, covers the 1-task case, plus the 1-bad-executor case. @squito thoughts?

@squito (Contributor) commented Nov 26, 2019

Sure, I think I'm OK with that, it's a decent compromise. You wouldn't launch speculative tasks if you've got multiple executors on a bad node, but that's OK (IIRC we also won't make a dynamic allocation to get an executor on a new node, which would be needed to really handle that case).

A couple of nitpicky points:

  • when you say the number of tasks <= number of slots of 1 executor -- is that the total number of tasks in the taskset, or the delta minFinishedForSpeculation - numSuccessfulTasks? The reason to do the delta is, say you've got 10 tasks in the taskset, but the last 4 are all running on the bad executor. The taskset as a whole is too big to meet that condition, but with minFinishedForSpeculation=7 and numSuccessfulTasks=6 you'd meet the delta (see the sketch after this list).
  • Doesn't it still need another config to decide what the timeout is in this case?
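
A rough sketch of the numbers in the first bullet, just to make the two candidate conditions concrete (illustrative values and names, not the PR's code):

    val numTasks = 10
    val slotsPerExecutor = 4                 // e.g. 4 cores per executor, 1 cpu per task
    val speculationQuantile = 0.75
    val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt  // 7
    val numSuccessfulTasks = 6               // the other 4 tasks are stuck on the bad executor
    val totalTasksCondition = numTasks <= slotsPerExecutor                        // false: 10 > 4
    val deltaCondition =
      (minFinishedForSpeculation - numSuccessfulTasks) <= slotsPerExecutor        // true: 1 <= 4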
@tgravescs (Contributor) commented Nov 26, 2019

Yes, you still need the config for the timeout; you just don't need a second config to decide when to apply it. I.e. when you have <= that number of tasks, use the timeout config; otherwise use the normal speculation logic.

I was originally thinking the total number of tasks <= number of slots on 1 executor, then apply the timeout config; that seemed the most straightforward and obvious to the user. I'm fine with either way though, as long as it can be explained to the user. I think using the delta does complicate things again, as it uses the new algorithm sometimes and the original algorithm at other times. My initial thought is to keep it simple in the initial implementation; they can always turn spark.speculation.quantile down when you have a larger number of tasks, but lots of corner cases again. The thing with 1 task is that the current settings will never work for it, because you need at least 1 finished task to compare against.

Note you will ask for a new executor if you speculate and the executors are all used. It might not be on a different node though.

@jiangxb1987 (Contributor) commented Nov 26, 2019

I was proposing something like this:

    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      // Try to add speculative tasks that have been running more than
      // SPECULATION_MULTIPLIER * medianDuration.
    } else if (speculationTaskDurationThresOpt.isDefined &&
        runningTasks <= conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)) {
      // Try to add speculative tasks that have been running more than a specified duration.
    } else {
      // Do not add speculative tasks.
    }

So this just introduces a new way to add speculative tasks that have been running more than a specified duration, which should be easy to reason about.

Please note I only consider the number of running tasks in the TaskSet, because the original speculation logic didn't include pending tasks either. On the other hand, if we keep getting those long-running tasks, in the end more executors would be required to run speculative tasks.

@tgravescs (Contributor) commented Nov 26, 2019

The original speculation logic doesn't include pending tasks because it can only use the time of successful tasks. Unless you are referring to something else.

I'm OK with any of these as they will be better than what we have now. If you look at running tasks, it could change very quickly, because with dynamic allocation I might have 1 executor, start 4 tasks on it, but then some time later get another executor, so then I no longer apply the timeout and use the original speculation logic.

@jiangxb1987 (Contributor) commented Nov 26, 2019

Ah, good point. Considering both the running and pending tasks would make the speculation strategy more stable (in the sense that once you entered the speculation task duration threshold branch, unless more tasks finish successfully, you would always choose the same speculation strategy in the next few scheduling iterations).

Also, if runningTasks + pendingTasks > SlotsPerExecutor, that means either too many tasks have been running, or we are waiting for extra executors to be launched; in both cases we'd better not rush into adding more speculative tasks.

yuchenhuo added 3 commits Nov 27, 2019
@yuchenhuo (Author, Contributor) commented Nov 27, 2019

@squito @tgravescs @jiangxb1987 Thanks for all the suggestions. I've updated the PR correspondingly.

@tgravescs (Contributor) commented Dec 2, 2019

test this please

@tgravescs (Contributor) commented Dec 2, 2019

So just want to confirm with @squito and @jiangxb1987:

It looks like what is implemented now is the number of unfinished tasks <= the number of slots on 1 executor and you don't hit the normal speculation logic:

    val numUnfinishedTasks = numTasks - numSuccessfulTasks
    val speculationTaskNumThres = conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)

Based on the discussions above I thought we were going to do the number of total tasks <= the number of slots on 1 executor, to keep it simple. Thoughts?
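
To make the two checks being compared explicit, an illustrative sketch (made-up names and values, not the PR's actual variables):

    val slotsPerExecutor = 4                  // EXECUTOR_CORES / CPUS_PER_TASK
    val numTasks = 10
    val numSuccessfulTasks = 7
    val unfinishedBasedCheck = (numTasks - numSuccessfulTasks) <= slotsPerExecutor  // implemented here: 3 <= 4, true
    val totalTasksBasedCheck = numTasks <= slotsPerExecutor                         // variant discussed: 10 <= 4, false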

@yuchenhuo yuchenhuo changed the title [SPARK-29976][CORE] New conf for single task stage speculation [SPARK-29976][CORE] Trigger speculation for stages with too few tasks Dec 2, 2019
@SparkQA commented Dec 2, 2019

Test build #114734 has finished for PR 26614 at commit 1dcd5d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@jiangxb1987 (Contributor) commented Dec 3, 2019

We would enter the original speculation logic anyway when enough tasks have finished successfully, so the difference is only in the case when some of the tasks have succeeded but the number of successful tasks is less than minFinishedForSpeculation.
For example, you may have 4 slots on each executor, and the currently running TaskSet has 5 tasks, 2 of which have succeeded, 1 task is running, and the remaining 2 tasks are pending. In this case, when the task running time has exceeded speculationTaskDurationThresOpt.get, I think it should be reasonable to submit a speculative task, because the risk that it would consume too many resources is low, and it could possibly resolve the potential task hang issue (like the scenario described in the JIRA).
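
As a rough back-of-the-envelope version of this example (hypothetical values, mirroring the scenario above rather than the PR's code):

    val slotsPerExecutor = 4                                      // 4 cores, 1 cpu per task
    val numTasks = 5
    val numSuccessfulTasks = 2
    val numUnfinishedTasks = numTasks - numSuccessfulTasks        // 3
    val unfinishedCheck = numUnfinishedTasks <= slotsPerExecutor  // true: the threshold path would apply
    val totalTasksCheck = numTasks <= slotsPerExecutor            // false: it would not apply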

@tgravescs (Contributor) commented Dec 3, 2019

Right, I get that it still uses the regular speculation logic if enough tasks have finished; my concerns are confusion for the user, or it kicking in when users don't want it to.

Let's say I set both speculation policies because I have different stages with different requirements. I have one stage with 1 task that is problematic, but the speculationTaskDurationThresOpt will also be applied to all my other stages that I configured the normal Spark speculation configs for. If the speculationTaskDurationThresOpt is something that could be widely different for different stages, then it's harder to configure this way and it can kick in when I don't want it to or when I don't expect it to. The normal speculation configs are based on a multiplier of other task times; this is just a hardcoded timeout. So let's say my normal speculation config multiplier would kick in only after an hour and my speculationTaskDurationThresOpt is set to 15 minutes. I'm going to start speculating a lot more when the number of unfinished tasks gets below that threshold.

I totally get that this perhaps covers more scenarios, which in my opinion is both good and bad, as shown above. I was thinking of keeping this simple for now and just having it apply if total tasks <= slots on 1 executor. That should be very easy for a user to understand and to know when it will apply, and it solves the issue reported in this JIRA. If we start to find more specific cases where we want to get smarter, we can enhance it later.

@jiangxb1987 (Contributor) commented Dec 3, 2019

I think I get your concern now: we might have two stages running concurrently, where the expected task duration for the first stage could be 15 min and for the second stage it could be 1 hr. Thus if we set the speculationTaskDurationThresOpt to 30 min, then tasks from the second stage would all get speculated, which is not desired.

However, I don't see why this is related to comparing unfinished tasks versus total tasks against the task-count threshold. Even if we choose total tasks instead of unfinished tasks, it can still happen that one stage contains only one task but its duration is actually expected to be longer than speculationTaskDurationThresOpt, and then a speculative task would get launched anyway.

@tgravescs (Contributor) commented Dec 4, 2019

It minimizes the impact and makes it predictable when the new speculationTaskDurationThresOpt is applied. If you only apply it when the number of tasks is small (< the number of slots per executor), it's easier to reason about; if it can apply during any stage, then I need to worry about it being applied to my large stages even if I configured the other speculation configs to be what I really want it to use.

I agree with you that if you have 2 stages of 1 task each, picking the timeout here can be tricky, which is why the normal speculation configs use a multiple of the run time. You can't do that with only 1 task, though. But I don't see how to get around that.

My point is that using the unfinished count expands that same impact from only stages with 1 task to all my stages.

@jiangxb1987 (Contributor) commented Dec 4, 2019

Now I agree, we should be conservative about the behavior change and limit it to "small" TaskSets (those that contain fewer tasks than the slots per executor); thus we'd better use numTasks instead of numUnfinishedTasks when comparing with speculationTaskNumThres.

@yuchenhuo (Author, Contributor) commented Dec 6, 2019

@tgravescs @jiangxb1987 Thanks for the feedback! I've updated the PR according to the discussion. May I get another review?

@tgravescs (Contributor) commented Dec 6, 2019

test this please

Task duration after which scheduler would try to speculative run the task. If provided, tasks
would be speculatively run if current stage contains less tasks than the number of slots on a
single executor and the task is taking longer time than the threshold. This config helps
speculate stage with very few tasks.

@tgravescs (Contributor) Dec 6, 2019

It might be nice to add a sentence that the regular speculation configs may also apply if the number of executor slots is large enough.

@tgravescs (Contributor) Dec 6, 2019

It also might be nice to say that the default unit is milliseconds if no unit is specified.

// speculative run based on the time threshold. SPARK-29976: We set this value to be the number
// of slots on a single executor so that we wouldn't speculate too aggressively but still
// handle basic cases.
val speculationTaskNumThres = conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)

@tgravescs (Contributor) Dec 6, 2019

use sched.CPUS_PER_TASK instead of conf.get(CPUS_PER_TASK).

        }
        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
      }
    } else if (speculationTaskDurationThresOpt.isDefined && numTasks <= speculationTaskNumThres) {

@tgravescs (Contributor) Dec 6, 2019

We can do the comparison numTasks <= speculationTaskNumThres once when the TaskSetManager is created; numTasks isn't changing in the TaskSet, so do it once at the top, and then we don't even need speculationTaskNumThres.
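
A minimal sketch of that suggestion, assuming hypothetical member names inside TaskSetManager (the merged code may differ):

    // Evaluated once at TaskSetManager construction; numTasks never changes for a
    // TaskSet, so the per-call comparison (and speculationTaskNumThres) can go away.
    private[scheduler] val isSpeculationThresholdSpecified =
      speculationTaskDurationThresOpt.isDefined &&
        numTasks <= conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)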

      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, "60min")
    }
    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    // Create a task set with only one task

@tgravescs (Contributor) Dec 6, 2019

remove comment since numTasks passed in

      assert(!manager.checkSpeculatableTasks(0))
      assert(sched.speculativeTasks.size == numTasks)
    } else {
      // If the feature flag is turned off, or the stage contains too few tasks

@tgravescs (Contributor) Dec 6, 2019

I think you mean too many tasks.

test("SPARK-29976 when a speculation time threshold is provided, should not speculative " +
"if there are too many tasks in the stage even though time threshold is provided") {
testSpeculationDurationThreshold(true, 2, 1)
}

@tgravescs (Contributor) Dec 6, 2019

It would be nice to add another test here that tests the interaction of the speculative configs. Meaning: if I have both the threshold set and the speculation quantile is smaller, the threshold can still apply; and vice versa, the quantile can still apply.

@SparkQA commented Dec 6, 2019

Test build #114947 has finished for PR 26614 at commit bf22446.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@tgravescs (Contributor) left a comment

changes look good, thanks @yuchenhuo

@asfgit closed this in ad238a2 on Dec 10, 2019
@squito (Contributor) commented Dec 11, 2019

Sorry for the delays on my end, a late LGTM from me too. Thanks @yuchenhuo!
