Skip to content

Commit

Permalink
added new ResourcePlaceholder providing "resource:type" and "resource…
Browse files Browse the repository at this point in the history
…:path"

* useful in combination with a "topic:" placeholder in order to e.g. only filter for thing "created" events

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Sep 30, 2021
1 parent a6b0ea7 commit 9698ae3
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;

public final class ConnectivityPlaceholders {
Expand Down Expand Up @@ -64,6 +65,13 @@ public static TopicPathPlaceholder newTopicPathPlaceholder() {
return TopicPathPlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link ResourcePlaceholder}
*/
public static ResourcePlaceholder newResourcePlaceholder() {
return ResourcePlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link ConnectionIdPlaceholder}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.eclipse.ditto.placeholders.PlaceholderResolver;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.criteria.Criteria;
Expand Down Expand Up @@ -125,6 +127,7 @@ public final class OutboundMappingProcessorActor

private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();

private final ThreadSafeDittoLoggingAdapter dittoLoggingAdapter;

Expand Down Expand Up @@ -645,16 +648,19 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
// evaluate filter criteria again if signal enrichment is involved.
final Signal<?> signal = outboundSignalWithExtra.getSource();
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver = PlaceholderFactory.newPlaceholderResolver(
TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final Criteria criteria = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
topicPathPlaceholderResolver
topicPathPlaceholderResolver, resourcePlaceholderResolver
).filterCriteria(filter.get(), dittoHeaders);
return outboundSignalWithExtra.getExtra()
.flatMap(extra -> ThingEventToThingConverter
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
.filter(ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver))
.filter(ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
resourcePlaceholderResolver))
.map(thing -> outboundSignalWithExtra))
.map(Collections::singletonList)
.orElse(List.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private Resolvers() {
}
}),
ResolverCreator.of(ConnectivityPlaceholders.newTopicPathPlaceholder(), (e, s, t, a, c) -> t),
ResolverCreator.of(ConnectivityPlaceholders.newResourcePlaceholder(), (e, s, t, a, c) -> s),
ResolverCreator.of(ConnectivityPlaceholders.newRequestPlaceholder(), (e, s, t, a, c) -> a),
ResolverCreator.of(ConnectivityPlaceholders.newConnectionIdPlaceholder(), (e, s, t, a, c) -> c)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade
ConnectivityPlaceholders.newEntityPlaceholder(),
ConnectivityPlaceholders.newThingPlaceholder(),
ConnectivityPlaceholders.newTopicPathPlaceholder(),
ConnectivityPlaceholders.newResourcePlaceholder(),
newHeadersPlaceholder(),
ConnectivityPlaceholders.newFeaturePlaceholder());
validateTargetAddress(target.getAddress(), dittoHeaders, targetDescription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newEntityPlaceholder;
import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newFeaturePlaceholder;
import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newPolicyPlaceholder;
import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newResourcePlaceholder;
import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newSourceAddressPlaceholder;
import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newThingPlaceholder;
import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newTopicPathPlaceholder;
Expand Down Expand Up @@ -113,7 +114,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade

validateTargetQoS(qos.get(), dittoHeaders, targetDescription);
validateTemplate(target.getAddress(), dittoHeaders, newThingPlaceholder(), newTopicPathPlaceholder(),
newHeadersPlaceholder(), newFeaturePlaceholder());
newResourcePlaceholder(), newHeadersPlaceholder(), newFeaturePlaceholder());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.namespaces.NamespaceReader;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.events.Event;
Expand All @@ -48,6 +49,7 @@
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.criteria.Criteria;
Expand All @@ -68,6 +70,7 @@ public final class SignalFilter {

private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();

private final Connection connection;
private final ConnectionMonitorRegistry<ConnectionMonitor> connectionMonitorRegistry;
Expand Down Expand Up @@ -161,20 +164,22 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver =
PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
final Criteria criteria = parseCriteria(filterOptional.get(), signal.getDittoHeaders(),
topicPathPlaceholderResolver);
topicPathPlaceholderResolver, resourcePlaceholderResolver);
final Set<JsonPointer> extraFields = filteredTopic.getExtraFields()
.map(JsonFieldSelector::getPointers)
.orElse(Collections.emptySet());
if (signal instanceof ThingEvent) {
return ThingEventToThingConverter.thingEventToThing((ThingEvent<?>) signal)
.filter(thing -> Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, thing,
topicPathPlaceholderResolver))
topicPathPlaceholderResolver, resourcePlaceholderResolver))
.isPresent();
} else {
final Thing emptyThing = Thing.newBuilder().build();
return Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, emptyThing,
topicPathPlaceholderResolver);
topicPathPlaceholderResolver, resourcePlaceholderResolver);
}
} else {
return true;
Expand All @@ -186,8 +191,8 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere
* mapped to a valid criterion
*/
private static Criteria parseCriteria(final String filter, final DittoHeaders dittoHeaders,
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver) {
return QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), topicPathPlaceholderResolver)
final PlaceholderResolver<?>... placeholderResolvers) {
return QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(), placeholderResolvers)
.filterCriteria(filter, dittoHeaders);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
Expand Down Expand Up @@ -80,7 +81,7 @@ private ConnectionValidator(LoggingAdapter loggingAdapter,
.collect(Collectors.toMap(AbstractProtocolValidator::type, Function.identity()));
this.specMap = Collections.unmodifiableMap(theSpecMap);
queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
TopicPathPlaceholder.getInstance());
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,24 @@ public static Collection<Object[]> data() {

final Target filteredEventTopicPath1 = ConnectivityModelFactory.newTargetBuilder(twinAuthd)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("in(topic:action,'created','modified')")
.withFilter("and(in(topic:action,'created','modified'),eq(resource:path,'/'))")
.build())
.build();
final Target filteredEventTopicPath2 = ConnectivityModelFactory.newTargetBuilder(twinAuthd)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("and(eq(attributes/x,5),eq(topic:action,'modified'))")
.build())
.build();
final Target notFilteredEventTopicPath = ConnectivityModelFactory.newTargetBuilder(twinAuthd)
final Target notFilteredEventTopicPath1 = ConnectivityModelFactory.newTargetBuilder(twinAuthd)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("eq(topic:action,'deleted')")
.build())
.build();
final Target notFilteredEventTopicPath2 = ConnectivityModelFactory.newTargetBuilder(twinAuthd)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("ne(resource:path,'/')")
.build())
.build();

