/
TaskExecutorService.java
1315 lines (1183 loc) · 55.8 KB
/
TaskExecutorService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap.daemon.impl;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.daemon.impl.comparator.LlapQueueComparatorBase;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.runtime.task.EndReason;
import org.apache.tez.runtime.task.TaskRunner2Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Task executor service provides method for scheduling tasks. Tasks submitted to executor service
* are submitted to wait queue for scheduling. Wait queue tasks are ordered based on the priority
* of the task. The internal wait queue scheduler moves tasks from wait queue when executor slots
* are available or when a higher priority task arrives and will schedule it for execution.
* When pre-emption is enabled, the tasks from wait queue can replace(pre-empt) a running task.
* The pre-empted task is reported back to the Application Master(AM) for it to be rescheduled.
* <br>
* Because of the concurrent nature of task submission, the position of the task in the wait queue
* is held until the scheduling of the task from the wait queue (with or without pre-emption) is
* complete. The order of pre-emption is based on the ordering in the pre-emption queue. All tasks
* that cannot run to completion immediately (canFinish = false) are added to the pre-emption queue.
* <br>
* When all the executor threads are occupied and the wait queue is full, the task scheduler will
* return a SubmissionState.REJECTED response.
* <br>
* The task executor service can be shut down, which terminates all running tasks and rejects all
* new tasks. Shutdown can be done either gracefully or immediately.
*/
public class TaskExecutorService extends AbstractService
implements Scheduler<TaskRunnerCallable>, SchedulerFragmentCompletingListener {
private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class);
// Name prefix for executor threads; TASK_EXECUTOR_THREAD_NAME_FORMAT appends a "%d" counter.
public static final String TASK_EXECUTOR_THREAD_NAME_FORMAT_PREFIX = "Task-Executor-";
private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = TASK_EXECUTOR_THREAD_NAME_FORMAT_PREFIX + "%d";
// Name format of the single thread that runs the WaitQueueWorker loop.
private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d";
// After the wait queue worker pre-empts a task, an executor rejection within this window is
// attributed to the victim not having exited yet, so the worker waits instead of pre-empting again.
private static final long PREEMPTION_KILL_GRACE_MS = 500; // 500ms
// How long the wait queue worker waits on the lock per retry during that grace window.
private static final int PREEMPTION_KILL_GRACE_SLEEP_MS = 50; // 50ms
// Checked by the wait queue worker loop, which exits once this is set.
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
// Thread pool for actual execution of work.
private final ListeningExecutorService executorService;
// Tasks waiting for an executor slot, ordered by the configured comparator. When full, an offer
// may evict a lower-priority task (or hand back the offered one).
@VisibleForTesting
final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
// Thread pool for taking entities off the wait queue.
private final ListeningExecutorService waitQueueExecutorService;
// Thread pool for callbacks on completion of execution of a work unit.
private final ListeningExecutorService executionCompletionExecutorService;
// Candidates for pre-emption, ordered by PreemptionQueueComparator.
@VisibleForTesting
final BlockingQueue<TaskWrapper> preemptionQueue;
private final boolean enablePreemption;
private final ThreadPoolExecutor threadPoolExecutor;
// Free executor slots. Can go negative: setCapacity shifts it by a delta, and the completion
// callback that hands a slot back may run late.
@VisibleForTesting
AtomicInteger numSlotsAvailable;
// Current executor limit; starts at the constructor's numExecutors and is changed by setCapacity.
@VisibleForTesting
int maxParallelExecutors;
// Upper bounds accepted by setCapacity; fixed at construction time.
private final int configuredMaxExecutors;
private final int configuredWaitingQueueSize;
// Time source for the pre-emption grace period and fragment completion timestamps.
private final Clock clock;
// Tracks running fragments, and completing fragments.
// Completing since we have a race in the AM being notified and the task actually
// falling off, and the executor service being ready to schedule a new task.
// Incremented when a fragment is submitted for execution and decremented (never below 0)
// in fragmentCompleting.
private final AtomicInteger runningFragmentCount = new AtomicInteger(0);
/**
* All tasks this service currently knows about, keyed by request id.
* Accessed under the epic lock unless it's used for reporting.
* Added when calling schedule.
* Removed if evicted, killed, or finished with success or failure.
* In the first two cases (evicted, killed), killTask is called on TaskRunnerCallable. In most
* cases, some cleanup is performed under the epic lock.
*/
@VisibleForTesting
final ConcurrentMap<String, TaskWrapper> knownTasks = new ConcurrentHashMap<>();
// The "epic lock": guards scheduling state, and is the monitor the wait queue worker waits on
// and other paths notifyAll() on after changing the queues.
private final Object lock = new Object();
private final LlapDaemonExecutorMetrics metrics;
/**
 * Builds the executor service and immediately starts the single wait-queue scheduler thread.
 *
 * @param numExecutors number of executor threads; also the initial number of free slots
 * @param waitQueueSize capacity of the wait queue
 * @param waitQueueComparatorClassName class name of the comparator that orders the wait queue;
 *     must not be null
 * @param enablePreemption whether queued tasks may pre-empt running ones
 * @param classLoader class loader handed to the executor thread factory; must not be null
 * @param metrics executor metrics sink; must not be null
 * @param clock time source; a MonotonicClock is used when this is null
 */
