/
NetworkClient.java
1305 lines (1173 loc) · 56.1 KB
/
NetworkClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
* user-facing producer and consumer clients.
* <p>
* This class is not thread-safe!
*/
public class NetworkClient implements KafkaClient {
/**
 * Lifecycle of this client: it is created {@code ACTIVE}, becomes {@code CLOSING} once a close has been
 * requested, and ends up {@code CLOSED} after {@code close()} has run.
 */
private enum State { ACTIVE, CLOSING, CLOSED }
/* the logger, created from the LogContext handed to the constructor */
private final Logger log;
/* the selector used to perform network i/o */
private final Selectable selector;
/* supplies the known nodes and drives metadata updates; either passed in by the caller or a
 * DefaultMetadataUpdater wrapping the caller's Metadata */
private final MetadataUpdater metadataUpdater;
/* picks the random starting position used by leastLoadedNode when walking the node list */
private final Random randOffset;
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
/* the set of requests currently being sent or awaiting a response */
private final InFlightRequests inFlightRequests;
/* the socket send buffer size in bytes */
private final int socketSendBuffer;
/* the socket receive buffer size in bytes */
private final int socketReceiveBuffer;
/* the client id used to identify this client in requests to the server */
private final String clientId;
/* the current correlation id to use when sending requests to servers */
private int correlation;
/* default timeout for individual requests to await acknowledgement from servers */
private final int defaultRequestTimeoutMs;
/* time in ms to wait before retrying to create connection to a server */
private final long reconnectBackoffMs;
/* the DNS lookup setting supplied by the caller.
 * NOTE(review): its use is not visible in this part of the file - presumably consulted when connecting. */
private final ClientDnsLookup clientDnsLookup;
/* the clock used to read the current time in milliseconds */
private final Time time;
/**
 * True if we should send an ApiVersionRequest when first connecting to a broker.
 */
private final boolean discoverBrokerVersions;
/* per-node API version information; consulted when choosing a request version and dropped when a node
 * disconnects */
private final ApiVersions apiVersions;
/* ApiVersionsRequest builders keyed by node id; an entry is removed when its node disconnects.
 * NOTE(review): entries are added outside the visible part of the file - presumably for nodes that still
 * need their ApiVersions exchange; confirm. */
private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
/* responses for requests aborted locally (unsupported version or manual disconnect); they are handed back
 * by the next call to poll() */
private final List<ClientResponse> abortedSends = new LinkedList<>();
/* optional sensor recording the throttle time carried by responses; null when the caller supplies none */
private final Sensor throttleTimeSensor;
/* lifecycle state of this client; starts out as ACTIVE */
private final AtomicReference<State> state;
/**
 * Creates a client that manages metadata through a default updater built around {@code metadata} and that
 * records no throttle-time metrics.
 *
 * @param selector the selector used to perform network i/o
 * @param metadata the metadata wrapped by the default metadata updater; must not be null
 * @param clientId the client id used to identify this client in requests to the server
 * @param maxInFlightRequestsPerConnection the limit handed to the in-flight request tracker
 * @param reconnectBackoffMs time in ms to wait before retrying to create a connection to a server
 * @param reconnectBackoffMax handed to the connection-state tracker together with {@code reconnectBackoffMs}
 * @param socketSendBuffer the socket send buffer size in bytes
 * @param socketReceiveBuffer the socket receive buffer size in bytes
 * @param defaultRequestTimeoutMs default timeout for individual requests to await acknowledgement
 * @param connectionSetupTimeoutMs handed to the connection-state tracker
 * @param connectionSetupTimeoutMaxMs handed to the connection-state tracker
 * @param clientDnsLookup the DNS lookup setting stored on this client
 * @param time the clock used to obtain the current time
 * @param discoverBrokerVersions true if an ApiVersionRequest should be sent when first connecting to a broker
 * @param apiVersions holder of per-node API version information
 * @param logContext the context this client's logger is created from
 * @throws IllegalArgumentException if {@code metadata} is null
 */
public NetworkClient(Selectable selector, Metadata metadata, String clientId,
        int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax,
        int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs,
        long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
        ClientDnsLookup clientDnsLookup, Time time, boolean discoverBrokerVersions,
        ApiVersions apiVersions, LogContext logContext) {
    // no explicit metadata updater (first argument) and no throttle-time sensor (second to last argument)
    this(null, metadata, selector, clientId,
            maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax,
            socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs,
            connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs,
            clientDnsLookup, time, discoverBrokerVersions, apiVersions,
            null, logContext);
}
/**
 * Creates a client that manages metadata through a default updater built around {@code metadata} and that
 * reports response throttle times to {@code throttleTimeSensor}.
 *
 * @param selector the selector used to perform network i/o
 * @param metadata the metadata wrapped by the default metadata updater; must not be null
 * @param clientId the client id used to identify this client in requests to the server
 * @param maxInFlightRequestsPerConnection the limit handed to the in-flight request tracker
 * @param reconnectBackoffMs time in ms to wait before retrying to create a connection to a server
 * @param reconnectBackoffMax handed to the connection-state tracker together with {@code reconnectBackoffMs}
 * @param socketSendBuffer the socket send buffer size in bytes
 * @param socketReceiveBuffer the socket receive buffer size in bytes
 * @param defaultRequestTimeoutMs default timeout for individual requests to await acknowledgement
 * @param connectionSetupTimeoutMs handed to the connection-state tracker
 * @param connectionSetupTimeoutMaxMs handed to the connection-state tracker
 * @param clientDnsLookup the DNS lookup setting stored on this client
 * @param time the clock used to obtain the current time
 * @param discoverBrokerVersions true if an ApiVersionRequest should be sent when first connecting to a broker
 * @param apiVersions holder of per-node API version information
 * @param throttleTimeSensor sensor that records the throttle time found in responses
 * @param logContext the context this client's logger is created from
 * @throws IllegalArgumentException if {@code metadata} is null
 */
public NetworkClient(Selectable selector, Metadata metadata, String clientId,
        int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax,
        int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs,
        long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
        ClientDnsLookup clientDnsLookup, Time time, boolean discoverBrokerVersions,
        ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext) {
    // no explicit metadata updater: the private constructor builds the default one from `metadata`
    this(null, metadata, selector, clientId,
            maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax,
            socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs,
            connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs,
            clientDnsLookup, time, discoverBrokerVersions, apiVersions,
            throttleTimeSensor, logContext);
}
/**
 * Creates a client that delegates all metadata handling to a caller-supplied {@link MetadataUpdater} and that
 * records no throttle-time metrics.
 *
 * @param selector the selector used to perform network i/o
 * @param metadataUpdater the metadata updater to use; must not be null
 * @param clientId the client id used to identify this client in requests to the server
 * @param maxInFlightRequestsPerConnection the limit handed to the in-flight request tracker
 * @param reconnectBackoffMs time in ms to wait before retrying to create a connection to a server
 * @param reconnectBackoffMax handed to the connection-state tracker together with {@code reconnectBackoffMs}
 * @param socketSendBuffer the socket send buffer size in bytes
 * @param socketReceiveBuffer the socket receive buffer size in bytes
 * @param defaultRequestTimeoutMs default timeout for individual requests to await acknowledgement
 * @param connectionSetupTimeoutMs handed to the connection-state tracker
 * @param connectionSetupTimeoutMaxMs handed to the connection-state tracker
 * @param clientDnsLookup the DNS lookup setting stored on this client
 * @param time the clock used to obtain the current time
 * @param discoverBrokerVersions true if an ApiVersionRequest should be sent when first connecting to a broker
 * @param apiVersions holder of per-node API version information
 * @param logContext the context this client's logger is created from
 * @throws IllegalArgumentException if {@code metadataUpdater} is null (no metadata is available to build a
 *                                  default updater from)
 */
public NetworkClient(Selectable selector, MetadataUpdater metadataUpdater, String clientId,
        int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax,
        int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs,
        long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
        ClientDnsLookup clientDnsLookup, Time time, boolean discoverBrokerVersions,
        ApiVersions apiVersions, LogContext logContext) {
    // no Metadata (second argument) and no throttle-time sensor (second to last argument)
    this(metadataUpdater, null, selector, clientId,
            maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax,
            socketSendBuffer, socketReceiveBuffer, defaultRequestTimeoutMs,
            connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs,
            clientDnsLookup, time, discoverBrokerVersions, apiVersions,
            null, logContext);
}
/**
 * Common constructor behind all public ones. Exactly one of {@code metadataUpdater} and {@code metadata} is
 * expected to be provided: an explicit updater wins, otherwise a {@code DefaultMetadataUpdater} is created
 * around {@code metadata}.
 *
 * @param metadataUpdater the metadata updater to use, or null to build the default one
 * @param metadata the metadata for the default updater; only consulted when {@code metadataUpdater} is null
 * @param selector the selector used to perform network i/o
 * @param clientId the client id used to identify this client in requests to the server
 * @param maxInFlightRequestsPerConnection the limit handed to the in-flight request tracker
 * @param reconnectBackoffMs time in ms to wait before retrying to create a connection to a server
 * @param reconnectBackoffMax handed to the connection-state tracker together with {@code reconnectBackoffMs}
 * @param socketSendBuffer the socket send buffer size in bytes
 * @param socketReceiveBuffer the socket receive buffer size in bytes
 * @param defaultRequestTimeoutMs default timeout for individual requests to await acknowledgement
 * @param connectionSetupTimeoutMs handed to the connection-state tracker
 * @param connectionSetupTimeoutMaxMs handed to the connection-state tracker
 * @param clientDnsLookup the DNS lookup setting stored on this client
 * @param time the clock used to obtain the current time
 * @param discoverBrokerVersions true if an ApiVersionRequest should be sent when first connecting to a broker
 * @param apiVersions holder of per-node API version information
 * @param throttleTimeSensor sensor recording response throttle times, or null for none
 * @param logContext the context this client's logger is created from
 * @throws IllegalArgumentException if both {@code metadataUpdater} and {@code metadata} are null
 */
private NetworkClient(MetadataUpdater metadataUpdater, Metadata metadata, Selectable selector, String clientId,
        int maxInFlightRequestsPerConnection, long reconnectBackoffMs, long reconnectBackoffMax,
        int socketSendBuffer, int socketReceiveBuffer, int defaultRequestTimeoutMs,
        long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
        ClientDnsLookup clientDnsLookup, Time time, boolean discoverBrokerVersions,
        ApiVersions apiVersions, Sensor throttleTimeSensor, LogContext logContext) {
    // The public constructors cannot hand in a `DefaultMetadataUpdater` themselves: it is an inner class and
    // can only be instantiated after the super constructor has been invoked, so it is created here instead.
    if (metadataUpdater != null) {
        this.metadataUpdater = metadataUpdater;
    } else if (metadata != null) {
        this.metadataUpdater = new DefaultMetadataUpdater(metadata);
    } else {
        throw new IllegalArgumentException("`metadata` must not be null");
    }

    // values taken over from the caller as-is
    this.selector = selector;
    this.clientId = clientId;
    this.socketSendBuffer = socketSendBuffer;
    this.socketReceiveBuffer = socketReceiveBuffer;
    this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;
    this.reconnectBackoffMs = reconnectBackoffMs;
    this.clientDnsLookup = clientDnsLookup;
    this.time = time;
    this.discoverBrokerVersions = discoverBrokerVersions;
    this.apiVersions = apiVersions;
    this.throttleTimeSensor = throttleTimeSensor;
    this.correlation = 0;

    // state owned by this client
    this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
    this.connectionStates = new ClusterConnectionStates(
            reconnectBackoffMs, reconnectBackoffMax,
            connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext);
    this.randOffset = new Random();
    this.log = logContext.logger(NetworkClient.class);
    this.state = new AtomicReference<>(State.ACTIVE);
}
/**
 * Reports whether the given node can be sent to right now and, if it cannot but a connection attempt is
 * currently allowed, starts connecting to it.
 *
 * @param node The node to check
 * @param now The current timestamp
 * @return True if we are ready to send to the given node
 * @throws IllegalArgumentException if the node is empty
 */
@Override
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);

    final boolean alreadyReady = isReady(node, now);
    // we are interested in this node but have no usable connection to it yet: initiate one if permitted
    if (!alreadyReady && connectionStates.canConnect(node.idString(), now))
        initiateConnect(node, now);
    return alreadyReady;
}
/**
 * Asks the connection-state tracker whether a connection attempt to the node is currently allowed.
 * Visible for testing.
 *
 * @param node the node to check
 * @param now the current timestamp
 * @return true if a connection to the node may be initiated
 */
