From 73b04f69ebdeadcb6d64bd61956be65262dfeffe Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 6 Oct 2025 14:01:49 +0200 Subject: [PATCH 1/3] Convert BytesTransportResponse when proxying response from/to local node (#135873) #127112 introduced `BytesTransportResponse` to be used in search batched execution, so that `NodeQueryResponse` could be written as `BytesTransportResponse` as opposed to materializing the response object on heap. When a proxy node acts as a proxy to query its local data, and the coordinating node is on a different version than the proxy node, the response will fail to deserialize in the coord node because it was written with the version of the proxy node as opposed to that of the coord (target) node. This is because `DirectResponseChannel` skips the step of reading and writing back such response, which would lead to it being converted to the right format. This commit attempts to fix this problem by tracking the version used to write the binary response, and conditionally converting it in the `ProxyRequestHandler` when the version don't align. --- docs/changelog/135873.yaml | 5 + .../SearchQueryThenFetchAsyncAction.java | 13 +- .../action/search/SearchTransportService.java | 82 ++++++-- .../TransportOpenPointInTimeAction.java | 8 +- .../action/search/TransportSearchAction.java | 2 +- .../transport/BytesTransportResponse.java | 23 ++- .../transport/TransportActionProxy.java | 44 ++++- .../transport/TransportActionProxyTests.java | 182 +++++++++++++++++- .../ClearCcrRestoreSessionAction.java | 26 ++- .../GetCcrRestoreFileChunkAction.java | 22 ++- .../ClearCcrRestoreSessionActionTests.java | 22 ++- .../GetCcrRestoreFileChunkActionTests.java | 14 +- 12 files changed, 393 insertions(+), 50 deletions(-) create mode 100644 docs/changelog/135873.yaml diff --git a/docs/changelog/135873.yaml b/docs/changelog/135873.yaml new file mode 100644 index 0000000000000..11311425296d7 --- /dev/null +++ b/docs/changelog/135873.yaml @@ -0,0 +1,5 @@ +pr: 135873 +summary: Convert `BytesTransportResponse` when proxying response from/to local node +area: "Network" +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 539ac4ccf955c..736f9da4414a5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -654,7 +654,13 @@ static void registerNodeSearchAction( } } ); - TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + NODE_SEARCH_ACTION_NAME, + true, + NodeQueryResponse::new, + namedWriteableRegistry + ); } private static void releaseLocalContext( @@ -987,7 +993,10 @@ void bwcRespond() { out.close(); } } - ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference())); + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) + ); } private void maybeFreeContext( diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 1e0fa28889c97..fc885f7562a80 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -384,7 +385,11 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static void registerRequestHandler(TransportService transportService, SearchService searchService) { + public static void registerRequestHandler( + TransportService transportService, + SearchService searchService, + NamedWriteableRegistry namedWriteableRegistry + ) { final TransportRequestHandler freeContextHandler = (request, channel, task) -> { logger.trace("releasing search context [{}]", request.id()); boolean freed = searchService.freeReaderContext(request.id()); @@ -401,7 +406,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, false, - SearchFreeContextResponse::readFrom + SearchFreeContextResponse::readFrom, + namedWriteableRegistry ); // TODO: remove this handler once the lowest compatible version stops using it @@ -411,7 +417,13 @@ public static void registerRequestHandler(TransportService transportService, Sea OriginalIndices.readOriginalIndices(in); return res; }, freeContextHandler); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom); + TransportActionProxy.registerProxyAction( + transportService, + FREE_CONTEXT_ACTION_NAME, + false, + SearchFreeContextResponse::readFrom, + namedWriteableRegistry + ); transportService.registerRequestHandler( CLEAR_SCROLL_CONTEXTS_ACTION_NAME, @@ -426,7 +438,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, false, - (in) -> ActionResponse.Empty.INSTANCE + (in) -> ActionResponse.Empty.INSTANCE, + namedWriteableRegistry ); transportService.registerRequestHandler( @@ -435,7 +448,7 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardSearchRequest::new, (request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)) ); - TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new); + TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new, namedWriteableRegistry); transportService.registerRequestHandler( QUERY_ACTION_NAME, @@ -451,7 +464,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, QUERY_ACTION_NAME, true, - (request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new + (request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new, + namedWriteableRegistry ); transportService.registerRequestHandler( @@ -465,7 +479,13 @@ public static void registerRequestHandler(TransportService transportService, Sea channel.getVersion() ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_ID_ACTION_NAME, + true, + QuerySearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_SCROLL_ACTION_NAME, @@ -478,7 +498,13 @@ public static void registerRequestHandler(TransportService transportService, Sea channel.getVersion() ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_SCROLL_ACTION_NAME, + true, + ScrollQuerySearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_FETCH_SCROLL_ACTION_NAME, @@ -490,7 +516,13 @@ public static void registerRequestHandler(TransportService transportService, Sea new ChannelActionListener<>(channel) ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_FETCH_SCROLL_ACTION_NAME, + true, + ScrollQueryFetchSearchResult::new, + namedWriteableRegistry + ); final TransportRequestHandler rankShardFeatureRequest = (request, channel, task) -> searchService .executeRankFeaturePhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)); @@ -500,7 +532,13 @@ public static void registerRequestHandler(TransportService transportService, Sea RankFeatureShardRequest::new, rankShardFeatureRequest ); - TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new); + TransportActionProxy.registerProxyAction( + transportService, + RANK_FEATURE_SHARD_ACTION_NAME, + true, + RankFeatureResult::new, + namedWriteableRegistry + ); final TransportRequestHandler shardFetchRequestHandler = (request, channel, task) -> searchService .executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)); @@ -510,7 +548,13 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardFetchRequest::new, shardFetchRequestHandler ); - TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + FETCH_ID_SCROLL_ACTION_NAME, + true, + FetchSearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( FETCH_ID_ACTION_NAME, @@ -520,7 +564,13 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardFetchSearchRequest::new, shardFetchRequestHandler ); - TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + FETCH_ID_ACTION_NAME, + true, + FetchSearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_CAN_MATCH_NODE_NAME, @@ -528,7 +578,13 @@ public static void registerRequestHandler(TransportService transportService, Sea CanMatchNodeRequest::new, (request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel)) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_CAN_MATCH_NODE_NAME, + true, + CanMatchNodeResponse::new, + namedWriteableRegistry + ); } private static Executor buildFreeContextExecutor(TransportService transportService) { diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index ac23731c38b84..383af67757290 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -94,7 +94,13 @@ public TransportOpenPointInTimeAction( ShardOpenReaderRequest::new, new ShardOpenReaderRequestHandler() ); - TransportActionProxy.registerProxyAction(transportService, OPEN_SHARD_READER_CONTEXT_NAME, false, ShardOpenReaderResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + OPEN_SHARD_READER_CONTEXT_NAME, + false, + ShardOpenReaderResponse::new, + namedWriteableRegistry + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 69260bcac105c..6056d8646338a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -193,7 +193,7 @@ public TransportSearchAction( this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.remoteClusterService = searchTransportService.getRemoteClusterService(); - SearchTransportService.registerRequestHandler(transportService, searchService); + SearchTransportService.registerRequestHandler(transportService, searchService, namedWriteableRegistry); SearchQueryThenFetchAsyncAction.registerNodeSearchAction( searchTransportService, searchService, diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java index 571d0d4008e24..528199b335373 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java @@ -9,10 +9,13 @@ package org.elasticsearch.transport; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.Objects; /** * A specialized, bytes only response, that can potentially be optimized on the network layer. @@ -20,9 +23,27 @@ public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage { private final ReleasableBytesReference bytes; + private final TransportVersion version; - public BytesTransportResponse(ReleasableBytesReference bytes) { + public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version) { this.bytes = bytes; + this.version = Objects.requireNonNull(version); + } + + /** + * Does the binary response need conversion before being sent to the provided target version? + */ + public boolean mustConvertResponseForVersion(TransportVersion targetVersion) { + return version.equals(targetVersion) == false; + } + + /** + * Returns a {@link StreamInput} configured to read the underlying bytes that this response holds. + */ + public StreamInput streamInput() throws IOException { + StreamInput streamInput = bytes.streamInput(); + streamInput.setTransportVersion(version); + return streamInput; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index bc5dab2074a6e..d53ab209c3960 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -9,6 +9,8 @@ package org.elasticsearch.transport; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -18,6 +20,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Function; @@ -36,15 +39,18 @@ private static class ProxyRequestHandler> responseFunction; + private final NamedWriteableRegistry namedWriteableRegistry; ProxyRequestHandler( TransportService service, String action, - Function> responseFunction + Function> responseFunction, + NamedWriteableRegistry namedWriteableRegistry ) { this.service = service; this.action = action; this.responseFunction = responseFunction; + this.namedWriteableRegistry = namedWriteableRegistry; } @Override @@ -62,7 +68,28 @@ public Executor executor() { @Override public void handleResponse(TransportResponse response) { - channel.sendResponse(response); + // This is a short term solution to ensure data node responses for batched search go back to the coordinating + // node in the expected format when a proxy data node proxies the request to itself. The response would otherwise + // be sent directly via DirectResponseChannel, skipping the read and write step that this handler normally performs. + if (response instanceof BytesTransportResponse btr && btr.mustConvertResponseForVersion(channel.getVersion())) { + try ( + NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput( + btr.streamInput(), + namedWriteableRegistry + ) + ) { + TransportResponse convertedResponse = responseFunction.apply(wrappedRequest).read(in); + try { + channel.sendResponse(convertedResponse); + } finally { + convertedResponse.decRef(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + channel.sendResponse(response); + } } @Override @@ -73,7 +100,7 @@ public void handleException(TransportException exp) { @Override public TransportResponse read(StreamInput in) throws IOException { if (in.getTransportVersion().equals(channel.getVersion()) && in.supportReadAllToReleasableBytesReference()) { - return new BytesTransportResponse(in.readAllToReleasableBytesReference()); + return new BytesTransportResponse(in.readAllToReleasableBytesReference(), in.getTransportVersion()); } else { return responseFunction.apply(wrappedRequest).read(in); } @@ -144,7 +171,9 @@ public static void registerProxyActionWithDynamicResponseType( TransportService service, String action, boolean cancellable, - Function> responseFunction + Function> responseFunction, + NamedWriteableRegistry namedWriteableRegistry + ) { RequestHandlerRegistry requestHandler = service.getRequestHandler(action); service.registerRequestHandler( @@ -155,7 +184,7 @@ public static void registerProxyActionWithDynamicResponseType( in -> cancellable ? new CancellableProxyRequest<>(in, requestHandler::newRequest) : new ProxyRequest<>(in, requestHandler::newRequest), - new ProxyRequestHandler<>(service, action, responseFunction) + new ProxyRequestHandler<>(service, action, responseFunction, namedWriteableRegistry) ); } @@ -167,9 +196,10 @@ public static void registerProxyAction( TransportService service, String action, boolean cancellable, - Writeable.Reader reader + Writeable.Reader reader, + NamedWriteableRegistry namedWriteableRegistry ) { - registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader); + registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader, namedWriteableRegistry); } private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/"; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index e1a525cab3f52..c4146634f10c6 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -14,6 +14,8 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -110,6 +112,7 @@ private MockTransportService buildService(VersionInformation version, TransportV } public void testSendMessage() throws InterruptedException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); serviceA.registerRequestHandler( "internal:test", EsExecutors.DIRECT_EXECUTOR_SERVICE, @@ -123,7 +126,7 @@ public void testSendMessage() throws InterruptedException { } ); final boolean cancellable = randomBoolean(); - TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); serviceB.registerRequestHandler( @@ -139,7 +142,7 @@ public void testSendMessage() throws InterruptedException { assertThat(response.hasReferences(), equalTo(false)); } ); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC); serviceC.registerRequestHandler( "internal:test", @@ -155,7 +158,7 @@ public void testSendMessage() throws InterruptedException { } ); - TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); // Node A -> Node B -> Node C: different versions - serialize the response { final List responses = Collections.synchronizedList(new ArrayList<>()); @@ -277,7 +280,165 @@ public void testSendLocalRequest() throws Exception { latch.countDown(); } }); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction( + serviceB, + "internal:test", + cancellable, + // For a proxy node proxying to itself, the response is sent directly, without it being read by the proxy layer + r -> { throw new AssertionError(); }, + new NamedWriteableRegistry(Collections.emptyList()) + ); + AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); + + // Node A -> Proxy Node B (Local execution) + serviceA.sendRequest( + nodeB, + TransportActionProxy.getProxyAction("internal:test"), + TransportActionProxy.wrapRequest(nodeB, new SimpleTestRequest("TS_A", cancellable)), // Request + new TransportResponseHandler() { + @Override + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); + } + + @Override + public Executor executor() { + return TransportResponseHandler.TRANSPORT_WORKER; + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + assertEquals("TS_B", response.targetNode); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + throw new AssertionError(exp); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + + final var responseInstance = response.get(); + assertThat(responseInstance, notNullValue()); + responseInstance.decRef(); + assertBusy(() -> assertThat(responseInstance.hasReferences(), equalTo(false))); + } + + public void testSendLocalRequestBytesTransportResponseSameVersion() throws Exception { + final AtomicReference response = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(2); + + final boolean cancellable = randomBoolean(); + serviceB.registerRequestHandler("internal:test", randomExecutor(threadPool), SimpleTestRequest::new, (request, channel, task) -> { + try { + assertThat(task instanceof CancellableTask, equalTo(cancellable)); + assertEquals(request.sourceNode, "TS_A"); + + SimpleTestResponse tsB = new SimpleTestResponse("TS_B"); + try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) { + out.setTransportVersion(transportVersion1); + tsB.writeTo(out); + // simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse + final BytesTransportResponse responseB = new BytesTransportResponse(out.moveToBytesReference(), transportVersion1); + channel.sendResponse(responseB); + response.set(responseB); + } finally { + tsB.decRef(); + } + } finally { + latch.countDown(); + } + }); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, in -> { + throw new AssertionError("read should not be called for local proxying when versions align"); + }, new NamedWriteableRegistry(Collections.emptyList())); + AbstractSimpleTransportTestCase.connectToNode(serviceC, nodeB); + + // Node C -> Proxy Node B (Local execution) + serviceC.sendRequest( + nodeB, + TransportActionProxy.getProxyAction("internal:test"), + TransportActionProxy.wrapRequest(nodeB, new SimpleTestRequest("TS_A", cancellable)), // Request + new TransportResponseHandler() { + @Override + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); + } + + @Override + public Executor executor() { + return TransportResponseHandler.TRANSPORT_WORKER; + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + assertEquals("TS_B", response.targetNode); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + throw new AssertionError(exp); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + + final var responseInstance = response.get(); + assertThat(responseInstance, notNullValue()); + responseInstance.decRef(); + assertBusy(() -> assertThat(responseInstance.hasReferences(), equalTo(false))); + } + + public void testSendLocalRequestBytesTransportResponseDifferentVersions() throws Exception { + final AtomicReference response = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(2); + + final boolean cancellable = randomBoolean(); + serviceB.registerRequestHandler("internal:test", randomExecutor(threadPool), SimpleTestRequest::new, (request, channel, task) -> { + try { + assertThat(task instanceof CancellableTask, equalTo(cancellable)); + assertEquals(request.sourceNode, "TS_A"); + + SimpleTestResponse tsB = new SimpleTestResponse("TS_B"); + try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) { + out.setTransportVersion(transportVersion1); + tsB.writeTo(out); + // simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse + final BytesTransportResponse responseB = new BytesTransportResponse(out.moveToBytesReference(), transportVersion1); + channel.sendResponse(responseB); + response.set(responseB); + } finally { + tsB.decRef(); + } + } finally { + latch.countDown(); + } + }); + TransportActionProxy.registerProxyAction( + serviceB, + "internal:test", + cancellable, + // this is called by the conversion layer in ProxyRequestHandler + SimpleTestResponse::new, + new NamedWriteableRegistry(Collections.emptyList()) + ); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); // Node A -> Proxy Node B (Local execution) @@ -324,6 +485,7 @@ public void handleException(TransportException exp) { } public void testException() throws InterruptedException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); boolean cancellable = randomBoolean(); serviceA.registerRequestHandler( "internal:test", @@ -335,7 +497,7 @@ public void testException() throws InterruptedException { channel.sendResponse(response); } ); - TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); serviceB.registerRequestHandler( @@ -348,7 +510,7 @@ public void testException() throws InterruptedException { channel.sendResponse(response); } ); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC); serviceC.registerRequestHandler( "internal:test", @@ -358,7 +520,7 @@ public void testException() throws InterruptedException { throw new ElasticsearchException("greetings from TS_C"); } ); - TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); CountDownLatch latch = new CountDownLatch(1); serviceA.sendRequest( @@ -450,11 +612,17 @@ protected void closeInternal() {} SimpleTestResponse(StreamInput in) throws IOException { this.targetNode = in.readString(); + if (in.getTransportVersion().supports(transportVersion1)) { + in.readBoolean(); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(targetNode); + if (out.getTransportVersion().supports(transportVersion1)) { + out.writeBoolean(true); + } } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 66210d43f2f7a..e586ed8679062 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -51,7 +52,8 @@ private TransportDeleteCcrRestoreSessionAction( String actionName, ActionFilters actionFilters, TransportService transportService, - CcrRestoreSourceService ccrRestoreService + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry ) { super( actionName, @@ -60,7 +62,13 @@ private TransportDeleteCcrRestoreSessionAction( ClearCcrRestoreSessionRequest::new, transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) ); - TransportActionProxy.registerProxyAction(transportService, actionName, false, in -> ActionResponse.Empty.INSTANCE); + TransportActionProxy.registerProxyAction( + transportService, + actionName, + false, + in -> ActionResponse.Empty.INSTANCE, + namedWriteableRegistry + ); this.ccrRestoreService = ccrRestoreService; } @@ -80,16 +88,22 @@ public static class InternalTransportAction extends TransportDeleteCcrRestoreSes public InternalTransportAction( ActionFilters actionFilters, TransportService transportService, - CcrRestoreSourceService ccrRestoreService + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService); + super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService, namedWriteableRegistry); } } public static class TransportAction extends TransportDeleteCcrRestoreSessionAction { @Inject - public TransportAction(ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, actionFilters, transportService, ccrRestoreService); + public TransportAction( + ActionFilters actionFilters, + TransportService transportService, + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry + ) { + super(NAME, actionFilters, transportService, ccrRestoreService, namedWriteableRegistry); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index 1fb4fb6f0fb72..035eb88fb11b8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; @@ -64,7 +65,8 @@ private TransportGetCcrRestoreFileChunkAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { super( actionName, @@ -73,7 +75,13 @@ private TransportGetCcrRestoreFileChunkAction( GetCcrRestoreFileChunkRequest::new, transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) ); - TransportActionProxy.registerProxyAction(transportService, actionName, false, GetCcrRestoreFileChunkResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + actionName, + false, + GetCcrRestoreFileChunkResponse::new, + namedWriteableRegistry + ); this.restoreSourceService = restoreSourceService; this.bigArrays = bigArrays; } @@ -111,9 +119,10 @@ public InternalTransportAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService); + super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService, namedWriteableRegistry); } } @@ -123,9 +132,10 @@ public TransportAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(NAME, bigArrays, transportService, actionFilters, restoreSourceService); + super(NAME, bigArrays, transportService, actionFilters, restoreSourceService, namedWriteableRegistry); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java index 540a6a8f7bcb5..f2bdda160bbf6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -23,6 +24,8 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; +import java.util.Collections; + import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -54,21 +57,29 @@ public void testPrivilegeForActions() { } public void testActionNames() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); - final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + final var action = new ClearCcrRestoreSessionAction.TransportAction( + actionFilters, + transportService, + ccrRestoreSourceService, + namedWriteableRegistry + ); assertThat(action.actionName, equalTo(ClearCcrRestoreSessionAction.NAME)); final var internalAction = new ClearCcrRestoreSessionAction.InternalTransportAction( actionFilters, transportService, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(internalAction.actionName, equalTo(ClearCcrRestoreSessionAction.INTERNAL_NAME)); } public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); @@ -87,7 +98,12 @@ public void testRequestedShardIdMustBeConsistentWithSessionShardId() { } }).when(ccrRestoreSourceService).ensureSessionShardIdConsistency(anyString(), any()); - final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + final var action = new ClearCcrRestoreSessionAction.TransportAction( + actionFilters, + transportService, + ccrRestoreSourceService, + namedWriteableRegistry + ); final String sessionUUID = UUIDs.randomBase64UUID(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java index 61866dbf2029f..1b938b9150c6f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; @@ -27,6 +28,8 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; +import java.util.Collections; + import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -58,6 +61,7 @@ public void testPrivilegeForActions() { } public void testActionNames() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final BigArrays bigArrays = mock(BigArrays.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); @@ -67,7 +71,8 @@ public void testActionNames() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(action.actionName, equalTo(GetCcrRestoreFileChunkAction.NAME)); @@ -75,12 +80,14 @@ public void testActionNames() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(internalAction.actionName, equalTo(GetCcrRestoreFileChunkAction.INTERNAL_NAME)); } public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), ByteSizeValue.ofBytes(1024)); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); @@ -108,7 +115,8 @@ public void testRequestedShardIdMustBeConsistentWithSessionShardId() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); final String expectedFileName = randomAlphaOfLengthBetween(3, 12); From 6413013f05e52ef993bf89ffaf341ec467dcc4ae Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Thu, 13 Nov 2025 23:28:08 -0500 Subject: [PATCH 2/3] Unmute tests --- muted-tests.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 59fe2d7f9162e..c6b25205ff291 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -398,15 +398,9 @@ tests: - class: org.elasticsearch.xpack.esql.inference.rerank.RerankOperatorTests method: testSimpleCircuitBreaking issue: https://github.com/elastic/elasticsearch/issues/133619 -- class: org.elasticsearch.upgrades.SearchStatesIT - method: testBWCSearchStates - issue: https://github.com/elastic/elasticsearch/issues/137681 - class: org.elasticsearch.readiness.ReadinessClusterIT method: testReadinessDuringRestartsNormalOrder issue: https://github.com/elastic/elasticsearch/issues/136955 -- class: org.elasticsearch.upgrades.SearchStatesIT - method: testCanMatch - issue: https://github.com/elastic/elasticsearch/issues/137687 - class: org.elasticsearch.xpack.inference.InferenceRestIT method: test {p0=inference/70_text_similarity_rank_retriever/Text similarity reranker with min_score zero includes all docs} issue: https://github.com/elastic/elasticsearch/issues/137732 From d50e3c7108060ae727d41a611b7794724e601fc3 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Thu, 13 Nov 2025 23:28:29 -0500 Subject: [PATCH 3/3] Fix --- .../action/search/SearchQueryThenFetchAsyncAction.java | 5 ++++- .../main/java/org/elasticsearch/search/SearchService.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 736f9da4414a5..b36c9536125fb 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -869,7 +869,10 @@ void onShardDone() { } } - ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference())); + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) + ); } // Writes the "successful" response (see NodeQueryResponse for the corresponding read logic) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index aa9878e2db603..4e4015dfbe2fb 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -296,7 +296,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Setting.Property.NodeScope ); - private static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled(); + public static final boolean BATCHED_QUERY_PHASE_FEATURE_FLAG = new FeatureFlag("batched_query_phase").isEnabled(); /** * The size of the buffer used for memory accounting.