diff --git a/docs/changelog/137992.yaml b/docs/changelog/137992.yaml new file mode 100644 index 0000000000000..f23e7e9d1efe3 --- /dev/null +++ b/docs/changelog/137992.yaml @@ -0,0 +1,5 @@ +pr: 137992 +summary: Prevent passing a pipeline to a logs stream bulk index request body +area: Data streams +type: bug +issues: [] diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/40_index_request_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/40_index_request_restrictions.yml new file mode 100644 index 0000000000000..2df4311c1b37b --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/40_index_request_restrictions.yml @@ -0,0 +1,25 @@ +--- +teardown: + - do: + streams.logs_disable: { } + +--- +"Check User Can't Provide Pipeline to Logs Stream": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + bulk: + index: logs + body: | + { "create": {"pipeline": "noop"}} + { "message": "hello streams!"} + - match: { errors: true } + - match: { items.0.create.status: 400 } + - match: { items.0.create.error.type: "illegal_argument_exception" } + - match: { items.0.create.error.reason: "Cannot provide a pipeline when writing to a stream however the [noop] pipeline was provided when writing to the [logs] stream" } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index c1b1cbf01741a..41da0aec941b9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -476,7 +476,16 @@ private void doStreamsChecks( + "] stream instead" ); } - + if (e == null && streamType.getStreamName().equals(ir.index()) && ir.getPipeline() != null) { + e = new IllegalArgumentException( + "Cannot provide a pipeline when writing to a stream " + + "however the [" + + ir.getPipeline() + + "] pipeline was provided when writing to the [" + + streamType.getStreamName() + + "] stream" + ); + } if (e == null && streamsRestrictedParamsUsed(bulkRequest) && req.index().equals(streamType.getStreamName())) { e = new IllegalArgumentException( "When writing to a stream, only the following parameters are allowed: ["