public TaskExecutorService(int numExecutors, int waitQueueSize,
    String waitQueueComparatorClassName, boolean enablePreemption,
    ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics, Clock clock) {
  super(TaskExecutorService.class.getSimpleName());
  checkNotNull(waitQueueComparatorClassName, "required argument 'waitQueueComparatorClassName' is null");
  checkNotNull(classLoader, "required argument 'classLoader' is null");
  checkNotNull(metrics, "required argument 'metrics' is null");
  LOG.info("TaskExecutorService is being setup with parameters: "
      + "numExecutors=" + numExecutors
      + ", waitQueueSize=" + waitQueueSize
      + ", waitQueueComparatorClassName=" + waitQueueComparatorClassName
      + ", enablePreemption=" + enablePreemption);

  // Priority-ordered wait queue plus the capacity bookkeeping setCapacity works against.
  final LlapQueueComparatorBase comparator = createComparator(waitQueueComparatorClassName);
  this.maxParallelExecutors = numExecutors;
  this.configuredMaxExecutors = numExecutors;
  this.configuredWaitingQueueSize = waitQueueSize;
  this.waitQueue = new EvictingPriorityBlockingQueue<>(comparator, waitQueueSize);
  this.clock = (clock != null) ? clock : new MonotonicClock();

  // Fixed-size executor pool with direct hand-off: a submission that finds no idle executor
  // thread is rejected (RejectedExecutionException) instead of being buffered.
  this.threadPoolExecutor = new ThreadPoolExecutor(
      numExecutors, numExecutors, 1, TimeUnit.MINUTES,
      new SynchronousQueue<Runnable>(), new ExecutorThreadFactory(classLoader));
  this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
  this.preemptionQueue =
      new PriorityBlockingQueue<>(numExecutors, new PreemptionQueueComparator());
  this.enablePreemption = enablePreemption;
  this.numSlotsAvailable = new AtomicInteger(numExecutors);

  this.metrics = metrics;
  this.metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
  this.metrics.setNumExecutors(numExecutors);
  this.metrics.setWaitQueueSize(waitQueueSize);

  // One daemon thread that moves tasks from the wait queue onto executor threads.
  ThreadFactory schedulerThreads = new ThreadFactoryBuilder()
      .setDaemon(true).setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build();
  this.waitQueueExecutorService =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1, schedulerThreads));

  // One daemon thread on which execution-completion callbacks run.
  ThreadFactory completionThreads = new ThreadFactoryBuilder()
      .setDaemon(true).setNameFormat("ExecutionCompletionThread #%d").build();
  this.executionCompletionExecutorService =
      MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1, completionThreads));

  // Start the scheduling loop; the callback escalates if the loop ever exits.
  ListenableFuture<?> workerFuture = waitQueueExecutorService.submit(new WaitQueueWorker());
  Futures.addCallback(workerFuture, new WaitQueueWorkerCallback(), MoreExecutors.directExecutor());
}
/**
 * Sets the TaskExecutorService capacity to new values. Neither the number of executors nor the
 * wait queue size may exceed the values the service was constructed with, so the other settings
 * stay consistent. (For example: a higher executor number could cause memory oversubscription,
 * since the container memory sizes are calculated based on the maximum memory and the maximum
 * number of executors.) Values equal to the configured maximum are allowed; negative values are
 * rejected.
 * <p>
 * Setting a smaller capacity does not, in itself, cancel or reject tasks that are already
 * executing or queued. The one exception: when the number of executors drops to 0, every task
 * still in the wait queue is killed, since it could never run.
 * @param newNumExecutors The new number of executors, between 0 and the configured maximum
 * @param newWaitQueueSize The new wait queue size, between 0 and the configured maximum
 * @throws IllegalArgumentException if either value is negative or exceeds its configured maximum
 */
@Override
public synchronized void setCapacity(int newNumExecutors, int newWaitQueueSize) {
  if (newNumExecutors > configuredMaxExecutors) {
    throw new IllegalArgumentException("Requested newNumExecutors=" + newNumExecutors
        + " is greater than the configured maximum=" + configuredMaxExecutors);
  }
  if (newWaitQueueSize > configuredWaitingQueueSize) {
    throw new IllegalArgumentException("Requested newWaitQueueSize=" + newWaitQueueSize
        + " is greater than the configured maximum=" + configuredWaitingQueueSize);
  }
  if (newNumExecutors < 0) {
    throw new IllegalArgumentException("Negative numExecutors is not allowed. Requested "
        + "newNumExecutors=" + newNumExecutors);
  }
  if (newWaitQueueSize < 0) {
    throw new IllegalArgumentException("Negative waitQueueSize is not allowed. Requested "
        + "newWaitQueueSize=" + newWaitQueueSize);
  }
  // Shift the free-slot count by the change in limit. This can go negative when more tasks are
  // running than the new limit allows; they are not interrupted.
  numSlotsAvailable.addAndGet(newNumExecutors - maxParallelExecutors);
  maxParallelExecutors = newNumExecutors;
  waitQueue.setWaitQueueSize(newWaitQueueSize);
  metrics.setNumExecutors(newNumExecutors);
  metrics.setWaitQueueSize(newWaitQueueSize);
  // If there is no executor left so the queued tasks can not be finished anyway, kill them all.
  if (newNumExecutors == 0) {
    synchronized (lock) {
      TaskWrapper task = waitQueue.peek();
      while (task != null) {
        LOG.info("Killing task [{}], since no executor left.", task);
        task.getTaskRunnerCallable().killTask();
        if (waitQueue.remove(task)) {
          metrics.setExecutorNumQueuedRequests(waitQueue.size());
        }
        task = waitQueue.peek();
      }
    }
  }
  LOG.info("TaskExecutorService is setting capacity to: numExecutors={}, waitQueueSize={}",
      newNumExecutors, newWaitQueueSize);
}
/**
 * Instantiates the wait queue comparator reflectively through its public no-arg constructor.
 *
 * @param waitQueueComparatorClassName fully qualified name of an LlapQueueComparatorBase subclass
 * @return a new comparator instance
 * @throws ClassCastException if the named class is not an LlapQueueComparatorBase
 * @throws RuntimeException if the class cannot be loaded, has no public no-arg constructor, or
 *     cannot be instantiated (the reflective cause is attached)
 */
private LlapQueueComparatorBase createComparator(
    String waitQueueComparatorClassName) {
  try {
    // asSubclass checks the type up front, replacing an unchecked raw cast.
    Class<? extends LlapQueueComparatorBase> waitQueueComparatorClazz =
        Class.forName(waitQueueComparatorClassName).asSubclass(LlapQueueComparatorBase.class);
    // No-arg lookups avoid passing null to varargs parameters.
    Constructor<? extends LlapQueueComparatorBase> ctor =
        waitQueueComparatorClazz.getConstructor();
    return ctor.newInstance();
  } catch (ClassNotFoundException e) {
    throw new RuntimeException(
        "Failed to load wait queue comparator, class=" + waitQueueComparatorClassName, e);
  } catch (NoSuchMethodException e) {
    throw new RuntimeException("Failed to find constructor for wait queue comparator, class=" +
        waitQueueComparatorClassName, e);
  } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) {
    throw new RuntimeException("Failed to instantiate wait queue comparator, class="
        + waitQueueComparatorClassName, e);
  }
}
/**
 * Hadoop service stop hook. It delegates to {@code shutDown(false)}.
 */
@Override
public void serviceStop() {
  this.shutDown(false);
}
// Formatter for the "started at" timestamps in task status reports. SimpleDateFormat is not
// thread-safe, so each thread gets its own instance.
private static final ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
/**
 * Counts known tasks that are actually running. A task is running when it is out of the wait
 * queue and has a callable with a non-zero start time. The count is taken without the epic lock,
 * so it is best-effort.
 *
 * @return the number of running tasks
 */
