Skip to content

Commit

Permalink
add poison pill devops command; add debug logs to reset operations.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed May 18, 2022
1 parent 9e2fe64 commit 8838f0c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
Expand Up @@ -62,6 +62,7 @@

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.cluster.Cluster;
Expand All @@ -86,6 +87,11 @@ public final class DevOpsCommandsActor extends AbstractActor implements Retrieve
*/
public static final String AGGREGATE_HEADER = "aggregate";

/**
* Name of the poison pill "command".
*/
private static final String POISON_PILL_NAME = "poison-pill";

private static final Duration DEFAULT_RECEIVE_TIMEOUT = Duration.ofMillis(10_000);
private static final String UNKNOWN_MESSAGE_TEMPLATE = "Unknown message: {}";
private static final String TOPIC_HEADER = "topic";
Expand Down Expand Up @@ -374,29 +380,34 @@ private void handleExecutePiggyBack(final ExecutePiggybackCommand command) {
}

private void deserializePiggybackCommand(final ExecutePiggybackCommand command,
final Consumer<Jsonifiable<?>> onSuccess, final Consumer<DittoRuntimeException> onError) {
final Consumer<Object> onSuccess, final Consumer<DittoRuntimeException> onError) {

final JsonObject piggybackCommandJson = command.getPiggybackCommand();
@Nullable final String piggybackCommandType = piggybackCommandJson.getValue(Command.JsonFields.TYPE)
.orElse(null);
final Consumer<JsonParsable<Jsonifiable<?>>> action = mappingStrategy -> {
try {
onSuccess.accept(mappingStrategy.parse(piggybackCommandJson, command.getDittoHeaders()));
} catch (final DittoRuntimeException e) {
logger.withCorrelationId(command)
.warning("Got DittoRuntimeException while parsing PiggybackCommand <{}>: {}!",
piggybackCommandType, e);
onError.accept(e);
}
};
final Runnable emptyAction = () -> {
final String msgPattern = "ExecutePiggybackCommand with PiggybackCommand <%s> cannot be executed by this"
+ " service as there is no mapping strategy for it!";
final String message = String.format(msgPattern, piggybackCommandType);
logger.withCorrelationId(command).warning(message);
onError.accept(JsonTypeNotParsableException.fromMessage(message, command.getDittoHeaders()));
};
serviceMappingStrategy.getMappingStrategy(piggybackCommandType).ifPresentOrElse(action, emptyAction);
if (POISON_PILL_NAME.equals(piggybackCommandType)) {
onSuccess.accept(PoisonPill.getInstance());
} else {
final Consumer<JsonParsable<Jsonifiable<?>>> action = mappingStrategy -> {
try {
onSuccess.accept(mappingStrategy.parse(piggybackCommandJson, command.getDittoHeaders()));
} catch (final DittoRuntimeException e) {
logger.withCorrelationId(command)
.warning("Got DittoRuntimeException while parsing PiggybackCommand <{}>: {}!",
piggybackCommandType, e);
onError.accept(e);
}
};
final Runnable emptyAction = () -> {
final String msgPattern =
"ExecutePiggybackCommand with PiggybackCommand <%s> cannot be executed by this"
+ " service as there is no mapping strategy for it!";
final String message = String.format(msgPattern, piggybackCommandType);
logger.withCorrelationId(command).warning(message);
onError.accept(JsonTypeNotParsableException.fromMessage(message, command.getDittoHeaders()));
};
serviceMappingStrategy.getMappingStrategy(piggybackCommandType).ifPresentOrElse(action, emptyAction);
}
}

private DevOpsErrorResponse getErrorResponse(final DevOpsCommand<?> command, final JsonObject error) {
Expand Down Expand Up @@ -603,6 +614,7 @@ private void handleDittoRuntimeException(final DittoRuntimeException dittoRuntim

@Override
public void preStart() throws Exception {
super.preStart();
final var context = getContext();
context.setReceiveTimeout(getReceiveTimeout());
}
Expand Down
Expand Up @@ -323,6 +323,7 @@ private void doRemoveSubscriber(final ActorRef subscriber) {
private void writeLocalDData() {
final var writeConsistency = (Replicator.WriteConsistency) Replicator.writeLocal();
if (resetProbability > 0 && Math.random() < resetProbability) {
log().debug("Resetting ddata ack-labels: <{}>", getSelf());
ackDData.getWriter().reset(ownAddress, exportNextUpdate(), writeConsistency).whenComplete(this::logError);
} else {
final LiteralUpdate diff = createAndSetDDataUpdate();
Expand Down
Expand Up @@ -233,6 +233,7 @@ private CompletionStage<SubscriptionsReader> performDDataOp(final boolean localS
final SubscriptionsReader snapshot = subscriptions.snapshot();
final CompletionStage<Void> ddataOp;
if (resetProbability > 0 && Math.random() < resetProbability) {
log().debug("Resetting ddata topics: <{}>", getSelf());
ddataOp = ddata.getWriter().reset(subscriber, subscriptions.export(), writeConsistency);
} else if (!localSubscriptionsChanged) {
ddataOp = CompletableFuture.completedStage(null);
Expand Down

0 comments on commit 8838f0c

Please sign in to comment.