[SPARK-10137][Streaming]Avoid to restart receivers if scheduleReceivers returns balanced results #8340
Test build #41322 has finished for PR 8340 at commit
    val minWeight = sortedExecutors(0)._2
    scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
  } else {
    // This should not happen since "executors" is not empty
Are you talking about `executorWeights` not being empty? And why can it not be empty? Is it because, by the time `rescheduleReceiver` is called, all the receivers have already been scheduled by `scheduleReceivers`, which means that at least some executors have weights?
Here I mean that `sortedExecutors` must be non-empty: `idleExecutors` is empty and `executors` is not empty, and `sortedExecutors.keys == executors - idleExecutors`.
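The argument above can be illustrated with a minimal sketch. `MinWeightSketch`, `chooseExecutors`, and the plain `String` executor type are hypothetical names for illustration, not the PR's code; the sketch assumes every executor without an entry in `weights` counts as idle.

```scala
object MinWeightSketch {
  /** Hypothetical helper: picks candidate executors for one receiver.
    * `executors` is every live executor; `weights` has an entry only for
    * executors that already have receivers scheduled or running on them.
    */
  def chooseExecutors(executors: Seq[String], weights: Map[String, Double]): Seq[String] = {
    require(executors.nonEmpty, "executors must not be empty")
    val idleExecutors = executors.filterNot(weights.contains)
    if (idleExecutors.nonEmpty) {
      // Prefer executors that carry no receiver at all.
      idleExecutors
    } else {
      // No idle executor, so every executor has a weight and
      // sortedExecutors cannot be empty in this branch.
      val sortedExecutors = executors.map(e => e -> weights(e)).sortBy(_._2)
      val minWeight = sortedExecutors.head._2
      sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
    }
  }
}
```

For example, `MinWeightSketch.chooseExecutors(Seq("host1", "host2", "host3"), Map("host1" -> 1.0))` returns `Seq("host2", "host3")`, so a restarting receiver is steered away from the executor that is already busy.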
…ed executors to check it and don't call "rescheduleReceiver"
Test build #41370 has finished for PR 8340 at commit
Could you update the main text of this PR and the JIRA to document the main change? This is a significant change and good to document. Also link this JIRA with the original receiver scheduling JIRA.
@@ -431,7 +450,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
    receiver.preferredLocation,
    receiverTrackingInfos,
    getExecutors)
  updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
Why not update the scheduled info in the case of `rescheduleReceiver`? Why have two different code paths / policies for the two cases?
@zsxwing explained this to me offline. In the case of rescheduling, the scheduled executors are not stored, so that the following scenario does not occur.
- The initial globally optimal schedule is stored, but one receiver gets launched on the wrong executor.
- The receiver is rejected and therefore has to be rescheduled. If the rescheduled location (which is only locally optimal for that receiver) were saved, it would overwrite the original globally optimal location, and the receiver would be launched somewhere that does not ensure proper global balancing.
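The scenario above can be sketched as a contrast between the two policies. Everything here (`StoredScheduleSketch`, `retryKeeping`, `retryOverwriting`, the `Map`-based store) is a hypothetical simplification for illustration, not the tracker's actual data structure.

```scala
object StoredScheduleSketch {
  // streamId -> executors picked by the initial, globally balanced pass.
  type Stored = Map[Int, Seq[String]]

  /** Policy described above: a retry leaves the stored schedule untouched. */
  def retryKeeping(stored: Stored, streamId: Int, local: Seq[String]): Stored =
    stored

  /** Alternative being argued against: a retry overwrites the stored entry
    * with the locally chosen candidates.
    */
  def retryOverwriting(stored: Stored, streamId: Int, local: Seq[String]): Stored =
    stored.updated(streamId, local)
}
```

With an initial schedule `Map(1 -> Seq("host1"), 2 -> Seq("host2"))` and a retry for receiver 1 whose local candidates are `Seq("host2")`, `retryKeeping` still maps receiver 1 to `host1`, while `retryOverwriting` maps both receivers to `host2` and loses the balanced placement.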
Test build #41452 has finished for PR 8340 at commit
Test build #41455 has finished for PR 8340 at commit
Jenkins, test this please.
Test build #41464 has finished for PR 8340 at commit
  return false
}

val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
It might be more intuitive to rewrite this nested if as follows:
val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
val acceptableExecutors = if (scheduledExecutors.nonEmpty) {
  scheduledExecutors
} else {
  // No stored schedule: fall back to a fresh scheduling decision
  scheduleReceiver(streamId)
}
if (!acceptableExecutors.contains(hostPort)) {
  false
} else {
  // existing code to update ReceiverTrackingInfo
}
Updated.
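As a self-contained illustration of the check discussed in this thread, here is a minimal sketch. `RegisterCheckSketch` and `isAcceptable` are hypothetical names, and the `reschedule` parameter merely stands in for a call such as `scheduleReceiver(streamId)`; this is not the code that was merged.

```scala
object RegisterCheckSketch {
  /** Hypothetical check: should a receiver registering from `hostPort` be accepted?
    * `stored` is the schedule saved for this receiver (empty if none was saved);
    * `reschedule` is evaluated only when there is no stored schedule.
    */
  def isAcceptable(stored: Seq[String], reschedule: () => Seq[String], hostPort: String): Boolean = {
    val acceptableExecutors = if (stored.nonEmpty) stored else reschedule()
    acceptableExecutors.contains(hostPort)
  }
}
```

Passing the fallback as a function keeps the rescheduling work lazy: it runs only when the stored schedule is empty, which mirrors the `if`/`else` shape of the suggestion.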
Test build #41502 has finished for PR 8340 at commit
…vers returns balanced results

This PR fixes the following cases for `ReceiverSchedulingPolicy`.

1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1). Let's assume r1 starts first on `host1`, as `scheduleReceivers` suggested, and tries to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected, since r1 is starting exactly where `scheduleReceivers` suggested.

This case can be fixed by ignoring the information of the receiver that is rescheduling in `receiverTrackingInfoMap`.

2) Assume there are 3 executors (host1, host2, host3), each executor has 3 cores, and there are 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting; the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by TaskScheduler. r3 is similar. In the end, it's possible that there are 3 receivers running on host1, while host2 and host3 are idle.

This issue can be fixed by returning only executors that have the minimum weight rather than returning at least 3 executors.

Author: zsxwing <zsxwing@gmail.com>

Closes #8340 from zsxwing/fix-receiver-scheduling.

(cherry picked from commit f023aa2)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
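Case 1 can be worked through with a small sketch. The names (`IgnoreSelfSketch`, `weights`, `candidates`) are hypothetical, and the per-receiver contribution of 0.5 is an assumption chosen only so that the totals reproduce the weights quoted in the description; the sketch also uses the minimum-weight rule from case 2 rather than the previous policy.

```scala
object IgnoreSelfSketch {
  // Assumption: each scheduled receiver adds 0.5 to its suggested executor,
  // picked only so the totals match the weights quoted in the description.
  val contributions: Seq[(String, String, Double)] = Seq(
    ("r1", "host1", 0.5), ("r2", "host2", 0.5), ("r3", "host3", 0.5),
    ("r4", "host4", 0.5), ("r5", "host1", 0.5))

  /** Sums weights per executor, optionally skipping one receiver's own entry. */
  def weights(ignore: Option[String]): Map[String, Double] =
    contributions
      .filterNot { case (receiver, _, _) => ignore.contains(receiver) }
      .groupBy(_._2)
      .map { case (host, cs) => host -> cs.map(_._3).sum }

  /** Executors with the minimum weight, sorted by name for a stable result. */
  def candidates(ignore: Option[String]): Seq[String] = {
    val w = weights(ignore)
    val minWeight = w.values.min
    w.collect { case (host, weight) if weight == minWeight => host }.toSeq.sorted
  }
}
```

With nothing ignored, `IgnoreSelfSketch.candidates(None)` yields `host2`, `host3`, `host4`, so r1 registering from `host1` would be rejected. With r1's own entry ignored, `IgnoreSelfSketch.candidates(Some("r1"))` yields all four hosts, so r1 is accepted where `scheduleReceivers` placed it.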