Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12193: Re-resolve IPs after a client disconnects #9902

Merged
merged 4 commits into from Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -106,8 +106,9 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time ti
clientSaslMechanism, time, true, logContext);
}

static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
InetAddress[] addresses = InetAddress.getAllByName(host);
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup,
HostResolver hostResolver) throws UnknownHostException {
Comment on lines +109 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method and its usages look a bit weird to me now. It resolves an host based on clientDnsLookup and a hostResolver which resolves an host at its turn. We also have to pass the HostResolver wherever it is called now. I wonder if we should avoid this level of indirection here and directly encapsulate the entire logic in the HostResolver.

HostResolver would look like this:

public interface HostResolver {
    List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException;
}

In NodeConnectionState#currentAddress, we could replace addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver) by addresses = hostResolver.resolve(host, clientDnsLookup).

The DefaultHostResolver would do what ClientUtils.resolve does today and we can mock it in tests like you did.

Is this something that you have considered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac I started making this change, but while working on it, I realized that I don't think I like requiring the HostResolver to understand the client.dns.lookup config. In the same way that MockTime is only used to mock Java built-in methods, I think the HostResolver should only be responsible for the DNS resolution (either real or mocked), and that ClientUtils should be responsible for understanding the Kafka application behavior. That way, tests that mock DNS still test the actual ClientDnsLookup behavior, rather than relying on both the default resolver and any mocked resolvers handle it correctly. It also means that if we ever add an option for ClientDnsLookup or change its behavior, we don't need to remember to update all HostResolver implementations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bob-barrett. I do agree with your point. Let's keep it as you suggested.

InetAddress[] addresses = hostResolver.resolve(host);

switch (clientDnsLookup) {
case DEFAULT:
Expand Down
Expand Up @@ -43,13 +43,14 @@ final class ClusterConnectionStates {
final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;
private final Map<String, NodeConnectionState> nodeState;
private final Logger log;
private final HostResolver hostResolver;
private Set<String> connectingNodes;
private ExponentialBackoff reconnectBackoff;
private ExponentialBackoff connectionSetupTimeout;

public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
LogContext logContext) {
LogContext logContext, HostResolver hostResolver) {
this.log = logContext.logger(ClusterConnectionStates.class);
this.reconnectBackoff = new ExponentialBackoff(
reconnectBackoffMs,
Expand All @@ -63,6 +64,7 @@ public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMax
CONNECTION_SETUP_TIMEOUT_JITTER);
this.nodeState = new HashMap<>();
this.connectingNodes = new HashSet<>();
this.hostResolver = hostResolver;
}

/**
Expand Down Expand Up @@ -156,7 +158,8 @@ public void connecting(String id, long now, String host, ClientDnsLookup clientD
// Create a new NodeConnectionState if nodeState does not already contain one
// for the specified id or if the hostname associated with the node id changed.
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, clientDnsLookup));
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host,
clientDnsLookup, hostResolver));
connectingNodes.add(id);
}

Expand All @@ -183,6 +186,11 @@ public void disconnected(String id, long now) {
connectingNodes.remove(id);
} else {
resetConnectionSetupTimeout(nodeState);
if (nodeState.state.isConnected()) {
// If a connection had previously been established, clear the addresses to trigger a new DNS resolution
// because the node IPs may have changed
nodeState.clearAddresses();
}
}
nodeState.state = ConnectionState.DISCONNECTED;
}
Expand Down Expand Up @@ -470,9 +478,11 @@ private static class NodeConnectionState {
private int addressIndex;
private final String host;
private final ClientDnsLookup clientDnsLookup;
private final HostResolver hostResolver;

private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup) {
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup,
HostResolver hostResolver) {
this.state = state;
this.addresses = Collections.emptyList();
this.addressIndex = -1;
Expand All @@ -484,6 +494,7 @@ private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long
this.throttleUntilTimeMs = 0;
this.host = host;
this.clientDnsLookup = clientDnsLookup;
this.hostResolver = hostResolver;
}

public String host() {
Expand All @@ -498,7 +509,7 @@ public String host() {
private InetAddress currentAddress() throws UnknownHostException {
if (addresses.isEmpty()) {
// (Re-)initialize list
addresses = ClientUtils.resolve(host, clientDnsLookup);
addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver);
addressIndex = 0;
}

Expand All @@ -518,6 +529,13 @@ private void moveToNextAddress() {
addresses = Collections.emptyList(); // Exhausted list. Re-resolve on next currentAddress() call
}

/**
* Clears the resolved addresses in order to trigger re-resolving on the next {@link #currentAddress()} call.
*/
private void clearAddresses() {
addresses = Collections.emptyList();
}

public String toString() {
return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
}
Expand Down
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class DefaultHostResolver implements HostResolver {

@Override
public InetAddress[] resolve(String host) throws UnknownHostException {
return InetAddress.getAllByName(host);
}
}
26 changes: 26 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/HostResolver.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;

public interface HostResolver {

InetAddress[] resolve(String host) throws UnknownHostException;
}
80 changes: 41 additions & 39 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Expand Up @@ -146,9 +146,8 @@ public NetworkClient(Selectable selector,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
LogContext logContext) {
this(null,
this(selector,
metadata,
selector,
clientId,
maxInFlightRequestsPerConnection,
reconnectBackoffMs,
Expand All @@ -167,22 +166,22 @@ public NetworkClient(Selectable selector,
}

public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
Metadata metadata,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
this(null,
metadata,
selector,
Expand All @@ -200,7 +199,8 @@ public NetworkClient(Selectable selector,
discoverBrokerVersions,
apiVersions,
throttleTimeSensor,
logContext);
logContext,
new DefaultHostResolver());
}

public NetworkClient(Selectable selector,
Expand Down Expand Up @@ -236,27 +236,29 @@ public NetworkClient(Selectable selector,
discoverBrokerVersions,
apiVersions,
null,
logContext);
logContext,
new DefaultHostResolver());
}

private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
public NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
Expand All @@ -273,7 +275,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.connectionStates = new ClusterConnectionStates(
reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext);
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
Expand Down
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;

import java.net.InetAddress;

class AddressChangeHostResolver implements HostResolver {
private boolean useNewAddresses;
private InetAddress[] initialAddresses;
private InetAddress[] newAddresses;
private int resolutionCount = 0;

public AddressChangeHostResolver(InetAddress[] initialAddresses, InetAddress[] newAddresses) {
this.initialAddresses = initialAddresses;
this.newAddresses = newAddresses;
}

@Override
public InetAddress[] resolve(String host) {
++resolutionCount;
return useNewAddresses ? newAddresses : initialAddresses;
}

public void changeAddresses() {
useNewAddresses = true;
}

public boolean useNewAddresses() {
return useNewAddresses;
}

public int resolutionCount() {
return resolutionCount;
}
}
Expand Up @@ -33,6 +33,7 @@

public class ClientUtilsTest {

private HostResolver hostResolver = new DefaultHostResolver();

@Test
public void testParseAndValidateAddresses() throws UnknownHostException {
Expand Down Expand Up @@ -102,25 +103,25 @@ public void testFilterPreferredAddresses() throws UnknownHostException {
@Test
public void testResolveUnknownHostException() {
assertThrows(UnknownHostException.class,
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS));
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver));
}

@Test
public void testResolveDnsLookup() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT).size());
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT, hostResolver).size());
}

@Test
public void testResolveDnsLookupAllIps() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);
}

@Test
public void testResolveDnsLookupResolveCanonicalBootstrapServers() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY).size() > 1);
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, hostResolver).size() > 1);
}

private List<InetSocketAddress> checkWithoutLookup(String... url) {
Expand Down