boolean canConnect(Node node, long now) {
    final String nodeId = node.idString();
    return connectionStates.canConnect(nodeId, now);
}
/**
 * Disconnects the connection to a particular node, if there is one. Does nothing when the node is already
 * marked as disconnected. Pending ClientRequests for the connection are turned into disconnect responses and
 * queued as aborted sends, which the next {@code poll} hands back.
 *
 * @param nodeId The id of the node
 */
@Override
public void disconnect(String nodeId) {
    if (!connectionStates.isDisconnected(nodeId)) {
        selector.close(nodeId);
        final long disconnectTimeMs = time.milliseconds();
        cancelInFlightRequests(nodeId, disconnectTimeMs, abortedSends);
        connectionStates.disconnected(nodeId, disconnectTimeMs);
        if (log.isTraceEnabled())
            log.trace("Manually disconnected from {}. Aborted in-flight requests: {}.", nodeId, inFlightRequests);
    }
}
/**
 * Clears every in-flight request of the given node. Requests issued by callers are converted into
 * disconnect responses and appended to {@code responses} (when one is given); a cleared internal metadata
 * request is reported to the metadata updater as failed instead.
 *
 * @param nodeId the id of the node whose requests are cancelled
 * @param now the current time in ms
 * @param responses the collection receiving the disconnect responses, or null to drop them silently
 */
