Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch #8683

Merged
merged 16 commits into from Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.kafka.clients;

import java.util.concurrent.ThreadLocalRandom;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

Expand All @@ -34,19 +36,32 @@
*
*/
final class ClusterConnectionStates {
private final long reconnectBackoffInitMs;
private final long reconnectBackoffMaxMs;
private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
ctan888 marked this conversation as resolved.
Show resolved Hide resolved
private final double reconnectBackoffMaxExp;
final static int RECONNECT_BACKOFF_EXP_BASE = 2;
final static double RECONNECT_BACKOFF_JITTER = 0.2;
final static int CONNECTION_SETUP_TIMEOUT_EXP_BASE = 2;
final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;
private final Map<String, NodeConnectionState> nodeState;
private final Logger log;
private Set<String> connectingNodes;
private ExponentialBackoff reconnectBackoff;
private ExponentialBackoff connectionSetupTimeout;

/**
 * Create the tracker for the connection state of every node in the cluster.
 *
 * @param reconnectBackoffMs          initial reconnect backoff, in milliseconds
 * @param reconnectBackoffMaxMs       upper bound handed to the reconnect backoff, in milliseconds
 * @param connectionSetupTimeoutMs    initial connection setup timeout, in milliseconds
 * @param connectionSetupTimeoutMaxMs upper bound handed to the connection setup timeout, in milliseconds
 * @param logContext                  context used to create this class' logger
 */
public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
                               long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
                               LogContext logContext) {
    this.log = logContext.logger(ClusterConnectionStates.class);
    this.reconnectBackoffInitMs = reconnectBackoffMs;
    this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
    // Largest exponent that keeps the backoff within the maximum; the divisor is clamped to 1
    // so a zero initial backoff cannot cause a division by zero.
    this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
    this.reconnectBackoff = new ExponentialBackoff(
            reconnectBackoffMs,
            RECONNECT_BACKOFF_EXP_BASE,
            reconnectBackoffMaxMs,
            RECONNECT_BACKOFF_JITTER);
    this.connectionSetupTimeout = new ExponentialBackoff(
            connectionSetupTimeoutMs,
            CONNECTION_SETUP_TIMEOUT_EXP_BASE,
            connectionSetupTimeoutMaxMs,
            CONNECTION_SETUP_TIMEOUT_JITTER);
    this.nodeState = new HashMap<>();
    this.connectingNodes = new HashSet<>();
}

/**
Expand Down Expand Up @@ -131,6 +146,7 @@ public void connecting(String id, long now, String host, ClientDnsLookup clientD
connectionState.state = ConnectionState.CONNECTING;
// Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved
connectionState.moveToNextAddress();
connectingNodes.add(id);
return;
} else if (connectionState != null) {
log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
Expand All @@ -139,7 +155,8 @@ public void connecting(String id, long now, String host, ClientDnsLookup clientD
// Create a new NodeConnectionState if nodeState does not already contain one
// for the specified id or if the hostname associated with the node id changed.
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
this.reconnectBackoffInitMs, host, clientDnsLookup));
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, clientDnsLookup));
connectingNodes.add(id);
}

/**
Expand All @@ -158,9 +175,15 @@ public InetAddress currentAddress(String id) throws UnknownHostException {
*/
public void disconnected(String id, long now) {
    NodeConnectionState nodeState = nodeState(id);
    nodeState.lastConnectAttemptMs = now;
    updateReconnectBackoff(nodeState);
    // The previous state must be inspected BEFORE it is overwritten below: only a node that
    // was still CONNECTING failed during connection setup, so only then is the setup timeout
    // increased and the node dropped from the connecting set.
    if (nodeState.state == ConnectionState.CONNECTING) {
        updateConnectionSetupTimeout(nodeState);
        connectingNodes.remove(id);
    } else {
        resetConnectionSetupTimeout(nodeState);
    }
    nodeState.state = ConnectionState.DISCONNECTED;
}