@Override
public int getNumActiveForReporting() {
  int active = 0;
  for (TaskWrapper wrapper : knownTasks.values()) {
    if (wrapper.isInWaitQueue()) {
      continue;
    }
    TaskRunnerCallable callable = wrapper.getTaskRunnerCallable();
    // Tasks in an intermediate state (no callable yet, or not started) count as waiting.
    if (callable != null && callable.getStartTime() != 0) {
      active++;
    }
  }
  return active;
}
/**
 * Builds one human-readable status line per known task. Waiting tasks come first, starting with
 * those from a snapshot of the wait queue in queue order; running tasks follow.
 *
 * @return an insertion-ordered set of status lines
 */
@Override
public Set<String> getExecutorsStatusForReporting() {
  // TODO Change this method to make the output easier to parse (parse programmatically)
  final List<TaskWrapper> queueSnapshot = new ArrayList<>();
  // The scheduling lock is intentionally not taken here; the queue call itself is still
  // synchronized. This is a best-effort attempt to show the queue in order.
  waitQueue.apply(new Function<TaskWrapper, Boolean>() {
    @Override
    public Boolean apply(TaskWrapper input) {
      queueSnapshot.add(input);
      return true;
    }
  });

  Set<String> waitingLines = new LinkedHashSet<>();
  Set<String> runningLines = new LinkedHashSet<>();
  StringBuilder line = new StringBuilder();
  HashSet<TaskWrapper> alreadyReported = new HashSet<>();

  for (TaskWrapper queued : queueSnapshot) {
    describeTask(line, queued.getRequestId(), queued, true);
    waitingLines.add(line.toString());
    alreadyReported.add(queued);
  }

  for (Map.Entry<String, TaskWrapper> entry : knownTasks.entrySet()) {
    TaskWrapper known = entry.getValue();
    if (alreadyReported.contains(known)) {
      // Reported from the queue snapshot already; don't report it twice even if its state changed.
      continue;
    }
    boolean waiting = describeTask(line, entry.getKey(), known, false);
    (waiting ? waitingLines : runningLines).add(line.toString());
  }

  Set<String> report = new LinkedHashSet<>(waitingLines);
  report.addAll(runningLines);
  return report;
}
/**
 * Writes a one-line, human-readable status of {@code task} into {@code value}, replacing its
 * previous contents. Example: {@code "attempt_1 (q1/Map 1, dagName d, not started, can finish)"}.
 *
 * @param value buffer that receives the description (cleared first)
 * @param attemptId id printed at the start of the description
 * @param task the task to describe
 * @param fromQueue true when {@code task} came from an in-order snapshot of the wait queue
 * @return true if the task should be reported as waiting, false if as running
 */
private boolean describeTask(
    StringBuilder value, String attemptId, TaskWrapper task, boolean fromQueue) {
  value.setLength(0);
  boolean isFirst = true;
  TaskRunnerCallable c = task.getTaskRunnerCallable();
  value.append(attemptId);
  if (c != null && c.getVertexSpec() != null) {
    SignableVertexSpec fs = c.getVertexSpec();
    value.append(isFirst ? " (" : ", ").append(c.getQueryId())
        .append("/").append(fs.getVertexName()).append(c.isGuaranteed() ? ", guaranteed" : "");
    if (fs.getDagName() != null) {
      value.append(", dagName ").append(fs.getDagName());
    }
    isFirst = false;
  }
  value.append(isFirst ? " (" : ", ");
  if (fromQueue) {
    value.append("in queue (in order)");
  }
  boolean isWaiting;
  if (task.isInWaitQueue()) {
    isWaiting = true;
    if (!fromQueue) {
      value.append("in queue (not in order)");
    }
  } else {
    if (fromQueue) {
      // The task left the wait queue after the snapshot was taken; separate the two states
      // instead of running them together ("in queue (in order)started at ...").
      value.append(", ");
    }
    if (c != null) {
      isWaiting = false;
      long startTime = c.getStartTime();
      if (startTime != 0) {
        value.append("started at ").append(sdf.get().format(new Date(startTime)));
      } else {
        value.append("not started");
      }
    } else {
      isWaiting = true;
      value.append("has no callable");
    }
  }
  if (task.isInPreemptionQueue()) {
    value.append(", ").append("in preemption queue");
  }
  // The callable may be null (reported as "has no callable" above), so only query finishability
  // when it is present; the original code threw a NullPointerException here.
  if (c != null) {
    boolean canFinish = c.canFinish();
    value.append(", ").append(canFinish ? "can" : "cannot").append(" finish");
    if (canFinish != c.canFinishForPriority()) {
      value.append(" (not updated in queue)");
    }
  }
  value.append(")");
  return isWaiting;
}
/**
* Worker that takes tasks from the wait queue and schedules them for execution.
* <p>
* Runs as a loop on the single wait-queue scheduler thread until isShutdown is set. Each
* iteration peeks the head of the wait queue under the epic lock. It either waits (no task, or no
* free slot and nothing to pre-empt) or tries to submit the task. A RejectedExecutionException
* from the submission is handled outside the lock: within PREEMPTION_KILL_GRACE_MS of a previous
* kill it waits briefly, otherwise it asks handleScheduleAttemptedRejection to make room.
*/
private final class WaitQueueWorker implements Runnable {
// Max time to wait on the lock per idle iteration; also the delay before an idle sanity check.
private static final long SANITY_CHECK_TIMEOUT_MS = 1000;
// Current head of the wait queue; a field so the rejection handling outside the lock can use it.
private TaskWrapper task;
// System.nanoTime() deadline after which sanityCheckQueue runs; set when the worker starts
// waiting, cleared when it does useful work.
private Long nextSanityCheck = null;
@Override
public void run() {
try {
// clock time of the last kill done to make room for the head task; null if none pending.
Long lastKillTimeMs = null;
SanityChecker sc = null;
while (!isShutdown.get()) {
RejectedExecutionException rejectedException = null;
if (nextSanityCheck != null && ((nextSanityCheck - System.nanoTime()) <= 0)) {
sc = sanityCheckQueue(sc);
nextSanityCheck = null;
}
synchronized (lock) {
// Since schedule() can be called from multiple threads, we peek the wait queue, try
// scheduling the task and then remove the task if scheduling is successful. This
// will make sure the task's place in the wait queue is held until it gets scheduled.
task = waitQueue.peek();
if (task == null) {
waitOnLock();
continue;
}
// If the task cannot finish and if no slots are available then don't schedule it.
// Also don't wait if we have a task and we just killed something to schedule it.
// (numSlotsAvailable can go negative, if the callback after the thread completes is delayed)
boolean shouldWait = numSlotsAvailable.get() <= 0 && lastKillTimeMs == null;
boolean canKill = false;
// Only finishable or guaranteed tasks are allowed to pre-empt a running task.
if (task.canFinishForPriority() || task.isGuaranteed()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: "
+ "preemptionQueueSize={}, numSlotsAvailable={}, waitQueueSize={}",
task.getRequestId(), task.getTaskRunnerCallable().canFinish(),
preemptionQueue.size(), numSlotsAvailable.get(), waitQueue.size());
}
canKill = enablePreemption && canPreempt(task, preemptionQueue.peek());
shouldWait = shouldWait && !canKill;
}
if (shouldWait) {
waitOnLock();
// Another task at a higher priority may have come in during the wait. Lookup the
// queue again to pick up the task at the highest priority.
continue;
}
nextSanityCheck = null; // We are going to do something useful now.
try {
tryScheduleUnderLock(task);
lastKillTimeMs = null; // We have filled the spot we may have killed for (if any).
} catch (RejectedExecutionException e) {
rejectedException = e;
}
} // synchronized (lock)
// Handle the rejection outside of the lock
if (rejectedException != null) {
if (lastKillTimeMs != null
&& (clock.getTime() - lastKillTimeMs) < PREEMPTION_KILL_GRACE_MS) {
// We killed something, but still got rejected. Wait a bit to give a chance to our
// previous victim to actually die.
synchronized (lock) {
lock.wait(PREEMPTION_KILL_GRACE_SLEEP_MS);
}
} else {
// NOTE(review): "preemtping" below is a typo in the log text.
if (LOG.isDebugEnabled() && lastKillTimeMs != null) {
LOG.debug("Grace period ended for the previous kill; preemtping more tasks");
}
if (handleScheduleAttemptedRejection(task)) {
lastKillTimeMs = clock.getTime(); // We killed something.
}
}
}
}
} catch (InterruptedException e) {
// An interrupt after shutdown ends the loop quietly. Any other interrupt is escalated as a
// RuntimeException, which fails the worker's future.
// NOTE(review): the thread's interrupt status is not restored in the shutdown branch.
if (isShutdown.get()) {
LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT
+ " thread has been interrupted after shutdown.");
} else {
LOG.warn(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " interrupted without shutdown", e);
throw new RuntimeException(e);
}
}
}
/**
* Waits on the epic lock for up to SANITY_CHECK_TIMEOUT_MS and schedules a sanity check for when
* that timeout has elapsed. Returns immediately once shutdown has started. Only called while
* holding the lock (inside synchronized (lock)); Object.wait releases it while waiting.
*/
private void waitOnLock() throws InterruptedException {
if (isShutdown.get()) return;
nextSanityCheck = System.nanoTime() + SANITY_CHECK_TIMEOUT_MS * 1000000L;
lock.wait(SANITY_CHECK_TIMEOUT_MS);
}
}
/**
 * Completion callback for the wait queue worker. The worker is meant to run until shutdown, so any
 * exit before isShutdown is set, and any failure, is passed to an uncaught exception handler.
 */
