Skip to content

Commit

Permalink
add abstract streaming actor for persistence ID and sequence number
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Yufei <Yufei.Cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Nov 16, 2017
1 parent 47b9f74 commit 9b0537d
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 24 deletions.
Expand Up @@ -64,9 +64,6 @@ public final class SudoStreamModifiedEntities extends AbstractCommand<SudoStream
static final JsonFieldDefinition<Integer> JSON_ELEMENTS_PER_SECOND =
JsonFactory.newIntFieldDefinition("payload/elementsPerSecond", REGULAR, V_1, V_2);

static final JsonFieldDefinition<String> JSON_MAX_QUERY_TIME =
JsonFactory.newStringFieldDefinition("payload/maxQueryTime", REGULAR, V_1, V_2);

static final JsonFieldDefinition<String> JSON_ELEMENT_RECIPIENT =
JsonFactory.newStringFieldDefinition("payload/elementRecipient", REGULAR, V_1, V_2);

Expand All @@ -82,14 +79,11 @@ public final class SudoStreamModifiedEntities extends AbstractCommand<SudoStream

private final int elementsPerSecond;

private final Duration maxQueryTime;

private final String elementRecipient;

private final String statusRecipient;

private SudoStreamModifiedEntities(final Duration timespan, final Duration offset,
final int elementsPerSecond, final Duration maxQueryTime,
private SudoStreamModifiedEntities(final Duration timespan, final Duration offset, final int elementsPerSecond,
final String elementRecipient, final String statusRecipient, final DittoHeaders dittoHeaders) {
super(TYPE, dittoHeaders);

Expand All @@ -98,7 +92,6 @@ private SudoStreamModifiedEntities(final Duration timespan, final Duration offse
this.elementRecipient = elementRecipient;
this.statusRecipient = statusRecipient;
this.elementsPerSecond = elementsPerSecond;
this.maxQueryTime = maxQueryTime;
}

/**
Expand All @@ -113,9 +106,9 @@ private SudoStreamModifiedEntities(final Duration timespan, final Duration offse
* @throws NullPointerException if any argument is {@code null}.
*/
public static SudoStreamModifiedEntities of(final Duration timespan, final Duration offset,
final int elementsPerSecond, final Duration maxQueryTime,
final String elementRecipient, final String statusRecipient, final DittoHeaders dittoHeaders) {
return new SudoStreamModifiedEntities(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient,
final int elementsPerSecond, final String elementRecipient, final String statusRecipient,
final DittoHeaders dittoHeaders) {
return new SudoStreamModifiedEntities(timespan, offset, elementsPerSecond, elementRecipient,
statusRecipient, dittoHeaders);
}

Expand Down Expand Up @@ -152,10 +145,9 @@ public static SudoStreamModifiedEntities fromJson(final JsonObject jsonObject,
final Duration extractedTimespan = Duration.parse(jsonObject.getValueOrThrow(JSON_TIMESPAN));
final Duration extractedOffset = Duration.parse(jsonObject.getValue(JSON_OFFSET).orElse(NULL_OFFSET));
final int elementsPerSecond = jsonObject.getValueOrThrow(JSON_ELEMENTS_PER_SECOND);
final Duration maxQueryTime = Duration.parse(jsonObject.getValueOrThrow(JSON_MAX_QUERY_TIME));
final String elementRecipient = jsonObject.getValueOrThrow(JSON_ELEMENT_RECIPIENT);
final String statusRecipient = jsonObject.getValueOrThrow(JSON_STATUS_RECIPIENT);
return SudoStreamModifiedEntities.of(extractedTimespan, extractedOffset, elementsPerSecond, maxQueryTime,
return SudoStreamModifiedEntities.of(extractedTimespan, extractedOffset, elementsPerSecond,
elementRecipient, statusRecipient, dittoHeaders);
} catch (final DateTimeParseException e) {
throw JsonParseException.newBuilder()
Expand Down Expand Up @@ -184,26 +176,50 @@ public Duration getOffset() {
return offset;
}

/**
* Returns the streaming rate.
*
* @return Number of elements to stream per second.
*/
public int getElementsPerSecond() {
return elementsPerSecond;
}

/**
* Returns the serialized actor reference to stream elements to.
*
* @return The actor reference.
*/
public String getElementRecipient() {
return elementRecipient;
}

/**
* Returns the serialized actor reference to stream status messages to.
*/
public String getStatusRecipient() {
return statusRecipient;
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {
final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set(JSON_TIMESPAN, timespan.toString(), predicate);
jsonObjectBuilder.set(JSON_OFFSET, offset.toString(), predicate);
jsonObjectBuilder.set(JSON_ELEMENTS_PER_SECOND, elementsPerSecond, predicate);
jsonObjectBuilder.set(JSON_MAX_QUERY_TIME, maxQueryTime.toString(), predicate);
jsonObjectBuilder.set(JSON_ELEMENT_RECIPIENT, elementRecipient, predicate);
jsonObjectBuilder.set(JSON_STATUS_RECIPIENT, statusRecipient, predicate);
}

@Override
public SudoStreamModifiedEntities setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient, statusRecipient, dittoHeaders);
return of(timespan, offset, elementsPerSecond, elementRecipient, statusRecipient, dittoHeaders);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient,
return Objects.hash(super.hashCode(), timespan, offset, elementsPerSecond, elementRecipient,
statusRecipient);
}

Expand All @@ -219,7 +235,6 @@ public boolean equals(@Nullable final Object obj) {
final SudoStreamModifiedEntities that = (SudoStreamModifiedEntities) obj;
return that.canEqual(this) && Objects.equals(timespan, that.timespan) && Objects.equals(offset, that.offset)
&& Objects.equals(elementsPerSecond, that.elementsPerSecond)
&& Objects.equals(maxQueryTime, that.maxQueryTime)
&& Objects.equals(elementRecipient, that.elementRecipient)
&& Objects.equals(statusRecipient, that.statusRecipient)
&& super.equals(that);
Expand All @@ -236,7 +251,6 @@ public String toString() {
+ ", timespan=" + timespan
+ ", offset= " + offset
+ ", elementsPerSecond= " + elementsPerSecond
+ ", maxQueryTime= " + maxQueryTime
+ ", elementRecipient=" + elementRecipient
+ ", statusRecipient=" + statusRecipient
+ "]";
Expand Down
Expand Up @@ -112,13 +112,12 @@ private static void produceSudoCommands(final Path rootPath) throws IOException
final Duration timespan = Duration.ofMinutes(5);
final Duration offset = Duration.ofMinutes(1);
final int elementsPerSecond = 100;
final Duration maxQueryTime = Duration.ofMinutes(4);
final String elementRecipient = "akka.tcp://actorSystem@hostname/user/elementRecipientActor";
final String statusRecipient = "akka.tcp://actorSystem@hostname/user/statusRecipientActor";

final SudoStreamModifiedEntities sudoRetrieveModifiedThingTags =
SudoStreamModifiedEntities.of(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient,
statusRecipient, TestConstants.EMPTY_HEADERS);
SudoStreamModifiedEntities.of(timespan, offset, elementsPerSecond, elementRecipient, statusRecipient,
TestConstants.EMPTY_HEADERS);
writeJson(sudoCommandsDir.resolve(Paths.get("sudoStreamModifiedEntities.json")),
sudoRetrieveModifiedThingTags);

Expand Down
Expand Up @@ -37,7 +37,6 @@ public final class SudoStreamModifiedEntitiesTest {
private static final Duration KNOWN_TIMESPAN = Duration.ofMinutes(5);
private static final Duration KNOWN_OFFSET = Duration.ofMinutes(1);
private static final int KNOWN_ELEMENTS_PER_SECOND = 1234;
private static final Duration KNOWN_MAX_QUERY_TIME = Duration.ofDays(365);
private static final String KNOWN_ELEMENT_RECIPIENT = "akka.tcp://known@element/user/recipient";
private static final String KNOWN_STATUS_RECIPIENT = "akka.tcp://known@status/user/recipient";

Expand All @@ -46,7 +45,6 @@ public final class SudoStreamModifiedEntitiesTest {
.set(SudoStreamModifiedEntities.JSON_TIMESPAN, KNOWN_TIMESPAN.toString())
.set(SudoStreamModifiedEntities.JSON_OFFSET, KNOWN_OFFSET.toString())
.set(SudoStreamModifiedEntities.JSON_ELEMENTS_PER_SECOND, KNOWN_ELEMENTS_PER_SECOND)
.set(SudoStreamModifiedEntities.JSON_MAX_QUERY_TIME, KNOWN_MAX_QUERY_TIME.toString())
.set(SudoStreamModifiedEntities.JSON_ELEMENT_RECIPIENT, KNOWN_ELEMENT_RECIPIENT)
.set(SudoStreamModifiedEntities.JSON_STATUS_RECIPIENT, KNOWN_STATUS_RECIPIENT)
.build();
Expand All @@ -71,7 +69,7 @@ public void testHashCodeAndEquals() {
public void toJsonReturnsExpected() {
final SudoStreamModifiedEntities underTest =
SudoStreamModifiedEntities.of(KNOWN_TIMESPAN, KNOWN_OFFSET, KNOWN_ELEMENTS_PER_SECOND,
KNOWN_MAX_QUERY_TIME, KNOWN_ELEMENT_RECIPIENT, KNOWN_STATUS_RECIPIENT, EMPTY_DITTO_HEADERS);
KNOWN_ELEMENT_RECIPIENT, KNOWN_STATUS_RECIPIENT, EMPTY_DITTO_HEADERS);
final JsonObject actualJson = underTest.toJson(FieldType.regularOrSpecial());

assertThat(actualJson).isEqualTo(KNOWN_JSON);
Expand Down
4 changes: 4 additions & 0 deletions services/utils/persistence/pom.xml
Expand Up @@ -39,6 +39,10 @@
<artifactId>ditto-services-utils-health</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-things</artifactId>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
Expand Down
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2017 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* Contributors:
* Bosch Software Innovations GmbH - initial contribution
*/
package org.eclipse.ditto.services.utils.persistence.mongo;

import org.eclipse.ditto.services.models.things.commands.sudo.SudoStreamModifiedEntities;
import org.eclipse.ditto.services.utils.akka.streaming.AbstractStreamingActor;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.contrib.persistence.mongodb.DittoJavaDslMongoReadJournal;
import akka.stream.javadsl.Source;

/**
* Actor that streams information about persisted entities modified in a time window in the past.
*/
public abstract class AbstractPersistenceStreamingActor<T>
extends AbstractStreamingActor<SudoStreamModifiedEntities, T> {

private final ActorRefProvider actorRefProvider = getContext().system().provider();

/**
* Returns the journal to query for entities modified in a time window in the past.
*
* @return The journal.
*/
protected abstract DittoJavaDslMongoReadJournal getJournal();

/**
* Transforms persistence ID and sequence number into an object to stream to the recipient actor.
*
* @param pid The persistence ID.
* @param sequenceNumber The latest sequence number.
* @return Element to stream to the recipient actor.
*/
protected abstract T createElement(final String pid, final long sequenceNumber);

@Override
protected final Class<SudoStreamModifiedEntities> getCommandClass() {
return SudoStreamModifiedEntities.class;
}

@Override
protected final ActorRef getElementRecipient(final SudoStreamModifiedEntities command) {
return actorRefProvider.resolveActorRef(command.getElementRecipient());
}

@Override
protected final ActorRef getStatusRecipient(final SudoStreamModifiedEntities command) {
return actorRefProvider.resolveActorRef(command.getStatusRecipient());
}

@Override
protected final int getElementsPerSecond(final SudoStreamModifiedEntities command) {
return command.getElementsPerSecond();
}

@Override
protected final Source<T, NotUsed> createSource(final SudoStreamModifiedEntities command) {
return getJournal().sequenceNumbersOfPidsByDuration(command.getTimespan(), command.getOffset())
.map(pidWithSeqNr -> createElement(pidWithSeqNr.persistenceId(), pidWithSeqNr.sequenceNr()));
}
}

0 comments on commit 9b0537d

Please sign in to comment.