/**
Expand Down Expand Up @@ -213,6 +236,9 @@ public long pollDelayMs(String id, long now) {
public void checkingApiVersions(String id) {
    final NodeConnectionState connection = nodeState(id);
    // The node leaves the CONNECTING state here, so it is no longer tracked for
    // connection setup timeouts.
    connectingNodes.remove(id);
    connection.state = ConnectionState.CHECKING_API_VERSIONS;
    // Both the reconnect backoff and the setup timeout start over from their initial values.
    resetReconnectBackoff(connection);
    resetConnectionSetupTimeout(connection);
}

/**
Expand All @@ -224,6 +250,8 @@ public void ready(String id) {
nodeState.state = ConnectionState.READY;
nodeState.authenticationException = null;
resetReconnectBackoff(nodeState);
resetConnectionSetupTimeout(nodeState);
connectingNodes.remove(id);
}

/**
Expand Down Expand Up @@ -304,26 +332,43 @@ public AuthenticationException authenticationException(String id) {
*/
private void resetReconnectBackoff(NodeConnectionState nodeState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we keep this method to perform reset (perhaps rename the method) and include all types of reset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Including all types of reset together is probably not a good choice because the reset of failed attempts and the reset of the connection timeout may happen in different places.

However, I agree we should have some abstraction on the update and reset logic. I'll put the logic in new class methods.

nodeState.failedAttempts = 0;
nodeState.reconnectBackoffMs = this.reconnectBackoffInitMs;
nodeState.reconnectBackoffMs = reconnectBackoff.backoff(0);
}

/**
 * Clear the failed connection attempt counter of a node and restore its connection setup
 * timeout to the value computed for zero failed attempts (the base of which is configured
 * via socket.connection.setup.timeout.ms).
 *
 * @param nodeState The node state object to update
 */
private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
    final long initialTimeoutMs = connectionSetupTimeout.backoff(0);
    nodeState.connectionSetupTimeoutMs = initialTimeoutMs;
    nodeState.failedConnectAttempts = 0;
}

