Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run more search Rest tests in a CCS setup #86521

Merged
merged 14 commits into from
May 24, 2022
98 changes: 98 additions & 0 deletions qa/ccs-common-rest/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/


import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask


apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-resources'

// This subproject copies a subset of the rest-api-spec rest tests and runs them in a slightly
// modified way on two clusters connected via CCS. All operations except searches and a few other
// APIs that support CCS are run against the remote "write" cluster where all indexed data will live.
// All search requests however are run against the local cluster connected via CCS to the remote
// cluster. The test runner modifies index names on these API calls to route to the remote cluster
// and also modifies certain "match" sections to expect index names with the remote cluster prefix
// on the fly while running these tests.

restResources {
restApi {
include '_common', 'bulk', 'count', 'cluster', 'index', 'indices', 'field_caps', 'msearch',
'search', 'async_search', 'graph', '*_point_in_time', 'info', 'scroll', 'clear_scroll'
}
restTests {
includeCore 'field_caps', 'msearch', 'search', 'suggest', 'scroll', "indices.resolve_index"
includeXpack 'async_search'
}
}

def remoteCluster = testClusters.register("ccs-remote") {
numberOfNodes = 2
setting 'node.roles', '[data,ingest,master]'
}

def localCluster = testClusters.register("ccs-local") {
setting 'node.roles', '[data,ingest,master,remote_cluster_client]'
setting 'cluster.remote.connections_per_cluster', '1'
setting 'cluster.remote.remote_cluster.seeds',
{ "\"${remoteCluster.get().getAllTransportPortURI().get(0)}\"" }
}

testClusters.configureEach {
setting 'xpack.security.enabled', 'false'
requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
}

// the following task is needed to make sure the remote cluster is running before the local cluster
// gets configured with the remotes cluster seed
tasks.register('startRemoteCluster', DefaultTestClustersTask) {
useCluster remoteCluster
doLast {
clusters.each { c ->
print "Remote cluster transport uri for ccs configuration is: "
println c.getAllTransportPortURI().get(0)
}
}
}