private class WaitQueueWorkerCallback implements FutureCallback<Object> {
  @Override
  public void onSuccess(Object result) {
    if (isShutdown.get()) {
      LOG.info("Wait queue scheduler worker exited with success!");
    } else {
      LOG.error("Wait queue scheduler worker exited with success!");
      reportUncaught(new IllegalStateException("WaitQueue worker exited before shutdown"));
    }
  }

  @Override
  public void onFailure(Throwable t) {
    LOG.error("Wait queue scheduler worker exited with failure!", t);
    reportUncaught(t);
  }

  /**
   * Passes {@code t} to the JVM-wide default uncaught exception handler. If none is installed
   * (getDefaultUncaughtExceptionHandler returns null), it falls back to the current thread's own
   * handler, which is its thread group unless one was set explicitly. Without the fallback, the
   * worker failure would be masked by a NullPointerException.
   */
  private void reportUncaught(Throwable t) {
    Thread current = Thread.currentThread();
    Thread.UncaughtExceptionHandler handler = Thread.getDefaultUncaughtExceptionHandler();
    if (handler == null) {
      handler = current.getUncaughtExceptionHandler();
    }
    handler.uncaughtException(current, t);
  }
}
/**
 * Offers a fragment to the wait queue.
 * <p>
 * If the queue has room, the fragment is accepted. If the queue is full, it either evicts a
 * lower-priority queued fragment in favor of this one, or hands this fragment back, in which case
 * it is rejected. Both evicted and rejected fragments have killTask() called on them.
 *
 * @param task the fragment to schedule
 * @return ACCEPTED if queued without eviction, EVICTED_OTHER if queued by evicting another
 *     fragment, or REJECTED if the wait queue refused it
 */