private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses) {
    for (InFlightRequest cancelled : this.inFlightRequests.clearAll(nodeId)) {
        log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
                cancelled.header.apiKey(), cancelled.request, cancelled.header.correlationId(), nodeId);

        if (cancelled.isInternalRequest) {
            // internal requests never surface as responses; only metadata requests need failure handling
            if (cancelled.header.apiKey() == ApiKeys.METADATA)
                metadataUpdater.handleFailedRequest(now, Optional.empty());
            continue;
        }
        if (responses != null)
            responses.add(cancelled.disconnected(now, null));
    }
}
/**
 * Closes the connection to a particular node (if there is one) and forgets its connection state.
 * All requests on the connection are cleared; since no response collection is supplied, ClientRequest
 * callbacks are not invoked for the cleared requests, nor are they returned from poll().
 *
 * @param nodeId The id of the node
 */
@Override
public void close(String nodeId) {
    selector.close(nodeId);
    // null: cancelled requests must not produce responses
    cancelInFlightRequests(nodeId, time.milliseconds(), null);
    connectionStates.remove(nodeId);
}
/**
 * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data.
 * When disconnected, this respects the reconnect backoff time. When connecting or connected, this handles
 * slow/stalled connections. The value is computed by the connection-state tracker.
 *
 * @param node The node to check
 * @param now The current timestamp
 * @return The number of milliseconds to wait.
 */
