From d90d0833bc393a73f02d345467599d14bb4ab6a7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 13 Jan 2025 13:24:25 +0100 Subject: [PATCH 1/4] Fix SearchStatesIT.testCanMatch We need to override the proxy handler here so that the old type free request gets translated to a scroll free request the same way this works out in local cluster execution. closes #118718 Signed-off-by: Armin Braun --- muted-tests.yml | 3 --- .../action/search/SearchTransportService.java | 22 +++++++++++++++++-- .../transport/TransportActionProxy.java | 12 +++++----- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 64a890f5e7298..b47b791e506f2 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -425,9 +425,6 @@ tests: - class: org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshotTests method: testSkipNonRootOfNestedDocuments issue: https://github.com/elastic/elasticsearch/issues/119553 -- class: org.elasticsearch.upgrades.SearchStatesIT - method: testCanMatch - issue: https://github.com/elastic/elasticsearch/issues/118718 - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: "test {p0=search.vectors/110_knn_query_with_filter/PRE_FILTER: knn query with internal filter as pre-filter}" issue: https://github.com/elastic/elasticsearch/issues/119804 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 2041754bc2bcc..a26d654e9e252 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -400,14 +400,32 @@ public static void registerRequestHandler(TransportService transportService, Sea ); // TODO: remove this handler once the lowest compatible version stops using it + // this handler exists for BwC purposes only, we don't need the original indices to free the context transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, freeContextExecutor, in -> { var res = new ScrollFreeContextRequest(in); // this handler exists for BwC purposes only, we don't need the original indices to free the context OriginalIndices.readOriginalIndices(in); return res; }, freeContextHandler); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom); - + transportService.registerRequestHandler( + TransportActionProxy.getProxyAction(FREE_CONTEXT_ACTION_NAME), + EsExecutors.DIRECT_EXECUTOR_SERVICE, + true, + false, + in -> { + return new TransportActionProxy.ProxyRequest<>(in, i -> { + var res = new ScrollFreeContextRequest(i); + // this handler exists for BwC purposes only, we don't need the original indices to free the context + OriginalIndices.readOriginalIndices(i); + return res; + }); + }, + new TransportActionProxy.ProxyRequestHandler<>( + transportService, + FREE_CONTEXT_SCROLL_ACTION_NAME, + request -> SearchFreeContextResponse::readFrom + ) + ); transportService.registerRequestHandler( CLEAR_SCROLL_CONTEXTS_ACTION_NAME, freeContextExecutor, diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 25278e5b276fd..97be009271822 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -32,13 +32,13 @@ public final class TransportActionProxy { private TransportActionProxy() {} // no instance - private static class ProxyRequestHandler> implements TransportRequestHandler { + public static class ProxyRequestHandler> implements TransportRequestHandler { private final TransportService service; private final String action; private final Function> responseFunction; - ProxyRequestHandler( + public ProxyRequestHandler( TransportService service, String action, Function> responseFunction @@ -126,7 +126,7 @@ public boolean decRef() { } } - static class ProxyRequest extends TransportRequest { + public static class ProxyRequest extends TransportRequest { final T wrapped; final DiscoveryNode targetNode; @@ -135,7 +135,7 @@ static class ProxyRequest extends TransportRequest { this.targetNode = targetNode; } - ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { + public ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { super(in); targetNode = new DiscoveryNode(in); wrapped = reader.read(in); @@ -150,8 +150,8 @@ public void writeTo(StreamOutput out) throws IOException { } } - private static class CancellableProxyRequest extends ProxyRequest { - CancellableProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { + public static class CancellableProxyRequest extends ProxyRequest { + public CancellableProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { super(in, reader); } From 6172b267085fce226449c33c4130ffe601cb5d29 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 13 Jan 2025 15:23:14 +0100 Subject: [PATCH 2/4] cleanup --- .../action/search/SearchTransportService.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index a26d654e9e252..fbbabaedebb8f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -407,19 +407,18 @@ public static void registerRequestHandler(TransportService transportService, Sea OriginalIndices.readOriginalIndices(in); return res; }, freeContextHandler); + // BwC handler that translates FREE_CONTEXT_ACTION_NAME into FREE_CONTEXT_SCROLL_ACTION_NAME same way we use + // FREE_CONTEXT_SCROLL_ACTION_NAME throughout on local nodes transportService.registerRequestHandler( TransportActionProxy.getProxyAction(FREE_CONTEXT_ACTION_NAME), EsExecutors.DIRECT_EXECUTOR_SERVICE, true, false, - in -> { - return new TransportActionProxy.ProxyRequest<>(in, i -> { - var res = new ScrollFreeContextRequest(i); - // this handler exists for BwC purposes only, we don't need the original indices to free the context - OriginalIndices.readOriginalIndices(i); - return res; - }); - }, + in -> new TransportActionProxy.ProxyRequest<>(in, i -> { + var res = new ScrollFreeContextRequest(i); + OriginalIndices.readOriginalIndices(i); + return res; + }), new TransportActionProxy.ProxyRequestHandler<>( transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, From 793539c4ac97d4878d718f4048b5efcf62240fd6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 13 Jan 2025 16:25:35 +0100 Subject: [PATCH 3/4] cleanup --- .../action/search/SearchTransportService.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index fbbabaedebb8f..6ec9f4b6143bd 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -401,12 +401,12 @@ public static void registerRequestHandler(TransportService transportService, Sea // TODO: remove this handler once the lowest compatible version stops using it // this handler exists for BwC purposes only, we don't need the original indices to free the context - transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, freeContextExecutor, in -> { + final Writeable.Reader bwcFreeCtxRequestReader = in -> { var res = new ScrollFreeContextRequest(in); - // this handler exists for BwC purposes only, we don't need the original indices to free the context OriginalIndices.readOriginalIndices(in); return res; - }, freeContextHandler); + }; + transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, freeContextExecutor, bwcFreeCtxRequestReader, freeContextHandler); // BwC handler that translates FREE_CONTEXT_ACTION_NAME into FREE_CONTEXT_SCROLL_ACTION_NAME same way we use // FREE_CONTEXT_SCROLL_ACTION_NAME throughout on local nodes transportService.registerRequestHandler( @@ -414,11 +414,7 @@ public static void registerRequestHandler(TransportService transportService, Sea EsExecutors.DIRECT_EXECUTOR_SERVICE, true, false, - in -> new TransportActionProxy.ProxyRequest<>(in, i -> { - var res = new ScrollFreeContextRequest(i); - OriginalIndices.readOriginalIndices(i); - return res; - }), + in -> new TransportActionProxy.ProxyRequest<>(in, bwcFreeCtxRequestReader::read), new TransportActionProxy.ProxyRequestHandler<>( transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, From d5a161c203d75f38bf8d8b31dbfd5a3e75a7b1d4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 17 Jan 2025 12:50:45 +0100 Subject: [PATCH 4/4] old style request class is back --- .../action/search/SearchTransportService.java | 58 +++++++++++++------ .../transport/TransportActionProxy.java | 12 ++-- 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 6ec9f4b6143bd..1dc23519f23ab 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -13,10 +13,12 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction; import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -350,6 +352,38 @@ private static class ClearScrollContextsRequest extends TransportRequest { } } + static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { + private final OriginalIndices originalIndices; + + SearchFreeContextRequest(StreamInput in) throws IOException { + super(in); + originalIndices = OriginalIndices.readOriginalIndices(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + OriginalIndices.writeOriginalIndices(originalIndices, out); + } + + @Override + public String[] indices() { + if (originalIndices == null) { + return null; + } + return originalIndices.indices(); + } + + @Override + public IndicesOptions indicesOptions() { + if (originalIndices == null) { + return null; + } + return originalIndices.indicesOptions(); + } + + } + public static class SearchFreeContextResponse extends TransportResponse { private static final SearchFreeContextResponse FREED = new SearchFreeContextResponse(true); @@ -401,26 +435,14 @@ public static void registerRequestHandler(TransportService transportService, Sea // TODO: remove this handler once the lowest compatible version stops using it // this handler exists for BwC purposes only, we don't need the original indices to free the context - final Writeable.Reader bwcFreeCtxRequestReader = in -> { - var res = new ScrollFreeContextRequest(in); - OriginalIndices.readOriginalIndices(in); - return res; - }; - transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, freeContextExecutor, bwcFreeCtxRequestReader, freeContextHandler); - // BwC handler that translates FREE_CONTEXT_ACTION_NAME into FREE_CONTEXT_SCROLL_ACTION_NAME same way we use - // FREE_CONTEXT_SCROLL_ACTION_NAME throughout on local nodes transportService.registerRequestHandler( - TransportActionProxy.getProxyAction(FREE_CONTEXT_ACTION_NAME), - EsExecutors.DIRECT_EXECUTOR_SERVICE, - true, - false, - in -> new TransportActionProxy.ProxyRequest<>(in, bwcFreeCtxRequestReader::read), - new TransportActionProxy.ProxyRequestHandler<>( - transportService, - FREE_CONTEXT_SCROLL_ACTION_NAME, - request -> SearchFreeContextResponse::readFrom - ) + FREE_CONTEXT_ACTION_NAME, + freeContextExecutor, + SearchFreeContextRequest::new, + freeContextHandler ); + TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom); + transportService.registerRequestHandler( CLEAR_SCROLL_CONTEXTS_ACTION_NAME, freeContextExecutor, diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 97be009271822..25278e5b276fd 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -32,13 +32,13 @@ public final class TransportActionProxy { private TransportActionProxy() {} // no instance - public static class ProxyRequestHandler> implements TransportRequestHandler { + private static class ProxyRequestHandler> implements TransportRequestHandler { private final TransportService service; private final String action; private final Function> responseFunction; - public ProxyRequestHandler( + ProxyRequestHandler( TransportService service, String action, Function> responseFunction @@ -126,7 +126,7 @@ public boolean decRef() { } } - public static class ProxyRequest extends TransportRequest { + static class ProxyRequest extends TransportRequest { final T wrapped; final DiscoveryNode targetNode; @@ -135,7 +135,7 @@ public static class ProxyRequest extends TransportRe this.targetNode = targetNode; } - public ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { + ProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { super(in); targetNode = new DiscoveryNode(in); wrapped = reader.read(in); @@ -150,8 +150,8 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static class CancellableProxyRequest extends ProxyRequest { - public CancellableProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { + private static class CancellableProxyRequest extends ProxyRequest { + CancellableProxyRequest(StreamInput in, Writeable.Reader reader) throws IOException { super(in, reader); }