Skip to content

Commit

Permalink
[SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleRecei…
Browse files Browse the repository at this point in the history
…vers returns balanced results

This PR fixes the following cases for `ReceiverSchedulingPolicy`.

1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1).
Let's assume r1 starts at first on `host1` as `scheduleReceivers` suggested,  and try to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected since r1 is starting exactly where `scheduleReceivers` suggested.

This case can be fixed by ignoring the information of the receiver that is rescheduling in `receiverTrackingInfoMap`.

2) Assume there are 3 executors (host1, host2, host3) and each executors has 3 cores, and 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting, the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by TaskScheduler. r3 is similar. Then at last, it's possible that there are 3 receivers running on host1, while host2 and host3 are idle.

This issue can be fixed by returning only executors that have the minimum wight rather than returning at least 3 executors.

Author: zsxwing <zsxwing@gmail.com>

Closes #8340 from zsxwing/fix-receiver-scheduling.
  • Loading branch information
zsxwing authored and tdas committed Aug 25, 2015
1 parent d9c25de commit f023aa2
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,36 @@ import scala.collection.mutable

import org.apache.spark.streaming.receiver.Receiver

/**
* A class that tries to schedule receivers with evenly distributed. There are two phases for
* scheduling receivers.
*
* - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
* all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
* It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
* receiverTrackingInfoMap according to the results of `scheduleReceivers`.
* `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
* contains the scheduled locations. Then when a receiver is starting, it will send a register
* request and `ReceiverTracker.registerReceiver` will be called. In
* `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
* if the location of this receiver is one of the scheduled executors, if not, the register will
* be rejected.
* - The second phase is local scheduling when a receiver is restarting. There are two cases of
* receiver restarting:
* - If a receiver is restarting because it's rejected due to the real location and the scheduled
* executors mismatching, in other words, it fails to start in one of the locations that
* `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
* still alive in the list of scheduled executors, then use them to launch the receiver job.
* - If a receiver is restarting without a scheduled executors list, or the executors in the list
* are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
* not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
* it. Then when this receiver is registering, we can know this is a local scheduling, and
* `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
* location is matching.
*
* In conclusion, we should make a global schedule, try to achieve that exactly as long as possible,
* otherwise do local scheduling.
*/
private[streaming] class ReceiverSchedulingPolicy {

/**
Expand Down Expand Up @@ -102,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy {

/**
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
* run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require
* returning `preferredNumExecutors` executors if possible.
* run this receiver in arbitrary executor.
*
* This method tries to balance executors' load. Here is the approach to schedule executors
* for a receiver.
Expand All @@ -122,9 +151,8 @@ private[streaming] class ReceiverSchedulingPolicy {
* If a receiver is scheduled to an executor but has not yet run, it contributes
* `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
* </ul>
* At last, if there are more than `preferredNumExecutors` idle executors (weight = 0),
* returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options
* according to the weights.
* At last, if there are any idle executors (weight = 0), returns all idle executors.
* Otherwise, returns the executors that have the minimum weight.
* </li>
* </ol>
*
Expand All @@ -134,8 +162,7 @@ private[streaming] class ReceiverSchedulingPolicy {
receiverId: Int,
preferredLocation: Option[String],
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
executors: Seq[String],
preferredNumExecutors: Int = 3): Seq[String] = {
executors: Seq[String]): Seq[String] = {
if (executors.isEmpty) {
return Seq.empty
}
Expand All @@ -156,15 +183,18 @@ private[streaming] class ReceiverSchedulingPolicy {
}
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor

val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
if (idleExecutors.size >= preferredNumExecutors) {
// If there are more than `preferredNumExecutors` idle executors, return all of them
val idleExecutors = executors.toSet -- executorWeights.keys
if (idleExecutors.nonEmpty) {
scheduledExecutors ++= idleExecutors
} else {
// If there are less than `preferredNumExecutors` idle executors, return 3 best options
scheduledExecutors ++= idleExecutors
val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
// There is no idle executor. So select all executors that have the minimum weight.
val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
if (sortedExecutors.nonEmpty) {
val minWeight = sortedExecutors(0)._2
scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
} else {
// This should not happen since "executors" is not empty
}
}
scheduledExecutors.toSeq
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}

if (isTrackerStopping || isTrackerStopped) {
false
} else if (!scheduleReceiver(streamId).contains(hostPort)) {
return false
}

val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
val accetableExecutors = if (scheduledExecutors.nonEmpty) {
// This receiver is registering and it's scheduled by
// ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
scheduledExecutors.get
} else {
// This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
// "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
scheduleReceiver(streamId)
}

if (!accetableExecutors.contains(hostPort)) {
// Refuse it since it's scheduled to a wrong executor
false
} else {
Expand Down Expand Up @@ -426,12 +439,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
startReceiver(receiver, executors)
}
case RestartReceiver(receiver) =>
val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
receiver.streamId,
receiver.preferredLocation,
receiverTrackingInfos,
getExecutors)
updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
// Old scheduled executors minus the ones that are not active any more
val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
// Try global scheduling again
oldScheduledExecutors
} else {
val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
// Clear "scheduledExecutors" to indicate we are going to do local scheduling
val newReceiverInfo = oldReceiverInfo.copy(
state = ReceiverState.INACTIVE, scheduledExecutors = None)
receiverTrackingInfos(receiver.streamId) = newReceiverInfo
schedulingPolicy.rescheduleReceiver(
receiver.streamId,
receiver.preferredLocation,
receiverTrackingInfos,
getExecutors)
}
// Assume there is one receiver restarting at one time, so we don't need to update
// receiverTrackingInfos
startReceiver(receiver, scheduledExecutors)
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
Expand Down Expand Up @@ -464,6 +490,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
context.reply(true)
}

/**
* Return the stored scheduled executors that are still alive.
*/
private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
if (receiverTrackingInfos.contains(receiverId)) {
val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
if (scheduledExecutors.nonEmpty) {
val executors = getExecutors.toSet
// Only return the alive executors
scheduledExecutors.get.filter(executors)
} else {
Nil
}
} else {
Nil
}
}

/**
* Start a receiver along with its scheduled executors
*/
Expand All @@ -484,7 +528,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

// Function to start the receiver on the worker node
val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}

// Create the RDD using the scheduledExecutors to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
Expand Down Expand Up @@ -541,31 +601,3 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}

}

/**
* Function to start the receiver on the worker node. Use a class instead of closure to avoid
* the serialization issue.
*/
private[streaming] class StartReceiverFunc(
checkpointDirOption: Option[String],
serializableHadoopConf: SerializableConfiguration)
extends (Iterator[Receiver[_]] => Unit) with Serializable {

override def apply(iterator: Iterator[Receiver[_]]): Unit = {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host1", "host2"))
}

test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
test("rescheduleReceiver: return all idle executors if there are any idle executors") {
val executors = Seq("host1", "host2", "host3", "host4", "host5")
// host3 is idle
val receiverTrackingInfoMap = Map(
Expand All @@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
}

test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
val executors = Seq("host1", "host2", "host3", "host4", "host5")
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
// host4 and host5 are idle
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
val receiverTrackingInfoMap = Map(
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
3, None, receiverTrackingInfoMap, executors)
4, None, receiverTrackingInfoMap, executors)
assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
}

Expand Down Expand Up @@ -127,4 +127,5 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(executors.isEmpty)
}
}

}

0 comments on commit f023aa2

Please sign in to comment.