From da8b647d9fff68fd4253e46ec1dff4e09d98fb7f Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 3 Feb 2025 10:24:54 -0700 Subject: [PATCH 1/2] Refactor CCS tests Clean up duplication, move stop tests to separate test --- muted-tests.yml | 9 - .../action/AbstractCrossClusterTestCase.java | 266 +++++++++++ .../esql/action/CrossClusterAsyncQueryIT.java | 415 +----------------- .../action/CrossClusterAsyncQueryStopIT.java | 239 ++++++++++ ...sQueryIT.java => CrossClusterQueryIT.java} | 169 +------ ...CrossClusterQueryUnavailableRemotesIT.java | 166 +------ .../action/CrossClustersUsageTelemetryIT.java | 2 +- ...rossClustersUsageTelemetryNoLicenseIT.java | 2 +- 8 files changed, 533 insertions(+), 735 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java rename x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/{CrossClustersQueryIT.java => CrossClusterQueryIT.java} (88%) diff --git a/muted-tests.yml b/muted-tests.yml index ccea465bc68f2..ddab67b5005b2 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -411,15 +411,6 @@ tests: - class: org.elasticsearch.xpack.transform.checkpoint.TransformCCSCanMatchIT method: testTransformLifecycle_RangeQueryThatMatchesNoShards issue: https://github.com/elastic/elasticsearch/issues/121480 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryIT - method: testStopQueryLocal - issue: https://github.com/elastic/elasticsearch/issues/121487 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryIT - method: testSuccessfulPathways - issue: https://github.com/elastic/elasticsearch/issues/121488 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryIT - method: testAsyncQueriesWithLimit0 - issue: https://github.com/elastic/elasticsearch/issues/121489 - class: org.elasticsearch.xpack.security.profile.ProfileIntegTests method: testSuggestProfilesWithHint issue: https://github.com/elastic/elasticsearch/issues/121116 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java new file mode 100644 index 0000000000000..e6927439c7e71 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -0,0 +1,266 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.AbstractMultiClustersTestCase;
+import org.elasticsearch.test.XContentTestUtils;
+import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+public abstract class AbstractCrossClusterTestCase extends AbstractMultiClustersTestCase {
+    protected static final String REMOTE_CLUSTER_1 = "cluster-a";
+    protected static final String REMOTE_CLUSTER_2 = "remote-b";
+    protected static final String LOCAL_INDEX = "logs-1";
+    protected static final String REMOTE_INDEX = "logs-2";
+    protected static final String INDEX_WITH_BLOCKING_MAPPING = "blocking";
+    protected static final String INDEX_WITH_FAIL_MAPPING = "failing";
+
+    @Override
+    protected List<String> remoteClusterAlias() {
+        return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
+    }
+
+    @Override
+    protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
+        return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, randomBoolean());
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
+        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
+        plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
+        plugins.add(InternalExchangePlugin.class);
+        plugins.add(SimplePauseFieldPlugin.class);
+        plugins.add(FailingPauseFieldPlugin.class);
+        plugins.add(CountingPauseFieldPlugin.class);
+        return plugins;
+    }
+
+    public static class InternalExchangePlugin extends Plugin {
+        @Override
+        public List<Setting<?>> getSettings() {
+            return List.of(
+                Setting.timeSetting(
+                    ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING,
+                    TimeValue.timeValueSeconds(30),
+                    Setting.Property.NodeScope
+                )
+            );
+        }
+    }
+
+    public static class CountingPauseFieldPlugin extends SimplePauseFieldPlugin {
+        public static AtomicLong count = new AtomicLong(0);
+
+        protected String scriptTypeName() {
+            return "pause_count";
+        }
+
+        public static void resetPlugin() {
+            count.set(0);
+        }
+
+        @Override
+        public boolean onWait() throws InterruptedException {
+            count.incrementAndGet();
+            return allowEmitting.await(30, TimeUnit.SECONDS);
+        }
+    }
+
+    @Before
+    public void resetPlugin() {
+        SimplePauseFieldPlugin.resetPlugin();
+        FailingPauseFieldPlugin.resetPlugin();
+        CountingPauseFieldPlugin.resetPlugin();
+    }
+
+    protected void assertClusterInfoSuccess(EsqlExecutionInfo.Cluster cluster, int numShards) {
+        assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
+        assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+        assertThat(cluster.getTotalShards(), equalTo(numShards));
+        assertThat(cluster.getSuccessfulShards(), equalTo(numShards));
+        assertThat(cluster.getSkippedShards(), equalTo(0));
+        assertThat(cluster.getFailedShards(), equalTo(0));
+        assertThat(cluster.getFailures().size(), equalTo(0));
+    }
+
+    protected static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta, int numClusters) {
+        try {
+            final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
+            final Object clusters = esqlResponseAsMap.get("_clusters");
+            if (responseExpectMeta) {
+                assertNotNull(clusters);
+                // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response)
+                @SuppressWarnings("unchecked")
+                Map<String, Object> inner = (Map<String, Object>) clusters;
+                assertTrue(inner.containsKey("total"));
+                assertThat((int) inner.get("total"), equalTo(numClusters));
+                assertTrue(inner.containsKey("details"));
+            } else {
+                assertNull(clusters);
+            }
+        } catch (IOException e) {
+            fail("Could not convert ESQLQueryResponse to Map: " + e);
+        }
+    }
+
+    protected Map<String, Object> setupClusters(int numClusters) throws IOException {
+        assert numClusters == 2 || numClusters == 3 : "only 2 or 3 clusters are supported, got: " + numClusters;
+        int numShardsLocal = randomIntBetween(1, 5);
+        populateLocalIndices(LOCAL_INDEX, numShardsLocal);
+
+        int numShardsRemote = randomIntBetween(1, 5);
+        populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
+
+        Map<String, Object> clusterInfo = new HashMap<>();
+        clusterInfo.put("local.num_shards", numShardsLocal);
+        clusterInfo.put("local.index", LOCAL_INDEX);
+        clusterInfo.put("remote1.num_shards", numShardsRemote);
+        clusterInfo.put("remote1.index", REMOTE_INDEX);
+        clusterInfo.put("remote.num_shards", numShardsRemote);
+        clusterInfo.put("remote.index", REMOTE_INDEX);
+
+        if (numClusters == 3) {
+            int numShardsRemote2 = randomIntBetween(1, 5);
+            populateRemoteIndices(REMOTE_CLUSTER_2, REMOTE_INDEX, numShardsRemote2);
+            clusterInfo.put("remote2.index", REMOTE_INDEX);
+            clusterInfo.put("remote2.num_shards", numShardsRemote2);
+        }
+
+        String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER_1);
+        Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER_1).clusterService().getClusterSettings().get(skipUnavailableKey);
+        boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
+            .getClusterSettings()
+            .get(skipUnavailableSetting);
+        clusterInfo.put("remote.skip_unavailable", skipUnavailable);
+
+        return clusterInfo;
+    }
+
+    protected void populateLocalIndices(String indexName, int numShards) {
+        Client localClient = client(LOCAL_CLUSTER);
+        assertAcked(
+            localClient.admin()
+                .indices()
+                .prepareCreate(indexName)
+                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
+                .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long")
+        );
+        for (int i = 0; i < 10; i++) {
+            localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
+        }
+        localClient.admin().indices().prepareRefresh(indexName).get();
+    }
+
+    protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName) throws IOException {
+        populateRuntimeIndex(clusterAlias, langName,
indexName, 10); + } + + protected void populateRuntimeIndex(String clusterAlias, String langName, String indexName, int count) throws IOException { + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("const"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", langName).endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.endObject(); + client(clusterAlias).admin().indices().prepareCreate(indexName).setMapping(mapping).get(); + BulkRequestBuilder bulk = client(clusterAlias).prepareBulk(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < count; i++) { + bulk.add(new IndexRequest().source("foo", i)); + } + bulk.get(); + } + + protected void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException { + Client remoteClient = client(clusterAlias); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get(); + } + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } + + protected void setSkipUnavailable(String clusterAlias, boolean skip) { + client(LOCAL_CLUSTER).admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(Settings.builder().put("cluster.remote." + clusterAlias + ".skip_unavailable", skip).build()) + .get(); + } + + protected void clearSkipUnavailable(int numClusters) { + assert numClusters == 2 || numClusters == 3 : "Only 2 or 3 clusters supported"; + Settings.Builder settingsBuilder = Settings.builder().putNull("cluster.remote." + REMOTE_CLUSTER_1 + ".skip_unavailable"); + if (numClusters == 3) { + settingsBuilder.putNull("cluster.remote." 
+ REMOTE_CLUSTER_2 + ".skip_unavailable"); + } + client(LOCAL_CLUSTER).admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(settingsBuilder.build()) + .get(); + } + + protected void clearSkipUnavailable() { + clearSkipUnavailable(3); + } + + protected EsqlQueryResponse runQuery(EsqlQueryRequest request) { + return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + } + + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.profile(randomInt(5) == 2); + request.columnar(randomBoolean()); + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } + return runQuery(request); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java index 6be1518f65e63..d5c53e94f8687 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java @@ -7,43 +7,19 @@ package org.elasticsearch.xpack.esql.action; -import org.elasticsearch.Build; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.compute.operator.DriverTaskRunner; -import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.test.AbstractMultiClustersTestCase; -import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.core.async.AsyncStopRequest; -import org.junit.Before; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -52,66 +28,14 @@ import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.getAsyncResponse; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.runAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; -import static 
org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQueryWithPragmas; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; -public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase { - - private static final String REMOTE_CLUSTER_1 = "cluster-a"; - private static final String REMOTE_CLUSTER_2 = "remote-b"; - private static String LOCAL_INDEX = "logs-1"; - private static String REMOTE_INDEX = "logs-2"; - private static final String INDEX_WITH_RUNTIME_MAPPING = "blocking"; - private static final String INDEX_WITH_FAIL_MAPPING = "failing"; - - @Override - protected List remoteClusterAlias() { - return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); - } - - @Override - protected Map skipUnavailableForRemoteClusters() { - return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, randomBoolean()); - } - - @Override - protected Collection> nodePlugins(String clusterAlias) { - List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); - plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action - plugins.add(InternalExchangePlugin.class); - plugins.add(SimplePauseFieldPlugin.class); - plugins.add(FailingPauseFieldPlugin.class); - plugins.add(CountingPauseFieldPlugin.class); - return plugins; - } - - public static class InternalExchangePlugin extends Plugin { - @Override - public List> getSettings() { - return List.of( - Setting.timeSetting( - ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, - TimeValue.timeValueSeconds(30), - Setting.Property.NodeScope - ) - ); - } - } - - @Before - public void resetPlugin() { - SimplePauseFieldPlugin.resetPlugin(); - FailingPauseFieldPlugin.resetPlugin(); - CountingPauseFieldPlugin.resetPlugin(); - } - +public class CrossClusterAsyncQueryIT extends AbstractCrossClusterTestCase { /** * Includes testing for CCS metadata in the GET /_query/async/:id response while the search is still running */ @@ -119,7 +43,7 @@ public void testSuccessfulPathways() throws Exception { Map testClusterInfo = setupClusters(3); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards"); - populateRuntimeIndex(REMOTE_CLUSTER_2, "pause", INDEX_WITH_RUNTIME_MAPPING); + populateRuntimeIndex(REMOTE_CLUSTER_2, "pause", INDEX_WITH_BLOCKING_MAPPING); Tuple includeCCSMetadata = randomIncludeCCSMetadata(); boolean responseExpectMeta = includeCCSMetadata.v2(); @@ -265,203 +189,6 @@ public void testAsyncQueriesWithLimit0() throws IOException { } } - public void testStopQuery() throws Exception { - assumeTrue("Pragme does not work in release builds", Build.current().isSnapshot()); - Map testClusterInfo = setupClusters(3); - int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); - int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards"); - // Create large index so we could be sure we're stopping before the end - populateRuntimeIndex(REMOTE_CLUSTER_2, "pause_count", INDEX_WITH_RUNTIME_MAPPING); - - Tuple includeCCSMetadata = randomIncludeCCSMetadata(); - boolean responseExpectMeta = includeCCSMetadata.v2(); - - final String 
asyncExecutionId = startAsyncQueryWithPragmas( - client(), - "FROM logs-*,cluster-a:logs-*,remote-b:blocking | STATS total=sum(coalesce(const,v)) | LIMIT 1", - includeCCSMetadata.v1(), - Map.of("page_size", 1, "data_partitioning", "shard", "task_concurrency", 1) - ); - - // wait until we know that the query against 'remote-b:blocking' has started - CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); - - // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it) - waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); - waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); - - /* at this point: - * the query against cluster-a should be finished - * the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown) - * the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b - */ - - // run the stop query - AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); - ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); - assertBusy(() -> { - List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); - List reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList(); - assertThat(reduceTasks, empty()); - }); - // allow remoteB query to proceed - CountingPauseFieldPlugin.allowEmitting.countDown(); - - // Since part of the query has not been stopped, we expect some result to emerge here - try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { - // Check that we did not process all the fields on remote-b - // Should not be getting more than one page here, and we set page size to 1 - assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L)); - assertThat(asyncResponse.isRunning(), is(false)); - assertThat(asyncResponse.columns().size(), equalTo(1)); - assertThat(asyncResponse.values().hasNext(), is(true)); - Iterator row = asyncResponse.values().next(); - // sum of 0-9 is 45, and sum of 0-9 squared is 285 - assertThat(row.next(), equalTo(330L)); - - EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isCrossClusterSearch(), is(true)); - long overallTookMillis = executionInfo.overallTook().millis(); - assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); - assertThat(executionInfo.isPartial(), equalTo(true)); - - EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(remoteCluster, remote1NumShards); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); - assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking")); - assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(localCluster, localNumShards); - - assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); - } finally { - assertAcked(deleteAsyncId(client(), asyncExecutionId)); - } - } - - public void testStopQueryLocal() throws 
Exception { - Map testClusterInfo = setupClusters(3); - int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards"); - int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards"); - populateRuntimeIndex(LOCAL_CLUSTER, "pause", INDEX_WITH_RUNTIME_MAPPING); - - Tuple includeCCSMetadata = randomIncludeCCSMetadata(); - boolean responseExpectMeta = includeCCSMetadata.v2(); - - final String asyncExecutionId = startAsyncQuery( - client(), - "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1", - includeCCSMetadata.v1() - ); - - // wait until we know that the query against 'remote-b:blocking' has started - SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); - - // wait until the remotes are done - waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); - waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId); - - /* at this point: - * the query against remotes should be finished - * the query against the local cluster should be running because it's blocked - */ - - // run the stop query - AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); - ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); - // ensure stop operation is running - assertBusy(() -> { - try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) { - EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isPartial(), is(true)); - } - }); - // allow local query to proceed - SimplePauseFieldPlugin.allowEmitting.countDown(); - - // Since part of the query has not been stopped, we expect some result to emerge here - try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { - assertThat(asyncResponse.isRunning(), is(false)); - assertThat(asyncResponse.columns().size(), equalTo(1)); - assertThat(asyncResponse.values().hasNext(), is(true)); - Iterator row = asyncResponse.values().next(); - // sum of 0-9 squared is 285, from two remotes it's 570 - assertThat(row.next(), equalTo(570L)); - - EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isCrossClusterSearch(), is(true)); - long overallTookMillis = executionInfo.overallTook().millis(); - assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); - assertThat(executionInfo.isPartial(), equalTo(true)); - - EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(remoteCluster, remote1NumShards); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); - assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); - assertClusterInfoSuccess(remote2Cluster, remote2NumShards); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertThat(localCluster.getIndexExpression(), equalTo("blocking")); - assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); - - assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); - } finally { - assertAcked(deleteAsyncId(client(), asyncExecutionId)); - } - } - - public void testStopQueryLocalNoRemotes() throws Exception { - setupClusters(3); - populateRuntimeIndex(LOCAL_CLUSTER, 
"pause", INDEX_WITH_RUNTIME_MAPPING); - - Tuple includeCCSMetadata = randomIncludeCCSMetadata(); - boolean responseExpectMeta = includeCCSMetadata.v2(); - - final String asyncExecutionId = startAsyncQuery( - client(), - "FROM blocking | STATS total=count(const) | LIMIT 1", - includeCCSMetadata.v1() - ); - - // wait until we know that the query against 'remote-b:blocking' has started - SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); - - /* at this point: - * the query against the local cluster should be running because it's blocked - */ - - // run the stop query - var stopRequest = new AsyncStopRequest(asyncExecutionId); - var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); - // allow local query to proceed - SimplePauseFieldPlugin.allowEmitting.countDown(); - - try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { - assertThat(asyncResponse.isRunning(), is(false)); - assertThat(asyncResponse.columns().size(), equalTo(1)); - assertThat(asyncResponse.values().hasNext(), is(true)); - Iterator row = asyncResponse.values().next(); - assertThat((long) row.next(), greaterThanOrEqualTo(0L)); - - EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isCrossClusterSearch(), is(false)); - } finally { - assertAcked(deleteAsyncId(client(), asyncExecutionId)); - } - } - public void testAsyncFailure() throws Exception { Map testClusterInfo = setupClusters(2); populateRuntimeIndex(REMOTE_CLUSTER_1, "pause_fail", INDEX_WITH_FAIL_MAPPING); @@ -519,142 +246,4 @@ public void testBadAsyncId() throws Exception { var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); assertThrows(ResourceNotFoundException.class, () -> stopAction.actionGet(1000, TimeUnit.SECONDS)); } - - private void assertClusterInfoSuccess(EsqlExecutionInfo.Cluster cluster, int numShards) { - assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); - assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); - assertThat(cluster.getTotalShards(), equalTo(numShards)); - assertThat(cluster.getSuccessfulShards(), equalTo(numShards)); - assertThat(cluster.getSkippedShards(), equalTo(0)); - assertThat(cluster.getFailedShards(), equalTo(0)); - assertThat(cluster.getFailures().size(), equalTo(0)); - } - - private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta, int numClusters) { - try { - final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); - final Object clusters = esqlResponseAsMap.get("_clusters"); - if (responseExpectMeta) { - assertNotNull(clusters); - // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response) - @SuppressWarnings("unchecked") - Map inner = (Map) clusters; - assertTrue(inner.containsKey("total")); - assertThat((int) inner.get("total"), equalTo(numClusters)); - assertTrue(inner.containsKey("details")); - } else { - assertNull(clusters); - } - } catch (IOException e) { - fail("Could not convert ESQLQueryResponse to Map: " + e); - } - } - - Map setupClusters(int numClusters) throws IOException { - assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters; - int numShardsLocal = randomIntBetween(1, 5); - populateLocalIndices(LOCAL_INDEX, numShardsLocal); - - int numShardsRemote = randomIntBetween(1, 5); - populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); - - 
Map clusterInfo = new HashMap<>(); - clusterInfo.put("local.num_shards", numShardsLocal); - clusterInfo.put("local.index", LOCAL_INDEX); - clusterInfo.put("remote1.num_shards", numShardsRemote); - clusterInfo.put("remote1.index", REMOTE_INDEX); - - if (numClusters == 3) { - int numShardsRemote2 = randomIntBetween(1, 5); - populateRemoteIndices(REMOTE_CLUSTER_2, REMOTE_INDEX, numShardsRemote2); - clusterInfo.put("remote2.index", REMOTE_INDEX); - clusterInfo.put("remote2.num_shards", numShardsRemote2); - } - - String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER_1); - Setting skipUnavailableSetting = cluster(REMOTE_CLUSTER_1).clusterService().getClusterSettings().get(skipUnavailableKey); - boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService() - .getClusterSettings() - .get(skipUnavailableSetting); - clusterInfo.put("remote.skip_unavailable", skipUnavailable); - - return clusterInfo; - } - - void populateLocalIndices(String indexName, int numShards) { - Client localClient = client(LOCAL_CLUSTER); - assertAcked( - localClient.admin() - .indices() - .prepareCreate(indexName) - .setSettings(Settings.builder().put("index.number_of_shards", numShards)) - .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long", "const", "type=long") - ); - for (int i = 0; i < 10; i++) { - localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get(); - } - localClient.admin().indices().prepareRefresh(indexName).get(); - } - - void populateRuntimeIndex(String clusterAlias, String langName, String indexName) throws IOException { - populateRuntimeIndex(clusterAlias, langName, indexName, 10); - } - - void populateRuntimeIndex(String clusterAlias, String langName, String indexName, int count) throws IOException { - XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); - mapping.startObject("runtime"); - { - mapping.startObject("const"); - { - mapping.field("type", "long"); - mapping.startObject("script").field("source", "").field("lang", langName).endObject(); - } - mapping.endObject(); - } - mapping.endObject(); - mapping.endObject(); - client(clusterAlias).admin().indices().prepareCreate(indexName).setMapping(mapping).get(); - BulkRequestBuilder bulk = client(clusterAlias).prepareBulk(indexName).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < count; i++) { - bulk.add(new IndexRequest().source("foo", i)); - } - bulk.get(); - } - - void populateRemoteIndices(String clusterAlias, String indexName, int numShards) throws IOException { - Client remoteClient = client(clusterAlias); - assertAcked( - remoteClient.admin() - .indices() - .prepareCreate(indexName) - .setSettings(Settings.builder().put("index.number_of_shards", numShards)) - .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") - ); - for (int i = 0; i < 10; i++) { - remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get(); - } - remoteClient.admin().indices().prepareRefresh(indexName).get(); - } - - public static class CountingPauseFieldPlugin extends SimplePauseFieldPlugin { - public static AtomicLong count = new AtomicLong(0); - - protected String scriptTypeName() { - return "pause_count"; - } - - public static void resetPlugin() { - count.set(0); - } - - @Override - public boolean onWait() throws InterruptedException { - count.incrementAndGet(); - return allowEmitting.await(30, TimeUnit.SECONDS); - } - } 
- - private static List getDriverTasks(Client client) { - return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); - } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java new file mode 100644 index 0000000000000..1c67b956de5e5 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -0,0 +1,239 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.Build; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.compute.operator.DriverTaskRunner; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.xpack.core.async.AsyncStopRequest; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomIncludeCCSMetadata; +import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; +import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.getAsyncResponse; +import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; +import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQueryWithPragmas; +import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.waitForCluster; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class CrossClusterAsyncQueryStopIT extends AbstractCrossClusterTestCase { + + public void testStopQuery() throws Exception { + assumeTrue("Pragma does not work in release builds", Build.current().isSnapshot()); + Map testClusterInfo = setupClusters(3); + int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); + int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards"); + // Create large index so we could be sure we're stopping before the end + populateRuntimeIndex(REMOTE_CLUSTER_2, "pause_count", INDEX_WITH_BLOCKING_MAPPING); + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + final String asyncExecutionId = startAsyncQueryWithPragmas( + client(), + "FROM logs-*,cluster-a:logs-*,remote-b:blocking | STATS total=sum(coalesce(const,v)) | LIMIT 1", + includeCCSMetadata.v1(), + Map.of("page_size", 1, "data_partitioning", "shard", "task_concurrency", 1) + ); + + // wait until we know that the query against 'remote-b:blocking' has started + CountingPauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + + // wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 
'blocking' index on it) + waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); + waitForCluster(client(), LOCAL_CLUSTER, asyncExecutionId); + + /* at this point: + * the query against cluster-a should be finished + * the query against remote-b should be running (blocked on the PauseFieldPlugin.allowEmitting CountDown) + * the query against the local cluster should be running because it has a STATS clause that needs to wait on remote-b + */ + + // run the stop query + AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); + ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + assertBusy(() -> { + List tasks = getDriverTasks(client(REMOTE_CLUSTER_2)); + List reduceTasks = tasks.stream().filter(t -> t.description().contains("_LuceneSourceOperator") == false).toList(); + assertThat(reduceTasks, empty()); + }); + // allow remoteB query to proceed + CountingPauseFieldPlugin.allowEmitting.countDown(); + + // Since part of the query has not been stopped, we expect some result to emerge here + try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { + // Check that we did not process all the fields on remote-b + // Should not be getting more than one page here, and we set page size to 1 + assertThat(CountingPauseFieldPlugin.count.get(), lessThanOrEqualTo(1L)); + assertThat(asyncResponse.isRunning(), is(false)); + assertThat(asyncResponse.columns().size(), equalTo(1)); + assertThat(asyncResponse.values().hasNext(), is(true)); + Iterator row = asyncResponse.values().next(); + // sum of 0-9 is 45, and sum of 0-9 squared is 285 + assertThat(row.next(), equalTo(330L)); + + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + assertThat(executionInfo.isPartial(), equalTo(true)); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(remoteCluster, remote1NumShards); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("blocking")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(localCluster, localNumShards); + + assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); + } finally { + assertAcked(deleteAsyncId(client(), asyncExecutionId)); + } + } + + public void testStopQueryLocal() throws Exception { + Map testClusterInfo = setupClusters(3); + int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards"); + int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards"); + populateRuntimeIndex(LOCAL_CLUSTER, "pause", INDEX_WITH_BLOCKING_MAPPING); + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + final String asyncExecutionId = startAsyncQuery( + client(), + "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | 
LIMIT 1", + includeCCSMetadata.v1() + ); + + // wait until we know that the query against 'remote-b:blocking' has started + SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + + // wait until the remotes are done + waitForCluster(client(), REMOTE_CLUSTER_1, asyncExecutionId); + waitForCluster(client(), REMOTE_CLUSTER_2, asyncExecutionId); + + /* at this point: + * the query against remotes should be finished + * the query against the local cluster should be running because it's blocked + */ + + // run the stop query + AsyncStopRequest stopRequest = new AsyncStopRequest(asyncExecutionId); + ActionFuture stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // ensure stop operation is running + assertBusy(() -> { + try (EsqlQueryResponse asyncResponse = getAsyncResponse(client(), asyncExecutionId)) { + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isPartial(), is(true)); + } + }); + // allow local query to proceed + SimplePauseFieldPlugin.allowEmitting.countDown(); + + // Since part of the query has not been stopped, we expect some result to emerge here + try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { + assertThat(asyncResponse.isRunning(), is(false)); + assertThat(asyncResponse.columns().size(), equalTo(1)); + assertThat(asyncResponse.values().hasNext(), is(true)); + Iterator row = asyncResponse.values().next(); + // sum of 0-9 squared is 285, from two remotes it's 570 + assertThat(row.next(), equalTo(570L)); + + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + assertThat(executionInfo.isPartial(), equalTo(true)); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(remoteCluster, remote1NumShards); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); + assertClusterInfoSuccess(remote2Cluster, remote2NumShards); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("blocking")); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + + assertClusterMetadataInResponse(asyncResponse, responseExpectMeta, 3); + } finally { + assertAcked(deleteAsyncId(client(), asyncExecutionId)); + } + } + + public void testStopQueryLocalNoRemotes() throws Exception { + setupClusters(3); + populateRuntimeIndex(LOCAL_CLUSTER, "pause", INDEX_WITH_BLOCKING_MAPPING); + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + final String asyncExecutionId = startAsyncQuery( + client(), + "FROM blocking | STATS total=count(const) | LIMIT 1", + includeCCSMetadata.v1() + ); + + // wait until we know that the query against 'remote-b:blocking' has started + SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS); + + /* at this point: + * the query against the local cluster should be running 
because it's blocked + */ + + // run the stop query + var stopRequest = new AsyncStopRequest(asyncExecutionId); + var stopAction = client().execute(EsqlAsyncStopAction.INSTANCE, stopRequest); + // allow local query to proceed + SimplePauseFieldPlugin.allowEmitting.countDown(); + + try (EsqlQueryResponse asyncResponse = stopAction.actionGet(30, TimeUnit.SECONDS)) { + assertThat(asyncResponse.isRunning(), is(false)); + assertThat(asyncResponse.columns().size(), equalTo(1)); + assertThat(asyncResponse.values().hasNext(), is(true)); + Iterator row = asyncResponse.values().next(); + assertThat((long) row.next(), greaterThanOrEqualTo(0L)); + + EsqlExecutionInfo executionInfo = asyncResponse.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(false)); + } finally { + assertAcked(deleteAsyncId(client(), asyncExecutionId)); + } + } + + private static List getDriverTasks(Client client) { + return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks(); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java similarity index 88% rename from x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java rename to x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index 1a99fd9d0383d..0ca01ae7ec69e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -15,28 +15,21 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.operator.DriverProfile; -import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.XContentTestUtils; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -58,49 +51,19 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; -public class CrossClustersQueryIT extends AbstractMultiClustersTestCase { - private static final String REMOTE_CLUSTER_1 = "cluster-a"; - private static final String REMOTE_CLUSTER_2 = "remote-b"; - private static String LOCAL_INDEX = "logs-1"; - private static String IDX_ALIAS = "alias1"; - private static 
String FILTERED_IDX_ALIAS = "alias-filtered-1"; - private static String REMOTE_INDEX = "logs-2"; - - @Override - protected List remoteClusterAlias() { - return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); - } +public class CrossClusterQueryIT extends AbstractCrossClusterTestCase { + private static final String IDX_ALIAS = "alias1"; + private static final String FILTERED_IDX_ALIAS = "alias-filtered-1"; @Override protected Map skipUnavailableForRemoteClusters() { return Map.of(REMOTE_CLUSTER_1, randomBoolean(), REMOTE_CLUSTER_2, randomBoolean()); } - @Override - protected Collection> nodePlugins(String clusterAlias) { - List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); - plugins.add(InternalExchangePlugin.class); - return plugins; - } - - public static class InternalExchangePlugin extends Plugin { - @Override - public List> getSettings() { - return List.of( - Setting.timeSetting( - ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, - TimeValue.timeValueSeconds(30), - Setting.Property.NodeScope - ) - ); - } - } - - public void testSuccessfulPathways() { + public void testSuccessfulPathways() throws Exception { Map testClusterInfo = setupTwoClusters(); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); - int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); + int remoteNumShards = (Integer) testClusterInfo.get("remote1.num_shards"); Tuple includeCCSMetadata = randomIncludeCCSMetadata(); Boolean requestIncludeMeta = includeCCSMetadata.v1(); @@ -183,7 +146,7 @@ public void testSuccessfulPathways() { } } - public void testSearchesAgainstNonMatchingIndicesWithLocalOnly() { + public void testSearchesAgainstNonMatchingIndicesWithLocalOnly() throws Exception { Map testClusterInfo = setupTwoClusters(); String localIndex = (String) testClusterInfo.get("local.index"); @@ -230,7 +193,7 @@ public void testSearchesAgainstNonMatchingIndicesWithLocalOnly() { } } - public void testSearchesAgainstIndicesWithNoMappingsSkipUnavailableTrue() { + public void testSearchesAgainstIndicesWithNoMappingsSkipUnavailableTrue() throws Exception { int numClusters = 2; setupClusters(numClusters); Map clusterToEmptyIndexMap = createEmptyIndicesWithNoMappings(numClusters); @@ -288,13 +251,13 @@ public void testSearchesAgainstIndicesWithNoMappingsSkipUnavailableTrue() { } } - public void testSearchesAgainstNonMatchingIndices() { + public void testSearchesAgainstNonMatchingIndices() throws Exception { int numClusters = 3; Map testClusterInfo = setupClusters(numClusters); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); - int remote1NumShards = (Integer) testClusterInfo.get("remote.num_shards"); + int remote1NumShards = (Integer) testClusterInfo.get("remote1.num_shards"); String localIndex = (String) testClusterInfo.get("local.index"); - String remote1Index = (String) testClusterInfo.get("remote.index"); + String remote1Index = (String) testClusterInfo.get("remote1.index"); String remote2Index = (String) testClusterInfo.get("remote2.index"); createIndexAliases(numClusters); @@ -494,7 +457,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu } } - public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { + public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws Exception { Map testClusterInfo = setupTwoClusters(); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); @@ -571,7 +534,7 @@ public void 
testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() {
      * Note: the tests covering "nonmatching indices" also do LIMIT 0 tests.
      * This one is mostly focuses on took time values.
      */
-    public void testCCSExecutionOnSearchesWithLimit0() {
+    public void testCCSExecutionOnSearchesWithLimit0() throws Exception {
         setupTwoClusters();

         Tuple includeCCSMetadata = randomIncludeCCSMetadata();
         Boolean requestIncludeMeta = includeCCSMetadata.v1();
@@ -617,10 +580,10 @@ public void testCCSExecutionOnSearchesWithLimit0() {
         }
     }

-    public void testMetadataIndex() {
+    public void testMetadataIndex() throws Exception {
         Map testClusterInfo = setupTwoClusters();
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
-        int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
+        int remoteNumShards = (Integer) testClusterInfo.get("remote1.num_shards");

         Tuple includeCCSMetadata = randomIncludeCCSMetadata();
         Boolean requestIncludeMeta = includeCCSMetadata.v1();
@@ -662,10 +625,10 @@ public void testMetadataIndex() {
         }
     }

-    public void testProfile() {
+    public void testProfile() throws Exception {
         Map testClusterInfo = setupTwoClusters();
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
-        int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
+        int remoteNumShards = (Integer) testClusterInfo.get("remote1.num_shards");

         assumeTrue("pragmas only enabled on snapshot builds", Build.current().isSnapshot());
         // uses shard partitioning as segments can be merged during these queries
@@ -785,7 +748,7 @@ public void testProfile() {
     public void testWarnings() throws Exception {
         Map testClusterInfo = setupTwoClusters();
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
-        int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
+        int remoteNumShards = (Integer) testClusterInfo.get("remote1.num_shards");

         EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
         request.query("FROM logs*,c*:logs* | EVAL ip = to_ip(id) | STATS total = sum(v) by ip | LIMIT 10");
@@ -855,22 +818,6 @@ private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, bool
         }
     }

-    protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
-        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
-        request.query(query);
-        request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
-        request.profile(randomInt(5) == 2);
-        request.columnar(randomBoolean());
-        if (ccsMetadataInResponse != null) {
-            request.includeCCSMetadata(ccsMetadataInResponse);
-        }
-        return runQuery(request);
-    }
-
-    protected EsqlQueryResponse runQuery(EsqlQueryRequest request) {
-        return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
-    }
-
     void waitForNoInitializingShards(Client client, TimeValue timeout, String... indices) {
         ClusterHealthResponse resp = client.admin()
             .cluster()
@@ -883,41 +830,10 @@ void waitForNoInitializingShards(Client client, TimeValue timeout, String... ind
         assertFalse(Strings.toString(resp, true, true), resp.isTimedOut());
     }

-    Map setupTwoClusters() {
+    Map setupTwoClusters() throws IOException {
         return setupClusters(2);
     }

-    Map setupClusters(int numClusters) {
-        assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
-        int numShardsLocal = randomIntBetween(1, 5);
-        populateLocalIndices(LOCAL_INDEX, numShardsLocal);
-
-        int numShardsRemote = randomIntBetween(1, 5);
-        populateRemoteIndices(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote);
-
-        Map clusterInfo = new HashMap<>();
-        clusterInfo.put("local.num_shards", numShardsLocal);
-        clusterInfo.put("local.index", LOCAL_INDEX);
-        clusterInfo.put("remote.num_shards", numShardsRemote);
-        clusterInfo.put("remote.index", REMOTE_INDEX);
-
-        if (numClusters == 3) {
-            int numShardsRemote2 = randomIntBetween(1, 5);
-            populateRemoteIndices(REMOTE_CLUSTER_2, REMOTE_INDEX, numShardsRemote2);
-            clusterInfo.put("remote2.index", REMOTE_INDEX);
-            clusterInfo.put("remote2.num_shards", numShardsRemote2);
-        }
-
-        String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER_1);
-        Setting skipUnavailableSetting = cluster(REMOTE_CLUSTER_1).clusterService().getClusterSettings().get(skipUnavailableKey);
-        boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
-            .getClusterSettings()
-            .get(skipUnavailableSetting);
-        clusterInfo.put("remote.skip_unavailable", skipUnavailable);
-
-        return clusterInfo;
-    }
-
     /**
      * For the local cluster and REMOTE_CLUSTER_1 it creates a standard alias to the index created in populateLocalIndices
      * and populateRemoteIndices. It also creates a filtered alias against those indices that looks like:
@@ -1009,53 +925,4 @@ Map createEmptyIndicesWithNoMappings(int numClusters) {

         return clusterToEmptyIndexMap;
     }
-
-    void populateLocalIndices(String indexName, int numShards) {
-        Client localClient = client(LOCAL_CLUSTER);
-        assertAcked(
-            localClient.admin()
-                .indices()
-                .prepareCreate(indexName)
-                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
-                .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
-        );
-        for (int i = 0; i < 10; i++) {
-            localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
-        }
-        localClient.admin().indices().prepareRefresh(indexName).get();
-    }
-
-    void populateRemoteIndices(String clusterAlias, String indexName, int numShards) {
-        Client remoteClient = client(clusterAlias);
-        assertAcked(
-            remoteClient.admin()
-                .indices()
-                .prepareCreate(indexName)
-                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
-                .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
-        );
-        for (int i = 0; i < 10; i++) {
-            remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get();
-        }
-        remoteClient.admin().indices().prepareRefresh(indexName).get();
-    }
-
-    private void setSkipUnavailable(String clusterAlias, boolean skip) {
-        client(LOCAL_CLUSTER).admin()
-            .cluster()
-            .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
-            .setPersistentSettings(Settings.builder().put("cluster.remote." + clusterAlias + ".skip_unavailable", skip).build())
-            .get();
-    }
-
-    private void clearSkipUnavailable() {
-        Settings.Builder settingsBuilder = Settings.builder()
-            .putNull("cluster.remote." + REMOTE_CLUSTER_1 + ".skip_unavailable")
-            .putNull("cluster.remote." + REMOTE_CLUSTER_2 + ".skip_unavailable");
-        client(LOCAL_CLUSTER).admin()
-            .cluster()
-            .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
-            .setPersistentSettings(settingsBuilder.build())
-            .get();
-    }
 }
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java
index eb728895cd00c..0ed6897f0ee7d 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java
@@ -8,27 +8,13 @@
 package org.elasticsearch.xpack.esql.action;

 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.client.internal.Client;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.compute.operator.exchange.ExchangeService;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.test.AbstractMultiClustersTestCase;
-import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.xpack.esql.core.type.DataType;

-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;

-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
 import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomIncludeCCSMetadata;
 import static org.hamcrest.Matchers.equalTo;
@@ -37,41 +23,13 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;

-public class CrossClusterQueryUnavailableRemotesIT extends AbstractMultiClustersTestCase {
-    private static final String REMOTE_CLUSTER_1 = "cluster-a";
-    private static final String REMOTE_CLUSTER_2 = "cluster-b";
-
-    @Override
-    protected List remoteClusterAlias() {
-        return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
-    }
+public class CrossClusterQueryUnavailableRemotesIT extends AbstractCrossClusterTestCase {

     @Override
     protected boolean reuseClusters() {
         return false;
     }

-    @Override
-    protected Collection> nodePlugins(String clusterAlias) {
-        List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
-        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
-        plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
-        return plugins;
-    }
-
-    public static class InternalExchangePlugin extends Plugin {
-        @Override
-        public List> getSettings() {
-            return List.of(
-                Setting.timeSetting(
-                    ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING,
-                    TimeValue.timeValueSeconds(30),
-                    Setting.Property.NodeScope
-                )
-            );
-        }
-    }
-
     public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exception {
         int numClusters = 3;
         Map testClusterInfo = setupClusters(numClusters);
@@ -133,7 +91,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
                 assertThat(localCluster.getFailedShards(), equalTo(0));

                 // ensure that the _clusters metadata is present only if requested
-                assertClusterMetadataInResponse(resp, responseExpectMeta);
+                assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
             }

             // scenario where there are no indices to match because
@@ -186,7 +144,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
                 assertThat(localCluster.getFailedShards(), equalTo(0));

                 // ensure that the _clusters metadata is present only if requested
-                assertClusterMetadataInResponse(resp, responseExpectMeta);
+                assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
             }

             // close remote-cluster-2 so that it is also unavailable
@@ -237,7 +195,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc
                 assertThat(localCluster.getFailedShards(), equalTo(0));

                 // ensure that the _clusters metadata is present only if requested
-                assertClusterMetadataInResponse(resp, responseExpectMeta);
+                assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
             }
         } finally {
             clearSkipUnavailable(numClusters);
         }
     }
@@ -289,7 +247,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
                 assertThat(remoteCluster.getFailedShards(), equalTo(0));

                 // ensure that the _clusters metadata is present only if requested
-                assertClusterMetadataInResponse(resp, responseExpectMeta);
+                assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
             }

             // close remote cluster 2 so that it is also unavailable
@@ -341,7 +299,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue()
                 assertThat(remote2Cluster.getFailedShards(), equalTo(0));

                 // ensure that the _clusters metadata is present only if requested
-                assertClusterMetadataInResponse(resp, responseExpectMeta);
+                assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters);
             }

         } finally {
@@ -396,116 +354,4 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableFalse()
             clearSkipUnavailable(numClusters);
         }
     }
-
-    private void setSkipUnavailable(String clusterAlias, boolean skip) {
-        client(LOCAL_CLUSTER).admin()
-            .cluster()
-            .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
-            .setPersistentSettings(Settings.builder().put("cluster.remote." + clusterAlias + ".skip_unavailable", skip).build())
-            .get();
-    }
-
-    private void clearSkipUnavailable(int numClusters) {
-        assert numClusters == 2 || numClusters == 3 : "Only 2 or 3 clusters supported";
-        Settings.Builder settingsBuilder = Settings.builder().putNull("cluster.remote." + REMOTE_CLUSTER_1 + ".skip_unavailable");
-        if (numClusters == 3) {
-            settingsBuilder.putNull("cluster.remote." + REMOTE_CLUSTER_2 + ".skip_unavailable");
-        }
-        client(LOCAL_CLUSTER).admin()
-            .cluster()
-            .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
-            .setPersistentSettings(settingsBuilder.build())
-            .get();
-    }
-
-    private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {
-        try {
-            final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
-            final Object clusters = esqlResponseAsMap.get("_clusters");
-            if (responseExpectMeta) {
-                assertNotNull(clusters);
-                // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response)
-                @SuppressWarnings("unchecked")
-                Map inner = (Map) clusters;
-                assertTrue(inner.containsKey("total"));
-                assertTrue(inner.containsKey("details"));
-            } else {
-                assertNull(clusters);
-            }
-        } catch (IOException e) {
-            fail("Could not convert ESQL response to Map: " + e);
-        }
-    }
-
-    protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
-        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
-        request.query(query);
-        request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
-        request.profile(randomInt(5) == 2);
-        request.columnar(randomBoolean());
-        if (ccsMetadataInResponse != null) {
-            request.includeCCSMetadata(ccsMetadataInResponse);
-        }
-        return runQuery(request);
-    }
-
-    protected EsqlQueryResponse runQuery(EsqlQueryRequest request) {
-        return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
-    }
-
-    Map setupClusters(int numClusters) {
-        assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
-        String localIndex = "logs-1";
-        int numShardsLocal = randomIntBetween(1, 5);
-        populateLocalIndices(localIndex, numShardsLocal);
-
-        String remoteIndex = "logs-2";
-        int numShardsRemote = randomIntBetween(1, 5);
-        populateRemoteIndices(REMOTE_CLUSTER_1, remoteIndex, numShardsRemote);
-
-        Map clusterInfo = new HashMap<>();
-        clusterInfo.put("local.num_shards", numShardsLocal);
-        clusterInfo.put("local.index", localIndex);
-        clusterInfo.put("remote.num_shards", numShardsRemote);
-        clusterInfo.put("remote.index", remoteIndex);
-
-        if (numClusters == 3) {
-            int numShardsRemote2 = randomIntBetween(1, 5);
-            populateRemoteIndices(REMOTE_CLUSTER_2, remoteIndex, numShardsRemote2);
-            clusterInfo.put("remote2.index", remoteIndex);
-            clusterInfo.put("remote2.num_shards", numShardsRemote2);
-        }
-
-        return clusterInfo;
-    }
-
-    void populateLocalIndices(String indexName, int numShards) {
-        Client localClient = client(LOCAL_CLUSTER);
-        assertAcked(
-            localClient.admin()
-                .indices()
-                .prepareCreate(indexName)
-                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
-                .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
-        );
-        for (int i = 0; i < 10; i++) {
-            localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get();
-        }
-        localClient.admin().indices().prepareRefresh(indexName).get();
-    }
-
-    void populateRemoteIndices(String clusterAlias, String indexName, int numShards) {
-        Client remoteClient = client(clusterAlias);
-        assertAcked(
-            remoteClient.admin()
-                .indices()
-                .prepareCreate(indexName)
-                .setSettings(Settings.builder().put("index.number_of_shards", numShards))
-                .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long")
-        );
-        for (int i = 0; i < 10; i++) {
-            remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get();
-        }
-        remoteClient.admin().indices().prepareRefresh(indexName).get();
-    }
 }
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java
index 89f7fdca79135..29fbc19b7274f 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java
@@ -41,7 +41,7 @@ public class CrossClustersUsageTelemetryIT extends AbstractCrossClustersUsageTel
     protected Collection> nodePlugins(String clusterAlias) {
         List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
         plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
-        plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
+        plugins.add(CrossClusterQueryIT.InternalExchangePlugin.class);
         plugins.add(SimplePauseFieldPlugin.class);
         plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
         return plugins;
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java
index 2b993e9474062..5736c485aec1a 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java
@@ -23,7 +23,7 @@ public class CrossClustersUsageTelemetryNoLicenseIT extends AbstractCrossCluster
     protected Collection> nodePlugins(String clusterAlias) {
         List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
         plugins.add(EsqlPluginWithNonEnterpriseOrExpiredLicense.class);
-        plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
+        plugins.add(CrossClusterQueryIT.InternalExchangePlugin.class);
         return plugins;
     }


From 7dd0fe3e3d6acbbc14f86f419f53506800e92a83 Mon Sep 17 00:00:00 2001
From: Stanislav Malyshev
Date: Mon, 3 Feb 2025 10:48:03 -0700
Subject: [PATCH 2/2] Make it abstract

---
 .../xpack/esql/action/AbstractCrossClusterTestCase.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java
index e6927439c7e71..d9149ee291fdf 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java
@@ -37,7 +37,7 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;

-public class AbstractCrossClusterTestCase extends AbstractMultiClustersTestCase {
+public abstract class AbstractCrossClusterTestCase extends AbstractMultiClustersTestCase {
     protected static final String REMOTE_CLUSTER_1 = "cluster-a";
     protected static final String REMOTE_CLUSTER_2 = "remote-b";
     protected static final String LOCAL_INDEX = "logs-1";