From ba9ecca1a1a0d044499d3490bea819633f375cfd Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Thu, 6 Apr 2023 16:12:39 -0500
Subject: [PATCH 1/2] Fixing BulkProcessor2RetryIT

---
 .../action/bulk/BulkProcessor2RetryIT.java    |  3 +-
 .../TransportClientNodesService.java          | 96 +++++++++++--------
 2 files changed, 57 insertions(+), 42 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java
index 205457b264bb3..b22fe7706e2e5 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java
@@ -58,7 +58,6 @@ public void testBulkRejectionLoadWithoutBackoff() throws Throwable {
     //     value = "org.elasticsearch.action.bulk.Retry2:trace",
     //     reason = "Logging information about locks useful for tracking down deadlock"
     // )
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/94941")
     public void testBulkRejectionLoadWithBackoff() throws Throwable {
         boolean rejectedExecutionExpected = false;
         executeBulkRejectionLoad(8, rejectedExecutionExpected);
@@ -128,6 +127,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
                         rejectedAfterAllRetries = true;
                     }
                     // ignored, we exceeded the write queue size when dispatching the initial bulk request
+                } else if (ExceptionsHelper.status(failureTuple.v2()) == RestStatus.SERVICE_UNAVAILABLE) {
+                    // The test framework throws this at us sometimes
                 } else {
                     Throwable t = failureTuple.v2();
                     // we're not expecting any other errors
diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
index 49c10f3f606cb..ac3a4ebab2fb7 100644
--- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
+++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
@@ -410,50 +410,64 @@ protected void doSample() {
             HashSet newNodes = new HashSet<>();
             ArrayList newFilteredNodes = new ArrayList<>();
             for (DiscoveryNode listedNode : listedNodes) {
-                try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)) {
-                    final PlainTransportFuture handler = new PlainTransportFuture<>(
-                        new FutureTransportResponseHandler() {
-                            @Override
-                            public LivenessResponse read(StreamInput in) throws IOException {
-                                return new LivenessResponse(in);
+                transportService.openConnection(listedNode, LISTED_NODES_PROFILE, new ActionListener() {
+                    @Override
+                    public void onResponse(Transport.Connection connection) {
+                        try {
+                            final PlainTransportFuture handler = new PlainTransportFuture<>(
+                                new FutureTransportResponseHandler() {
+                                    @Override
+                                    public LivenessResponse read(StreamInput in) throws IOException {
+                                        return new LivenessResponse(in);
+                                    }
+                                }
+                            );
+                            transportService.sendRequest(
+                                connection,
+                                TransportLivenessAction.NAME,
+                                new LivenessRequest(),
+                                TransportRequestOptions.of(pingTimeout, TransportRequestOptions.Type.STATE),
+                                handler
+                            );
+                            final LivenessResponse livenessResponse = handler.txGet();
+                            if (ignoreClusterName == false && clusterName.equals(livenessResponse.getClusterName()) == false) {
+                                logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
+                                newFilteredNodes.add(listedNode);
+                            } else {
+                                // use discovered information but do keep the original transport address,
+                                // so people can control which address is exactly used.
+                                DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
+                                newNodes.add(
+                                    new DiscoveryNode(
+                                        nodeWithInfo.getName(),
+                                        nodeWithInfo.getId(),
+                                        nodeWithInfo.getEphemeralId(),
+                                        nodeWithInfo.getHostName(),
+                                        nodeWithInfo.getHostAddress(),
+                                        listedNode.getAddress(),
+                                        nodeWithInfo.getAttributes(),
+                                        nodeWithInfo.getRoles(),
+                                        nodeWithInfo.getVersion()
+                                    )
+                                );
+                            }
+                        } catch (ConnectTransportException e) {
+                            logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
+                            hostFailureListener.onNodeDisconnected(listedNode, e);
+                        } catch (Exception e) {
+                            logger.info(() -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
+                        } finally {
+                            if (connection != null) {
+                                connection.close();
                             }
                         }
-                    );
-                    transportService.sendRequest(
-                        connection,
-                        TransportLivenessAction.NAME,
-                        new LivenessRequest(),
-                        TransportRequestOptions.of(pingTimeout, TransportRequestOptions.Type.STATE),
-                        handler
-                    );
-                    final LivenessResponse livenessResponse = handler.txGet();
-                    if (ignoreClusterName == false && clusterName.equals(livenessResponse.getClusterName()) == false) {
-                        logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
-                        newFilteredNodes.add(listedNode);
-                    } else {
-                        // use discovered information but do keep the original transport address,
-                        // so people can control which address is exactly used.
-                        DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
-                        newNodes.add(
-                            new DiscoveryNode(
-                                nodeWithInfo.getName(),
-                                nodeWithInfo.getId(),
-                                nodeWithInfo.getEphemeralId(),
-                                nodeWithInfo.getHostName(),
-                                nodeWithInfo.getHostAddress(),
-                                listedNode.getAddress(),
-                                nodeWithInfo.getAttributes(),
-                                nodeWithInfo.getRoles(),
-                                nodeWithInfo.getVersion()
-                            )
-                        );
                     }
-                } catch (ConnectTransportException e) {
-                    logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
-                    hostFailureListener.onNodeDisconnected(listedNode, e);
-                } catch (Exception e) {
-                    logger.info(() -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
-                }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        logger.warn("Unable to open connection", e);
+                    }
+                });
             }
             nodes = establishNodeConnections(newNodes);

From ea85b19cb4ea86e68f3df80f0f6322d5e00aba99 Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Thu, 6 Apr 2023 16:57:22 -0500
Subject: [PATCH 2/2] removing dumb change to TransportClientNodesService and fixing BulkProcessor2RetryIT

---
 .../action/bulk/BulkProcessor2RetryIT.java    | 12 ++-
 .../TransportClientNodesService.java          | 96 ++++++++-----------
 2 files changed, 52 insertions(+), 56 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java
index b22fe7706e2e5..caf2b8e73a2fe 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessor2RetryIT.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -34,6 +35,12 @@ public class BulkProcessor2RetryIT extends ESIntegTestCase {
     private static final String INDEX_NAME = "test";
     Map requestToExecutionCountMap = new ConcurrentHashMap<>();
+    /*
+     * We can't call ESIntegTestCase.client() from a transport thread because it winds up calling a blocking operation that trips an
+     * assertion error if you're doing it from the transport thread. So we stash a random client in this variable for use when we need a
+     * client in a transport thread.
+     */
+    private Client clientsForTransportThread;
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
@@ -65,6 +72,7 @@ public void testBulkRejectionLoadWithBackoff() throws Throwable {
 
     @SuppressWarnings("unchecked")
     private void executeBulkRejectionLoad(int maxRetries, boolean rejectedExecutionExpected) throws Throwable {
+        clientsForTransportThread = client();
         int numberOfAsyncOps = randomIntBetween(600, 700);
         final CountDownLatch latch = new CountDownLatch(numberOfAsyncOps);
         final Set successfulResponses = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -109,6 +117,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
                         }
                         rejectedAfterAllRetries = true;
                     }
+                } else if (failure.getStatus() == RestStatus.SERVICE_UNAVAILABLE) {
+                    // The test framework throws this at us sometimes
                 } else {
                     throw new AssertionError("Unexpected failure status: " + failure.getStatus());
                 }
@@ -165,7 +175,7 @@ void countAndBulk(BulkRequest request, ActionListener listener) {
         for (DocWriteRequest docWriteRequest : request.requests) {
             requestToExecutionCountMap.compute(docWriteRequest.id(), (key, value) -> value == null ? 1 : value + 1);
         }
-        client().bulk(request, listener);
+        clientsForTransportThread.bulk(request, listener);
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
index ac3a4ebab2fb7..49c10f3f606cb 100644
--- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
+++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java
@@ -410,64 +410,50 @@ protected void doSample() {
             HashSet newNodes = new HashSet<>();
             ArrayList newFilteredNodes = new ArrayList<>();
             for (DiscoveryNode listedNode : listedNodes) {
-                transportService.openConnection(listedNode, LISTED_NODES_PROFILE, new ActionListener() {
-                    @Override
-                    public void onResponse(Transport.Connection connection) {
-                        try {
-                            final PlainTransportFuture handler = new PlainTransportFuture<>(
-                                new FutureTransportResponseHandler() {
-                                    @Override
-                                    public LivenessResponse read(StreamInput in) throws IOException {
-                                        return new LivenessResponse(in);
-                                    }
-                                }
-                            );
-                            transportService.sendRequest(
-                                connection,
-                                TransportLivenessAction.NAME,
-                                new LivenessRequest(),
-                                TransportRequestOptions.of(pingTimeout, TransportRequestOptions.Type.STATE),
-                                handler
-                            );
-                            final LivenessResponse livenessResponse = handler.txGet();
-                            if (ignoreClusterName == false && clusterName.equals(livenessResponse.getClusterName()) == false) {
-                                logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
-                                newFilteredNodes.add(listedNode);
-                            } else {
-                                // use discovered information but do keep the original transport address,
-                                // so people can control which address is exactly used.
-                                DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
-                                newNodes.add(
-                                    new DiscoveryNode(
-                                        nodeWithInfo.getName(),
-                                        nodeWithInfo.getId(),
-                                        nodeWithInfo.getEphemeralId(),
-                                        nodeWithInfo.getHostName(),
-                                        nodeWithInfo.getHostAddress(),
-                                        listedNode.getAddress(),
-                                        nodeWithInfo.getAttributes(),
-                                        nodeWithInfo.getRoles(),
-                                        nodeWithInfo.getVersion()
-                                    )
-                                );
-                            }
-                        } catch (ConnectTransportException e) {
-                            logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
-                            hostFailureListener.onNodeDisconnected(listedNode, e);
-                        } catch (Exception e) {
-                            logger.info(() -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
-                        } finally {
-                            if (connection != null) {
-                                connection.close();
+                try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)) {
+                    final PlainTransportFuture handler = new PlainTransportFuture<>(
+                        new FutureTransportResponseHandler() {
+                            @Override
+                            public LivenessResponse read(StreamInput in) throws IOException {
+                                return new LivenessResponse(in);
                             }
                         }
+                    );
+                    transportService.sendRequest(
+                        connection,
+                        TransportLivenessAction.NAME,
+                        new LivenessRequest(),
+                        TransportRequestOptions.of(pingTimeout, TransportRequestOptions.Type.STATE),
+                        handler
+                    );
+                    final LivenessResponse livenessResponse = handler.txGet();
+                    if (ignoreClusterName == false && clusterName.equals(livenessResponse.getClusterName()) == false) {
+                        logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
+                        newFilteredNodes.add(listedNode);
+                    } else {
+                        // use discovered information but do keep the original transport address,
+                        // so people can control which address is exactly used.
+                        DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
+                        newNodes.add(
+                            new DiscoveryNode(
+                                nodeWithInfo.getName(),
+                                nodeWithInfo.getId(),
+                                nodeWithInfo.getEphemeralId(),
+                                nodeWithInfo.getHostName(),
+                                nodeWithInfo.getHostAddress(),
+                                listedNode.getAddress(),
+                                nodeWithInfo.getAttributes(),
+                                nodeWithInfo.getRoles(),
+                                nodeWithInfo.getVersion()
+                            )
+                        );
                     }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        logger.warn("Unable to open connection", e);
-                    }
-                });
+                } catch (ConnectTransportException e) {
+                    logger.debug(() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
+                    hostFailureListener.onNodeDisconnected(listedNode, e);
+                } catch (Exception e) {
+                    logger.info(() -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
+                }
             }
             nodes = establishNodeConnections(newNodes);
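Note for reviewers: the javadoc added to BulkProcessor2RetryIT in the second patch carries the core idea of the fix: ESIntegTestCase.client() must not be called from a transport thread, so the test stashes a client up front and reuses that reference from transport-thread callbacks, and it tolerates the SERVICE_UNAVAILABLE failures that the test framework occasionally produces. Below is a minimal sketch of that pattern under stated assumptions; it is not code from the repository, and the class and method names (StashedClientSketch, stashClient, runBulkFromTransportThread, onBulkFailure) are made up for illustration.

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.RestStatus;

class StashedClientSketch {

    // Captured on the test thread; calling ESIntegTestCase.client() from a
    // transport thread can block and trip an assertion, so keep a reference instead.
    private volatile Client clientForTransportThread;

    void stashClient(Client client) {
        this.clientForTransportThread = client; // do this before any async work starts
    }

    // Safe to invoke from a transport-thread callback because it only uses the
    // previously stashed client, never ESIntegTestCase.client().
    void runBulkFromTransportThread(BulkRequest request, ActionListener<BulkResponse> listener) {
        clientForTransportThread.bulk(request, listener);
    }

    // Mirrors the patch's afterBulk handling: SERVICE_UNAVAILABLE thrown by the
    // test framework is treated as benign noise; anything else fails the test.
    void onBulkFailure(Exception failure) {
        if (ExceptionsHelper.status(failure) == RestStatus.SERVICE_UNAVAILABLE) {
            return; // the test framework throws this at us sometimes
        }
        throw new AssertionError("Unexpected bulk failure", failure);
    }
}

The volatile field here is just one way to publish the stashed client safely to another thread; the test itself simply assigns the field at the start of executeBulkRejectionLoad before any bulk requests are dispatched.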