diff --git a/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json b/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json index bf525456c2..b34288fff4 100644 --- a/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json +++ b/documentation/src/main/resources/jsonschema/protocol-streaming-subscription-subscribe-for-persisted-events-payload.json @@ -21,6 +21,10 @@ "type": "string", "format": "date-time", "description": "The timestamp to stop the streaming at." + }, + "filter": { + "type": "string", + "description": "An RQL expression defining which events to filter for in the stream. Only supported for thing events." } } } diff --git a/documentation/src/main/resources/pages/ditto/basic-history.md b/documentation/src/main/resources/pages/ditto/basic-history.md index fdf9c908c7..1a6dfff801 100644 --- a/documentation/src/main/resources/pages/ditto/basic-history.md +++ b/documentation/src/main/resources/pages/ditto/basic-history.md @@ -138,6 +138,27 @@ curl --http2 -u ditto:ditto -H 'Accept:text/event-stream' -N \ http://localhost:8080/api/2/things/org.eclipse.ditto:thing-2?from-historical-revision=0&fields=thingId,attributes,features,_revision,_modified,_context ``` +#### Filtering streamed historical events for things via SEE + +When streaming historical events for [things](basic-thing.html), an optional `filter` in form of an +[RQL](basic-rql.html) may be declared in order to only receive thing events matching the defined query. + +This can e.g. be useful to only stream events in which a certain feature or a certain property/attribute was included. + +In addition to the parameters selecting from/to revision or timestamp, the following parameter can be defined: +* `filter`: specifies the [RQL](basic-rql.html) filter which events to return in the stream must match + +Examples: +```bash +# stream complete history starting from earliest available revision of a thing, but only those where a feature "bamboo" was modified: +curl --http2 -u ditto:ditto -H 'Accept:text/event-stream' -N \ + http://localhost:8080/api/2/things/org.eclipse.ditto:thing-2?from-historical-revision=0&fields=thingId,attributes,features,_revision,_modified&filter=exists(features/bamboo) + +# stream specific history range of a thing based on timestamps, filtering for temperature values of a sensor being greater than 50: +curl --http2 -u ditto:ditto -H 'Accept:text/event-stream' -N \ + http://localhost:8080/api/2/things/org.eclipse.ditto:thing-2?from-historical-timestamp=2022-10-24T11:44:36Z&to-historical-timestamp=2022-10-24T11:44:37Z&fields=thingId,attributes,features,_revision,_modified&filter=gt(features/temperature/properties/value,50) +``` + ### Streaming historical events via Ditto Protocol Please inspect the [protocol specification of DittoProtocol messages for streaming persisted events](protocol-specification-streaming-subscription.html) @@ -219,6 +240,24 @@ It will do so either until all existing events were sent, in that case a `comple Or it will stop after the `demand` was fulfilled, waiting for the requester to claim more demand with a new `request` message. +#### Filtering streamed historical events for things via Ditto Protocol + +The `filter` for streaming historical thing events may also be specified via Ditto Protocol. + +Example protocol message for subscribing for the persisted events of a thing with a `filter`: +```json +{ + "topic": "org.eclipse.ditto/thing-2/things/twin/streaming/subscribeForPersistedEvents", + "path": "/", + "headers": {}, + "value": { + "fromHistoricalRevision": 1, + "toHistoricalRevision": 10, + "filter": "exists(features/bamboo)" + } +} +``` + ## Configuring historical headers to persist diff --git a/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java b/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java index 49e1e1b6e5..fddd390b77 100644 --- a/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java +++ b/protocol/src/main/java/org/eclipse/ditto/protocol/mapper/StreamingSubscriptionCommandSignalMapper.java @@ -72,6 +72,9 @@ void enhancePayloadBuilder(final T command, final PayloadBuilder payloadBuilder) subscribeCommand.getToHistoricalTimestamp().ifPresent(toTs -> payloadContentBuilder.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, toTs.toString())); + subscribeCommand.getFilter().ifPresent(filter -> + payloadContentBuilder.set(SubscribeForPersistedEvents.JsonFields.FILTER, filter) + ); } else if (command instanceof CancelStreamingSubscription) { final CancelStreamingSubscription cancelCommand = (CancelStreamingSubscription) command; payloadContentBuilder