[WIP][SPARK-24815] [CORE] Trigger Interval based DRA for Structured Streaming #42352

Draft · wants to merge 5 commits into base: master

Changes from all commits
165 changes: 159 additions & 6 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -96,6 +96,30 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* If an executor with caching data blocks has been idle for more than this duration,
* the executor will be removed
*
* Dynamic resource allocation is also extended to work for the structured streaming use case
* (micro-batch paradigm).
* For it to work we still need the above configs plus a few additional configs.
*
* For executor allocation: in traditional DRA the target number of executors is increased based on the
Contributor

Why does DRA only account for backlogged tasks per stage for batch queries?

Author

From my understanding, in batch queries each stage can have varied resource requirements depending on what it does, so DRA uses schedulerBacklogTimeout to figure out when it should ask for more resources, and the pending tasks are determined by the pending tasks of the current stage. I have modified it to also consider the pending tasks of other stages, because structured streaming deals with micro-batches and we want to scale out if there are still other stages pending in the same micro-batch.

For example: with the current DRA code, suppose spark.dynamicAllocation.schedulerBacklogTimeout is set to 6 seconds and we use it for a structured streaming job where a micro-batch consists of 4 stages, each running for at most 5 seconds.

Then it wouldn't scale out even after 20 seconds (5+5+5+5) have passed, because no single stage stays backlogged longer than the 6-second timeout.

But with the changes described above, while running the second stage at the 6th second the manager figures out that other stages in the micro-batch are still pending, so it scales out appropriately.
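To make the timing concrete, a minimal REPL-style sketch of the settings from this example (values are illustrative only; spark.dynamicAllocation.streaming.enabled is the new flag proposed in this PR):

```scala
import org.apache.spark.SparkConf

// Illustrative values matching the example above.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.streaming.enabled", "true")      // proposed in this PR
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "6s")  // no single ~5s stage trips this on its own
```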

* backlogged tasks waiting to be scheduled per stage; now executors are added based on the tasks waiting
* to be scheduled per job (micro-batch).
* So we still use the M and N seconds to add executors.
*
* The remove policy is the same as before, but now only P percent of the idle executors will be
* removed, with a delay of Q seconds after a set of executors is marked for removal.
* This helps in slowly reducing the executors across micro-batches.
*
* The additional properties required for structured streaming are:
*
* spark.dynamicAllocation.streaming.enabled -
* Whether DRA for structured streaming is enabled
*
* spark.dynamicAllocation.streaming.executorDeallocationRatio (P) -
* Remove only P percent of the idle executors per evaluation.
*
* spark.dynamicAllocation.streaming.executorDeallocationTimeout (Q) -
* After a set of executors is removed, wait for this duration before removing more.
*
*/
private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
@@ -133,6 +157,16 @@ private[spark] class ExecutorAllocationManager(

private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id

private val streamingDRAFeatureEnabled = conf.get(DYN_ALLOCATION_STREAMING_ENABLED)
Contributor

It would be nice to update the description of this class to include the streaming details and how the algorithm works.

Author

Sounds good. I will add more info to the class description, and will add more documentation with an example in the https://spark.apache.org/docs/latest/job-scheduling.html section.


private val executorDeallocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_DEALLOCATION_RATIO)

private var removeTime: Long = NOT_SET

private val executorDeallocationTimeoutS =
conf.get(DYN_ALLOCATION_EXECUTOR_DEALLOCATION_TIMEOUT)

validateSettings()

// Number of executors to add for each ResourceProfile in the next round
@@ -204,6 +238,10 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (streamingDRAFeatureEnabled && executorDeallocationTimeoutS <= 0) {
throw new SparkException(
s"s${DYN_ALLOCATION_EXECUTOR_DEALLOCATION_TIMEOUT.key} must be > 0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !reliableShuffleStorage) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
logInfo("Dynamic allocation is enabled without a shuffle service.")
@@ -220,6 +258,12 @@
throw new SparkException(
s"${DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO.key} must be > 0 and <= 1.0")
}

if (streamingDRAFeatureEnabled &&
(executorDeallocationRatio > 1.0 || executorDeallocationRatio <= 0.0)) {
throw new SparkException(
s"${DYN_ALLOCATION_EXECUTOR_DEALLOCATION_RATIO.key} must be > 0 and <= 1.0")
}
}

/**
@@ -328,7 +372,8 @@ private[spark] class ExecutorAllocationManager(
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()
val idleExecutorsIds = executorMonitor.timedOutExecutors()
val executorIdsToBeRemoved = maxExecutorDeallocationsPerEvaluation(idleExecutorsIds)
if (executorIdsToBeRemoved.nonEmpty) {
initializing = false
}
@@ -340,6 +385,45 @@
}
}

/**
* Maximum number of executors to be removed per DRA evaluation.
Contributor

Why is something like this not relevant for batch queries?

Author

It can be useful for batch jobs as well. Like the allocation ratio, the de-allocation ratio is also relevant for scaling out/back only a few executors at a time, especially in cases where the stages of a batch job have alternating high/low task counts.

I restricted this behind streamingDRAFeatureEnabled initially for feature distinction. I can modify the code to make this a config which can be used in traditional DRA as well.

Contributor

It could potentially be relevant to batch queries, but the algorithm for increasing the number of executors is also supposed to help avoid allocating too many in the first place, which also isn't ideal for some workloads. I would say if you have suggestions and data proving that some drain algorithm works better than the existing one, then we should make it configurable and add it. Removing and adding executors can be very application dependent: you could remove something and then need it a few seconds later... or you could have a long-tail task that takes an hour longer than everything else, where removing the idle executors ASAP is going to save you money / get better cluster utilization. The streaming case is naturally going to be different from many batch workloads.

I do think it should be a separate PR and discussion that adds it to batch, though.

Author

Thanks for looking into this potential feature.

Sounds fair. Will limit these configs to just the streaming use case.

> I do think it should be a separate PR and discussion that adds it to batch though.

Sure. Maybe in the future, if people request it for batch, we can change this.

*
* This function limits the number of idle executors to be removed if the
* `streamingDRAFeatureEnabled` flag is enabled; otherwise it returns all the idle executors.
* The number of executors to be removed is based on
* the de-allocation ratio (`executorDeallocationRatio`) and the timeout (`removeTime`).
* It removes only a few executors per evaluation cycle, which helps in gradually removing
* executors across micro-batches.
*/
private def maxExecutorDeallocationsPerEvaluation(
timedOutExecs: Seq[(String, Int)]
): Seq[(String, Int)] = {
if (!streamingDRAFeatureEnabled) {
timedOutExecs
} else {
val currentTime = clock.nanoTime()

if (removeTime == NOT_SET) {
Contributor

Why do we remove all the timedOutExecs when removeTime is not set? Doesn't this mean for the first evaluation of this function when removeTime is not set, we will remove all timeOutExecs and NOT adhere to the configured "executorDeallocationRatio"?

Author

Good catch. I will update it to Seq.empty[(String, Int)]. Every evaluation happens at a fixed rate of a few hundred milliseconds, so all those executors will be evaluated in the next round.
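To illustrate the limit with made-up numbers (REPL-style sketch; the names mirror the PR but the snippet is standalone): with 10 idle executors and executorDeallocationRatio = 0.3, a single evaluation releases at most 3 of them.

```scala
// Illustrative only: mirrors the ceil-based limit in maxExecutorDeallocationsPerEvaluation.
val timedOutExecs = (1 to 10).map(i => (s"exec-$i", 0))  // (executorId, resourceProfileId)
val executorDeallocationRatio = 0.3
val deallocationLimit = math.ceil(timedOutExecs.size * executorDeallocationRatio).toInt  // 3
val executorsToDeallocate = timedOutExecs.take(deallocationLimit)  // exec-1, exec-2, exec-3
```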

removeTime = currentTime
Seq.empty[(String, Int)]
} else if (removeTime < currentTime && timedOutExecs.nonEmpty) {
val deallocationLimit = Math.ceil(timedOutExecs.size * executorDeallocationRatio).toInt
val executorsToDeallocate = timedOutExecs.take(deallocationLimit)
logInfo(s"$executorsToDeallocate will be removed " +
s"out of $timedOutExecs idle executors in this evaluation " +
s"based on the deallocation ratio: $executorDeallocationRatio and " +
s"deallocation timeout of: $executorDeallocationTimeoutS seconds")
removeTime = clock.nanoTime() + TimeUnit.SECONDS.toNanos(executorDeallocationTimeoutS)
executorsToDeallocate
} else {
Seq.empty[(String, Int)]
}
}
}

/**
* Updates our target number of executors for each ResourceProfile and then syncs the result
* with the cluster manager.
@@ -631,6 +715,11 @@ private[spark] class ExecutorAllocationManager(
override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}

private case class StageDetails(jobId: Int, stageId: Int, numTasks: Int, resourceProfileId: Int) {
override def toString: String = s"StageDetails(id: $stageId, jobId: $jobId," +
s" numTasks: $numTasks, rpId: $resourceProfileId)"
}

/**
* A listener that notifies the given allocation manager of when to add and remove executors.
*
@@ -669,6 +758,40 @@ private[spark] class ExecutorAllocationManager(
private val stageAttemptToExecutorPlacementHints =
new mutable.HashMap[StageAttempt, (Int, Map[String, Int], Int)]

// Tracks the total no. of tasks in each stage of a micro-batch (streaming use case).
// This helps in requesting resources by counting the pending tasks of the job,
// rather than counting the pending tasks of a single stage.
private val pendingStagesToStageDetails = new mutable.HashMap[Int, StageDetails]

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (streamingDRAFeatureEnabled) {
allocationManager.synchronized {
jobStart.stageInfos.foreach { stageInfo =>
pendingStagesToStageDetails(stageInfo.stageId) = StageDetails(
jobStart.jobId,
stageInfo.stageId,
stageInfo.numTasks,
stageInfo.resourceProfileId
)
}
logDebug(s"added Job: ${jobStart.jobId} stages details to: $pendingStagesToStageDetails")
}
}
}

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
if (streamingDRAFeatureEnabled) {
allocationManager.synchronized {
val pendingStagesOfTheJob = pendingStagesToStageDetails.values.filter(
stageDetails => stageDetails.jobId == jobEnd.jobId).map(_.stageId).toSeq
pendingStagesToStageDetails --= pendingStagesOfTheJob
logDebug(s"cleared all pending stages: $pendingStagesOfTheJob of the " +
s"completed Job: ${jobEnd.jobId}. Remaining pendingStagesToStageDetails:" +
s" $pendingStagesToStageDetails")
}
}
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
initializing = false
val stageId = stageSubmitted.stageInfo.stageId
@@ -702,6 +825,11 @@ private[spark] class ExecutorAllocationManager(
// Update the executor placement hints
updateExecutorPlacementHints()

if (streamingDRAFeatureEnabled) {
pendingStagesToStageDetails -= stageId
logDebug(s"removed current stage $stageId from pending tasks of the job")
}

if (!numExecutorsTargetPerResourceProfileId.contains(profId)) {
numExecutorsTargetPerResourceProfileId.put(profId, initialNumExecutors)
if (initialNumExecutors > 0) {
@@ -739,7 +867,14 @@ private[spark] class ExecutorAllocationManager(
if (stageAttemptToNumTasks.isEmpty
&& stageAttemptToPendingSpeculativeTasks.isEmpty
&& stageAttemptToSpeculativeTaskIndices.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (streamingDRAFeatureEnabled) {
if (!hasPendingTasksFromOtherStagesOfTheJob) {
allocationManager.onSchedulerQueueEmpty()
logDebug("cleared queue when all stages in a job are completed")
}
} else {
allocationManager.onSchedulerQueueEmpty()
}
}
}
}
@@ -868,15 +1003,28 @@
}
}

def pendingTasksFromOtherStagesOfTheJob: Int =
pendingStagesToStageDetails.values.map(_.numTasks).sum


def pendingTasksFromOtherStagesOfTheJob(rpId: Int): Int =
pendingStagesToStageDetails.values.filter(_.resourceProfileId == rpId).map(_.numTasks).sum

def hasPendingTasksFromOtherStagesOfTheJob: Boolean =
pendingTasksFromOtherStagesOfTheJob > 0

/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
* An estimate of the total number of pending tasks remaining for currently running stages.
* Will also account for pending tasks in other pending stages of a micro-batch job when
* `spark.dynamicAllocation.streaming.enabled` is enabled.
* Does not account for tasks which may have failed and been resubmitted.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def pendingTasksPerResourceProfile(rpId: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rpId, Set.empty).toSeq
attempts.map(attempt => getPendingTaskSum(attempt)).sum
(attempts.map(attempt => getPendingTaskSum(attempt)).sum +
pendingTasksFromOtherStagesOfTheJob(rpId))
}

def hasPendingRegularTasks: Boolean = {
@@ -916,8 +1064,13 @@
attempts.count(attempt => unschedulableTaskSets.contains(attempt))
}

/**
* Accounts only for the pending tasks and speculative tasks of the currently running stages.
* Will also account for pending tasks from other stages if enabled for the streaming use case,
* i.e. if `spark.dynamicAllocation.streaming.enabled` is enabled.
*/
def hasPendingTasks: Boolean = {
hasPendingSpeculativeTasks || hasPendingRegularTasks
hasPendingSpeculativeTasks || hasPendingRegularTasks || hasPendingTasksFromOtherStagesOfTheJob
Contributor

Can you check "hasPendingTasksFromOtherStagesOfTheJob" only if "streamingDRAFeatureEnabled" is enabled?

Author

hasPendingTasksFromOtherStagesOfTheJob will always be false when streamingDRAFeatureEnabled is not enabled.

The underlying data structure (pendingStagesToStageDetails) will always be empty, as update operations on it are always gated behind the streamingDRAFeatureEnabled flag.
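For illustration, a standalone sketch of that invariant (the names mirror the PR; the wrapper object is made up for this example): the map is only written while the flag is on, so the derived check stays false otherwise.

```scala
import scala.collection.mutable

object StreamingDRAFlagSketch {
  case class StageDetails(jobId: Int, stageId: Int, numTasks: Int, resourceProfileId: Int)

  val streamingDRAFeatureEnabled = false
  val pendingStagesToStageDetails = mutable.HashMap.empty[Int, StageDetails]

  // Updates are gated behind the feature flag, as in onJobStart.
  if (streamingDRAFeatureEnabled) {
    pendingStagesToStageDetails(1) = StageDetails(jobId = 0, stageId = 1, numTasks = 8, resourceProfileId = 0)
  }

  def pendingTasksFromOtherStagesOfTheJob: Int =
    pendingStagesToStageDetails.values.map(_.numTasks).sum

  def hasPendingTasksFromOtherStagesOfTheJob: Boolean =
    pendingTasksFromOtherStagesOfTheJob > 0  // always false while the flag is off
}
```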

}

def totalRunningTasksPerResourceProfile(rp: Int): Int = {
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -601,6 +601,12 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val DYN_ALLOCATION_STREAMING_ENABLED =
ConfigBuilder("spark.dynamicAllocation.streaming.enabled")
Contributor

How does this interact with spark.dynamicAllocation.enabled? I think it needs to be clear to the user and documented well.

You also need to update the configuration.md documentation to show this publicly.

Author

Will update configuration.md and job-scheduling.md in the next commit.

Contributor

Yes, we need to clarify whether it can be enabled separately or not.
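For reference, a REPL-style sketch of the new keys and the defaults as they appear in this diff (values are illustrative; the streaming flag is only read inside ExecutorAllocationManager, so presumably spark.dynamicAllocation.enabled still has to be true for it to take effect, which is exactly the interaction that needs documenting):

```scala
import org.apache.spark.SparkConf

// Illustrative values; defaults per this diff are noted in the comments.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")                               // existing switch
  .set("spark.dynamicAllocation.streaming.enabled", "true")                     // default: false
  .set("spark.dynamicAllocation.streaming.executorDeallocationRatio", "0.5")    // default: 1.0
  .set("spark.dynamicAllocation.streaming.executorDeallocationTimeout", "30s")  // falls back to executorIdleTimeout
```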

.version("3.5.0")
.booleanConf
.createWithDefault(false)

private[spark] val DYN_ALLOCATION_TESTING =
ConfigBuilder("spark.dynamicAllocation.testing")
.version("1.2.0")
@@ -630,6 +636,12 @@
.doubleConf
.createWithDefault(1.0)

private[spark] val DYN_ALLOCATION_EXECUTOR_DEALLOCATION_RATIO =
ConfigBuilder("spark.dynamicAllocation.streaming.executorDeallocationRatio")
.version("3.5.0")
.doubleConf
.createWithDefault(1.0)

private[spark] val DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout")
.version("1.4.0")
@@ -644,6 +656,11 @@
.checkValue(_ >= 0L, "Timeout must be >= 0.")
.createWithDefault(60)

private[spark] val DYN_ALLOCATION_EXECUTOR_DEALLOCATION_TIMEOUT =
ConfigBuilder("spark.dynamicAllocation.streaming.executorDeallocationTimeout")
.version("3.5.0")
.fallbackConf(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)

private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED =
ConfigBuilder("spark.dynamicAllocation.shuffleTracking.enabled")
.version("3.0.0")