Skip to content

Commit

Permalink
#1583 apply RQL based filtering when streaming "historical" thing events
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jan 5, 2024
1 parent 7a507f6 commit 914493f
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public final class SubscribeForPersistedEvents extends AbstractStreamingSubscrip
@Nullable private final Instant fromHistoricalTimestamp;
@Nullable private final Instant toHistoricalTimestamp;
@Nullable private final String prefix;
@Nullable private final String filter;

private SubscribeForPersistedEvents(final EntityId entityId,
final JsonPointer resourcePath,
Expand All @@ -69,6 +70,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final String prefix,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

super(TYPE, entityId, resourcePath, dittoHeaders);
Expand All @@ -77,6 +79,7 @@ private SubscribeForPersistedEvents(final EntityId entityId,
this.fromHistoricalTimestamp = fromHistoricalTimestamp;
this.toHistoricalTimestamp = toHistoricalTimestamp;
this.prefix = prefix;
this.filter = filter != null ? filter.toString() : null;
}

/**
Expand All @@ -89,7 +92,9 @@ private SubscribeForPersistedEvents(final EntityId entityId,
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, long, long, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
final long fromHistoricalRevision,
Expand All @@ -103,6 +108,38 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
null,
null,
null,
null,
dittoHeaders);
}

/**
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code long} revisions.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalRevision the revision to start the streaming from.
* @param toHistoricalRevision the revision to stop the streaming at.
* @param dittoHeaders the command headers of the request.
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
final long fromHistoricalRevision,
final long toHistoricalRevision,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
fromHistoricalRevision,
toHistoricalRevision,
null,
null,
null,
filter,
dittoHeaders);
}

Expand All @@ -116,7 +153,9 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Instant, Instant, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Instant fromHistoricalTimestamp,
Expand All @@ -130,6 +169,72 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
null,
dittoHeaders);
}

/**
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
* @param dittoHeaders the command headers of the request.
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
0L,
Long.MAX_VALUE,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
filter,
dittoHeaders);
}

/**
* Creates a new {@code SudoStreamSnapshots} command based on "from" and "to" {@code Instant} timestamps.
*
* @param entityId the entityId that should be streamed.
* @param resourcePath the resource path for which to stream events.
* @param fromHistoricalRevision the revision to start the streaming from.
* @param toHistoricalRevision the revision to stop the streaming at.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @deprecated since 3.5.0, use {@link #of(EntityId, JsonPointer, Long, Long, Instant, Instant, CharSequence, DittoHeaders)}
*/
@Deprecated
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Long fromHistoricalRevision,
@Nullable final Long toHistoricalRevision,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
resourcePath,
null != fromHistoricalRevision ? fromHistoricalRevision : 0L,
null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
null,
dittoHeaders);
}

Expand All @@ -142,16 +247,19 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
* @param toHistoricalRevision the revision to stop the streaming at.
* @param fromHistoricalTimestamp the timestamp to start the streaming from.
* @param toHistoricalTimestamp the timestamp to stop the streaming at.
* @param filter the optional RQL filter to apply for persisted events before publishing to the stream
* @param dittoHeaders the command headers of the request.
* @return the command.
* @throws NullPointerException if any non-nullable argument is {@code null}.
* @since 3.5.0
*/
public static SubscribeForPersistedEvents of(final EntityId entityId,
final JsonPointer resourcePath,
@Nullable final Long fromHistoricalRevision,
@Nullable final Long toHistoricalRevision,
@Nullable final Instant fromHistoricalTimestamp,
@Nullable final Instant toHistoricalTimestamp,
@Nullable final CharSequence filter,
final DittoHeaders dittoHeaders) {

return new SubscribeForPersistedEvents(entityId,
Expand All @@ -161,6 +269,7 @@ public static SubscribeForPersistedEvents of(final EntityId entityId,
fromHistoricalTimestamp,
toHistoricalTimestamp,
null,
filter,
dittoHeaders);
}

Expand All @@ -182,6 +291,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
jsonObject.getValue(JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
jsonObject.getValue(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP).map(Instant::parse).orElse(null),
jsonObject.getValue(JsonFields.PREFIX).orElse(null),
jsonObject.getValue(JsonFields.FILTER).orElse(null),
dittoHeaders
);
}
Expand All @@ -195,7 +305,7 @@ public static SubscribeForPersistedEvents fromJson(final JsonObject jsonObject,
*/
public SubscribeForPersistedEvents setPrefix(@Nullable final String prefix) {
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, getDittoHeaders());
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, getDittoHeaders());
}

/**
Expand Down Expand Up @@ -244,6 +354,14 @@ public Optional<String> getPrefix() {
return Optional.ofNullable(prefix);
}

/**
* @return the optional RQL filter to apply for persisted events before publishing to the stream
* @since 3.5.0
*/
public Optional<String> getFilter() {
return Optional.ofNullable(filter);
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
final JsonSchemaVersion schemaVersion,
Expand All @@ -263,6 +381,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
jsonObjectBuilder.set(JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, toHistoricalTimestamp.toString(), predicate);
}
getPrefix().ifPresent(thePrefix -> jsonObjectBuilder.set(JsonFields.PREFIX, thePrefix));
getFilter().ifPresent(theFilter -> jsonObjectBuilder.set(JsonFields.FILTER, theFilter));
}

@Override
Expand All @@ -273,13 +392,13 @@ public String getTypePrefix() {
@Override
public SubscribeForPersistedEvents setDittoHeaders(final DittoHeaders dittoHeaders) {
return new SubscribeForPersistedEvents(entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, dittoHeaders);
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter, dittoHeaders);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), entityId, resourcePath, fromHistoricalRevision, toHistoricalRevision,
fromHistoricalTimestamp, toHistoricalTimestamp, prefix);
fromHistoricalTimestamp, toHistoricalTimestamp, prefix, filter);
}

