Skip to content

Commit

Permalink
Add ShutdownResponse. Background-cleanup now responds to Shutdown by …
Browse files Browse the repository at this point in the history
…restarting stream.

Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Jun 24, 2019
1 parent 1922a91 commit 756b96f
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 3 deletions.
Expand Up @@ -12,7 +12,9 @@
*/
package org.eclipse.ditto.services.concierge.actors.cleanup;

import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
Expand Down Expand Up @@ -47,6 +49,9 @@
import org.eclipse.ditto.services.utils.health.StatusInfo;
import org.eclipse.ditto.signals.commands.cleanup.Cleanup;
import org.eclipse.ditto.signals.commands.cleanup.CleanupResponse;
import org.eclipse.ditto.signals.commands.common.Shutdown;
import org.eclipse.ditto.signals.commands.common.ShutdownReason;
import org.eclipse.ditto.signals.commands.common.ShutdownResponse;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.policies.PolicyCommand;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
Expand Down Expand Up @@ -118,6 +123,11 @@ public final class EventSnapshotCleanupCoordinator extends AbstractActorWithTime
*/
public static final String ACTOR_NAME = "eventSnapshotCleanupCoordinator";

/**
* Shutdown-reason type to restart stream after non-default time.
*/
private static final String RESTART_AFTER = "restartAfter";

private static final JsonFieldDefinition<JsonArray> CREDIT_DECISIONS =
JsonFactory.newJsonArrayFieldDefinition("credit-decisions");

Expand Down Expand Up @@ -198,6 +208,7 @@ private Receive sleeping() {
return ReceiveBuilder.create()
.matchEquals(WokeUp.WOKE_UP, this::wokeUp)
.match(RetrieveHealth.class, this::retrieveHealth)
.match(Shutdown.class, this::shutdownStream)
.build()
.orElse(retrieveConfigBehavior())
.orElse(modifyConfigBehavior());
Expand All @@ -211,11 +222,40 @@ private Receive streaming() {
enqueue(actions, cleanupResponse, config.getKeptActions()))
.match(StreamTerminated.class, this::streamTerminated)
.match(RetrieveHealth.class, this::retrieveHealth)
.match(Shutdown.class, this::shutdownStream)
.build()
.orElse(retrieveConfigBehavior())
.orElse(modifyConfigBehavior());
}

private void shutdownStream(final Shutdown shutdown) {
log.info("Terminating stream on demand: <{}>", shutdown);
shutdownKillSwitch();

final Event streamTerminated = new StreamTerminated("Got " + shutdown);
enqueue(events, streamTerminated, config.getKeptEvents());
getContext().become(sleeping());

final ShutdownReason shutdownReason = shutdown.getReason();
Duration wakeUpDelay;
String message;
if (RESTART_AFTER.equals(shutdownReason.getType().toString()) && shutdownReason.getDetails().isPresent()) {
try {
wakeUpDelay = Duration.parse(shutdownReason.getDetailsOrThrow());
message = "Restarting stream in " + wakeUpDelay;
} catch (final DateTimeParseException e) {
wakeUpDelay = config.getQuietPeriod();
message = String.format("Unable to parse <%s> to duration; restarting in <%s>.",
shutdownReason.getDetailsOrThrow(), wakeUpDelay);
}
} else {
wakeUpDelay = config.getQuietPeriod();
message = String.format("There is no reason:{type=<%s>,details=<timestamp>}; restarting in <%s>.", RESTART_AFTER, wakeUpDelay);
}
scheduleWakeUp(wakeUpDelay);
getSender().tell(ShutdownResponse.of(message, shutdown.getDittoHeaders()), getSelf());
}

private void wokeUp(final Event wokeUp) {
log.info("Woke up.");
enqueue(events, wokeUp, config.getKeptEvents());
Expand All @@ -237,12 +277,15 @@ private <T> Flow<T, T, NotUsed> reportToSelf() {
});
}

private void restartStream() {
private void shutdownKillSwitch() {
if (killSwitch != null) {
log.info("Shutting down previous stream.");
killSwitch.shutdown();
killSwitch = null;
}
}

private void restartStream() {
shutdownKillSwitch();

Pair<UniqueKillSwitch, CompletionStage<Done>> materializedValues =
assembleSource().viaMat(KillSwitches.single(), Keep.right())
Expand All @@ -262,7 +305,11 @@ private void restartStream() {
}

private void scheduleWakeUp() {
getTimers().startSingleTimer(WokeUp.WOKE_UP, WokeUp.WOKE_UP, config.getQuietPeriod());
scheduleWakeUp(config.getQuietPeriod());
}

private void scheduleWakeUp(final Duration when) {
getTimers().startSingleTimer(WokeUp.WOKE_UP, WokeUp.WOKE_UP, when);
}

private Source<EntityIdWithRevision, NotUsed> assembleSource() {
Expand Down
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.signals.commands.common;

import java.util.Objects;
import java.util.function.Predicate;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.JsonParsableCommandResponse;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;

/**
* Response to {@code Shutdown} containing the retrieved config.
*/
@JsonParsableCommandResponse(type = ShutdownResponse.TYPE)
public final class ShutdownResponse extends CommonCommandResponse<ShutdownResponse> {

/**
* Type of this command response.
*/
public static final String TYPE = TYPE_PREFIX + Shutdown.NAME;

private static final JsonFieldDefinition<JsonValue> MESSAGE = JsonFactory.newJsonValueFieldDefinition("message");

private final JsonValue message;

private ShutdownResponse(final JsonValue message, final DittoHeaders dittoHeaders) {
super(TYPE, HttpStatusCode.OK, dittoHeaders);
this.message = message;
}

/**
* Create a {@code ShutdownResponse}.
*
* @param message what to say in the response.
* @param headers Ditto headers.
* @return the {@code ShutdownResponse}.
*/
public static ShutdownResponse of(final Object message, final DittoHeaders headers) {
return new ShutdownResponse(JsonValue.of(message), headers);
}

/**
* Creates a new {@code ShutdownResponse} from the given JSON object.
*
* @param jsonObject the JSON object of which the Shutdown is to be created.
* @param dittoHeaders the headers.
* @return the command.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonMissingFieldException if the JSON object does not contain the field "config".
*/
public static ShutdownResponse fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return new ShutdownResponse(jsonObject.getValueOrThrow(MESSAGE), dittoHeaders);
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> predicate) {

jsonObjectBuilder.set(MESSAGE, message);
}

@Override
public ShutdownResponse setDittoHeaders(final DittoHeaders dittoHeaders) {
return new ShutdownResponse(message, dittoHeaders);
}

@Override
public boolean equals(final Object that) {
if (super.equals(that) && that instanceof ShutdownResponse) {
return Objects.equals(message, ((ShutdownResponse) that).message);
} else {
return false;
}

}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), message);
}

@Override
public String toString() {
return "ShutdownResponse[message=" + message + "," + super.toString() + "]";
}
}
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.signals.commands.common;

import static org.mutabilitydetector.unittesting.AllowedReason.provided;
import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import org.eclipse.ditto.json.JsonValue;
import org.junit.Test;

import nl.jqno.equalsverifier.EqualsVerifier;

/**
* Tests {@link ShutdownResponse}.
*/
public final class ShutdownResponseTest {

@Test
public void assertImmutability() {
assertInstancesOf(ShutdownResponse.class, areImmutable(), provided(JsonValue.class).isAlsoImmutable());
}

@Test
public void testHashCodeAndEquals() {
EqualsVerifier.forClass(ShutdownResponse.class)
.usingGetClass()
.verify();
}

}

0 comments on commit 756b96f

Please sign in to comment.