/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.intersection;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;

public class TaskManager {
// initialize the task list
// activeTasks needs to be concurrent as it can be accessed
// by QueryableState
private final Logger log;
private final Time time;
private final Tasks tasks;
private final UUID processId;
private final String logPrefix;
private final Admin adminClient;
private final StateDirectory stateDirectory;
private final ProcessingMode processingMode;
private final ChangelogReader changelogReader;
private final TopologyMetadata topologyMetadata;
private final TaskExecutor taskExecutor;
private Consumer<byte[], byte[]> mainConsumer;
private DeleteRecordsResult deleteRecordsResult;
private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
private final StateUpdater stateUpdater;
TaskManager(final Time time,
final ChangelogReader changelogReader,
final UUID processId,
final String logPrefix,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator,
final TopologyMetadata topologyMetadata,
final Admin adminClient,
final StateDirectory stateDirectory,
final StateUpdater stateUpdater) {
this.time = time;
this.processId = processId;
this.logPrefix = logPrefix;
this.adminClient = adminClient;
this.stateDirectory = stateDirectory;
this.changelogReader = changelogReader;
this.topologyMetadata = topologyMetadata;
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
this.processingMode = topologyMetadata.processingMode();
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
this.stateUpdater = stateUpdater;
this.tasks = new Tasks(logContext);
this.taskExecutor = new TaskExecutor(
tasks,
this,
topologyMetadata.taskExecutionMetadata(),
logContext
);
}
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
this.mainConsumer = mainConsumer;
}
public double totalProducerBlockedTime() {
return activeTaskCreator.totalProducerBlockedTime();
}
public UUID processId() {
return processId;
}
public TopologyMetadata topologyMetadata() {
return topologyMetadata;
}
Consumer<byte[], byte[]> mainConsumer() {
return mainConsumer;
}
StreamsProducer streamsProducerForTask(final TaskId taskId) {
return activeTaskCreator.streamsProducerForTask(taskId);
}
StreamsProducer threadProducer() {
return activeTaskCreator.threadProducer();
}
boolean isRebalanceInProgress() {
return rebalanceInProgress;
}
void handleRebalanceStart(final Set<String> subscribedTopics) {
topologyMetadata.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix);
tryToLockAllNonEmptyTaskDirectories();
rebalanceInProgress = true;
}
void handleRebalanceComplete() {
// we should pause consumer only within the listener since
// before then the assignment has not been updated yet.
mainConsumer.pause(mainConsumer.assignment());
releaseLockedUnassignedTaskDirectories();
rebalanceInProgress = false;
}
/**
* Handles the given corrupted tasks: corrupted standby tasks are closed dirty and revived first, then the
* remaining running or restoring non-corrupted tasks are committed (which forces any ongoing transaction to
* abort), and finally the corrupted active tasks are closed dirty and revived.
*
* @return {@code true} if any active task was corrupted
* @throws TaskMigratedException
*/
boolean handleCorruption(final Set<TaskId> corruptedTasks) {
final Set<Task> corruptedActiveTasks = new HashSet<>();
final Set<Task> corruptedStandbyTasks = new HashSet<>();
for (final TaskId taskId : corruptedTasks) {
final Task task = tasks.task(taskId);
if (task.isActive()) {
corruptedActiveTasks.add(task);
} else {
corruptedStandbyTasks.add(task);
}
}
// Make sure to clean up any corrupted standby tasks in their entirety before committing
// since TaskMigrated can be thrown and the resulting handleLostAll will only clean up active tasks
closeDirtyAndRevive(corruptedStandbyTasks, true);
// We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
try {
final Collection<Task> tasksToCommit = allTasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !corruptedTasks.contains(t.id()))
.collect(Collectors.toSet());
commitTasksAndMaybeUpdateCommittableOffsets(tasksToCommit, new HashMap<>());
} catch (final TaskCorruptedException e) {
log.info("Some additional tasks were found corrupted while trying to commit, these will be added to the " +
"tasks to clean and revive: {}", e.corruptedTasks());
corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
} catch (final TimeoutException e) {
log.info("Hit TimeoutException when committing all non-corrupted tasks, these will be closed and revived");
final Collection<Task> uncorruptedTasks = new HashSet<>(tasks.activeTasks());
uncorruptedTasks.removeAll(corruptedActiveTasks);
// Those tasks which just timed out can just be closed dirty without marking changelogs as corrupted
closeDirtyAndRevive(uncorruptedTasks, false);
}
closeDirtyAndRevive(corruptedActiveTasks, true);
return !corruptedActiveTasks.isEmpty();
}
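// Closes the given tasks dirty and revives them: flushes their caches via prepareCommit, suspends them,
// enforces a checkpoint that drops the corrupted partitions if requested, closes them dirty, and for
// active tasks marks the assigned input partitions for offset reset before reviving the task.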
private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, final boolean markAsCorrupted) {
for (final Task task : taskWithChangelogs) {
final Collection<TopicPartition> corruptedPartitions = task.changelogPartitions();
// mark corrupted partitions to not be checkpointed, and then close the task as dirty
if (markAsCorrupted) {
task.markChangelogAsCorrupted(corruptedPartitions);
}
try {
// we do not need to take the returned offsets since we are not going to commit anyways;
// this call is only used for active tasks to flush the cache before suspending and
// closing the topology
task.prepareCommit();
} catch (final RuntimeException swallow) {
log.error("Error flushing cache for corrupted task {} ", task.id(), swallow);
}
try {
task.suspend();
// we need to enforce a checkpoint that removes the corrupted partitions
if (markAsCorrupted) {
task.postCommit(true);
}
} catch (final RuntimeException swallow) {
log.error("Error suspending corrupted task {} ", task.id(), swallow);
}
task.closeDirty();
// For active tasks pause their input partitions so we won't poll any more records
// for this task until it has been re-initialized;
// Note, closeDirty already clears the partition-group for the task.
if (task.isActive()) {
final Set<TopicPartition> currentAssignment = mainConsumer.assignment();
final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
final Set<TopicPartition> assignedToPauseAndReset =
intersection(HashSet::new, currentAssignment, taskInputPartitions);
if (!assignedToPauseAndReset.equals(taskInputPartitions)) {
log.warn(
"Expected the current consumer assignment {} to contain the input partitions {}. " +
"Will proceed to recover.",
currentAssignment,
taskInputPartitions
);
}
task.addPartitionsForOffsetReset(assignedToPauseAndReset);
}
task.revive();
}
}
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
* @throws StreamsException fatal error while creating / initializing the task
*
* public for upgrade testing only
*/
public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks) {
log.info("Handle new assignment with:\n" +
"\tNew active tasks: {}\n" +
"\tNew standby tasks: {}\n" +
"\tExisting active tasks: {}\n" +
"\tExisting standby tasks: {}",
activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
topologyMetadata.addSubscribedTopicsFromAssignment(
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
logPrefix
);
final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
final Set<Task> tasksToCloseClean = new TreeSet<>(Comparator.comparing(Task::id));
// first put aside those unrecognized tasks because of unknown named-topologies
tasks.clearPendingTasksToCreate();
tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate));
tasks.addPendingStandbyTasksToCreate(pendingTasksToCreate(standbyTasksToCreate));
// first rectify all existing tasks:
// 1. for tasks that are already owned, just update input partitions / resume and skip re-creating them
// 2. for tasks that have changed active/standby status, just recycle and skip re-creating them
// 3. otherwise, close them since they are no longer owned
if (stateUpdater == null) {
classifyTasksWithoutStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
} else {
classifyTasksWithStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
}
final Map<TaskId, RuntimeException> taskCloseExceptions = closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean);
throwTaskExceptions(taskCloseExceptions);
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
}
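// Rethrows the first non-TaskMigratedException among the given task exceptions, wrapping it into a
// StreamsException tagged with the task id if needed; if all exceptions are TaskMigratedException,
// rethrows the first one as-is.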
private void throwTaskExceptions(final Map<TaskId, RuntimeException> taskExceptions) {
if (!taskExceptions.isEmpty()) {
log.error("Get exceptions for the following tasks: {}", taskExceptions);
for (final Map.Entry<TaskId, RuntimeException> entry : taskExceptions.entrySet()) {
if (!(entry.getValue() instanceof TaskMigratedException)) {
final TaskId taskId = entry.getKey();
final RuntimeException exception = entry.getValue();
if (exception instanceof StreamsException) {
((StreamsException) exception).setTaskId(taskId);
throw exception;
} else if (exception instanceof KafkaException) {
throw new StreamsException(exception, taskId);
} else {
throw new StreamsException(
"Unexpected failure to close " + taskExceptions.size() +
" task(s) [" + taskExceptions.keySet() + "]. " +
"First unexpected exception (for task " + taskId + ") follows.",
exception,
taskId
);
}
}
}
// If all exceptions are task-migrated, we would just throw the first one. No need to wrap with a
// StreamsException since TaskMigrated is handled explicitly by the StreamThread
final Map.Entry<TaskId, RuntimeException> first = taskExceptions.entrySet().iterator().next();
throw first.getValue();
}
}
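// Creates the newly assigned active and standby tasks; with a state updater they are only queued as
// pending for initialization, otherwise they are registered with the task registry directly.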
private void createNewTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
final Collection<Task> newActiveTasks = activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate);
final Collection<Task> newStandbyTask = standbyTaskCreator.createTasks(standbyTasksToCreate);
if (stateUpdater == null) {
tasks.addNewActiveTasks(newActiveTasks);
tasks.addNewStandbyTasks(newStandbyTask);
} else {
tasks.addPendingTaskToInit(newActiveTasks);
tasks.addPendingTaskToInit(newStandbyTask);
}
}
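// Without a state updater, sorts all owned tasks into those to resume (possibly with updated input
// partitions), those to recycle between active and standby, and those to close cleanly.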
private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
final Map<Task, Set<TopicPartition>> tasksToRecycle,
final Set<Task> tasksToCloseClean) {
for (final Task task : tasks.allTasks()) {
final TaskId taskId = task.id();
if (activeTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
}
task.resume();
} else {
tasksToRecycle.put(task, activeTasksToCreate.get(taskId));
}
activeTasksToCreate.remove(taskId);
} else if (standbyTasksToCreate.containsKey(taskId)) {
if (!task.isActive()) {
final Set<TopicPartition> topicPartitions = standbyTasksToCreate.get(taskId);
task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
task.resume();
} else {
tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
}
standbyTasksToCreate.remove(taskId);
} else {
tasksToCloseClean.add(task);
}
}
}
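// Classifies only the running tasks owned by this thread; with a state updater enabled, any standby
// task encountered here is unexpected, since standby tasks live in the state updater.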
private void classifyRunningTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
final Map<Task, Set<TopicPartition>> tasksToRecycle,
final Set<Task> tasksToCloseClean) {
for (final Task task : tasks.allTasks()) {
final TaskId taskId = task.id();
if (activeTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
}
task.resume();
} else {
throw new IllegalStateException("Standby tasks should only be managed by the state updater");
}
activeTasksToCreate.remove(taskId);
} else if (standbyTasksToCreate.containsKey(taskId)) {
if (!task.isActive()) {
throw new IllegalStateException("Standby tasks should only be managed by the state updater");
} else {
tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
}
standbyTasksToCreate.remove(taskId);
} else {
tasksToCloseClean.add(task);
}
}
}
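// With a state updater, first classifies the running tasks, then walks the restoring tasks inside the
// state updater and registers pending actions (update input partitions, recycle, or close clean) to be
// applied once the state updater hands the tasks back.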
private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
final Map<Task, Set<TopicPartition>> tasksToRecycle,
final Set<Task> tasksToCloseClean) {
classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
for (final Task task : stateUpdater.getTasks()) {
final TaskId taskId = task.id();
if (activeTasksToCreate.containsKey(taskId)) {
final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
if (task.isActive()) {
if (!task.inputPartitions().equals(topicPartitions)) {
stateUpdater.remove(taskId);
tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
}
} else {
stateUpdater.remove(taskId);
tasks.addPendingTaskToRecycle(taskId, topicPartitions);
}
activeTasksToCreate.remove(taskId);
} else if (standbyTasksToCreate.containsKey(taskId)) {
final Set<TopicPartition> topicPartitions = standbyTasksToCreate.get(taskId);
if (!task.isActive()) {
if (!task.inputPartitions().equals(topicPartitions)) {
stateUpdater.remove(taskId);
tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);
}
} else {
stateUpdater.remove(taskId);
tasks.addPendingTaskToRecycle(taskId, topicPartitions);
}
standbyTasksToCreate.remove(taskId);
} else {
stateUpdater.remove(taskId);
tasks.addPendingTaskToCloseClean(taskId);
}
}
}
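// Extracts, and removes from the given map, those tasks whose named topology is not (yet) known; they
// are kept aside and created later once the topology metadata has been refreshed.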
private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final Map<TaskId, Set<TopicPartition>> tasksToCreate) {
final Map<TaskId, Set<TopicPartition>> pendingTasks = new HashMap<>();
final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> iter = tasksToCreate.entrySet().iterator();
while (iter.hasNext()) {
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
final TaskId taskId = entry.getKey();
if (taskId.topologyName() != null && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
log.info("Cannot create the assigned task {} since it's topology name cannot be recognized, will put it " +
"aside as pending for now and create later when topology metadata gets refreshed", taskId);
pendingTasks.put(taskId, entry.getValue());
iter.remove();
}
}
return pendingTasks;
}
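// First checkpoints all tasks to close or recycle, then closes them cleanly or recycles them between
// active and standby; any task that fails along the way is closed dirty instead, and all exceptions
// are collected and returned to the caller.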
private Map<TaskId, RuntimeException> closeAndRecycleTasks(final Map<Task, Set<TopicPartition>> tasksToRecycle,
final Set<Task> tasksToCloseClean) {
final Map<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
// for all tasks to close or recycle, we should first write a checkpoint as in post-commit
final List<Task> tasksToCheckpoint = new ArrayList<>(tasksToCloseClean);
tasksToCheckpoint.addAll(tasksToRecycle.keySet());
for (final Task task : tasksToCheckpoint) {
try {
// Note that we are not actually committing here but just checking if we need to write a checkpoint file:
// 1) for active tasks, prepareCommit should return empty if the task committed successfully during suspension,
// and their changelog positions should not change at all, so postCommit would not write the checkpoint again.
// 2) for standby tasks, prepareCommit should always return empty, and then in postCommit we would probably
// write the checkpoint file.
final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
if (!offsets.isEmpty()) {
log.error("Task {} should have been committed when it was suspended, but it reports non-empty " +
"offsets {} to commit; this means it failed during last commit and hence should be closed dirty",
task.id(), offsets);
tasksToCloseDirty.add(task);
} else if (!task.isActive()) {
// For standby tasks, always try to suspend before committing (checkpointing);
// standby tasks do not actually need to commit offsets but only need to
// flush / checkpoint state stores, so we only need to call postCommit here.
task.suspend();
task.postCommit(true);
}
} catch (final RuntimeException e) {
final String uncleanMessage = String.format(
"Failed to checkpoint task %s. Attempting to close remaining tasks before re-throwing:",
task.id());
log.error(uncleanMessage, e);
taskCloseExceptions.putIfAbsent(task.id(), e);
// We've already recorded the exception (which is the point of the clean close attempt).
// Now, we should go ahead and complete the close because a half-closed task is no good to anyone.
tasksToCloseDirty.add(task);
}
}
tasksToCloseClean.removeAll(tasksToCloseDirty);
for (final Task task : tasksToCloseClean) {
try {
closeTaskClean(task);
} catch (final RuntimeException closeTaskException) {
final String uncleanMessage = String.format(
"Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:",
task.id());
log.error(uncleanMessage, closeTaskException);
if (task.state() != State.CLOSED) {
tasksToCloseDirty.add(task);
}
taskCloseExceptions.putIfAbsent(task.id(), closeTaskException);
}
}
tasksToRecycle.keySet().removeAll(tasksToCloseDirty);
for (final Map.Entry<Task, Set<TopicPartition>> entry : tasksToRecycle.entrySet()) {
final Task oldTask = entry.getKey();
final Set<TopicPartition> inputPartitions = entry.getValue();
try {
if (oldTask.isActive()) {
final StandbyTask standbyTask = convertActiveToStandby((StreamTask) oldTask, inputPartitions);
tasks.replaceActiveWithStandby(standbyTask);
} else {
final StreamTask activeTask = convertStandbyToActive((StandbyTask) oldTask, inputPartitions);
tasks.replaceStandbyWithActive(activeTask);
}
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
"Attempting to close remaining tasks before re-throwing:", oldTask.id());
log.error(uncleanMessage, e);
taskCloseExceptions.putIfAbsent(oldTask.id(), e);
tasksToCloseDirty.add(oldTask);
}
}
// for tasks that cannot be cleanly closed or recycled, close them dirty
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task);
}
return taskCloseExceptions;
}
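// Recycles a task between active and standby, closing the task producer when an active task is converted.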
private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Set<TopicPartition> partitions) {
final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
return standbyTask;
}
private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set<TopicPartition> partitions) {
return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
}
/**
* Tries to initialize any new or still-uninitialized tasks, then checks if they can/have completed restoration.
*
* @throws IllegalStateException if a store gets registered after initialization has already finished
* @throws StreamsException if the store's change log does not contain the partition
* @return {@code true} if all tasks are fully restored
*/
boolean tryToCompleteRestoration(final long now, final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
boolean allRunning = true;
if (stateUpdater == null) {
final List<Task> activeTasks = new LinkedList<>();
for (final Task task : tasks.allTasks()) {
try {
task.initializeIfNeeded();
task.clearTaskTimeout();
} catch (final LockException lockException) {
// it is possible, if there are multiple threads within a single instance, that one thread
// is trying to grab the task from another, while the latter has not released the lock since
// it did not participate in the rebalance. In this case we can just retry in the next iteration
log.debug("Could not initialize task {} since: {}; will retry", task.id(), lockException.getMessage());
allRunning = false;
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
allRunning = false;
}
if (task.isActive()) {
activeTasks.add(task);
}
}
if (allRunning && !activeTasks.isEmpty()) {
final Set<TopicPartition> restored = changelogReader.completedChangelogs();
for (final Task task : activeTasks) {
if (restored.containsAll(task.changelogPartitions())) {
try {
task.completeRestoration(offsetResetter);
task.clearTaskTimeout();
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
log.debug(
String.format(
"Could not complete restoration for %s due to the following exception; will retry",
task.id()),
timeoutException
);
allRunning = false;
}
} else {
// we found a restoring task that isn't done restoring, which is evidence that
// not all tasks are running
allRunning = false;
}
}
}
} else {
addTasksToStateUpdater();
handleRemovedTasksFromStateUpdater();
// TODO: should add logic for checking and resuming when all active tasks have been restored
}
if (allRunning) {
// we can call resume multiple times since it is idempotent.
mainConsumer.resume(mainConsumer.assignment());
}
return allRunning;
}
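// Initializes the tasks pending initialization and hands them over to the state updater for restoration.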
private void addTasksToStateUpdater() {
for (final Task task : tasks.drainPendingTaskToInit()) {
task.initializeIfNeeded();
stateUpdater.add(task);
}
}
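// Applies the pending actions to tasks handed back by the state updater: recycle between active and
// standby, close clean or dirty, or update the input partitions and re-add the task for restoration.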
private void handleRemovedTasksFromStateUpdater() {
final Map<TaskId, RuntimeException> taskExceptions = new LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new TreeSet<>(Comparator.comparing(Task::id));
for (final Task task : stateUpdater.drainRemovedTasks()) {
final TaskId taskId = task.id();
Set<TopicPartition> inputPartitions;
if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
try {
final Task newTask = task.isActive() ?
convertActiveToStandby((StreamTask) task, inputPartitions) :
convertStandbyToActive((StandbyTask) task, inputPartitions);
newTask.initializeIfNeeded();
stateUpdater.add(newTask);
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to recycle task %s cleanly. " +
"Attempting to handle remaining tasks before re-throwing:", taskId);
log.error(uncleanMessage, e);
if (task.state() != State.CLOSED) {
tasksToCloseDirty.add(task);
}
taskExceptions.putIfAbsent(taskId, e);
}
} else if (tasks.removePendingTaskToCloseClean(task.id())) {
try {
task.suspend();
task.closeClean();
if (task.isActive()) {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
"Attempting to handle remaining tasks before re-throwing:", task.id());
log.error(uncleanMessage, e);
if (task.state() != State.CLOSED) {
tasksToCloseDirty.add(task);
}
taskExceptions.putIfAbsent(task.id(), e);
}
} else if (tasks.removePendingTaskToCloseDirty(task.id())) {
tasksToCloseDirty.add(task);
} else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
stateUpdater.add(task);
} else {
throw new IllegalStateException("Got a removed task " + task.id() + " from the state updater " +
"that is not for recycle, closing, or updating input partitions; this should not happen");
}
}
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task);
}
throwTaskExceptions(taskExceptions);
}
/**
* Handle the revoked partitions and prepare for closing the associated tasks in {@link #handleAssignment(Map, Map)}
* We should commit the tasks being revoked before suspending them, as we will not officially own them anymore
* when {@link #handleAssignment(Map, Map)} is called. Note that only active task partitions are passed in from
* the rebalance listener, so we only need to consider/commit active tasks here.
*
* If eos-v2 is used, we must commit ALL tasks. Otherwise, we can just commit those (active) tasks which are revoked
*
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
final Set<TopicPartition> remainingRevokedPartitions = new HashSet<>(revokedPartitions);
final Set<Task> revokedActiveTasks = new HashSet<>();
final Set<Task> commitNeededActiveTasks = new HashSet<>();
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
for (final Task task : activeTaskIterable()) {
if (remainingRevokedPartitions.containsAll(task.inputPartitions())) {
// when the task input partitions are included in the revoked list,
// this is an active task and should be revoked
revokedActiveTasks.add(task);
remainingRevokedPartitions.removeAll(task.inputPartitions());
} else if (task.commitNeeded()) {
commitNeededActiveTasks.add(task);
}
}
removeRevokedTasksFromStateUpdater(remainingRevokedPartitions);
if (!remainingRevokedPartitions.isEmpty()) {
log.debug("The following revoked partitions {} are missing from the current task partitions. It could "
+ "potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks " +
"have been cleaned up by the handleAssignment callback.", remainingRevokedPartitions);
}
prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);
// if we need to commit any revoked task then we just commit all tasks that need committing together
final boolean shouldCommitAdditionalTasks = !consumedOffsetsPerTask.isEmpty();
if (shouldCommitAdditionalTasks) {
prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
}
// even if commit failed, we should still continue and complete suspending those tasks, so we would capture
// any exception and rethrow it at the end. some exceptions may be handled immediately and then swallowed,
// as such we just need to skip those dirty tasks in the checkpoint
final Set<Task> dirtyTasks = new HashSet<>();
try {
// in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
// offset commit because we are in a rebalance
taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
} catch (final TaskCorruptedException e) {
log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
e.corruptedTasks());
// If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
closeDirtyAndRevive(dirtyTasks, true);
} catch (final TimeoutException e) {
log.warn("Timed out while trying to commit all tasks during revocation, these will be cleaned and revived");
// If we hit a TimeoutException it must be ALOS, just close dirty and revive without wiping the state
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
closeDirtyAndRevive(dirtyTasks, false);
} catch (final RuntimeException e) {
log.error("Exception caught while committing those revoked tasks " + revokedActiveTasks, e);
firstException.compareAndSet(null, e);
dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
}
// we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
// going to be closed we would checkpoint by then
for (final Task task : revokedActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
task.postCommit(true);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
}
if (shouldCommitAdditionalTasks) {
for (final Task task : commitNeededActiveTasks) {
if (!dirtyTasks.contains(task)) {
try {
// for non-revoked active tasks, we should not enforce a checkpoint
// since if EOS is enabled, no checkpoint should be written while
// the task is in RUNNING state
task.postCommit(false);
} catch (final RuntimeException e) {
log.error("Exception caught while post-committing task " + task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
}
}
for (final Task task : revokedActiveTasks) {
try {
task.suspend();
} catch (final RuntimeException e) {
log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e);
maybeSetFirstException(false, maybeWrapTaskException(e, task.id()), firstException);
}
}
if (firstException.get() != null) {
throw firstException.get();
}
}
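// Active restoring tasks whose input partitions were all revoked are removed from the state updater
// and marked as pending to be closed cleanly.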
private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remainingRevokedPartitions) {
if (stateUpdater != null) {
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
tasks.addPendingTaskToCloseClean(restoringTask.id());
stateUpdater.remove(restoringTask.id());
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
}
}
}
}
}
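// Prepares the commit of the given tasks and collects their committable offsets, wrapping any failure
// into a StreamsException tagged with the failing task's id.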
private void prepareCommitAndAddOffsetsToMap(final Set<Task> tasksToPrepare,
final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
for (final Task task : tasksToPrepare) {
try {
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
if (!committableOffsets.isEmpty()) {
consumedOffsetsPerTask.put(task, committableOffsets);
}
} catch (final StreamsException e) {
e.setTaskId(task.id());
throw e;
} catch (final Exception e) {
throw new StreamsException(e, task.id());
}
}
}
/**
* Closes active tasks as zombies, as these partitions have been lost and are no longer owned.
* NOTE this method assumes that when it is called, EVERY task/partition has been lost and must
* be closed as a zombie.
*
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void handleLostAll() {
log.debug("Closing lost active tasks as zombies.");
closeRunningTasksDirty();
removeLostTasksFromStateUpdater();
if (processingMode == EXACTLY_ONCE_V2) {
activeTaskCreator.reInitializeThreadProducer();
}
}
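// Closes all owned active tasks dirty; standby tasks can safely be kept while we rejoin the group.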
private void closeRunningTasksDirty() {
final Set<Task> allTask = tasks.allTasks();
for (final Task task : allTask) {
// Even though we've apparently dropped out of the group, we can continue safely to maintain our
// standby tasks while we rejoin.
if (task.isActive()) {
closeTaskDirty(task);
}
}
}
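// Active restoring tasks are removed from the state updater and marked as pending to be closed dirty.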
private void removeLostTasksFromStateUpdater() {
if (stateUpdater != null) {
for (final Task restoringTask : stateUpdater.getTasks()) {
if (restoringTask.isActive()) {
tasks.addPendingTaskToCloseDirty(restoringTask.id());
stateUpdater.remove(restoringTask.id());
}
}
}
}
/**
* Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the
* lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}.
* Does not include stateless or non-logged tasks.
*/
public Map<TaskId, Long> getTaskOffsetSums() {
final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
// Not all tasks will create directories, and there may be directories for tasks we don't currently own,
// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
// just have an empty changelogOffsets map.
for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) {
final Task task = tasks.owned(id) ? tasks.task(id) : null;
// Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
if (changelogOffsets.isEmpty()) {
log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
} else {
taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets));
}
} else {
final File checkpointFile = stateDirectory.checkpointFileFor(id);
try {
if (checkpointFile.exists()) {
taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
}
} catch (final IOException e) {
log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
}
}
}
return taskOffsetSums;
}
/**
* Makes a weak attempt to lock all non-empty task directories in the state dir. We are responsible for computing and
* reporting the offset sum for any unassigned tasks we obtain the lock for in the upcoming rebalance. Tasks
* that we locked but didn't own will be released at the end of the rebalance (unless of course we were
* assigned the task as a result of the rebalance). This method should be idempotent.
*/
private void tryToLockAllNonEmptyTaskDirectories() {
// Always clear the set at the beginning as we're always dealing with the
// current set of actually-locked tasks.
lockedTaskDirectories.clear();
for (final TaskDirectory taskDir : stateDirectory.listNonEmptyTaskDirectories()) {
final File dir = taskDir.file();
final String namedTopology = taskDir.namedTopology();
try {
final TaskId id = parseTaskDirectoryName(dir.getName(), namedTopology);
if (stateDirectory.lock(id)) {
lockedTaskDirectories.add(id);
if (!tasks.owned(id)) {
log.debug("Temporarily locked unassigned task {} for the upcoming rebalance", id);
}
}
} catch (final TaskIdFormatException e) {
// ignore any unknown files that sit in the same directory
}
}
}
/**
* Clean up after closed or removed tasks by making sure to unlock any remaining locked directories for them, for
* example unassigned tasks or those in the CREATED state when closed, since Task#close will not unlock them
*/
private void releaseLockedDirectoriesForTasks(final Set<TaskId> tasksToUnlock) {
final Iterator<TaskId> taskIdIterator = lockedTaskDirectories.iterator();
while (taskIdIterator.hasNext()) {
final TaskId id = taskIdIterator.next();
if (tasksToUnlock.contains(id)) {
stateDirectory.unlock(id);
taskIdIterator.remove();
}
}
}
/**
* We must release the lock for any unassigned tasks that we temporarily locked in preparation for a
* rebalance in {@link #tryToLockAllNonEmptyTaskDirectories()}.
*/