@Override
public SubmissionState schedule(TaskRunnerCallable task) {
  TaskWrapper taskWrapper = new TaskWrapper(task, this);
  SubmissionState result;
  TaskWrapper evictedTask;
  boolean canFinish;
  synchronized (lock) {
    // If the queue does not have capacity, it does not throw a Rejection. Instead it will
    // return the task with the lowest priority, which could be the task which is currently being processed.
    // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the
    // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks.
    // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Offering to wait queue with: waitQueueSize={}, numSlotsAvailable={}, runningFragmentCount={} ",
          waitQueue.size(), numSlotsAvailable.get(),
          runningFragmentCount.get());
    }
    canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
    taskWrapper.updateCanFinishForPriority(canFinish); // Update the property before offering.
    evictedTask = waitQueue.offer(taskWrapper, maxParallelExecutors - runningFragmentCount.get());
    // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable
    // null evicted task means offer accepted
    // evictedTask is not equal taskWrapper means current task is accepted and it evicted
    // some other task
    if (evictedTask == null || !evictedTask.equals(taskWrapper)) {
      knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
      taskWrapper.setIsInWaitQueue(true);
      task.setWmCountersQueued();
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} added to wait queue. Current wait queue size={}", task.getRequestId(),
            waitQueue.size());
      }
      result = evictedTask == null ? SubmissionState.ACCEPTED : SubmissionState.EVICTED_OTHER;
      if (LOG.isDebugEnabled() && evictedTask != null) {
        LOG.debug("Eviction: {} {} {}", taskWrapper, result, evictedTask);
      }
    } else {
      if (LOG.isInfoEnabled()) {
        LOG.info(
            "wait queue full, size={}. numSlotsAvailable={}, runningFragmentCount={}. {} not added",
            waitQueue.size(), numSlotsAvailable.get(), runningFragmentCount.get(), task.getRequestId());
      }
      // The queue handed back the incoming fragment itself: kill it and reject the submission.
      evictedTask.getTaskRunnerCallable().killTask();
      result = SubmissionState.REJECTED;
      if (LOG.isDebugEnabled()) {
        LOG.debug("{} is {} as wait queue is full", taskWrapper.getRequestId(), result);
      }
      metrics.incrTotalRejectedRequests();
      return result;
    }
    // Register for finishable-state notifications so the wait queue can be re-ordered if the
    // fragment moves in or out of the finishable state. This must happen after knownTasks has
    // been populated, and inside the lock, to avoid racing with unregisterForNotifications from
    // a different submission thread, i.e. registering this task after some other submission has
    // already evicted it.
    boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
    if (stateChanged) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Finishable state of {} updated to {} during registration for state updates",
            taskWrapper.getRequestId(), !canFinish);
      }
      finishableStateUpdated(taskWrapper, !canFinish);
    }
  }
  // At this point, the task has been added into the queue. It may have caused an eviction for
  // some other task.
  LOG.debug("Wait Queue: {}", waitQueue);
  if (evictedTask != null) {
    LOG.info("{} evicted from wait queue in favor of {} because of lower priority", evictedTask.getRequestId(),
        task.getRequestId());
    if (LOG.isDebugEnabled()) { // detailed info about the decision
      FragmentRuntimeInfo evictedInfo =
          evictedTask.getTaskRunnerCallable().getFragmentRuntimeInfo();
      FragmentRuntimeInfo taskInfo = task.getFragmentRuntimeInfo();
      int knownPendingTasksForEvicted = evictedInfo.getNumSelfAndUpstreamTasks()
          - evictedInfo.getNumSelfAndUpstreamCompletedTasks();
      int knownPendingTasksForCurrent =
          taskInfo.getNumSelfAndUpstreamTasks() - taskInfo.getNumSelfAndUpstreamCompletedTasks();
      long firstAttemptStartTimeEvicted = evictedInfo.getFirstAttemptStartTime();
      long firstAttemptStartTimeCurrent = taskInfo.getFirstAttemptStartTime();
      // Note the spaces at the concatenation boundaries; they were previously missing
      // ("wait queuein favor", ")because").
      LOG.debug(
          "{} (guaranteed: {}, canFinishForPriority: {}, withinDagPriority: {}, currentAttemptStartTime: {}, "
              + "firstAttemptStartTime: {}, knownPending: {}) evicted from wait queue "
              + "in favor of {} (guaranteed: {}, canFinishForPriority: {}, withinDagPriority: {},"
              + " currentAttemptStartTime: {}, firstAttemptStartTime: {}, knownPending: {})"
              + " because of lower priority",
          evictedTask.getRequestId(), evictedTask.isGuaranteed(), evictedTask.canFinishForPriority(),
          evictedInfo.getWithinDagPriority(), evictedInfo.getCurrentAttemptStartTime(),
          firstAttemptStartTimeEvicted, knownPendingTasksForEvicted, task.getRequestId(),
          taskWrapper.isGuaranteed(), taskWrapper.canFinishForPriority(), taskInfo.getWithinDagPriority(),
          taskInfo.getCurrentAttemptStartTime(), firstAttemptStartTimeCurrent,
          knownPendingTasksForCurrent);
    }
    try {
      knownTasks.remove(evictedTask.getRequestId());
      evictedTask.maybeUnregisterForFinishedStateNotifications();
      evictedTask.setIsInWaitQueue(false);
    } finally {
      // This deals with a task from a different submission and may cause the kill to go out
      // before that submission has completed. This is handled in the AM.
      evictedTask.getTaskRunnerCallable().killTask();
    }
    metrics.incrTotalEvictedFromWaitQueue();
  }
  synchronized (lock) {
    lock.notifyAll();
  }
  metrics.setExecutorNumQueuedRequests(waitQueue.size());
  return result;
}
/**
* Changes whether a known fragment is guaranteed and re-positions it accordingly. A queued fragment
* is removed and re-inserted into the wait queue. A fragment that is not queued is added to the
* preemption queue when it loses its guarantee. A guaranteed, finishable fragment is taken out of
* the preemption queue. Wakes the wait queue worker afterwards.
*
* @param fragmentId the fragment (request) id
* @param isGuaranteed the new guaranteed state
* @return false if the fragment is not known; true otherwise (including when nothing changed)
*/
@Override
public boolean updateFragment(String fragmentId, boolean isGuaranteed) {
synchronized (lock) {
TaskWrapper taskWrapper = knownTasks.get(fragmentId);
if (taskWrapper == null) {
LOG.debug("Fragment not found {}", fragmentId);
return false;
}
if (taskWrapper.isGuaranteed() == isGuaranteed) return true;
LOG.debug("Fragment {} guaranteed state changed to {}; finishable {}, in wait queue {}, "
+ "in preemption queue {}", taskWrapper.getRequestId(), isGuaranteed,
taskWrapper.canFinishForPriority(), taskWrapper.isInWaitQueue(),
taskWrapper.isInPreemptionQueue());
// Do the removal before we change the element, to avoid invalid queue ordering.
if (isGuaranteed && taskWrapper.isInPreemptionQueue() && taskWrapper.canFinishForPriority()) {
removeFromPreemptionQueue(taskWrapper);
}
if (taskWrapper.isInWaitQueue()) {
// Re-order the wait queue. Note: we assume that no one will take our capacity based
// on the fact that we are doing this under the epic lock. If the epic lock is removed,
// we'd need to do the steps under the queue lock; we could pass in a f() to update state.
// The element is updated only while it is out of the queue, so the ordering stays valid;
// it is re-inserted only if the removal succeeded.
boolean isRemoved = waitQueue.remove(taskWrapper);
taskWrapper.updateIsGuaranteed(isGuaranteed);
forceReinsertIntoQueue(taskWrapper, isRemoved);
} else {
taskWrapper.updateIsGuaranteed(isGuaranteed);
if (!isGuaranteed && !taskWrapper.isInPreemptionQueue()) {
// No need to check finishable here; if it was set it would already be in the queue.
addToPreemptionQueue(taskWrapper);
}
}
// Wake the wait queue worker so it re-evaluates the head of the queue.
lock.notifyAll();
return true;
}
}
/**
 * Puts a task back into the wait queue after it was taken out for re-ordering. If the removal
 * did not succeed, nothing is re-inserted and a warning is logged instead.
 *
 * @param taskWrapper the task to re-insert
 * @param isRemoved whether the preceding removal from the wait queue succeeded
 */
