Skip to content

Commit

Permalink
Introduce flag for testing CCS compatibility (#81809)
Browse files Browse the repository at this point in the history
CCS works with a subset of APIs and features depending on the versions of the clusters being communicated with.
Currently we limit this CCS compliance to one minor version backward and one minor forward.

This change adds a setting useful for testing in clients like Kibana that can be turned on to check if a search request
sent to one of the endpoints that are supporting CCS is compatible with a cluster that is on one minor version back.
We do this by trying to serialize the request to a stream with the earlier version. Features and components that are
not supported in that version should throw errors upon atempted serialization to indicate they are not compatible.
In addition we need components extending NamedWriteable (e.g. new queries) to also error when they are written to a
stream that has a version before the version they were released.
  • Loading branch information
Christoph Büscher committed Jan 27, 2022
1 parent 4ad7814 commit 051e1d6
Show file tree
Hide file tree
Showing 41 changed files with 878 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,30 @@

package org.elasticsearch.script.mustache;

import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.mustache.MultiSearchTemplateResponse.Item;
import org.elasticsearch.search.DummyQueryParserPlugin;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -34,7 +41,15 @@ public class MultiSearchTemplateIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(MustachePlugin.class);
return List.of(MustachePlugin.class, DummyQueryParserPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true")
.build();
}

public void testBasic() throws Exception {
Expand Down Expand Up @@ -164,4 +179,27 @@ public void testBasic() throws Exception {
assertThat(searchTemplateResponse5.hasResponse(), is(false));
assertThat(searchTemplateResponse5.getSource().utf8ToString(), equalTo("{\"query\":{\"terms\":{\"group\":[1,2,3,]}}}"));
}

/**
* Test that triggering the CCS compatibility check with a query that shouldn't go to the minor before Version.CURRENT works
*/
public void testCCSCheckCompatibility() throws Exception {
String templateString = """
{
"source": "{ \\"query\\":{\\"fail_before_current_version\\":{}} }"
}""";
SearchTemplateRequest searchTemplateRequest = SearchTemplateRequest.fromXContent(
createParser(JsonXContent.jsonXContent, templateString)
);
searchTemplateRequest.setRequest(new SearchRequest());
MultiSearchTemplateRequest request = new MultiSearchTemplateRequest();
request.add(searchTemplateRequest);
MultiSearchTemplateResponse multiSearchTemplateResponse = client().execute(MultiSearchTemplateAction.INSTANCE, request).get();
Item response = multiSearchTemplateResponse.getResponses()[0];
assertTrue(response.isFailure());
Exception ex = response.getFailure();
assertThat(ex.getMessage(), containsString("[class org.elasticsearch.action.search.SearchRequest] is not compatible with version"));
assertThat(ex.getMessage(), containsString("'search.check_ccs_compatibility' setting is enabled."));
assertEquals("This query isn't serializable to nodes before " + Version.CURRENT, ex.getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
package org.elasticsearch.script.mustache;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.DummyQueryParserPlugin;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand All @@ -23,7 +27,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand All @@ -38,7 +44,12 @@ public class SearchTemplateIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(MustachePlugin.class);
return List.of(MustachePlugin.class, DummyQueryParserPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true").build();
}

@Before
Expand Down Expand Up @@ -346,4 +357,26 @@ public void testIndexedTemplateWithArray() throws Exception {
assertHitCount(searchResponse.getResponse(), 5);
}

/**
* Test that triggering the CCS compatibility check with a query that shouldn't go to the minor before Version.CURRENT works
*/
public void testCCSCheckCompatibility() throws Exception {
String templateString = """
{
"source": "{ \\"query\\":{\\"fail_before_current_version\\":{}} }"
}""";
SearchTemplateRequest request = SearchTemplateRequest.fromXContent(createParser(JsonXContent.jsonXContent, templateString));
request.setRequest(new SearchRequest());
ExecutionException ex = expectThrows(
ExecutionException.class,
() -> client().execute(SearchTemplateAction.INSTANCE, request).get()
);
assertThat(
ex.getCause().getMessage(),
containsString("[class org.elasticsearch.action.search.SearchRequest] is not compatible with version")
);
assertThat(ex.getCause().getMessage(), containsString("'search.check_ccs_compatibility' setting is enabled."));
assertEquals("This query isn't serializable to nodes before " + Version.CURRENT, ex.getCause().getCause().getMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.DummyQueryBuilder;
import org.elasticsearch.search.DummyQueryParserPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Before;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@

package org.elasticsearch.search.msearch;

import org.elasticsearch.Version;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.DummyQueryBuilder;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentType;

Expand All @@ -22,6 +26,14 @@

public class MultiSearchIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true")
.build();
}

public void testSimpleMultiSearch() {
createIndex("test");
ensureGreen();
Expand Down Expand Up @@ -70,4 +82,32 @@ public void testSimpleMultiSearchMoreRequests() {
}
}

/**
* Test that triggering the CCS compatibility check with a query that shouldn't go to the minor before Version.CURRENT works
*/
public void testCCSCheckCompatibility() throws Exception {
createIndex("test");
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "xxx").get();
client().prepareIndex("test").setId("2").setSource("field", "yyy").get();
refresh();
MultiSearchResponse response = client().prepareMultiSearch()
.add(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "xxx")))
.add(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "yyy")))
.add(client().prepareSearch("test").setQuery(new DummyQueryBuilder() {
@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT;
}
}))
.get();

