diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java index 99e89f0e31cc5..d91a395c7f3ba 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java @@ -14,13 +14,17 @@ import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Request; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.ErrorTraceHelper; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; import org.junit.Before; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Collection; import java.util.function.BooleanSupplier; import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; @@ -28,6 +32,11 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase { private BooleanSupplier hasStackTrace; + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + @Before public void setupMessageListener() { hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 0d359300bbdc1..26818002e1129 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -66,7 +66,6 @@ import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; @@ -1052,8 +1051,7 @@ public void testAbortWaitsOnDataNode() throws Exception { final AtomicBoolean blocked = new AtomicBoolean(true); - final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode); - transportService.addMessageListener(new TransportMessageListener() { + MockTransportService.getInstance(otherDataNode).addMessageListener(new TransportMessageListener() { @Override public void onRequestSent( DiscoveryNode node, diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index b546b8cdd0f5f..1968a07b20eb4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -60,11 +60,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -103,8 +101,7 @@ public class TransportService extends AbstractLifecycleComponent Setting.Property.Deprecated ); - private final AtomicBoolean handleIncomingRequests = new AtomicBoolean(); - private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); + private volatile boolean handleIncomingRequests; protected final Transport transport; protected final ConnectionManager connectionManager; protected final ThreadPool threadPool; @@ -134,7 +131,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { // tracer log - private final Logger tracerLog; + private static final Logger tracerLog = Loggers.getLogger(logger, ".tracer"); private final Tracer tracer; volatile String[] tracerLogInclude; @@ -291,7 +288,6 @@ public TransportService( this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); - tracerLog = Loggers.getLogger(logger, ".tracer"); this.taskManager = taskManger; this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); @@ -432,8 +428,8 @@ protected void doClose() throws IOException { * reject any incoming requests, including handshakes, by closing the connection. */ public final void acceptIncomingRequests() { - final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true); - assert startedWithThisCall : "transport service was already accepting incoming requests"; + assert handleIncomingRequests == false : "transport service was already accepting incoming requests"; + handleIncomingRequests = true; logger.debug("now accepting incoming requests"); } @@ -750,14 +746,6 @@ public void disconnectFromNode(DiscoveryNode node) { connectionManager.disconnectFromNode(node); } - public void addMessageListener(TransportMessageListener listener) { - messageListener.listeners.add(listener); - } - - public void removeMessageListener(TransportMessageListener listener) { - messageListener.listeners.remove(listener); - } - public void addConnectionListener(TransportConnectionListener listener) { connectionManager.addListener(listener); } @@ -1265,13 +1253,12 @@ public void registerRequestHandler( */ @Override public void onRequestReceived(long requestId, String action) { - if (handleIncomingRequests.get() == false) { + if (handleIncomingRequests == false) { throw new TransportNotReadyException(); } if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); } - messageListener.onRequestReceived(requestId, action); } /** called by the {@link Transport} implementation once a request has been sent */ @@ -1286,7 +1273,6 @@ public void onRequestSent( if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); } - messageListener.onRequestSent(node, requestId, action, request, options); } @Override @@ -1297,7 +1283,6 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } else if (tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) { tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode()); } - messageListener.onResponseReceived(requestId, holder); } /** called by the {@link Transport} implementation once a response was sent to calling node */ @@ -1306,7 +1291,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent response", requestId, action); } - messageListener.onResponseSent(requestId, action, response); } /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */ @@ -1315,7 +1299,6 @@ public void onResponseSent(long requestId, String action, Exception e) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace(() -> format("[%s][%s] sent error response", requestId, action), e); } - messageListener.onResponseSent(requestId, action, e); } public RequestHandlerRegistry getRequestHandler(String action) { @@ -1453,6 +1436,7 @@ public void run() { public void cancel() { assert responseHandlers.contains(requestId) == false : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; + var cancellable = this.cancellable; if (cancellable != null) { cancellable.cancel(); } @@ -1492,6 +1476,7 @@ public T read(StreamInput in) throws IOException { @Override public void handleResponse(T response) { + var handler = this.handler; if (handler != null) { handler.cancel(); } @@ -1502,6 +1487,7 @@ public void handleResponse(T response) { @Override public void handleException(TransportException exp) { + var handler = this.handler; if (handler != null) { handler.cancel(); } @@ -1666,53 +1652,6 @@ private boolean isLocalNode(DiscoveryNode discoveryNode) { return discoveryNode.equals(localNode); } - private static final class DelegatingTransportMessageListener implements TransportMessageListener { - - private final List listeners = new CopyOnWriteArrayList<>(); - - @Override - public void onRequestReceived(long requestId, String action) { - for (TransportMessageListener listener : listeners) { - listener.onRequestReceived(requestId, action); - } - } - - @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { - for (TransportMessageListener listener : listeners) { - listener.onResponseSent(requestId, action, response); - } - } - - @Override - public void onResponseSent(long requestId, String action, Exception error) { - for (TransportMessageListener listener : listeners) { - listener.onResponseSent(requestId, action, error); - } - } - - @Override - public void onRequestSent( - DiscoveryNode node, - long requestId, - String action, - TransportRequest request, - TransportRequestOptions finalOptions - ) { - for (TransportMessageListener listener : listeners) { - listener.onRequestSent(node, requestId, action, request, finalOptions); - } - } - - @Override - @SuppressWarnings("rawtypes") - public void onResponseReceived(long requestId, Transport.ResponseContext holder) { - for (TransportMessageListener listener : listeners) { - listener.onResponseReceived(requestId, holder); - } - } - } - private static class PendingDirectHandlers extends AbstractRefCounted { // To handle a response we (i) remove the handler from responseHandlers and then (ii) enqueue an action to complete the handler on diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index a9fa5ba36fde0..5a69674b341b4 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -11,6 +11,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportService; @@ -18,6 +19,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; +import static org.elasticsearch.test.ESTestCase.asInstanceOf; + /** * Utilities around testing the `error_trace` message header in search. */ @@ -26,16 +29,20 @@ public enum ErrorTraceHelper { public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) { final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false); - internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> ts.addMessageListener(new TransportMessageListener() { - @Override - public void onResponseSent(long requestId, String action, Exception error) { - TransportMessageListener.super.onResponseSent(requestId, action, error); - if (action.startsWith("indices:data/read/search")) { - Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0); - transportMessageHasStackTrace.set(throwable.isPresent()); + internalCluster.getDataNodeInstances(TransportService.class) + .forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() { + @Override + public void onResponseSent(long requestId, String action, Exception error) { + TransportMessageListener.super.onResponseSent(requestId, action, error); + if (action.startsWith("indices:data/read/search")) { + Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed( + error, + t -> t.getStackTrace().length > 0 + ); + transportMessageHasStackTrace.set(throwable.isPresent()); + } } - } - })); + })); return transportMessageHasStackTrace::get; } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 18c591166e720..0ecd9bcd86dfc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -57,8 +57,10 @@ import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty4.Netty4Transport; @@ -106,6 +108,8 @@ public class MockTransportService extends TransportService { private final List onStopListeners = new CopyOnWriteArrayList<>(); private final AtomicReference> onConnectionClosedCallback = new AtomicReference<>(); + private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); + public static class TestPlugin extends Plugin { @Override public List> getSettings() { @@ -814,4 +818,96 @@ protected void doClose() throws IOException { assertTrue(ThreadPool.terminate(testExecutor, 10, TimeUnit.SECONDS)); } } + + @Override + public void onRequestReceived(long requestId, String action) { + super.onRequestReceived(requestId, action); + messageListener.onRequestReceived(requestId, action); + } + + @Override + public void onRequestSent( + DiscoveryNode node, + long requestId, + String action, + TransportRequest request, + TransportRequestOptions options + ) { + super.onRequestSent(node, requestId, action, request, options); + messageListener.onRequestSent(node, requestId, action, request, options); + } + + @Override + @SuppressWarnings("rawtypes") + public void onResponseReceived(long requestId, Transport.ResponseContext holder) { + super.onResponseReceived(requestId, holder); + messageListener.onResponseReceived(requestId, holder); + } + + @Override + public void onResponseSent(long requestId, String action, TransportResponse response) { + super.onResponseSent(requestId, action, response); + messageListener.onResponseSent(requestId, action, response); + } + + @Override + public void onResponseSent(long requestId, String action, Exception e) { + super.onResponseSent(requestId, action, e); + messageListener.onResponseSent(requestId, action, e); + } + + public void addMessageListener(TransportMessageListener listener) { + messageListener.listeners.add(listener); + } + + public void removeMessageListener(TransportMessageListener listener) { + messageListener.listeners.remove(listener); + } + + private static final class DelegatingTransportMessageListener implements TransportMessageListener { + + private final List listeners = new CopyOnWriteArrayList<>(); + + @Override + public void onRequestReceived(long requestId, String action) { + for (TransportMessageListener listener : listeners) { + listener.onRequestReceived(requestId, action); + } + } + + @Override + public void onResponseSent(long requestId, String action, TransportResponse response) { + for (TransportMessageListener listener : listeners) { + listener.onResponseSent(requestId, action, response); + } + } + + @Override + public void onResponseSent(long requestId, String action, Exception error) { + for (TransportMessageListener listener : listeners) { + listener.onResponseSent(requestId, action, error); + } + } + + @Override + public void onRequestSent( + DiscoveryNode node, + long requestId, + String action, + TransportRequest request, + TransportRequestOptions finalOptions + ) { + for (TransportMessageListener listener : listeners) { + listener.onRequestSent(node, requestId, action, request, finalOptions); + } + } + + @Override + @SuppressWarnings("rawtypes") + public void onResponseReceived(long requestId, Transport.ResponseContext holder) { + for (TransportMessageListener listener : listeners) { + listener.onResponseReceived(requestId, holder); + } + } + } } diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index 8583844e76aec..ba06de652dc70 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -9,17 +9,18 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.ErrorTraceHelper; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; import org.junit.Before; import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.function.BooleanSupplier; @@ -31,8 +32,9 @@ protected boolean addMockHttpTransport() { } @Override + @SuppressWarnings("unchecked") protected Collection> nodePlugins() { - return List.of(AsyncSearch.class); + return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), AsyncSearch.class, MockTransportService.TestPlugin.class); } private BooleanSupplier transportMessageHasStackTrace;