private void forceReinsertIntoQueue(TaskWrapper taskWrapper, boolean isRemoved) {
  if (isRemoved) {
    waitQueue.forceOffer(taskWrapper);
    return;
  }
  LOG.warn("Failed to remove {} from waitQueue", taskWrapper.getTaskRunnerCallable());
}
/**
 * Looks up the query that owns a known fragment, under the epic lock.
 *
 * @param fragmentId the fragment (request) id
 * @return the owning query's identifier, or {@code null} if the fragment is not known
 */
@Override
public QueryIdentifier findQueryByFragment(String fragmentId) {
  synchronized (lock) {
    TaskWrapper wrapper = knownTasks.get(fragmentId);
    if (wrapper == null) {
      return null;
    }
    return wrapper.getTaskRunnerCallable().getFragmentInfo().getQueryInfo().getQueryIdentifier();
  }
}
/**
* Kills a fragment on request. If the fragment is known, it is removed from knownTasks, the wait
* queue and the preemption queue, its WM counters are marked done, and killTask() is called on
* it. Unknown fragments are logged and ignored. Wakes the wait queue worker either way.
*
* @param fragmentId the fragment (request) id
*/
@Override
public void killFragment(String fragmentId) {
synchronized (lock) {
TaskWrapper taskWrapper = knownTasks.remove(fragmentId);
// Can be null since the task may have completed meanwhile.
if (taskWrapper != null) {
if (taskWrapper.isInWaitQueue()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing {} from waitQueue", fragmentId);
}
taskWrapper.setIsInWaitQueue(false);
// NOTE(review): setWmCountersDone() is also called unconditionally below, so queued tasks
// get it twice; confirm it is idempotent or drop one of the calls.
taskWrapper.getTaskRunnerCallable().setWmCountersDone();
if (waitQueue.remove(taskWrapper)) {
metrics.setExecutorNumQueuedRequests(waitQueue.size());
}
}
if (taskWrapper.isInPreemptionQueue()) {
LOG.debug("Removing {} from preemptionQueue", fragmentId);
removeFromPreemptionQueue(taskWrapper);
}
taskWrapper.getTaskRunnerCallable().setWmCountersDone();
// TODO: this will probably send a message to AM. Is that needed here?
taskWrapper.getTaskRunnerCallable().killTask();
} else {
LOG.info("Ignoring killFragment request for {} since it isn't known", fragmentId);
}
lock.notifyAll();
}
}
/** Pairs the state a fragment completed with and the time at which completion was reported. */
private static final class FragmentCompletion {
  State state;
  long completingTime;

  public FragmentCompletion(State state, long completingTime) {
    this.state = state;
    this.completingTime = completingTime;
  }
}
/**
 * Holds fragments from the moment TezTaskRunner2 reports taskSucceeded/Failed (or an external
 * caller calls killTask) until the corresponding TaskRunnerCallable leaves the thread pool.
 * Keyed by fragment id.
 */
@VisibleForTesting
final ConcurrentMap<String, FragmentCompletion> completingFragmentMap = new ConcurrentHashMap<>();
/**
 * Records that a fragment is completing and releases its slot in the running-fragment count.
 *
 * <p>If the running count is already zero, this is treated as a duplicate notification: a
 * warning is logged and the fragment is not recorded.
 *
 * @param fragmentId the fragment that is completing
 * @param state the state it is completing with
 */
@Override
public void fragmentCompleting(String fragmentId, State state) {
  // Tez internals may register the same task as completing multiple times, so only decrement
  // while the count is still non-zero; retry the CAS if another thread raced us.
  while (true) {
    final int current = runningFragmentCount.get();
    if (current == 0) {
      LOG.warn("RunningFragmentCount is already 0. Multiple calls for the same completion.");
      return;
    }
    if (runningFragmentCount.compareAndSet(current, current - 1)) {
      break;
    }
  }
  completingFragmentMap.put(fragmentId, new FragmentCompletion(state, clock.getTime()));
}
/**
 * Submits the task to the executor and moves it out of the wait queue into the running state.
 * Assumes the epic lock is already taken.
 *
 * <p>After a successful submission this method, in order:
 * <ul>
 *   <li>marks the task's WM counters as running and increments {@code runningFragmentCount};</li>
 *   <li>clears the task's in-wait-queue flag;</li>
 *   <li>registers the internal completion listener on
 *       {@code executionCompletionExecutorService};</li>
 *   <li>adds the task to the preemption queue if preemption is enabled and the task is either not
 *       finishable or not guaranteed;</li>
 *   <li>consumes one executor slot and removes the task from the wait queue.</li>
 * </ul>
 *
 * @param taskWrapper the task to schedule
 * @throws RejectedExecutionException if the submission to {@code executorService} is rejected.
 *     The submit call happens before any of the bookkeeping above, so none of it has happened yet
 *     when this is thrown.
 */
@VisibleForTesting
void tryScheduleUnderLock(final TaskWrapper taskWrapper) throws RejectedExecutionException {
LOG.info("Attempting to execute {}", taskWrapper);
TaskRunnerCallable task = taskWrapper.getTaskRunnerCallable();
ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
task.setWmCountersRunning();
runningFragmentCount.incrementAndGet();
taskWrapper.setIsInWaitQueue(false);
FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(
taskWrapper);
// Callback on a separate thread so that when a task completes, the thread in the main queue
// is actually available for execution and will not potentially result in a RejectedExecution
Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(),
isGuaranteed = taskWrapper.isGuaranteed();
if (LOG.isDebugEnabled()) {
LOG.debug("{} scheduled for execution. canFinish={}, isGuaranteed={}",
taskWrapper.getRequestId(), canFinish, isGuaranteed);
}
// Running tasks are preemption candidates when they cannot finish yet (not all inputs are
// ready) or when they are not guaranteed; such tasks are tracked in the preemption queue.
if (enablePreemption) {
if (!canFinish || !isGuaranteed) {
LOG.info("Adding {} to pre-emption queue", taskWrapper.getRequestId());
addToPreemptionQueue(taskWrapper);
}
}
numSlotsAvailable.decrementAndGet();
metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
// Wait queue could have been re-ordered in the mean time because of concurrent task
// submission. So remove the specific task instead of the head task.
if (waitQueue.remove(taskWrapper)) {
metrics.setExecutorNumQueuedRequests(waitQueue.size());
}
}
/**
 * Tries to free a slot for a task whose submission was rejected by killing a preemptable task.
 *
 * @param rejected the task that could not be scheduled
 * @return {@code true} if a victim was chosen and asked to die (the caller may want to wait for
 *     it), {@code false} if preemption is disabled, the preemption queue is empty, or no queued
 *     task may be preempted in favor of {@code rejected}
 */
