[SPARK-23888][CORE] correct the comment of hasAttemptOnHost() #20998

Closed
wants to merge 6 commits

Conversation

@Ngone51 (Member) commented Apr 6, 2018

What changes were proposed in this pull request?

There's a bug in:

/** Check whether a task is currently running an attempt on a given host */
 private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
   taskAttempts(taskIndex).exists(_.host == host)
 }

This causes speculative scheduling to also skip hosts which only have finished attempts, so to match the comment we should check whether an attempt is currently running on the given host.

So it is possible for a speculative task to run on a host where another attempt of the same task failed before.
Assume we have only two machines: host1 and host2. We first run task0.0 on host1. Then, after waiting a long time for task0.0, we launch a speculative task0.1 on host2. Later, task0.0 finally fails on host1, but it cannot re-run since there is already a copy running on host2. After another long wait, we launch a new speculative task0.2, and now task0.2 can run on host1 again, since there is no longer a running attempt on host1.
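
For reference, a minimal sketch of the kind of fix originally proposed here, limiting the check to running attempts (the change that was finally merged only updates the comment instead):

/** Check whether a task currently has a running attempt on a given host */
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
  // Only count attempts that are still running; attempts that already
  // finished (e.g. failed) on this host no longer block a new speculative copy.
  taskAttempts(taskIndex).exists(t => t.host == host && t.running)
}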


After discussion, we simply make the comment consistent with the method's behavior.

How was this patch tested?

Added a unit test.

@Ngone51 (Member Author) commented Apr 6, 2018

ping @pwendell @kayousterhout. Please help review, thanks :)

@felixcheung (Member) commented:

Jenkins, ok to test

@felixcheung (Member) commented Apr 8, 2018

sounds fair, but shouldn't this be up to the scheduler backend? multiple tasks/attempts can run simultaneously on the same physical host?

@Ngone51 (Member Author) commented Apr 8, 2018

Hi @felixcheung, thanks for triggering a test and for your comments.

shouldn't this be up to the scheduler backend?

Actually, it is TaskSchedulerImpl that holds a thread which periodically checks whether there are any speculative tasks that need to be scheduled. If there are, it calls backend.reviveOffers to offer resources. But it is TaskSetManager that decides whether we need to launch a speculative task for a certain task.
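
As an illustration of that division of labor, here is a toy model of the flow (illustrative names only, not Spark's actual classes or signatures):

// Toy model: the scheduler periodically asks whether any task set has
// speculatable tasks and, if so, asks the backend for fresh resource offers;
// the per-task decision about launching a speculative copy stays inside the
// task set manager.
trait SchedulerBackend { def reviveOffers(): Unit }

class TaskSetManagerModel {
  // Per task set: is some task worth running speculatively?
  def checkSpeculatableTasks(): Boolean = false
}

class TaskSchedulerModel(backend: SchedulerBackend, taskSets: Seq[TaskSetManagerModel]) {
  // Runs on a periodic background thread in the real scheduler.
  def checkSpeculatableTasks(): Unit = {
    if (taskSets.exists(_.checkSpeculatableTasks())) {
      backend.reviveOffers()
    }
  }
}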

multiple tasks/attempts can run simultaneously on the same physical host?

I think multiple task attempts (actually, speculative tasks) can run on the same physical host, but not simultaneously, as long as there's no running attempt on it. In the PR description, I illustrate a case in which a speculative task chooses to run on a host where a previous attempt ran and eventually failed. I think if the task's failure is not related to the host, running on the same host again is acceptable.

@SparkQA commented Apr 8, 2018

Test build #89026 has finished for PR 20998 at commit 3584a09.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm (Contributor) commented Apr 9, 2018

Adding isRunning means that a single 'bad' node (from the task's point of view, not necessarily bad hardware: just that the task happens to fail on that node) can cause the task to fail there repeatedly, eventually causing the app to exit.

Particularly with blacklisting, I am not very sure how the interactions will play out; @squito might have more comments.
In general, this is not a benign change IMO and can have non-trivial side effects.

In the specific use case of only two machines, it is an unfortunate side effect.

@Ngone51 (Member Author) commented Apr 10, 2018

Hi @mridulm, thanks for your comment. Actually, I share the same worry. Maybe we could make this change a second choice for hasAttemptOnHost, in case there are really no other hosts to select.

@squito (Contributor) left a comment

This change certainly makes it agree with the comment, so I think we should either make this change, or change the comment.

Blacklisting should still work as expected. dequeueSpeculativeTask also checks the blacklist, so if host1 is blacklisted, you'll still skip it. But with blacklisting off, it's a more significant change. Even on a large cluster, I can imagine this happening most of the time when the non-speculative task fails, due to locality preferences.

Basically there is a behavior choice: Should a speculative task ever be allowed to run on a host where the task has failed previously? I think it should, as that is better handled by blacklisting.
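
A rough sketch of the interaction being described (illustrative only, not the actual dequeueSpeculativeTask code): both the attempt check and the blacklist check have to pass before a speculative copy is offered a host.

// Illustrative helper: a host is eligible for a speculative copy of a task
// only if it has no attempt of that task (per hasAttemptOnHost) and is not
// excluded by the blacklist for that task. Both predicates are passed in
// because this is a sketch, not Spark's real method.
def eligibleForSpeculation(
    taskIndex: Int,
    host: String,
    hasAttemptOnHost: (Int, String) => Boolean,
    isNodeBlacklisted: String => Boolean): Boolean = {
  !hasAttemptOnHost(taskIndex, host) && !isNodeBlacklisted(host)
}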

// there's already a running copy.
clock.advance(1000)
info1.finishTime = clock.getTimeMillis()
assert(info1.running === false)
Contributor:

assert(!info1.running)

// no more running copy of task0
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
val info3 = manager.taskAttempts(0)(0)
assert(info3.running === true)
Contributor:

assert(info3.running)

// after a long long time, task0.0 failed, and task0.0 can not re-run since
// there's already a running copy.
clock.advance(1000)
info1.finishTime = clock.getTimeMillis()
Contributor:

it would be better here for you to call manager.handleFailedTask, to more accurately simulate the real behavior; it also makes the purpose of the test a little clearer.

Member Author:

nice suggestion.

Contributor:

you shouldn't need to set info.finishTime anymore; that should be taken care of by manager.handleFailedTask.
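
Put together, the suggested change to the test would look roughly like this (a sketch that assumes the existing test fixtures, clock, manager and info1, from the quoted snippet; the failure reason TaskResultLost is just an example):

// after a long long time, task0.0 failed; fail it through the TaskSetManager
// instead of mutating TaskInfo directly, so finishTime and the running flag
// are updated by the real code path.
clock.advance(1000)
manager.handleFailedTask(info1.taskId, TaskState.FAILED, TaskResultLost)
assert(!info1.running)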

@Ngone51 (Member Author) commented Apr 11, 2018

Hi @squito, thanks for the review and comments.

@SparkQA commented Apr 11, 2018

Test build #89164 has finished for PR 20998 at commit 2ed9584.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Apr 11, 2018

@mridulm more thoughts? I think this is the right change, but I will leave it open for a bit to get more input.

// after a long long time, task0.0 failed, and task0.0 can not re-run since
// there's already a running copy.
clock.advance(1000)
info1.finishTime = clock.getTimeMillis()
Contributor:

you shouldn't need to set info.finishTime anymore; that should be taken care of by manager.handleFailedTask.

test("speculative task should not run on a given host where another attempt " +
"is already running on") {
test("SPARK-23888: speculative task should not run on a given host " +
"where another attempt is already running on") {
Contributor:

I'd reword this to be a bit more specific to what you're trying to test:

speculative task cannot run on host with another running attempt, but can run on a host with a failed attempt.

Member Author:

Sure. Also, do we need to reword the PR and JIRA title? @squito

@mridulm (Contributor) commented Apr 11, 2018

@squito My concern is that in large workloads, some nodes simply become bad for some tasks (transient environment or hardware issues, colocated containers, etc.) while being fine for others; speculative tasks should alleviate performance concerns and not increase the chance of application failure due to locality preference affinity. For cluster sizes which are very small, speculative execution is less relevant than for those which are large, and here we are tuning for the former.

@SparkQA commented Apr 12, 2018

Test build #89228 has finished for PR 20998 at commit 5901728.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Apr 13, 2018

I'm not even really concerned about the case of two hosts; I agree it's fine if we do something sub-optimal there. I'm more concerned about code clarity and the behavior in general. It seems cleaner to me if speculation doesn't worry about where it has failed before, and those exclusions are left to the blacklist.

But it sounds like you're saying the prior behavior was really desirable: you think it's better if speculation always excludes hosts that the task has ever failed on? I'm happy to defer to your opinion on this; I haven't really stressed speculative execution yet. Then let's just change that comment in the code to be consistent.

@mridulm (Contributor) commented Apr 13, 2018

@squito I completely agree that the comment is inaccurate.
Note that this is for a specific taskset, so the impact is limited to that taskset (w.r.t. using executors for speculative execution).

@squito (Contributor) commented Apr 17, 2018

@Ngone51 can you instead leave the behavior as is, and just update the comment?

Sorry that it's going to be a small change in the end, given all the extra work the bad comments led you to do, but I still appreciate you noticing this and fixing it. A good PR with a quality test, too.

@Ngone51 (Member Author) commented Apr 17, 2018

Will do, and it's okay.
My view has so far been limited to the source code, but you guys have more practical experience, so I learned from your points. It's beneficial.

@Ngone51 changed the title from "[SPARK-23888][CORE] speculative task should not run on a given host where another attempt is already running on" to "[SPARK-23888][CORE] correct the comment of hasAttemptOnHost()" on Apr 17, 2018
@SparkQA commented Apr 17, 2018

Test build #89460 has finished for PR 20998 at commit 0c6f305.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor) commented:

LGTM

@@ -287,7 +287,7 @@ private[spark] class TaskSetManager(
None
}

/** Check whether a task is currently running an attempt on a given host */
/** Check whether a task once run an attempt on a given host */
Member:

Should this be "once ran"?

Member Author:

Yes. Thank you.
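
With that wording fix, the merged comment ends up reading along these lines (method body unchanged):

/** Check whether a task once ran an attempt on a given host */
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
  taskAttempts(taskIndex).exists(_.host == host)
}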

@SparkQA commented Apr 23, 2018

Test build #89728 has finished for PR 20998 at commit e44d80b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Apr 23, 2018

Merged to master, thanks @Ngone51. I also updated the commit message a bit before committing; I thought it best to focus on the eventual change and figured it wasn't worth bugging you for another update cycle.

@asfgit closed this in c8f3ac6 on Apr 23, 2018
@Ngone51 (Member Author) commented Apr 24, 2018

Agreed, and thank you @squito.

And thanks to all of you: @felixcheung @mridulm @jiangxb1987 @srowen
