Skip to content

Commit

Permalink
add AbstractStreamingActor, replace SudoRetrieveModifiedThingTags by …
Browse files Browse the repository at this point in the history
…SudoStreamModifiedEntities

Signed-off-by: Cai Yufei <Yufei.Cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Nov 16, 2017
1 parent 7c1ede8 commit 47b9f74
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 79 deletions.
Expand Up @@ -20,7 +20,7 @@
import org.eclipse.ditto.services.gateway.proxy.actors.handlers.RetrieveThingHandlerActor;
import org.eclipse.ditto.services.gateway.proxy.actors.handlers.ThingHandlerCreator;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoCommand;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveModifiedThingTags;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoStreamModifiedEntities;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThings;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThingsResponse;
import org.eclipse.ditto.services.utils.akka.LogUtil;
Expand Down Expand Up @@ -100,7 +100,7 @@ protected void addCommandBehaviour(final ReceiveBuilder receiveBuilder) {
thingsAggregator.forward(command, getContext());
}
})
.match(SudoRetrieveModifiedThingTags.class, command -> {
.match(SudoStreamModifiedEntities.class, command -> {
getLogger().debug(
"Got 'SudoRetrieveModifiedThingTags' message, forwarding to the Things Persistence");
pubSubMediator.tell(
Expand Down
Expand Up @@ -45,7 +45,7 @@ public static SudoCommandRegistry newInstance() {
parseStrategies.put(TakeSnapshot.TYPE, TakeSnapshot::fromJson);
parseStrategies.put(SudoRetrieveThing.TYPE, SudoRetrieveThing::fromJson);
parseStrategies.put(SudoRetrieveThings.TYPE, SudoRetrieveThings::fromJson);
parseStrategies.put(SudoRetrieveModifiedThingTags.TYPE, SudoRetrieveModifiedThingTags::fromJson);
parseStrategies.put(SudoStreamModifiedEntities.TYPE, SudoStreamModifiedEntities::fromJson);

return new SudoCommandRegistry(parseStrategies);
}
Expand Down
Expand Up @@ -39,7 +39,7 @@
import org.eclipse.ditto.signals.commands.base.CommandResponseJsonDeserializer;

/**
* Response to a {@link SudoRetrieveModifiedThingTags} command.
* Response to a {@link SudoStreamModifiedEntities} command.
*/
@Immutable
public final class SudoRetrieveModifiedThingTagsResponse
Expand Down
Expand Up @@ -12,6 +12,9 @@
package org.eclipse.ditto.services.models.things.commands.sudo;

import static java.util.Objects.requireNonNull;
import static org.eclipse.ditto.model.base.json.FieldType.REGULAR;
import static org.eclipse.ditto.model.base.json.JsonSchemaVersion.V_1;
import static org.eclipse.ditto.model.base.json.JsonSchemaVersion.V_2;

import java.time.Duration;
import java.time.format.DateTimeParseException;
Expand All @@ -29,7 +32,6 @@
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.FieldType;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.services.models.things.ThingTag;
import org.eclipse.ditto.signals.commands.base.AbstractCommand;
Expand All @@ -40,68 +42,84 @@
* cache.
*/
@Immutable
public final class SudoRetrieveModifiedThingTags extends AbstractCommand<SudoRetrieveModifiedThingTags> implements
SudoCommand<SudoRetrieveModifiedThingTags> {
public final class SudoStreamModifiedEntities extends AbstractCommand<SudoStreamModifiedEntities> implements
SudoCommand<SudoStreamModifiedEntities> {

/**
* Name of this command.
*/
public static final String NAME = "sudoRetrieveModifiedThingTags";
public static final String NAME = "sudoStreamModifiedEntities";

/**
* Type of this command.
*/
public static final String TYPE = TYPE_PREFIX + NAME;

static final JsonFieldDefinition<String> JSON_TIMESPAN =
JsonFactory.newStringFieldDefinition("payload/timespan", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);
JsonFactory.newStringFieldDefinition("payload/timespan", REGULAR, V_1, V_2);

static final JsonFieldDefinition<String> JSON_OFFSET =
JsonFactory.newStringFieldDefinition("payload/offset", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);
JsonFactory.newStringFieldDefinition("payload/offset", REGULAR, V_1, V_2);

static final JsonFieldDefinition<Integer> JSON_ELEMENTS_PER_SECOND =
JsonFactory.newIntFieldDefinition("payload/elementsPerSecond", REGULAR, V_1, V_2);

static final JsonFieldDefinition<String> JSON_MAX_QUERY_TIME =
JsonFactory.newStringFieldDefinition("payload/maxQueryTime", REGULAR, V_1, V_2);

static final JsonFieldDefinition<String> JSON_ELEMENT_RECIPIENT =
JsonFactory.newStringFieldDefinition("payload/elementRecipient", REGULAR, V_1, V_2);

static final JsonFieldDefinition<String> JSON_STATUS_RECIPIENT =
JsonFactory.newStringFieldDefinition("payload/statusRecipient", REGULAR, V_1, V_2);


private static final String NULL_OFFSET = "PT0M";

private final Duration timespan;

private final Duration offset;

private SudoRetrieveModifiedThingTags(final Duration timespan, final Duration offset,
final DittoHeaders dittoHeaders) {
private final int elementsPerSecond;

private final Duration maxQueryTime;

private final String elementRecipient;

private final String statusRecipient;

private SudoStreamModifiedEntities(final Duration timespan, final Duration offset,
final int elementsPerSecond, final Duration maxQueryTime,
final String elementRecipient, final String statusRecipient, final DittoHeaders dittoHeaders) {
super(TYPE, dittoHeaders);

this.timespan = requireNonNull(timespan, "The timespan must not be null!");
this.offset = offset;
this.elementRecipient = elementRecipient;
this.statusRecipient = statusRecipient;
this.elementsPerSecond = elementsPerSecond;
this.maxQueryTime = maxQueryTime;
}

/**
* Creates a new {@code SudoRetrieveModifiedThingTags} command.
*
* @param timespan the duration for which all modified things should be retrieved.
* @param dittoHeaders the command headers of the request.
* @return a command for retrieving modified Things without authorization.
* @throws NullPointerException if any argument is {@code null}.
*/
public static SudoRetrieveModifiedThingTags of(final Duration timespan, final DittoHeaders dittoHeaders) {
return new SudoRetrieveModifiedThingTags(timespan, Duration.parse(NULL_OFFSET), dittoHeaders);
}


/**
* Creates a new {@code SudoRetrieveModifiedThingTags} command.
*
* @param timespan the duration for which all modified things should be retrieved.
* @param offset the duration for which the modified things should be skipped.
* @param elementRecipient TODO
* @param statusRecipient TODO
* @param dittoHeaders the command headers of the request.
* @return a command for retrieving modified Things without authorization.
* @throws NullPointerException if any argument is {@code null}.
*/
public static SudoRetrieveModifiedThingTags of(final Duration timespan, final Duration offset,
final DittoHeaders dittoHeaders) {
return new SudoRetrieveModifiedThingTags(timespan, offset, dittoHeaders);
public static SudoStreamModifiedEntities of(final Duration timespan, final Duration offset,
final int elementsPerSecond, final Duration maxQueryTime,
final String elementRecipient, final String statusRecipient, final DittoHeaders dittoHeaders) {
return new SudoStreamModifiedEntities(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient,
statusRecipient, dittoHeaders);
}


/**
* Creates a new {@code SudoRetrieveModifiedThingTags} from a JSON string.
*
Expand All @@ -114,7 +132,7 @@ public static SudoRetrieveModifiedThingTags of(final Duration timespan, final Du
* valid JSON.
* @throws JsonMissingFieldException if the passed in {@code jsonString} was not in the expected format.
*/
public static SudoRetrieveModifiedThingTags fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
public static SudoStreamModifiedEntities fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
return fromJson(JsonFactory.newObject(jsonString), dittoHeaders);
}

Expand All @@ -127,13 +145,18 @@ public static SudoRetrieveModifiedThingTags fromJson(final String jsonString, fi
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws JsonMissingFieldException if the passed in {@code jsonObject} was not in the expected format.
*/
public static SudoRetrieveModifiedThingTags fromJson(final JsonObject jsonObject,
public static SudoStreamModifiedEntities fromJson(final JsonObject jsonObject,
final DittoHeaders dittoHeaders) {

try {
final Duration extractedTimespan = Duration.parse(jsonObject.getValueOrThrow(JSON_TIMESPAN));
final Duration extractedOffset = Duration.parse(jsonObject.getValue(JSON_OFFSET).orElse(NULL_OFFSET));
return SudoRetrieveModifiedThingTags.of(extractedTimespan, extractedOffset, dittoHeaders);
final int elementsPerSecond = jsonObject.getValueOrThrow(JSON_ELEMENTS_PER_SECOND);
final Duration maxQueryTime = Duration.parse(jsonObject.getValueOrThrow(JSON_MAX_QUERY_TIME));
final String elementRecipient = jsonObject.getValueOrThrow(JSON_ELEMENT_RECIPIENT);
final String statusRecipient = jsonObject.getValueOrThrow(JSON_STATUS_RECIPIENT);
return SudoStreamModifiedEntities.of(extractedTimespan, extractedOffset, elementsPerSecond, maxQueryTime,
elementRecipient, statusRecipient, dittoHeaders);
} catch (final DateTimeParseException e) {
throw JsonParseException.newBuilder()
.message("The given timespan is no valid Duration.")
Expand Down Expand Up @@ -167,20 +190,21 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Js
final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set(JSON_TIMESPAN, timespan.toString(), predicate);
jsonObjectBuilder.set(JSON_OFFSET, offset.toString(), predicate);
jsonObjectBuilder.set(JSON_ELEMENTS_PER_SECOND, elementsPerSecond, predicate);
jsonObjectBuilder.set(JSON_MAX_QUERY_TIME, maxQueryTime.toString(), predicate);
jsonObjectBuilder.set(JSON_ELEMENT_RECIPIENT, elementRecipient, predicate);
jsonObjectBuilder.set(JSON_STATUS_RECIPIENT, statusRecipient, predicate);
}

@Override
public SudoRetrieveModifiedThingTags setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(timespan, offset, dittoHeaders);
public SudoStreamModifiedEntities setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient, statusRecipient, dittoHeaders);
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + Objects.hashCode(timespan);
result = prime * result + Objects.hashCode(offset);
return result;
return Objects.hash(super.hashCode(), timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient,
statusRecipient);
}

@SuppressWarnings({"squid:MethodCyclomaticComplexity", "squid:S1067", "pmd:SimplifyConditional"})
Expand All @@ -192,19 +216,29 @@ public boolean equals(@Nullable final Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final SudoRetrieveModifiedThingTags that = (SudoRetrieveModifiedThingTags) obj;
final SudoStreamModifiedEntities that = (SudoStreamModifiedEntities) obj;
return that.canEqual(this) && Objects.equals(timespan, that.timespan) && Objects.equals(offset, that.offset)
&& Objects.equals(elementsPerSecond, that.elementsPerSecond)
&& Objects.equals(maxQueryTime, that.maxQueryTime)
&& Objects.equals(elementRecipient, that.elementRecipient)
&& Objects.equals(statusRecipient, that.statusRecipient)
&& super.equals(that);
}

@Override
protected boolean canEqual(@Nullable final Object other) {
return other instanceof SudoRetrieveModifiedThingTags;
return other instanceof SudoStreamModifiedEntities;
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" + super.toString() + ", timespan=" + timespan + ", offset= " + offset
return getClass().getSimpleName() + " [" + super.toString()
+ ", timespan=" + timespan
+ ", offset= " + offset
+ ", elementsPerSecond= " + elementsPerSecond
+ ", maxQueryTime= " + maxQueryTime
+ ", elementRecipient=" + elementRecipient
+ ", statusRecipient=" + statusRecipient
+ "]";
}

Expand Down
Expand Up @@ -31,12 +31,12 @@
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingLifecycle;
import org.eclipse.ditto.model.things.ThingsModelFactory;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveModifiedThingTags;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveModifiedThingTagsResponse;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThing;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThingResponse;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThings;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoRetrieveThingsResponse;
import org.eclipse.ditto.services.models.things.commands.sudo.SudoStreamModifiedEntities;

/* */
public final class CommandAndEventJsonExamplesProducer {
Expand Down Expand Up @@ -111,10 +111,15 @@ private static void produceSudoCommands(final Path rootPath) throws IOException

final Duration timespan = Duration.ofMinutes(5);
final Duration offset = Duration.ofMinutes(1);

final SudoRetrieveModifiedThingTags sudoRetrieveModifiedThingTags =
SudoRetrieveModifiedThingTags.of(timespan, offset, TestConstants.EMPTY_HEADERS);
writeJson(sudoCommandsDir.resolve(Paths.get("sudoRetrieveModifiedThingTags.json")),
final int elementsPerSecond = 100;
final Duration maxQueryTime = Duration.ofMinutes(4);
final String elementRecipient = "akka.tcp://actorSystem@hostname/user/elementRecipientActor";
final String statusRecipient = "akka.tcp://actorSystem@hostname/user/statusRecipientActor";

final SudoStreamModifiedEntities sudoRetrieveModifiedThingTags =
SudoStreamModifiedEntities.of(timespan, offset, elementsPerSecond, maxQueryTime, elementRecipient,
statusRecipient, TestConstants.EMPTY_HEADERS);
writeJson(sudoCommandsDir.resolve(Paths.get("sudoStreamModifiedEntities.json")),
sudoRetrieveModifiedThingTags);

final SudoRetrieveModifiedThingTagsResponse sudoRetrieveModifiedThingTagsResponse =
Expand Down
Expand Up @@ -30,62 +30,66 @@
import nl.jqno.equalsverifier.EqualsVerifier;

/**
* Unit test for {@link SudoRetrieveModifiedThingTags}.
* Unit test for {@link SudoStreamModifiedEntities}.
*/
public final class SudoRetrieveModifiedThingTagsTest {
public final class SudoStreamModifiedEntitiesTest {

private static final Duration KNOWN_TIMESPAN = Duration.ofMinutes(5);
private static final Duration KNOWN_OFFSET = Duration.ofMinutes(1);
private static final int KNOWN_ELEMENTS_PER_SECOND = 1234;
private static final Duration KNOWN_MAX_QUERY_TIME = Duration.ofDays(365);
private static final String KNOWN_ELEMENT_RECIPIENT = "akka.tcp://known@element/user/recipient";
private static final String KNOWN_STATUS_RECIPIENT = "akka.tcp://known@status/user/recipient";

private static final JsonObject KNOWN_JSON = JsonFactory.newObjectBuilder()
.set(Command.JsonFields.TYPE, SudoRetrieveModifiedThingTags.TYPE)
.set(SudoRetrieveModifiedThingTags.JSON_TIMESPAN, KNOWN_TIMESPAN.toString())
.set(SudoRetrieveModifiedThingTags.JSON_OFFSET, KNOWN_OFFSET.toString())
.set(Command.JsonFields.TYPE, SudoStreamModifiedEntities.TYPE)
.set(SudoStreamModifiedEntities.JSON_TIMESPAN, KNOWN_TIMESPAN.toString())
.set(SudoStreamModifiedEntities.JSON_OFFSET, KNOWN_OFFSET.toString())
.set(SudoStreamModifiedEntities.JSON_ELEMENTS_PER_SECOND, KNOWN_ELEMENTS_PER_SECOND)
.set(SudoStreamModifiedEntities.JSON_MAX_QUERY_TIME, KNOWN_MAX_QUERY_TIME.toString())
.set(SudoStreamModifiedEntities.JSON_ELEMENT_RECIPIENT, KNOWN_ELEMENT_RECIPIENT)
.set(SudoStreamModifiedEntities.JSON_STATUS_RECIPIENT, KNOWN_STATUS_RECIPIENT)
.build();

private static final DittoHeaders EMPTY_DITTO_HEADERS = DittoHeaders.empty();

/** */
@Test
public void assertImmutability() {
assertInstancesOf(SudoRetrieveModifiedThingTags.class,
assertInstancesOf(SudoStreamModifiedEntities.class,
areImmutable(),
provided(AuthorizationContext.class, JsonFieldSelector.class).isAlsoImmutable());
}

/** */
@Test
public void testHashCodeAndEquals() {
EqualsVerifier.forClass(SudoRetrieveModifiedThingTags.class)
EqualsVerifier.forClass(SudoStreamModifiedEntities.class)
.withRedefinedSuperclass()
.verify();
}

/** */
@Test
public void toJsonReturnsExpected() {
final SudoRetrieveModifiedThingTags underTest =
SudoRetrieveModifiedThingTags.of(KNOWN_TIMESPAN, KNOWN_OFFSET, EMPTY_DITTO_HEADERS);
final SudoStreamModifiedEntities underTest =
SudoStreamModifiedEntities.of(KNOWN_TIMESPAN, KNOWN_OFFSET, KNOWN_ELEMENTS_PER_SECOND,
KNOWN_MAX_QUERY_TIME, KNOWN_ELEMENT_RECIPIENT, KNOWN_STATUS_RECIPIENT, EMPTY_DITTO_HEADERS);
final JsonObject actualJson = underTest.toJson(FieldType.regularOrSpecial());

assertThat(actualJson).isEqualTo(KNOWN_JSON);
}

/** */
@Test
public void createInstanceFromValidJson() {
final SudoRetrieveModifiedThingTags underTest =
SudoRetrieveModifiedThingTags.fromJson(KNOWN_JSON.toString(), EMPTY_DITTO_HEADERS);
final SudoStreamModifiedEntities underTest =
SudoStreamModifiedEntities.fromJson(KNOWN_JSON.toString(), EMPTY_DITTO_HEADERS);

assertThat(underTest).isNotNull();
assertThat(underTest.getTimespan()).isEqualTo(KNOWN_TIMESPAN);
}

/** */
@Test
public void checkSudoCommandTypeWorks() {
final SudoRetrieveModifiedThingTags sudoRetrieveModifiedThingTags =
SudoRetrieveModifiedThingTags.fromJson(KNOWN_JSON.toString(), EMPTY_DITTO_HEADERS);
final SudoStreamModifiedEntities sudoRetrieveModifiedThingTags =
SudoStreamModifiedEntities.fromJson(KNOWN_JSON.toString(), EMPTY_DITTO_HEADERS);

final SudoCommand sudoCommand =
SudoCommandRegistry.newInstance().parse(KNOWN_JSON.toString(), EMPTY_DITTO_HEADERS);
Expand Down

0 comments on commit 47b9f74

Please sign in to comment.