Skip to content

Commit

Permalink
[7.x] Ignore matching data streams if include_data_streams is false (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jul 3, 2020
1 parent b9d9964 commit 7c43cbc
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
- match: { hits.hits.0._source.foo: 'bar' }

- do:
catch: bad_request
catch: missing
indices.delete:
index: logs-foobar

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -208,32 +207,21 @@ public void testOtherWriteOps() throws Exception {
client().admin().indices().createDataStream(createDataStreamRequest).get();

{
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(dataStreamName).source("{}", XContentType.JSON));
expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
}
{
BulkRequest bulkRequest = new BulkRequest()
.add(new DeleteRequest(dataStreamName, "_id"));
expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
}
{
BulkRequest bulkRequest = new BulkRequest()
.add(new UpdateRequest(dataStreamName, "_id").doc("{}", XContentType.JSON));
expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
}
{
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{}", XContentType.JSON);
expectFailure(dataStreamName, () -> client().index(indexRequest).actionGet());
IndexRequest indexRequest = new IndexRequest(dataStreamName)
.source("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON);
Exception e = expectThrows(IndexNotFoundException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
}
{
UpdateRequest updateRequest = new UpdateRequest(dataStreamName, "_id")
.doc("{}", XContentType.JSON);
expectFailure(dataStreamName, () -> client().update(updateRequest).actionGet());
Exception e = expectThrows(IndexNotFoundException.class, () -> client().update(updateRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
}
{
DeleteRequest deleteRequest = new DeleteRequest(dataStreamName, "_id");
expectFailure(dataStreamName, () -> client().delete(deleteRequest).actionGet());
Exception e = expectThrows(IndexNotFoundException.class, () -> client().delete(deleteRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
}
{
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{}", XContentType.JSON)
Expand Down Expand Up @@ -424,7 +412,7 @@ public void testResolvabilityOfDataStreamsInAPIs() throws Exception {
verifyResolvability(wildcardExpression, client().admin().indices().prepareUpgrade(wildcardExpression), false);
verifyResolvability(wildcardExpression, client().admin().indices().prepareRecoveries(wildcardExpression), false);
verifyResolvability(wildcardExpression, client().admin().indices().prepareUpgradeStatus(wildcardExpression), false);
verifyResolvability(wildcardExpression, getAliases(wildcardExpression), true);
verifyResolvability(wildcardExpression, getAliases(wildcardExpression), false);
verifyResolvability(wildcardExpression, getFieldMapping(wildcardExpression), false);
verifyResolvability(wildcardExpression,
putMapping("{\"_doc\":{\"properties\": {\"my_field\":{\"type\":\"keyword\"}}}}", wildcardExpression), false);
Expand Down Expand Up @@ -473,7 +461,8 @@ public void testAliasActionsFailOnDataStreams() throws Exception {
.index(dataStreamName).aliases("foo");
IndicesAliasesRequest aliasesAddRequest = new IndicesAliasesRequest();
aliasesAddRequest.addAliasAction(addAction);
expectFailure(dataStreamName, () -> client().admin().indices().aliases(aliasesAddRequest).actionGet());
Exception e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().aliases(aliasesAddRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [" + dataStreamName +"]"));
}

public void testAliasActionsFailOnDataStreamBackingIndices() throws Exception {
Expand Down Expand Up @@ -729,19 +718,18 @@ private static void verifyResolvability(String dataStream, ActionRequestBuilder

private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail, long expectedCount) {
if (fail) {
String expectedErrorMessage = "The provided expression [" + dataStream +
"] matches a data stream, specify the corresponding concrete indices instead.";
String expectedErrorMessage = "no such index [" + dataStream + "]";
if (requestBuilder instanceof MultiSearchRequestBuilder) {
MultiSearchResponse multiSearchResponse = ((MultiSearchRequestBuilder) requestBuilder).get();
assertThat(multiSearchResponse.getResponses().length, equalTo(1));
assertThat(multiSearchResponse.getResponses()[0].isFailure(), is(true));
assertThat(multiSearchResponse.getResponses()[0].getFailure(), instanceOf(IllegalArgumentException.class));
assertThat(multiSearchResponse.getResponses()[0].getFailure().getMessage(), equalTo(expectedErrorMessage));
} else if (requestBuilder instanceof ValidateQueryRequestBuilder) {
ValidateQueryResponse response = (ValidateQueryResponse) requestBuilder.get();
assertThat(response.getQueryExplanation().get(0).getError(), equalTo(expectedErrorMessage));
Exception e = expectThrows(IndexNotFoundException.class, requestBuilder::get);
assertThat(e.getMessage(), equalTo(expectedErrorMessage));
} else {
Exception e = expectThrows(IllegalArgumentException.class, requestBuilder::get);
Exception e = expectThrows(IndexNotFoundException.class, requestBuilder::get);
assertThat(e.getMessage(), equalTo(expectedErrorMessage));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
}
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
context.includeDataStreams() == false) {
throw dataStreamsNotSupportedException(expression);
continue;
}

if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.isResolveToWriteIndex()) {
Expand Down Expand Up @@ -301,11 +301,6 @@ private static IllegalArgumentException aliasesNotSupportedException(String expr
"alias, specify the corresponding concrete indices instead.");
}

private static IllegalArgumentException dataStreamsNotSupportedException(String expression) {
return new IllegalArgumentException("The provided expression [" + expression + "] matches a " +
"data stream, specify the corresponding concrete indices instead.");
}

/**
* Utility method that allows to resolve an index expression to its corresponding single concrete index.
* Callers should make sure they provide proper {@link org.elasticsearch.action.support.IndicesOptions}
Expand Down Expand Up @@ -357,7 +352,12 @@ public Index concreteWriteIndex(ClusterState state, IndicesRequest request) {
*/
public Index concreteWriteIndex(ClusterState state, IndicesOptions options, String index, boolean allowNoIndices,
boolean includeDataStreams) {
Context context = new Context(state, options, false, true, includeDataStreams);
IndicesOptions combinedOptions = IndicesOptions.fromOptions(options.ignoreUnavailable(), allowNoIndices,
options.expandWildcardsOpen(), options.expandWildcardsClosed(), options.expandWildcardsHidden(),
options.allowAliasesToMultipleIndices(), options.forbidClosedIndices(), options.ignoreAliases(),
options.ignoreThrottled());

Context context = new Context(state, combinedOptions, false, true, includeDataStreams);
Index[] indices = concreteIndices(context, index);
if (allowNoIndices && indices.length == 0) {
return null;
Expand Down Expand Up @@ -732,10 +732,20 @@ public List<String> resolve(Context context, List<String> expressions) {
}

if (isEmptyOrTrivialWildcard(expressions)) {
if (context.includeDataStreams() == false && metadata.dataStreams().isEmpty() == false) {
throw dataStreamsNotSupportedException(expressions.toString());
List<String> resolvedExpressions = resolveEmptyOrTrivialWildcard(options, metadata);
if (context.includeDataStreams()) {
final IndexMetadata.State excludeState = excludeState(options);
final Map<String, IndexAbstraction> dataStreamsAbstractions = metadata.getIndicesLookup().entrySet()
.stream()
.filter(entry -> entry.getValue().getType() == IndexAbstraction.Type.DATA_STREAM)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// dedup backing indices if expand hidden indices option is true
Set<String> resolvedIncludingDataStreams = new HashSet<>(resolvedExpressions);
resolvedIncludingDataStreams.addAll(expand(context, excludeState, dataStreamsAbstractions,
expressions.isEmpty() ? "_all" : expressions.get(0), options.expandWildcardsHidden()));
return new ArrayList<>(resolvedIncludingDataStreams);
}
return resolveEmptyOrTrivialWildcard(options, metadata);
return resolvedExpressions;
}

Set<String> result = innerResolve(context, expressions, options, metadata);
Expand Down Expand Up @@ -786,8 +796,8 @@ private Set<String> innerResolve(Context context, List<String> expressions, Indi
} else if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && options.ignoreAliases()) {
throw aliasesNotSupportedException(expression);
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
context.includeDataStreams() == false) {
throw dataStreamsNotSupportedException(expression);
context.includeDataStreams() == false) {
throw indexNotFoundException(expression);
}
}
if (add) {
Expand Down Expand Up @@ -867,7 +877,7 @@ private static IndexMetadata.State excludeState(IndicesOptions options) {

public static Map<String, IndexAbstraction> matches(Context context, Metadata metadata, String expression) {
if (Regex.isMatchAllPattern(expression)) {
return filterIndicesLookup(context, metadata.getIndicesLookup(), null, expression, context.getOptions());
return filterIndicesLookup(context, metadata.getIndicesLookup(), null, context.getOptions());
} else if (expression.indexOf("*") == expression.length() - 1) {
return suffixWildcard(context, metadata, expression);
} else {
Expand All @@ -882,18 +892,17 @@ private static Map<String, IndexAbstraction> suffixWildcard(Context context, Met
toPrefixCharArr[toPrefixCharArr.length - 1]++;
String toPrefix = new String(toPrefixCharArr);
SortedMap<String, IndexAbstraction> subMap = metadata.getIndicesLookup().subMap(fromPrefix, toPrefix);
return filterIndicesLookup(context, subMap, null, expression, context.getOptions());
return filterIndicesLookup(context, subMap, null, context.getOptions());
}

private static Map<String, IndexAbstraction> otherWildcard(Context context, Metadata metadata, String expression) {
final String pattern = expression;
return filterIndicesLookup(context, metadata.getIndicesLookup(), e -> Regex.simpleMatch(pattern, e.getKey()),
expression, context.getOptions());
context.getOptions());
}

private static Map<String, IndexAbstraction> filterIndicesLookup(Context context, SortedMap<String, IndexAbstraction> indicesLookup,
Predicate<? super Map.Entry<String, IndexAbstraction>> filter,
String expression,
IndicesOptions options) {
boolean shouldConsumeStream = false;
Stream<Map.Entry<String, IndexAbstraction>> stream = indicesLookup.entrySet().stream();
Expand All @@ -907,11 +916,7 @@ private static Map<String, IndexAbstraction> filterIndicesLookup(Context context
}
if (context.includeDataStreams() == false) {
shouldConsumeStream = true;
stream = stream.peek(e -> {
if (e.getValue().getType() == IndexAbstraction.Type.DATA_STREAM) {
throw dataStreamsNotSupportedException(expression);
}
});
stream = stream.filter(e -> e.getValue().getType() != IndexAbstraction.Type.DATA_STREAM);
}
if (shouldConsumeStream) {
return stream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Expand Down

0 comments on commit 7c43cbc

Please sign in to comment.