From 9b0537d419afd7bbe8ca99865bd1100349c46b8e Mon Sep 17 00:00:00 2001 From: Cai Yufei Date: Thu, 16 Nov 2017 11:03:36 +0100 Subject: [PATCH] add abstract streaming actor for persistence ID and sequence number Signed-off-by: Cai Yufei --- .../sudo/SudoStreamModifiedEntities.java | 50 ++++++++----- .../CommandAndEventJsonExamplesProducer.java | 5 +- .../sudo/SudoStreamModifiedEntitiesTest.java | 4 +- services/utils/persistence/pom.xml | 4 ++ .../AbstractPersistenceStreamingActor.java | 72 +++++++++++++++++++ 5 files changed, 111 insertions(+), 24 deletions(-) create mode 100644 services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java diff --git a/services/models/things/src/main/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntities.java b/services/models/things/src/main/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntities.java index 92f9fb5d9a..6e6c1bdafb 100755 --- a/services/models/things/src/main/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntities.java +++ b/services/models/things/src/main/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntities.java @@ -64,9 +64,6 @@ public final class SudoStreamModifiedEntities extends AbstractCommand JSON_ELEMENTS_PER_SECOND = JsonFactory.newIntFieldDefinition("payload/elementsPerSecond", REGULAR, V_1, V_2); - static final JsonFieldDefinition JSON_MAX_QUERY_TIME = - JsonFactory.newStringFieldDefinition("payload/maxQueryTime", REGULAR, V_1, V_2); - static final JsonFieldDefinition JSON_ELEMENT_RECIPIENT = JsonFactory.newStringFieldDefinition("payload/elementRecipient", REGULAR, V_1, V_2); @@ -82,14 +79,11 @@ public final class SudoStreamModifiedEntities extends AbstractCommand thePredicate) { @@ -191,19 +208,18 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js jsonObjectBuilder.set(JSON_TIMESPAN, timespan.toString(), predicate); jsonObjectBuilder.set(JSON_OFFSET, offset.toString(), predicate); jsonObjectBuilder.set(JSON_ELEMENTS_PER_SECOND, elementsPerSecond, predicate); - jsonObjectBuilder.set(JSON_MAX_QUERY_TIME, maxQueryTime.toString(), predicate); jsonObjectBuilder.set(JSON_ELEMENT_RECIPIENT, elementRecipient, predicate); jsonObjectBuilder.set(JSON_STATUS_RECIPIENT, statusRecipient, predicate); } @Override public SudoStreamModifiedEntities setDittoHeaders(final DittoHeaders dittoHeaders) { - return of(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient, statusRecipient, dittoHeaders); + return of(timespan, offset, elementsPerSecond, elementRecipient, statusRecipient, dittoHeaders); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient, + return Objects.hash(super.hashCode(), timespan, offset, elementsPerSecond, elementRecipient, statusRecipient); } @@ -219,7 +235,6 @@ public boolean equals(@Nullable final Object obj) { final SudoStreamModifiedEntities that = (SudoStreamModifiedEntities) obj; return that.canEqual(this) && Objects.equals(timespan, that.timespan) && Objects.equals(offset, that.offset) && Objects.equals(elementsPerSecond, that.elementsPerSecond) - && Objects.equals(maxQueryTime, that.maxQueryTime) && Objects.equals(elementRecipient, that.elementRecipient) && Objects.equals(statusRecipient, that.statusRecipient) && super.equals(that); @@ -236,7 +251,6 @@ public String toString() { + ", timespan=" + timespan + ", offset= " + offset + ", elementsPerSecond= " + elementsPerSecond - + ", maxQueryTime= " + maxQueryTime + ", elementRecipient=" + elementRecipient + ", statusRecipient=" + statusRecipient + "]"; diff --git a/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/CommandAndEventJsonExamplesProducer.java b/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/CommandAndEventJsonExamplesProducer.java index a036d2df81..cd42be7d4a 100755 --- a/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/CommandAndEventJsonExamplesProducer.java +++ b/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/CommandAndEventJsonExamplesProducer.java @@ -112,13 +112,12 @@ private static void produceSudoCommands(final Path rootPath) throws IOException final Duration timespan = Duration.ofMinutes(5); final Duration offset = Duration.ofMinutes(1); final int elementsPerSecond = 100; - final Duration maxQueryTime = Duration.ofMinutes(4); final String elementRecipient = "akka.tcp://actorSystem@hostname/user/elementRecipientActor"; final String statusRecipient = "akka.tcp://actorSystem@hostname/user/statusRecipientActor"; final SudoStreamModifiedEntities sudoRetrieveModifiedThingTags = - SudoStreamModifiedEntities.of(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient, - statusRecipient, TestConstants.EMPTY_HEADERS); + SudoStreamModifiedEntities.of(timespan, offset, elementsPerSecond, elementRecipient, statusRecipient, + TestConstants.EMPTY_HEADERS); writeJson(sudoCommandsDir.resolve(Paths.get("sudoStreamModifiedEntities.json")), sudoRetrieveModifiedThingTags); diff --git a/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntitiesTest.java b/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntitiesTest.java index e4ae1e4d57..d7f33c30a9 100755 --- a/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntitiesTest.java +++ b/services/models/things/src/test/java/org/eclipse/ditto/services/models/things/commands/sudo/SudoStreamModifiedEntitiesTest.java @@ -37,7 +37,6 @@ public final class SudoStreamModifiedEntitiesTest { private static final Duration KNOWN_TIMESPAN = Duration.ofMinutes(5); private static final Duration KNOWN_OFFSET = Duration.ofMinutes(1); private static final int KNOWN_ELEMENTS_PER_SECOND = 1234; - private static final Duration KNOWN_MAX_QUERY_TIME = Duration.ofDays(365); private static final String KNOWN_ELEMENT_RECIPIENT = "akka.tcp://known@element/user/recipient"; private static final String KNOWN_STATUS_RECIPIENT = "akka.tcp://known@status/user/recipient"; @@ -46,7 +45,6 @@ public final class SudoStreamModifiedEntitiesTest { .set(SudoStreamModifiedEntities.JSON_TIMESPAN, KNOWN_TIMESPAN.toString()) .set(SudoStreamModifiedEntities.JSON_OFFSET, KNOWN_OFFSET.toString()) .set(SudoStreamModifiedEntities.JSON_ELEMENTS_PER_SECOND, KNOWN_ELEMENTS_PER_SECOND) - .set(SudoStreamModifiedEntities.JSON_MAX_QUERY_TIME, KNOWN_MAX_QUERY_TIME.toString()) .set(SudoStreamModifiedEntities.JSON_ELEMENT_RECIPIENT, KNOWN_ELEMENT_RECIPIENT) .set(SudoStreamModifiedEntities.JSON_STATUS_RECIPIENT, KNOWN_STATUS_RECIPIENT) .build(); @@ -71,7 +69,7 @@ public void testHashCodeAndEquals() { public void toJsonReturnsExpected() { final SudoStreamModifiedEntities underTest = SudoStreamModifiedEntities.of(KNOWN_TIMESPAN, KNOWN_OFFSET, KNOWN_ELEMENTS_PER_SECOND, - KNOWN_MAX_QUERY_TIME, KNOWN_ELEMENT_RECIPIENT, KNOWN_STATUS_RECIPIENT, EMPTY_DITTO_HEADERS); + KNOWN_ELEMENT_RECIPIENT, KNOWN_STATUS_RECIPIENT, EMPTY_DITTO_HEADERS); final JsonObject actualJson = underTest.toJson(FieldType.regularOrSpecial()); assertThat(actualJson).isEqualTo(KNOWN_JSON); diff --git a/services/utils/persistence/pom.xml b/services/utils/persistence/pom.xml index 01c4e94f0f..8f71c05018 100755 --- a/services/utils/persistence/pom.xml +++ b/services/utils/persistence/pom.xml @@ -39,6 +39,10 @@ ditto-services-utils-health ${project.version} + + org.eclipse.ditto + ditto-services-models-things + com.typesafe.akka diff --git a/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java new file mode 100644 index 0000000000..3376a4ad36 --- /dev/null +++ b/services/utils/persistence/src/main/java/org/eclipse/ditto/services/utils/persistence/mongo/AbstractPersistenceStreamingActor.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2017 Bosch Software Innovations GmbH. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * which accompanies this distribution, and is available at + * https://www.eclipse.org/org/documents/epl-2.0/index.php + * + * Contributors: + * Bosch Software Innovations GmbH - initial contribution + */ +package org.eclipse.ditto.services.utils.persistence.mongo; + +import org.eclipse.ditto.services.models.things.commands.sudo.SudoStreamModifiedEntities; +import org.eclipse.ditto.services.utils.akka.streaming.AbstractStreamingActor; + +import akka.NotUsed; +import akka.actor.ActorRef; +import akka.actor.ActorRefProvider; +import akka.contrib.persistence.mongodb.DittoJavaDslMongoReadJournal; +import akka.stream.javadsl.Source; + +/** + * Actor that streams information about persisted entities modified in a time window in the past. + */ +public abstract class AbstractPersistenceStreamingActor + extends AbstractStreamingActor { + + private final ActorRefProvider actorRefProvider = getContext().system().provider(); + + /** + * Returns the journal to query for entities modified in a time window in the past. + * + * @return The journal. + */ + protected abstract DittoJavaDslMongoReadJournal getJournal(); + + /** + * Transforms persistence ID and sequence number into an object to stream to the recipient actor. + * + * @param pid The persistence ID. + * @param sequenceNumber The latest sequence number. + * @return Element to stream to the recipient actor. + */ + protected abstract T createElement(final String pid, final long sequenceNumber); + + @Override + protected final Class getCommandClass() { + return SudoStreamModifiedEntities.class; + } + + @Override + protected final ActorRef getElementRecipient(final SudoStreamModifiedEntities command) { + return actorRefProvider.resolveActorRef(command.getElementRecipient()); + } + + @Override + protected final ActorRef getStatusRecipient(final SudoStreamModifiedEntities command) { + return actorRefProvider.resolveActorRef(command.getStatusRecipient()); + } + + @Override + protected final int getElementsPerSecond(final SudoStreamModifiedEntities command) { + return command.getElementsPerSecond(); + } + + @Override + protected final Source createSource(final SudoStreamModifiedEntities command) { + return getJournal().sequenceNumbersOfPidsByDuration(command.getTimespan(), command.getOffset()) + .map(pidWithSeqNr -> createElement(pidWithSeqNr.persistenceId(), pidWithSeqNr.sequenceNr())); + } +}