From d569fb8254fe13a0a6f425a46e86326191b82c88 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 8 Oct 2025 13:22:41 +0200 Subject: [PATCH 1/5] Populate EsqlExecutionInfo.clusterInfo from field caps response --- .../xpack/esql/session/EsqlCCSUtils.java | 22 +++++++++++++++++++ .../xpack/esql/session/EsqlSession.java | 3 +-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 752165df9dd7f..c6740077f9c8f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -40,7 +40,9 @@ import java.util.Objects; import java.util.Set; +import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toSet; public class EsqlCCSUtils { @@ -351,6 +353,26 @@ public static void initCrossClusterState( } } + public static void initCrossClusterState(Set resolvedIndices, EsqlExecutionInfo executionInfo, XPackLicenseState licenseState) { + executionInfo.clusterInfoInitializing(true); + try { + resolvedIndices.stream() + .map(RemoteClusterAware::splitIndexName) + .collect(groupingBy(it -> it[0], mapping(it -> it[1], joining(",")))) + .forEach((clusterAlias, indexExpr) -> { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); + }); + }); + } finally { + executionInfo.clusterInfoInitializing(false); + } + if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { + throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState); + } + } + /** * Mark cluster with a final status (success or failure). * Most metrics are set to 0 if not set yet, except for "took" which is set to the total time taken so far. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index ded34107f5aec..5ce1055a6fb9d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -453,8 +453,6 @@ private void resolveIndices( PreAnalysisResult result, ActionListener logicalPlanListener ) { - EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo); - SubscribableListener.newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l)) .andThenApply(r -> { if (r.indices.isValid() @@ -731,6 +729,7 @@ private void preAnalyzeMainIndices( preAnalysis.supportsAggregateMetricDouble(), preAnalysis.supportsDenseVector(), listener.delegateFailureAndWrap((l, indexResolution) -> { + EsqlCCSUtils.initCrossClusterState(indexResolution.resolvedIndices(), executionInfo, verifier.licenseState()); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures()); l.onResponse(result.withIndices(indexResolution)); }) From 74193e88bb6c91deeb64f3f2ab925d93113c71c2 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 8 Oct 2025 13:43:07 +0200 Subject: [PATCH 2/5] fix unit test --- .../xpack/esql/session/EsqlCCSUtils.java | 56 ++----------------- .../xpack/esql/session/EsqlCCSUtilsTests.java | 42 +++++--------- 2 files changed, 18 insertions(+), 80 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index c6740077f9c8f..6f4b06e1833dc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -8,20 +8,16 @@ package org.elasticsearch.xpack.esql.session; import org.elasticsearch.ElasticsearchSecurityException; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.VerificationException; @@ -29,7 +25,6 @@ import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.index.IndexResolution; -import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import java.util.ArrayList; @@ -304,61 +299,18 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { } /** - * Checks the index expression for the presence of remote clusters. + * Checks resolved indices for the presence of remote clusters. * If found, it will ensure that the caller has a valid Enterprise (or Trial) license on the querying cluster * as well as initialize corresponding cluster state in execution info. * @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search. */ - public static void initCrossClusterState( - IndicesExpressionGrouper indicesGrouper, - XPackLicenseState licenseState, - IndexPattern indexPattern, - EsqlExecutionInfo executionInfo - ) throws ElasticsearchStatusException { - if (indexPattern == null) { - return; - } - try { - var groupedIndices = indicesGrouper.groupIndices( - IndicesOptions.DEFAULT, - Strings.splitStringByCommaToArray(indexPattern.indexPattern()), - false - ); - - executionInfo.clusterInfoInitializing(true); - // initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error - // so that the CCS telemetry handler can recognize that this error is CCS-related - try { - for (var entry : groupedIndices.entrySet()) { - final String clusterAlias = entry.getKey(); - final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); - }); - } - } finally { - executionInfo.clusterInfoInitializing(false); - } - - if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) { - throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState); - } - } catch (NoSuchRemoteClusterException e) { - if (EsqlLicenseChecker.isCcsAllowed(licenseState)) { - throw e; - } else { - throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState); - } - } - } - public static void initCrossClusterState(Set resolvedIndices, EsqlExecutionInfo executionInfo, XPackLicenseState licenseState) { executionInfo.clusterInfoInitializing(true); try { resolvedIndices.stream() - .map(RemoteClusterAware::splitIndexName) - .collect(groupingBy(it -> it[0], mapping(it -> it[1], joining(",")))) + .collect( + groupingBy(RemoteClusterAware::parseClusterAlias, mapping(it -> RemoteClusterAware.splitIndexName(it)[1], joining(","))) + ) .forEach((clusterAlias, indexExpr) -> { executionInfo.swapCluster(clusterAlias, (k, v) -> { assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 3a2eaa544c6ab..1607767c2c898 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; -import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.type.EsFieldTests; import org.hamcrest.Matcher; @@ -45,7 +44,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.initCrossClusterState; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -666,34 +664,32 @@ public void testConcreteIndexRequested() { } public void testInitCrossClusterState() { - final TestIndicesExpressionGrouper indicesGrouper = new TestIndicesExpressionGrouper(); - // local only search works with any license state { - var localOnly = new IndexPattern(EMPTY, randomFrom("idx", "idx1,idx2*")); + var localOnly = randomFrom(Set.of("idx"), Set.of("idx1", "idx2")); - assertLicenseCheckPasses(indicesGrouper, null, localOnly, ""); + assertLicenseCheckPasses(localOnly, null, ""); for (var mode : License.OperationMode.values()) { - assertLicenseCheckPasses(indicesGrouper, activeLicenseStatus(mode), localOnly, ""); - assertLicenseCheckPasses(indicesGrouper, inactiveLicenseStatus(mode), localOnly, ""); + assertLicenseCheckPasses(localOnly, activeLicenseStatus(mode), ""); + assertLicenseCheckPasses(localOnly, inactiveLicenseStatus(mode), ""); } } // cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license { - var remote = new IndexPattern(EMPTY, randomFrom("idx,remote:idx", "idx1,remote:idx2*,remote:logs")); + var remote = Set.of("idx", "remote:idx"); var supportedLicenses = EnumSet.of(License.OperationMode.TRIAL, License.OperationMode.ENTERPRISE); var unsupportedLicenses = EnumSet.complementOf(supportedLicenses); - assertLicenseCheckFails(indicesGrouper, null, remote, "none"); + assertLicenseCheckFails(remote, null, "none"); for (var mode : supportedLicenses) { - assertLicenseCheckPasses(indicesGrouper, activeLicenseStatus(mode), remote, "", "remote"); - assertLicenseCheckFails(indicesGrouper, inactiveLicenseStatus(mode), remote, "expired " + nameOf(mode) + " license"); + assertLicenseCheckPasses(remote, activeLicenseStatus(mode), "", "remote"); + assertLicenseCheckFails(remote, inactiveLicenseStatus(mode), "expired " + nameOf(mode) + " license"); } for (var mode : unsupportedLicenses) { - assertLicenseCheckFails(indicesGrouper, activeLicenseStatus(mode), remote, "active " + nameOf(mode) + " license"); - assertLicenseCheckFails(indicesGrouper, inactiveLicenseStatus(mode), remote, "expired " + nameOf(mode) + " license"); + assertLicenseCheckFails(remote, activeLicenseStatus(mode), "active " + nameOf(mode) + " license"); + assertLicenseCheckFails(remote, inactiveLicenseStatus(mode), "expired " + nameOf(mode) + " license"); } } } @@ -706,29 +702,19 @@ private static XPackLicenseState createLicenseState(XPackLicenseStatus status) { return status != null ? new XPackLicenseState(System::currentTimeMillis, status) : null; } - private void assertLicenseCheckPasses( - TestIndicesExpressionGrouper indicesGrouper, - XPackLicenseStatus status, - IndexPattern pattern, - String... expectedRemotes - ) { + private void assertLicenseCheckPasses(Set resolvedIndices, XPackLicenseStatus status, String... expectedRemotes) { var executionInfo = new EsqlExecutionInfo(true); - initCrossClusterState(indicesGrouper, createLicenseState(status), pattern, executionInfo); + initCrossClusterState(resolvedIndices, executionInfo, createLicenseState(status)); assertThat(executionInfo.clusterAliases(), containsInAnyOrder(expectedRemotes)); } - private void assertLicenseCheckFails( - TestIndicesExpressionGrouper indicesGrouper, - XPackLicenseStatus licenseStatus, - IndexPattern pattern, - String expectedErrorMessageSuffix - ) { + private void assertLicenseCheckFails(Set resolvedIndices, XPackLicenseStatus licenseStatus, String expectedErrorMessageSuffix) { ElasticsearchStatusException e = expectThrows( ElasticsearchStatusException.class, equalTo( "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: " + expectedErrorMessageSuffix ), - () -> initCrossClusterState(indicesGrouper, createLicenseState(licenseStatus), pattern, new EsqlExecutionInfo(true)) + () -> initCrossClusterState(resolvedIndices, new EsqlExecutionInfo(true), createLicenseState(licenseStatus)) ); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); } From 66eb8af17c0a73f78377b84d9fe0f76bd6077bdf Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 8 Oct 2025 16:23:43 +0200 Subject: [PATCH 3/5] fix --- .../xpack/esql/session/EsqlSession.java | 55 ++++++++----------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 5ce1055a6fb9d..27624d6a47db2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -126,7 +126,6 @@ public interface PlanRunner { private final Mapper mapper; private final PhysicalPlanOptimizer physicalPlanOptimizer; private final PlanTelemetry planTelemetry; - private final IndicesExpressionGrouper indicesExpressionGrouper; private final InferenceService inferenceService; private final RemoteClusterService remoteClusterService; private final BlockFactory blockFactory; @@ -148,7 +147,7 @@ public EsqlSession( Mapper mapper, Verifier verifier, PlanTelemetry planTelemetry, - IndicesExpressionGrouper indicesExpressionGrouper, + IndicesExpressionGrouper indicesExpressionGrouper, // TODO remove TransportActionServices services ) { this.sessionId = sessionId; @@ -163,7 +162,6 @@ public EsqlSession( this.logicalPlanOptimizer = logicalPlanOptimizer; this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration)); this.planTelemetry = planTelemetry; - this.indicesExpressionGrouper = indicesExpressionGrouper; this.inferenceService = services.inferenceService(); this.preMapper = new PreMapper(services); this.remoteClusterService = services.transportService().getRemoteClusterService(); @@ -706,35 +704,28 @@ private void preAnalyzeMainIndices( ThreadPool.Names.SYSTEM_READ ); if (preAnalysis.indexPattern() != null) { - if (executionInfo.clusterAliases().isEmpty()) { - // return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index) - listener.onResponse( - result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))) - ); - } else { - indexResolver.resolveAsMergedMapping( - preAnalysis.indexPattern().indexPattern(), - result.fieldNames, - // Maybe if no indices are returned, retry without index mode and provide a clearer error message. - switch (preAnalysis.indexMode()) { - case IndexMode.TIME_SERIES -> { - var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); - yield requestFilter != null - ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) - : indexModeFilter; - } - default -> requestFilter; - }, - preAnalysis.indexMode() == IndexMode.TIME_SERIES, - preAnalysis.supportsAggregateMetricDouble(), - preAnalysis.supportsDenseVector(), - listener.delegateFailureAndWrap((l, indexResolution) -> { - EsqlCCSUtils.initCrossClusterState(indexResolution.resolvedIndices(), executionInfo, verifier.licenseState()); - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures()); - l.onResponse(result.withIndices(indexResolution)); - }) - ); - } + indexResolver.resolveAsMergedMapping( + preAnalysis.indexPattern().indexPattern(), + result.fieldNames, + // Maybe if no indices are returned, retry without index mode and provide a clearer error message. + switch (preAnalysis.indexMode()) { + case IndexMode.TIME_SERIES -> { + var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); + yield requestFilter != null + ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) + : indexModeFilter; + } + default -> requestFilter; + }, + preAnalysis.indexMode() == IndexMode.TIME_SERIES, + preAnalysis.supportsAggregateMetricDouble(), + preAnalysis.supportsDenseVector(), + listener.delegateFailureAndWrap((l, indexResolution) -> { + EsqlCCSUtils.initCrossClusterState(indexResolution.resolvedIndices(), executionInfo, verifier.licenseState()); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures()); + l.onResponse(result.withIndices(indexResolution)); + }) + ); } else { // occurs when dealing with local relations (row a = 1) listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]"))); From fba9246e505da20e4804814cd79470d588ff8fbb Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 8 Oct 2025 16:28:11 +0200 Subject: [PATCH 4/5] remove unused references --- .../xpack/esql/execution/PlanExecutor.java | 3 --- .../esql/plugin/TransportEsqlQueryAction.java | 1 - .../xpack/esql/session/EsqlSession.java | 2 -- .../org/elasticsearch/xpack/esql/CsvTests.java | 1 - .../esql/telemetry/PlanExecutorMetricsTests.java | 14 ++------------ 5 files changed, 2 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java index 974b73718ff0b..391fa0badee65 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.execution; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -76,7 +75,6 @@ public void esql( FoldContext foldContext, EnrichPolicyResolver enrichPolicyResolver, EsqlExecutionInfo executionInfo, - IndicesExpressionGrouper indicesExpressionGrouper, EsqlSession.PlanRunner planRunner, TransportActionServices services, ActionListener listener @@ -94,7 +92,6 @@ public void esql( mapper, verifier, planTelemetry, - indicesExpressionGrouper, services ); QueryMetric clientId = QueryMetric.fromString("rest"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index db702cdc325db..464dcc05e44b4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -279,7 +279,6 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 27624d6a47db2..b9554e9cfb763 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -29,7 +29,6 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchShardTarget; @@ -147,7 +146,6 @@ public EsqlSession( Mapper mapper, Verifier verifier, PlanTelemetry planTelemetry, - IndicesExpressionGrouper indicesExpressionGrouper, // TODO remove TransportActionServices services ) { this.sessionId = sessionId; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 1c822a11a811f..9913982ed59d6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -595,7 +595,6 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { mapper, TEST_VERIFIER, new PlanTelemetry(functionRegistry), - null, EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES ); TestPhysicalOperationProviders physicalOperationProviders = testOperationProviders(foldCtx, testDatasets); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java index 7a2565aaad9ce..5a5902883e88c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/PlanExecutorMetricsTests.java @@ -9,13 +9,11 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilities; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesBuilder; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; import org.elasticsearch.action.fieldcaps.IndexFieldCapabilitiesBuilder; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -23,7 +21,6 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.index.SlowLogFields; -import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; @@ -68,12 +65,12 @@ public class PlanExecutorMetricsTests extends ESTestCase { private ThreadPool threadPool; @Before - public void setUpThreadPool() throws Exception { + public void setUpThreadPool() { threadPool = new TestThreadPool(PlanExecutorMetricsTests.class.getSimpleName()); } @After - public void shutdownThreadPool() throws Exception { + public void shutdownThreadPool() { terminate(threadPool); } @@ -129,7 +126,6 @@ public void testFailedMetric() { String[] indices = new String[] { "test" }; Client qlClient = mock(Client.class); - IndexResolver idxResolver = new IndexResolver(qlClient); // simulate a valid field_caps response so we can parse and correctly analyze de query FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class); when(fieldCapabilitiesResponse.getIndices()).thenReturn(indices); @@ -164,10 +160,6 @@ public void testFailedMetric() { // test a failed query: xyz field doesn't exist request.query("from test | stats m = max(xyz)"); EsqlSession.PlanRunner runPhase = (p, r) -> fail("this shouldn't happen"); - IndicesExpressionGrouper groupIndicesByCluster = (indicesOptions, indexExpressions, returnLocalAll) -> Map.of( - "", - new OriginalIndices(new String[] { "test" }, IndicesOptions.DEFAULT) - ); planExecutor.esql( request, @@ -176,7 +168,6 @@ public void testFailedMetric() { FoldContext.small(), enrichResolver, new EsqlExecutionInfo(randomBoolean()), - groupIndicesByCluster, runPhase, EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { @@ -207,7 +198,6 @@ public void onFailure(Exception e) { FoldContext.small(), enrichResolver, new EsqlExecutionInfo(randomBoolean()), - groupIndicesByCluster, runPhase, EsqlTestUtils.MOCK_TRANSPORT_ACTION_SERVICES, new ActionListener<>() { From 90f911c07a9c07a4897fd9bba40f0d3be2c1a17e Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 9 Oct 2025 09:47:34 +0200 Subject: [PATCH 5/5] fix some tests --- .../xpack/esql/index/IndexResolution.java | 4 ++ .../xpack/esql/session/EsqlCCSUtils.java | 26 ++++++---- .../xpack/esql/session/EsqlSession.java | 26 ++++++++-- .../xpack/esql/session/EsqlCCSUtilsTests.java | 50 ++++--------------- 4 files changed, 53 insertions(+), 53 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java index 4d31f48da77de..83f70e7e532f3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java @@ -41,6 +41,10 @@ public static IndexResolution invalid(String invalid) { return new IndexResolution(null, invalid, Set.of(), Map.of()); } + public static IndexResolution empty(String indexPattern) { + return valid(new EsIndex(indexPattern, Map.of(), Map.of())); + } + public static IndexResolution notFound(String name) { Objects.requireNonNull(name, "name must not be null"); return invalid("Unknown index [" + name + "]"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 6f4b06e1833dc..b27a77bce0ecf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Stream; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.joining; @@ -304,19 +305,24 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { * as well as initialize corresponding cluster state in execution info. * @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search. */ - public static void initCrossClusterState(Set resolvedIndices, EsqlExecutionInfo executionInfo, XPackLicenseState licenseState) { + public static void initCrossClusterState( + IndexResolution resolvedIndices, + EsqlExecutionInfo executionInfo, + XPackLicenseState licenseState + ) { executionInfo.clusterInfoInitializing(true); try { - resolvedIndices.stream() - .collect( - groupingBy(RemoteClusterAware::parseClusterAlias, mapping(it -> RemoteClusterAware.splitIndexName(it)[1], joining(","))) - ) - .forEach((clusterAlias, indexExpr) -> { - executionInfo.swapCluster(clusterAlias, (k, v) -> { - assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); - }); + Stream.concat( + resolvedIndices.failures().keySet().stream().map(remote -> Map.entry(remote, "")), + resolvedIndices.resolvedIndices() + .stream() + .map(index -> Map.entry(RemoteClusterAware.parseClusterAlias(index), RemoteClusterAware.splitIndexName(index)[1])) + ).collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, joining(",")))).forEach((clusterAlias, indexExpr) -> { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); }); + }); } finally { executionInfo.clusterInfoInitializing(false); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b9554e9cfb763..08f6ae72b7dca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; @@ -33,6 +34,7 @@ import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.xpack.esql.VerificationException; @@ -80,12 +82,14 @@ import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -718,10 +722,26 @@ private void preAnalyzeMainIndices( preAnalysis.indexMode() == IndexMode.TIME_SERIES, preAnalysis.supportsAggregateMetricDouble(), preAnalysis.supportsDenseVector(), - listener.delegateFailureAndWrap((l, indexResolution) -> { - EsqlCCSUtils.initCrossClusterState(indexResolution.resolvedIndices(), executionInfo, verifier.licenseState()); + ActionListener.wrap(indexResolution -> { + + // TODO this could be removed when allow_empty=true is properly handled + if (Objects.equals(indexResolution, IndexResolution.notFound(preAnalysis.indexPattern().indexPattern())) + && Arrays.stream(Strings.commaDelimitedListToStringArray(preAnalysis.indexPattern().indexPattern())) + .map(RemoteClusterAware::parseClusterAlias) + .allMatch(remote -> remote.contains("*"))) { + indexResolution = IndexResolution.empty(preAnalysis.indexPattern().indexPattern()); + } + + EsqlCCSUtils.initCrossClusterState(indexResolution, executionInfo, verifier.licenseState()); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures()); - l.onResponse(result.withIndices(indexResolution)); + listener.onResponse(result.withIndices(indexResolution)); + }, failure -> { + if (failure instanceof NoSuchRemoteClusterException + && EsqlLicenseChecker.isCcsAllowed(verifier.licenseState()) == false) { + listener.onFailure(EsqlLicenseChecker.invalidLicenseForCcsException(verifier.licenseState())); + } else { + listener.onFailure(failure); + } }) ); } else { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 1607767c2c898..8d8cfb4632a83 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -9,13 +9,10 @@ import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexMode; -import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.License; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.internal.XPackLicenseStatus; @@ -34,7 +31,6 @@ import org.hamcrest.Matcher; import java.util.ArrayList; -import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -42,7 +38,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Predicate; -import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.initCrossClusterState; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -704,7 +699,7 @@ private static XPackLicenseState createLicenseState(XPackLicenseStatus status) { private void assertLicenseCheckPasses(Set resolvedIndices, XPackLicenseStatus status, String... expectedRemotes) { var executionInfo = new EsqlExecutionInfo(true); - initCrossClusterState(resolvedIndices, executionInfo, createLicenseState(status)); + initCrossClusterState(createIndexResolution(resolvedIndices), executionInfo, createLicenseState(status)); assertThat(executionInfo.clusterAliases(), containsInAnyOrder(expectedRemotes)); } @@ -714,11 +709,19 @@ private void assertLicenseCheckFails(Set resolvedIndices, XPackLicenseSt equalTo( "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: " + expectedErrorMessageSuffix ), - () -> initCrossClusterState(resolvedIndices, new EsqlExecutionInfo(true), createLicenseState(licenseStatus)) + () -> initCrossClusterState( + createIndexResolution(resolvedIndices), + new EsqlExecutionInfo(true), + createLicenseState(licenseStatus) + ) ); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); } + private static IndexResolution createIndexResolution(Set resolvedIndices) { + return IndexResolution.valid(new EsIndex("", Map.of()), resolvedIndices, Map.of()); + } + private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) { return new XPackLicenseStatus(operationMode, true, null); } @@ -726,37 +729,4 @@ private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMo private XPackLicenseStatus inactiveLicenseStatus(License.OperationMode operationMode) { return new XPackLicenseStatus(operationMode, false, "License Expired 123"); } - - static class TestIndicesExpressionGrouper implements IndicesExpressionGrouper { - @Override - public Map groupIndices(IndicesOptions indicesOptions, String[] indexExpressions, boolean returnLocalAll) { - final Map originalIndicesMap = new HashMap<>(); - final String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - - for (String expr : indexExpressions) { - assertFalse(Strings.isNullOrBlank(expr)); - String[] split = expr.split(":", 2); - assertTrue("Bad index expression: " + expr, split.length < 3); - String clusterAlias; - String indexExpr; - if (split.length == 1) { - clusterAlias = localKey; - indexExpr = expr; - } else { - clusterAlias = split[0]; - indexExpr = split[1]; - - } - OriginalIndices currIndices = originalIndicesMap.get(clusterAlias); - if (currIndices == null) { - originalIndicesMap.put(clusterAlias, new OriginalIndices(new String[] { indexExpr }, indicesOptions)); - } else { - List indicesList = Arrays.stream(currIndices.indices()).collect(Collectors.toList()); - indicesList.add(indexExpr); - originalIndicesMap.put(clusterAlias, new OriginalIndices(indicesList.toArray(new String[0]), indicesOptions)); - } - } - return originalIndicesMap; - } - } }