tasks.register("ccs-remote", RestIntegTestTask) {
mustRunAfter("precommit")
dependsOn startRemoteCluster

useCluster remoteCluster
useCluster localCluster

systemProperty 'tests.rest.blacklist',
[
'search/150_rewrite_on_coordinator/Ensure that we fetch the document only once', // terms lookup query with index
'search/170_terms_query/Terms Query with No.of terms exceeding index.max_terms_count should FAIL', // terms lookup query with index
'search/350_point_in_time/basic', // [indices] cannot be used with point in time
'search/350_point_in_time/point-in-time with slicing', // [indices] cannot be used with point in time
'search/350_point_in_time/msearch', // [indices] cannot be used with point in time
'search/350_point_in_time/wildcard', // [indices] cannot be used with point in time
'search.aggregation/220_filters_bucket/cache busting', // node_selector?
'search.aggregation/220_filters_bucket/cache hits', // node_selector?
'search.aggregation/50_filter/Standard queries get cached',
'search.aggregation/50_filter/Terms lookup gets cached', // terms lookup by "index" doesn't seem to work correctly
'search.aggregation/70_adjacency_matrix/Terms lookup', // terms lookup by "index" doesn't seem to work correctly
'async_search/20-with-poin-in-time/Async search with point in time' // [indices] cannot be used with point in time
].join(',')


doFirst {
println "Remote cluster endpoints are: ${-> remoteCluster.get().allHttpSocketURI.join(",")}"
println "Local cluster endpoints are: ${-> localCluster.get().allHttpSocketURI.join(",")}"
nonInputProperties.systemProperty('tests.rest.cluster', remoteCluster.map(c -> c.allHttpSocketURI.join(",")))
nonInputProperties.systemProperty('tests.rest.search_cluster', localCluster.map(c -> c.allHttpSocketURI.join(",")))
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test.rest.yaml;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.tests.util.TimeUnits;
import org.elasticsearch.Version;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.test.rest.yaml.section.ClientYamlTestSection;
import org.elasticsearch.test.rest.yaml.section.DoSection;
import org.elasticsearch.test.rest.yaml.section.ExecutableSection;
import org.elasticsearch.test.rest.yaml.section.IsFalseAssertion;
import org.elasticsearch.test.rest.yaml.section.IsTrueAssertion;
import org.elasticsearch.test.rest.yaml.section.MatchAssertion;
import org.junit.AfterClass;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.unmodifiableList;

/**
* This runner executes test suits against two clusters (a "write" (the remote) cluster and a
* "search" cluster) connected via CCS.
* The test runner maintains an additional client to the one provided by ESClientYamlSuiteTestCase
* That client instance (and a corresponding client only used for administration) is running all API calls
* defined in CCS_APIS against the "search" cluster, while all other operations like indexing are performed
* using the client running against the "write" cluster.
*
*/
@TimeoutSuite(millis = 15 * TimeUnits.MINUTE) // to account for slow as hell VMs
public class CcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase {

private static final Logger logger = LogManager.getLogger(CcsCommonYamlTestSuiteIT.class);
private static RestClient searchClient;
private static RestClient adminSearchClient;
private static List<HttpHost> clusterHosts;
private static ClientYamlTestClient searchYamlTestClient;

// the remote cluster is the one we write index operations etc... to
private static final String REMOTE_CLUSTER_NAME = "remote_cluster";
// the CCS api calls that we run against the "search" cluster in this test setup
private static final Set<String> CCS_APIS = Set.of(
"search",
"field_caps",
"msearch",
"scroll",
"clear_scroll",
"indices.resolve_index",
"async_search.submit",
"async_search.get",
"async_search.status",
"async_search.delete"
);

/**
* initialize the search client and an additional administration client and check for an established connection
*/
@Before
public void initSearchClient() throws IOException {
if (searchClient == null) {
assert adminSearchClient == null;
assert clusterHosts == null;

final String cluster = System.getProperty("tests.rest.search_cluster");
assertNotNull("[tests.rest.search_cluster] is not configured", cluster);
String[] stringUrls = cluster.split(",");
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
for (String stringUrl : stringUrls) {
int portSeparator = stringUrl.lastIndexOf(':');
if (portSeparator < 0) {
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
}
String host = stringUrl.substring(0, portSeparator);
int port = Integer.parseInt(stringUrl.substring(portSeparator + 1));
hosts.add(buildHttpHost(host, port));
}
clusterHosts = unmodifiableList(hosts);
logger.info("initializing REST search clients against {}", clusterHosts);
searchClient = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
adminSearchClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));

Tuple<Version, Version> versionVersionTuple = readVersionsFromCatNodes(adminSearchClient);
final Version esVersion = versionVersionTuple.v1();
final Version masterVersion = versionVersionTuple.v2();
final String os = readOsFromNodesInfo(adminSearchClient);

searchYamlTestClient = new ClientYamlTestClient(
getRestSpec(),
searchClient,
hosts,
esVersion,
masterVersion,
os,
this::getClientBuilderWithSniffedHosts
) {
// we overwrite this method so the search client can modify the index names by prefixing them with the
// remote cluster name before sending the requests
public ClientYamlTestResponse callApi(
String apiName,
Map<String, String> params,
HttpEntity entity,
Map<String, String> headers,
NodeSelector nodeSelector
) throws IOException {
// on request, we need to replace index specifications by prefixing the remote cluster
if (apiName.equals("scroll") == false
&& apiName.equals("clear_scroll") == false
&& apiName.equals("async_search.get") == false
&& apiName.equals("async_search.delete") == false
&& apiName.equals("async_search.status") == false) {
String parameterName = "index";
if (apiName.equals("indices.resolve_index")) {
// in this specific api, the index parameter is called "name"
parameterName = "name";
}
String originalIndices = params.get(parameterName);
String expandedIndices = REMOTE_CLUSTER_NAME + ":*";
if (originalIndices != null && (originalIndices.isEmpty() == false)) {
String[] indices = originalIndices.split(",");
List<String> newIndices = new ArrayList<>();
for (String indexName : indices) {
newIndices.add(REMOTE_CLUSTER_NAME + ":" + indexName);
}
expandedIndices = String.join(",", newIndices);
}
params.put(parameterName, String.join(",", expandedIndices));
}
return super.callApi(apiName, params, entity, headers, nodeSelector);
};
};

// check that we have an established CCS connection
Request request = new Request("GET", "_remote/info");
Response response = adminSearchClient.performRequest(request);
assertOK(response);
ObjectPath responseObject = ObjectPath.createFromResponse(response);
assertNotNull(responseObject.evaluate(REMOTE_CLUSTER_NAME));
logger.info("Established connection to remote cluster [" + REMOTE_CLUSTER_NAME + "]");
}

assert searchClient != null;
assert adminSearchClient != null;
assert clusterHosts != null;
}

public CcsCommonYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) throws IOException {
super(rewrite(testCandidate));
}