@Override
public long connectionDelay(Node node, long now) {
    final String nodeId = node.idString();
    return connectionStates.connectionDelay(nodeId, now);
}
/**
 * Return the remaining throttling delay in milliseconds if throttling is in progress. Return 0, otherwise.
 * This is for testing.
 *
 * @param node the node to check
 * @param now the current time in ms
 * @return the remaining throttle delay in ms as reported by the connection-state tracker
 */
public long throttleDelayMs(Node node, long now) {
    final String nodeId = node.idString();
    return connectionStates.throttleDelayMs(nodeId, now);
}
/**
 * Return the poll delay in milliseconds based on both connection and throttle delay.
 *
 * @param node the connection to check
 * @param now the current time in ms
 * @return the poll delay in ms as reported by the connection-state tracker
 */
@Override
public long pollDelayMs(Node node, long now) {
    final String nodeId = node.idString();
    return connectionStates.pollDelayMs(nodeId, now);
}
/**
 * Check if the connection of the node has failed, based on the connection state. Such connection failures
 * are usually transient and can be resumed in the next {@link #ready(Node, long)} call, but there are cases
 * where transient failures need to be caught and reacted upon.
 *
 * @param node the node to check
 * @return true iff the connection has failed and the node is disconnected
 */
@Override
public boolean connectionFailed(Node node) {
    final String nodeId = node.idString();
    return connectionStates.isDisconnected(nodeId);
}
/**
 * Check if authentication to this node has failed, based on the connection state. Authentication failures
 * are propagated without any retries.
 *
 * @param node the node to check
 * @return an AuthenticationException iff authentication has failed, null otherwise
 */
@Override
public AuthenticationException authenticationException(Node node) {
    final String nodeId = node.idString();
    return connectionStates.authenticationException(nodeId);
}
/**
 * Check if the given node is ready to send more requests.
 *
 * @param node The node
 * @param now The current time in ms
 * @return true if the node is ready
 */
@Override
public boolean isReady(Node node, long now) {
    // while a metadata update is due every node is declared unready so that the metadata request goes first
    if (metadataUpdater.isUpdateDue(now))
        return false;
    return canSendRequest(node.idString(), now);
}
/**
 * Are we connected and ready and able to send more requests to the given connection? All three must hold:
 * the connection state is ready, the selector's channel is ready, and the in-flight tracker accepts more.
 *
 * @param node The id of the node
 * @param now the current timestamp
 * @return true if a request may be sent to the node right now
 */
private boolean canSendRequest(String node, long now) {
    if (!connectionStates.isReady(node, now))
        return false;
    if (!selector.isChannelReady(node))
        return false;
    return inFlightRequests.canSendMore(node);
}
/**
 * Queue up the given request for sending. Requests can only be sent out to ready nodes; the request is
 * treated as coming from outside this client and is therefore validated before being sent.
 *
 * @param request The request
 * @param now The current timestamp
 */
@Override
public void send(ClientRequest request, long now) {
    final boolean internalRequest = false;
    doSend(request, internalRequest, now);
}
/**
 * Wraps the metadata request builder in a client request for the given node and sends it as an internal
 * request, i.e. without the readiness validation applied to external requests. Package-private for testing.
 *
 * @param builder the metadata request builder
 * @param nodeConnectionId the id of the node to send the request to
 * @param now the current time in ms
 */
