Skip to content

Commit

Permalink
Run yaml rest tests using CCS in multi-cluster setup (#86521)
Browse files Browse the repository at this point in the history
Currently we only test a small subset of cross cluster search functionality in
rest tests living in the 'multi-cluster-search' qa module. In order to increase
test coverage for basic CCS functionality , this change adds a new qa modula
that re-uses a subset of existing yaml rest tests and runs them in a slightly
modified fashion in a CCS scenario.
Document data and other write operations are executed agains a "remote"
cluster, while all calls to the search API aand other APIs that support CCS are
performed on a local cluster connected to the remote with all the data via CCS.

Relates to #84481
  • Loading branch information
Christoph Büscher committed May 24, 2022
1 parent e12e452 commit 410d381
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 3 deletions.
98 changes: 98 additions & 0 deletions qa/ccs-common-rest/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/


import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask


apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-resources'

// This subproject copies a subset of the rest-api-spec rest tests and runs them in a slightly
// modified way on two clusters connected via CCS. All operations except searches and a few other
// APIs that support CCS are run against the remote "write" cluster where all indexed data will live.
// All search requests however are run against the local cluster connected via CCS to the remote
// cluster. The test runner modifies index names on these API calls to route to the remote cluster
// and also modifies certain "match" sections to expect index names with the remote cluster prefix
// on the fly while running these tests.

restResources {
restApi {
include '_common', 'bulk', 'count', 'cluster', 'index', 'indices', 'field_caps', 'msearch',
'search', 'async_search', 'graph', '*_point_in_time', 'info', 'scroll', 'clear_scroll'
}
restTests {
includeCore 'field_caps', 'msearch', 'search', 'suggest', 'scroll', "indices.resolve_index"
includeXpack 'async_search'
}
}

def remoteCluster = testClusters.register("ccs-remote") {
numberOfNodes = 2
setting 'node.roles', '[data,ingest,master]'
}

def localCluster = testClusters.register("ccs-local") {
setting 'node.roles', '[data,ingest,master,remote_cluster_client]'
setting 'cluster.remote.connections_per_cluster', '1'
setting 'cluster.remote.remote_cluster.seeds',
{ "\"${remoteCluster.get().getAllTransportPortURI().get(0)}\"" }
}

testClusters.configureEach {
setting 'xpack.security.enabled', 'false'
requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
}

// the following task is needed to make sure the remote cluster is running before the local cluster
// gets configured with the remotes cluster seed
tasks.register('startRemoteCluster', DefaultTestClustersTask) {
useCluster remoteCluster
doLast {
clusters.each { c ->
print "Remote cluster transport uri for ccs configuration is: "
println c.getAllTransportPortURI().get(0)
}
}
}

tasks.register("ccs-remote", RestIntegTestTask) {
mustRunAfter("precommit")
dependsOn startRemoteCluster

useCluster remoteCluster
useCluster localCluster

systemProperty 'tests.rest.blacklist',
[
'search/150_rewrite_on_coordinator/Ensure that we fetch the document only once', // terms lookup query with index
'search/170_terms_query/Terms Query with No.of terms exceeding index.max_terms_count should FAIL', // terms lookup query with index
'search/350_point_in_time/basic', // [indices] cannot be used with point in time
'search/350_point_in_time/point-in-time with slicing', // [indices] cannot be used with point in time
'search/350_point_in_time/msearch', // [indices] cannot be used with point in time
'search/350_point_in_time/wildcard', // [indices] cannot be used with point in time
'search.aggregation/220_filters_bucket/cache busting', // node_selector?
'search.aggregation/220_filters_bucket/cache hits', // node_selector?
'search.aggregation/50_filter/Standard queries get cached',
'search.aggregation/50_filter/Terms lookup gets cached', // terms lookup by "index" doesn't seem to work correctly
'search.aggregation/70_adjacency_matrix/Terms lookup', // terms lookup by "index" doesn't seem to work correctly
'async_search/20-with-poin-in-time/Async search with point in time' // [indices] cannot be used with point in time
].join(',')


doFirst {
println "Remote cluster endpoints are: ${-> remoteCluster.get().allHttpSocketURI.join(",")}"
println "Local cluster endpoints are: ${-> localCluster.get().allHttpSocketURI.join(",")}"
nonInputProperties.systemProperty('tests.rest.cluster', remoteCluster.map(c -> c.allHttpSocketURI.join(",")))
nonInputProperties.systemProperty('tests.rest.search_cluster', localCluster.map(c -> c.allHttpSocketURI.join(",")))
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.rest.yaml;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.tests.util.TimeUnits;
import org.elasticsearch.Version;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.test.rest.yaml.section.ClientYamlTestSection;
import org.elasticsearch.test.rest.yaml.section.DoSection;
import org.elasticsearch.test.rest.yaml.section.ExecutableSection;
import org.elasticsearch.test.rest.yaml.section.IsFalseAssertion;
import org.elasticsearch.test.rest.yaml.section.IsTrueAssertion;
import org.elasticsearch.test.rest.yaml.section.MatchAssertion;
import org.junit.AfterClass;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.unmodifiableList;

/**
* This runner executes test suits against two clusters (a "write" (the remote) cluster and a
* "search" cluster) connected via CCS.
* The test runner maintains an additional client to the one provided by ESClientYamlSuiteTestCase
* That client instance (and a corresponding client only used for administration) is running all API calls
* defined in CCS_APIS against the "search" cluster, while all other operations like indexing are performed
* using the client running against the "write" cluster.
*
*/
@TimeoutSuite(millis = 15 * TimeUnits.MINUTE) // to account for slow as hell VMs
public class CcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase {

private static final Logger logger = LogManager.getLogger(CcsCommonYamlTestSuiteIT.class);
private static RestClient searchClient;
private static RestClient adminSearchClient;
private static List<HttpHost> clusterHosts;
private static ClientYamlTestClient searchYamlTestClient;

// the remote cluster is the one we write index operations etc... to
private static final String REMOTE_CLUSTER_NAME = "remote_cluster";
// the CCS api calls that we run against the "search" cluster in this test setup
private static final Set<String> CCS_APIS = Set.of(
"search",
"field_caps",
"msearch",
"scroll",
"clear_scroll",
"indices.resolve_index",
"async_search.submit",
"async_search.get",
"async_search.status",
"async_search.delete"
);

/**
* initialize the search client and an additional administration client and check for an established connection
*/
@Before
public void initSearchClient() throws IOException {
if (searchClient == null) {
assert adminSearchClient == null;
assert clusterHosts == null;

final String cluster = System.getProperty("tests.rest.search_cluster");
assertNotNull("[tests.rest.search_cluster] is not configured", cluster);
String[] stringUrls = cluster.split(",");
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
for (String stringUrl : stringUrls) {
int portSeparator = stringUrl.lastIndexOf(':');
if (portSeparator < 0) {
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
}
String host = stringUrl.substring(0, portSeparator);
int port = Integer.parseInt(stringUrl.substring(portSeparator + 1));
hosts.add(buildHttpHost(host, port));
}
clusterHosts = unmodifiableList(hosts);
logger.info("initializing REST search clients against {}", clusterHosts);
searchClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
adminSearchClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));

Tuple<Version, Version> versionVersionTuple = readVersionsFromCatNodes(adminSearchClient);
final Version esVersion = versionVersionTuple.v1();
final Version masterVersion = versionVersionTuple.v2();
final String os = readOsFromNodesInfo(adminSearchClient);

searchYamlTestClient = new ClientYamlTestClient(
getRestSpec(),
searchClient,
hosts,
esVersion,
masterVersion,
os,
this::getClientBuilderWithSniffedHosts
) {
// we overwrite this method so the search client can modify the index names by prefixing them with the
// remote cluster name before sending the requests
public ClientYamlTestResponse callApi(
String apiName,
Map<String, String> params,
HttpEntity entity,
Map<String, String> headers,
NodeSelector nodeSelector
) throws IOException {
// on request, we need to replace index specifications by prefixing the remote cluster
if (apiName.equals("scroll") == false
&& apiName.equals("clear_scroll") == false
&& apiName.equals("async_search.get") == false
&& apiName.equals("async_search.delete") == false
&& apiName.equals("async_search.status") == false) {
String parameterName = "index";
if (apiName.equals("indices.resolve_index")) {
// in this specific api, the index parameter is called "name"
parameterName = "name";
}
String originalIndices = params.get(parameterName);
String expandedIndices = REMOTE_CLUSTER_NAME + ":*";
if (originalIndices != null && (originalIndices.isEmpty() == false)) {
String[] indices = originalIndices.split(",");
List<String> newIndices = new ArrayList<>();
for (String indexName : indices) {
newIndices.add(REMOTE_CLUSTER_NAME + ":" + indexName);
}
expandedIndices = String.join(",", newIndices);
}
params.put(parameterName, String.join(",", expandedIndices));
}
return super.callApi(apiName, params, entity, headers, nodeSelector);
};
};

// check that we have an established CCS connection
Request request = new Request("GET", "_remote/info");
Response response = adminSearchClient.performRequest(request);
assertOK(response);
ObjectPath responseObject = ObjectPath.createFromResponse(response);
assertNotNull(responseObject.evaluate(REMOTE_CLUSTER_NAME));
logger.info("Established connection to remote cluster [" + REMOTE_CLUSTER_NAME + "]");
}

assert searchClient != null;
assert adminSearchClient != null;
assert clusterHosts != null;
}

public CcsCommonYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) throws IOException {
super(rewrite(testCandidate));
}

/**
* we need to rewrite a few "match" sections in order to change the expected index name values
* to include the remote cluster prefix
*/
private static ClientYamlTestCandidate rewrite(ClientYamlTestCandidate clientYamlTestCandidate) {
ClientYamlTestSection testSection = clientYamlTestCandidate.getTestSection();
List<ExecutableSection> executableSections = testSection.getExecutableSections();
List<ExecutableSection> modifiedExecutableSections = new ArrayList<>();
String lastAPIDoSection = "";
for (ExecutableSection section : executableSections) {
ExecutableSection rewrittenSection = section;
if (section instanceof MatchAssertion matchSection) {
Object modifiedExpectedValue = ((MatchAssertion) section).getExpectedValue();
if (matchSection.getField().endsWith("_index") || matchSection.getField().contains("fields._index")) {
modifiedExpectedValue = rewriteExpectedIndexValue(matchSection.getExpectedValue());
}
if (lastAPIDoSection.equals("indices.resolve_index") && matchSection.getField().endsWith("name")) {
// modify " indices.resolve_index" expected index names
modifiedExpectedValue = rewriteExpectedIndexValue(matchSection.getExpectedValue());
}
if (lastAPIDoSection.equals("field_caps") && matchSection.getField().endsWith("indices")) {
modifiedExpectedValue = rewriteExpectedIndexValue(matchSection.getExpectedValue());
}
rewrittenSection = new MatchAssertion(matchSection.getLocation(), matchSection.getField(), modifiedExpectedValue);
} else if (section instanceof IsFalseAssertion falseAssertion) {
if ((lastAPIDoSection.startsWith("async_") || lastAPIDoSection.equals("search"))
&& ((IsFalseAssertion) section).getField().endsWith("_clusters")) {
// in ccs scenarios, the response "_cluster" section will be there
rewrittenSection = new IsTrueAssertion(falseAssertion.getLocation(), falseAssertion.getField());
}
} else if (section instanceof DoSection) {
lastAPIDoSection = ((DoSection) section).getApiCallSection().getApi();
if (lastAPIDoSection.equals("msearch")) {
// modify "msearch" body sections so the "index" part is targeting the remote cluster
DoSection doSection = ((DoSection) section);
List<Map<String, Object>> bodies = doSection.getApiCallSection().getBodies();
for (Map<String, Object> body : bodies) {
if (body.containsKey("index")) {
String modifiedIndex = REMOTE_CLUSTER_NAME + ":" + body.get("index");
body.put("index", modifiedIndex);
}
}
}
}
modifiedExecutableSections.add(rewrittenSection);
}
return new ClientYamlTestCandidate(
clientYamlTestCandidate.getRestTestSuite(),
new ClientYamlTestSection(
testSection.getLocation(),
testSection.getName(),
testSection.getSkipSection(),
modifiedExecutableSections
)
);
}

/**
* add the remote cluster prefix to either a single index name or a list of expected index names
*/
private static Object rewriteExpectedIndexValue(Object expectedValue) {
if (expectedValue instanceof String) {
return REMOTE_CLUSTER_NAME + ":" + expectedValue;
}
if (expectedValue instanceof List) {
@SuppressWarnings("unchecked")
List<String> expectedValues = (List<String>) expectedValue;
return expectedValues.stream().map(s -> REMOTE_CLUSTER_NAME + ":" + s).toList();
}
throw new IllegalArgumentException("Either String or List<String> expected");
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters();
}

@Override
protected ClientYamlTestExecutionContext createRestTestExecutionContext(
ClientYamlTestCandidate clientYamlTestCandidate,
ClientYamlTestClient clientYamlTestClient
) {
// depending on the API called, we either return the client running against the "write" or the "search" cluster here
return new ClientYamlTestExecutionContext(clientYamlTestCandidate, clientYamlTestClient, randomizeContentType()) {
protected ClientYamlTestClient clientYamlTestClient(String apiName) {
if (CCS_APIS.contains(apiName)) {
return searchYamlTestClient;
} else {
return super.clientYamlTestClient(apiName);
}
}
};
}

@AfterClass
public static void closeSearchClients() throws IOException {
try {
IOUtils.close(searchClient, adminSearchClient);
} finally {
clusterHosts = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public SetupSection getSetupSection() {
return restTestSuite.getSetupSection();
}

public ClientYamlTestSuite getRestTestSuite() {
return restTestSuite;
}

public TeardownSection getTeardownSection() {
return restTestSuite.getTeardownSection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ ClientYamlTestResponse callApiInternal(
Map<String, String> headers,
NodeSelector nodeSelector
) throws IOException {
return clientYamlTestClient.callApi(apiName, params, entity, headers, nodeSelector);
return clientYamlTestClient(apiName).callApi(apiName, params, entity, headers, nodeSelector);
}

protected ClientYamlTestClient clientYamlTestClient(String apiName) {
return clientYamlTestClient;
}

/**
Expand Down

0 comments on commit 410d381

Please sign in to comment.