Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -113,23 +114,28 @@ public TransportFieldCapabilitiesAction(

@Override
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
executeRequest(task, request, REMOTE_TYPE, listener);
executeRequest(
task,
request,
(remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener),
listener
);
}

public void executeRequest(
Task task,
FieldCapabilitiesRequest request,
RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
RemoteRequestExecutor remoteRequestExecutor,
ActionListener<FieldCapabilitiesResponse> listener
) {
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteAction, l)));
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l)));
}

private void doExecuteForked(
Task task,
FieldCapabilitiesRequest request,
RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
RemoteRequestExecutor remoteRequestExecutor,
ActionListener<FieldCapabilitiesResponse> listener
) {
if (ccsCheckCompatibility) {
Expand Down Expand Up @@ -282,8 +288,8 @@ private void doExecuteForked(
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
}
});
remoteClusterClient.execute(
remoteAction,
remoteRequestExecutor.executeRemoteRequest(
remoteClusterClient,
remoteRequest,
// The underlying transport service may call onFailure with a thread pool other than search_coordinator.
// This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
Expand All @@ -298,6 +304,14 @@ private void doExecuteForked(
}
}

public interface RemoteRequestExecutor {
void executeRemoteRequest(
RemoteClusterClient remoteClient,
FieldCapabilitiesRequest remoteRequest,
ActionListener<FieldCapabilitiesResponse> remoteListener
);
}

private static void checkIndexBlocks(ClusterState clusterState, String[] concreteIndices) {
var blocks = clusterState.blocks();
if (blocks.global().isEmpty() && blocks.indices().isEmpty()) {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/plugin/esql/qa/server/multi-clusters/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ dependencies {
}

def supportedVersion = bwcVersion -> {
// ESQL requires its own resolve_fields API
return bwcVersion.onOrAfter(Version.fromString("8.16.0"));
// CCS in ES|QL available in 8.13
return bwcVersion.onOrAfter(Version.fromString("8.13.0"));
}

BuildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public MultiClusterSpecIT(
protected void shouldSkipTest(String testName) throws IOException {
super.shouldSkipTest(testName);
checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);
assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query));
assumeTrue(
"Test " + testName + " is skipped on " + Clusters.oldVersion(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ record Doc(int id, String color, long data) {

@Before
public void setUpIndices() throws Exception {
assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
final String mapping = """
"properties": {
"data": { "type": "long" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.RemoteClusterActionType;
Expand All @@ -14,6 +15,7 @@
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
Expand All @@ -27,7 +29,7 @@
public class EsqlResolveFieldsAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public static final String NAME = "indices:data/read/esql/resolve_fields";
public static final ActionType<FieldCapabilitiesResponse> TYPE = new ActionType<>(NAME);
public static final RemoteClusterActionType<FieldCapabilitiesResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
public static final RemoteClusterActionType<FieldCapabilitiesResponse> RESOLVE_REMOTE_TYPE = new RemoteClusterActionType<>(
NAME,
FieldCapabilitiesResponse::new
);
Expand All @@ -47,6 +49,19 @@ public EsqlResolveFieldsAction(

@Override
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
fieldCapsAction.executeRequest(task, request, REMOTE_TYPE, listener);
fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener);
}

void executeRemoteRequest(
RemoteClusterClient remoteClient,
FieldCapabilitiesRequest remoteRequest,
ActionListener<FieldCapabilitiesResponse> remoteListener
) {
remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> {
var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES)
? RESOLVE_REMOTE_TYPE
: TransportFieldCapabilitiesAction.REMOTE_TYPE;
remoteClient.execute(conn, remoteAction, remoteRequest, l);
}));
}
}