assertThat(response.getResponses().length, equalTo(3));
assertHitCount(response.getResponses()[0].getResponse(), 1L);
assertHitCount(response.getResponses()[1].getResponse(), 1L);
assertTrue(response.getResponses()[2].isFailure());
assertTrue(
response.getResponses()[2].getFailure().getMessage().contains("the 'search.check_ccs_compatibility' setting is enabled")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand All @@ -54,6 +55,8 @@
import java.util.SortedMap;
import java.util.TreeMap;

import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;

public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response> {

public static final ResolveIndexAction INSTANCE = new ResolveIndexAction();
Expand Down Expand Up @@ -436,6 +439,7 @@ public static class TransportAction extends HandledTransportAction<Request, Resp
private final ClusterService clusterService;
private final RemoteClusterService remoteClusterService;
private final IndexAbstractionResolver indexAbstractionResolver;
private final boolean ccsCheckCompatibility;

@Inject
public TransportAction(
Expand All @@ -450,10 +454,14 @@ public TransportAction(
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.indexAbstractionResolver = new IndexAbstractionResolver(indexNameExpressionResolver);
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
}

@Override
protected void doExecute(Task task, Request request, final ActionListener<Response> listener) {
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(request);
}
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(
request.indicesOptions(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand All @@ -45,6 +46,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;

public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public static final String ACTION_NODE_NAME = FieldCapabilitiesAction.NAME + "[n]";

Expand All @@ -55,6 +58,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie

private final FieldCapabilitiesFetcher fieldCapabilitiesFetcher;
private final Predicate<String> metadataFieldPred;
private final boolean ccsCheckCompatibility;

@Inject
public TransportFieldCapabilitiesAction(
Expand All @@ -79,10 +83,14 @@ public TransportFieldCapabilitiesAction(
FieldCapabilitiesNodeRequest::new,
new NodeTransportHandler()
);
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
}

@Override
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(request);
}
// retrieve the initial timestamp in case the action is a cross cluster search
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
final ClusterState clusterState = clusterService.state();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@

import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility;
import static org.elasticsearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;
import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_CRITICAL_READ;
import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_READ;
Expand Down Expand Up @@ -132,6 +133,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final CircuitBreaker circuitBreaker;
private final ExecutorSelector executorSelector;
private final int defaultPreFilterShardSize;
private final boolean ccsCheckCompatibility;

@Inject
public TransportSearchAction(
Expand Down Expand Up @@ -160,6 +162,7 @@ public TransportSearchAction(
this.namedWriteableRegistry = namedWriteableRegistry;
this.executorSelector = executorSelector;
this.defaultPreFilterShardSize = DEFAULT_PRE_FILTER_SHARD_SIZE.get(clusterService.getSettings());
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
}

private Map<String, OriginalIndices> buildPerIndexOriginalIndices(
Expand Down Expand Up @@ -371,6 +374,9 @@ private void executeRequest(
ActionListener<SearchRequest> rewriteListener = ActionListener.wrap(rewritten -> {
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (ccsCheckCompatibility) {
checkCCSVersionCompatibility(rewritten);
}
if (rewritten.pointInTimeBuilder() != null) {
searchContext = rewritten.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, rewritten.indicesOptions());
Expand Down

0 comments on commit 051e1d6

Please sign in to comment.