diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index c6c7fbf395247..eaf7824d8c2da 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -89,6 +89,7 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyMap; +import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -590,7 +591,7 @@ private boolean addFailureIfIndexIsUnavailable(DocWriteRequest request, int i if (concreteIndex == null) { try { concreteIndex = concreteIndices.resolveIfAbsent(request); - } catch (IndexClosedException | IndexNotFoundException ex) { + } catch (IndexClosedException | IndexNotFoundException | IllegalArgumentException ex) { addFailure(request, idx, ex); return true; } @@ -636,8 +637,16 @@ Index resolveIfAbsent(DocWriteRequest request) { Index concreteIndex = indices.get(request.index()); if (concreteIndex == null) { boolean includeDataStreams = request.opType() == DocWriteRequest.OpType.CREATE; - concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(), request.indices()[0], - false, includeDataStreams); + try { + concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request.indicesOptions(), + request.indices()[0], false, includeDataStreams); + } catch (IndexNotFoundException e) { + if (includeDataStreams == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) { + throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); + } else { + throw e; + } + } indices.put(request.index(), concreteIndex); } return concreteIndex; diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index 75b9402cd1342..1eb767f3ccecd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -50,6 +51,8 @@ import java.io.IOException; +import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; + public abstract class TransportInstanceSingleOperationAction< Request extends InstanceShardOperationRequest, Response extends ActionResponse @@ -143,7 +146,15 @@ protected void doStart(ClusterState clusterState) { throw blockException; } } - request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName()); + try { + request.concreteIndex(indexNameExpressionResolver.concreteWriteIndex(clusterState, request).getName()); + } catch (IndexNotFoundException e) { + if (request.includeDataStreams() == false && e.getMetadataKeys().contains(EXCLUDED_DATA_STREAMS_KEY)) { + throw new IllegalArgumentException("only write ops with an op_type of create are allowed in data streams"); + } else { + throw e; + } + } resolveRequest(clusterState, request); blockException = checkRequestBlock(clusterState, request); if (blockException != null) { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 6b1b1db0ef792..67b891002a653 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -60,6 +60,8 @@ public class IndexNameExpressionResolver { + public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds"; + private final DateMathExpressionResolver dateMathExpressionResolver = new DateMathExpressionResolver(); private final WildcardExpressionResolver wildcardExpressionResolver = new WildcardExpressionResolver(); private final List expressionResolvers = List.of(dateMathExpressionResolver, wildcardExpressionResolver); @@ -207,6 +209,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } } + boolean excludedDataStreams = false; final Set concreteIndices = new HashSet<>(expressions.size()); for (String expression : expressions) { IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression); @@ -231,6 +234,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } } else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM && context.includeDataStreams() == false) { + excludedDataStreams = true; continue; } @@ -272,6 +276,10 @@ Index[] concreteIndices(Context context, String... indexExpressions) { if (options.allowNoIndices() == false && concreteIndices.isEmpty()) { IndexNotFoundException infe = new IndexNotFoundException((String)null); infe.setResources("index_expression", indexExpressions); + if (excludedDataStreams) { + // Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded. + infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true"); + } throw infe; } return concreteIndices.toArray(new Index[concreteIndices.size()]); diff --git a/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml b/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml index e2b18d1ce08c8..06a800b061fd9 100644 --- a/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml +++ b/x-pack/plugin/data-streams/qa/rest/src/test/resources/rest-api-spec/test/data-streams/20_unsupported_apis.yml @@ -196,3 +196,50 @@ indices.delete_data_stream: name: simple-data-stream1 - is_true: acknowledged + +--- +"Non append-only writes into a data stream": + - skip: + version: " - 7.8.99" + reason: "data streams only supported in 7.9+" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [my-template1] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation" + indices.put_index_template: + name: my-template1 + body: + index_patterns: [logs-*] + data_stream: {} + + - do: + catch: bad_request + index: + index: logs-foobar + id: "1" + body: + '@timestamp': '2020-12-12' + + - do: + bulk: + body: + - index: + _index: logs-foobar + _id: "1" + - '@timestamp': '2020-12-12' + - create: + _index: logs-foobar + _id: "1" + - '@timestamp': '2020-12-12' + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: illegal_argument_exception } + - match: { items.0.index.error.reason: "only write ops with an op_type of create are allowed in data streams" } + - match: { items.1.create.result: created } + - match: { items.1.create._index: .ds-logs-foobar-000001 } + + - do: + indices.delete_data_stream: + name: logs-foobar + - is_true: acknowledged diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 69858e81d4463..1d15b20fd571e 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -223,18 +223,18 @@ public void testOtherWriteOps() throws Exception { { IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON); - Exception e = expectThrows(IndexNotFoundException.class, () -> client().index(indexRequest).actionGet()); - assertThat(e.getMessage(), equalTo("no such index [null]")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet()); + assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams")); } { UpdateRequest updateRequest = new UpdateRequest(dataStreamName, "_id").doc("{}", XContentType.JSON); - Exception e = expectThrows(IndexNotFoundException.class, () -> client().update(updateRequest).actionGet()); - assertThat(e.getMessage(), equalTo("no such index [null]")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().update(updateRequest).actionGet()); + assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams")); } { DeleteRequest deleteRequest = new DeleteRequest(dataStreamName, "_id"); - Exception e = expectThrows(IndexNotFoundException.class, () -> client().delete(deleteRequest).actionGet()); - assertThat(e.getMessage(), equalTo("no such index [null]")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().delete(deleteRequest).actionGet()); + assertThat(e.getMessage(), equalTo("only write ops with an op_type of create are allowed in data streams")); } { IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{\"@timestamp\": \"2020-12-12\"}", XContentType.JSON)