diff --git a/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java b/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java
index dcc06a8a9e..ddea2a4bf4 100644
--- a/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java
+++ b/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java
@@ -13,8 +13,10 @@
import org.eclipse.ditto.services.models.policies.PolicyTag;
import org.eclipse.ditto.services.policies.persistence.actors.policy.PolicyPersistenceActor;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
import org.eclipse.ditto.services.utils.persistence.mongo.DefaultPersistenceStreamingActor;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;
+
+import com.typesafe.config.Config;
import akka.actor.Props;
@@ -32,21 +34,22 @@ public final class PoliciesPersistenceStreamingActorCreator {
private PoliciesPersistenceStreamingActorCreator() {
throw new AssertionError();
}
-
+
/**
* Creates Akka configuration object Props for this PersistenceQueriesActor.
*
+ * @param config the actor system configuration.
* @param streamingCacheSize the size of the streaming cache.
* @return the Akka configuration Props object.
*/
- public static Props props(final int streamingCacheSize) {
- return DefaultPersistenceStreamingActor.props(streamingCacheSize,
+ public static Props props(final Config config, final int streamingCacheSize) {
+ return DefaultPersistenceStreamingActor.props(config, streamingCacheSize,
PoliciesPersistenceStreamingActorCreator::createElement);
}
private static PolicyTag createElement(final PidWithSeqNr pidWithSeqNr) {
- final String id = pidWithSeqNr.persistenceId()
+ final String id = pidWithSeqNr.getPersistenceId()
.replaceFirst(PolicyPersistenceActor.PERSISTENCE_ID_PREFIX, "");
- return PolicyTag.of(id, pidWithSeqNr.sequenceNr());
+ return PolicyTag.of(id, pidWithSeqNr.getSequenceNr());
}
}
diff --git a/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java b/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java
index 63166c1544..5a6bd5990d 100755
--- a/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java
+++ b/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java
@@ -135,7 +135,7 @@ private PoliciesRootActor(final Config config, final ActorRef pubSubMediator,
final int tagsStreamingCacheSize = config.getInt(ConfigKeys.POLICIES_TAGS_STREAMING_CACHE_SIZE);
final ActorRef persistenceStreamingActor = startChildActor(PoliciesPersistenceStreamingActorCreator.ACTOR_NAME,
- PoliciesPersistenceStreamingActorCreator.props(tagsStreamingCacheSize));
+ PoliciesPersistenceStreamingActorCreator.props(config, tagsStreamingCacheSize));
pubSubMediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
pubSubMediator.tell(new DistributedPubSubMediator.Put(persistenceStreamingActor), getSelf());
diff --git a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java
index d7dd182165..683f8d9b9c 100644
--- a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java
+++ b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java
@@ -12,8 +12,10 @@
package org.eclipse.ditto.services.things.persistence.actors;
import org.eclipse.ditto.services.models.things.ThingTag;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
import org.eclipse.ditto.services.utils.persistence.mongo.DefaultPersistenceStreamingActor;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;
+
+import com.typesafe.config.Config;
import akka.actor.Props;
@@ -35,17 +37,18 @@ private ThingsPersistenceStreamingActorCreator() {
/**
* Creates Akka configuration object Props for this PersistenceQueriesActor.
*
+ * @param config the actor system configuration.
* @param streamingCacheSize the size of the streaming cache.
* @return the Akka configuration Props object.
*/
- public static Props props(final int streamingCacheSize) {
- return DefaultPersistenceStreamingActor.props(streamingCacheSize,
+ public static Props props(final Config config, final int streamingCacheSize) {
+ return DefaultPersistenceStreamingActor.props(config, streamingCacheSize,
ThingsPersistenceStreamingActorCreator::createElement);
}
private static ThingTag createElement(final PidWithSeqNr pidWithSeqNr) {
- final String id = pidWithSeqNr.persistenceId()
+ final String id = pidWithSeqNr.getPersistenceId()
.replaceFirst(ThingPersistenceActor.PERSISTENCE_ID_PREFIX, "");
- return ThingTag.of(id, pidWithSeqNr.sequenceNr());
+ return ThingTag.of(id, pidWithSeqNr.getSequenceNr());
}
}
diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java
index 1e8ce9dcfd..a1ffe670d8 100644
--- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java
+++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java
@@ -15,29 +15,27 @@
import java.util.function.Function;
-import javax.annotation.Nullable;
-
import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.services.models.streaming.SudoStreamModifiedEntities;
import org.eclipse.ditto.services.utils.akka.streaming.AbstractStreamingActor;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoMongoReadJournal;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;
+import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;
import akka.NotUsed;
-import akka.persistence.query.PersistenceQuery;
import akka.stream.javadsl.Source;
/**
* Abstract implementation of an Actor that streams information about persisted entities modified in a time window in
* the past.
*/
+@AllValuesAreNonnullByDefault
public abstract class AbstractPersistenceStreamingActor
extends AbstractStreamingActor {
private final int streamingCacheSize;
private final Function entityMapper;
- private final DittoJavaDslMongoReadJournal readJournal;
+ private final MongoReadJournal readJournal;
/**
* Constructor.
@@ -45,15 +43,14 @@ public abstract class AbstractPersistenceStreamingActor entityMapper, @Nullable final DittoJavaDslMongoReadJournal readJournal) {
+ final Function entityMapper, final MongoReadJournal readJournal) {
this.streamingCacheSize = streamingCacheSize;
this.entityMapper = requireNonNull(entityMapper);
- this.readJournal = readJournal != null ? readJournal : PersistenceQuery.get(getContext().getSystem())
- .getReadJournalFor(DittoJavaDslMongoReadJournal.class, DittoMongoReadJournal.Identifier());
+ this.readJournal = readJournal;
}
@Override
@@ -74,16 +71,16 @@ protected final Source createSource(final SudoStreamModifiedEntities
// create a separate cache per stream (don't use member variable!)
final ComparableCache cache = new ComparableCache<>(streamingCacheSize);
- return readJournal.sequenceNumbersOfPidsByInterval(command.getStart(), command.getEnd())
+ return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd())
.log(unfilteredStreamingLogName, log)
// avoid unnecessary streaming of old sequence numbers
- .filter(pidWithSeqNr -> cache.updateIfNewOrGreater(pidWithSeqNr.persistenceId(),
- pidWithSeqNr.sequenceNr()))
+ .filter(pidWithSeqNr ->
+ cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()))
.map(this::mapEntity)
.log(filteredStreamingLogName, log);
}
private T mapEntity(final PidWithSeqNr pidWithSeqNr) {
- return entityMapper.apply(pidWithSeqNr) ;
+ return entityMapper.apply(pidWithSeqNr);
}
}
diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java
index c38968a3d7..f0662193cc 100644
--- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java
+++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java
@@ -14,54 +14,57 @@
import java.util.function.Function;
import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;
+import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;
+
+import com.typesafe.config.Config;
import akka.actor.Props;
-import akka.japi.Creator;
/**
* Configurable default implementation of {@link AbstractPersistenceStreamingActor}.
*/
+@AllValuesAreNonnullByDefault
public final class DefaultPersistenceStreamingActor
extends AbstractPersistenceStreamingActor {
- private DefaultPersistenceStreamingActor(final int streamingCacheSize,
- final Function entityMapper) {
- this(streamingCacheSize, entityMapper, null);
- }
+ private final MongoClientWrapper mongoClientWrapper;
+
+ DefaultPersistenceStreamingActor(final int streamingCacheSize,
+ final Function entityMapper,
+ final MongoReadJournal readJournal,
+ final MongoClientWrapper mongoClientWrapper) {
- private DefaultPersistenceStreamingActor(final int streamingCacheSize,
- final Function entityMapper, final DittoJavaDslMongoReadJournal readJournal) {
super(streamingCacheSize, entityMapper, readJournal);
+ this.mongoClientWrapper = mongoClientWrapper;
}
/**
* Creates Akka configuration object Props for this PersistenceStreamingActor.
*
+ * @param config the configuration of the akka system.
* @param streamingCacheSize the size of the streaming cache.
* @param entityMapper the mapper used to map {@link PidWithSeqNr} to {@code T}. The resulting entity will be
* streamed to the recipient actor.
* @return the Akka configuration Props object.
*/
- public static Props props(final int streamingCacheSize,
+ public static Props props(final Config config,
+ final int streamingCacheSize,
final Function entityMapper) {
- return Props.create(DefaultPersistenceStreamingActor.class,
- () -> new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper));
+ return Props.create(DefaultPersistenceStreamingActor.class, () -> {
+ final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(config);
+ final MongoReadJournal readJournal = MongoReadJournal.newInstance(config, mongoClient);
+ return new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper, readJournal, mongoClient);
+ });
}
- static Props props(final int streamingCacheSize,
- final Function entityMapper, final DittoJavaDslMongoReadJournal readJournal) {
- return Props.create(DefaultPersistenceStreamingActor.class, new Creator() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public DefaultPersistenceStreamingActor create() {
- return new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper, readJournal);
- }
- });
+ @Override
+ public void postStop() throws Exception {
+ mongoClientWrapper.close();
+ super.postStop();
}
}
diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java
index 87e55e70cb..5bcec742e1 100644
--- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java
+++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java
@@ -34,7 +34,8 @@
/**
* MongoDB Client Wrapper.
*/
-public final class MongoClientWrapper implements Closeable {
+public class MongoClientWrapper implements Closeable {
+ // not final to test with Mockito
private final MongoClient mongoClient;
private final MongoDatabase mongoDatabase;
diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoReadJournal.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoReadJournal.java
new file mode 100644
index 0000000000..1a8a9a967f
--- /dev/null
+++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoReadJournal.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2017 Bosch Software Innovations GmbH.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v2.0
+ * which accompanies this distribution, and is available at
+ * https://www.eclipse.org/org/documents/epl-2.0/index.php
+ *
+ * Contributors:
+ * Bosch Software Innovations GmbH - initial contribution
+ */
+package org.eclipse.ditto.services.utils.persistence.mongo.streaming;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper;
+import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.QueryOperators;
+import com.mongodb.reactivestreams.client.MongoCollection;
+import com.typesafe.config.Config;
+
+import akka.NotUsed;
+import akka.contrib.persistence.mongodb.JournallingFieldNames$;
+import akka.stream.javadsl.Source;
+
+/**
+ * Reads the event journal of com.github.scullxbones.akka-persistence-mongo plugin.
+ *
+ * In the Akka system configuration,
+ *
+ * -
+ * {@code akka.persistence.journal.auto-start-journals} must contain exactly 1 configuration key {@code
+ * },
+ *
+ * -
+ * {@code .overrides.journal-collection} must be defined and equal to the name of the event journal
+ * collection.
+ *
+ *
+ *
+ */
+@AllValuesAreNonnullByDefault
+public class MongoReadJournal {
+ // not a final class to test with Mockito
+
+ private static final String AKKA_PERSISTENCE_JOURNAL_AUTO_START_JOURNALS =
+ "akka.persistence.journal.auto-start-journals";
+
+ private static final String JOURNAL_COLLECTION_NAME_SUFFIX = ".overrides.journal-collection";
+
+ private static final String ID = JournallingFieldNames$.MODULE$.ID();
+ private static final String PROCESSOR_ID = JournallingFieldNames$.MODULE$.PROCESSOR_ID();
+ private static final String TO = JournallingFieldNames$.MODULE$.TO();
+ private static final String GTE = QueryOperators.GTE;
+ private static final String LT = QueryOperators.LT;
+
+ private static final Integer PROJECT_INCLUDE = 1;
+ private static final Integer SORT_DESCENDING = -1;
+
+ private static final Document PROJECT_DOCUMENT =
+ toDocument(new Object[][]{{PROCESSOR_ID, PROJECT_INCLUDE}, {TO, PROJECT_INCLUDE}});
+
+ private static final Document SORT_DOCUMENT = toDocument(new Object[][]{{ID, SORT_DESCENDING}});
+
+ private final Logger log;
+
+ private final MongoCollection journal;
+
+ private MongoReadJournal(final MongoCollection journal) {
+ this.journal = journal;
+ this.log = LoggerFactory.getLogger(MongoSearchSyncPersistence.class);
+ }
+
+ /**
+ * Creates a new {@code MongoReadJournal}.
+ *
+ * @param config The Akka system configuration.
+ * @param clientWrapper The Mongo client wrapper.
+ */
+ public static MongoReadJournal newInstance(final Config config, final MongoClientWrapper clientWrapper) {
+ final String journalCollectionName = resolveJournalCollectionName(config);
+ final MongoCollection journal = clientWrapper.getDatabase().getCollection(journalCollectionName);
+ return new MongoReadJournal(journal);
+ }
+
+ /**
+ * Retrieve sequence numbers for persistence IDs modified within the time interval as a source of {@code
+ * PidWithSeqNr}. A persistence ID may appear multiple times with various sequence numbers.
+ *
+ * @param start start of the time window.
+ * @param end end of the time window.
+ */
+ public Source getPidWithSeqNrsByInterval(final Instant start, final Instant end) {
+
+ final Document filterDocument = createFilterObject(start, end);
+
+ final Publisher publisher =
+ journal.find(filterDocument, Document.class).projection(PROJECT_DOCUMENT).sort(SORT_DOCUMENT);
+
+ return Source.fromPublisher(publisher)
+ .map(doc -> new PidWithSeqNr(doc.getString(PROCESSOR_ID), doc.getLong(TO)));
+ }
+
+ private Document createFilterObject(final Instant start, final Instant end) {
+ final ObjectId startObjectId = instantToObjectIdBoundary(start);
+ final ObjectId endObjectId = instantToObjectIdBoundary(end.plus(1L, ChronoUnit.SECONDS));
+ log.debug("Limiting query to ObjectIds $gte {} and $lt {}", startObjectId, endObjectId);
+ return toDocument(new Object[][]{
+ {ID, toDocument(new Object[][]{
+ {GTE, startObjectId},
+ {LT, endObjectId}
+ })}
+ });
+ }
+
+ /* Create a ObjectID boundary from a timestamp to be used for comparison in MongoDB queries. */
+ private static ObjectId instantToObjectIdBoundary(final Instant instant) {
+ // MongoDBObject IDs only contain dates with precision of seconds, thus adjust the range of the query
+ // appropriately to make sure a client does not miss data when providing Instants with higher precision.
+ //
+ // Do not use
+ //
+ // new ObjectId(Date.from(startTruncatedToSecs))
+ //
+ // to compute object ID boundaries. The 1-argument constructor above appends incidental non-zero bits after
+ // the timestamp and may filter out events persisted after 'instant' if they happen to have
+ // a lower machine ID, process ID or counter value. (A MongoDB ObjectID is a byte array with fields for
+ // timestamp, machine ID, process ID and counter such that timestamp occupies the most significant bits.)
+ return new ObjectId(Date.from(instant.truncatedTo(ChronoUnit.SECONDS)), 0, (short) 0, 0);
+ }
+
+ private static Document toDocument(final Object[][] keyValuePairs) {
+ final Map map = new HashMap<>(keyValuePairs.length);
+ for (Object[] keyValuePair : keyValuePairs) {
+ map.put(keyValuePair[0].toString(), keyValuePair[1]);
+ }
+ return new Document(map);
+ }
+
+ /**
+ * Resolve event journal collection name from an Akka configuration object.
+ *
+ * It assumes that in the Akka system configuration,
+ *
+ * -
+ * {@code akka.persistence.journal.auto-start-journals} contains exactly 1 configuration key {@code
+ * },
+ *
+ * -
+ * {@code .overrides.journal-collection} is defined and equal to the name of the event journal
+ * collection.
+ *
+ *
+ *
+ *
+ * @param config The configuration.
+ * @return The name of the event journal collection.
+ * @throws IllegalArgumentException if {@code akka.persistence.journal.auto-start-journal} is not a singleton list.
+ * @throws com.typesafe.config.ConfigException.Missing if a relevant config value is missing.
+ * @throws com.typesafe.config.ConfigException.WrongType if a relevant config value has not the expected type.
+ */
+ private static String resolveJournalCollectionName(final Config config) {
+ final List autoStartJournals = config.getStringList(AKKA_PERSISTENCE_JOURNAL_AUTO_START_JOURNALS);
+ if (autoStartJournals.size() != 1) {
+ final String message = String.format("Expect %s to be a singleton list, but it is List(%s)",
+ AKKA_PERSISTENCE_JOURNAL_AUTO_START_JOURNALS,
+ autoStartJournals.stream().collect(Collectors.joining(", ")));
+ throw new IllegalArgumentException(message);
+ } else {
+ final String journalKey = autoStartJournals.get(0);
+ final String journalCollectionNameKey = journalKey + JOURNAL_COLLECTION_NAME_SUFFIX;
+ return config.getString(journalCollectionNameKey);
+ }
+ }
+}
diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/PidWithSeqNr.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/PidWithSeqNr.java
new file mode 100644
index 0000000000..dd75f502fa
--- /dev/null
+++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/PidWithSeqNr.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2017 Bosch Software Innovations GmbH.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v2.0
+ * which accompanies this distribution, and is available at
+ * https://www.eclipse.org/org/documents/epl-2.0/index.php
+ *
+ * Contributors:
+ * Bosch Software Innovations GmbH - initial contribution
+ */
+package org.eclipse.ditto.services.utils.persistence.mongo.streaming;
+
+import java.util.Objects;
+
+/**
+ * A pair of {@code String} and {@code long} to store persistence ID and sequence number of an event in a journal.
+ */
+public final class PidWithSeqNr {
+
+ private final String persistenceId;
+ private final long sequenceNr;
+
+ /**
+ * Creates a pair from {@code String} and {@code long}.
+ *
+ * @param persistenceId the Akka persistence persistenceId.
+ * @param sequenceNr the sequence number.
+ */
+ public PidWithSeqNr(final String persistenceId, final long sequenceNr) {
+ this.persistenceId = persistenceId;
+ this.sequenceNr = sequenceNr;
+ }
+
+ /**
+ * Retrieve the persistence ID.
+ *
+ * @return The persistence ID.
+ */
+ public String getPersistenceId() {
+ return persistenceId;
+ }
+
+ /**
+ * Retrieve the sequence number.
+ *
+ * @return The sequence number.
+ */
+ public long getSequenceNr() {
+ return sequenceNr;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ } else {
+ final PidWithSeqNr that = (PidWithSeqNr) o;
+ return Objects.equals(persistenceId, that.persistenceId) &&
+ sequenceNr == that.sequenceNr;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(persistenceId, sequenceNr);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " ["
+ + "persistenceId=" + persistenceId
+ + ", sequenceNr=" + sequenceNr
+ + "]";
+ }
+}
diff --git a/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java b/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java
index b574176d81..b5ee0ea010 100644
--- a/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java
+++ b/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java
@@ -22,8 +22,8 @@
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.services.models.streaming.AbstractEntityIdWithRevision;
import org.eclipse.ditto.services.models.streaming.SudoStreamModifiedEntities;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal;
-import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
+import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.signals.commands.base.Command;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -77,7 +77,7 @@ public void retrieveEmptyStream() {
@Test
public void retrieveNonEmptyStream() {
new TestKit(actorSystem) {{
- final Source mockedSource = Source.single(PidWithSeqNr.apply(ID, REVISION));
+ final Source mockedSource = Source.single(new PidWithSeqNr(ID, REVISION));
final ActorRef underTest = createPersistenceQueriesActor(mockedSource);
final Command> command = createStreamingRequest();
@@ -97,10 +97,12 @@ private Command> createStreamingRequest() {
}
private static ActorRef createPersistenceQueriesActor(final Source mockedSource) {
- final DittoJavaDslMongoReadJournal mock = mock(DittoJavaDslMongoReadJournal.class);
- when(mock.sequenceNumbersOfPidsByInterval(any(), any())).thenReturn(mockedSource);
- final Props props = DefaultPersistenceStreamingActor.props(100,
- DefaultPersistenceStreamingActorTest::mapEntity, mock);
+ final MongoClientWrapper mockClient = mock(MongoClientWrapper.class);
+ final MongoReadJournal mockJournal = mock(MongoReadJournal.class);
+ when(mockJournal.getPidWithSeqNrsByInterval(any(), any())).thenReturn(mockedSource);
+ final Props props = Props.create(DefaultPersistenceStreamingActor.class, () ->
+ new DefaultPersistenceStreamingActor<>(100,
+ DefaultPersistenceStreamingActorTest::mapEntity, mockJournal, mockClient));
return actorSystem.actorOf(props, "persistenceQueriesActor-" + UUID.randomUUID());
}
@@ -109,10 +111,11 @@ private static void sendCommand(final TestKit testKit, final ActorRef actorRef,
}
private static SimpleEntityIdWithRevision mapEntity(final PidWithSeqNr pidWithSeqNr) {
- return new SimpleEntityIdWithRevision(pidWithSeqNr.persistenceId(), pidWithSeqNr.sequenceNr());
+ return new SimpleEntityIdWithRevision(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr());
}
private static final class SimpleEntityIdWithRevision extends AbstractEntityIdWithRevision {
+
private SimpleEntityIdWithRevision(final String id, final long revision) {
super(id, revision);
}