Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12193: Re-resolve IPs after a client disconnects #9902

Merged
merged 4 commits into from Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -106,8 +106,9 @@ public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time ti
clientSaslMechanism, time, true, logContext);
}

static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
InetAddress[] addresses = InetAddress.getAllByName(host);
static List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup,
HostResolver hostResolver) throws UnknownHostException {
Comment on lines +109 to +110
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method and its usages look a bit weird to me now. It resolves an host based on clientDnsLookup and a hostResolver which resolves an host at its turn. We also have to pass the HostResolver wherever it is called now. I wonder if we should avoid this level of indirection here and directly encapsulate the entire logic in the HostResolver.

HostResolver would look like this:

public interface HostResolver {
    List<InetAddress> resolve(String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException;
}

In NodeConnectionState#currentAddress, we could replace addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver) by addresses = hostResolver.resolve(host, clientDnsLookup).

The DefaultHostResolver would do what ClientUtils.resolve does today and we can mock it in tests like you did.

Is this something that you have considered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac I started making this change, but while working on it, I realized that I don't think I like requiring the HostResolver to understand the client.dns.lookup config. In the same way that MockTime is only used to mock Java built-in methods, I think the HostResolver should only be responsible for the DNS resolution (either real or mocked), and that ClientUtils should be responsible for understanding the Kafka application behavior. That way, tests that mock DNS still test the actual ClientDnsLookup behavior, rather than relying on both the default resolver and any mocked resolvers handle it correctly. It also means that if we ever add an option for ClientDnsLookup or change its behavior, we don't need to remember to update all HostResolver implementations.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @bob-barrett. I do agree with your point. Let's keep it as you suggested.

InetAddress[] addresses = hostResolver.resolve(host);

switch (clientDnsLookup) {
case DEFAULT:
Expand Down
Expand Up @@ -43,13 +43,14 @@ final class ClusterConnectionStates {
final static double CONNECTION_SETUP_TIMEOUT_JITTER = 0.2;
private final Map<String, NodeConnectionState> nodeState;
private final Logger log;
private final HostResolver hostResolver;
private Set<String> connectingNodes;
private ExponentialBackoff reconnectBackoff;
private ExponentialBackoff connectionSetupTimeout;

public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
LogContext logContext) {
LogContext logContext, HostResolver hostResolver) {
this.log = logContext.logger(ClusterConnectionStates.class);
this.reconnectBackoff = new ExponentialBackoff(
reconnectBackoffMs,
Expand All @@ -63,6 +64,7 @@ public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMax
CONNECTION_SETUP_TIMEOUT_JITTER);
this.nodeState = new HashMap<>();
this.connectingNodes = new HashSet<>();
this.hostResolver = hostResolver;
}

/**
Expand Down Expand Up @@ -156,7 +158,8 @@ public void connecting(String id, long now, String host, ClientDnsLookup clientD
// Create a new NodeConnectionState if nodeState does not already contain one
// for the specified id or if the hostname associated with the node id changed.
nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host, clientDnsLookup));
reconnectBackoff.backoff(0), connectionSetupTimeout.backoff(0), host,
clientDnsLookup, hostResolver));
connectingNodes.add(id);
}

Expand All @@ -183,6 +186,10 @@ public void disconnected(String id, long now) {
connectingNodes.remove(id);
} else {
resetConnectionSetupTimeout(nodeState);
if (nodeState.state.isConnected()) {
// If a connection had previously been established, re-resolve DNS because the IPs may have changed
nodeState.addresses = Collections.emptyList();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly weird that we're updating a "private" field here.

Also the comment is a bit misleading. We're not re-resolving DNS here but instead clearing state so if we reconnect later, the client will be forced to re-resolve then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mimaison Java allows outer classes to access inner member's private members. This is described in detail at JLS-6.6.1

A member (class, interface, field, or method) of a reference (class, interface, or array) type or a constructor of a class type is accessible only if the type is accessible and the member or constructor is declared to permit access:

  • if the member or constructor is declared private, then access is permitted if and only if it occurs within the body of the top level class (§7.6) that encloses the declaration of the member or constructor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @mimaison on the comment, good to make it clear by mentioning that the addresses are cleared to re-resolve later when it reconnects.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accessing the field is allowed, but I think @mimaison meant that it makes the code a little confusing and inconsistent. NodeConnectionState has some private fields and some package-private ones, and addresses previous was only modified by methods on NodeConnectionState, not accessed directly by the top-level class. I've moved the empty list assignment into a NodeConnectionState.clearAddresses method

Agree about the comment, I've updated it

}
}
nodeState.state = ConnectionState.DISCONNECTED;
}
Expand Down Expand Up @@ -470,9 +477,11 @@ private static class NodeConnectionState {
private int addressIndex;
private final String host;
private final ClientDnsLookup clientDnsLookup;
private final HostResolver hostResolver;

private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup) {
long connectionSetupTimeoutMs, String host, ClientDnsLookup clientDnsLookup,
HostResolver hostResolver) {
this.state = state;
this.addresses = Collections.emptyList();
this.addressIndex = -1;
Expand All @@ -484,6 +493,7 @@ private NodeConnectionState(ConnectionState state, long lastConnectAttempt, long
this.throttleUntilTimeMs = 0;
this.host = host;
this.clientDnsLookup = clientDnsLookup;
this.hostResolver = hostResolver;
}

public String host() {
Expand All @@ -498,7 +508,7 @@ public String host() {
private InetAddress currentAddress() throws UnknownHostException {
if (addresses.isEmpty()) {
// (Re-)initialize list
addresses = ClientUtils.resolve(host, clientDnsLookup);
addresses = ClientUtils.resolve(host, clientDnsLookup, hostResolver);
addressIndex = 0;
}

Expand Down
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class DefaultHostResolver implements HostResolver {

@Override
public InetAddress[] resolve(String host) throws UnknownHostException {
return InetAddress.getAllByName(host);
}
}
bob-barrett marked this conversation as resolved.
Show resolved Hide resolved
26 changes: 26 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/HostResolver.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients;

import java.net.InetAddress;
import java.net.UnknownHostException;

public interface HostResolver {

InetAddress[] resolve(String host) throws UnknownHostException;
}
bob-barrett marked this conversation as resolved.
Show resolved Hide resolved
80 changes: 61 additions & 19 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Expand Up @@ -130,6 +130,8 @@ private enum State {

private final AtomicReference<State> state;

private final HostResolver hostResolver;

public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
Expand Down Expand Up @@ -239,24 +241,63 @@ public NetworkClient(Selectable selector,
logContext);
}

private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
public NetworkClient(MetadataUpdater metadataUpdater,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this constructor? As far as I can tell, it's only called by the other 3 above. These could directly call the real one below instead of going through this new one. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, fixed. These are also weirdly inconsistent (selector is before metadata and metadataUpdater sometimes, after them other times), which I guess we could clean up, but I didn't want to remove any existing public constructors.

Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
this(metadataUpdater,
metadata,
selector,
clientId, maxInFlightRequestsPerConnection,
reconnectBackoffMs,
reconnectBackoffMax,
socketSendBuffer,
socketReceiveBuffer,
defaultRequestTimeoutMs,
connectionSetupTimeoutMs,
connectionSetupTimeoutMaxMs,
clientDnsLookup,
time,
discoverBrokerVersions,
apiVersions,
throttleTimeSensor,
logContext,
new DefaultHostResolver());
}

public NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
long reconnectBackoffMax,
int socketSendBuffer,
int socketReceiveBuffer,
int defaultRequestTimeoutMs,
long connectionSetupTimeoutMs,
long connectionSetupTimeoutMaxMs,
ClientDnsLookup clientDnsLookup,
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
Expand All @@ -273,7 +314,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
this.connectionStates = new ClusterConnectionStates(
reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext);
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, logContext, hostResolver);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
Expand All @@ -287,6 +328,7 @@ private NetworkClient(MetadataUpdater metadataUpdater,
this.log = logContext.logger(NetworkClient.class);
this.clientDnsLookup = clientDnsLookup;
this.state = new AtomicReference<>(State.ACTIVE);
this.hostResolver = hostResolver;
}

/**
Expand Down
Expand Up @@ -33,6 +33,7 @@

public class ClientUtilsTest {

private HostResolver hostResolver = new DefaultHostResolver();

@Test
public void testParseAndValidateAddresses() throws UnknownHostException {
Expand Down Expand Up @@ -102,25 +103,25 @@ public void testFilterPreferredAddresses() throws UnknownHostException {
@Test
public void testResolveUnknownHostException() {
assertThrows(UnknownHostException.class,
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS));
() -> ClientUtils.resolve("some.invalid.hostname.foo.bar.local", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver));
}

@Test
public void testResolveDnsLookup() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT).size());
assertEquals(1, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.DEFAULT, hostResolver).size());
}

@Test
public void testResolveDnsLookupAllIps() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);
}

@Test
public void testResolveDnsLookupResolveCanonicalBootstrapServers() throws UnknownHostException {
// Note that kafka.apache.org resolves to at least 2 IP addresses
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY).size() > 1);
assertTrue(ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, hostResolver).size() > 1);
}

private List<InetSocketAddress> checkWithoutLookup(String... url) {
Expand Down
Expand Up @@ -53,12 +53,15 @@ public class ClusterConnectionStatesTest {
private final String hostTwoIps = "kafka.apache.org";

private ClusterConnectionStates connectionStates;
private HostResolver hostResolver;

@BeforeEach
public void setup() {
this.connectionStates = new ClusterConnectionStates(
reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext());
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(),
new DefaultHostResolver());
this.hostResolver = new DefaultHostResolver();
}

@Test
Expand Down Expand Up @@ -253,7 +256,7 @@ public void testSingleIPWithDefault() throws UnknownHostException {

@Test
public void testSingleIPWithUseAll() throws UnknownHostException {
assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS).size());
assertEquals(1, ClientUtils.resolve("localhost", ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size());

connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.USE_ALL_DNS_IPS);
InetAddress currAddress = connectionStates.currentAddress(nodeId1);
Expand All @@ -263,7 +266,7 @@ public void testSingleIPWithUseAll() throws UnknownHostException {

@Test
public void testMultipleIPsWithDefault() throws UnknownHostException {
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);

connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
InetAddress currAddress = connectionStates.currentAddress(nodeId1);
Expand All @@ -273,7 +276,7 @@ public void testMultipleIPsWithDefault() throws UnknownHostException {

@Test
public void testMultipleIPsWithUseAll() throws UnknownHostException {
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);

connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS);
InetAddress addr1 = connectionStates.currentAddress(nodeId1);
Expand All @@ -287,7 +290,7 @@ public void testMultipleIPsWithUseAll() throws UnknownHostException {

@Test
public void testHostResolveChange() throws UnknownHostException, ReflectiveOperationException {
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size() > 1);
assertTrue(ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS, hostResolver).size() > 1);

connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
InetAddress addr1 = connectionStates.currentAddress(nodeId1);
Expand Down