diff --git a/services/concierge/actors/src/main/java/org/eclipse/ditto/services/concierge/actors/cleanup/EventSnapshotCleanupCoordinator.java b/services/concierge/actors/src/main/java/org/eclipse/ditto/services/concierge/actors/cleanup/EventSnapshotCleanupCoordinator.java index f0a6674b26..324ad1aaa5 100644 --- a/services/concierge/actors/src/main/java/org/eclipse/ditto/services/concierge/actors/cleanup/EventSnapshotCleanupCoordinator.java +++ b/services/concierge/actors/src/main/java/org/eclipse/ditto/services/concierge/actors/cleanup/EventSnapshotCleanupCoordinator.java @@ -12,7 +12,9 @@ */ package org.eclipse.ditto.services.concierge.actors.cleanup; +import java.time.Duration; import java.time.Instant; +import java.time.format.DateTimeParseException; import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; @@ -47,6 +49,9 @@ import org.eclipse.ditto.services.utils.health.StatusInfo; import org.eclipse.ditto.signals.commands.cleanup.Cleanup; import org.eclipse.ditto.signals.commands.cleanup.CleanupResponse; +import org.eclipse.ditto.signals.commands.common.Shutdown; +import org.eclipse.ditto.signals.commands.common.ShutdownReason; +import org.eclipse.ditto.signals.commands.common.ShutdownResponse; import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand; import org.eclipse.ditto.signals.commands.policies.PolicyCommand; import org.eclipse.ditto.signals.commands.things.ThingCommand; @@ -118,6 +123,11 @@ public final class EventSnapshotCleanupCoordinator extends AbstractActorWithTime */ public static final String ACTOR_NAME = "eventSnapshotCleanupCoordinator"; + /** + * Shutdown-reason type to restart stream after non-default time. + */ + private static final String RESTART_AFTER = "restartAfter"; + private static final JsonFieldDefinition CREDIT_DECISIONS = JsonFactory.newJsonArrayFieldDefinition("credit-decisions"); @@ -198,6 +208,7 @@ private Receive sleeping() { return ReceiveBuilder.create() .matchEquals(WokeUp.WOKE_UP, this::wokeUp) .match(RetrieveHealth.class, this::retrieveHealth) + .match(Shutdown.class, this::shutdownStream) .build() .orElse(retrieveConfigBehavior()) .orElse(modifyConfigBehavior()); @@ -211,11 +222,40 @@ private Receive streaming() { enqueue(actions, cleanupResponse, config.getKeptActions())) .match(StreamTerminated.class, this::streamTerminated) .match(RetrieveHealth.class, this::retrieveHealth) + .match(Shutdown.class, this::shutdownStream) .build() .orElse(retrieveConfigBehavior()) .orElse(modifyConfigBehavior()); } + private void shutdownStream(final Shutdown shutdown) { + log.info("Terminating stream on demand: <{}>", shutdown); + shutdownKillSwitch(); + + final Event streamTerminated = new StreamTerminated("Got " + shutdown); + enqueue(events, streamTerminated, config.getKeptEvents()); + getContext().become(sleeping()); + + final ShutdownReason shutdownReason = shutdown.getReason(); + Duration wakeUpDelay; + String message; + if (RESTART_AFTER.equals(shutdownReason.getType().toString()) && shutdownReason.getDetails().isPresent()) { + try { + wakeUpDelay = Duration.parse(shutdownReason.getDetailsOrThrow()); + message = "Restarting stream in " + wakeUpDelay; + } catch (final DateTimeParseException e) { + wakeUpDelay = config.getQuietPeriod(); + message = String.format("Unable to parse <%s> to duration; restarting in <%s>.", + shutdownReason.getDetailsOrThrow(), wakeUpDelay); + } + } else { + wakeUpDelay = config.getQuietPeriod(); + message = String.format("There is no reason:{type=<%s>,details=}; restarting in <%s>.", RESTART_AFTER, wakeUpDelay); + } + scheduleWakeUp(wakeUpDelay); + getSender().tell(ShutdownResponse.of(message, shutdown.getDittoHeaders()), getSelf()); + } + private void wokeUp(final Event wokeUp) { log.info("Woke up."); enqueue(events, wokeUp, config.getKeptEvents()); @@ -237,12 +277,15 @@ private Flow reportToSelf() { }); } - private void restartStream() { + private void shutdownKillSwitch() { if (killSwitch != null) { - log.info("Shutting down previous stream."); killSwitch.shutdown(); killSwitch = null; } + } + + private void restartStream() { + shutdownKillSwitch(); Pair> materializedValues = assembleSource().viaMat(KillSwitches.single(), Keep.right()) @@ -262,7 +305,11 @@ private void restartStream() { } private void scheduleWakeUp() { - getTimers().startSingleTimer(WokeUp.WOKE_UP, WokeUp.WOKE_UP, config.getQuietPeriod()); + scheduleWakeUp(config.getQuietPeriod()); + } + + private void scheduleWakeUp(final Duration when) { + getTimers().startSingleTimer(WokeUp.WOKE_UP, WokeUp.WOKE_UP, when); } private Source assembleSource() { diff --git a/signals/commands/common/src/main/java/org/eclipse/ditto/signals/commands/common/ShutdownResponse.java b/signals/commands/common/src/main/java/org/eclipse/ditto/signals/commands/common/ShutdownResponse.java new file mode 100644 index 0000000000..9559148107 --- /dev/null +++ b/signals/commands/common/src/main/java/org/eclipse/ditto/signals/commands/common/ShutdownResponse.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2019 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.signals.commands.common; + +import java.util.Objects; +import java.util.function.Predicate; + +import org.eclipse.ditto.json.JsonFactory; +import org.eclipse.ditto.json.JsonField; +import org.eclipse.ditto.json.JsonFieldDefinition; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonObjectBuilder; +import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.model.base.common.HttpStatusCode; +import org.eclipse.ditto.model.base.headers.DittoHeaders; +import org.eclipse.ditto.model.base.json.JsonParsableCommandResponse; +import org.eclipse.ditto.model.base.json.JsonSchemaVersion; + +/** + * Response to {@code Shutdown} containing the retrieved config. + */ +@JsonParsableCommandResponse(type = ShutdownResponse.TYPE) +public final class ShutdownResponse extends CommonCommandResponse { + + /** + * Type of this command response. + */ + public static final String TYPE = TYPE_PREFIX + Shutdown.NAME; + + private static final JsonFieldDefinition MESSAGE = JsonFactory.newJsonValueFieldDefinition("message"); + + private final JsonValue message; + + private ShutdownResponse(final JsonValue message, final DittoHeaders dittoHeaders) { + super(TYPE, HttpStatusCode.OK, dittoHeaders); + this.message = message; + } + + /** + * Create a {@code ShutdownResponse}. + * + * @param message what to say in the response. + * @param headers Ditto headers. + * @return the {@code ShutdownResponse}. + */ + public static ShutdownResponse of(final Object message, final DittoHeaders headers) { + return new ShutdownResponse(JsonValue.of(message), headers); + } + + /** + * Creates a new {@code ShutdownResponse} from the given JSON object. + * + * @param jsonObject the JSON object of which the Shutdown is to be created. + * @param dittoHeaders the headers. + * @return the command. + * @throws NullPointerException if {@code jsonObject} is {@code null}. + * @throws org.eclipse.ditto.json.JsonMissingFieldException if the JSON object does not contain the field "config". + */ + public static ShutdownResponse fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) { + return new ShutdownResponse(jsonObject.getValueOrThrow(MESSAGE), dittoHeaders); + } + + @Override + protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion, + final Predicate predicate) { + + jsonObjectBuilder.set(MESSAGE, message); + } + + @Override + public ShutdownResponse setDittoHeaders(final DittoHeaders dittoHeaders) { + return new ShutdownResponse(message, dittoHeaders); + } + + @Override + public boolean equals(final Object that) { + if (super.equals(that) && that instanceof ShutdownResponse) { + return Objects.equals(message, ((ShutdownResponse) that).message); + } else { + return false; + } + + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), message); + } + + @Override + public String toString() { + return "ShutdownResponse[message=" + message + "," + super.toString() + "]"; + } +} diff --git a/signals/commands/common/src/test/java/org/eclipse/ditto/signals/commands/common/ShutdownResponseTest.java b/signals/commands/common/src/test/java/org/eclipse/ditto/signals/commands/common/ShutdownResponseTest.java new file mode 100644 index 0000000000..48645dda7d --- /dev/null +++ b/signals/commands/common/src/test/java/org/eclipse/ditto/signals/commands/common/ShutdownResponseTest.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2019 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.signals.commands.common; + +import static org.mutabilitydetector.unittesting.AllowedReason.provided; +import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf; +import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable; + +import org.eclipse.ditto.json.JsonValue; +import org.junit.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +/** + * Tests {@link ShutdownResponse}. + */ +public final class ShutdownResponseTest { + + @Test + public void assertImmutability() { + assertInstancesOf(ShutdownResponse.class, areImmutable(), provided(JsonValue.class).isAlsoImmutable()); + } + + @Test + public void testHashCodeAndEquals() { + EqualsVerifier.forClass(ShutdownResponse.class) + .usingGetClass() + .verify(); + } + +}