final Target filteredLiveMessageTopicPath1 = ConnectivityModelFactory.newTargetBuilder(liveAuthd)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(LIVE_MESSAGES)
Expand Down Expand Up @@ -201,7 +206,7 @@ public static Collection<Object[]> data() {
Lists.list(enrichedFiltered, enrichedNotFiltered1, enrichedNotFiltered2),
Lists.list(enrichedFiltered)});
params.add(new Object[]{TWIN_EVENTS, readSubjects,
Lists.list(filteredEventTopicPath1, filteredEventTopicPath2, notFilteredEventTopicPath),
Lists.list(filteredEventTopicPath1, filteredEventTopicPath2, notFilteredEventTopicPath1, notFilteredEventTopicPath2),
Lists.list(filteredEventTopicPath1, filteredEventTopicPath2)});

params.add(new Object[]{LIVE_EVENTS, readSubjects, Lists.list(twinAuthd), emptyList()});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import org.eclipse.ditto.internal.utils.search.SearchSource;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
Expand Down Expand Up @@ -107,8 +107,6 @@ public final class ThingsSseRouteBuilder extends RouteDirectives implements SseR
private static final Counter THINGS_SSE_COUNTER = getCounterFor(PATH_THINGS);
private static final Counter SEARCH_SSE_COUNTER = getCounterFor(PATH_SEARCH);

private static final DittoProtocolAdapter PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();

