/
WorkerCoordinator.java
644 lines (558 loc) · 25.1 KB
/
WorkerCoordinator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
/**
 * Manages the coordination process with the Kafka group coordinator on the broker for distributing connector and
 * task assignments among Connect workers.
 * <p>
 * Both the eager protocol (every rebalance revokes the whole previous assignment before rejoining) and the incremental
 * cooperative protocols ({@code COMPATIBLE} and {@code SESSIONED}, where only explicitly revoked connectors and tasks
 * are stopped) are supported. The mode advertised when joining is the configured {@link ConnectProtocolCompatibility};
 * the mode actually selected by the group is tracked separately after each join.
 */
public class WorkerCoordinator extends AbstractCoordinator implements Closeable {
    private final Logger log;
    // URL of this worker's REST endpoint, advertised to the group in the join metadata.
    private final String restUrl;
    private final ConfigBackingStore configStorage;
    // Latest assignment received from the group. Null before the first successful join, and reset to null when the
    // assignment is revoked because the broker coordinator became unreachable (see poll()).
    private volatile ExtendedAssignment assignmentSnapshot;
    // Config snapshot taken when the join metadata was last built, or explicitly set via configSnapshot(update).
    private ClusterConfigState configSnapshot;
    private final WorkerRebalanceListener listener;
    // Protocol compatibility mode configured for this worker; determines the metadata sent when joining.
    private final ConnectProtocolCompatibility protocolCompatibility;
    // Leader-only view of which member owns which connector/task; cleared at the start of every rebalance.
    private LeaderState leaderState;
    // Set by requestRejoin(), cleared once a join completes.
    private boolean rejoinRequested;
    // Protocol selected by the group in the most recent join; starts out as the configured mode.
    private volatile ConnectProtocolCompatibility currentConnectProtocol;
    private volatile int lastCompletedGenerationId;
    private final ConnectAssignor eagerAssignor;
    private final ConnectAssignor incrementalAssignor;
    // Upper bound on a single coordinator discovery attempt in poll(); taken from the heartbeat interval.
    private final int coordinatorDiscoveryTimeoutMs;

    /**
     * Initialize the coordination manager.
     *
     * @param config                group rebalance settings; the heartbeat interval also bounds coordinator discovery
     * @param logContext            log context used for this coordinator and its assignors
     * @param client                network client used to talk to the group coordinator
     * @param metrics               metrics registry; the assigned-connectors/assigned-tasks gauges are registered here
     * @param metricGrpPrefix       prefix for the coordinator metric group name
     * @param time                  time source
     * @param restUrl               this worker's REST URL, advertised to the rest of the group
     * @param configStorage         source of cluster configuration snapshots
     * @param listener              callback notified of assignments and revocations
     * @param protocolCompatibility protocol mode this worker advertises when joining
     * @param maxDelay              passed to the incremental cooperative assignor (presumably the maximum delay for
     *                              a scheduled rebalance, in ms — TODO confirm against IncrementalCooperativeAssignor)
     */
    public WorkerCoordinator(GroupRebalanceConfig config,
                             LogContext logContext,
                             ConsumerNetworkClient client,
                             Metrics metrics,
                             String metricGrpPrefix,
                             Time time,
                             String restUrl,
                             ConfigBackingStore configStorage,
                             WorkerRebalanceListener listener,
                             ConnectProtocolCompatibility protocolCompatibility,
                             int maxDelay) {
        super(config,
              logContext,
              client,
              metrics,
              metricGrpPrefix,
              time);
        this.log = logContext.logger(WorkerCoordinator.class);
        this.restUrl = restUrl;
        this.configStorage = configStorage;
        this.assignmentSnapshot = null;
        // Registers gauges that read assignmentSnapshot; the instance itself does not need to be retained.
        new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
        this.listener = listener;
        this.rejoinRequested = false;
        this.protocolCompatibility = protocolCompatibility;
        this.incrementalAssignor = new IncrementalCooperativeAssignor(logContext, time, maxDelay);
        this.eagerAssignor = new EagerAssignor(logContext);
        this.currentConnectProtocol = protocolCompatibility;
        this.coordinatorDiscoveryTimeoutMs = config.heartbeatIntervalMs;
        this.lastCompletedGenerationId = Generation.NO_GENERATION.generationId;
    }

