Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-10137][Streaming]Avoid to restart receivers if scheduleReceivers returns balanced results #8340

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,36 @@ import scala.collection.mutable

import org.apache.spark.streaming.receiver.Receiver

/**
* A class that tries to schedule receivers with evenly distributed. There are two phases for
* scheduling receivers.
*
* - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
* all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
* It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
* receiverTrackingInfoMap according to the results of `scheduleReceivers`.
* `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
* contains the scheduled locations. Then when a receiver is starting, it will send a register
* request and `ReceiverTracker.registerReceiver` will be called. In
* `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
* if the location of this receiver is one of the scheduled executors, if not, the register will
* be rejected.
* - The second phase is local scheduling when a receiver is restarting. There are two cases of
* receiver restarting:
* - If a receiver is restarting because it's rejected due to the real location and the scheduled
* executors mismatching, in other words, it fails to start in one of the locations that
* `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
* still alive in the list of scheduled executors, then use them to launch the receiver job.
* - If a receiver is restarting without a scheduled executors list, or the executors in the list
* are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
* not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
* it. Then when this receiver is registering, we can know this is a local scheduling, and
* `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
* location is matching.
*
* In conclusion, we should make a global schedule, try to achieve that exactly as long as possible,
* otherwise do local scheduling.
*/
private[streaming] class ReceiverSchedulingPolicy {

/**
Expand Down Expand Up @@ -102,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy {

/**
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
* run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require
* returning `preferredNumExecutors` executors if possible.
* run this receiver in arbitrary executor.
*
* This method tries to balance executors' load. Here is the approach to schedule executors
* for a receiver.
Expand All @@ -122,9 +151,8 @@ private[streaming] class ReceiverSchedulingPolicy {
* If a receiver is scheduled to an executor but has not yet run, it contributes
* `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
* </ul>
* At last, if there are more than `preferredNumExecutors` idle executors (weight = 0),
* returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options
* according to the weights.
* At last, if there are any idle executors (weight = 0), returns all idle executors.
* Otherwise, returns the executors that have the minimum weight.
* </li>
* </ol>
*
Expand All @@ -134,8 +162,7 @@ private[streaming] class ReceiverSchedulingPolicy {
receiverId: Int,
preferredLocation: Option[String],
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
executors: Seq[String],
preferredNumExecutors: Int = 3): Seq[String] = {
executors: Seq[String]): Seq[String] = {
if (executors.isEmpty) {
return Seq.empty
}
Expand All @@ -156,15 +183,18 @@ private[streaming] class ReceiverSchedulingPolicy {
}
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor

val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
if (idleExecutors.size >= preferredNumExecutors) {
// If there are more than `preferredNumExecutors` idle executors, return all of them
val idleExecutors = executors.toSet -- executorWeights.keys
if (idleExecutors.nonEmpty) {
scheduledExecutors ++= idleExecutors
} else {
// If there are less than `preferredNumExecutors` idle executors, return 3 best options
scheduledExecutors ++= idleExecutors
val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
// There is no idle executor. So select all executors that have the minimum weight.
val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
if (sortedExecutors.nonEmpty) {
val minWeight = sortedExecutors(0)._2
scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
} else {
// This should not happen since "executors" is not empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about executorWeights no being empty? And why it cannot be empty? Is it because that by the timerescheduleReceiveris called, all the receivers have already been scheduling byscheduleReceivers` which means that there are some weights for at some executors?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I mean, because idleExecutors is empty and executors is not empty, since sortedExecutors.keys == executors - idleExecutors, sortedExecutors must be not empty.

}
}
scheduledExecutors.toSeq
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}

if (isTrackerStopping || isTrackerStopped) {
false
} else if (!scheduleReceiver(streamId).contains(hostPort)) {
return false
}

val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be more intuitive rewrite this nested if to the following:

val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
val accetableExecutors = if (scheduledExecutors.nonEmpty) {
  scheduledExecutors 
} else {
  scheduleReceiver(streamId).contains(hostPort)
}
if (!accetableExecutors.contains(hostPort)) {
  false
} else {
  // existing code to update ReceiverTrackingInfo
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

val accetableExecutors = if (scheduledExecutors.nonEmpty) {
// This receiver is registering and it's scheduled by
// ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
scheduledExecutors.get
} else {
// This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
// "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
scheduleReceiver(streamId)
}

if (!accetableExecutors.contains(hostPort)) {
// Refuse it since it's scheduled to a wrong executor
false
} else {
Expand Down Expand Up @@ -426,12 +439,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
startReceiver(receiver, executors)
}
case RestartReceiver(receiver) =>
val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
receiver.streamId,
receiver.preferredLocation,
receiverTrackingInfos,
getExecutors)
updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
// Old scheduled executors minus the ones that are not active any more
val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment say this is old scheduled executors minus the one that are not active any more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
// Try global scheduling again
oldScheduledExecutors
} else {
val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
// Clear "scheduledExecutors" to indicate we are going to do local scheduling
val newReceiverInfo = oldReceiverInfo.copy(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnt it already marked INACTIVE when the previous jobs failed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. If registerReceiver return false, deregisterReceiver won't be called.

state = ReceiverState.INACTIVE, scheduledExecutors = None)
receiverTrackingInfos(receiver.streamId) = newReceiverInfo
schedulingPolicy.rescheduleReceiver(
receiver.streamId,
receiver.preferredLocation,
receiverTrackingInfos,
getExecutors)
}
// Assume there is one receiver restarting at one time, so we don't need to update
// receiverTrackingInfos
startReceiver(receiver, scheduledExecutors)
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
Expand Down Expand Up @@ -464,6 +490,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
context.reply(true)
}

/**
* Return the stored scheduled executors that are still alive.
*/
private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
if (receiverTrackingInfos.contains(receiverId)) {
val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
if (scheduledExecutors.nonEmpty) {
val executors = getExecutors.toSet
// Only return the alive executors
scheduledExecutors.get.filter(executors)
} else {
Nil
}
} else {
Nil
}
}

/**
* Start a receiver along with its scheduled executors
*/
Expand All @@ -484,7 +528,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

// Function to start the receiver on the worker node
val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}

// Create the RDD using the scheduledExecutors to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
Expand Down Expand Up @@ -541,31 +601,3 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}

}

/**
* Function to start the receiver on the worker node. Use a class instead of closure to avoid
* the serialization issue.
*/
private[streaming] class StartReceiverFunc(
checkpointDirOption: Option[String],
serializableHadoopConf: SerializableConfiguration)
extends (Iterator[Receiver[_]] => Unit) with Serializable {

override def apply(iterator: Iterator[Receiver[_]]): Unit = {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host1", "host2"))
}

test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
test("rescheduleReceiver: return all idle executors if there are any idle executors") {
val executors = Seq("host1", "host2", "host3", "host4", "host5")
// host3 is idle
val receiverTrackingInfoMap = Map(
Expand All @@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
}

test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
val executors = Seq("host1", "host2", "host3", "host4", "host5")
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
// host4 and host5 are idle
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
val receiverTrackingInfoMap = Map(
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
3, None, receiverTrackingInfoMap, executors)
4, None, receiverTrackingInfoMap, executors)
assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
}

Expand Down Expand Up @@ -127,4 +127,5 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(executors.isEmpty)
}
}

}