Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8d77992
Tweaked CCS tests for debugging
Mikep86 Oct 22, 2025
c3e28a5
Added a method to compute service to build a query rewrite context
Mikep86 Oct 23, 2025
a1a1423
Add remote coordinator rewrite
Mikep86 Oct 23, 2025
cc28bb0
Added linear retriever cross-cluster search test
Mikep86 Oct 24, 2025
ac38af0
Added get inference fields API
Mikep86 Oct 24, 2025
0a15496
Use the correct inject annotation
Mikep86 Oct 24, 2025
5bb1f6e
Update query rewrite context to execute remote cluster actions
Mikep86 Oct 24, 2025
6fde53e
Use direct executor service
Mikep86 Oct 24, 2025
d246828
Add remote cluster action type
Mikep86 Oct 24, 2025
3f7a3fd
Disable coordinator rewrite
Mikep86 Oct 24, 2025
c2d5763
Fix test plugins
Mikep86 Oct 24, 2025
b8df821
Add fields and query to request
Mikep86 Oct 24, 2025
b8976c2
Add resolve wildcards to request
Mikep86 Oct 24, 2025
ad5b8f2
Build the inference fields map
Mikep86 Oct 24, 2025
f4add01
Get inference results
Mikep86 Oct 24, 2025
91204c9
Don't hard-code useDefaultFields
Mikep86 Oct 24, 2025
ee36f8d
Added code to semantic query builder to get remote inference results
Mikep86 Oct 27, 2025
81bf2bf
Added remote inference results map supplier
Mikep86 Oct 27, 2025
6a5c7bb
Update semantic query to remote ccs_minimize_roundtrips=false restric…
Mikep86 Oct 27, 2025
2b60947
Fix logic errors
Mikep86 Oct 27, 2025
81f2638
Update semantic query builder CCS test
Mikep86 Oct 27, 2025
988e344
Updated intercepted queries to handle ccs_minimize_roundtrips=false
Mikep86 Oct 27, 2025
6b7e784
Updated intercepted queries to detect when no inference fields are qu…
Mikep86 Oct 27, 2025
e48bd2e
Get remote inference results during local cluster coordinator rewrite…
Mikep86 Oct 27, 2025
503286a
Pre-allocate hashmap size
Mikep86 Oct 27, 2025
230a96c
Update match query builder CCS integration tests
Mikep86 Oct 27, 2025
9f0fcb1
Fix logic error
Mikep86 Oct 27, 2025
44a0f08
Remove debug code
Mikep86 Oct 27, 2025
b4d9b70
Revert changes to ClusterComputeHandler
Mikep86 Oct 27, 2025
54b14cc
Spotless
Mikep86 Oct 27, 2025
7dc2f11
Revert changes to ComputeService
Mikep86 Oct 27, 2025
f50bb6a
Revert changes to ResolvedIndices
Mikep86 Oct 27, 2025
d2e659e
Added ES|QL cross-cluster query test for semantic text fields
Mikep86 Oct 27, 2025
e6eec87
Merge branch 'main' into semantic-search_ccs-esql-discovery
elasticmachine Oct 28, 2025
d5284b1
Revert changes to Clusters
Mikep86 Oct 28, 2025
5b2d0df
Adjusted interception logic when ccs_minimize_roundtrips: true
Mikep86 Oct 28, 2025
e1ef7ea
Disable broken unit test
Mikep86 Oct 28, 2025
8a50bd3
Disable broken integration tests
Mikep86 Oct 28, 2025
49daf0f
Disable broken integration test
Mikep86 Oct 28, 2025
51a7a3c
Fix class cast exceptions
Mikep86 Oct 28, 2025
5aab46d
Return 400 error when attempting to run a CCS query on an outdated cl…
Mikep86 Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResolvedIndices;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.DataTier;
Expand All @@ -36,11 +37,13 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -52,6 +55,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;

