Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

Expand Down Expand Up @@ -590,7 +591,7 @@ private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int i
if (concreteIndex == null) {
try {
concreteIndex = concreteIndices.resolveIfAbsent(request);
} catch (IndexClosedException | IndexNotFoundException ex) {
} catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) {
addFailure(request, idx, ex);
return true;
}
Expand Down Expand Up @@ -636,8 +637,16 @@ Index resolveIfAbsent(DocWriteRequest<?> request) {
Index concreteIndex = indices.get(request.index());
if (concreteIndex == null) {
boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE;
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(), request.indices()[0],
false, includeDataStreams);
try {
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(),
request.indices()[0], false, includeDataStreams);
} catch (IndexNotFoundException e) {
if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
} else {
throw e;
}
}
indices.put(request.index(), concreteIndex);
}
return concreteIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -50,6 +51,8 @@

import java.io.IOException;

import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;

public abstract class TransportInstanceSingleOperationAction<
Request extends InstanceShardOperationRequest<Request>,
Response extends ActionResponse
Expand Down Expand Up @@ -143,7 +146,15 @@ protected void doStart(ClusterState clusterState) {
throw blockException;
}
}
request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName());
try {
request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName());
} catch (IndexNotFoundException e) {
if (request.includeDataStreams() == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) {
throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams");
} else {
throw e;
}
}
resolveRequest(clusterState, request);
blockException = checkRequestBlock(clusterState, request);
if (blockException != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@

public class IndexNameExpressionResolver {

public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds";

private final DateMathExpressionResolver dateMathExpressionResolver = new DateMathExpressionResolver();
private final WildcardExpressionResolver wildcardExpressionResolver = new WildcardExpressionResolver();
private final List<ExpressionResolver> expressionResolvers = List.of(dateMathExpressionResolver, wildcardExpressionResolver);
Expand Down Expand Up @@ -207,6 +209,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
}
}

boolean excludedDataStreams = false;
final Set<Index> concreteIndices = new HashSet<>(expressions.size());
for (String expression : expressions) {
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression);
Expand All @@ -231,6 +234,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
}
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
context.includeDataStreams() == false) {
excludedDataStreams = true;
continue;
}

Expand Down Expand Up @@ -272,6 +276,10 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
if (options.allowNoIndices() == false && concreteIndices.isEmpty()) {
IndexNotFoundException infe = new IndexNotFoundException((String)null);
infe.setResources("index_expression", indexExpressions);
if (excludedDataStreams) {
// Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded.
infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true");
}
throw infe;
}
return concreteIndices.toArray(new Index[concreteIndices.size()]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,50 @@
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged

---
"Non append-only writes into a data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
features: allowed_warnings

- do:
allowed_warnings:
- "index template [my-template1] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
indices.put_index_template:
name: my-template1
body:
index_patterns: [logs-*]
data_stream: {}

- do:
catch: bad_request
index:
index: logs-foobar
id: "1"
body:
'@timestamp': '2020-12-12'

- do:
bulk:
body:
- index:
_index: logs-foobar
_id: "1"
- '@timestamp': '2020-12-12'
- create:
_index: logs-foobar
_id: "1"
- '@timestamp': '2020-12-12'
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: illegal_argument_exception }
- match: { items.0.index.error.reason: "only write ops with an op_type of create are allowed in data streams" }
- match: { items.1.create.result: created }
- match: { items.1.create._index: .ds-logs-foobar-000001 }

- do:
indices.delete_data_stream:
name: logs-foobar
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,18 @@ public void testOtherWriteOps() throws Exception {

{
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON);
Exception e = expectThrows(IndexNotFoundException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
}
{
UpdateRequest updateRequest = new UpdateRequest(dataStreamName, "_id").doc("{}", XContentType.JSON);
Exception e = expectThrows(IndexNotFoundException.class, () -> client().update(updateRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
Exception e = expectThrows(IllegalArgumentException.class, () -> client().update(updateRequest).actionGet());
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
}
{
DeleteRequest deleteRequest = new DeleteRequest(dataStreamName, "_id");
Exception e = expectThrows(IndexNotFoundException.class, () -> client().delete(deleteRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
Exception e = expectThrows(IllegalArgumentException.class, () -> client().delete(deleteRequest).actionGet());
assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams"));
}
{
IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)
Expand Down