Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log when probe succeeds but full connection fails #51304

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.HandshakingTransportAddressConnector;
import org.elasticsearch.discovery.PeerFinder;
import org.elasticsearch.discovery.SeedHostsResolver;
import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider;
Expand Down Expand Up @@ -466,7 +467,9 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING,
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING,
ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING,
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING);
LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING,
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING);

static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionL

@Override
protected void doRun() {
// TODO if transportService is already connected to this address then skip the handshaking
// We could skip this if the transportService were already connected to the given address, but the savings would be minimal
// so we open a new connection anyway.

final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(),
UUIDs.randomBase64UUID(Randomness.get()), // generated deterministically for reproducible tests
Expand All @@ -99,17 +100,30 @@ protected void innerOnResponse(DiscoveryNode remoteNode) {
IOUtils.closeWhileHandlingException(connection);

if (remoteNode.equals(transportService.getLocalNode())) {
// TODO cache this result for some time? forever?
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
} else if (remoteNode.isMasterNode() == false) {
// TODO cache this result for some time?
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
} else {
transportService.connectToNode(remoteNode, ActionListener.delegateFailure(listener,
(l, ignored) -> {
logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode);
transportService.connectToNode(remoteNode, new ActionListener<>() {
@Override
public void onResponse(Void ignored) {
logger.trace("[{}] completed full connection with [{}]", thisConnectionAttempt, remoteNode);
listener.onResponse(remoteNode);
}));
}

@Override
public void onFailure(Exception e) {
// we opened a connection and successfully performed a handshake, so we're definitely
// talking to a master-eligible node with a matching cluster name and a good version, but
// the attempt to open a full connection to its publish address failed; a common reason is
// that the remote node is listening on 0.0.0.0 but has made an inappropriate choice for its
// publish address.
logger.warn(new ParameterizedMessage(
"[{}] completed handshake with [{}] but followup connection failed",
thisConnectionAttempt, remoteNode), e);
listener.onFailure(e);
}
});
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,27 @@

package org.elasticsearch.discovery;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportService.HandshakeResponse;
Expand All @@ -44,17 +55,22 @@
import static org.elasticsearch.discovery.HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.oneOf;

public class HandshakingTransportAddressConnectorTests extends ESTestCase {

private DiscoveryNode remoteNode;
private TransportAddress discoveryAddress;
private TransportService transportService;
private ThreadPool threadPool;
private String remoteClusterName;
private HandshakingTransportAddressConnector handshakingTransportAddressConnector;
private DiscoveryNode localNode;

private boolean dropHandshake;
@Nullable // unless we want the full connection to fail
private TransportException fullConnectionFailure;

@Before
public void startServices() {
Expand All @@ -66,17 +82,24 @@ public void startServices() {
threadPool = new TestThreadPool("node", settings);

remoteNode = null;
discoveryAddress = null;
remoteClusterName = null;
dropHandshake = false;
fullConnectionFailure = null;

final MockTransport mockTransport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
super.onSendRequest(requestId, action, request, node);
assertThat(action, equalTo(TransportService.HANDSHAKE_ACTION_NAME));
assertEquals(remoteNode.getAddress(), node.getAddress());
assertThat(discoveryAddress, notNullValue());
assertThat(node.getAddress(), oneOf(discoveryAddress, remoteNode.getAddress()));
if (dropHandshake == false) {
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) {
handleError(requestId, fullConnectionFailure);
} else {
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
}
}
}
};
Expand All @@ -91,7 +114,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
}

@After
public void stopServices() throws InterruptedException {
public void stopServices() {
transportService.stop();
terminate(threadPool);
}
Expand All @@ -102,8 +125,9 @@ public void testConnectsToMasterNode() throws InterruptedException {

remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
remoteClusterName = "local-cluster";
discoveryAddress = getDiscoveryAddress();

handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), new ActionListener<DiscoveryNode>() {
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, new ActionListener<DiscoveryNode>() {
@Override
public void onResponse(DiscoveryNode discoveryNode) {
receivedNode.set(discoveryNode);
Expand All @@ -120,44 +144,84 @@ public void onFailure(Exception e) {
assertEquals(remoteNode, receivedNode.get());
}

@TestLogging(reason="ensure logging happens", value="org.elasticsearch.discovery.HandshakingTransportAddressConnector:INFO")
public void testLogsFullConnectionFailureAfterSuccessfulHandshake() throws Exception {

remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
remoteClusterName = "local-cluster";
discoveryAddress = buildNewFakeTransportAddress();

fullConnectionFailure = new ConnectTransportException(remoteNode, "simulated", new ElasticsearchException("root cause"));

FailureListener failureListener = new FailureListener();

MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"message",
HandshakingTransportAddressConnector.class.getCanonicalName(),
Level.WARN,
"*completed handshake with [*] but followup connection failed*"));
Logger targetLogger = LogManager.getLogger(HandshakingTransportAddressConnector.class);
Loggers.addAppender(targetLogger, mockAppender);

try {
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
failureListener.assertFailure();
mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(targetLogger, mockAppender);
mockAppender.stop();
}
}

public void testDoesNotConnectToNonMasterNode() throws InterruptedException {
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
discoveryAddress = getDiscoveryAddress();
remoteClusterName = "local-cluster";

FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
failureListener.assertFailure();
}

public void testDoesNotConnectToLocalNode() throws Exception {
remoteNode = localNode;
discoveryAddress = getDiscoveryAddress();
remoteClusterName = "local-cluster";

FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
failureListener.assertFailure();
}

public void testDoesNotConnectToDifferentCluster() throws InterruptedException {
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
discoveryAddress = getDiscoveryAddress();
remoteClusterName = "another-cluster";

FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
failureListener.assertFailure();
}

public void testHandshakeTimesOut() throws InterruptedException {
remoteNode = new DiscoveryNode("remote-node", buildNewFakeTransportAddress(), Version.CURRENT);
discoveryAddress = getDiscoveryAddress();
remoteClusterName = "local-cluster";
dropHandshake = true;

FailureListener failureListener = new FailureListener();
handshakingTransportAddressConnector.connectToRemoteMasterNode(remoteNode.getAddress(), failureListener);
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
Thread.sleep(PROBE_HANDSHAKE_TIMEOUT_SETTING.get(Settings.EMPTY).millis());
failureListener.assertFailure();
}

private TransportAddress getDiscoveryAddress() {
return randomBoolean() ? remoteNode.getAddress() : buildNewFakeTransportAddress();
}

private class FailureListener implements ActionListener<DiscoveryNode> {
final CountDownLatch completionLatch = new CountDownLatch(1);

Expand Down