From f69482a00ac93f3cd23cf8a9b1d037d5b08b0fd8 Mon Sep 17 00:00:00 2001 From: Cai Yufei Date: Wed, 20 Dec 2017 12:12:07 +0100 Subject: [PATCH] add MongoReadJournal to access event journal from streaming actors Signed-off-by: Cai Yufei --- ...iciesPersistenceStreamingActorCreator.java | 15 +- .../policies/starter/PoliciesRootActor.java | 2 +- ...hingsPersistenceStreamingActorCreator.java | 13 +- .../AbstractPersistenceStreamingActor.java | 29 ++- .../DefaultPersistenceStreamingActor.java | 47 ++--- .../persistence/mongo/MongoClientWrapper.java | 3 +- .../mongo/streaming/MongoReadJournal.java | 187 ++++++++++++++++++ .../mongo/streaming/PidWithSeqNr.java | 78 ++++++++ .../DefaultPersistenceStreamingActorTest.java | 19 +- 9 files changed, 334 insertions(+), 59 deletions(-) create mode 100644 services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoReadJournal.java create mode 100644 services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/PidWithSeqNr.java diff --git a/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java b/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java index dcc06a8a9e..ddea2a4bf4 100644 --- a/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java +++ b/services/policies/persistence/src/main/java/org/eclipse/ditto/services/policies/persistence/actors/policies/PoliciesPersistenceStreamingActorCreator.java @@ -13,8 +13,10 @@ import org.eclipse.ditto.services.models.policies.PolicyTag; import org.eclipse.ditto.services.policies.persistence.actors.policy.PolicyPersistenceActor; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr; import org.eclipse.ditto.services.utils.persistence.mongo.DefaultPersistenceStreamingActor; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr; + +import com.typesafe.config.Config; import akka.actor.Props; @@ -32,21 +34,22 @@ public final class PoliciesPersistenceStreamingActorCreator { private PoliciesPersistenceStreamingActorCreator() { throw new AssertionError(); } - + /** * Creates Akka configuration object Props for this PersistenceQueriesActor. * + * @param config the actor system configuration. * @param streamingCacheSize the size of the streaming cache. * @return the Akka configuration Props object. */ - public static Props props(final int streamingCacheSize) { - return DefaultPersistenceStreamingActor.props(streamingCacheSize, + public static Props props(final Config config, final int streamingCacheSize) { + return DefaultPersistenceStreamingActor.props(config, streamingCacheSize, PoliciesPersistenceStreamingActorCreator::createElement); } private static PolicyTag createElement(final PidWithSeqNr pidWithSeqNr) { - final String id = pidWithSeqNr.persistenceId() + final String id = pidWithSeqNr.getPersistenceId() .replaceFirst(PolicyPersistenceActor.PERSISTENCE_ID_PREFIX, ""); - return PolicyTag.of(id, pidWithSeqNr.sequenceNr()); + return PolicyTag.of(id, pidWithSeqNr.getSequenceNr()); } } diff --git a/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java b/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java index 63166c1544..5a6bd5990d 100755 --- a/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java +++ b/services/policies/starter/src/main/java/org/eclipse/ditto/services/policies/starter/PoliciesRootActor.java @@ -135,7 +135,7 @@ private PoliciesRootActor(final Config config, final ActorRef pubSubMediator, final int tagsStreamingCacheSize = config.getInt(ConfigKeys.POLICIES_TAGS_STREAMING_CACHE_SIZE); final ActorRef persistenceStreamingActor = startChildActor(PoliciesPersistenceStreamingActorCreator.ACTOR_NAME, - PoliciesPersistenceStreamingActorCreator.props(tagsStreamingCacheSize)); + PoliciesPersistenceStreamingActorCreator.props(config, tagsStreamingCacheSize)); pubSubMediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf()); pubSubMediator.tell(new DistributedPubSubMediator.Put(persistenceStreamingActor), getSelf()); diff --git a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java index d7dd182165..683f8d9b9c 100644 --- a/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java +++ b/services/things/persistence/src/main/java/org/eclipse/ditto/services/things/persistence/actors/ThingsPersistenceStreamingActorCreator.java @@ -12,8 +12,10 @@ package org.eclipse.ditto.services.things.persistence.actors; import org.eclipse.ditto.services.models.things.ThingTag; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr; import org.eclipse.ditto.services.utils.persistence.mongo.DefaultPersistenceStreamingActor; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr; + +import com.typesafe.config.Config; import akka.actor.Props; @@ -35,17 +37,18 @@ private ThingsPersistenceStreamingActorCreator() { /** * Creates Akka configuration object Props for this PersistenceQueriesActor. * + * @param config the actor system configuration. * @param streamingCacheSize the size of the streaming cache. * @return the Akka configuration Props object. */ - public static Props props(final int streamingCacheSize) { - return DefaultPersistenceStreamingActor.props(streamingCacheSize, + public static Props props(final Config config, final int streamingCacheSize) { + return DefaultPersistenceStreamingActor.props(config, streamingCacheSize, ThingsPersistenceStreamingActorCreator::createElement); } private static ThingTag createElement(final PidWithSeqNr pidWithSeqNr) { - final String id = pidWithSeqNr.persistenceId() + final String id = pidWithSeqNr.getPersistenceId() .replaceFirst(ThingPersistenceActor.PERSISTENCE_ID_PREFIX, ""); - return ThingTag.of(id, pidWithSeqNr.sequenceNr()); + return ThingTag.of(id, pidWithSeqNr.getSequenceNr()); } } diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java index 1e8ce9dcfd..a1ffe670d8 100644 --- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java @@ -15,29 +15,27 @@ import java.util.function.Function; -import javax.annotation.Nullable; - import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision; import org.eclipse.ditto.services.models.streaming.SudoStreamModifiedEntities; import org.eclipse.ditto.services.utils.akka.streaming.AbstractStreamingActor; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoMongoReadJournal; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr; +import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault; import akka.NotUsed; -import akka.persistence.query.PersistenceQuery; import akka.stream.javadsl.Source; /** * Abstract implementation of an Actor that streams information about persisted entities modified in a time window in * the past. */ +@AllValuesAreNonnullByDefault public abstract class AbstractPersistenceStreamingActor extends AbstractStreamingActor { private final int streamingCacheSize; private final Function entityMapper; - private final DittoJavaDslMongoReadJournal readJournal; + private final MongoReadJournal readJournal; /** * Constructor. @@ -45,15 +43,14 @@ public abstract class AbstractPersistenceStreamingActor entityMapper, @Nullable final DittoJavaDslMongoReadJournal readJournal) { + final Function entityMapper, final MongoReadJournal readJournal) { this.streamingCacheSize = streamingCacheSize; this.entityMapper = requireNonNull(entityMapper); - this.readJournal = readJournal != null ? readJournal : PersistenceQuery.get(getContext().getSystem()) - .getReadJournalFor(DittoJavaDslMongoReadJournal.class, DittoMongoReadJournal.Identifier()); + this.readJournal = readJournal; } @Override @@ -74,16 +71,16 @@ protected final Source createSource(final SudoStreamModifiedEntities // create a separate cache per stream (don't use member variable!) final ComparableCache cache = new ComparableCache<>(streamingCacheSize); - return readJournal.sequenceNumbersOfPidsByInterval(command.getStart(), command.getEnd()) + return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd()) .log(unfilteredStreamingLogName, log) // avoid unnecessary streaming of old sequence numbers - .filter(pidWithSeqNr -> cache.updateIfNewOrGreater(pidWithSeqNr.persistenceId(), - pidWithSeqNr.sequenceNr())) + .filter(pidWithSeqNr -> + cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr())) .map(this::mapEntity) .log(filteredStreamingLogName, log); } private T mapEntity(final PidWithSeqNr pidWithSeqNr) { - return entityMapper.apply(pidWithSeqNr) ; + return entityMapper.apply(pidWithSeqNr); } } diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java index c38968a3d7..f0662193cc 100644 --- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActor.java @@ -14,54 +14,57 @@ import java.util.function.Function; import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr; +import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault; + +import com.typesafe.config.Config; import akka.actor.Props; -import akka.japi.Creator; /** * Configurable default implementation of {@link AbstractPersistenceStreamingActor}. */ +@AllValuesAreNonnullByDefault public final class DefaultPersistenceStreamingActor extends AbstractPersistenceStreamingActor { - private DefaultPersistenceStreamingActor(final int streamingCacheSize, - final Function entityMapper) { - this(streamingCacheSize, entityMapper, null); - } + private final MongoClientWrapper mongoClientWrapper; + + DefaultPersistenceStreamingActor(final int streamingCacheSize, + final Function entityMapper, + final MongoReadJournal readJournal, + final MongoClientWrapper mongoClientWrapper) { - private DefaultPersistenceStreamingActor(final int streamingCacheSize, - final Function entityMapper, final DittoJavaDslMongoReadJournal readJournal) { super(streamingCacheSize, entityMapper, readJournal); + this.mongoClientWrapper = mongoClientWrapper; } /** * Creates Akka configuration object Props for this PersistenceStreamingActor. * + * @param config the configuration of the akka system. * @param streamingCacheSize the size of the streaming cache. * @param entityMapper the mapper used to map {@link PidWithSeqNr} to {@code T}. The resulting entity will be * streamed to the recipient actor. * @return the Akka configuration Props object. */ - public static Props props(final int streamingCacheSize, + public static Props props(final Config config, + final int streamingCacheSize, final Function entityMapper) { - return Props.create(DefaultPersistenceStreamingActor.class, - () -> new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper)); + return Props.create(DefaultPersistenceStreamingActor.class, () -> { + final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(config); + final MongoReadJournal readJournal = MongoReadJournal.newInstance(config, mongoClient); + return new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper, readJournal, mongoClient); + }); } - static Props props(final int streamingCacheSize, - final Function entityMapper, final DittoJavaDslMongoReadJournal readJournal) { - return Props.create(DefaultPersistenceStreamingActor.class, new Creator() { - private static final long serialVersionUID = 1L; - - @Override - public DefaultPersistenceStreamingActor create() { - return new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper, readJournal); - } - }); + @Override + public void postStop() throws Exception { + mongoClientWrapper.close(); + super.postStop(); } } diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java index 87e55e70cb..5bcec742e1 100644 --- a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/MongoClientWrapper.java @@ -34,7 +34,8 @@ /** * MongoDB Client Wrapper. */ -public final class MongoClientWrapper implements Closeable { +public class MongoClientWrapper implements Closeable { + // not final to test with Mockito private final MongoClient mongoClient; private final MongoDatabase mongoDatabase; diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoReadJournal.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoReadJournal.java new file mode 100644 index 0000000000..1a8a9a967f --- /dev/null +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/MongoReadJournal.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2017 Bosch Software Innovations GmbH. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * which accompanies this distribution, and is available at + * https://www.eclipse.org/org/documents/epl-2.0/index.php + * + * Contributors: + * Bosch Software Innovations GmbH - initial contribution + */ +package org.eclipse.ditto.services.utils.persistence.mongo.streaming; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.bson.Document; +import org.bson.types.ObjectId; +import org.eclipse.ditto.services.utils.persistence.mongo.MongoClientWrapper; +import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.QueryOperators; +import com.mongodb.reactivestreams.client.MongoCollection; +import com.typesafe.config.Config; + +import akka.NotUsed; +import akka.contrib.persistence.mongodb.JournallingFieldNames$; +import akka.stream.javadsl.Source; + +/** + * Reads the event journal of com.github.scullxbones.akka-persistence-mongo plugin. + *

+ * In the Akka system configuration, + *

    + *
  • + * {@code akka.persistence.journal.auto-start-journals} must contain exactly 1 configuration key {@code + * }, + *
  • + *
  • + * {@code .overrides.journal-collection} must be defined and equal to the name of the event journal + * collection. + *
  • + *
+ *

+ */ +@AllValuesAreNonnullByDefault +public class MongoReadJournal { + // not a final class to test with Mockito + + private static final String AKKA_PERSISTENCE_JOURNAL_AUTO_START_JOURNALS = + "akka.persistence.journal.auto-start-journals"; + + private static final String JOURNAL_COLLECTION_NAME_SUFFIX = ".overrides.journal-collection"; + + private static final String ID = JournallingFieldNames$.MODULE$.ID(); + private static final String PROCESSOR_ID = JournallingFieldNames$.MODULE$.PROCESSOR_ID(); + private static final String TO = JournallingFieldNames$.MODULE$.TO(); + private static final String GTE = QueryOperators.GTE; + private static final String LT = QueryOperators.LT; + + private static final Integer PROJECT_INCLUDE = 1; + private static final Integer SORT_DESCENDING = -1; + + private static final Document PROJECT_DOCUMENT = + toDocument(new Object[][]{{PROCESSOR_ID, PROJECT_INCLUDE}, {TO, PROJECT_INCLUDE}}); + + private static final Document SORT_DOCUMENT = toDocument(new Object[][]{{ID, SORT_DESCENDING}}); + + private final Logger log; + + private final MongoCollection journal; + + private MongoReadJournal(final MongoCollection journal) { + this.journal = journal; + this.log = LoggerFactory.getLogger(MongoSearchSyncPersistence.class); + } + + /** + * Creates a new {@code MongoReadJournal}. + * + * @param config The Akka system configuration. + * @param clientWrapper The Mongo client wrapper. + */ + public static MongoReadJournal newInstance(final Config config, final MongoClientWrapper clientWrapper) { + final String journalCollectionName = resolveJournalCollectionName(config); + final MongoCollection journal = clientWrapper.getDatabase().getCollection(journalCollectionName); + return new MongoReadJournal(journal); + } + + /** + * Retrieve sequence numbers for persistence IDs modified within the time interval as a source of {@code + * PidWithSeqNr}. A persistence ID may appear multiple times with various sequence numbers. + * + * @param start start of the time window. + * @param end end of the time window. + */ + public Source getPidWithSeqNrsByInterval(final Instant start, final Instant end) { + + final Document filterDocument = createFilterObject(start, end); + + final Publisher publisher = + journal.find(filterDocument, Document.class).projection(PROJECT_DOCUMENT).sort(SORT_DOCUMENT); + + return Source.fromPublisher(publisher) + .map(doc -> new PidWithSeqNr(doc.getString(PROCESSOR_ID), doc.getLong(TO))); + } + + private Document createFilterObject(final Instant start, final Instant end) { + final ObjectId startObjectId = instantToObjectIdBoundary(start); + final ObjectId endObjectId = instantToObjectIdBoundary(end.plus(1L, ChronoUnit.SECONDS)); + log.debug("Limiting query to ObjectIds $gte {} and $lt {}", startObjectId, endObjectId); + return toDocument(new Object[][]{ + {ID, toDocument(new Object[][]{ + {GTE, startObjectId}, + {LT, endObjectId} + })} + }); + } + + /* Create a ObjectID boundary from a timestamp to be used for comparison in MongoDB queries. */ + private static ObjectId instantToObjectIdBoundary(final Instant instant) { + // MongoDBObject IDs only contain dates with precision of seconds, thus adjust the range of the query + // appropriately to make sure a client does not miss data when providing Instants with higher precision. + // + // Do not use + // + // new ObjectId(Date.from(startTruncatedToSecs)) + // + // to compute object ID boundaries. The 1-argument constructor above appends incidental non-zero bits after + // the timestamp and may filter out events persisted after 'instant' if they happen to have + // a lower machine ID, process ID or counter value. (A MongoDB ObjectID is a byte array with fields for + // timestamp, machine ID, process ID and counter such that timestamp occupies the most significant bits.) + return new ObjectId(Date.from(instant.truncatedTo(ChronoUnit.SECONDS)), 0, (short) 0, 0); + } + + private static Document toDocument(final Object[][] keyValuePairs) { + final Map map = new HashMap<>(keyValuePairs.length); + for (Object[] keyValuePair : keyValuePairs) { + map.put(keyValuePair[0].toString(), keyValuePair[1]); + } + return new Document(map); + } + + /** + * Resolve event journal collection name from an Akka configuration object. + *

+ * It assumes that in the Akka system configuration, + *

    + *
  • + * {@code akka.persistence.journal.auto-start-journals} contains exactly 1 configuration key {@code + * }, + *
  • + *
  • + * {@code .overrides.journal-collection} is defined and equal to the name of the event journal + * collection. + *
  • + *
+ *

+ * + * @param config The configuration. + * @return The name of the event journal collection. + * @throws IllegalArgumentException if {@code akka.persistence.journal.auto-start-journal} is not a singleton list. + * @throws com.typesafe.config.ConfigException.Missing if a relevant config value is missing. + * @throws com.typesafe.config.ConfigException.WrongType if a relevant config value has not the expected type. + */ + private static String resolveJournalCollectionName(final Config config) { + final List autoStartJournals = config.getStringList(AKKA_PERSISTENCE_JOURNAL_AUTO_START_JOURNALS); + if (autoStartJournals.size() != 1) { + final String message = String.format("Expect %s to be a singleton list, but it is List(%s)", + AKKA_PERSISTENCE_JOURNAL_AUTO_START_JOURNALS, + autoStartJournals.stream().collect(Collectors.joining(", "))); + throw new IllegalArgumentException(message); + } else { + final String journalKey = autoStartJournals.get(0); + final String journalCollectionNameKey = journalKey + JOURNAL_COLLECTION_NAME_SUFFIX; + return config.getString(journalCollectionNameKey); + } + } +} diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/PidWithSeqNr.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/PidWithSeqNr.java new file mode 100644 index 0000000000..dd75f502fa --- /dev/null +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/streaming/PidWithSeqNr.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2017 Bosch Software Innovations GmbH. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * which accompanies this distribution, and is available at + * https://www.eclipse.org/org/documents/epl-2.0/index.php + * + * Contributors: + * Bosch Software Innovations GmbH - initial contribution + */ +package org.eclipse.ditto.services.utils.persistence.mongo.streaming; + +import java.util.Objects; + +/** + * A pair of {@code String} and {@code long} to store persistence ID and sequence number of an event in a journal. + */ +public final class PidWithSeqNr { + + private final String persistenceId; + private final long sequenceNr; + + /** + * Creates a pair from {@code String} and {@code long}. + * + * @param persistenceId the Akka persistence persistenceId. + * @param sequenceNr the sequence number. + */ + public PidWithSeqNr(final String persistenceId, final long sequenceNr) { + this.persistenceId = persistenceId; + this.sequenceNr = sequenceNr; + } + + /** + * Retrieve the persistence ID. + * + * @return The persistence ID. + */ + public String getPersistenceId() { + return persistenceId; + } + + /** + * Retrieve the sequence number. + * + * @return The sequence number. + */ + public long getSequenceNr() { + return sequenceNr; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } else if (o == null || getClass() != o.getClass()) { + return false; + } else { + final PidWithSeqNr that = (PidWithSeqNr) o; + return Objects.equals(persistenceId, that.persistenceId) && + sequenceNr == that.sequenceNr; + } + } + + @Override + public int hashCode() { + return Objects.hash(persistenceId, sequenceNr); + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [" + + "persistenceId=" + persistenceId + + ", sequenceNr=" + sequenceNr + + "]"; + } +} diff --git a/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java b/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java index b574176d81..b5ee0ea010 100644 --- a/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java +++ b/services/utils/persistence/src/test/java/org/eclipse/ditto/services/utils/persistence/mongo/DefaultPersistenceStreamingActorTest.java @@ -22,8 +22,8 @@ import org.eclipse.ditto.model.base.headers.DittoHeaders; import org.eclipse.ditto.services.models.streaming.AbstractEntityIdWithRevision; import org.eclipse.ditto.services.models.streaming.SudoStreamModifiedEntities; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal; -import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal; +import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr; import org.eclipse.ditto.signals.commands.base.Command; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -77,7 +77,7 @@ public void retrieveEmptyStream() { @Test public void retrieveNonEmptyStream() { new TestKit(actorSystem) {{ - final Source mockedSource = Source.single(PidWithSeqNr.apply(ID, REVISION)); + final Source mockedSource = Source.single(new PidWithSeqNr(ID, REVISION)); final ActorRef underTest = createPersistenceQueriesActor(mockedSource); final Command command = createStreamingRequest(); @@ -97,10 +97,12 @@ private Command createStreamingRequest() { } private static ActorRef createPersistenceQueriesActor(final Source mockedSource) { - final DittoJavaDslMongoReadJournal mock = mock(DittoJavaDslMongoReadJournal.class); - when(mock.sequenceNumbersOfPidsByInterval(any(), any())).thenReturn(mockedSource); - final Props props = DefaultPersistenceStreamingActor.props(100, - DefaultPersistenceStreamingActorTest::mapEntity, mock); + final MongoClientWrapper mockClient = mock(MongoClientWrapper.class); + final MongoReadJournal mockJournal = mock(MongoReadJournal.class); + when(mockJournal.getPidWithSeqNrsByInterval(any(), any())).thenReturn(mockedSource); + final Props props = Props.create(DefaultPersistenceStreamingActor.class, () -> + new DefaultPersistenceStreamingActor<>(100, + DefaultPersistenceStreamingActorTest::mapEntity, mockJournal, mockClient)); return actorSystem.actorOf(props, "persistenceQueriesActor-" + UUID.randomUUID()); } @@ -109,10 +111,11 @@ private static void sendCommand(final TestKit testKit, final ActorRef actorRef, } private static SimpleEntityIdWithRevision mapEntity(final PidWithSeqNr pidWithSeqNr) { - return new SimpleEntityIdWithRevision(pidWithSeqNr.persistenceId(), pidWithSeqNr.sequenceNr()); + return new SimpleEntityIdWithRevision(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()); } private static final class SimpleEntityIdWithRevision extends AbstractEntityIdWithRevision { + private SimpleEntityIdWithRevision(final String id, final long revision) { super(id, revision); }