diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 8e222e7197180..37af070effc9f 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -113,23 +114,28 @@ public TransportFieldCapabilitiesAction( @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - executeRequest(task, request, REMOTE_TYPE, listener); + executeRequest( + task, + request, + (remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener), + listener + ); } public void executeRequest( Task task, FieldCapabilitiesRequest request, - RemoteClusterActionType remoteAction, + RemoteRequestExecutor remoteRequestExecutor, ActionListener listener ) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can - searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteAction, l))); + searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l))); } private void doExecuteForked( Task task, FieldCapabilitiesRequest request, - RemoteClusterActionType remoteAction, + RemoteRequestExecutor remoteRequestExecutor, ActionListener listener ) { if (ccsCheckCompatibility) { @@ -282,8 +288,8 @@ private void doExecuteForked( handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); } }); - remoteClusterClient.execute( - remoteAction, + remoteRequestExecutor.executeRemoteRequest( + remoteClusterClient, remoteRequest, // The underlying transport service may call onFailure with a thread pool other than search_coordinator. // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator. @@ -298,6 +304,14 @@ private void doExecuteForked( } } + public interface RemoteRequestExecutor { + void executeRemoteRequest( + RemoteClusterClient remoteClient, + FieldCapabilitiesRequest remoteRequest, + ActionListener remoteListener + ); + } + private static void checkIndexBlocks(ClusterState clusterState, String[] concreteIndices) { var blocks = clusterState.blocks(); if (blocks.global().isEmpty() && blocks.indices().isEmpty()) { diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle b/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle index 676729573b69d..aa19371685ce1 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle +++ b/x-pack/plugin/esql/qa/server/multi-clusters/build.gradle @@ -18,8 +18,8 @@ dependencies { } def supportedVersion = bwcVersion -> { - // ESQL requires its own resolve_fields API - return bwcVersion.onOrAfter(Version.fromString("8.16.0")); + // CCS in ES|QL available in 8.13 + return bwcVersion.onOrAfter(Version.fromString("8.13.0")); } BuildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName -> diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 3e799730f7269..3cd70026c4133 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -104,7 +104,6 @@ public MultiClusterSpecIT( protected void shouldSkipTest(String testName) throws IOException { super.shouldSkipTest(testName); checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase); - assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api")); assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query)); assumeTrue( "Test " + testName + " is skipped on " + Clusters.oldVersion(), diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java index 1f72827057c5b..dbeaed1596eff 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java @@ -67,7 +67,6 @@ record Doc(int id, String color, long data) { @Before public void setUpIndices() throws Exception { - assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api")); final String mapping = """ "properties": { "data": { "type": "long" }, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java index 2161efca1d2b4..f7e6793fc4fb3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; @@ -14,6 +15,7 @@ import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -27,7 +29,7 @@ public class EsqlResolveFieldsAction extends HandledTransportAction { public static final String NAME = "indices:data/read/esql/resolve_fields"; public static final ActionType TYPE = new ActionType<>(NAME); - public static final RemoteClusterActionType REMOTE_TYPE = new RemoteClusterActionType<>( + public static final RemoteClusterActionType RESOLVE_REMOTE_TYPE = new RemoteClusterActionType<>( NAME, FieldCapabilitiesResponse::new ); @@ -47,6 +49,19 @@ public EsqlResolveFieldsAction( @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - fieldCapsAction.executeRequest(task, request, REMOTE_TYPE, listener); + fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener); + } + + void executeRemoteRequest( + RemoteClusterClient remoteClient, + FieldCapabilitiesRequest remoteRequest, + ActionListener remoteListener + ) { + remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> { + var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES) + ? RESOLVE_REMOTE_TYPE + : TransportFieldCapabilitiesAction.REMOTE_TYPE; + remoteClient.execute(conn, remoteAction, remoteRequest, l); + })); } }