Skip to content

Commit

Permalink
Merge pull request #1866 from eclipse-ditto/feature/historical-stream…
Browse files Browse the repository at this point in the history
…ing-negative

Provide option to provide negative numbers to historical event streaming
  • Loading branch information
thjaeckle committed Jan 17, 2024
2 parents 5d0a5af + 17ad339 commit 0ad1528
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
"properties": {
"fromHistoricalRevision": {
"type": "integer",
"description": "The revision to start the streaming from."
"description": "The revision to start the streaming from. May also be negative in order to specify to get the last n revisions relative to the 'toHistoricalRevision'."
},
"toHistoricalRevision": {
"type": "integer",
"description": "The revision to stop the streaming at."
"description": "The revision to stop the streaming at. May also be 0 or negative in order to specify to get either the latest (0) or the nth most recent revision."
},
"fromHistoricalTimestamp": {
"type": "string",
Expand Down
6 changes: 4 additions & 2 deletions documentation/src/main/resources/pages/ditto/basic-history.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ This API is however **only available for things** (not for policies).
Use the following query parameters in order to specify the start/stop revision/timestamp.

Either use the revision based parameters:
* `from-historical-revision`: specifies the revision number to start streaming historical modification events from
* `to-historical-revision`: optionally specifies the revision number to stop streaming at (if omitted, it streams events until the current state of the entity)
* `from-historical-revision`: Specifies the revision number to start streaming historical modification events from.
May also be negative in order to specify to get the last `n` revisions relative to the `to-historical-revision`.
* `to-historical-revision`: Optionally specifies the revision number to stop streaming at (if omitted, it streams events until the current state of the entity).
May also be 0 or negative in order to specify to get either the latest (`0`) or the `n`th most recent revision.

Alternatively, use the timestamp based parameters:
* `from-historical-timestamp`: specifies the timestamp to start streaming historical modification events from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.persistence.query.EventEnvelope;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.persistence.query.PersistenceQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.PersistenceIdsQuery;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.stream.javadsl.RestartSource;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
Expand Down Expand Up @@ -54,27 +74,6 @@
import com.mongodb.reactivestreams.client.MongoCollection;
import com.typesafe.config.Config;

import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.persistence.query.EventEnvelope;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.persistence.query.PersistenceQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentEventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.CurrentPersistenceIdsQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByPersistenceIdQuery;
import org.apache.pekko.persistence.query.javadsl.EventsByTagQuery;
import org.apache.pekko.persistence.query.javadsl.PersistenceIdsQuery;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.SystemMaterializer;
import org.apache.pekko.stream.javadsl.RestartSource;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

import pekko.contrib.persistence.mongodb.JavaDslMongoReadJournal;
import pekko.contrib.persistence.mongodb.JournallingFieldNames$;
import pekko.contrib.persistence.mongodb.SnapshottingFieldNames$;
Expand Down Expand Up @@ -547,6 +546,23 @@ public Source<Optional<Long>, NotUsed> getSmallestEventSeqNo(final String pid) {
.orElse(Source.single(Optional.empty()));
}

/**
* Find the latest/newest event sequence number of a PID.
*
* @param pid the PID to search for.
* @return source of the latest event sequence number, or an empty optional.
*/
public Source<Optional<Long>, NotUsed> getLatestEventSeqNo(final String pid) {
return getJournal()
.flatMapConcat(journal -> Source.fromPublisher(
journal.find(Filters.eq(J_PROCESSOR_ID, pid))
.sort(Sorts.descending(J_TO))
.limit(1)
))
.map(document -> Optional.of(document.getLong(J_TO)))
.orElse(Source.single(Optional.empty()));
}

/**
* Find the smallest snapshot sequence number of a PID.
*
Expand Down Expand Up @@ -603,7 +619,16 @@ public Source<DeleteResult, NotUsed> deleteSnapshots(final String pid, final lon
public Source<EventEnvelope, NotUsed> currentEventsByPersistenceId(final String persistenceId,
final long fromSequenceNr,
final long toSequenceNr) {
return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr);
if (fromSequenceNr <= 0 || toSequenceNr <= 0) {
return getLatestEventSeqNo(persistenceId).flatMapConcat(latestSnOpt -> {
final long effectiveTo = toSequenceNr <= 0 ?
latestSnOpt.map(latest -> latest + toSequenceNr).orElse(toSequenceNr) : toSequenceNr;
final long effectiveFrom = fromSequenceNr <= 0 ? effectiveTo + 1 + fromSequenceNr : fromSequenceNr;
return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, effectiveFrom, effectiveTo);
});
} else {
return pekkoReadJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr);
}
}

@Override
Expand Down

0 comments on commit 0ad1528

Please sign in to comment.