void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
    doSend(newClientRequest(nodeConnectionId, builder, now, true), true, now);
}
/**
 * Picks the request version to use for the destination node, builds the request and passes it on for sending.
 * If no usable version exists, nothing goes over the wire: an external request is answered locally with an
 * aborted response carrying the error, and a failed internal metadata request is reported to the metadata
 * updater.
 *
 * @param clientRequest the request to send
 * @param isInternalRequest true if the request originates from this client itself
 * @param now the current time in ms
 * @throws IllegalStateException if an external request targets a node that is not ready
 */
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ensureActive();
    final String nodeId = clientRequest.destination();
    // Requests from outside the NetworkClient are validated here. Internal code is trusted to have done its
    // own validation, which is slightly different for some internal requests (for example,
    // ApiVersionsRequests can be sent prior to being in READY state).
    if (!isInternalRequest && !canSendRequest(nodeId, now))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");

    final AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        final NodeApiVersions knownVersions = apiVersions.get(nodeId);
        final short version;
        if (knownVersions != null) {
            version = knownVersions.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        } else {
            // No server version information is available. This is the case when sending the initial
            // ApiVersionRequest, which fetches that information itself, and also when
            // discoverBrokerVersions is set to false.
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        }
        // build() may throw UnsupportedVersionException as well, if there are essential fields that cannot
        // be represented in the chosen version
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        // The version is not supported: skip the wire and record the failure locally instead.
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                clientRequest.callback(), clientRequest.destination(), now, now,
                false, unsupportedVersionException, null, null);

        if (isInternalRequest) {
            if (clientRequest.apiKey() == ApiKeys.METADATA)
                metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
        } else {
            abortedSends.add(clientResponse);
        }
    }
}
/**
 * Serializes an already built request, registers it as in flight and queues it on the selector.
 *
 * @param clientRequest the client request being sent
 * @param isInternalRequest true if the request originates from this client itself
 * @param now the current time in ms, recorded with the in-flight request
 * @param request the built request; its version determines the request header
 */
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    final String destination = clientRequest.destination();
    final RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled())
        log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
                clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);

    final Send send = request.toSend(destination, header);
    // track the request before handing the bytes to the selector
    this.inFlightRequests.add(
            new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now));
    selector.send(send);
}
/**
 * Do actual reads and writes to sockets.
 * <p>
 * If sends were aborted locally since the last call, their responses are completed and returned immediately
 * and the selector is not polled. Otherwise the metadata updater is given the chance to update, the selector
 * is polled, and completed sends, completed receives, disconnections, new connections, pending ApiVersions
 * requests, timed-out connections and timed-out requests are handled in that order. Every returned response
 * has had its completion handler invoked before this method returns.
 *
 * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
 * must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
 * metadata timeout
 * @param now The current time in milliseconds
 * @return The list of responses received
 * @throws DisconnectException if this client is no longer active
 */
@Override
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
// let the metadata updater act first; its return value bounds how long the selector may wait
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
// an I/O error is only logged; the completed actions below are still processed
log.error("Unexpected error during I/O", e);
}
// process completed actions
// the clock is read again because the selector poll may have waited up to the timeout
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
// invoke the completion handler of every collected response before handing the list back
completeResponses(responses);
return responses;
}
/**
 * Invokes the completion handler of every response in list order. An exception thrown by one handler is
 * logged and does not prevent the remaining handlers from running.
 *
 * @param responses the responses to complete
 */
