/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark

import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import com.codahale.metrics.{Counter, Gauge, MetricRegistry}

import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.Tests.TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED
import org.apache.spark.metrics.source.Source
import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
import org.apache.spark.resource.ResourceProfileManager
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.dynalloc.ExecutorMonitor
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
*
* The ExecutorAllocationManager maintains a moving target number of executors, for each
* ResourceProfile, which is periodically synced to the cluster manager. The target starts
* at a configured initial value and changes with the number of pending and running tasks.
*
* Decreasing the target number of executors happens when the current target is more than needed to
* handle the current load. The target number of executors is always truncated to the number of
* executors that could run all current running and pending tasks at once.
*
* Increasing the target number of executors happens in response to backlogged tasks waiting to be
* scheduled. If the scheduler queue is not drained in M seconds, then new executors are added. If
* the queue persists for another N seconds, then more executors are added and so on. The number
* added in each round increases exponentially from the previous round until an upper bound has been
* reached. The upper bound is based both on a configured property and on the current number of
* running and pending tasks, as described above.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
* we may add more executors than we need just to remove them later. (2) Executors should be added
* quickly over time in case the maximum number of executors is very high. Otherwise, it will take
* a long time to ramp up under heavy workloads.
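*
* As an illustration (assuming the per-round increment starts at 1 and the backlog persists):
* successive rounds raise the target by 1, 2, 4, 8, ... executors until either the configured
* maximum or the number needed for the current task load is reached.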
*
* The remove policy is simpler and is applied on each ResourceProfile separately. If an executor
* for that ResourceProfile has been idle for K seconds and the number of executors is more
* than what is needed for that ResourceProfile, meaning there are not enough tasks that could use
* the executor, then it is removed. Note that an executor caching any data
* blocks will be removed if it has been idle for more than L seconds.
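*
* For example (illustrative values only): with an executor idle timeout of 60 seconds, an executor
* that holds no cached blocks and has run no tasks for 60 seconds becomes a removal candidate,
* provided its ResourceProfile still has more executors than its current tasks require.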
*
* There is no retry logic in either case because we make the assumption that the cluster manager
* will eventually fulfill all requests it receives asynchronously.
*
* The relevant Spark properties are below. Each of these properties applies separately to
* every ResourceProfile. So if you set a minimum number of executors, that is a minimum
* for each ResourceProfile.
*
* spark.dynamicAllocation.enabled - Whether this feature is enabled
* spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
* spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.executorAllocationRatio -
* This is used to reduce the parallelism of dynamic allocation, which can otherwise waste
* resources when tasks are small
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
*
* spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) -
* If the backlog is sustained for this duration, add more executors
* This is used only after the initial backlog timeout is exceeded
*
* spark.dynamicAllocation.executorIdleTimeout (K) -
* If an executor without caching any data blocks has been idle for this duration, remove it
*
* spark.dynamicAllocation.cachedExecutorIdleTimeout (L) -
* If an executor with caching data blocks has been idle for more than this duration,
* the executor will be removed
*
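* A minimal sketch of enabling dynamic allocation from code (the values below are illustrative
* placeholders, not recommendations; any of the shuffle options described above would work):
* {{{
*   val conf = new SparkConf()
*     .set("spark.dynamicAllocation.enabled", "true")
*     .set("spark.dynamicAllocation.minExecutors", "1")
*     .set("spark.dynamicAllocation.maxExecutors", "20")
*     .set("spark.shuffle.service.enabled", "true")
* }}}
*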
*/
private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf,
cleaner: Option[ContextCleaner] = None,
clock: Clock = new SystemClock(),
resourceProfileManager: ResourceProfileManager,
reliableShuffleStorage: Boolean)
extends Logging {
allocationManager =>
import ExecutorAllocationManager._
// Lower and upper bounds on the number of executors.
private val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
private val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
// How long tasks must be backlogged before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.get(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
// Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
private val sustainedSchedulerBacklogTimeoutS =
conf.get(DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT)
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.get(DYN_ALLOCATION_TESTING)
private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
private val decommissionEnabled = conf.get(DECOMMISSION_ENABLED)
private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id
validateSettings()
// Number of executors to add for each ResourceProfile in the next round
private[spark] val numExecutorsToAddPerResourceProfileId = new mutable.HashMap[Int, Int]
numExecutorsToAddPerResourceProfileId(defaultProfileId) = 1
// The desired number of executors at this moment in time. If all our executors were to die, this
// is the number of executors we would immediately want from the cluster manager.
// Note that every profile is allowed to start with the initial number of executors;
// we may want to make this configurable per ResourceProfile in the future
private[spark] val numExecutorsTargetPerResourceProfileId = new mutable.HashMap[Int, Int]
numExecutorsTargetPerResourceProfileId(defaultProfileId) = initialNumExecutors
// A timestamp of when an addition should be triggered, or NOT_SET if it is not set
// This is set when pending tasks are added but not scheduled yet
private var addTime: Long = NOT_SET
// Polling loop interval (ms)
private val intervalMillis: Long = 100
// Listener for Spark events that impact the allocation policy
val listener = new ExecutorAllocationListener
// Executor that handles the scheduling task.
private val executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
// Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
val executorAllocationManagerSource = new ExecutorAllocationManagerSource(this)
val executorMonitor =
new ExecutorMonitor(conf, client, listenerBus, clock, executorAllocationManagerSource)
// Whether we are still waiting for the initial set of executors to be allocated.
// While this is true, we will not cancel outstanding executor requests. This is
// set to false when:
// (1) a stage is submitted, or
// (2) an executor idle timeout has elapsed.
@volatile private var initializing: Boolean = true
// Number of locality aware tasks for each ResourceProfile, used for executor placement.
private var numLocalityAwareTasksPerResourceProfileId = new mutable.HashMap[Int, Int]
numLocalityAwareTasksPerResourceProfileId(defaultProfileId) = 0
// Map from ResourceProfile id to host to the number of tasks that prefer to run on that host, used for executor placement.
private var rpIdToHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException(
s"${DYN_ALLOCATION_MIN_EXECUTORS.key} and ${DYN_ALLOCATION_MAX_EXECUTORS.key} must be " +
"positive!")
}
if (maxNumExecutors == 0) {
throw new SparkException(s"${DYN_ALLOCATION_MAX_EXECUTORS.key} cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"${DYN_ALLOCATION_MIN_EXECUTORS.key} ($minNumExecutors) must " +
s"be less than or equal to ${DYN_ALLOCATION_MAX_EXECUTORS.key} ($maxNumExecutors)!")
}
if (schedulerBacklogTimeoutS <= 0) {
throw new SparkException(s"${DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (sustainedSchedulerBacklogTimeoutS <= 0) {
throw new SparkException(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !reliableShuffleStorage) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
logInfo("Dynamic allocation is enabled without a shuffle service.")
} else if (decommissionEnabled &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
logInfo("Shuffle data decommission is enabled without a shuffle service.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires one of the " +
"following conditions: 1) enabling external shuffle service through " +
s"${config.SHUFFLE_SERVICE_ENABLED.key}. 2) enabling shuffle tracking through " +
s"${DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED.key}. 3) enabling shuffle blocks " +
s"decommission through ${DECOMMISSION_ENABLED.key} and " +
s"${STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key}. 4) (Experimental) " +
s"configuring ${SHUFFLE_IO_PLUGIN_CLASS.key} to use a custom ShuffleDataIO who's " +
"ShuffleDriverComponents supports reliable storage.")
}
}
if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
throw new SparkException(
s"${DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO.key} must be > 0 and <= 1.0")
}
}
/**
* Register for scheduler callbacks to decide when to add and remove executors, and start
* the scheduling task.
*/
def start(): Unit = {
listenerBus.addToManagementQueue(listener)
listenerBus.addToManagementQueue(executorMonitor)
cleaner.foreach(_.attachListener(executorMonitor))
val scheduleTask = new Runnable() {
override def run(): Unit = Utils.tryLog(schedule())
}
if (!testing || conf.get(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED)) {
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
// Copy the maps inside the synchronized block to ensure they are not modified concurrently
val (numExecutorsTarget, numLocalityAware) = synchronized {
val numTarget = numExecutorsTargetPerResourceProfileId.toMap
val numLocality = numLocalityAwareTasksPerResourceProfileId.toMap
(numTarget, numLocality)
}
client.requestTotalExecutors(numExecutorsTarget, numLocalityAware, rpIdToHostToLocalTaskCount)
}
/**
* Stop the allocation manager.
*/
def stop(): Unit = {
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
}
/**
* Reset the allocation manager when the cluster manager loses track of the driver's state.
* This is currently only done in YARN client mode, when the AM is restarted.
*
* This method forgets about any state about existing executors, and forces the scheduler to
* re-evaluate the number of needed executors the next time it's run.
*/
def reset(): Unit = synchronized {
addTime = 0L
numExecutorsTargetPerResourceProfileId.keys.foreach { rpId =>
numExecutorsTargetPerResourceProfileId(rpId) = initialNumExecutors
}
numExecutorsToAddPerResourceProfileId.keys.foreach { rpId =>
numExecutorsToAddPerResourceProfileId(rpId) = 1
}
executorMonitor.reset()
}
/**
* The maximum number of executors, for the ResourceProfile id passed in, that we would need
* under the current load to satisfy all running and pending tasks, rounded up.
*/
private[spark] def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
val pendingTask = listener.pendingTasksPerResourceProfile(rpId)
val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
val running = listener.totalRunningTasksPerResourceProfile(rpId)
val numRunningOrPendingTasks = pendingTask + pendingSpeculative + running
val rp = resourceProfileManager.resourceProfileFromId(rpId)
val tasksPerExecutor = rp.maxTasksPerExecutor(conf)
logDebug(s"max needed for rpId: $rpId numpending: $numRunningOrPendingTasks," +
s" tasksperexecutor: $tasksPerExecutor")
val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutor).toInt
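// Illustrative arithmetic (hypothetical values): with 100 running + pending tasks, an
// executorAllocationRatio of 0.5 and 4 tasks per executor, maxNeeded = ceil(100 * 0.5 / 4) = 13.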
val maxNeededWithSpeculationLocalityOffset =
if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
// If we have pending speculative tasks and only need a single executor, allocate one more
// to satisfy the locality requirements of speculation
maxNeeded + 1
} else {
maxNeeded
}
if (unschedulableTaskSets > 0) {
// Request additional executors to account for task sets whose tasks are unschedulable due to
// executors being excluded for failures, when the active executor count has already reached
// the maximum we would normally request.
val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
tasksPerExecutor).toInt
math.max(maxNeededWithSpeculationLocalityOffset,
executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables)
} else {
maxNeededWithSpeculationLocalityOffset
}
}
private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized {
listener.totalRunningTasksPerResourceProfile(id)
}
/**
* This is called at a fixed interval to regulate the number of pending executor requests
* and number of executors running.
*
* First, adjust our requested executors based on the add time and our current needs.
* Then, if the remove time for an existing executor has expired, kill the executor.
*
* This is factored out into its own method for testing.
*/
private def schedule(): Unit = synchronized {
val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()
if (executorIdsToBeRemoved.nonEmpty) {
initializing = false
}
// Update executor target number only after initializing flag is unset
updateAndSyncNumExecutorsTarget(clock.nanoTime())
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
}
/**
* Updates our target number of executors for each ResourceProfile and then syncs the result
* with the cluster manager.
*
* Check to see whether our existing allocation and the requests we've made previously exceed our
* current needs. If so, truncate our target and let the cluster manager know so that it can
* cancel pending requests that are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
if (initializing) {
// Do not change our target while we are still initializing,
// Otherwise the first job may have to ramp up unnecessarily
0
} else {
val updatesNeeded = new mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]
// Update targets for all ResourceProfiles then do a single request to the cluster manager
numExecutorsTargetPerResourceProfileId.foreach { case (rpId, targetExecs) =>
val maxNeeded = maxNumExecutorsNeededPerResourceProfile(rpId)
if (maxNeeded < targetExecs) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests
// We lower the target number of executors but don't actively kill any yet. Killing is
// controlled separately by an idle timeout. It's still helpful to reduce
// the target number in case an executor just happens to get lost (e.g., bad hardware,
// or the cluster manager preempts it) -- in that case, there is no point in trying
// to immediately get a new executor, since we wouldn't even use it yet.
decrementExecutorsFromTarget(maxNeeded, rpId, updatesNeeded)
} else if (addTime != NOT_SET && now >= addTime) {
addExecutorsToTarget(maxNeeded, rpId, updatesNeeded)
}
}
doUpdateRequest(updatesNeeded.toMap, now)
}
}
private def addExecutorsToTarget(
maxNeeded: Int,
rpId: Int,
updatesNeeded: mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]): Int = {
updateTargetExecs(addExecutors, maxNeeded, rpId, updatesNeeded)
}
private def decrementExecutorsFromTarget(
maxNeeded: Int,
rpId: Int,
updatesNeeded: mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]): Int = {
updateTargetExecs(decrementExecutors, maxNeeded, rpId, updatesNeeded)
}
private def updateTargetExecs(
updateTargetFn: (Int, Int) => Int,
maxNeeded: Int,
rpId: Int,
updatesNeeded: mutable.HashMap[Int, ExecutorAllocationManager.TargetNumUpdates]): Int = {
val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
// update the target number (add or remove)
val delta = updateTargetFn(maxNeeded, rpId)
if (delta != 0) {
updatesNeeded(rpId) = ExecutorAllocationManager.TargetNumUpdates(delta, oldNumExecutorsTarget)
}
delta
}
private def doUpdateRequest(
updates: Map[Int, ExecutorAllocationManager.TargetNumUpdates],
now: Long): Int = {
// Only call cluster manager if target has changed.
if (updates.size > 0) {
val requestAcknowledged = try {
logDebug("requesting updates: " + updates)
testing ||
client.requestTotalExecutors(
numExecutorsTargetPerResourceProfileId.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap,
rpIdToHostToLocalTaskCount)
} catch {
case NonFatal(e) =>
// Use INFO level so the error doesn't show up by default in shells.
// Errors here are more commonly caused by YARN AM restarts, which is a recoverable
// issue, and generate a lot of noisy output.
logInfo("Error reaching cluster manager.", e)
false
}
if (requestAcknowledged) {
// have to go through all resource profiles that changed
var totalDelta = 0
updates.foreach { case (rpId, targetNum) =>
val delta = targetNum.delta
totalDelta += delta
if (delta > 0) {
val executorsString = "executor" + { if (delta > 1) "s" else "" }
logInfo(s"Requesting $delta new $executorsString because tasks are backlogged " +
s"(new desired total will be ${numExecutorsTargetPerResourceProfileId(rpId)} " +
s"for resource profile id: ${rpId})")
numExecutorsToAddPerResourceProfileId(rpId) =
if (delta == numExecutorsToAddPerResourceProfileId(rpId)) {
numExecutorsToAddPerResourceProfileId(rpId) * 2
} else {
1
}
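// Example (illustrative): if the planned increment (say 2) was applied in full, the next
// round doubles it to 4; otherwise the increment resets to 1 for the next round.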
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime = now + TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
} else {
logDebug(s"Lowering target number of executors to" +
s" ${numExecutorsTargetPerResourceProfileId(rpId)} (previously " +
s"${targetNum.oldNumExecutorsTarget} for resource profile id: ${rpId}) " +
"because not all requested executors " +
"are actually needed")
}
}
totalDelta
} else {
// The request covered all changed profiles, so go through each one and reset it to its old target number
updates.foreach { case (rpId, targetNum) =>
logWarning("Unable to reach the cluster manager to request more executors!")
numExecutorsTargetPerResourceProfileId(rpId) = targetNum.oldNumExecutorsTarget
}
0
}
} else {
logDebug("No change in number of executors")
0
}
}
private def decrementExecutors(maxNeeded: Int, rpId: Int): Int = {
val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
numExecutorsTargetPerResourceProfileId(rpId) = math.max(maxNeeded, minNumExecutors)
numExecutorsToAddPerResourceProfileId(rpId) = 1
numExecutorsTargetPerResourceProfileId(rpId) - oldNumExecutorsTarget
}
/**
* Update the target number of executors and figure out how many to add.
* If the cap on the number of executors is reached, give up and reset the
* number of executors to add next round instead of continuing to double it.
*
* @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
* tasks could fill
* @param rpId the ResourceProfile id of the executors
* @return the number of additional executors actually requested.
*/
private def addExecutors(maxNumExecutorsNeeded: Int, rpId: Int): Int = {
val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
// Do not request more executors if it would put our target over the upper bound
// this is doing a max check per ResourceProfile
if (oldNumExecutorsTarget >= maxNumExecutors) {
logDebug("Not adding executors because our current target total " +
s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
numExecutorsToAddPerResourceProfileId(rpId) = 1
return 0
}
// There's no point in wasting time ramping up to the number of executors we already have, so
// make sure our target is at least as much as our current allocation:
var numExecutorsTarget = math.max(numExecutorsTargetPerResourceProfileId(rpId),
executorMonitor.executorCountWithResourceProfile(rpId))
// Boost our target with the number to add for this round:
numExecutorsTarget += numExecutorsToAddPerResourceProfileId(rpId)
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
val delta = numExecutorsTarget - oldNumExecutorsTarget
numExecutorsTargetPerResourceProfileId(rpId) = numExecutorsTarget
// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
numExecutorsToAddPerResourceProfileId(rpId) = 1
}
delta
}
/**
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
private def removeExecutors(executors: Seq[(String, Int)]): Seq[String] = synchronized {
val executorIdsToBeRemoved = new ArrayBuffer[String]
logDebug(s"Request to remove executorIds: ${executors.mkString(", ")}")
val numExecutorsTotalPerRpId = mutable.Map[Int, Int]()
executors.foreach { case (executorIdToBeRemoved, rpId) =>
if (rpId == UNKNOWN_RESOURCE_PROFILE_ID) {
if (testing) {
throw new SparkException("ResourceProfile Id was UNKNOWN, this is not expected")
}
logWarning(s"Not removing executor $executorIdToBeRemoved because the " +
"ResourceProfile was UNKNOWN!")
} else {
// Get the running total as we remove executors, or initialize it to the current count minus pending removals and decommissions
val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
(executorMonitor.executorCountWithResourceProfile(rpId) -
executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
executorMonitor.decommissioningPerResourceProfileId(rpId)
))
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " +
s"$minNumExecutors)")
} else if (newExecutorTotal - 1 < numExecutorsTargetPerResourceProfileId(rpId)) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
s"are only $newExecutorTotal executor(s) left (number of executor " +
s"target ${numExecutorsTargetPerResourceProfileId(rpId)})")
} else {
executorIdsToBeRemoved += executorIdToBeRemoved
numExecutorsTotalPerRpId(rpId) -= 1
}
}
}
if (executorIdsToBeRemoved.isEmpty) {
return Seq.empty[String]
}
// Send a request to the backend to kill this executor(s)
val executorsRemoved = if (testing) {
executorIdsToBeRemoved
} else {
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.map(
id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
client.decommissionExecutors(
executorIdsWithoutHostLoss,
adjustTargetNumExecutors = false,
triggeredByExecutor = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}
}
// [SPARK-21834] killExecutors api reduces the target number of executors.
// So we need to update the target with desired value.
client.requestTotalExecutors(
numExecutorsTargetPerResourceProfileId.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap,
rpIdToHostToLocalTaskCount)
// Record the removed executors as decommissioned or killed in the executor monitor
if (testing || executorsRemoved.nonEmpty) {
if (decommissionEnabled) {
executorMonitor.executorsDecommissioned(executorsRemoved.toSeq)
} else {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
}
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
executorsRemoved.toSeq
} else {
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
Seq.empty[String]
}
}
/**
* Callback invoked when the scheduler receives new pending tasks.
* This sets a time in the future that decides when executors should be added
* if it is not already set.
*/
private def onSchedulerBacklogged(): Unit = synchronized {
if (addTime == NOT_SET) {
logDebug(s"Starting timer to add executors because pending tasks " +
s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
addTime = clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeoutS)
}
}
/**
* Callback invoked when the scheduler queue is drained.
* This resets all variables used for adding executors.
*/
private def onSchedulerQueueEmpty(): Unit = synchronized {
logDebug("Clearing timer to add executors because there are no more pending tasks")
addTime = NOT_SET
numExecutorsToAddPerResourceProfileId.transform { case (_, _) => 1 }
}
private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}
/**
* A listener that notifies the given allocation manager of when to add and remove executors.
*
* This class is intentionally conservative in its assumptions about the relative ordering
* and consistency of events returned by the listener.
*/
private[spark] class ExecutorAllocationListener extends SparkListener {
private val stageAttemptToNumTasks = new mutable.HashMap[StageAttempt, Int]
// Number of running tasks per stageAttempt including speculative tasks.
// Should be 0 when no stages are active.
private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
// Map from each stageAttempt to a set of running speculative task indexes
// TODO(SPARK-41192): We simply need an Int for this.
private val stageAttemptToSpeculativeTaskIndices =
new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
// Map from each stageAttempt to a set of pending speculative task indexes
private val stageAttemptToPendingSpeculativeTasks =
new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
private val resourceProfileIdToStageAttempt =
new mutable.HashMap[Int, mutable.Set[StageAttempt]]
// Keep track of unschedulable task sets because of executor/node exclusions from too many task
// failures. This is a Set of StageAttempts because we'll only take the last unschedulable task
// in a taskset although there can be more. This is done in order to avoid costly loops in the
// scheduling. Check TaskSetManager#getCompletelyExcludedTaskIfAny for more details.
private val unschedulableTaskSets = new mutable.HashSet[StageAttempt]
// Map from each stageAttempt to a tuple of (the number of tasks with locality preferences,
// a map from each node to the number of tasks that would like to be scheduled on it, and
// the resource profile id). This maintains the executor placement hints for each stageAttempt,
// which the resource framework uses to better place the executors.
private val stageAttemptToExecutorPlacementHints =
new mutable.HashMap[StageAttempt, (Int, Map[String, Int], Int)]
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
initializing = false
val stageId = stageSubmitted.stageInfo.stageId
val stageAttemptId = stageSubmitted.stageInfo.attemptNumber()
val stageAttempt = StageAttempt(stageId, stageAttemptId)
val numTasks = stageSubmitted.stageInfo.numTasks
allocationManager.synchronized {
stageAttemptToNumTasks(stageAttempt) = numTasks
allocationManager.onSchedulerBacklogged()
// need to keep stage task requirements to ask for the right containers
val profId = stageSubmitted.stageInfo.resourceProfileId
logDebug(s"Stage resource profile id is: $profId with numTasks: $numTasks")
resourceProfileIdToStageAttempt.getOrElseUpdate(
profId, new mutable.HashSet[StageAttempt]) += stageAttempt
numExecutorsToAddPerResourceProfileId.getOrElseUpdate(profId, 1)
// Compute the number of tasks requested by the stage on each host
var numTasksPending = 0
val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
if (!locality.isEmpty) {
numTasksPending += 1
locality.foreach { location =>
val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
hostToLocalTaskCountPerStage(location.host) = count
}
}
}
stageAttemptToExecutorPlacementHints.put(stageAttempt,
(numTasksPending, hostToLocalTaskCountPerStage.toMap, profId))
// Update the executor placement hints
updateExecutorPlacementHints()
if (!numExecutorsTargetPerResourceProfileId.contains(profId)) {
numExecutorsTargetPerResourceProfileId.put(profId, initialNumExecutors)
if (initialNumExecutors > 0) {
logDebug(s"requesting executors, rpId: $profId, initial number is $initialNumExecutors")
// we need to trigger a schedule since we add an initial number here.
client.requestTotalExecutors(
numExecutorsTargetPerResourceProfileId.toMap,
numLocalityAwareTasksPerResourceProfileId.toMap,
rpIdToHostToLocalTaskCount)
}
}
}
}
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageId = stageCompleted.stageInfo.stageId
val stageAttemptId = stageCompleted.stageInfo.attemptNumber()
val stageAttempt = StageAttempt(stageId, stageAttemptId)
allocationManager.synchronized {
// do NOT remove stageAttempt from stageAttemptToNumRunningTask
// because the attempt may still have running tasks,
// even after another attempt for the stage is submitted.
stageAttemptToNumTasks -= stageAttempt
stageAttemptToPendingSpeculativeTasks -= stageAttempt
stageAttemptToTaskIndices -= stageAttempt
stageAttemptToSpeculativeTaskIndices -= stageAttempt
stageAttemptToExecutorPlacementHints -= stageAttempt
removeStageFromResourceProfileIfUnused(stageAttempt)
// Update the executor placement hints
updateExecutorPlacementHints()
// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
if (stageAttemptToNumTasks.isEmpty
&& stageAttemptToPendingSpeculativeTasks.isEmpty
&& stageAttemptToSpeculativeTaskIndices.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
}
}
}
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
val stageId = taskStart.stageId
val stageAttemptId = taskStart.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
val taskIndex = taskStart.taskInfo.index
allocationManager.synchronized {
stageAttemptToNumRunningTask(stageAttempt) =
stageAttemptToNumRunningTask.getOrElse(stageAttempt, 0) + 1
// If this is the last pending task, mark the scheduler queue as empty
if (taskStart.taskInfo.speculative) {
stageAttemptToSpeculativeTaskIndices.getOrElseUpdate(stageAttempt,
new mutable.HashSet[Int]) += taskIndex
stageAttemptToPendingSpeculativeTasks
.get(stageAttempt).foreach(_.remove(taskIndex))
} else {
stageAttemptToTaskIndices.getOrElseUpdate(stageAttempt,
new mutable.HashSet[Int]) += taskIndex
}
if (!hasPendingTasks) {
allocationManager.onSchedulerQueueEmpty()
}
}
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val stageId = taskEnd.stageId
val stageAttemptId = taskEnd.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
val taskIndex = taskEnd.taskInfo.index
allocationManager.synchronized {
if (stageAttemptToNumRunningTask.contains(stageAttempt)) {
stageAttemptToNumRunningTask(stageAttempt) -= 1
if (stageAttemptToNumRunningTask(stageAttempt) == 0) {
stageAttemptToNumRunningTask -= stageAttempt
removeStageFromResourceProfileIfUnused(stageAttempt)
}
}
if (taskEnd.taskInfo.speculative) {
stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_.remove(taskIndex))
}
taskEnd.reason match {
case Success =>
// Remove pending speculative task in case the normal task
// is finished before starting the speculative task
stageAttemptToPendingSpeculativeTasks.get(stageAttempt).foreach(_.remove(taskIndex))
case _: TaskKilled =>
case _ =>
if (!hasPendingTasks) {
// If the task failed (not intentionally killed), we expect it to be resubmitted
// later. To ensure we have enough resources to run the resubmitted task, we need to
// mark the scheduler as backlogged again if it's not already marked as such
// (SPARK-8366)
allocationManager.onSchedulerBacklogged()
}
if (!taskEnd.taskInfo.speculative) {
// If a non-speculative task is intentionally killed, it means the speculative task
// has succeeded, and no further task of this task index will be resubmitted. In this
// case, the task index is completed and we shouldn't remove it from
// stageAttemptToTaskIndices. Otherwise, we will have a pending non-speculative task
// for the task index (SPARK-30511)
stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
}
}
}
}
override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
: Unit = {
val stageId = speculativeTask.stageId
val stageAttemptId = speculativeTask.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
val taskIndex = speculativeTask.taskIndex
allocationManager.synchronized {
stageAttemptToPendingSpeculativeTasks.getOrElseUpdate(stageAttempt,
new mutable.HashSet[Int]).add(taskIndex)
allocationManager.onSchedulerBacklogged()
}
}
override def onUnschedulableTaskSetAdded(
unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
val stageId = unschedulableTaskSetAdded.stageId
val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
allocationManager.synchronized {
unschedulableTaskSets.add(stageAttempt)
allocationManager.onSchedulerBacklogged()
}
}
override def onUnschedulableTaskSetRemoved(
unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
val stageId = unschedulableTaskSetRemoved.stageId
val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
allocationManager.synchronized {
// Clear unschedulableTaskSets since at least one task has become schedulable now
unschedulableTaskSets.remove(stageAttempt)
removeStageFromResourceProfileIfUnused(stageAttempt)
}
}
def removeStageFromResourceProfileIfUnused(stageAttempt: StageAttempt): Unit = {
if (!stageAttemptToNumRunningTask.contains(stageAttempt) &&
!stageAttemptToNumTasks.contains(stageAttempt) &&
!stageAttemptToPendingSpeculativeTasks.contains(stageAttempt) &&
!stageAttemptToTaskIndices.contains(stageAttempt) &&
!stageAttemptToSpeculativeTaskIndices.contains(stageAttempt)
) {
val rpForStage = resourceProfileIdToStageAttempt.filter { case (k, v) =>
v.contains(stageAttempt)
}.keys
if (rpForStage.size == 1) {
// be careful about the removal from here due to late tasks, make sure stage is
// really complete and no tasks left
resourceProfileIdToStageAttempt(rpForStage.head) -= stageAttempt
} else {
logWarning(s"Should have exactly one resource profile for stage $stageAttempt," +
s" but have $rpForStage")
}
}
}
/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def pendingTasksPerResourceProfile(rpId: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rpId, Set.empty).toSeq
attempts.map(attempt => getPendingTaskSum(attempt)).sum
}
def hasPendingRegularTasks: Boolean = {
val attemptSets = resourceProfileIdToStageAttempt.values
attemptSets.exists(attempts => attempts.exists(getPendingTaskSum(_) > 0))
}
private def getPendingTaskSum(attempt: StageAttempt): Int = {
val numTotalTasks = stageAttemptToNumTasks.getOrElse(attempt, 0)
val numRunning = stageAttemptToTaskIndices.get(attempt).map(_.size).getOrElse(0)
numTotalTasks - numRunning
}
def pendingSpeculativeTasksPerResourceProfile(rp: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
attempts.map(attempt => getPendingSpeculativeTaskSum(attempt)).sum
}
def hasPendingSpeculativeTasks: Boolean = {
val attemptSets = resourceProfileIdToStageAttempt.values
attemptSets.exists { attempts =>
attempts.exists(getPendingSpeculativeTaskSum(_) > 0)
}
}
private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = {
stageAttemptToPendingSpeculativeTasks.get(attempt).map(_.size).getOrElse(0)
}
/**
* Currently we only know when a task set has an unschedulable task, not the exact number.
* Since the allocation manager isn't tied closely to the scheduler, we use the number of
* unschedulable task sets as a heuristic for adding more executors.
*/
def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
attempts.count(attempt => unschedulableTaskSets.contains(attempt))
}
def hasPendingTasks: Boolean = {
hasPendingSpeculativeTasks || hasPendingRegularTasks
}
def totalRunningTasksPerResourceProfile(rp: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
// attempts is a Set, change to Seq so we keep all values
attempts.map { attempt =>
stageAttemptToNumRunningTask.getOrElse(attempt, 0)
}.sum
}
/**
* Update the Executor placement hints (the number of tasks with locality preferences,
* a map where each pair is a node and the number of tasks that would like to be scheduled
* on that node).
*
* These hints are updated when stages arrive and complete, so are not up-to-date at task
* granularity within stages.
*/
def updateExecutorPlacementHints(): Unit = {
val localityAwareTasksPerResourceProfileId = new mutable.HashMap[Int, Int]
// ResourceProfile id => map[host, count]
val rplocalityToCount = new mutable.HashMap[Int, mutable.HashMap[String, Int]]()
stageAttemptToExecutorPlacementHints.values.foreach {
case (numTasksPending, localities, rpId) =>
val rpNumPending =
localityAwareTasksPerResourceProfileId.getOrElse(rpId, 0)
localityAwareTasksPerResourceProfileId(rpId) = rpNumPending + numTasksPending
localities.foreach { case (hostname, count) =>
val rpBasedHostToCount =
rplocalityToCount.getOrElseUpdate(rpId, new mutable.HashMap[String, Int])
val newUpdated = rpBasedHostToCount.getOrElse(hostname, 0) + count
rpBasedHostToCount(hostname) = newUpdated
}
}
allocationManager.numLocalityAwareTasksPerResourceProfileId =
localityAwareTasksPerResourceProfileId
allocationManager.rpIdToHostToLocalTaskCount =
rplocalityToCount.map { case (k, v) => (k, v.toMap)}.toMap
}
}
}
/**
* Metric source for ExecutorAllocationManager to expose its internal executor allocation
* status to MetricsSystem.
* Note: These metrics heavily rely on the internal implementation of
* ExecutorAllocationManager; the metrics or their values may change when the internal
* implementation changes, so they are not stable across Spark versions.
*/
private[spark] class ExecutorAllocationManagerSource(
executorAllocationManager: ExecutorAllocationManager) extends Source {
val sourceName = "ExecutorAllocationManager"
val metricRegistry = new MetricRegistry()
private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
})
}
private def getCounter(name: String): Counter = {
metricRegistry.counter(MetricRegistry.name("executors", name))
}
val gracefullyDecommissioned: Counter = getCounter("numberExecutorsGracefullyDecommissioned")
val decommissionUnfinished: Counter = getCounter("numberExecutorsDecommissionUnfinished")
val driverKilled: Counter = getCounter("numberExecutorsKilledByDriver")
val exitedUnexpectedly: Counter = getCounter("numberExecutorsExitedUnexpectedly")
// The metrics are going to return the sum for all the different ResourceProfiles.
registerGauge("numberExecutorsToAdd",
executorAllocationManager.numExecutorsToAddPerResourceProfileId.values.sum, 0)