diff --git a/docs/changelog/127734.yaml b/docs/changelog/127734.yaml new file mode 100644 index 0000000000000..d33b201744c46 --- /dev/null +++ b/docs/changelog/127734.yaml @@ -0,0 +1,5 @@ +pr: 127734 +summary: Run coordinating `can_match` in field-caps +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/fieldcaps/FieldCapsWithFilterIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/fieldcaps/FieldCapsWithFilterIT.java new file mode 100644 index 0000000000000..d2765342e6a86 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/fieldcaps/FieldCapsWithFilterIT.java @@ -0,0 +1,181 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.fieldcaps; + +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.PointValues; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.shard.IndexLongFieldRange; +import org.elasticsearch.index.shard.ShardLongFieldRange; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; + +public class FieldCapsWithFilterIT extends ESIntegTestCase { + @Override + protected boolean addMockInternalEngine() { + return false; + } + + private static class EngineWithExposingTimestamp extends InternalEngine { + EngineWithExposingTimestamp(EngineConfig engineConfig) { + super(engineConfig); + assert IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(config().getIndexSettings().getSettings()) : "require read-only index"; + } + + @Override + public ShardLongFieldRange getRawFieldRange(String field) { + try (Searcher searcher = acquireSearcher("test")) { + final DirectoryReader directoryReader = searcher.getDirectoryReader(); + + final byte[] minPackedValue = PointValues.getMinPackedValue(directoryReader, field); + final byte[] maxPackedValue = PointValues.getMaxPackedValue(directoryReader, field); + if (minPackedValue == null || maxPackedValue == null) { + assert minPackedValue == null && maxPackedValue == null + : Arrays.toString(minPackedValue) + "-" + Arrays.toString(maxPackedValue); + return ShardLongFieldRange.EMPTY; + } + + return ShardLongFieldRange.of(LongPoint.decodeDimension(minPackedValue, 0), LongPoint.decodeDimension(maxPackedValue, 0)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + public static class ExposingTimestampEnginePlugin extends Plugin implements EnginePlugin { + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) { + return Optional.of(EngineWithExposingTimestamp::new); + } else { + return Optional.of(new InternalEngineFactory()); + } + } + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), ExposingTimestampEnginePlugin.class); + } + + void createIndexAndIndexDocs(String index, Settings.Builder indexSettings, long timestamp, boolean exposeTimestamp) throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setSettings(indexSettings) + .setMapping("@timestamp", "type=date", "position", "type=long") + ); + int numDocs = between(100, 500); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(index).setSource("position", i, "@timestamp", timestamp + i).get(); + } + if (exposeTimestamp) { + client.admin().indices().prepareClose(index).get(); + client.admin() + .indices() + .prepareUpdateSettings(index) + .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build()) + .get(); + client.admin().indices().prepareOpen(index).get(); + assertBusy(() -> { + IndexLongFieldRange timestampRange = clusterService().state().metadata().index(index).getTimestampRange(); + assertTrue(Strings.toString(timestampRange), timestampRange.containsAllShardRanges()); + }); + } else { + client.admin().indices().prepareRefresh(index).get(); + } + } + + public void testSkipUnmatchedShards() throws Exception { + long oldTimestamp = randomLongBetween(10_000_000, 20_000_000); + long newTimestamp = randomLongBetween(30_000_000, 50_000_000); + String redNode = internalCluster().startDataOnlyNode(); + String blueNode = internalCluster().startDataOnlyNode(); + createIndexAndIndexDocs( + "index_old", + indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", redNode), + oldTimestamp, + true + ); + internalCluster().stopNode(redNode); + createIndexAndIndexDocs( + "index_new", + indexSettings(between(1, 5), 0).put("index.routing.allocation.include._name", blueNode), + newTimestamp, + false + ); + // fails without index filter + { + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("index_*"); + request.fields("*"); + request.setMergeResults(false); + if (randomBoolean()) { + request.indexFilter(new RangeQueryBuilder("@timestamp").from(oldTimestamp)); + } + var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request)); + assertThat(response.getIndexResponses(), hasSize(1)); + assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new")); + assertThat(response.getFailures(), hasSize(1)); + assertThat(response.getFailures().get(0).getIndices(), equalTo(new String[] { "index_old" })); + assertThat(response.getFailures().get(0).getException(), instanceOf(NoShardAvailableActionException.class)); + } + // skip unavailable shards with index filter + { + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("index_*"); + request.fields("*"); + request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp)); + request.setMergeResults(false); + var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request)); + assertThat(response.getIndexResponses(), hasSize(1)); + assertThat(response.getIndexResponses().get(0).getIndexName(), equalTo("index_new")); + assertThat(response.getFailures(), empty()); + } + // skip both indices on the coordinator, one the data nodes + { + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("index_*"); + request.fields("*"); + request.indexFilter(new RangeQueryBuilder("@timestamp").from(newTimestamp * 2L)); + request.setMergeResults(false); + var response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request)); + assertThat(response.getIndexResponses(), empty()); + assertThat(response.getFailures(), empty()); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java index cbd22856f09a2..5fa2ec29e3bdf 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java @@ -928,6 +928,10 @@ static void unblockOnRewrite() { @Override protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException { + // skip rewriting on the coordinator + if (queryRewriteContext.convertToCoordinatorRewriteContext() != null) { + return this; + } try { blockingLatch.await(); } catch (InterruptedException e) { diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index efebe6b485fde..404d6517406d8 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -220,6 +220,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_29); public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_8_19 = def(8_841_0_30); public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED_8_19 = def(8_841_0_31); + public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(8_841_0_32); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java index 88eb2ef4fb13d..efbde6264e91c 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -37,6 +38,8 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind public static final String NAME = "field_caps_request"; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed(); + private String clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + private String[] indices = Strings.EMPTY_ARRAY; private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; private String[] fields = Strings.EMPTY_ARRAY; @@ -67,6 +70,11 @@ public FieldCapabilitiesRequest(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) { includeEmptyFields = in.readBoolean(); } + if (in.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) { + clusterAlias = in.readOptionalString(); + } else { + clusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + } } public FieldCapabilitiesRequest() {} @@ -90,6 +98,14 @@ public void setMergeResults(boolean mergeResults) { this.mergeResults = mergeResults; } + void clusterAlias(String clusterAlias) { + this.clusterAlias = clusterAlias; + } + + String clusterAlias() { + return clusterAlias; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -108,6 +124,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) { out.writeBoolean(includeEmptyFields); } + if (out.getTransportVersion().onOrAfter(TransportVersions.FIELD_CAPS_ADD_CLUSTER_ALIAS)) { + out.writeOptionalString(clusterAlias); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java index 15577632176f5..afcfd3f9533cc 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -24,8 +24,14 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -69,6 +75,7 @@ final class RequestDispatcher { RequestDispatcher( ClusterService clusterService, TransportService transportService, + CoordinatorRewriteContextProvider coordinatorRewriteContextProvider, Task parentTask, FieldCapabilitiesRequest fieldCapsRequest, OriginalIndices originalIndices, @@ -99,8 +106,14 @@ final class RequestDispatcher { onIndexFailure.accept(index, e); continue; } - final IndexSelector indexResult = new IndexSelector(shardIts); - if (indexResult.nodeToShards.isEmpty()) { + final IndexSelector indexResult = new IndexSelector( + fieldCapsRequest.clusterAlias(), + shardIts, + fieldCapsRequest.indexFilter(), + nowInMillis, + coordinatorRewriteContextProvider + ); + if (indexResult.nodeToShards.isEmpty() && indexResult.unmatchedShardIds.isEmpty()) { onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy")); } else { this.indexSelectors.put(index, indexResult); @@ -249,10 +262,34 @@ private static class IndexSelector { private final Set unmatchedShardIds = new HashSet<>(); private final Map failures = new HashMap<>(); - IndexSelector(List shardIts) { + IndexSelector( + String clusterAlias, + List shardIts, + QueryBuilder indexFilter, + long nowInMillis, + CoordinatorRewriteContextProvider coordinatorRewriteContextProvider + ) { for (ShardIterator shardIt : shardIts) { - for (ShardRouting shard : shardIt) { - nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); + boolean canMatch = true; + final ShardId shardId = shardIt.shardId(); + if (indexFilter != null && indexFilter instanceof MatchAllQueryBuilder == false) { + var coordinatorRewriteContext = coordinatorRewriteContextProvider.getCoordinatorRewriteContext(shardId.getIndex()); + if (coordinatorRewriteContext != null) { + var shardRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY, clusterAlias); + shardRequest.source(new SearchSourceBuilder().query(indexFilter)); + try { + canMatch = SearchService.queryStillMatchesAfterRewrite(shardRequest, coordinatorRewriteContext); + } catch (Exception e) { + // treat as if shard is still a potential match + } + } + } + if (canMatch) { + for (ShardRouting shard : shardIt) { + nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); + } + } else { + unmatchedShardIds.add(shardId); } } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 37af070effc9f..a24ed3bfd203b 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -240,6 +240,7 @@ private void doExecuteForked( final RequestDispatcher requestDispatcher = new RequestDispatcher( clusterService, transportService, + indicesService.getCoordinatorRewriteContextProvider(() -> nowInMillis), task, request, localIndices, @@ -263,7 +264,7 @@ private void doExecuteForked( searchCoordinationExecutor, RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE ); - FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis); + FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis); ActionListener remoteListener = ActionListener.wrap(response -> { for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) { String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()); @@ -350,11 +351,13 @@ private void mergeIndexResponses( } private static FieldCapabilitiesRequest prepareRemoteRequest( + String clusterAlias, FieldCapabilitiesRequest request, OriginalIndices originalIndices, long nowInMillis ) { FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); + remoteRequest.clusterAlias(clusterAlias); remoteRequest.setMergeResults(false); // we need to merge on this node remoteRequest.indicesOptions(originalIndices.indicesOptions()); remoteRequest.indices(originalIndices.indices()); diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java index c877ee13abe17..e1315c290ddea 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java @@ -45,6 +45,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.ShardId; @@ -129,6 +130,7 @@ public void testHappyCluster() throws Exception { final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -198,6 +200,7 @@ public void testRetryThenOk() throws Exception { final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -318,6 +321,7 @@ public void testRetryButFails() throws Exception { final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -440,6 +444,7 @@ public void testSuccessWithAnyMatch() throws Exception { final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -536,6 +541,7 @@ public void testStopAfterAllShardsUnmatched() throws Exception { final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -626,6 +632,7 @@ public void testFailWithSameException() throws Exception { final RequestDispatcher dispatcher = new RequestDispatcher( mockClusterService(clusterState), transportService, + coordinatorRewriteContextProvider(), newRandomParentTask(), randomFieldCapRequest(withFilter), OriginalIndices.NONE, @@ -1012,4 +1019,8 @@ static ClusterService mockClusterService(ClusterState clusterState) { when(clusterService.operationRouting()).thenReturn(operationRouting); return clusterService; } + + static CoordinatorRewriteContextProvider coordinatorRewriteContextProvider() { + return mock(CoordinatorRewriteContextProvider.class); + } }