/**
* Update the node reconnect backoff exponentially.
* Increment the failure counter, update the node reconnect backoff exponentially,
* and record the current timestamp.
* The delay is reconnect.backoff.ms * 2**(failures - 1) * (+/- 20% random jitter)
* Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
*
* @param nodeState The node state object to update
*/
private void updateReconnectBackoff(NodeConnectionState nodeState) {
if (this.reconnectBackoffMaxMs > this.reconnectBackoffInitMs) {
nodeState.failedAttempts += 1;
double backoffExp = Math.min(nodeState.failedAttempts - 1, this.reconnectBackoffMaxExp);
double backoffFactor = Math.pow(RECONNECT_BACKOFF_EXP_BASE, backoffExp);
long reconnectBackoffMs = (long) (this.reconnectBackoffInitMs * backoffFactor);
// Actual backoff is randomized to avoid connection storms.
double randomFactor = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
nodeState.reconnectBackoffMs = (long) (randomFactor * reconnectBackoffMs);
}
nodeState.reconnectBackoffMs = reconnectBackoff.backoff(nodeState.failedAttempts);
nodeState.failedAttempts++;
ctan888 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Increment the failed connection attempt counter and update the node connection setup
 * timeout exponentially.
 * The timeout is socket.connection.setup.timeout.ms * 2**(failures) * (+/- 20% random jitter)
 * Up to a (pre-jitter) maximum of socket.connection.setup.timeout.max.ms
 *
 * @param nodeState The node state object to update
 */
private void updateConnectionSetupTimeout(NodeConnectionState nodeState) {
    // Unlike the reconnect backoff, the counter is incremented first, so the first failed
    // attempt already uses an attempt count of 1.
    nodeState.failedConnectAttempts++;
    nodeState.connectionSetupTimeoutMs = connectionSetupTimeout.backoff(nodeState.failedConnectAttempts);
}

/**
Expand Down Expand Up @@ -357,6 +402,44 @@ private NodeConnectionState nodeState(String id) {
return state;
}

/**
 * Get the ids of the nodes that are currently in the CONNECTING state.
 *
 * @return the set of node ids tracked by this instance; this is the internal set itself,
 *         not a copy
 */
public Set<String> connectingNodes() {
    return connectingNodes;
}

/**
 * Look up when the most recent connection attempt to the given node was made.
 *
 * @param id the connection to fetch the state for
 * @return the timestamp of the latest connection attempt, or 0 if no state is recorded for the node
 */
public long lastConnectAttemptMs(String id) {
    final NodeConnectionState state = this.nodeState.get(id);
    if (state == null) {
        // No state recorded for this node.
        return 0;
    }
    return state.lastConnectAttemptMs;
}

/**
* Get the current socket connection setup timeout of the given node.
* The base value is defined via socket.connection.setup.timeout.ms.
* @param id the connection to fetch the state for
*/
public long connectionSetupTimeoutMs(String id) {
ctan888 marked this conversation as resolved.
Show resolved Hide resolved
NodeConnectionState nodeState = this.nodeState(id);
return nodeState.connectionSetupTimeoutMs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we ensure that nodeState is not null here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The caller will ensure that the node is in the connecting state. I'll add an IllegalStateException here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather prefer to handle this like we did in lastConnectAttemptMs in order to remain consistent. If nodeState is null, we can return 0.

Copy link
Contributor Author

@ctan888 ctan888 Jun 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When NetworkClient initializes a connection to a given node (NetworkClient::initiateConnect), it's guaranteed that the nodeState will get initialized and won't be null. I think it's probably not reasonable if the caller wants to get the connection timeout of a given node before the connection initialization, which is the reason I prevent this kind of calling by throwing the exception.

However, it might be reasonable for a caller to get the lastConnectAttemptMs before initializing the connection. For example, the node provider wants to provide a node with the least recent connection attempt. For nodes that haven't been connected yet, the NodeConnectionState does not exist. However, this implies that such a node has the highest priority, and we may assume its lastConnectAttemptMs is 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification. In this case, let's reuse the nodeState method, which checks for null and throws an IllegalStateException, as you do here. We may be able to use it in isConnectionSetupTimeout as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Now isConnectionSetupTimeout is also using this checker.

}

/**
* Test if the connection to the given node has reached its timeout
* @param id the connection to fetch the state for
* @param now the current time in ms
*/
public boolean isConnectionSetupTimeout(String id, long now) {
NodeConnectionState nodeState = this.nodeState(id);
if (nodeState.state != ConnectionState.CONNECTING)
throw new IllegalStateException("Node " + id + " is not in connecting state");
return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we also check that the node is in connecting state?

Copy link
Contributor Author

@ctan888 ctan888 Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to check if the node is in connecting state because the caller is only applying this test to all the nodes in the connecting state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is indeed true today as the caller only calls with nodes in connectingNodes but that may not be true forever. I would add the check as suggested by Rajini here to make the implementation safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Checker added.

ctan888 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* The state of our connection to a node.
*/
Expand All @@ -366,7 +449,9 @@ private static class NodeConnectionState {
AuthenticationException authenticationException;
long lastConnectAttemptMs;
long failedAttempts;
long failedConnectAttempts;
long reconnectBackoffMs;
long connectionSetupTimeoutMs;
// Connection is being throttled if current time < throttleUntilTimeMs.
long throttleUntilTimeMs;
private List<InetAddress> addresses;
Expand All @@ -375,14 +460,15 @@ private static class NodeConnectionState {
private final ClientDnsLookup clientDnsLookup;

private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
String host, ClientDnsLookup clientDnsLookup) {
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup) {
this.state = state;
this.addresses = Collections.emptyList();
this.addressIndex = -1;
this.authenticationException = null;
this.lastConnectAttemptMs = lastConnectAttempt;
this.failedAttempts = 0;
this.reconnectBackoffMs = reconnectBackoffMs;
this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
this.throttleUntilTimeMs = 0;
this.host = host;
this.clientDnsLookup = clientDnsLookup;
Expand Down
Expand Up @@ -103,6 +103,14 @@ public class CommonClientConfigs {
Utils.join(SecurityProtocol.names(), ", ") + ".";
public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";

// Config key and documentation string for the base socket connection setup timeout.
public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
// Default base timeout: 10 seconds, expressed in milliseconds.
public static final Long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS = 10 * 1000L;

// Config key and documentation string for the upper bound of the socket connection setup timeout.
public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";
// Default maximum timeout: 127 seconds, expressed in milliseconds.
public static final Long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS = 127 * 1000L;

public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";

Expand Down
52 changes: 49 additions & 3 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -140,6 +141,8 @@ public NetworkClient(Selectable selector,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
Expand All @@ -155,6 +158,8 @@ public NetworkClient(Selectable selector,
socketSendBuffer,
socketReceiveBuffer,
defaultRequestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
clientDnsLookup,
time,
discoverBrokerVersions,
Expand All @@ -172,6 +177,8 @@ public NetworkClient(Selectable selector,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
Expand All @@ -188,6 +195,8 @@ public NetworkClient(Selectable selector,
socketSendBuffer,
socketReceiveBuffer,
defaultRequestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
clientDnsLookup,
time,
discoverBrokerVersions,
Expand All @@ -205,6 +214,8 @@ public NetworkClient(Selectable selector,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
Expand All @@ -220,6 +231,8 @@ public NetworkClient(Selectable selector,
socketSendBuffer,
socketReceiveBuffer,
defaultRequestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
clientDnsLookup,
time,
discoverBrokerVersions,
Expand All @@ -238,6 +251,8 @@ private NetworkClient(MetadataUpdater metadataUpdater,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
Expand All @@ -258,7 +273,9 @@ private NetworkClient(MetadataUpdater metadataUpdater,
this.selector = selector;
this.clientId = clientId;
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, logContext);
this.connectionStates = new ClusterConnectionStates(
reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
Expand Down Expand Up @@ -554,6 +571,7 @@ public List<ClientResponse> poll(long timeout, long now) {
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);

Expand Down Expand Up @@ -643,7 +661,8 @@ public void close() {
/**
* Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
* prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
* connection if all existing connections are in use. This method will never choose a node for which there is no
* connection if all existing connections are in use. If no connection exists, this method will prefer a node
* with least recent connection attempts. This method will never choose a node for which there is no
* existing connection and from which we have disconnected within the reconnect backoff period, or an active
* connection which is being throttled.
*
Expand Down Expand Up @@ -678,7 +697,11 @@ public Node leastLoadedNode(long now) {
} else if (connectionStates.isPreparingConnection(node.idString())) {
foundConnecting = node;
} else if (canConnect(node, now)) {
foundCanConnect = node;
if (foundCanConnect == null ||
this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
this.connectionStates.lastConnectAttemptMs(node.idString())) {
foundCanConnect = node;
}
ctan888 marked this conversation as resolved.
Show resolved Hide resolved
} else {
log.trace("Removing node {} from least loaded node selection since it is neither ready " +
"for sending or connecting", node);
Expand Down Expand Up @@ -786,6 +809,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
abortedSends.clear();
}

/**
* Handle socket channel connection timeout. The timeout will hit iff a connection
* stays at the ConnectionState.CONNECTING state longer than the timeout value,
* as indicated by ClusterConnectionStates.NodeConnectionState.
*
* @param responses The list of responses to update
* @param now The current time
*/
private void handleTimedOutConnections(List<ClientResponse> responses, long now) {
Set<String> connectingNodes = connectionStates.connectingNodes();
for (String nodeId : connectingNodes) {
if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
this.selector.close(nodeId);
log.debug(
"Disconnecting from node {} due to socket connection setup timeout. " +
"The timeout value is {} ms.",
nodeId,
connectionStates.connectionSetupTimeoutMs(nodeId));
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Thanks for reporting the exception in this code.
@d8tltanc @dajac This code segment is unsafe, we are removing node from connectingNodes in pprocessDisconnection() while iterating over the set. We must be missing a test too (or we have a test with only one connection).

}
}
}

/**
* Handle any completed request send. In particular if no response is expected consider the request complete.
*
Expand Down