Skip to content

Commit

Permalink
add MongoReadJournal to access event journal from streaming actors
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Yufei <Yufei.Cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Dec 20, 2017
1 parent 4ffb50d commit f69482a
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 59 deletions.
Expand Up @@ -13,8 +13,10 @@

import org.eclipse.ditto.services.models.policies.PolicyTag;
import org.eclipse.ditto.services.policies.persistence.actors.policy.PolicyPersistenceActor;
import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
import org.eclipse.ditto.services.utils.persistence.mongo.DefaultPersistenceStreamingActor;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;

import com.typesafe.config.Config;

import akka.actor.Props;

Expand All @@ -32,21 +34,22 @@ public final class PoliciesPersistenceStreamingActorCreator {
private PoliciesPersistenceStreamingActorCreator() {
throw new AssertionError();
}

/**
* Creates Akka configuration object Props for this PersistenceQueriesActor.
*
* @param config the actor system configuration.
* @param streamingCacheSize the size of the streaming cache.
* @return the Akka configuration Props object.
*/
public static Props props(final int streamingCacheSize) {
return DefaultPersistenceStreamingActor.props(streamingCacheSize,
public static Props props(final Config config, final int streamingCacheSize) {
return DefaultPersistenceStreamingActor.props(config, streamingCacheSize,
PoliciesPersistenceStreamingActorCreator::createElement);
}

private static PolicyTag createElement(final PidWithSeqNr pidWithSeqNr) {
final String id = pidWithSeqNr.persistenceId()
final String id = pidWithSeqNr.getPersistenceId()
.replaceFirst(PolicyPersistenceActor.PERSISTENCE_ID_PREFIX, "");
return PolicyTag.of(id, pidWithSeqNr.sequenceNr());
return PolicyTag.of(id, pidWithSeqNr.getSequenceNr());
}
}
Expand Up @@ -135,7 +135,7 @@ private PoliciesRootActor(final Config config, final ActorRef pubSubMediator,

final int tagsStreamingCacheSize = config.getInt(ConfigKeys.POLICIES_TAGS_STREAMING_CACHE_SIZE);
final ActorRef persistenceStreamingActor = startChildActor(PoliciesPersistenceStreamingActorCreator.ACTOR_NAME,
PoliciesPersistenceStreamingActorCreator.props(tagsStreamingCacheSize));
PoliciesPersistenceStreamingActorCreator.props(config, tagsStreamingCacheSize));

pubSubMediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
pubSubMediator.tell(new DistributedPubSubMediator.Put(persistenceStreamingActor), getSelf());
Expand Down
Expand Up @@ -12,8 +12,10 @@
package org.eclipse.ditto.services.things.persistence.actors;

import org.eclipse.ditto.services.models.things.ThingTag;
import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
import org.eclipse.ditto.services.utils.persistence.mongo.DefaultPersistenceStreamingActor;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;

import com.typesafe.config.Config;

import akka.actor.Props;

Expand All @@ -35,17 +37,18 @@ private ThingsPersistenceStreamingActorCreator() {
/**
* Creates Akka configuration object Props for this PersistenceQueriesActor.
*
* @param config the actor system configuration.
* @param streamingCacheSize the size of the streaming cache.
* @return the Akka configuration Props object.
*/
public static Props props(final int streamingCacheSize) {
return DefaultPersistenceStreamingActor.props(streamingCacheSize,
public static Props props(final Config config, final int streamingCacheSize) {
return DefaultPersistenceStreamingActor.props(config, streamingCacheSize,
ThingsPersistenceStreamingActorCreator::createElement);
}

private static ThingTag createElement(final PidWithSeqNr pidWithSeqNr) {
final String id = pidWithSeqNr.persistenceId()
final String id = pidWithSeqNr.getPersistenceId()
.replaceFirst(ThingPersistenceActor.PERSISTENCE_ID_PREFIX, "");
return ThingTag.of(id, pidWithSeqNr.sequenceNr());
return ThingTag.of(id, pidWithSeqNr.getSequenceNr());
}
}
Expand Up @@ -15,45 +15,42 @@

import java.util.function.Function;

import javax.annotation.Nullable;

import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.services.models.streaming.SudoStreamModifiedEntities;
import org.eclipse.ditto.services.utils.akka.streaming.AbstractStreamingActor;
import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal;
import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoMongoReadJournal;
import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import akka.NotUsed;
import akka.persistence.query.PersistenceQuery;
import akka.stream.javadsl.Source;

/**
* Abstract implementation of an Actor that streams information about persisted entities modified in a time window in
* the past.
*/
@AllValuesAreNonnullByDefault
public abstract class AbstractPersistenceStreamingActor<T extends EntityIdWithRevision>
extends AbstractStreamingActor<SudoStreamModifiedEntities, T> {

private final int streamingCacheSize;
private final Function<PidWithSeqNr, T> entityMapper;
private final DittoJavaDslMongoReadJournal readJournal;
private final MongoReadJournal readJournal;

/**
* Constructor.
*
* @param streamingCacheSize the size of the streaming cache.
* @param entityMapper the mapper used to map {@link PidWithSeqNr} to {@code T}. The resulting entity will be
* streamed to the recipient actor.
* @param readJournal the journal to query for entities modified in a time window in the past. If {@code null},
* the read journal will be retrieved from the actor context.
* @param readJournal the journal to query for entities modified in a time window in the past. If {@code null}, the
* read journal will be retrieved from the actor context.
*/
protected AbstractPersistenceStreamingActor(final int streamingCacheSize,
final Function<PidWithSeqNr, T> entityMapper, @Nullable final DittoJavaDslMongoReadJournal readJournal) {
final Function<PidWithSeqNr, T> entityMapper, final MongoReadJournal readJournal) {
this.streamingCacheSize = streamingCacheSize;
this.entityMapper = requireNonNull(entityMapper);
this.readJournal = readJournal != null ? readJournal : PersistenceQuery.get(getContext().getSystem())
.getReadJournalFor(DittoJavaDslMongoReadJournal.class, DittoMongoReadJournal.Identifier());
this.readJournal = readJournal;
}

@Override
Expand All @@ -74,16 +71,16 @@ protected final Source<T, NotUsed> createSource(final SudoStreamModifiedEntities

// create a separate cache per stream (don't use member variable!)
final ComparableCache<String, Long> cache = new ComparableCache<>(streamingCacheSize);
return readJournal.sequenceNumbersOfPidsByInterval(command.getStart(), command.getEnd())
return readJournal.getPidWithSeqNrsByInterval(command.getStart(), command.getEnd())
.log(unfilteredStreamingLogName, log)
// avoid unnecessary streaming of old sequence numbers
.filter(pidWithSeqNr -> cache.updateIfNewOrGreater(pidWithSeqNr.persistenceId(),
pidWithSeqNr.sequenceNr()))
.filter(pidWithSeqNr ->
cache.updateIfNewOrGreater(pidWithSeqNr.getPersistenceId(), pidWithSeqNr.getSequenceNr()))
.map(this::mapEntity)
.log(filteredStreamingLogName, log);
}

private T mapEntity(final PidWithSeqNr pidWithSeqNr) {
return entityMapper.apply(pidWithSeqNr) ;
return entityMapper.apply(pidWithSeqNr);
}
}
Expand Up @@ -14,54 +14,57 @@
import java.util.function.Function;

import org.eclipse.ditto.services.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.DittoJavaDslMongoReadJournal;
import org.eclipse.ditto.services.utils.akkapersistence.mongoaddons.PidWithSeqNr;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.services.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import com.typesafe.config.Config;

import akka.actor.Props;
import akka.japi.Creator;


/**
* Configurable default implementation of {@link AbstractPersistenceStreamingActor}.
*/
@AllValuesAreNonnullByDefault
public final class DefaultPersistenceStreamingActor<T extends EntityIdWithRevision>
extends AbstractPersistenceStreamingActor<T> {

private DefaultPersistenceStreamingActor(final int streamingCacheSize,
final Function<PidWithSeqNr, T> entityMapper) {
this(streamingCacheSize, entityMapper, null);
}
private final MongoClientWrapper mongoClientWrapper;

DefaultPersistenceStreamingActor(final int streamingCacheSize,
final Function<PidWithSeqNr, T> entityMapper,
final MongoReadJournal readJournal,
final MongoClientWrapper mongoClientWrapper) {

private DefaultPersistenceStreamingActor(final int streamingCacheSize,
final Function<PidWithSeqNr, T> entityMapper, final DittoJavaDslMongoReadJournal readJournal) {
super(streamingCacheSize, entityMapper, readJournal);
this.mongoClientWrapper = mongoClientWrapper;
}

/**
* Creates Akka configuration object Props for this PersistenceStreamingActor.
*
* @param config the configuration of the akka system.
* @param streamingCacheSize the size of the streaming cache.
* @param entityMapper the mapper used to map {@link PidWithSeqNr} to {@code T}. The resulting entity will be
* streamed to the recipient actor.
* @return the Akka configuration Props object.
*/
public static <T extends EntityIdWithRevision> Props props(final int streamingCacheSize,
public static <T extends EntityIdWithRevision> Props props(final Config config,
final int streamingCacheSize,
final Function<PidWithSeqNr, T> entityMapper) {

return Props.create(DefaultPersistenceStreamingActor.class,
() -> new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper));
return Props.create(DefaultPersistenceStreamingActor.class, () -> {
final MongoClientWrapper mongoClient = MongoClientWrapper.newInstance(config);
final MongoReadJournal readJournal = MongoReadJournal.newInstance(config, mongoClient);
return new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper, readJournal, mongoClient);
});
}

static <T extends EntityIdWithRevision> Props props(final int streamingCacheSize,
final Function<PidWithSeqNr, T> entityMapper, final DittoJavaDslMongoReadJournal readJournal) {
return Props.create(DefaultPersistenceStreamingActor.class, new Creator<DefaultPersistenceStreamingActor>() {
private static final long serialVersionUID = 1L;

@Override
public DefaultPersistenceStreamingActor create() {
return new DefaultPersistenceStreamingActor<>(streamingCacheSize, entityMapper, readJournal);
}
});
@Override
public void postStop() throws Exception {
mongoClientWrapper.close();
super.postStop();
}

}
Expand Up @@ -34,7 +34,8 @@
/**
* MongoDB Client Wrapper.
*/
public final class MongoClientWrapper implements Closeable {
public class MongoClientWrapper implements Closeable {
// not final to test with Mockito

private final MongoClient mongoClient;
private final MongoDatabase mongoDatabase;
Expand Down

0 comments on commit f69482a

Please sign in to comment.