From 9698ae38088ebc084b3d5694093880571437cda6 Mon Sep 17 00:00:00 2001 From: Thomas Jaeckle Date: Thu, 30 Sep 2021 18:17:17 +0200 Subject: [PATCH] added new ResourcePlaceholder providing "resource:type" and "resource:path" * useful in combination with a "topic:" placeholder in order to e.g. only filter for thing "created" events Signed-off-by: Thomas Jaeckle --- .../ConnectivityPlaceholders.java | 8 ++ .../OutboundMappingProcessorActor.java | 14 ++- .../service/messaging/Resolvers.java | 1 + .../messaging/httppush/HttpPushValidator.java | 1 + .../messaging/mqtt/AbstractMqttValidator.java | 3 +- .../messaging/persistence/SignalFilter.java | 15 ++- .../validation/ConnectionValidator.java | 3 +- .../persistence/SignalFilterTest.java | 11 ++- .../routes/sse/ThingsSseRouteBuilder.java | 9 +- .../routes/websocket/WebSocketRoute.java | 17 ++-- .../streaming/actors/StreamingSession.java | 23 +++-- .../actors/StreamingSessionActor.java | 3 +- .../ImmutableResourcePlaceholder.java | 82 ++++++++++++++++ .../placeholders/ResourcePlaceholder.java | 33 +++++++ .../ImmutableResourcePlaceholderTest.java | 98 +++++++++++++++++++ 15 files changed, 282 insertions(+), 39 deletions(-) create mode 100644 protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholder.java create mode 100644 protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ResourcePlaceholder.java create mode 100644 protocol/src/test/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholderTest.java diff --git a/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/placeholders/ConnectivityPlaceholders.java b/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/placeholders/ConnectivityPlaceholders.java index 8dc28de94a..151eab6848 100644 --- a/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/placeholders/ConnectivityPlaceholders.java +++ b/connectivity/api/src/main/java/org/eclipse/ditto/connectivity/api/placeholders/ConnectivityPlaceholders.java @@ -14,6 +14,7 @@ import org.eclipse.ditto.base.model.auth.AuthorizationContext; import org.eclipse.ditto.placeholders.Placeholder; +import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder; import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder; public final class ConnectivityPlaceholders { @@ -64,6 +65,13 @@ public static TopicPathPlaceholder newTopicPathPlaceholder() { return TopicPathPlaceholder.getInstance(); } + /** + * @return the singleton instance of {@link ResourcePlaceholder} + */ + public static ResourcePlaceholder newResourcePlaceholder() { + return ResourcePlaceholder.getInstance(); + } + /** * @return the singleton instance of {@link ConnectionIdPlaceholder}. */ diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java index 8d1c07a9b7..bd587a7666 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java @@ -43,6 +43,7 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.json.JsonSchemaVersion; import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.base.model.signals.WithResource; import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; import org.eclipse.ditto.base.model.signals.acks.Acknowledgements; import org.eclipse.ditto.base.model.signals.commands.CommandResponse; @@ -84,6 +85,7 @@ import org.eclipse.ditto.placeholders.PlaceholderResolver; import org.eclipse.ditto.protocol.TopicPath; import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; +import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder; import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder; import org.eclipse.ditto.rql.parser.RqlPredicateParser; import org.eclipse.ditto.rql.query.criteria.Criteria; @@ -125,6 +127,7 @@ public final class OutboundMappingProcessorActor private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance(); private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance(); + private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance(); private final ThreadSafeDittoLoggingAdapter dittoLoggingAdapter; @@ -645,16 +648,19 @@ private Collection applyFilter(final OutboundSignalWit // evaluate filter criteria again if signal enrichment is involved. final Signal signal = outboundSignalWithExtra.getSource(); final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal); - final PlaceholderResolver topicPathPlaceholderResolver = PlaceholderFactory.newPlaceholderResolver( - TOPIC_PATH_PLACEHOLDER, topicPath); + final PlaceholderResolver topicPathPlaceholderResolver = PlaceholderFactory + .newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath); + final PlaceholderResolver resourcePlaceholderResolver = PlaceholderFactory + .newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal); final DittoHeaders dittoHeaders = signal.getDittoHeaders(); final Criteria criteria = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), - topicPathPlaceholderResolver + topicPathPlaceholderResolver, resourcePlaceholderResolver ).filterCriteria(filter.get(), dittoHeaders); return outboundSignalWithExtra.getExtra() .flatMap(extra -> ThingEventToThingConverter .mergeThingWithExtraFields(signal, extraFields.get(), extra) - .filter(ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver)) + .filter(ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver, + resourcePlaceholderResolver)) .map(thing -> outboundSignalWithExtra)) .map(Collections::singletonList) .orElse(List.of()); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Resolvers.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Resolvers.java index ce63da5dd7..2de050f75a 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Resolvers.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/Resolvers.java @@ -64,6 +64,7 @@ private Resolvers() { } }), ResolverCreator.of(ConnectivityPlaceholders.newTopicPathPlaceholder(), (e, s, t, a, c) -> t), + ResolverCreator.of(ConnectivityPlaceholders.newResourcePlaceholder(), (e, s, t, a, c) -> s), ResolverCreator.of(ConnectivityPlaceholders.newRequestPlaceholder(), (e, s, t, a, c) -> a), ResolverCreator.of(ConnectivityPlaceholders.newConnectionIdPlaceholder(), (e, s, t, a, c) -> c) ); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java index de0c55510d..b1f5c39c00 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java @@ -94,6 +94,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade ConnectivityPlaceholders.newEntityPlaceholder(), ConnectivityPlaceholders.newThingPlaceholder(), ConnectivityPlaceholders.newTopicPathPlaceholder(), + ConnectivityPlaceholders.newResourcePlaceholder(), newHeadersPlaceholder(), ConnectivityPlaceholders.newFeaturePlaceholder()); validateTargetAddress(target.getAddress(), dittoHeaders, targetDescription); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java index 14cf36163e..6598c426d6 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java @@ -15,6 +15,7 @@ import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newEntityPlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newFeaturePlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newPolicyPlaceholder; +import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newResourcePlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newSourceAddressPlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newThingPlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newTopicPathPlaceholder; @@ -113,7 +114,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade validateTargetQoS(qos.get(), dittoHeaders, targetDescription); validateTemplate(target.getAddress(), dittoHeaders, newThingPlaceholder(), newTopicPathPlaceholder(), - newHeadersPlaceholder(), newFeaturePlaceholder()); + newResourcePlaceholder(), newHeadersPlaceholder(), newFeaturePlaceholder()); } /** diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilter.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilter.java index 6a9c3595f8..dcff3f260b 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilter.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilter.java @@ -29,6 +29,7 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.namespaces.NamespaceReader; import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.base.model.signals.WithResource; import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.base.model.signals.commands.CommandResponse; import org.eclipse.ditto.base.model.signals.events.Event; @@ -48,6 +49,7 @@ import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement; import org.eclipse.ditto.protocol.TopicPath; import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; +import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder; import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder; import org.eclipse.ditto.rql.parser.RqlPredicateParser; import org.eclipse.ditto.rql.query.criteria.Criteria; @@ -68,6 +70,7 @@ public final class SignalFilter { private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance(); private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance(); + private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance(); private final Connection connection; private final ConnectionMonitorRegistry connectionMonitorRegistry; @@ -161,20 +164,22 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal); final PlaceholderResolver topicPathPlaceholderResolver = PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath); + final PlaceholderResolver resourcePlaceholderResolver = PlaceholderFactory + .newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal); final Criteria criteria = parseCriteria(filterOptional.get(), signal.getDittoHeaders(), - topicPathPlaceholderResolver); + topicPathPlaceholderResolver, resourcePlaceholderResolver); final Set extraFields = filteredTopic.getExtraFields() .map(JsonFieldSelector::getPointers) .orElse(Collections.emptySet()); if (signal instanceof ThingEvent) { return ThingEventToThingConverter.thingEventToThing((ThingEvent) signal) .filter(thing -> Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, thing, - topicPathPlaceholderResolver)) + topicPathPlaceholderResolver, resourcePlaceholderResolver)) .isPresent(); } else { final Thing emptyThing = Thing.newBuilder().build(); return Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, emptyThing, - topicPathPlaceholderResolver); + topicPathPlaceholderResolver, resourcePlaceholderResolver); } } else { return true; @@ -186,8 +191,8 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere * mapped to a valid criterion */ private static Criteria parseCriteria(final String filter, final DittoHeaders dittoHeaders, - final PlaceholderResolver topicPathPlaceholderResolver) { - return QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), topicPathPlaceholderResolver) + final PlaceholderResolver... placeholderResolvers) { + return QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), placeholderResolvers) .filterCriteria(filter, dittoHeaders); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/ConnectionValidator.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/ConnectionValidator.java index 7d8fe7a027..90756cd5e8 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/ConnectionValidator.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/ConnectionValidator.java @@ -53,6 +53,7 @@ import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger; import org.eclipse.ditto.placeholders.ExpressionResolver; import org.eclipse.ditto.placeholders.PlaceholderFactory; +import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder; import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder; import org.eclipse.ditto.rql.parser.RqlPredicateParser; import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory; @@ -80,7 +81,7 @@ private ConnectionValidator(LoggingAdapter loggingAdapter, .collect(Collectors.toMap(AbstractProtocolValidator::type, Function.identity())); this.specMap = Collections.unmodifiableMap(theSpecMap); queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), - TopicPathPlaceholder.getInstance()); + TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance()); } /** diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilterTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilterTest.java index 9cc0b0971b..55c4bd26b7 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilterTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/persistence/SignalFilterTest.java @@ -159,7 +159,7 @@ public static Collection data() { final Target filteredEventTopicPath1 = ConnectivityModelFactory.newTargetBuilder(twinAuthd) .topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS) - .withFilter("in(topic:action,'created','modified')") + .withFilter("and(in(topic:action,'created','modified'),eq(resource:path,'/'))") .build()) .build(); final Target filteredEventTopicPath2 = ConnectivityModelFactory.newTargetBuilder(twinAuthd) @@ -167,11 +167,16 @@ public static Collection data() { .withFilter("and(eq(attributes/x,5),eq(topic:action,'modified'))") .build()) .build(); - final Target notFilteredEventTopicPath = ConnectivityModelFactory.newTargetBuilder(twinAuthd) + final Target notFilteredEventTopicPath1 = ConnectivityModelFactory.newTargetBuilder(twinAuthd) .topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS) .withFilter("eq(topic:action,'deleted')") .build()) .build(); + final Target notFilteredEventTopicPath2 = ConnectivityModelFactory.newTargetBuilder(twinAuthd) + .topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS) + .withFilter("ne(resource:path,'/')") + .build()) + .build(); final Target filteredLiveMessageTopicPath1 = ConnectivityModelFactory.newTargetBuilder(liveAuthd) .topics(ConnectivityModelFactory.newFilteredTopicBuilder(LIVE_MESSAGES) @@ -201,7 +206,7 @@ public static Collection data() { Lists.list(enrichedFiltered, enrichedNotFiltered1, enrichedNotFiltered2), Lists.list(enrichedFiltered)}); params.add(new Object[]{TWIN_EVENTS, readSubjects, - Lists.list(filteredEventTopicPath1, filteredEventTopicPath2, notFilteredEventTopicPath), + Lists.list(filteredEventTopicPath1, filteredEventTopicPath2, notFilteredEventTopicPath1, notFilteredEventTopicPath2), Lists.list(filteredEventTopicPath1, filteredEventTopicPath2)}); params.add(new Object[]{LIVE_EVENTS, readSubjects, Lists.list(twinAuthd), emptyList()}); diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java index af851227e0..523dcda5f4 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.java @@ -55,7 +55,7 @@ import org.eclipse.ditto.internal.utils.search.SearchSource; import org.eclipse.ditto.json.JsonFieldSelector; import org.eclipse.ditto.json.JsonObject; -import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; +import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder; import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder; import org.eclipse.ditto.rql.parser.RqlPredicateParser; import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory; @@ -107,8 +107,6 @@ public final class ThingsSseRouteBuilder extends RouteDirectives implements SseR private static final Counter THINGS_SSE_COUNTER = getCounterFor(PATH_THINGS); private static final Counter SEARCH_SSE_COUNTER = getCounterFor(PATH_SEARCH); - private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance(); - /** * Timeout asking the local streaming actor. */ @@ -154,7 +152,7 @@ public static ThingsSseRouteBuilder getInstance(final ActorRef streamingActor, checkNotNull(streamingActor, "streamingActor"); final var queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), - TopicPathPlaceholder.getInstance()); + TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance()); return new ThingsSseRouteBuilder(streamingActor, streamingConfig, queryFilterCriteriaFactory, pubSubMediator); } @@ -375,8 +373,7 @@ private CompletionStage> postprocess(final SessionedJsoni .map(session -> jsonifiable.retrieveExtraFields(facade) .thenApply(extra -> Optional.of(session.mergeThingWithExtra(event, extra)) - .filter(thing -> session.matchesFilter(thing, - PROTOCOL_ADAPTER.toTopicPath(event))) + .filter(thing -> session.matchesFilter(thing, event)) .map(thing -> toNonemptyThingJson(thing, event, fields)) .orElseGet(Collections::emptyList) ) diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java index 348f0893e2..4961503e02 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.java @@ -85,7 +85,6 @@ import org.eclipse.ditto.protocol.HeaderTranslator; import org.eclipse.ditto.protocol.JsonifiableAdaptable; import org.eclipse.ditto.protocol.ProtocolFactory; -import org.eclipse.ditto.protocol.TopicPath; import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse; @@ -267,7 +266,7 @@ public Route build(final JsonSchemaVersion version, private CompletionStage retrieveWebsocketConfig() { return Patterns.ask(streamingActor, StreamingActor.Control.RETRIEVE_WEBSOCKET_CONFIG, LOCAL_ASK_TIMEOUT) - .thenApply(reply -> (WebsocketConfig) reply); // fail future with ClassCastException on type error + .thenApply(WebsocketConfig.class::cast); // fail future with ClassCastException on type error } private CompletionStage createWebSocket(final WebSocketUpgrade upgradeToWebSocket, @@ -671,7 +670,7 @@ private Function>> post final Adaptable adaptable = jsonifiableToAdaptable(jsonifiable, adapter); final CompletionStage extraFuture = sessionedJsonifiable.retrieveExtraFields(facade); return extraFuture.>thenApply(extra -> { - if (matchesFilter(sessionedJsonifiable, adaptable.getTopicPath(), extra)) { + if (matchesFilter(sessionedJsonifiable, extra)) { return Collections.singletonList(toJsonStringWithExtra(adaptable, extra)); } issuePotentialWeakAcknowledgements(sessionedJsonifiable); @@ -742,20 +741,18 @@ private static String toJsonStringWithExtra(final Adaptable adaptable, final Jso * Always return true for Jsonifiables without any session, e. g., errors, responses, stream control messages. * * @param sessionedJsonifiable the Jsonifiable with session information attached. - * @param topicPath the topic path of the Jsonifiable to process. * @param extra extra fields from signal enrichment. * @return whether the Jsonifiable passes filter defined in the session together with the extra fields. */ - private static boolean matchesFilter(final SessionedJsonifiable sessionedJsonifiable, - final TopicPath topicPath, - final JsonObject extra) { + private static boolean matchesFilter(final SessionedJsonifiable sessionedJsonifiable, final JsonObject extra) { final Jsonifiable.WithPredicate jsonifiable = sessionedJsonifiable.getJsonifiable(); return sessionedJsonifiable.getSession() .filter(session -> jsonifiable instanceof Signal) - .map(session -> + .map(session -> { // evaluate to false if filter is present but does not match or has insufficient info to match - session.matchesFilter(session.mergeThingWithExtra((Signal) jsonifiable, extra), topicPath) - ) + final Signal signal = (Signal) jsonifiable; + return session.matchesFilter(session.mergeThingWithExtra(signal, extra), signal); + }) .orElse(true); } diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSession.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSession.java index 35f0490902..f6b4d1a6f2 100644 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSession.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSession.java @@ -22,7 +22,8 @@ import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.placeholders.PlaceholderFactory; -import org.eclipse.ditto.protocol.TopicPath; +import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; +import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder; import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder; import org.eclipse.ditto.rql.query.criteria.Criteria; import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor; @@ -38,9 +39,12 @@ public final class StreamingSession { private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance(); + private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance(); + + private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance(); private final List namespaces; - private final BiPredicate thingPredicate; + private final BiPredicate> thingPredicate; @Nullable private final ThingFieldSelector extraFields; private final ActorRef streamingSessionActor; private final ThreadSafeDittoLoggingAdapter logger; @@ -50,9 +54,11 @@ private StreamingSession(final List namespaces, @Nullable final Criteria final ThreadSafeDittoLoggingAdapter logger) { this.namespaces = namespaces; thingPredicate = eventFilterCriteria == null - ? (thing, topicPath) -> true - : (thing, topicPath) -> ThingPredicateVisitor.apply(eventFilterCriteria, - PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath) + ? (thing, signal) -> true + : (thing, signal) -> ThingPredicateVisitor.apply(eventFilterCriteria, + PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, + PROTOCOL_ADAPTER.toTopicPath(signal)), + PlaceholderFactory.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal) ) .test(thing); this.extraFields = extraFields; @@ -98,11 +104,12 @@ public Thing mergeThingWithExtra(final Signal signal, final JsonObject extra) * Test whether a thing matches the filter defined in this session. * * @param thing the thing. - * @param topicPath the topic path to include for checking whether the filter matches. + * @param signal the signal to include for checking whether the filter matches, extracting {@code topic} and + * {@code resource} information from. * @return whether the thing passes the filter. */ - public boolean matchesFilter(final Thing thing, final TopicPath topicPath) { - return thingPredicate.test(thing, topicPath); + public boolean matchesFilter(final Thing thing, final Signal signal) { + return thingPredicate.test(thing, signal); } public ActorRef getStreamingSessionActor() { diff --git a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java index 3f378e2447..e7a2a6e87a 100755 --- a/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java +++ b/gateway/service/src/main/java/org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.java @@ -68,6 +68,7 @@ import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement; import org.eclipse.ditto.protocol.HeaderTranslator; import org.eclipse.ditto.protocol.TopicPath; +import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder; import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder; import org.eclipse.ditto.rql.parser.RqlPredicateParser; import org.eclipse.ditto.rql.query.criteria.Criteria; @@ -661,7 +662,7 @@ private static String namespaceFromId(final Signal signal) { private static Criteria parseCriteria(final String filter, final DittoHeaders dittoHeaders) { final var queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), - TopicPathPlaceholder.getInstance()); + TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance()); return queryFilterCriteriaFactory.filterCriteria(filter, dittoHeaders); } diff --git a/protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholder.java b/protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholder.java new file mode 100644 index 0000000000..84f0ee7bac --- /dev/null +++ b/protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholder.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.protocol.placeholders; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.base.model.common.ConditionChecker; +import org.eclipse.ditto.base.model.signals.WithResource; + +/** + * Placeholder implementation that replaces: + *
    + *
  • {@code resource:type} -> {@code "thing"|"policy"|"message"|"connection"}
  • + *
  • {@code resource:path} to the path of the resource included in a Signal
  • + *
+ * The input value is a WithResource. + */ +@Immutable +final class ImmutableResourcePlaceholder implements ResourcePlaceholder { + + /** + * Singleton instance of the ImmutableResourcePlaceholder. + */ + static final ImmutableResourcePlaceholder INSTANCE = new ImmutableResourcePlaceholder(); + + private static final String TYPE_PLACEHOLDER = "type"; + private static final String PATH_PLACEHOLDER = "path"; + + private static final List SUPPORTED = Collections.unmodifiableList( + Arrays.asList(TYPE_PLACEHOLDER, PATH_PLACEHOLDER)); + + private ImmutableResourcePlaceholder() { + } + + @Override + public String getPrefix() { + return "resource"; + } + + @Override + public List getSupportedNames() { + return SUPPORTED; + } + + @Override + public boolean supports(final String name) { + return SUPPORTED.contains(name); + } + + @Override + public Optional resolve(final WithResource withResource, final String placeholder) { + ConditionChecker.argumentNotEmpty(placeholder, "placeholder"); + switch (placeholder) { + case TYPE_PLACEHOLDER: + return Optional.of(withResource.getResourceType()); + case PATH_PLACEHOLDER: + return Optional.of(withResource.getResourcePath().toString()); + default: + return Optional.empty(); + } + } + + @Override + public String toString() { + return getClass().getSimpleName() + "[]"; + } +} diff --git a/protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ResourcePlaceholder.java b/protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ResourcePlaceholder.java new file mode 100644 index 0000000000..f0b36ba825 --- /dev/null +++ b/protocol/src/main/java/org/eclipse/ditto/protocol/placeholders/ResourcePlaceholder.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.protocol.placeholders; + +import org.eclipse.ditto.base.model.signals.WithResource; +import org.eclipse.ditto.placeholders.Placeholder; + +/** + * A {@link Placeholder} that requires the {@link WithResource} to resolve its placeholders. + * + * @since 2.2.0 + */ +public interface ResourcePlaceholder extends Placeholder { + + /** + * Returns the singleton instance of the {@link ResourcePlaceholder}. + * + * @return the singleton instance. + */ + static ResourcePlaceholder getInstance() { + return ImmutableResourcePlaceholder.INSTANCE; + } +} diff --git a/protocol/src/test/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholderTest.java b/protocol/src/test/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholderTest.java new file mode 100644 index 0000000000..08e1ffc32f --- /dev/null +++ b/protocol/src/test/java/org/eclipse/ditto/protocol/placeholders/ImmutableResourcePlaceholderTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.protocol.placeholders; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.eclipse.ditto.base.model.signals.WithResource; +import org.eclipse.ditto.json.JsonPointer; +import org.junit.Test; +import org.mutabilitydetector.unittesting.MutabilityAssert; +import org.mutabilitydetector.unittesting.MutabilityMatchers; + +import nl.jqno.equalsverifier.EqualsVerifier; +import nl.jqno.equalsverifier.Warning; + +/** + * Tests {@link org.eclipse.ditto.protocol.placeholders.ImmutableResourcePlaceholder}. + */ +public final class ImmutableResourcePlaceholderTest { + + private static final JsonPointer KNOWN_JSON_POINTER_ROOT = JsonPointer.of("/"); + private static final String KNOWN_RESOURCE_TYPE_POLICY = "policy"; + private static final WithResource KNOWN_WITH_RESOURCE_ROOT = new WithResource() { + @Override + public JsonPointer getResourcePath() { + return KNOWN_JSON_POINTER_ROOT; + } + + @Override + public String getResourceType() { + return KNOWN_RESOURCE_TYPE_POLICY; + } + }; + + private static final JsonPointer KNOWN_JSON_POINTER_ATTRIBUTE = JsonPointer.of("/attributes/test"); + private static final String KNOWN_RESOURCE_TYPE_THING = "thing"; + private static final WithResource KNOWN_WITH_RESOURCE_ATTRIBUTE = new WithResource() { + @Override + public JsonPointer getResourcePath() { + return KNOWN_JSON_POINTER_ATTRIBUTE; + } + + @Override + public String getResourceType() { + return KNOWN_RESOURCE_TYPE_THING; + } + }; + + private static final ImmutableResourcePlaceholder UNDER_TEST = ImmutableResourcePlaceholder.INSTANCE; + + @Test + public void assertImmutability() { + MutabilityAssert.assertInstancesOf(ImmutableResourcePlaceholder.class, MutabilityMatchers.areImmutable()); + } + + @Test + public void testHashCodeAndEquals() { + EqualsVerifier.forClass(ImmutableResourcePlaceholder.class) + .suppress(Warning.INHERITED_DIRECTLY_FROM_OBJECT) + .usingGetClass() + .verify(); + } + + @Test + public void testReplaceTypePolicy() { + assertThat(UNDER_TEST.resolve(KNOWN_WITH_RESOURCE_ROOT, "type")) + .contains(KNOWN_RESOURCE_TYPE_POLICY); + } + + @Test + public void testReplacePath() { + assertThat(UNDER_TEST.resolve(KNOWN_WITH_RESOURCE_ROOT, "path")) + .contains(KNOWN_JSON_POINTER_ROOT.toString()); + } + + @Test + public void testReplaceTypeThing() { + assertThat(UNDER_TEST.resolve(KNOWN_WITH_RESOURCE_ATTRIBUTE, "type")) + .contains(KNOWN_RESOURCE_TYPE_THING); + } + + @Test + public void testReplacePathThing() { + assertThat(UNDER_TEST.resolve(KNOWN_WITH_RESOURCE_ATTRIBUTE, "path")) + .contains(KNOWN_JSON_POINTER_ATTRIBUTE.toString()); + } + +}