From c685ec4057b6d9f64248ef5e7f1eea6dc0e66f96 Mon Sep 17 00:00:00 2001 From: Dmitry Leontyev Date: Tue, 28 Oct 2025 10:40:06 +0100 Subject: [PATCH 1/5] [TEST] Add MultiClusterTimeSeriesIT Add MultiClusterTimeSeriesIT to make sure that the TS command works correctly in the multi-cluster environment. Closes #135218 --- .../esql/ccq/MultiClusterTimeSeriesIT.java | 385 ++++++++++++++++++ 1 file changed, 385 insertions(+) create mode 100644 x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java new file mode 100644 index 0000000000000..abf7b91d9cdee --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java @@ -0,0 +1,385 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.ccq; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.MapMatcher; +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.esql.AssertWarnings; +import org.elasticsearch.xpack.esql.qa.rest.ProfileLogger; +import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; + +import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.hamcrest.Matchers.any; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class MultiClusterTimeSeriesIT extends ESRestTestCase { + static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); + + @Rule(order = Integer.MIN_VALUE) + public ProfileLogger profileLogger = new ProfileLogger(); + + @Override + protected String getTestRestCluster() { + return localCluster.getHttpAddresses(); + } + + record TimeSeriesDoc( + String host, + String cluster, + long timestamp, + long requestCount, + long cpu, + ByteSizeValue memory, + String clusterTag + ) {} + + final String localIndex = "hosts-local"; + List localDocs = List.of(); + + final String remoteIndex = "hosts-remote"; + List remoteDocs = List.of(); + + final String allDataIndex = "all-data"; + + private Boolean shouldCheckShardCounts = null; + + @Before + public void setUpTimeSeriesIndices() throws Exception { + // generateData(); + + localDocs = getRandomDocs("local"); + RestClient localClient = client(); + createTimeSeriesIndex(localClient, localIndex); + indexTimeSeriesDocs(localClient, localIndex, localDocs); + + remoteDocs = getRandomDocs("remote"); + try (RestClient remoteClient = remoteClusterClient()) { + createTimeSeriesIndex(remoteClient, remoteIndex); + indexTimeSeriesDocs(remoteClient, remoteIndex, remoteDocs); + } + + createTimeSeriesIndex(localClient, allDataIndex); + indexTimeSeriesDocs(localClient, allDataIndex, Stream.concat(localDocs.stream(), remoteDocs.stream()).toList()); + } + + @After + public void wipeIndices() throws Exception { + try (RestClient remoteClient = remoteClusterClient()) { + deleteIndex(remoteClient, remoteIndex); + } + } + + private List getRandomDocs(String clusterTag) { + final List docs = new ArrayList<>(); + + Map hostToClusters = new HashMap<>(); + for (int i = 0; i < 5; i++) { + hostToClusters.put(clusterTag + "0" + i, randomFrom("qa", "prod")); + } + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); + + Map requestCounts = new HashMap<>(); + int numDocs = between(20, 100); + for (int i = 0; i < numDocs; i++) { + List hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet()); + timestamp += between(1, 30) * 1000L; + for (String host : hosts) { + var requestCount = requestCounts.compute(host, (k, curr) -> { + if (curr == null || randomInt(100) <= 20) { + return randomIntBetween(0, 10); + } else { + return curr + randomIntBetween(1, 10); + } + }); + int cpu = randomIntBetween(0, 100); + ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024)); + docs.add(new TimeSeriesDoc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory, clusterTag)); + } + } + + Randomness.shuffle(docs); + + return docs; + } + + public void testAvg() throws Exception { + boolean includeCCSMetadata = includeCCSMetadata(); + + Map multiClusterResult = run(""" + TS hosts-local,*:hosts-remote + | STATS avg_cpu = AVG(cpu), avg_memory = AVG(memory) BY cluster + | SORT cluster + """, includeCCSMetadata); + + Map singleClusterResult = run( + "TS all-data | STATS avg_cpu = AVG(cpu), avg_memory = AVG(memory) BY cluster | SORT cluster", + includeCCSMetadata + ); + + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + } + + public void testRateAndTBucket() throws Exception { + boolean includeCCSMetadata = includeCCSMetadata(); + + Map multiClusterResult = run(""" + TS hosts-local,*:hosts-remote + | WHERE cluster == "prod" + | STATS max_rate = MAX(RATE(request_count)) BY tb = TBUCKET(5minute) + | SORT tb""", includeCCSMetadata); + + Map singleClusterResult = run(""" + TS all-data + | WHERE cluster == "prod" + | STATS max_rate = MAX(RATE(request_count)) BY tb = TBUCKET(5minute) + | SORT tb""", includeCCSMetadata); + + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + } + + public void testAvgOverTime() throws Exception { + boolean includeCCSMetadata = includeCCSMetadata(); + + Map multiClusterResult = run(""" + TS hosts-local,*:hosts-remote + | STATS avg_cpu = SUM(AVG_OVER_TIME(cpu)), max_memory = SUM(MAX_OVER_TIME(memory)) BY tb = TBUCKET(10minutes) + | SORT tb""", includeCCSMetadata); + + Map singleClusterResult = run(""" + TS all-data + | STATS avg_cpu = SUM(AVG_OVER_TIME(cpu)), max_memory = SUM(MAX_OVER_TIME(memory)) BY tb = TBUCKET(10minutes) + | SORT tb""", includeCCSMetadata); + + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + } + + private void createTimeSeriesIndex(RestClient client, String indexName) throws IOException { + Request createIndex = new Request("PUT", "/" + indexName); + + String settings = Settings.builder() + .put("index.mode", "time_series") + .putList("index.routing_path", List.of("host", "cluster")) + .put("index.number_of_shards", randomIntBetween(1, 5)) + .put("index.time_series.start_time", "2024-04-14T00:00:00Z") + .put("index.time_series.end_time", "2024-04-16T00:00:00Z") + .build() + .toString(); + + final String mapping = """ + "properties": { + "@timestamp": { "type": "date" }, + "host": { "type": "keyword", "time_series_dimension": true }, + "cluster": { "type": "keyword", "time_series_dimension": true }, + "cpu": { "type": "long", "time_series_metric": "gauge" }, + "memory": { "type": "long", "time_series_metric": "gauge" }, + "request_count": { "type": "long", "time_series_metric": "counter" }, + "cluster_tag": { "type": "keyword" } + } + """; + + createIndex.setJsonEntity(Strings.format(""" + { + "settings": %s, + "mappings": { + %s + } + } + """, settings, mapping)); + assertOK(client.performRequest(createIndex)); + } + + private void indexTimeSeriesDocs(RestClient client, String index, List docs) throws IOException { + logger.info("--> indexing {} time series docs to index {}", docs.size(), index); + for (TimeSeriesDoc doc : docs) { + Request createDoc = new Request("POST", "/" + index + "/_doc"); + createDoc.addParameter("refresh", "true"); + createDoc.setJsonEntity(Strings.format(""" + { + "@timestamp": %d, + "host": "%s", + "cluster": "%s", + "cpu": %d, + "memory": %d, + "request_count": %d, + "cluster_tag": "%s" + } + """, doc.timestamp, doc.host, doc.cluster, doc.cpu, doc.memory.getBytes(), doc.requestCount, doc.clusterTag)); + assertOK(client.performRequest(createDoc)); + } + refresh(client, index); + } + + private Map run(String query, boolean includeCCSMetadata) throws IOException { + var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query).profile(true); + if (includeCCSMetadata) { + queryBuilder.includeCCSMetadata(true); + } + Map resp = runEsql(queryBuilder.build()); + logger.info("--> query {} response {}", queryBuilder, resp); + return resp; + } + + protected boolean supportsAsync() { + return false; + } + + private Map runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException { + if (supportsAsync()) { + return RestEsqlTestCase.runEsqlAsync(requestObject, new AssertWarnings.NoWarnings(), profileLogger); + } else { + return RestEsqlTestCase.runEsqlSync(requestObject, new AssertWarnings.NoWarnings(), profileLogger); + } + } + + private boolean checkShardCounts() { + if (shouldCheckShardCounts == null) { + try { + shouldCheckShardCounts = capabilitiesSupportedNewAndOld(List.of("correct_skipped_shard_count")); + } catch (IOException e) { + shouldCheckShardCounts = false; + } + } + return shouldCheckShardCounts; + } + + private boolean capabilitiesSupportedNewAndOld(List requiredCapabilities) throws IOException { + boolean isSupported = clusterHasCapability("POST", "/_query", List.of(), requiredCapabilities).orElse(false); + try (RestClient remoteClient = remoteClusterClient()) { + isSupported = isSupported + && clusterHasCapability(remoteClient, "POST", "/_query", List.of(), requiredCapabilities).orElse(false); + } + return isSupported; + } + + private void assertResultMap( + boolean includeCCSMetadata, + Map result, + Map expectedResult, + boolean remoteOnly + ) { + MapMatcher mapMatcher = getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found")).extraOk(); + if (includeCCSMetadata) { + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + } + + assertMap(result, mapMatcher.entry("columns", expectedResult.get("columns")).entry("values", expectedResult.get("values"))); + + if (includeCCSMetadata) { + assertClusterDetailsMap(result, remoteOnly); + } + } + + private void assertClusterDetailsMap(Map result, boolean remoteOnly) { + @SuppressWarnings("unchecked") + Map clusters = (Map) result.get("_clusters"); + assertThat(clusters.size(), equalTo(7)); + assertThat(clusters.keySet(), equalTo(Set.of("total", "successful", "running", "skipped", "partial", "failed", "details"))); + int expectedNumClusters = remoteOnly ? 1 : 2; + Set expectedClusterAliases = remoteOnly ? Set.of("remote_cluster") : Set.of("remote_cluster", "(local)"); + + assertThat(clusters.get("total"), equalTo(expectedNumClusters)); + assertThat(clusters.get("successful"), equalTo(expectedNumClusters)); + assertThat(clusters.get("running"), equalTo(0)); + assertThat(clusters.get("skipped"), equalTo(0)); + assertThat(clusters.get("partial"), equalTo(0)); + assertThat(clusters.get("failed"), equalTo(0)); + + @SuppressWarnings("unchecked") + Map details = (Map) clusters.get("details"); + assertThat(details.keySet(), equalTo(expectedClusterAliases)); + + @SuppressWarnings("unchecked") + Map remoteCluster = (Map) details.get("remote_cluster"); + assertThat(remoteCluster.keySet(), equalTo(Set.of("status", "indices", "took", "_shards"))); + assertThat(remoteCluster.get("status"), equalTo("successful")); + assertThat(remoteCluster.get("indices"), equalTo("hosts-remote")); + assertThat((Integer) remoteCluster.get("took"), greaterThanOrEqualTo(0)); + + @SuppressWarnings("unchecked") + Map remoteClusterShards = (Map) remoteCluster.get("_shards"); + assertThat( + remoteClusterShards, + matchesMap().entry("total", greaterThanOrEqualTo(0)) + .entry("successful", greaterThanOrEqualTo(0)) + .entry("skipped", greaterThanOrEqualTo(0)) + .entry("failed", 0) + ); + if (checkShardCounts()) { + assertThat( + (int) remoteClusterShards.get("successful") + (int) remoteClusterShards.get("skipped"), + equalTo(remoteClusterShards.get("total")) + ); + } + if (remoteOnly == false) { + @SuppressWarnings("unchecked") + Map localCluster = (Map) details.get("(local)"); + assertThat(localCluster.keySet(), equalTo(Set.of("status", "indices", "took", "_shards"))); + assertThat(localCluster.get("status"), equalTo("successful")); + assertThat(localCluster.get("indices"), equalTo("hosts-local")); + assertThat((Integer) localCluster.get("took"), greaterThanOrEqualTo(0)); + + @SuppressWarnings("unchecked") + Map localClusterShards = (Map) localCluster.get("_shards"); + assertThat( + localClusterShards, + matchesMap().entry("total", greaterThanOrEqualTo(0)) + .entry("successful", greaterThanOrEqualTo(0)) + .entry("skipped", greaterThanOrEqualTo(0)) + .entry("failed", 0) + ); + if (checkShardCounts()) { + assertThat( + (int) localClusterShards.get("successful") + (int) localClusterShards.get("skipped"), + equalTo(localClusterShards.get("total")) + ); + } + } + } + + private RestClient remoteClusterClient() throws IOException { + var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses()); + return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0])); + } + + private static boolean includeCCSMetadata() { + return randomBoolean(); + } +} From 4da9e25208064aae6081f11dce2c23f38e7573c3 Mon Sep 17 00:00:00 2001 From: Dmitry Leontyev Date: Tue, 28 Oct 2025 12:55:16 +0100 Subject: [PATCH 2/5] [TEST] Add MultiClusterTimeSeriesIT Add MultiClusterTimeSeriesIT to make sure that the TS command works correctly in the multi-cluster environment. Closes #135218 --- .../esql/ccq/MultiClusterTimeSeriesIT.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java index abf7b91d9cdee..7600405c96f82 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java @@ -47,6 +47,9 @@ @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class MultiClusterTimeSeriesIT extends ESRestTestCase { + + static final List REQUIRED_CAPABILITIES = List.of("ts_command_v0"); + static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); @@ -83,8 +86,6 @@ record TimeSeriesDoc( @Before public void setUpTimeSeriesIndices() throws Exception { - // generateData(); - localDocs = getRandomDocs("local"); RestClient localClient = client(); createTimeSeriesIndex(localClient, localIndex); @@ -141,6 +142,8 @@ private List getRandomDocs(String clusterTag) { } public void testAvg() throws Exception { + assumeTrue("TS command not supported", capabilitiesSupportedNewAndOld(REQUIRED_CAPABILITIES)); + boolean includeCCSMetadata = includeCCSMetadata(); Map multiClusterResult = run(""" @@ -158,6 +161,8 @@ public void testAvg() throws Exception { } public void testRateAndTBucket() throws Exception { + assumeTrue("TS command not supported", capabilitiesSupportedNewAndOld(REQUIRED_CAPABILITIES)); + boolean includeCCSMetadata = includeCCSMetadata(); Map multiClusterResult = run(""" @@ -176,6 +181,8 @@ public void testRateAndTBucket() throws Exception { } public void testAvgOverTime() throws Exception { + assumeTrue("TS command not supported", capabilitiesSupportedNewAndOld(REQUIRED_CAPABILITIES)); + boolean includeCCSMetadata = includeCCSMetadata(); Map multiClusterResult = run(""" @@ -191,6 +198,24 @@ public void testAvgOverTime() throws Exception { assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); } + public void testIRate() throws Exception { + assumeTrue("TS command not supported", capabilitiesSupportedNewAndOld(REQUIRED_CAPABILITIES)); + + boolean includeCCSMetadata = includeCCSMetadata(); + + Map multiClusterResult = run(""" + TS hosts-local,*:hosts-remote + | STATS irate_req_count = AVG(IRATE(request_count)) BY tb = TBUCKET(1minute) + | SORT tb""", includeCCSMetadata); + + Map singleClusterResult = run(""" + TS all-data + | STATS irate_req_count = AVG(IRATE(request_count)) BY tb = TBUCKET(1minute) + | SORT tb""", includeCCSMetadata); + + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + } + private void createTimeSeriesIndex(RestClient client, String indexName) throws IOException { Request createIndex = new Request("PUT", "/" + indexName); From 34f69ca9a7fb14028a98b9c7589010a09f678d91 Mon Sep 17 00:00:00 2001 From: Dmitry Leontyev Date: Tue, 28 Oct 2025 15:47:25 +0100 Subject: [PATCH 3/5] [TEST] Add MultiClusterTimeSeriesIT Add MultiClusterTimeSeriesIT to make sure that the TS command works correctly in the multi-cluster environment. Closes #135218 --- .../esql/ccq/MultiClusterTimeSeriesIT.java | 43 ++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java index 7600405c96f82..7fc18c13bdb37 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.test.ListMatcher; import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; @@ -23,6 +24,8 @@ import org.elasticsearch.xpack.esql.AssertWarnings; import org.elasticsearch.xpack.esql.qa.rest.ProfileLogger; import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -44,6 +47,7 @@ import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class MultiClusterTimeSeriesIT extends ESRestTestCase { @@ -325,13 +329,50 @@ private void assertResultMap( mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); } - assertMap(result, mapMatcher.entry("columns", expectedResult.get("columns")).entry("values", expectedResult.get("values"))); + assertMap( + result, + mapMatcher.entry("columns", expectedResult.get("columns")).entry("values", matcherFor(expectedResult.get("values"))) + ); if (includeCCSMetadata) { assertClusterDetailsMap(result, remoteOnly); } } + /** + * Converts an unknown {@link Object} to an equality {@link Matcher} + * for the public API methods that take {@linkplain Object}. + *
+ * This is a copy of org.elasticsearch.test.MapMatcher#matcherFor(java.lang.Object) to add support for Double values comparison + * with a given error. + */ + private static Matcher matcherFor(Object value) { + if (value == null) { + return nullValue(); + } + if (value instanceof List) { + return matchesList((List) value); + } + if (value instanceof Map) { + return matchesMap((Map) value); + } + if (value instanceof Matcher) { + return (Matcher) value; + } + if (value instanceof Double dvalue) { + return Matchers.closeTo(dvalue, 0.0000001); + } + return equalTo(value); + } + + public static ListMatcher matchesList(List list) { + ListMatcher matcher = ListMatcher.matchesList(); + for (Object item : list) { + matcher = matcher.item(matcherFor(matcherFor(item))); + } + return matcher; + } + private void assertClusterDetailsMap(Map result, boolean remoteOnly) { @SuppressWarnings("unchecked") Map clusters = (Map) result.get("_clusters"); From a94291a1da466ce3b91e1b2719437d1e6a1d1eda Mon Sep 17 00:00:00 2001 From: Dmitry Leontyev Date: Wed, 29 Oct 2025 10:36:05 +0100 Subject: [PATCH 4/5] [TEST] Add MultiClusterTimeSeriesIT Test async ESQL calls. Closes #135218 --- .../esql/ccq/MultiClusterTimeSeriesIT.java | 51 +++++++------------ 1 file changed, 19 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java index 7fc18c13bdb37..b3d275fd43d12 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java @@ -161,7 +161,7 @@ public void testAvg() throws Exception { includeCCSMetadata ); - assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult); } public void testRateAndTBucket() throws Exception { @@ -181,7 +181,7 @@ public void testRateAndTBucket() throws Exception { | STATS max_rate = MAX(RATE(request_count)) BY tb = TBUCKET(5minute) | SORT tb""", includeCCSMetadata); - assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult); } public void testAvgOverTime() throws Exception { @@ -199,7 +199,7 @@ public void testAvgOverTime() throws Exception { | STATS avg_cpu = SUM(AVG_OVER_TIME(cpu)), max_memory = SUM(MAX_OVER_TIME(memory)) BY tb = TBUCKET(10minutes) | SORT tb""", includeCCSMetadata); - assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult); } public void testIRate() throws Exception { @@ -217,7 +217,7 @@ public void testIRate() throws Exception { | STATS irate_req_count = AVG(IRATE(request_count)) BY tb = TBUCKET(1minute) | SORT tb""", includeCCSMetadata); - assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult, false); + assertResultMap(includeCCSMetadata, multiClusterResult, singleClusterResult); } private void createTimeSeriesIndex(RestClient client, String indexName) throws IOException { @@ -287,7 +287,7 @@ private Map run(String query, boolean includeCCSMetadata) throws } protected boolean supportsAsync() { - return false; + return randomBoolean(); } private Map runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException { @@ -318,12 +318,7 @@ private boolean capabilitiesSupportedNewAndOld(List requiredCapabilities return isSupported; } - private void assertResultMap( - boolean includeCCSMetadata, - Map result, - Map expectedResult, - boolean remoteOnly - ) { + private void assertResultMap(boolean includeCCSMetadata, Map result, Map expectedResult) { MapMatcher mapMatcher = getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found")).extraOk(); if (includeCCSMetadata) { mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); @@ -335,7 +330,7 @@ private void assertResultMap( ); if (includeCCSMetadata) { - assertClusterDetailsMap(result, remoteOnly); + assertClusterDetailsMap(result); } } @@ -347,22 +342,14 @@ private void assertResultMap( * with a given error. */ private static Matcher matcherFor(Object value) { - if (value == null) { - return nullValue(); - } - if (value instanceof List) { - return matchesList((List) value); - } - if (value instanceof Map) { - return matchesMap((Map) value); - } - if (value instanceof Matcher) { - return (Matcher) value; - } - if (value instanceof Double dvalue) { - return Matchers.closeTo(dvalue, 0.0000001); - } - return equalTo(value); + return switch (value) { + case null -> nullValue(); + case List list -> matchesList(list); + case Map map -> matchesMap(map); + case Matcher matcher -> matcher; + case Double doubleValue -> Matchers.closeTo(doubleValue, 0.0000001); + default -> equalTo(value); + }; } public static ListMatcher matchesList(List list) { @@ -373,13 +360,13 @@ public static ListMatcher matchesList(List list) { return matcher; } - private void assertClusterDetailsMap(Map result, boolean remoteOnly) { + private void assertClusterDetailsMap(Map result) { @SuppressWarnings("unchecked") Map clusters = (Map) result.get("_clusters"); assertThat(clusters.size(), equalTo(7)); assertThat(clusters.keySet(), equalTo(Set.of("total", "successful", "running", "skipped", "partial", "failed", "details"))); - int expectedNumClusters = remoteOnly ? 1 : 2; - Set expectedClusterAliases = remoteOnly ? Set.of("remote_cluster") : Set.of("remote_cluster", "(local)"); + int expectedNumClusters = 2; + Set expectedClusterAliases = Set.of("remote_cluster", "(local)"); assertThat(clusters.get("total"), equalTo(expectedNumClusters)); assertThat(clusters.get("successful"), equalTo(expectedNumClusters)); @@ -414,7 +401,7 @@ private void assertClusterDetailsMap(Map result, boolean remoteO equalTo(remoteClusterShards.get("total")) ); } - if (remoteOnly == false) { + if (false == false) { @SuppressWarnings("unchecked") Map localCluster = (Map) details.get("(local)"); assertThat(localCluster.keySet(), equalTo(Set.of("status", "indices", "took", "_shards"))); From 58e9142ff98e5d41a8c0bd9057e14ba87fac96bf Mon Sep 17 00:00:00 2001 From: Dmitry Leontyev Date: Wed, 29 Oct 2025 11:01:23 +0100 Subject: [PATCH 5/5] [TEST] Add MultiClusterTimeSeriesIT Test async ESQL calls. Closes #135218 --- .../xpack/esql/ccq/MultiClusterTimeSeriesIT.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java index b3d275fd43d12..9c540bb88445a 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java @@ -286,16 +286,8 @@ private Map run(String query, boolean includeCCSMetadata) throws return resp; } - protected boolean supportsAsync() { - return randomBoolean(); - } - private Map runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException { - if (supportsAsync()) { - return RestEsqlTestCase.runEsqlAsync(requestObject, new AssertWarnings.NoWarnings(), profileLogger); - } else { - return RestEsqlTestCase.runEsqlSync(requestObject, new AssertWarnings.NoWarnings(), profileLogger); - } + return RestEsqlTestCase.runEsqlSync(requestObject, new AssertWarnings.NoWarnings(), profileLogger); } private boolean checkShardCounts() {