Skip to content

Commit

Permalink
Ensure search features work with the new RCS model (#93720)
Browse files Browse the repository at this point in the history
This PR ensures most search features (scroll, async search, pit, field
caps, msearch, vector tile etc) work with the new RCS model. The main
code change is tested by adapting the common yaml CCS tests to use the
new RCS model to provide a broad test coverage. The tests ensure the new
RCS model works from search's perspective. We could still use more tests
from security's perspective, e.g. DLS/FLS, in separate PRs.

Note:  * Eql yaml test files are not located under `x-pack/plugin` and
this makes it hard to reuse. It should be possible to relocate them. But
I'll address it separately.  * Sql yaml requires special transformation
to work. I'll also have it separately.
  • Loading branch information
ywangd committed Feb 16, 2023
1 parent f49bb09 commit 26d01b7
Show file tree
Hide file tree
Showing 11 changed files with 614 additions and 95 deletions.
13 changes: 6 additions & 7 deletions qa/ccs-common-rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,34 @@ apply plugin: 'elasticsearch.internal-yaml-rest-test'
restResources {
restApi {
include '_common', 'bulk', 'count', 'cluster', 'field_caps', 'knn_search', 'index', 'indices', 'msearch',
'search', 'async_search', 'graph', '*_point_in_time', 'info', 'scroll', 'clear_scroll'
'search', 'async_search', 'graph', '*_point_in_time', 'info', 'scroll', 'clear_scroll', 'search_mvt'
}
restTests {
includeCore 'field_caps', 'msearch', 'search', 'suggest', 'scroll', "indices.resolve_index"
includeXpack 'async_search'
includeXpack 'async_search', 'vector-tile'
}
}

dependencies {
clusterModules project(':x-pack:plugin:async-search')
clusterModules project(':modules:mapper-extras')
clusterModules project(':modules:aggregations')
clusterModules project(':modules:analysis-common')
clusterModules project(':x-pack:plugin:analytics')
clusterModules project(':x-pack:plugin:vector-tile')
clusterModules project(':modules:legacy-geo')
}

tasks.named("yamlRestTest") {
systemProperty 'tests.rest.blacklist',
[
'search/150_rewrite_on_coordinator/Ensure that we fetch the document only once', // terms lookup query with index
'search/170_terms_query/Terms Query with No.of terms exceeding index.max_terms_count should FAIL', // terms lookup query with index
'search/350_point_in_time/basic', // [indices] cannot be used with point in time
'search/350_point_in_time/point-in-time with slicing', // [indices] cannot be used with point in time
'search/350_point_in_time/msearch', // [indices] cannot be used with point in time
'search/350_point_in_time/wildcard', // [indices] cannot be used with point in time
'search.aggregation/220_filters_bucket/cache busting', // node_selector?
'search.aggregation/220_filters_bucket/cache hits', // node_selector?
'search.aggregation/50_filter/Standard queries get cached',
'search.aggregation/50_filter/Terms lookup gets cached', // terms lookup by "index" doesn't seem to work correctly
'search.aggregation/70_adjacency_matrix/Terms lookup', // terms lookup by "index" doesn't seem to work correctly
'async_search/20-with-poin-in-time/Async search with point in time' // [indices] cannot be used with point in time
].join(',')
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.test.rest.yaml.restspec.ClientYamlSuiteRestSpec;
import org.elasticsearch.test.rest.yaml.section.ClientYamlTestSection;
import org.elasticsearch.test.rest.yaml.section.DoSection;
import org.elasticsearch.test.rest.yaml.section.ExecutableSection;
Expand Down Expand Up @@ -63,14 +66,18 @@ public class CcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
private static RestClient searchClient;
private static RestClient adminSearchClient;
private static List<HttpHost> clusterHosts;
private static ClientYamlTestClient searchYamlTestClient;
private static TestCandidateAwareClient searchYamlTestClient;
// the remote cluster is the one we write index operations etc... to
private static final String REMOTE_CLUSTER_NAME = "remote_cluster";

private static LocalClusterConfigProvider commonClusterConfig = cluster -> cluster.module("x-pack-async-search")
.module("aggregations")
.module("mapper-extras")
.module("vector-tile")
.module("x-pack-analytics")
.setting("xpack.security.enabled", "false")
// geohex_grid requires gold license
.setting("xpack.license.self_generated.type", "trial")
.feature(FeatureFlag.TIME_SERIES_MODE);

private static ElasticsearchCluster remoteCluster = ElasticsearchCluster.local()
Expand All @@ -93,7 +100,7 @@ public class CcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);

// the CCS api calls that we run against the "search" cluster in this test setup
private static final Set<String> CCS_APIS = Set.of(
static final Set<String> CCS_APIS = Set.of(
"search",
"field_caps",
"msearch",
Expand All @@ -103,7 +110,9 @@ public class CcsCommonYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
"async_search.submit",
"async_search.get",
"async_search.status",
"async_search.delete"
"async_search.delete",
"open_point_in_time",
"close_point_in_time"
);

@Override
Expand Down Expand Up @@ -141,50 +150,15 @@ public void initSearchClient() throws IOException {
final Version masterVersion = versionVersionTuple.v2();
final String os = readOsFromNodesInfo(adminSearchClient);

searchYamlTestClient = new ClientYamlTestClient(
searchYamlTestClient = new TestCandidateAwareClient(
getRestSpec(),
searchClient,
hosts,
esVersion,
masterVersion,
os,
this::getClientBuilderWithSniffedHosts
) {
// we overwrite this method so the search client can modify the index names by prefixing them with the
// remote cluster name before sending the requests
public ClientYamlTestResponse callApi(
String apiName,
Map<String, String> params,
HttpEntity entity,
Map<String, String> headers,
NodeSelector nodeSelector
) throws IOException {
// on request, we need to replace index specifications by prefixing the remote cluster
if (apiName.equals("scroll") == false
&& apiName.equals("clear_scroll") == false
&& apiName.equals("async_search.get") == false
&& apiName.equals("async_search.delete") == false
&& apiName.equals("async_search.status") == false) {
String parameterName = "index";
if (apiName.equals("indices.resolve_index")) {
// in this specific api, the index parameter is called "name"
parameterName = "name";
}
String originalIndices = params.get(parameterName);
String expandedIndices = REMOTE_CLUSTER_NAME + ":*";
if (originalIndices != null && (originalIndices.isEmpty() == false)) {
String[] indices = originalIndices.split(",");
List<String> newIndices = new ArrayList<>();
for (String indexName : indices) {
newIndices.add(REMOTE_CLUSTER_NAME + ":" + indexName);
}
expandedIndices = String.join(",", newIndices);
}
params.put(parameterName, String.join(",", expandedIndices));
}
return super.callApi(apiName, params, entity, headers, nodeSelector);
};
};
);

// check that we have an established CCS connection
Request request = new Request("GET", "_remote/info");
Expand All @@ -198,6 +172,8 @@ public ClientYamlTestResponse callApi(
assert searchClient != null;
assert adminSearchClient != null;
assert clusterHosts != null;

searchYamlTestClient.setTestCandidate(getTestCandidate());
}

public CcsCommonYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) throws IOException {
Expand All @@ -208,7 +184,7 @@ public CcsCommonYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) throws IO
* we need to rewrite a few "match" sections in order to change the expected index name values
* to include the remote cluster prefix
*/
private static ClientYamlTestCandidate rewrite(ClientYamlTestCandidate clientYamlTestCandidate) {
static ClientYamlTestCandidate rewrite(ClientYamlTestCandidate clientYamlTestCandidate) {
ClientYamlTestSection testSection = clientYamlTestCandidate.getTestSection();
List<ExecutableSection> executableSections = testSection.getExecutableSections();
List<ExecutableSection> modifiedExecutableSections = new ArrayList<>();
Expand Down Expand Up @@ -244,6 +220,17 @@ private static ClientYamlTestCandidate rewrite(ClientYamlTestCandidate clientYam
if (body.containsKey("index")) {
String modifiedIndex = REMOTE_CLUSTER_NAME + ":" + body.get("index");
body.put("index", modifiedIndex);
} else if (body.containsKey("query") && body.containsKey("pit")) {
// search/350_point_in_time/msearch uses _index in a match query
@SuppressWarnings("unchecked")
final var query = (Map<String, Object>) body.get("query");
if (query.containsKey("match")) {
@SuppressWarnings("unchecked")
final var match = (Map<String, Object>) query.get("match");
if (match.containsKey("_index")) {
match.put("_index", REMOTE_CLUSTER_NAME + ":" + match.get("_index"));
}
}
}
}
}
Expand Down Expand Up @@ -306,4 +293,78 @@ public static void closeSearchClients() throws IOException {
clusterHosts = null;
}
}

static class TestCandidateAwareClient extends ClientYamlTestClient {
private ClientYamlTestCandidate testCandidate;

TestCandidateAwareClient(
ClientYamlSuiteRestSpec restSpec,
RestClient restClient,
List<HttpHost> hosts,
Version esVersion,
Version masterVersion,
String os,
CheckedSupplier<RestClientBuilder, IOException> clientBuilderWithSniffedNodes
) {
super(restSpec, restClient, hosts, esVersion, masterVersion, os, clientBuilderWithSniffedNodes);
}

public void setTestCandidate(ClientYamlTestCandidate testCandidate) {
this.testCandidate = testCandidate;
}

// we overwrite this method so the search client can modify the index names by prefixing them with the
// remote cluster name before sending the requests
public ClientYamlTestResponse callApi(
String apiName,
Map<String, String> params,
HttpEntity entity,
Map<String, String> headers,
NodeSelector nodeSelector
) throws IOException {
// on request, we need to replace index specifications by prefixing the remote cluster
if (shouldReplaceIndexWithRemote(apiName)) {
String parameterName = "index";
if (apiName.equals("indices.resolve_index")) {
// in this specific api, the index parameter is called "name"
parameterName = "name";
}
String originalIndices = params.get(parameterName);
String expandedIndices = REMOTE_CLUSTER_NAME + ":*";
if (originalIndices != null && (originalIndices.isEmpty() == false)) {
String[] indices = originalIndices.split(",");
List<String> newIndices = new ArrayList<>();
for (String indexName : indices) {
newIndices.add(REMOTE_CLUSTER_NAME + ":" + indexName);
}
expandedIndices = String.join(",", newIndices);
}
params.put(parameterName, String.join(",", expandedIndices));
}
return super.callApi(apiName, params, entity, headers, nodeSelector);
}

private boolean shouldReplaceIndexWithRemote(String apiName) {
if (apiName.equals("scroll")
|| apiName.equals("clear_scroll")
|| apiName.equals("async_search.get")
|| apiName.equals("async_search.delete")
|| apiName.equals("async_search.status")
|| apiName.equals("close_point_in_time")) {
return false;
}

if (apiName.equals("search") || apiName.equals("msearch") || apiName.equals("async_search.submit")) {
final String testCandidateTestPath = testCandidate.getTestPath();
if (testCandidateTestPath.equals("search/350_point_in_time/basic")
|| testCandidateTestPath.equals("search/350_point_in_time/point-in-time with slicing")
|| testCandidateTestPath.equals("search/350_point_in_time/msearch")
|| testCandidateTestPath.equals("search/350_point_in_time/wildcard")
|| testCandidateTestPath.equals("async_search/20-with-poin-in-time/Async search with point in time")) {
return false;
}
}
return true;
}
}
}

0 comments on commit 26d01b7

Please sign in to comment.