Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10137][Streaming]Avoid to restart receivers if scheduleReceivers returns balanced results #8340

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ private[streaming] class ReceiverSchedulingPolicy {

/**
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
* run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require
* returning `preferredNumExecutors` executors if possible.
* run this receiver in arbitrary executor.
*
* This method tries to balance executors' load. Here is the approach to schedule executors
* for a receiver.
Expand All @@ -122,9 +121,8 @@ private[streaming] class ReceiverSchedulingPolicy {
* If a receiver is scheduled to an executor but has not yet run, it contributes
* `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
* </ul>
* At last, if there are more than `preferredNumExecutors` idle executors (weight = 0),
* returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options
* according to the weights.
* At last, if there are any idle executors (weight = 0), returns all idle executors.
* Otherwise, returns the executors that have the minimum weight.
* </li>
* </ol>
*
Expand All @@ -134,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy {
receiverId: Int,
preferredLocation: Option[String],
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
executors: Seq[String],
preferredNumExecutors: Int = 3): Seq[String] = {
executors: Seq[String]): Seq[String] = {
if (executors.isEmpty) {
return Seq.empty
}
Expand All @@ -144,27 +141,31 @@ private[streaming] class ReceiverSchedulingPolicy {
val scheduledExecutors = mutable.Set[String]()
scheduledExecutors ++= preferredLocation

val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
receiverTrackingInfo.state match {
case ReceiverState.INACTIVE => Nil
case ReceiverState.SCHEDULED =>
val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
// The probability that a scheduled receiver will run in an executor is
// 1.0 / scheduledLocations.size
scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
}
val executorWeights = receiverTrackingInfoMap.filter(_._1 != receiverId).values.flatMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add an explanation of why we are filtering?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use the new approach. And since we don't call rescheduleReceiver when the receiver registers at the first time, it's not necessary to do filtering.

receiverTrackingInfo =>
receiverTrackingInfo.state match {
case ReceiverState.INACTIVE => Nil
case ReceiverState.SCHEDULED =>
val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
// The probability that a scheduled receiver will run in an executor is
// 1.0 / scheduledLocations.size
scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
}
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor

val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
if (idleExecutors.size >= preferredNumExecutors) {
// If there are more than `preferredNumExecutors` idle executors, return all of them
val idleExecutors = executors.toSet -- executorWeights.keys
if (idleExecutors.nonEmpty) {
scheduledExecutors ++= idleExecutors
} else {
// If there are less than `preferredNumExecutors` idle executors, return 3 best options
scheduledExecutors ++= idleExecutors
val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
// There is no idle executor. So select all executors that have the minimum weight.
val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
if (sortedExecutors.nonEmpty) {
val minWeight = sortedExecutors(0)._2
scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
} else {
// This should not happen since "executors" is not empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about executorWeights no being empty? And why it cannot be empty? Is it because that by the timerescheduleReceiveris called, all the receivers have already been scheduling byscheduleReceivers` which means that there are some weights for at some executors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I mean, because idleExecutors is empty and executors is not empty, since sortedExecutors.keys == executors - idleExecutors, sortedExecutors must be not empty.

}
}
scheduledExecutors.toSeq
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host1", "host2"))
}

test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
test("rescheduleReceiver: return all idle executors if there are any idle executors") {
val executors = Seq("host1", "host2", "host3", "host4", "host5")
// host3 is idle
val receiverTrackingInfoMap = Map(
Expand All @@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
}

test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
test("rescheduleReceiver: return executors that have minimum weight if no idle executors") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'return all executors that have minimum weight`

val executors = Seq("host1", "host2", "host3", "host4", "host5")
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
// host4 and host5 are idle
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
val receiverTrackingInfoMap = Map(
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
3, None, receiverTrackingInfoMap, executors)
4, None, receiverTrackingInfoMap, executors)
assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
}

Expand Down Expand Up @@ -127,4 +127,26 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(executors.isEmpty)
}
}

test("when scheduleReceivers return a balanced result, we should not restart receivers") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when scheduleReceivers return a balanced result, subsequent reschedule calls should return same location

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this test since we don't call rescheduleReceiver when the receiver registers at the first time.

val receivers = (0 until 6).map(new RateTestReceiver(_))
val executors = (10000 until 10005).map(port => s"localhost:${port}")
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
val receiverTrackingInfoMap = scheduledExecutors.map { case (receiverId, executors) =>
assert(executors.size === 1) // Each receiver has been assigned one executor
receiverId -> ReceiverTrackingInfo(receiverId, ReceiverState.SCHEDULED, Some(executors), None)
}
// "scheduledExecutors" has already been balanced, assume we launch receivers as
// "scheduledExecutors" suggested
for (receiverId <- receivers.map(_.streamId)) {
val scheduledExecutorsWhenRegistering = receiverSchedulingPolicy.rescheduleReceiver(
receiverId, None, receiverTrackingInfoMap, executors)
// Assume the receiver has been launched in the exact scheduled location
val runningLocation = scheduledExecutors(receiverId)(0)
// Since all receivers are launched as "scheduledExecutors" suggested, we should allow it to
// run in its current runningLocation, so "scheduledExecutorsWhenRegistering" should contains
// "runningLocation"
assert(scheduledExecutorsWhenRegistering.contains(runningLocation))
}
}
}