Skip to content

Commit

Permalink
#1931: fix that not all placeholders were supported in connection tar…
Browse files Browse the repository at this point in the history
…get filtering
  • Loading branch information
thjaeckle committed May 14, 2024
1 parent 4da8acc commit d98cfa6
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingJsonPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.controlflow.AbstractGraphActor;
Expand All @@ -99,6 +102,7 @@
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.HeadersPlaceholder;
import org.eclipse.ditto.placeholders.PipelineElement;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
Expand Down Expand Up @@ -138,9 +142,13 @@ public final class OutboundMappingProcessorActor

private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance();
private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();

private final ActorRef clientActor;
private final Connection connection;
Expand Down Expand Up @@ -787,22 +795,33 @@ private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWit
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final Criteria criteria = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
).filterCriteria(filter.get(), dittoHeaders);
return outboundSignalWithExtra.getExtra()
.flatMap(extra -> ThingEventToThingConverter
.mergeThingWithExtraFields(signal, extraFields.get(), extra)
.filter(thing -> {
final PlaceholderResolver<Thing> thingPlaceholderResolver = PlaceholderFactory
final PlaceholderResolver<Thing> thingJsonPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
resourcePlaceholderResolver, timePlaceholderResolver, thingPlaceholderResolver)
entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver,
timePlaceholderResolver, thingJsonPlaceholderResolver)
.test(thing);
})
.map(thing -> outboundSignalWithExtra))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.namespaces.NamespaceReader;
Expand All @@ -39,6 +40,9 @@
import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectivityAnnouncement;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
Expand Down Expand Up @@ -70,6 +74,9 @@ public final class SignalFilter {

private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance();
private static final TopicPathPlaceholder TOPIC_PATH_PLACEHOLDER = TopicPathPlaceholder.getInstance();
private static final EntityIdPlaceholder ENTITY_ID_PLACEHOLDER = EntityIdPlaceholder.getInstance();
private static final ThingPlaceholder THING_PLACEHOLDER = ThingPlaceholder.getInstance();
private static final FeaturePlaceholder FEATURE_PLACEHOLDER = FeaturePlaceholder.getInstance();
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();

Expand Down Expand Up @@ -165,24 +172,35 @@ private static boolean matchesFilterBeforeEnrichment(final FilteredTopic filtere
final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
final PlaceholderResolver<TopicPath> topicPathPlaceholderResolver =
PlaceholderFactory.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver<EntityId> entityIdPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(ENTITY_ID_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<EntityId> thingPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(THING_PLACEHOLDER,
(signal instanceof WithEntityId withEntityId) ? withEntityId.getEntityId() : null);
final PlaceholderResolver<Signal<?>> featurePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(FEATURE_PLACEHOLDER, signal);
final PlaceholderResolver<WithResource> resourcePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(RESOURCE_PLACEHOLDER, signal);
final PlaceholderResolver<Object> timePlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TIME_PLACEHOLDER, new Object());
final Criteria criteria = parseCriteria(filterOptional.get(), signal.getDittoHeaders(),
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
final Set<JsonPointer> extraFields = filteredTopic.getExtraFields()
.map(JsonFieldSelector::getPointers)
.orElse(Collections.emptySet());
if (signal instanceof ThingEvent) {
return ThingEventToThingConverter.thingEventToThing((ThingEvent<?>) signal)
.filter(thing -> Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, thing,
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver))
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver))
.isPresent();
} else {
final Thing emptyThing = Thing.newBuilder().build();
return Thing3ValuePredicateVisitor.couldBeTrue(criteria, extraFields, emptyThing,
topicPathPlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver);
}
} else {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@

import javax.annotation.concurrent.Immutable;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.connectivity.model.ClientCertificateCredentials;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
Expand All @@ -52,6 +53,10 @@
import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig;
import org.eclipse.ditto.connectivity.service.messaging.internal.ssl.SSLContextCreator;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.service.placeholders.FeaturePlaceholder;
import org.eclipse.ditto.edge.service.placeholders.ThingPlaceholder;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.TimePlaceholder;
Expand All @@ -60,9 +65,6 @@
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;

/**
* Validate a connection according to its type.
*/
Expand All @@ -83,7 +85,8 @@ private ConnectionValidator(LoggingAdapter loggingAdapter,
.collect(Collectors.toMap(AbstractProtocolValidator::type, Function.identity()));
this.specMap = Collections.unmodifiableMap(theSpecMap);
queryFilterCriteriaFactory = QueryFilterCriteriaFactory.modelBased(RqlPredicateParser.getInstance(),
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance());
TopicPathPlaceholder.getInstance(), ResourcePlaceholder.getInstance(), TimePlaceholder.getInstance(),
EntityIdPlaceholder.getInstance(), ThingPlaceholder.getInstance(), FeaturePlaceholder.getInstance());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;
import org.eclipse.ditto.things.model.signals.events.FeatureDesiredPropertiesModified;
import org.eclipse.ditto.things.model.signals.events.FeaturePropertiesModified;
import org.eclipse.ditto.things.model.signals.events.ThingModified;
import org.eclipse.ditto.things.model.signals.events.ThingModifiedEvent;
import org.mockito.Mockito;
Expand Down Expand Up @@ -331,6 +332,8 @@ public static final class Things {
public static final class Feature {

public static final String FEATURE_ID = "Feature";
public static final FeatureProperties FEATURE_PROPERTIES = FeatureProperties.newBuilder()
.set("property", "test").build();
public static final FeatureProperties FEATURE_DESIRED_PROPERTIES = FeatureProperties.newBuilder()
.set("property", "test").build();

Expand Down Expand Up @@ -996,8 +999,14 @@ public static ThingModifiedEvent<?> thingModified(final Collection<Authorization
TestConstants.INSTANT, dittoHeaders, TestConstants.METADATA);
}

public static ThingModifiedEvent<?> featurePropertiesModified(Collection<AuthorizationSubject> readSubjects) {
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
return FeaturePropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
Feature.FEATURE_PROPERTIES, 1, null, dittoHeaders, null);
}

public static ThingModifiedEvent<?> featureDesiredPropertiesModified(Collection<AuthorizationSubject> readSubjects) {
DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readGrantedSubjects(readSubjects).build();
return FeatureDesiredPropertiesModified.of(Things.THING_ID, Feature.FEATURE_ID,
Feature.FEATURE_DESIRED_PROPERTIES, 1, null, dittoHeaders, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,36 @@ public void applySignalFilterForMessagesWithExtraFieldsAndRqlFilter() {
*/
@Test
public void applySignalFilterOnFeatureDesiredPropertiesModified() {
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
final Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("like(resource:path,'/features/" + TestConstants.Feature.FEATURE_ID + "*')")
.build()).build();
Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
final Signal<?> signal = TestConstants.featureDesiredPropertiesModified(Collections.singletonList(AUTHORIZED));

final List<Target> filteredTargets = signalFilter.filter(signal);
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
}

/**
* Test that target filtering works using feature:id placeholder
*/
@Test
public void applySignalFilterWithFeatureIdPlaceholder() {
Target target = ConnectivityModelFactory.newTargetBuilder().address("address")
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(TWIN_EVENTS)
.withFilter("eq(feature:id,'Feature')")
.build()
)
.build();
final Connection connection = TestConstants.createConnection(CONNECTION_ID, target);
final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
final Signal<?> signal = TestConstants.featurePropertiesModified(Collections.singletonList(AUTHORIZED));

List<Target> filteredTargets = signalFilter.filter(signal);
final List<Target> filteredTargets = signalFilter.filter(signal);
Assertions.assertThat(filteredTargets).hasSize(1).contains(target);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelInvalidException;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
Expand Down Expand Up @@ -75,11 +79,6 @@
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.http.javadsl.model.Uri;
import org.apache.pekko.testkit.javadsl.TestKit;

/**
* Tests {@link ConnectionValidator}.
*/
Expand Down Expand Up @@ -459,6 +458,22 @@ private static Connection createConnection(final ConnectionId connectionId) {
.build();
}

@Test
public void acceptValidConnectionWithValidTargetFilterContainingPlaceholders() {
final List<Target> targetWithValidFilter = singletonList(
ConnectivityModelFactory.newTargetBuilder(TestConstants.Targets.TWIN_TARGET)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
.withFilter("and(exists(feature:id),eq(thing:namespace,'org.eclipse.ditto'))")
.build())
.build());
final Connection connection = createConnection(CONNECTION_ID)
.toBuilder()
.setTargets(targetWithValidFilter)
.build();
final ConnectionValidator underTest = getConnectionValidator();
underTest.validate(connection, DittoHeaders.empty(), actorSystem);
}

@Test
public void acceptValidConnectionWithValidNumberPayloadMapping() {
final Connection connection = createConnection(CONNECTION_ID)
Expand Down

0 comments on commit d98cfa6

Please sign in to comment.