private void completeResponses(List<ClientResponse> responses) {
    final Iterator<ClientResponse> remaining = responses.iterator();
    while (remaining.hasNext()) {
        final ClientResponse current = remaining.next();
        try {
            current.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}
/**
 * Get the number of in-flight requests
 *
 * @return the total count reported by the in-flight request tracker
 */
@Override
public int inFlightRequestCount() {
    final int totalInFlight = inFlightRequests.count();
    return totalInFlight;
}
/**
 * Check whether any request is currently in flight.
 *
 * @return true if the in-flight request tracker is not empty
 */
@Override
public boolean hasInFlightRequests() {
    final boolean nothingInFlight = inFlightRequests.isEmpty();
    return !nothingInFlight;
}
/**
 * Get the number of in-flight requests for a given node
 *
 * @param node the id of the node
 * @return the count reported by the in-flight request tracker for that node
 */
@Override
public int inFlightRequestCount(String node) {
    final int inFlightForNode = inFlightRequests.count(node);
    return inFlightForNode;
}
/**
 * Check whether any request to the given node is currently in flight.
 *
 * @param node the id of the node
 * @return true if the in-flight request tracker holds at least one request for that node
 */
@Override
public boolean hasInFlightRequests(String node) {
    final boolean nothingInFlight = inFlightRequests.isEmpty(node);
    return !nothingInFlight;
}
/**
 * Check whether the connection-state tracker reports any ready node at the given time.
 *
 * @param now the current time in ms
 * @return true if at least one node is ready
 */
@Override
public boolean hasReadyNodes(long now) {
    final boolean anyReady = connectionStates.hasReadyNodes(now);
    return anyReady;
}
/**
 * Interrupt the client if it is blocked waiting on I/O, by waking up the underlying selector.
 */
@Override
public void wakeup() {
    selector.wakeup();
}
/**
 * Moves an active client into the closing state and wakes up the selector. Has no effect if the client is
 * not active any more.
 */
@Override
public void initiateClose() {
    final boolean startedClosing = state.compareAndSet(State.ACTIVE, State.CLOSING);
    if (startedClosing)
        wakeup();
}
/**
 * Check whether this client is still active, i.e. no close has been initiated yet.
 *
 * @return true if the current state is ACTIVE
 */
@Override
public boolean active() {
    final State current = state.get();
    return current == State.ACTIVE;
}
/**
 * Guard used by operations that need a live client.
 *
 * @throws DisconnectException if the client is closing or closed
 */
private void ensureActive() {
    if (active())
        return;
    throw new DisconnectException("NetworkClient is no longer active, state is " + state);
}
/**
 * Close the network client. The selector and the metadata updater are closed exactly once; calling this on
 * a client that is already closed only logs a warning.
 */
@Override
public void close() {
    // make sure we pass through CLOSING even when no close was initiated beforehand
    state.compareAndSet(State.ACTIVE, State.CLOSING);
    if (!state.compareAndSet(State.CLOSING, State.CLOSED)) {
        log.warn("Attempting to close NetworkClient that has already been closed.");
        return;
    }
    this.selector.close();
    this.metadataUpdater.close();
}
/**
 * Choose the node with the fewest outstanding requests which is at least eligible for connection. The nodes
 * are walked starting from a random position. A node that can be sent to and has no in-flight requests is
 * returned immediately; otherwise the preference order is: the sendable node with the fewest in-flight
 * requests, then a node whose connection is being prepared, then the connectable node with the least recent
 * connection attempt. Nodes that fit none of these categories are never chosen.
 *
 * @param now the current time in ms
 * @return the selected node, or null if no node is currently available
 * @throws IllegalStateException if the metadata updater knows no nodes at all
 */
@Override
public Node leastLoadedNode(long now) {
    final List<Node> candidates = this.metadataUpdater.fetchNodes();
    if (candidates.isEmpty())
        throw new IllegalStateException("There are no nodes in the Kafka cluster");

    final int nodeCount = candidates.size();
    final int start = this.randOffset.nextInt(nodeCount);
    int fewestInFlight = Integer.MAX_VALUE;
    Node bestReady = null;
    Node connecting = null;
    Node connectable = null;

    for (int step = 0; step < nodeCount; step++) {
        final Node node = candidates.get((start + step) % nodeCount);
        final String id = node.idString();

        if (canSendRequest(id, now)) {
            final int pending = this.inFlightRequests.count(id);
            if (pending == 0) {
                // an established connection with no in-flight requests cannot be beaten: stop right away
                log.trace("Found least loaded node {} connected with no in-flight requests", node);
                return node;
            }
            if (pending < fewestInFlight) {
                // best sendable node seen so far
                fewestInFlight = pending;
                bestReady = node;
            }
            continue;
        }

        if (connectionStates.isPreparingConnection(id)) {
            connecting = node;
            continue;
        }

        if (!canConnect(node, now)) {
            log.trace("Removing node {} from least loaded node selection since it is neither ready " +
                    "for sending or connecting", node);
            continue;
        }

        // among connectable nodes keep the one whose last connection attempt lies furthest in the past
        if (connectable == null ||
                this.connectionStates.lastConnectAttemptMs(connectable.idString()) >
                        this.connectionStates.lastConnectAttemptMs(node.idString())) {
            connectable = node;
        }
    }

    // We prefer established connections if possible. Otherwise, we will wait for connections
    // which are being established before connecting to new nodes.
    if (bestReady != null) {
        log.trace("Found least loaded node {} with {} inflight requests", bestReady, fewestInFlight);
        return bestReady;
    }
    if (connecting != null) {
        log.trace("Found least loaded connecting node {}", connecting);
        return connecting;
    }
    if (connectable != null) {
        log.trace("Found least loaded node {} with no active connection", connectable);
        return connectable;
    }
    log.trace("Least loaded node selection failed to find an available node");
    return null;
}
/**
 * Parses a serialized response into its typed form, using the API key and version of the request it answers.
 * No throttle-time metrics are recorded.
 *
 * @param responseBuffer the buffer holding the response header and body
 * @param requestHeader the header of the request the response belongs to
 * @return the parsed response
 * @throws SchemaException if the buffer ends before the response has been fully parsed
 */
public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
    try {
        // null sensor / time 0: nothing is recorded on this path
        final Struct body = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
        final AbstractResponse parsed =
                AbstractResponse.parseResponse(requestHeader.apiKey(), body, requestHeader.apiVersion());
        return parsed;
    } catch (BufferUnderflowException e) {
        throw new SchemaException("Buffer underflow while parsing response for request with header " + requestHeader, e);
    }
}
/**
 * Reads the response header and body from the buffer, checks the response against the request via
 * {@code correlate}, and records the throttle time when a sensor is given and the body carries that field.
 *
 * @param responseBuffer the buffer holding the response header followed by the body
 * @param requestHeader the header of the request the response belongs to
 * @param throttleTimeSensor the sensor to record the throttle time on, or null to record nothing
 * @param now the timestamp passed to the sensor
 * @return the parsed response body
 */
private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
        Sensor throttleTimeSensor, long now) {
    final ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer,
            requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
    // Always expect the response version id to be the same as the request version id
    final Struct body = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
    correlate(requestHeader, responseHeader);

    if (throttleTimeSensor == null || !body.hasField(CommonFields.THROTTLE_TIME_MS))
        return body;
    throttleTimeSensor.record(body.get(CommonFields.THROTTLE_TIME_MS), now);
    return body;
}
/**
 * Post process disconnection of a node: marks it as disconnected, drops its API version information,
 * logs the reason where the channel state calls for it, cancels its in-flight requests and notifies the
 * metadata updater.
 *
 * @param responses The list of responses to update
 * @param nodeId Id of the node to be disconnected
 * @param now The current time
 * @param disconnectState The state of the disconnected channel
 */
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now,
        ChannelState disconnectState) {
    connectionStates.disconnected(nodeId, now);
    apiVersions.remove(nodeId);
    nodesNeedingApiVersionsFetch.remove(nodeId);

    switch (disconnectState.state()) {
        case AUTHENTICATION_FAILED: {
            final AuthenticationException authFailure = disconnectState.exception();
            connectionStates.authenticationFailed(nodeId, now, authFailure);
            log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
                    disconnectState.remoteAddress(), authFailure.getMessage());
            break;
        }
        case AUTHENTICATE: {
            log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
                    "due to any of the following reasons: (1) Authentication failed due to invalid " +
                    "credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
                    "traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
                    nodeId, disconnectState.remoteAddress());
            break;
        }
        case NOT_CONNECTED: {
            log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
            break;
        }
        default:
            // Disconnections in other states are logged at debug level in Selector
            break;
    }

    cancelInFlightRequests(nodeId, now, responses);
    metadataUpdater.handleServerDisconnect(now, nodeId, Optional.ofNullable(disconnectState.exception()));
}
/**
 * Expire in-flight requests that have timed out. For every node the in-flight tracker reports as having a
 * timed-out request, the connection is closed and then processed like any other disconnection.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
    for (String timedOutNodeId : this.inFlightRequests.nodesWithTimedOutRequests(now)) {
        // close connection to the node
        this.selector.close(timedOutNodeId);
        log.debug("Disconnecting from node {} due to request timeout.", timedOutNodeId);
        processDisconnection(responses, timedOutNodeId, now, ChannelState.LOCAL_CLOSE);
    }
}
/**
 * Moves all queued aborted sends, in order, to the end of the given response list and leaves the queue
 * empty.
 *
 * @param responses The list of responses to update
 */