    /**
     * Ask this worker to rejoin the group. The request stays pending (making {@link #rejoinNeededOrPending()} return
     * true) until the next join completes.
     *
     * @param reason human-readable reason, logged at debug level
     */
    @Override
    public void requestRejoin(final String reason) {
        log.debug("Request joining group due to: {}", reason);
        rejoinRequested = true;
    }

    /**
     * @return the group protocol type used by Connect workers
     */
    @Override
    public String protocolType() {
        return "connect";
    }

    // expose for tests
    @Override
    protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
        return super.ensureCoordinatorReady(timer);
    }

    /**
     * Drive group membership and network I/O until {@code timeout} milliseconds have elapsed. The loop body always
     * runs at least once, even for a non-positive timeout.
     * <p>
     * Each iteration: if the coordinator is unknown, attempt discovery bounded by the heartbeat interval and, if that
     * fails, revoke the current (non-failed) assignment so no work keeps running outside the group; rejoin the group
     * if needed; send a heartbeat if due; then poll the network client.
     *
     * @param timeout total time to spend in this call, in milliseconds
     */
    public void poll(long timeout) {
        // poll for io until the timeout expires
        final long start = time.milliseconds();
        long now = start;
        long remaining;

        do {
            if (coordinatorUnknown()) {
                log.debug("Broker coordinator is marked unknown. Attempting discovery with a timeout of {}ms",
                        coordinatorDiscoveryTimeoutMs);
                if (ensureCoordinatorReady(time.timer(coordinatorDiscoveryTimeoutMs))) {
                    log.debug("Broker coordinator is ready");
                } else {
                    log.debug("Can not connect to broker coordinator");
                    // Read the volatile field once so the check and the revocation see the same snapshot.
                    final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
                    if (localAssignmentSnapshot != null && !localAssignmentSnapshot.failed()) {
                        log.info("Broker coordinator was unreachable for {}ms. Revoking previous assignment {} to " +
                                "avoid running tasks while not being a member of the group", coordinatorDiscoveryTimeoutMs, localAssignmentSnapshot);
                        listener.onRevoked(localAssignmentSnapshot.leader(), localAssignmentSnapshot.connectors(), localAssignmentSnapshot.tasks());
                        // A null snapshot makes rejoinNeededOrPending() true, forcing a fresh join.
                        assignmentSnapshot = null;
                    }
                }
                now = time.milliseconds();
            }

            if (rejoinNeededOrPending()) {
                ensureActiveGroup();
                now = time.milliseconds();
            }

            pollHeartbeat(now);

            long elapsed = now - start;
            remaining = timeout - elapsed;

            // Note that because the network client is shared with the background heartbeat thread,
            // we do not want to block in poll longer than the time to the next heartbeat.
            long pollTimeout = Math.min(Math.max(0, remaining), timeToNextHeartbeat(now));
            client.poll(time.timer(pollTimeout));

            now = time.milliseconds();
            elapsed = now - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);
    }

    /**
     * Build the join-group metadata for this worker. Takes a fresh config snapshot (stored as
     * {@link #configSnapshot()}) and serializes this worker's URL, config offset and current assignment using the
     * format of the configured protocol compatibility mode.
     *
     * @return the protocols this worker supports, with their metadata
     * @throws IllegalStateException if the configured compatibility mode is not recognized
     */
    @Override
    public JoinGroupRequestProtocolCollection metadata() {
        configSnapshot = configStorage.snapshot();
        final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
        ExtendedWorkerState workerState = new ExtendedWorkerState(restUrl, configSnapshot.offset(), localAssignmentSnapshot);
        switch (protocolCompatibility) {
            case EAGER:
                return ConnectProtocol.metadataRequest(workerState);
            case COMPATIBLE:
                return IncrementalCooperativeConnectProtocol.metadataRequest(workerState, false);
            case SESSIONED:
                return IncrementalCooperativeConnectProtocol.metadataRequest(workerState, true);
            default:
                throw new IllegalStateException("Unknown Connect protocol compatibility mode " + protocolCompatibility);
        }
    }

    /**
     * Handle a completed join: record the protocol chosen by the group, apply revocations and merge retained work
     * (cooperative protocols only), store the new assignment and generation, and notify the listener.
     */
    @Override
    protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        ExtendedAssignment newAssignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(memberAssignment);
        log.debug("Deserialized new assignment: {}", newAssignment);
        currentConnectProtocol = ConnectProtocolCompatibility.fromProtocol(protocol);
        // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
        // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
        // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
        // up to date, try to rejoin again, leaving the group and backing off, etc.).
        rejoinRequested = false;
        if (currentConnectProtocol != EAGER) {
            if (!newAssignment.revokedConnectors().isEmpty() || !newAssignment.revokedTasks().isEmpty()) {
                listener.onRevoked(newAssignment.leader(), newAssignment.revokedConnectors(), newAssignment.revokedTasks());
            }

            final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
            if (localAssignmentSnapshot != null) {
                // Under cooperative rebalancing the new assignment only carries additions, so work we already own
                // and that was not revoked is carried over into it.
                localAssignmentSnapshot.connectors().removeAll(newAssignment.revokedConnectors());
                localAssignmentSnapshot.tasks().removeAll(newAssignment.revokedTasks());
                log.debug("After revocations snapshot of assignment: {}", localAssignmentSnapshot);
                newAssignment.connectors().addAll(localAssignmentSnapshot.connectors());
                newAssignment.tasks().addAll(localAssignmentSnapshot.tasks());
            }
            log.debug("Augmented new assignment: {}", newAssignment);
        }
        assignmentSnapshot = newAssignment;
        lastCompletedGenerationId = generation;
        listener.onAssigned(newAssignment, generation);
    }

    /**
     * Compute the group assignment when this worker is elected leader, using the eager or incremental cooperative
     * assignor depending on the protocol the group selected.
     *
     * @throws IllegalStateException if {@code skipAssignment} is set, since Connect does not support static membership
     */
    @Override
    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
                                                      String protocol,
                                                      List<JoinGroupResponseMember> allMemberMetadata,
                                                      boolean skipAssignment) {
        if (skipAssignment)
            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

        return ConnectProtocolCompatibility.fromProtocol(protocol) == EAGER
               ? eagerAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this)
               : incrementalAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this);
    }

    /**
     * Prepare for a rebalance: drop any leader state and, under the eager protocol, revoke the whole current
     * (non-failed) assignment. Cooperative protocols keep the assignment until it is explicitly revoked.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
        log.info("Rebalance started");
        leaderState(null);
        final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
        if (currentConnectProtocol == EAGER) {
            log.debug("Revoking previous assignment {}", localAssignmentSnapshot);
            if (localAssignmentSnapshot != null && !localAssignmentSnapshot.failed())
                listener.onRevoked(localAssignmentSnapshot.leader(), localAssignmentSnapshot.connectors(), localAssignmentSnapshot.tasks());
        } else {
            log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's "
                      + "explicitly revoked.", localAssignmentSnapshot);
        }
        return true;
    }

    /**
     * A rejoin is needed when the base coordinator says so, when there is no usable assignment (none yet, or the
     * last one failed), or when one was explicitly requested via {@link #requestRejoin(String)}.
     */
    @Override
    protected boolean rejoinNeededOrPending() {
        final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
        return super.rejoinNeededOrPending() || (localAssignmentSnapshot == null || localAssignmentSnapshot.failed()) || rejoinRequested;
    }

    /**
     * @return this worker's member id in the current stable generation, or
     *         {@link JoinGroupRequest#UNKNOWN_MEMBER_ID} if there is no stable generation
     */
    @Override
    public String memberId() {
        Generation generation = generationIfStable();
        if (generation != null)
            return generation.memberId;
        return JoinGroupRequest.UNKNOWN_MEMBER_ID;
    }

    /**
     * Return the current generation. The generation refers to this worker's knowledge with
     * respect to which generation is the latest one and, therefore, this information is local.
     *
     * @return the generation ID or -1 if no generation is defined
     */
    public int generationId() {
        return super.generation().generationId;
    }

    /**
     * Return id that corresponds to the group generation that was active when the last join was successful
     *
     * @return the generation ID of the last group that was joined successfully by this member or -1 if no generation
     * was stable at that point
     */
    public int lastCompletedGenerationId() {
        return lastCompletedGenerationId;
    }

    /**
     * Notify the rebalance listener that all connectors and tasks of the given assignment are revoked.
     *
     * @param assignment the assignment whose work should be revoked
     */
    public void revokeAssignment(ExtendedAssignment assignment) {
        listener.onRevoked(assignment.leader(), assignment.connectors(), assignment.tasks());
    }

    // True when the latest assignment names this worker's current member id as the group leader.
    private boolean isLeader() {
        final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
        return localAssignmentSnapshot != null && memberId().equals(localAssignmentSnapshot.leader());
    }

    /**
     * Look up the REST URL of the worker that owns a connector. Only the leader tracks ownership.
     *
     * @param connector the connector name
     * @return the owner's URL, or null if this worker is not the leader, a rejoin is pending, no leader state is
     *         available, or the connector has no known owner
     */
    public String ownerUrl(String connector) {
        if (rejoinNeededOrPending() || !isLeader())
            return null;
        // Leader state is cleared at the start of each rebalance; treat its absence as "owner unknown".
        LeaderState state = leaderState();
        return state == null ? null : state.ownerUrl(connector);
    }

    /**
     * Look up the REST URL of the worker that owns a task. Only the leader tracks ownership.
     *
     * @param task the task id
     * @return the owner's URL, or null if this worker is not the leader, a rejoin is pending, no leader state is
     *         available, or the task has no known owner
     */
    public String ownerUrl(ConnectorTaskId task) {
        if (rejoinNeededOrPending() || !isLeader())
            return null;
        // Leader state is cleared at the start of each rebalance; treat its absence as "owner unknown".
        LeaderState state = leaderState();
        return state == null ? null : state.ownerUrl(task);
    }

    /**
     * Get an up-to-date snapshot of the cluster configuration.
     *
     * @return the state of the cluster configuration; the result is not locally cached
     */
    public ClusterConfigState configFreshSnapshot() {
        return configStorage.snapshot();
    }

    /**
     * Get a snapshot of the cluster configuration.
     *
     * @return the state of the cluster configuration
     */
    public ClusterConfigState configSnapshot() {
        return configSnapshot;
    }

    /**
     * Set the state of the cluster configuration to this worker coordinator.
     *
     * @param update the updated state of the cluster configuration
     */
    public void configSnapshot(ClusterConfigState update) {
        configSnapshot = update;
    }

    /**
     * Get the leader state stored in this worker coordinator.
     *
     * @return the leader state, or null if none has been stored since the last rebalance started
     */
    private LeaderState leaderState() {
        return leaderState;
    }

    /**
     * Store the leader state to this worker coordinator.
     *
     * @param update the updated leader state
     */
    public void leaderState(LeaderState update) {
        leaderState = update;
    }

    /**
     * Get the version of the connect protocol that is currently active in the group of workers.
     *
     * @return the current connect protocol version
     */
    public short currentProtocolVersion() {
        return currentConnectProtocol.protocolVersion();
    }

    /**
     * Registers gauges for the number of connectors and tasks in the current assignment. Non-static because the
     * gauges read the enclosing coordinator's assignment snapshot.
     */
    private class WorkerCoordinatorMetrics {
        public final String metricGrpName;

        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

            Measurable numConnectors = (config, now) -> {
                final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
                if (localAssignmentSnapshot == null) {
                    return 0.0;
                }
                return localAssignmentSnapshot.connectors().size();
            };

            Measurable numTasks = (config, now) -> {
                final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
                if (localAssignmentSnapshot == null) {
                    return 0.0;
                }
                return localAssignmentSnapshot.tasks().size();
            };

            metrics.addMetric(metrics.metricName("assigned-connectors",
                              this.metricGrpName,
                              "The number of connector instances currently assigned to this worker"), numConnectors);
            metrics.addMetric(metrics.metricName("assigned-tasks",
                              this.metricGrpName,
                              "The number of tasks currently assigned to this worker"), numTasks);
        }
    }

    /**
     * Invert a key-to-values assignment into a value-to-key map. If a value appears under more than one key,
     * whichever key is visited last during iteration wins.
     *
     * @param assignment mapping from each key to the values assigned to it
     * @return a new mutable map from each value to its key
     */
    public static <K, V> Map<V, K> invertAssignment(Map<K, Collection<V>> assignment) {
        Map<V, K> inverted = new HashMap<>();
        for (Map.Entry<K, Collection<V>> assignmentEntry : assignment.entrySet()) {
            K key = assignmentEntry.getKey();
            for (V value : assignmentEntry.getValue())
                inverted.put(value, key);
        }
        return inverted;
    }

    /**
     * Ownership information held by the group leader: every member's worker state plus reverse indexes from each
     * connector and task to the id of the member that owns it.
     */
    public static class LeaderState {
        private final Map<String, ExtendedWorkerState> allMembers;
        private final Map<String, String> connectorOwners;
        private final Map<ConnectorTaskId, String> taskOwners;

        /**
         * @param allMembers          worker state of every group member, keyed by member id
         * @param connectorAssignment connectors assigned to each member id
         * @param taskAssignment      tasks assigned to each member id
         */
        public LeaderState(Map<String, ExtendedWorkerState> allMembers,
                           Map<String, Collection<String>> connectorAssignment,
                           Map<String, Collection<ConnectorTaskId>> taskAssignment) {
            this.allMembers = allMembers;
            this.connectorOwners = invertAssignment(connectorAssignment);
            this.taskOwners = invertAssignment(taskAssignment);
        }

        // URL of the member owning the task, or null if unknown.
        private String ownerUrl(ConnectorTaskId id) {
            return urlOfMember(taskOwners.get(id));
        }

        // URL of the member owning the connector, or null if unknown.
        private String ownerUrl(String connector) {
            return urlOfMember(connectorOwners.get(connector));
        }

        // Resolves a member id to its URL; null for a null id or a member missing from allMembers
        // (the latter previously caused a NullPointerException).
        private String urlOfMember(String memberId) {
            if (memberId == null)
                return null;
            ExtendedWorkerState member = allMembers.get(memberId);
            return member == null ? null : member.url();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof LeaderState)) return false;
            LeaderState that = (LeaderState) o;
            return Objects.equals(allMembers, that.allMembers)
                    && Objects.equals(connectorOwners, that.connectorOwners)
                    && Objects.equals(taskOwners, that.taskOwners);
        }

        @Override
        public int hashCode() {
            return Objects.hash(allMembers, connectorOwners, taskOwners);
        }

        @Override
        public String toString() {
            return "LeaderState{"
                    + "allMembers=" + allMembers
                    + ", connectorOwners=" + connectorOwners
                    + ", taskOwners=" + taskOwners
                    + '}';
        }
    }

    /**
     * A pair of connector names and task ids. Instances are created via {@link Builder}, which de-duplicates entries
     * while keeping insertion order, or the shared {@link #EMPTY} instance.
     */
    public static class ConnectorsAndTasks {
        public static final ConnectorsAndTasks EMPTY =
                new ConnectorsAndTasks(Collections.emptyList(), Collections.emptyList());

        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        private ConnectorsAndTasks(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            this.connectors = connectors;
            this.tasks = tasks;
        }

        /**
         * Accumulates connectors and tasks into insertion-ordered sets.
         */
        public static class Builder {
            private Set<String> withConnectors = new LinkedHashSet<>();
            private Set<ConnectorTaskId> withTasks = new LinkedHashSet<>();

            public Builder() {
            }

            /**
             * Replace (not append to) the accumulated connectors and tasks with copies of the given collections.
             */
            public ConnectorsAndTasks.Builder with(Collection<String> connectors,
                                                   Collection<ConnectorTaskId> tasks) {
                withConnectors = new LinkedHashSet<>(connectors);
                withTasks = new LinkedHashSet<>(tasks);
                return this;
            }

            public ConnectorsAndTasks.Builder addConnectors(Collection<String> connectors) {
                this.withConnectors.addAll(connectors);
                return this;
            }

            public ConnectorsAndTasks.Builder addTasks(Collection<ConnectorTaskId> tasks) {
                this.withTasks.addAll(tasks);
                return this;
            }

            public ConnectorsAndTasks.Builder addAll(ConnectorsAndTasks connectorsAndTasks) {
                return this
                        .addConnectors(connectorsAndTasks.connectors())
                        .addTasks(connectorsAndTasks.tasks());
            }

            /**
             * @return a new instance backed directly by this builder's current sets (not copied)
             */
            public ConnectorsAndTasks build() {
                return new ConnectorsAndTasks(withConnectors, withTasks);
            }
        }

        public Collection<String> connectors() {
            return connectors;
        }

        public Collection<ConnectorTaskId> tasks() {
            return tasks;
        }

        /**
         * @return the combined number of connectors and tasks
         */
        public int size() {
            return connectors.size() + tasks.size();
        }

        public boolean isEmpty() {
            return connectors.isEmpty() && tasks.isEmpty();
        }

        @Override
        public String toString() {
            return "{ connectorIds=" + connectors + ", taskIds=" + tasks + '}';
        }
    }

    /**
     * The connectors and tasks currently placed on a single worker. The collections are mutable so work can be
     * added with {@link #assign(String)} / {@link #assign(ConnectorTaskId)}.
     */
    public static class WorkerLoad {
        // Tie-breaker for the load comparators; nulls sort first so the ordering stays symmetric.
        private static final Comparator<String> WORKER_ID_ORDER = Comparator.nullsFirst(Comparator.naturalOrder());

        private final String worker;
        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        private WorkerLoad(
                String worker,
                Collection<String> connectors,
                Collection<ConnectorTaskId> tasks
        ) {
            this.worker = worker;
            this.connectors = connectors;
            this.tasks = tasks;
        }

        /**
         * Builds a {@link WorkerLoad}; connectors and tasks default to new empty lists when not supplied.
         */
        public static class Builder {
            private final String withWorker;
            private Collection<String> withConnectors;
            private Collection<ConnectorTaskId> withTasks;

            /**
             * @param worker the worker id; must not be null
             */
            public Builder(String worker) {
                this.withWorker = Objects.requireNonNull(worker, "worker cannot be null");
            }

            /**
             * Use mutable copies of the given collections, so later assignments do not affect the caller's data.
             */
            public WorkerLoad.Builder withCopies(Collection<String> connectors,
                                                 Collection<ConnectorTaskId> tasks) {
                withConnectors = new ArrayList<>(
                        Objects.requireNonNull(connectors, "connectors may be empty but not null"));
                withTasks = new ArrayList<>(
                        Objects.requireNonNull(tasks, "tasks may be empty but not null"));
                return this;
            }

            /**
             * Use the given collections directly (not copied); later assignments mutate them.
             */
            public WorkerLoad.Builder with(Collection<String> connectors,
                                           Collection<ConnectorTaskId> tasks) {
                withConnectors = Objects.requireNonNull(connectors,
                        "connectors may be empty but not null");
                withTasks = Objects.requireNonNull(tasks, "tasks may be empty but not null");
                return this;
            }

            public WorkerLoad build() {
                return new WorkerLoad(
                        withWorker,
                        withConnectors != null ? withConnectors : new ArrayList<>(),
                        withTasks != null ? withTasks : new ArrayList<>());
            }
        }

        public String worker() {
            return worker;
        }

        public Collection<String> connectors() {
            return connectors;
        }

        public Collection<ConnectorTaskId> tasks() {
            return tasks;
        }

        public int connectorsSize() {
            return connectors.size();
        }

        public int tasksSize() {
            return tasks.size();
        }

        public void assign(String connector) {
            connectors.add(connector);
        }

        public void assign(ConnectorTaskId task) {
            tasks.add(task);
        }

        /**
         * @return the combined number of connectors and tasks on this worker
         */
        public int size() {
            return connectors.size() + tasks.size();
        }

        public boolean isEmpty() {
            return connectors.isEmpty() && tasks.isEmpty();
        }

        /**
         * Orders workers by ascending number of connectors, breaking ties by worker id.
         *
         * @return the comparator
         */
        public static Comparator<WorkerLoad> connectorComparator() {
            return Comparator.comparingInt(WorkerLoad::connectorsSize)
                    .thenComparing(WorkerLoad::worker, WORKER_ID_ORDER);
        }

        /**
         * Orders workers by ascending number of tasks, breaking ties by worker id.
         *
         * @return the comparator
         */
        public static Comparator<WorkerLoad> taskComparator() {
            return Comparator.comparingInt(WorkerLoad::tasksSize)
                    .thenComparing(WorkerLoad::worker, WORKER_ID_ORDER);
        }

        @Override
        public String toString() {
            return "{ worker=" + worker + ", connectorIds=" + connectors + ", taskIds=" + tasks + '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof WorkerLoad)) {
                return false;
            }
            WorkerLoad that = (WorkerLoad) o;
            return worker.equals(that.worker) &&
                    connectors.equals(that.connectors) &&
                    tasks.equals(that.tasks);
        }

        @Override
        public int hashCode() {
            return Objects.hash(worker, connectors, tasks);
        }
    }
}