From 93a2db826420dada6245dab58248e7843b1b52aa Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 8 Mar 2025 21:51:58 +0100 Subject: [PATCH 1/4] Remove some overhead from TransportService message handling Avoid some indirection and volatile reads, and move the message listener functionality into MockTransportService, since in TransportService it needlessly kept iterating an empty CoW list (creating iterator instances, volatile reads, more code). This is an effort to improve the low IPC on transport threads. --- .../DedicatedClusterSnapshotRestoreIT.java | 4 +- .../transport/TransportService.java | 77 ++------------- .../search/ErrorTraceHelper.java | 23 +++-- .../test/transport/MockTransportService.java | 96 +++++++++++++++++++ 4 files changed, 119 insertions(+), 81 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 0d359300bbdc1..26818002e1129 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -66,7 +66,6 @@ import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.io.UncheckedIOException; @@ -1052,8 +1051,7 @@ public void testAbortWaitsOnDataNode() throws Exception { final AtomicBoolean blocked = new AtomicBoolean(true); - final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode); - transportService.addMessageListener(new TransportMessageListener() { + MockTransportService.getInstance(otherDataNode).addMessageListener(new TransportMessageListener() { @Override public void onRequestSent( DiscoveryNode node, 
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index b546b8cdd0f5f..1968a07b20eb4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -60,11 +60,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -103,8 +101,7 @@ public class TransportService extends AbstractLifecycleComponent Setting.Property.Deprecated ); - private final AtomicBoolean handleIncomingRequests = new AtomicBoolean(); - private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); + private volatile boolean handleIncomingRequests; protected final Transport transport; protected final ConnectionManager connectionManager; protected final ThreadPool threadPool; @@ -134,7 +131,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { // tracer log - private final Logger tracerLog; + private static final Logger tracerLog = Loggers.getLogger(logger, ".tracer"); private final Tracer tracer; volatile String[] tracerLogInclude; @@ -291,7 +288,6 @@ public TransportService( this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); - tracerLog = Loggers.getLogger(logger, ".tracer"); this.taskManager = taskManger; this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); @@ 
-432,8 +428,8 @@ protected void doClose() throws IOException { * reject any incoming requests, including handshakes, by closing the connection. */ public final void acceptIncomingRequests() { - final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true); - assert startedWithThisCall : "transport service was already accepting incoming requests"; + assert handleIncomingRequests == false : "transport service was already accepting incoming requests"; + handleIncomingRequests = true; logger.debug("now accepting incoming requests"); } @@ -750,14 +746,6 @@ public void disconnectFromNode(DiscoveryNode node) { connectionManager.disconnectFromNode(node); } - public void addMessageListener(TransportMessageListener listener) { - messageListener.listeners.add(listener); - } - - public void removeMessageListener(TransportMessageListener listener) { - messageListener.listeners.remove(listener); - } - public void addConnectionListener(TransportConnectionListener listener) { connectionManager.addListener(listener); } @@ -1265,13 +1253,12 @@ public void registerRequestHandler( */ @Override public void onRequestReceived(long requestId, String action) { - if (handleIncomingRequests.get() == false) { + if (handleIncomingRequests == false) { throw new TransportNotReadyException(); } if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); } - messageListener.onRequestReceived(requestId, action); } /** called by the {@link Transport} implementation once a request has been sent */ @@ -1286,7 +1273,6 @@ public void onRequestSent( if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent to [{}] (timeout: [{}])", requestId, action, node, options.timeout()); } - messageListener.onRequestSent(node, requestId, action, request, options); } @Override @@ -1297,7 +1283,6 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } else if 
(tracerLog.isTraceEnabled() && shouldTraceAction(holder.action())) { tracerLog.trace("[{}][{}] received response from [{}]", requestId, holder.action(), holder.connection().getNode()); } - messageListener.onResponseReceived(requestId, holder); } /** called by the {@link Transport} implementation once a response was sent to calling node */ @@ -1306,7 +1291,6 @@ public void onResponseSent(long requestId, String action, TransportResponse resp if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] sent response", requestId, action); } - messageListener.onResponseSent(requestId, action, response); } /** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */ @@ -1315,7 +1299,6 @@ public void onResponseSent(long requestId, String action, Exception e) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace(() -> format("[%s][%s] sent error response", requestId, action), e); } - messageListener.onResponseSent(requestId, action, e); } public RequestHandlerRegistry getRequestHandler(String action) { @@ -1453,6 +1436,7 @@ public void run() { public void cancel() { assert responseHandlers.contains(requestId) == false : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; + var cancellable = this.cancellable; if (cancellable != null) { cancellable.cancel(); } @@ -1492,6 +1476,7 @@ public T read(StreamInput in) throws IOException { @Override public void handleResponse(T response) { + var handler = this.handler; if (handler != null) { handler.cancel(); } @@ -1502,6 +1487,7 @@ public void handleResponse(T response) { @Override public void handleException(TransportException exp) { + var handler = this.handler; if (handler != null) { handler.cancel(); } @@ -1666,53 +1652,6 @@ private boolean isLocalNode(DiscoveryNode discoveryNode) { return discoveryNode.equals(localNode); } - private static final class 
DelegatingTransportMessageListener implements TransportMessageListener { - - private final List listeners = new CopyOnWriteArrayList<>(); - - @Override - public void onRequestReceived(long requestId, String action) { - for (TransportMessageListener listener : listeners) { - listener.onRequestReceived(requestId, action); - } - } - - @Override - public void onResponseSent(long requestId, String action, TransportResponse response) { - for (TransportMessageListener listener : listeners) { - listener.onResponseSent(requestId, action, response); - } - } - - @Override - public void onResponseSent(long requestId, String action, Exception error) { - for (TransportMessageListener listener : listeners) { - listener.onResponseSent(requestId, action, error); - } - } - - @Override - public void onRequestSent( - DiscoveryNode node, - long requestId, - String action, - TransportRequest request, - TransportRequestOptions finalOptions - ) { - for (TransportMessageListener listener : listeners) { - listener.onRequestSent(node, requestId, action, request, finalOptions); - } - } - - @Override - @SuppressWarnings("rawtypes") - public void onResponseReceived(long requestId, Transport.ResponseContext holder) { - for (TransportMessageListener listener : listeners) { - listener.onResponseReceived(requestId, holder); - } - } - } - private static class PendingDirectHandlers extends AbstractRefCounted { // To handle a response we (i) remove the handler from responseHandlers and then (ii) enqueue an action to complete the handler on diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index a9fa5ba36fde0..567a86c6377ad 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -11,6 +11,7 @@ import org.elasticsearch.ExceptionsHelper; import 
org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportService; @@ -26,16 +27,20 @@ public enum ErrorTraceHelper { public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) { final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false); - internalCluster.getDataNodeInstances(TransportService.class).forEach(ts -> ts.addMessageListener(new TransportMessageListener() { - @Override - public void onResponseSent(long requestId, String action, Exception error) { - TransportMessageListener.super.onResponseSent(requestId, action, error); - if (action.startsWith("indices:data/read/search")) { - Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed(error, t -> t.getStackTrace().length > 0); - transportMessageHasStackTrace.set(throwable.isPresent()); + internalCluster.getDataNodeInstances(TransportService.class) + .forEach(ts -> ((MockTransportService) ts).addMessageListener(new TransportMessageListener() { + @Override + public void onResponseSent(long requestId, String action, Exception error) { + TransportMessageListener.super.onResponseSent(requestId, action, error); + if (action.startsWith("indices:data/read/search")) { + Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed( + error, + t -> t.getStackTrace().length > 0 + ); + transportMessageHasStackTrace.set(throwable.isPresent()); + } } - } - })); + })); return transportMessageHasStackTrace::get; } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 18c591166e720..0ecd9bcd86dfc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ 
b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -57,8 +57,10 @@ import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty4.Netty4Transport; @@ -106,6 +108,8 @@ public class MockTransportService extends TransportService { private final List onStopListeners = new CopyOnWriteArrayList<>(); private final AtomicReference> onConnectionClosedCallback = new AtomicReference<>(); + private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); + public static class TestPlugin extends Plugin { @Override public List> getSettings() { @@ -814,4 +818,96 @@ protected void doClose() throws IOException { assertTrue(ThreadPool.terminate(testExecutor, 10, TimeUnit.SECONDS)); } } + + @Override + public void onRequestReceived(long requestId, String action) { + super.onRequestReceived(requestId, action); + messageListener.onRequestReceived(requestId, action); + } + + @Override + public void onRequestSent( + DiscoveryNode node, + long requestId, + String action, + TransportRequest request, + TransportRequestOptions options + ) { + super.onRequestSent(node, requestId, action, request, options); + messageListener.onRequestSent(node, requestId, action, request, options); + } + + @Override + @SuppressWarnings("rawtypes") + public void onResponseReceived(long requestId, Transport.ResponseContext holder) { + super.onResponseReceived(requestId, holder); + messageListener.onResponseReceived(requestId, holder); + } + + @Override + public void 
onResponseSent(long requestId, String action, TransportResponse response) { + super.onResponseSent(requestId, action, response); + messageListener.onResponseSent(requestId, action, response); + } + + @Override + public void onResponseSent(long requestId, String action, Exception e) { + super.onResponseSent(requestId, action, e); + messageListener.onResponseSent(requestId, action, e); + } + + public void addMessageListener(TransportMessageListener listener) { + messageListener.listeners.add(listener); + } + + public void removeMessageListener(TransportMessageListener listener) { + messageListener.listeners.remove(listener); + } + + private static final class DelegatingTransportMessageListener implements TransportMessageListener { + + private final List listeners = new CopyOnWriteArrayList<>(); + + @Override + public void onRequestReceived(long requestId, String action) { + for (TransportMessageListener listener : listeners) { + listener.onRequestReceived(requestId, action); + } + } + + @Override + public void onResponseSent(long requestId, String action, TransportResponse response) { + for (TransportMessageListener listener : listeners) { + listener.onResponseSent(requestId, action, response); + } + } + + @Override + public void onResponseSent(long requestId, String action, Exception error) { + for (TransportMessageListener listener : listeners) { + listener.onResponseSent(requestId, action, error); + } + } + + @Override + public void onRequestSent( + DiscoveryNode node, + long requestId, + String action, + TransportRequest request, + TransportRequestOptions finalOptions + ) { + for (TransportMessageListener listener : listeners) { + listener.onRequestSent(node, requestId, action, request, finalOptions); + } + } + + @Override + @SuppressWarnings("rawtypes") + public void onResponseReceived(long requestId, Transport.ResponseContext holder) { + for (TransportMessageListener listener : listeners) { + listener.onResponseReceived(requestId, holder); + } + } + } } From 
c1080dfa9ddaf5b4549dec28938c346419228bab Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 9 Mar 2025 08:12:42 +0100 Subject: [PATCH 2/4] add mock --- .../java/org/elasticsearch/http/SearchErrorTraceIT.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java index 99e89f0e31cc5..d91a395c7f3ba 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java @@ -14,13 +14,17 @@ import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Request; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.ErrorTraceHelper; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; import org.junit.Before; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Collection; import java.util.function.BooleanSupplier; import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; @@ -28,6 +32,11 @@ public class SearchErrorTraceIT extends HttpSmokeTestCase { private BooleanSupplier hasStackTrace; + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + @Before public void setupMessageListener() { hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster()); From 1257c24a89cec512f9022566b7aa4c642829561b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 9 Mar 2025 11:07:47 +0100 Subject: [PATCH 3/4] fix another test --- 
.../elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index 8583844e76aec..6466bda14dd57 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.ErrorTraceHelper; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; import org.junit.Before; @@ -32,7 +33,7 @@ protected boolean addMockHttpTransport() { @Override protected Collection> nodePlugins() { - return List.of(AsyncSearch.class); + return List.of(AsyncSearch.class, MockTransportService.TestPlugin.class); } private BooleanSupplier transportMessageHasStackTrace; From b71ecd6bf1fd6c2cfe3037d30ca09b6b0fafefba Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 9 Mar 2025 14:44:40 +0100 Subject: [PATCH 4/4] CR comments --- .../main/java/org/elasticsearch/search/ErrorTraceHelper.java | 4 +++- .../elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java index 567a86c6377ad..5a69674b341b4 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/search/ErrorTraceHelper.java @@ -19,6 +19,8 @@ import 
java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; +import static org.elasticsearch.test.ESTestCase.asInstanceOf; + /** * Utilities around testing the `error_trace` message header in search. */ @@ -28,7 +30,7 @@ public enum ErrorTraceHelper { public static BooleanSupplier setupErrorTraceListener(InternalTestCluster internalCluster) { final AtomicBoolean transportMessageHasStackTrace = new AtomicBoolean(false); internalCluster.getDataNodeInstances(TransportService.class) - .forEach(ts -> ((MockTransportService) ts).addMessageListener(new TransportMessageListener() { + .forEach(ts -> asInstanceOf(MockTransportService.class, ts).addMessageListener(new TransportMessageListener() { @Override public void onResponseSent(long requestId, String action, Exception error) { TransportMessageListener.super.onResponseSent(requestId, action, error); diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java index 6466bda14dd57..ba06de652dc70 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; @@ -20,7 +21,6 @@ import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.function.BooleanSupplier; @@ -32,8 +32,9 @@ protected boolean addMockHttpTransport() { } @Override + @SuppressWarnings("unchecked") protected Collection> 
nodePlugins() { - return List.of(AsyncSearch.class, MockTransportService.TestPlugin.class); + return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), AsyncSearch.class, MockTransportService.TestPlugin.class); } private BooleanSupplier transportMessageHasStackTrace;