/**
* Context object used to rewrite {@link QueryBuilder} instances into simplified version.
*/
Expand All @@ -72,6 +77,7 @@ public class QueryRewriteContext {
protected final Client client;
protected final LongSupplier nowInMillis;
private final List<BiConsumer<Client, ActionListener<?>>> asyncActions = new ArrayList<>();
private final Map<String, List<BiConsumer<RemoteClusterClient, ActionListener<?>>>> remoteAsyncActions = new HashMap<>();
protected boolean allowUnmappedFields;
protected boolean mapUnmappedFieldAsString;
protected Predicate<String> allowedFields;
Expand Down Expand Up @@ -346,22 +352,35 @@ public void registerAsyncAction(BiConsumer<Client, ActionListener<?>> asyncActio
asyncActions.add(asyncAction);
}

public void registerRemoteAsyncAction(String clusterAlias, BiConsumer<RemoteClusterClient, ActionListener<?>> asyncAction) {
List<BiConsumer<RemoteClusterClient, ActionListener<?>>> asyncActions = remoteAsyncActions.computeIfAbsent(
clusterAlias,
k -> new ArrayList<>()
);
asyncActions.add(asyncAction);
}

/**
* Returns <code>true</code> if there are any registered async actions.
*/
public boolean hasAsyncActions() {
return asyncActions.isEmpty() == false;
return asyncActions.isEmpty() == false || remoteAsyncActions.isEmpty() == false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some interplay here with remote and non-remote actions. In registerRemoteAsyncAction we're adding the remote ones in the non-remote list too. Thus, the check on asyncActions should be enough? Otherwise, should we not be mixing remote/non-remote at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, remote and local async actions are stored separate lists. This is necessary because the remote async actions are mapped by cluster alias.

}

/**
* Executes all registered async actions and notifies the listener once it's done. The value that is passed to the listener is always
* <code>null</code>. The list of registered actions is cleared once this method returns.
*/
public void executeAsyncActions(ActionListener<Void> listener) {
if (asyncActions.isEmpty()) {
if (asyncActions.isEmpty() && remoteAsyncActions.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would break out remote async actions into their own method.

Copy link
Contributor Author

@Mikep86 Mikep86 Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with breaking remote async actions out into their own method is that it adds considerable complexity to callers of QueryRewriteContext that want to ensure that all (i.e. local and remote) async actions are executed. IMO that should be the default, executing only local async actions could lead to strange edge cases.

Having a different method for executing remote async actions would have the following side effects:

  • Callers would need to remember to check both hasAsyncActions and hasRemoteAsyncActions
  • Callers would need to construct a GroupedActionListener that encapsulates calls to executeAsyncActions and executeRemoteAsyncActions to ensure that both are complete before asynchronously moving to the next rewrite iteration. See how executeAsyncActions is called in rewriteAndFetch for a concrete example of the additional complexity pushed to the caller.

listener.onResponse(null);
} else {
CountDown countDown = new CountDown(asyncActions.size());
int actionCount = asyncActions.size();
for (var remoteAsyncActionList : remoteAsyncActions.values()) {
actionCount += remoteAsyncActionList.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Are we double-counting remote actions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

CountDown countDown = new CountDown(actionCount);
ActionListener<?> internalListener = new ActionListener<>() {
@Override
public void onResponse(Object o) {
Expand All @@ -377,12 +396,28 @@ public void onFailure(Exception e) {
}
}
};

// make a copy to prevent concurrent modification exception
List<BiConsumer<Client, ActionListener<?>>> biConsumers = new ArrayList<>(asyncActions);
asyncActions.clear();
for (BiConsumer<Client, ActionListener<?>> action : biConsumers) {
action.accept(client, internalListener);
}

for (var entry : remoteAsyncActions.entrySet()) {
String clusterAlias = entry.getKey();
List<BiConsumer<RemoteClusterClient, ActionListener<?>>> remoteBiConsumers = entry.getValue();

RemoteClusterClient remoteClient = client.getRemoteClusterClient(
clusterAlias,
DIRECT_EXECUTOR_SERVICE,
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
);
for (BiConsumer<RemoteClusterClient, ActionListener<?>> action : remoteBiConsumers) {
action.accept(remoteClient, internalListener);
}
}
remoteAsyncActions.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.inference.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.RemoteClusterActionType;
import org.elasticsearch.cluster.metadata.InferenceFieldMetadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.inference.InferenceResults;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GetInferenceFieldsAction extends ActionType<GetInferenceFieldsAction.Response> {
public static final GetInferenceFieldsAction INSTANCE = new GetInferenceFieldsAction();
public static final RemoteClusterActionType<Response> REMOTE_TYPE = new RemoteClusterActionType<>(INSTANCE.name(), Response::new);

public static final String NAME = "cluster:monitor/xpack/inference_fields/get";

public GetInferenceFieldsAction() {
super(NAME);
}

public static class Request extends ActionRequest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to add BWC serialization tests for these (and the associated responses) & make sure the json docs are added too

private final List<String> indices;
private final List<String> fields;
private final boolean resolveWildcards;
private final boolean useDefaultFields;
private final String query;

public Request(
List<String> indices,
List<String> fields,
boolean resolveWildcards,
boolean useDefaultFields,
@Nullable String query
) {
this.indices = indices;
this.fields = fields;
this.resolveWildcards = resolveWildcards;
this.useDefaultFields = useDefaultFields;
this.query = query;
}

public Request(StreamInput in) throws IOException {
super(in);
this.indices = in.readCollectionAsList(StreamInput::readString);
this.fields = in.readCollectionAsList(StreamInput::readString);
this.resolveWildcards = in.readBoolean();
this.useDefaultFields = in.readBoolean();
this.query = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(indices);
out.writeStringCollection(fields);
out.writeBoolean(resolveWildcards);
out.writeBoolean(useDefaultFields);
out.writeOptionalString(query);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public List<String> getIndices() {
return Collections.unmodifiableList(indices);
}

public List<String> getFields() {
return Collections.unmodifiableList(fields);
}

public boolean resolveWildcards() {
return resolveWildcards;
}

public boolean useDefaultFields() {
return useDefaultFields;
}

public String getQuery() {
return query;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(indices, request.indices)
&& Objects.equals(fields, request.fields)
&& resolveWildcards == request.resolveWildcards
&& useDefaultFields == request.useDefaultFields
&& Objects.equals(query, request.query);
}

@Override
public int hashCode() {
return Objects.hash(indices, fields, resolveWildcards, useDefaultFields, query);
}
}

public static class Response extends ActionResponse {
private final Map<String, List<InferenceFieldMetadata>> inferenceFieldsMap;
private final Map<String, InferenceResults> inferenceResultsMap;

public Response(Map<String, List<InferenceFieldMetadata>> inferenceFieldsMap, Map<String, InferenceResults> inferenceResultsMap) {
this.inferenceFieldsMap = inferenceFieldsMap;
this.inferenceResultsMap = inferenceResultsMap;
}

public Response(StreamInput in) throws IOException {
this.inferenceFieldsMap = in.readImmutableMap(i -> i.readCollectionAsImmutableList(InferenceFieldMetadata::new));
this.inferenceResultsMap = in.readImmutableMap(i -> i.readNamedWriteable(InferenceResults.class));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(inferenceFieldsMap, StreamOutput::writeCollection);
out.writeMap(inferenceResultsMap, StreamOutput::writeNamedWriteable);
}

public Map<String, List<InferenceFieldMetadata>> getInferenceFieldsMap() {
return Collections.unmodifiableMap(this.inferenceFieldsMap);
}

public Map<String, InferenceResults> getInferenceResultsMap() {
return Collections.unmodifiableMap(this.inferenceResultsMap);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(inferenceFieldsMap, response.inferenceFieldsMap)
&& Objects.equals(inferenceResultsMap, response.inferenceResultsMap);
}

@Override
public int hashCode() {
return Objects.hash(inferenceFieldsMap, inferenceResultsMap);
}
}
}
3 changes: 3 additions & 0 deletions x-pack/plugin/esql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ dependencies {
testImplementation('org.webjars.npm:fontsource__roboto-mono:4.5.7')

internalClusterTestImplementation project(":modules:mapper-extras")
internalClusterTestImplementation project(xpackModule('inference'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I found out the hard way, that ES|QL will enter some weird jarhell state and tests won't pass. You're going to want to make sure to not have this inference dependency for your non-draft PR. I solved this in my PR by refactoring the classes I needed to core.

internalClusterTestImplementation testArtifact(project(xpackModule('inference')))
internalClusterTestImplementation testArtifact(project(xpackModule('inference')), 'internalClusterTest')
}

tasks.named("dependencyLicenses").configure {
Expand Down
Loading