private boolean handleScheduleAttemptedRejection(TaskWrapper rejected) {
  // TODO: is this check even needed given what the caller checks?
  final boolean preemptionPossible = enablePreemption && !preemptionQueue.isEmpty();
  if (!preemptionPossible) {
    return false;
  }
  LOG.debug("Preemption Queue: {}", preemptionQueue);
  // The victim lookup checks under lock whether the task can actually be preempted. An update
  // (also under lock) can still make the victim finishable or guaranteed between the removal and
  // the kill; that is the same timing issue as a small network delay, so it is tolerated.
  final TaskWrapper victim = getSuitableVictimFromPreemptionQueue(rejected);
  if (victim != null) {
    LOG.info("Invoking kill task for {} due to pre-emption to run {}", victim.getRequestId(), rejected.getRequestId());
    // The victim is either killed or already completing. Either way this triggers the next
    // scheduling run or leaves free slots, which keeps the scheduler loop going.
    victim.getTaskRunnerCallable().killTask();
    return true;
  }
  // Nothing suitable to preempt.
  return false;
}
private static class SanityChecker implements Function<TaskWrapper, Boolean> {
private TaskWrapper firstCannotFinish = null;
private TaskWrapper firstProblematic = null;
private final EvictingPriorityBlockingQueue<TaskWrapper> queue;
public SanityChecker(EvictingPriorityBlockingQueue<TaskWrapper> queue) {
this.queue = queue;
}
@Override
public Boolean apply(TaskWrapper input) {
if (input == null) return true;
boolean canFinish = input.getTaskRunnerCallable().canFinishForPriority();
if (firstCannotFinish == null && !canFinish) {
firstCannotFinish = input;
return true;
}
if (firstCannotFinish != null && canFinish) {
firstProblematic = input;
return false;
}
return true;
}
void run() {
queue.apply(this);
if (firstProblematic != null) {
final StringBuilder sb = new StringBuilder(
"Found finishable task behind non-finishable in the queue: ");
sb.append(firstProblematic).append(" was after ").append(firstCannotFinish).append("; ");
queue.apply(new Function<TaskExecutorService.TaskWrapper, Boolean>() {
@Override
public Boolean apply(TaskWrapper input) {
sb.append(input).append(", ");
return true;
}
});
LOG.error(sb.toString());
}
firstCannotFinish = firstProblematic = null;
}
}
/**
 * Runs a wait-queue ordering check, creating the checker on first use.
 *
 * @param sc a previously returned checker to reuse, or {@code null} to create one
 * @return the checker that was used, so callers can pass it back next time
 */
private SanityChecker sanityCheckQueue(SanityChecker sc) {
  final SanityChecker checker = (sc != null) ? sc : new SanityChecker(waitQueue);
  checker.run();
  return checker;
}
/**
 * Applies a change in a fragment's finishable state and re-orders whichever queue it is in.
 * Runs under the epic lock and notifies waiters on it when done.
 *
 * <p>Behavior by case:
 * <ul>
 *   <li>A guaranteed task that became finishable is first removed from the preemption queue,
 *       if present.</li>
 *   <li>If the task is in the wait queue, it is removed, updated, and force-reinserted so the
 *       ordering reflects the new state.</li>
 *   <li>If the task is not in the wait queue and is non-guaranteed, it is removed from and
 *       re-added to the preemption queue around the update.</li>
 *   <li>If the task is not in the wait queue and is guaranteed and became non-finishable, it is
 *       added to the preemption queue if not already there.</li>
 * </ul>
 *
 * @param taskWrapper the task whose state changed
 * @param newFinishableState the new value passed to {@code updateCanFinishForPriority}
 */
@VisibleForTesting
void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
  synchronized (lock) {
    // This path changes the *finishable* state; report it as such (the guaranteed state is
    // only logged for context).
    LOG.debug("Fragment {} finishable state changed to {}; guaranteed {}, in wait queue {}, "
        + "in preemption queue {}", taskWrapper.getRequestId(), newFinishableState,
        taskWrapper.isGuaranteed(), taskWrapper.isInWaitQueue(), taskWrapper.isInPreemptionQueue());
    // Do the removal before we change the element, to avoid invalid queue ordering.
    if (newFinishableState && taskWrapper.isInPreemptionQueue() && taskWrapper.isGuaranteed()) {
      removeFromPreemptionQueue(taskWrapper);
    }
    if (taskWrapper.isInWaitQueue()) {
      // Re-order the wait queue. Note: we assume that no one will take our capacity based
      // on the fact that we are doing this under the epic lock. If the epic lock is removed,
      // we'd need to do the steps under the queue lock; we could pass in a f() to update state.
      boolean isRemoved = waitQueue.remove(taskWrapper);
      taskWrapper.updateCanFinishForPriority(newFinishableState);
      forceReinsertIntoQueue(taskWrapper, isRemoved);
    } else {
      // Speculative tasks are always non-guaranteed, so any finishable state change re-orders
      // the preemption queue (re-ordering puts non-finishable tasks ahead of finishable ones).
      if (!taskWrapper.isGuaranteed()) {
        removeFromPreemptionQueue(taskWrapper);
        taskWrapper.updateCanFinishForPriority(newFinishableState);
        addToPreemptionQueue(taskWrapper);
      } else {
        // A guaranteed task that became non-finishable and is not yet in the preemption queue
        // is added so that it becomes a candidate to kill.
        taskWrapper.updateCanFinishForPriority(newFinishableState);
        if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) {
          // No need to check guaranteed here; if it was false we would already be in the queue.
          addToPreemptionQueue(taskWrapper);
        }
      }
    }
    lock.notifyAll();
  }
}
/**
 * Adds the task to the preemption queue unless it is already there, then updates the metric.
 *
 * <p>The membership check is done under the epic lock, the same lock every visible mutator of
 * the in-preemption-queue flag holds. Checking outside the lock let two callers both pass the
 * check and insert the same task twice.
 *
 * @param taskWrapper the task to add
 */