/**
* Timeout asking the local streaming actor.
*/
Expand Down Expand Up @@ -154,7 +152,7 @@ public static ThingsSseRouteBuilder getInstance(final ActorRef streamingActor,
checkNotNull(streamingActor, "streamingActor");
final var queryFilterCriteriaFactory =
QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
TopicPathPlaceholder.getInstance());
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance());

return new ThingsSseRouteBuilder(streamingActor, streamingConfig, queryFilterCriteriaFactory, pubSubMediator);
}
Expand Down Expand Up @@ -375,8 +373,7 @@ private CompletionStage<Collection<JsonObject>> postprocess(final SessionedJsoni
.map(session -> jsonifiable.retrieveExtraFields(facade)
.thenApply(extra ->
Optional.of(session.mergeThingWithExtra(event, extra))
.filter(thing -> session.matchesFilter(thing,
PROTOCOL_ADAPTER.toTopicPath(event)))
.filter(thing -> session.matchesFilter(thing, event))
.map(thing -> toNonemptyThingJson(thing, event, fields))
.orElseGet(Collections::emptyList)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
Expand Down Expand Up @@ -267,7 +266,7 @@ public Route build(final JsonSchemaVersion version,

private CompletionStage<WebsocketConfig> retrieveWebsocketConfig() {
return Patterns.ask(streamingActor, StreamingActor.Control.RETRIEVE_WEBSOCKET_CONFIG, LOCAL_ASK_TIMEOUT)
.thenApply(reply -> (WebsocketConfig) reply); // fail future with ClassCastException on type error
.thenApply(WebsocketConfig.class::cast); // fail future with ClassCastException on type error
}

private CompletionStage<HttpResponse> createWebSocket(final WebSocketUpgrade upgradeToWebSocket,
Expand Down Expand Up @@ -671,7 +670,7 @@ private Function<SessionedJsonifiable, CompletionStage<Collection<String>>> post
final Adaptable adaptable = jsonifiableToAdaptable(jsonifiable, adapter);
final CompletionStage<JsonObject> extraFuture = sessionedJsonifiable.retrieveExtraFields(facade);
return extraFuture.<Collection<String>>thenApply(extra -> {
if (matchesFilter(sessionedJsonifiable, adaptable.getTopicPath(), extra)) {
if (matchesFilter(sessionedJsonifiable, extra)) {
return Collections.singletonList(toJsonStringWithExtra(adaptable, extra));
}
issuePotentialWeakAcknowledgements(sessionedJsonifiable);
Expand Down Expand Up @@ -742,20 +741,18 @@ private static String toJsonStringWithExtra(final Adaptable adaptable, final Jso
* Always return true for Jsonifiables without any session, e. g., errors, responses, stream control messages.
*
* @param sessionedJsonifiable the Jsonifiable with session information attached.
* @param topicPath the topic path of the Jsonifiable to process.
* @param extra extra fields from signal enrichment.
* @return whether the Jsonifiable passes filter defined in the session together with the extra fields.
*/
private static boolean matchesFilter(final SessionedJsonifiable sessionedJsonifiable,
final TopicPath topicPath,
final JsonObject extra) {
private static boolean matchesFilter(final SessionedJsonifiable sessionedJsonifiable, final JsonObject extra) {
final Jsonifiable.WithPredicate<JsonObject, JsonField> jsonifiable = sessionedJsonifiable.getJsonifiable();
return sessionedJsonifiable.getSession()
.filter(session -> jsonifiable instanceof Signal)
.map(session ->
.map(session -> {
// evaluate to false if filter is present but does not match or has insufficient info to match
session.matchesFilter(session.mergeThingWithExtra((Signal<?>) jsonifiable, extra), topicPath)
)
final Signal<?> signal = (Signal<?>) jsonifiable;
return session.matchesFilter(session.mergeThingWithExtra(signal, extra), signal);
})
.orElse(true);
}

Expand Down

0 comments on commit 9698ae3

Please sign in to comment.