Skip to content

Commit

Permalink
#1692 fix filtering of live message with empty resolved extraFields
Browse files Browse the repository at this point in the history
  • Loading branch information
thjaeckle committed Jul 17, 2023
1 parent 3a0419a commit b889786
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.eclipse.ditto.base.model.auth.AuthorizationModelFactory.newAuthContext;
import static org.eclipse.ditto.base.model.auth.AuthorizationModelFactory.newAuthSubject;
import static org.eclipse.ditto.connectivity.model.Topic.LIVE_EVENTS;
import static org.eclipse.ditto.connectivity.model.Topic.LIVE_MESSAGES;
import static org.eclipse.ditto.connectivity.model.Topic.TWIN_EVENTS;

import java.time.Instant;
Expand All @@ -24,24 +25,29 @@
import java.util.List;

import org.assertj.core.api.Assertions;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.HeaderMapping;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.MessageDirection;
import org.eclipse.ditto.messages.model.MessageHeaders;
import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingFieldSelector;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.eclipse.ditto.things.model.signals.events.ThingModified;
import org.junit.Test;

Expand Down Expand Up @@ -255,6 +261,53 @@ public void applySignalFilterWithNamespacesAndRqlFilter() {
targetD); // THEN: only targetA and targetD should be in the filtered targets
}

@Test
public void applySignalFilterForMessagesWithExtraFieldsAndRqlFilter() {

// targetA does filter for resource path "/inbox/messages/fubar"
final String filterA = "eq(resource:path,'/inbox/messages/fubar')";
final Target targetA = ConnectivityModelFactory.newTargetBuilder()
.address("message/a")
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
.headerMapping(HEADER_MAPPING)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(LIVE_MESSAGES)
.withExtraFields(ThingFieldSelector.fromString("attributes"))
.withFilter(filterA)
.build())
.build();

// targetB does filter for resource path "/inbox/messages/booo"
final String filterB = "eq(resource:path,'/inbox/messages/booo')";
final Target targetB = ConnectivityModelFactory.newTargetBuilder()
.address("message/a")
.authorizationContext(newAuthContext(DittoAuthorizationContextType.UNSPECIFIED, AUTHORIZED))
.headerMapping(HEADER_MAPPING)
.topics(ConnectivityModelFactory.newFilteredTopicBuilder(LIVE_MESSAGES)
.withExtraFields(ThingFieldSelector.fromString("attributes"))
.withFilter(filterB)
.build())
.build();

final Connection connection =
ConnectivityModelFactory.newConnectionBuilder(CONNECTION_ID, ConnectionType.AMQP_10,
ConnectivityStatus.OPEN, URI)
.targets(Arrays.asList(targetA, targetB))
.build();

final DittoHeaders headers = DittoHeaders.newBuilder()
.channel(TopicPath.Channel.LIVE.getName())
.readGrantedSubjects(Collections.singletonList(AUTHORIZED))
.build();
final SendThingMessage<Object> sendThingMessage = SendThingMessage.of(THING_ID,
Message.newBuilder(MessageHeaders.newBuilder(MessageDirection.TO, THING_ID, "fubar").build())
.build(), headers);

final SignalFilter signalFilter = new SignalFilter(connection, connectionMonitorRegistry);
final List<Target> filteredTargets = signalFilter.filter(sendThingMessage);

assertThat(filteredTargets).containsOnly(targetA); // THEN: only targetA should be in the filtered targets
}

/**
* Test that target filtering works also for desired properties events. Issue #1599
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,8 @@ public static Optional<Thing> mergeThingWithExtraFields(final Signal<?> signal,
final Thing baseThing = thingFromSignal.get();
final JsonObject baseThingJson = baseThing.toJson(baseThing.getImplementedSchemaVersion());
thing = ThingsModelFactory.newThing(JsonFactory.newObject(baseThingJson, extra));
} else if (thingFromSignal.isPresent()) {
thing = thingFromSignal.get();
} else if (hasExtra) {
thing = ThingsModelFactory.newThing(extra);
} else {
// no information; there is no thing.
return Optional.empty();
thing = thingFromSignal.orElseGet(() -> ThingsModelFactory.newThing(extra));
}

if (signal instanceof EventsourcedEvent) {
Expand Down

0 comments on commit b889786

Please sign in to comment.