private void addToPreemptionQueue(TaskWrapper taskWrapper) {
  synchronized (lock) {
    if (taskWrapper.isInPreemptionQueue()) {
      return;
    }
    insertIntoPreemptionQueueOrFailUnlocked(taskWrapper);
    taskWrapper.setIsInPreemptableQueue(true);
    metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
  }
}
/**
 * Offers the task to the preemption queue; the caller must hold the epic lock.
 *
 * <p>If the offer is refused, the failure is reported to the JVM's default uncaught-exception
 * handler. If no default handler is installed, the {@link IllegalStateException} is thrown
 * directly. Previously the null handler was dereferenced, producing an uninformative NPE.
 *
 * @param taskWrapper the task to insert
 * @throws IllegalStateException if the queue refuses the task and no default handler is set
 */
private void insertIntoPreemptionQueueOrFailUnlocked(TaskWrapper taskWrapper) {
  boolean added = preemptionQueue.offer(taskWrapper);
  if (!added) {
    LOG.warn("Failed to add element {} to preemption queue. Terminating", taskWrapper);
    IllegalStateException failure =
        new IllegalStateException("Preemption queue full. Cannot proceed");
    Thread.UncaughtExceptionHandler handler = Thread.getDefaultUncaughtExceptionHandler();
    if (handler == null) {
      throw failure;
    }
    handler.uncaughtException(Thread.currentThread(), failure);
  }
}
/**
 * Removes the specified task from the preemption queue while holding the epic lock.
 *
 * @param taskWrapper the task to remove
 * @return {@code true} if the task was present in the queue and was removed, {@code false}
 *     otherwise
 */
private boolean removeFromPreemptionQueue(TaskWrapper taskWrapper) {
  final boolean wasRemoved;
  synchronized (lock) {
    wasRemoved = removeFromPreemptionQueueUnlocked(taskWrapper);
  }
  return wasRemoved;
}
/**
 * Removes the task from the preemption queue, clears its in-queue flag, and refreshes the
 * preemptable-requests metric. The caller must hold the epic lock.
 *
 * @param taskWrapper the task to remove
 * @return whether the task was actually present in the queue
 */
private boolean removeFromPreemptionQueueUnlocked(TaskWrapper taskWrapper) {
  final boolean wasPresent = preemptionQueue.remove(taskWrapper);
  taskWrapper.setIsInPreemptableQueue(false);
  final int remaining = preemptionQueue.size();
  metrics.setExecutorNumPreemptableRequests(remaining);
  return wasPresent;
}
/**
 * Takes the head of the preemption queue if it may be preempted in favor of {@code candidate}.
 *
 * @param candidate the task that needs a slot
 * @return the removed victim (no longer marked as in the preemption queue), or {@code null} if
 *     the queue is empty or its head may not be preempted (in which case the head is put back)
 */
private TaskWrapper getSuitableVictimFromPreemptionQueue(TaskWrapper candidate) {
  synchronized (lock) {
    final TaskWrapper head = preemptionQueue.poll();
    // Task state is only updated while the task is out of the queue, so the ordering should be
    // correct: if the head is not killable, nothing behind it is either.
    if (head == null) {
      return null;
    }
    if (canPreempt(candidate, head)) {
      head.setIsInPreemptableQueue(false);
      metrics.setExecutorNumPreemptableRequests(preemptionQueue.size());
      return head;
    }
    // The "most preemptable" task is still too important to kill; restore it.
    insertIntoPreemptionQueueOrFailUnlocked(head);
    return null;
  }
}
/**
 * Decides whether victim task A may be preempted in favor of candidate task B.
 *
 * <p>A may be preempted only if A and B are not on the same vertex of the same query, and
 * either:
 * <ol>
 *   <li>B is guaranteed while A is not; or</li>
 *   <li>A and B have the same guaranteed status (both guaranteed or both not), B is finishable
 *       (by {@code canFinishForPriority}), and A is not finishable.</li>
 * </ol>
 * A's finishability is re-checked through its callable's {@code canFinish()} rather than the
 * cached priority flag, so asynchronous upstream updates that have not been applied yet are
 * still taken into account.
 *
 * @param candidate the task that wants to run (B)
 * @param victim the task that might be killed (A); {@code null} yields {@code false}
 * @return {@code true} when the victim should be preempted in favor of the candidate
 */
private static boolean canPreempt(TaskWrapper candidate, TaskWrapper victim) {
  if (victim == null) {
    return false;
  }
  final SignableVertexSpec candidateVertex =
      candidate.getTaskRunnerCallable().getFragmentInfo().getVertexSpec();
  final SignableVertexSpec victimVertex =
      victim.getTaskRunnerCallable().getFragmentInfo().getVertexSpec();
  final boolean sameVertex =
      candidateVertex.getHiveQueryId().equals(victimVertex.getHiveQueryId())
          && candidateVertex.getVertexIndex() == victimVertex.getVertexIndex();
  if (sameVertex) {
    return false;
  }
  final boolean candidateGuaranteed = candidate.isGuaranteed();
  final boolean victimGuaranteed = victim.isGuaranteed();
  if (candidateGuaranteed && !victimGuaranteed) {
    return true;
  }
  return candidateGuaranteed == victimGuaranteed
      && candidate.canFinishForPriority()
      && !victim.getTaskRunnerCallable().canFinish();
}
/**
 * Factory for the completion listener attached to a scheduled task's future.
 *
 * @param taskWrapper the task the listener reports completion for
 * @return a new listener bound to {@code taskWrapper}
 */
@VisibleForTesting
InternalCompletionListener createInternalCompletionListener(TaskWrapper taskWrapper) {
  final InternalCompletionListener listener = new InternalCompletionListener(taskWrapper);
  return listener;
}
@VisibleForTesting
class InternalCompletionListener implements
FutureCallback<TaskRunner2Result> {
private final TaskWrapper taskWrapper;
public InternalCompletionListener(TaskWrapper taskWrapper) {
this.taskWrapper = taskWrapper;
}
// By the time either success / failed are called, the task itself knows that it has terminated,
// and will ignore subsequent kill requests if they go out.
// There's a race between removing the current task from the preemption queue and the actual scheduler