private void handleAbortedSends(List<ClientResponse> responses) {
    final Iterator<ClientResponse> queued = abortedSends.iterator();
    while (queued.hasNext()) {
        responses.add(queued.next());
        queued.remove();
    }
}
/**
 * Handle socket channel connection timeout. For every node the connection-state tracker reports as having
 * exceeded its connection setup timeout, the connection is closed and then processed like any other
 * disconnection.
 *
 * @param responses The list of responses to update
 * @param now The current time
 */
private void handleTimedOutConnections(List<ClientResponse> responses, long now) {
    for (String timedOutNodeId : connectionStates.nodesWithConnectionSetupTimeout(now)) {
        this.selector.close(timedOutNodeId);
        log.debug(
                "Disconnecting from node {} due to socket connection setup timeout. " +
                        "The timeout value is {} ms.",
                timedOutNodeId,
                connectionStates.connectionSetupTimeoutMs(timedOutNodeId));
        processDisconnection(responses, timedOutNodeId, now, ChannelState.LOCAL_CLOSE);
    }
}
/**
* Handle any completed request send. In particular if no response is expected consider the request complete.
*
* @param responses The list of responses to update
* @param now The current time
*/
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    for (Send completedSend : selector.completedSends()) {
        InFlightRequest lastSent = inFlightRequests.lastSent(completedSend.destination());
        if (lastSent.expectResponse)
            continue; // left in flight; nothing is completed here for requests that expect a response

        // No response is expected, so the finished send is the completion of the request.
        inFlightRequests.completeLastSent(completedSend.destination());
        responses.add(lastSent.completed(null, now));
    }
}
/**
* If a response from a node includes a non-zero throttle delay and client-side throttling has been enabled for
* the connection to the node, throttle the connection for the specified delay.
*
* @param response the response
* @param apiVersion the API version of the response
* @param nodeId the id of the node
* @param now The current time
*/
private void maybeThrottle(AbstractResponse response, short apiVersion, String nodeId, long now) {
    int throttleTimeMs = response.throttleTimeMs();
    // Nothing to do without a positive delay, or when the response says the client should not throttle.
    if (throttleTimeMs <= 0 || !response.shouldClientThrottle(apiVersion))
        return;

    long throttledUntilMs = now + throttleTimeMs;
    connectionStates.throttle(nodeId, throttledUntilMs);
    log.trace("Connection to node {} is throttled for {} ms until timestamp {}", nodeId, throttleTimeMs,
        throttledUntilMs);
}
/**
* Handle any completed receives and update the response list with the responses received.
*
* @param responses The list of responses to update
* @param now The current time
*/
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : selector.completedReceives()) {
        String source = receive.source();
        // Pair the receive with the next in-flight request of the node it came from.
        InFlightRequest req = this.inFlightRequests.completeNext(source);

        Struct body = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        AbstractResponse response =
            AbstractResponse.parseResponse(req.header.apiKey(), body, req.header.apiVersion());

        if (log.isDebugEnabled()) {
            log.debug("Received {} response from node {} for request with header {}: {}",
                req.header.apiKey(), req.destination, req.header, response);
        }

        // Apply any throttle delay carried by the response before dispatching it.
        maybeThrottle(response, req.header.apiVersion(), req.destination, now);

        if (!req.isInternalRequest) {
            responses.add(req.completed(response, now));
        } else if (response instanceof MetadataResponse) {
            // Internal metadata responses go to the metadata updater, not to the response list.
            metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
        } else if (response instanceof ApiVersionsResponse) {
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) response);
        } else {
            responses.add(req.completed(response, now));
        }
    }
}
/**
 * Process the response to an ApiVersionsRequest. On success the node's API versions are recorded and the
 * connection is marked ready. On an UNSUPPORTED_VERSION error for a request whose version is not 0, the request
 * is queued again with a version taken from the response (or 0 if none is provided). Any other error closes
 * the connection and is processed as a disconnection.
 *
 * @param responses The list of responses to update
 * @param req The in-flight request this response belongs to
 * @param now The current time
 * @param apiVersionsResponse The parsed response
 */