/**
* we need to rewrite a few "match" sections in order to change the expected index name values
* to include the remote cluster prefix
*/
private static ClientYamlTestCandidate rewrite(ClientYamlTestCandidate clientYamlTestCandidate) {
ClientYamlTestSection testSection = clientYamlTestCandidate.getTestSection();
List<ExecutableSection> executableSections = testSection.getExecutableSections();
List<ExecutableSection> modifiedExecutableSections = new ArrayList<>();
String lastAPIDoSection = "";
for (ExecutableSection section : executableSections) {
ExecutableSection rewrittenSection = section;
if (section instanceof MatchAssertion matchSection) {
Object modifiedExpectedValue = ((MatchAssertion) section).getExpectedValue();
if (matchSection.getField().endsWith("_index") || matchSection.getField().contains("fields._index")) {
modifiedExpectedValue = rewriteExpectedIndexValue(matchSection.getExpectedValue());
}
if (lastAPIDoSection.equals("indices.resolve_index") && matchSection.getField().endsWith("name")) {
// modify " indices.resolve_index" expected index names
modifiedExpectedValue = rewriteExpectedIndexValue(matchSection.getExpectedValue());
}
if (lastAPIDoSection.equals("field_caps") && matchSection.getField().endsWith("indices")) {
modifiedExpectedValue = rewriteExpectedIndexValue(matchSection.getExpectedValue());
}
rewrittenSection = new MatchAssertion(matchSection.getLocation(), matchSection.getField(), modifiedExpectedValue);
} else if (section instanceof IsFalseAssertion falseAssertion) {
if ((lastAPIDoSection.startsWith("async_") || lastAPIDoSection.equals("search"))
&& ((IsFalseAssertion) section).getField().endsWith("_clusters")) {
// in ccs scenarios, the response "_cluster" section will be there
rewrittenSection = new IsTrueAssertion(falseAssertion.getLocation(), falseAssertion.getField());
cbuescher marked this conversation as resolved.
Show resolved Hide resolved
}
} else if (section instanceof DoSection) {
lastAPIDoSection = ((DoSection) section).getApiCallSection().getApi();
if (lastAPIDoSection.equals("msearch")) {
// modify "msearch" body sections so the "index" part is targeting the remote cluster
DoSection doSection = ((DoSection) section);
List<Map<String, Object>> bodies = doSection.getApiCallSection().getBodies();
for (Map<String, Object> body : bodies) {
if (body.containsKey("index")) {
String modifiedIndex = REMOTE_CLUSTER_NAME + ":" + body.get("index");
body.put("index", modifiedIndex);
}
}
}
}
modifiedExecutableSections.add(rewrittenSection);
}
return new ClientYamlTestCandidate(
clientYamlTestCandidate.getRestTestSuite(),
new ClientYamlTestSection(
testSection.getLocation(),
testSection.getName(),
testSection.getSkipSection(),
modifiedExecutableSections
)
);
}

/**
* add the remote cluster prefix to either a single index name or a list of expected index names
*/
private static Object rewriteExpectedIndexValue(Object expectedValue) {
if (expectedValue instanceof String) {
return REMOTE_CLUSTER_NAME + ":" + expectedValue;
}
if (expectedValue instanceof List) {
@SuppressWarnings("unchecked")
List<String> expectedValues = (List<String>) expectedValue;
return expectedValues.stream().map(s -> REMOTE_CLUSTER_NAME + ":" + s).toList();
}
throw new IllegalArgumentException("Either String or List<String> expected");
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters();
}

@Override
protected ClientYamlTestExecutionContext createRestTestExecutionContext(
ClientYamlTestCandidate clientYamlTestCandidate,
ClientYamlTestClient clientYamlTestClient
) {
// depending on the API called, we either return the client running against the "write" or the "search" cluster here
return new ClientYamlTestExecutionContext(clientYamlTestCandidate, clientYamlTestClient, randomizeContentType()) {
protected ClientYamlTestClient clientYamlTestClient(String apiName) {
if (CCS_APIS.contains(apiName)) {
return searchYamlTestClient;
} else {
return super.clientYamlTestClient(apiName);
}
}
};
}

@AfterClass
public static void closeSearchClients() throws IOException {
try {
IOUtils.close(searchClient, adminSearchClient);
} finally {
clusterHosts = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public SetupSection getSetupSection() {
return restTestSuite.getSetupSection();
}

public ClientYamlTestSuite getRestTestSuite() {
return restTestSuite;
}

public TeardownSection getTeardownSection() {
return restTestSuite.getTeardownSection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ ClientYamlTestResponse callApiInternal(
Map<String, String> headers,
NodeSelector nodeSelector
) throws IOException {
return clientYamlTestClient.callApi(apiName, params, entity, headers, nodeSelector);
return clientYamlTestClient(apiName).callApi(apiName, params, entity, headers, nodeSelector);
}

protected ClientYamlTestClient clientYamlTestClient(String apiName) {
return clientYamlTestClient;
}

/**
Expand Down
Loading