@Override
Expand All @@ -297,7 +416,8 @@ public boolean equals(@Nullable final Object obj) {
toHistoricalRevision == that.toHistoricalRevision &&
Objects.equals(fromHistoricalTimestamp, that.fromHistoricalTimestamp) &&
Objects.equals(toHistoricalTimestamp, that.toHistoricalTimestamp) &&
Objects.equals(prefix, that.prefix);
Objects.equals(prefix, that.prefix) &&
Objects.equals(filter, that.filter);
}

@Override
Expand All @@ -313,6 +433,7 @@ public String toString() {
+ ", fromHistoricalTimestamp=" + fromHistoricalTimestamp
+ ", toHistoricalTimestamp=" + toHistoricalTimestamp
+ ", prefix=" + prefix
+ ", filter=" + filter
+ "]";
}

Expand All @@ -339,6 +460,9 @@ private JsonFields() {

static final JsonFieldDefinition<String> PREFIX =
JsonFactory.newStringFieldDefinition("prefix", REGULAR, V_2);

public static final JsonFieldDefinition<String> FILTER =
JsonFactory.newStringFieldDefinition("filter", REGULAR, V_2);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class SubscribeForPersistedEventsTest {
private static final long KNOWN_TO_REV = 42L;
private static final String KNOWN_FROM_TS = "2022-10-25T14:00:00Z";
private static final String KNOWN_TO_TS = "2022-10-25T15:00:00Z";
private static final String KNOWN_FILTER = "exists(thingId)";

private static final String JSON_ALL_FIELDS = JsonFactory.newObjectBuilder()
.set(Command.JsonFields.TYPE, SubscribeForPersistedEvents.TYPE)
Expand All @@ -53,6 +54,7 @@ public final class SubscribeForPersistedEventsTest {
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_TIMESTAMP, KNOWN_FROM_TS)
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_TIMESTAMP, KNOWN_TO_TS)
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
.build()
.toString();

Expand All @@ -63,6 +65,7 @@ public final class SubscribeForPersistedEventsTest {
.set(StreamingSubscriptionCommand.JsonFields.JSON_RESOURCE_PATH, KNOWN_RESOURCE_PATH)
.set(SubscribeForPersistedEvents.JsonFields.JSON_FROM_HISTORICAL_REVISION, KNOWN_FROM_REV)
.set(SubscribeForPersistedEvents.JsonFields.JSON_TO_HISTORICAL_REVISION, KNOWN_TO_REV)
.set(SubscribeForPersistedEvents.JsonFields.FILTER, KNOWN_FILTER)
.build().toString();

@Test
Expand All @@ -88,6 +91,7 @@ public void toJsonWithAllFieldsSet() {
KNOWN_TO_REV,
Instant.parse(KNOWN_FROM_TS),
Instant.parse(KNOWN_TO_TS),
KNOWN_FILTER,
DittoHeaders.empty()
);

Expand All @@ -102,6 +106,7 @@ public void toJsonWithOnlyRequiredFieldsSet() {
JsonPointer.of(KNOWN_RESOURCE_PATH),
KNOWN_FROM_REV,
KNOWN_TO_REV,
KNOWN_FILTER,
DittoHeaders.empty());
final String json = command.toJsonString();
assertThat(json).isEqualTo(JSON_MINIMAL);
Expand All @@ -116,6 +121,7 @@ public void fromJsonWithAllFieldsSet() {
KNOWN_TO_REV,
Instant.parse(KNOWN_FROM_TS),
Instant.parse(KNOWN_TO_TS),
KNOWN_FILTER,
DittoHeaders.empty()
);
assertThat(SubscribeForPersistedEvents.fromJson(JsonObject.of(JSON_ALL_FIELDS), DittoHeaders.empty()))
Expand All @@ -130,6 +136,7 @@ public void fromJsonWithOnlyRequiredFieldsSet() {
JsonPointer.of(KNOWN_RESOURCE_PATH),
KNOWN_FROM_REV,
KNOWN_TO_REV,
KNOWN_FILTER,
DittoHeaders.empty()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,22 @@
import javax.annotation.Nullable;
import javax.jms.JMSRuntimeException;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.base.service.config.supervision.LocalAskTimeoutConfig;
Expand All @@ -48,17 +60,6 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.pekko.actor.ActorKilledException;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.OneForOneStrategy;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.ReceiveTimeout;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.japi.pf.FI;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;

/**
* Supervisor for {@link ConnectionPersistenceActor} which means it will create, start and watch it as child actor.
* <p>
Expand Down Expand Up @@ -158,6 +159,11 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));
}

@Override
protected boolean applyPersistedEventFilter(final Event<?> event, final SubscribeForPersistedEvents subscribe) {
return true;
}

@Override
protected boolean shouldBecomeTwinSignalProcessingAwaiting(final Signal<?> signal) {
return super.shouldBecomeTwinSignalProcessingAwaiting(signal) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
fieldPointer,
fromHistoricalRevision,
null != toHistoricalRevision ? toHistoricalRevision : Long.MAX_VALUE,
filterString,
dittoHeaders);
} else if (null != fromHistoricalTimestamp) {
FeatureToggle
Expand All @@ -401,6 +402,7 @@ private Route createSseRoute(final RequestContext ctx, final CompletionStage<Dit
fieldPointer,
fromHistoricalTimestamp,
toHistoricalTimestamp,
filterString,
dittoHeaders);
} else {
startStreaming =
Expand Down
Loading

0 comments on commit 914493f

Please sign in to comment.