private void handleApiVersionsResponse(List<ClientResponse> responses,
                                       InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
    final String node = req.destination;
    final short errorCode = apiVersionsResponse.data.errorCode();

    if (errorCode == Errors.NONE.code()) {
        NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data.apiKeys());
        apiVersions.update(node, nodeVersionInfo);
        this.connectionStates.ready(node);
        log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);
        return;
    }

    boolean retryWithOtherVersion = req.request.version() != 0
        && errorCode == Errors.UNSUPPORTED_VERSION.code();
    if (!retryWithOtherVersion) {
        log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.",
            Errors.forCode(errorCode), node, req.header.correlationId());
        this.selector.close(node);
        processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
        return;
    }

    // Starting from Apache Kafka 2.4, ApiKeys field is populated with the supported versions of
    // the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
    // If not provided, the client falls back to version 0.
    ApiVersionsResponseKey apiVersion = null;
    if (apiVersionsResponse.data.apiKeys().size() > 0)
        apiVersion = apiVersionsResponse.data.apiKeys().find(ApiKeys.API_VERSIONS.id);
    short maxApiVersion = apiVersion == null ? 0 : apiVersion.maxVersion();
    nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder(maxApiVersion));
}
/**
* Handle any disconnected connections
*
* @param responses The list of responses that completed with the disconnection
* @param now The current time
*/
private void handleDisconnections(List<ClientResponse> responses, long now) {
    for (Map.Entry<String, ChannelState> disconnection : selector.disconnected().entrySet()) {
        final String nodeId = disconnection.getKey();
        final ChannelState closeState = disconnection.getValue();
        log.debug("Node {} disconnected.", nodeId);
        // Pass along the channel state the selector reported for this disconnect.
        processDisconnection(responses, nodeId, now, closeState);
    }
}
/**
* Record any newly completed connections
*/
private void handleConnections() {
    for (String node : selector.connected()) {
        // The connection is established, but we might still not be able to send requests. For instance,
        // with SSL enabled the handshake happens after the connection is established, so isChannelReady
        // must still be checked before attempting to send on this connection.
        if (!discoverBrokerVersions) {
            connectionStates.ready(node);
            log.debug("Completed connection to node {}. Ready.", node);
            continue;
        }

        // Broker version discovery is on: queue an ApiVersionsRequest for this node.
        connectionStates.checkingApiVersions(node);
        nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
        log.debug("Completed connection to node {}. Fetching API versions.", node);
    }
}
/**
 * Send the queued ApiVersionsRequest to every node in nodesNeedingApiVersionsFetch whose channel is ready and
 * which can take another in-flight request. A node is removed from the queue once its request has been handed
 * to doSend; nodes that are skipped stay queued.
 *
 * @param now The current time
 */
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> pending =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (pending.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> queued = pending.next();
        String node = queued.getKey();
        if (!selector.isChannelReady(node) || !inFlightRequests.canSendMore(node))
            continue;

        log.debug("Initiating API versions fetch from node {}.", node);
        ClientRequest clientRequest = newClientRequest(node, queued.getValue(), now, true);
        doSend(clientRequest, true, now);
        // Remove through the iterator, since the map is being iterated.
        pending.remove();
    }
}
/**
* Validate that the response corresponds to the request we expect; throw an exception if the correlation ids do not match
*/
private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
    if (requestHeader.correlationId() == responseHeader.correlationId())
        return;

    // A request id in the reserved Sasl range answered by an id outside that range gets its own error.
    boolean saslRequestWithUnrelatedResponse = SaslClientAuthenticator.isReserved(requestHeader.correlationId())
        && !SaslClientAuthenticator.isReserved(responseHeader.correlationId());
    if (saslRequestWithUnrelatedResponse) {
        throw new SchemaException("the response is unrelated to Sasl request since its correlation id is " + responseHeader.correlationId()
            + " and the reserved range for Sasl request is [ "
            + SaslClientAuthenticator.MIN_RESERVED_CORRELATION_ID + "," + SaslClientAuthenticator.MAX_RESERVED_CORRELATION_ID + "]");
    }

    throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
        + ") does not match request (" + requestHeader.correlationId() + "), request header: " + requestHeader);
}
/**
* Initiate a connection to the given node
* @param node the node to connect to
* @param now current time in epoch milliseconds
*/
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {