Skip to content

Commit

Permalink
Add AskWithRetry to EdgeCommandForwarder
Browse files Browse the repository at this point in the history
In order to stabilize requets during service restarts/ unavailabilities.
Currently missing are the pubSub forwards.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Jun 8, 2022
1 parent 18dfac2 commit e4cc60e
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.service.dispatching;

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.exceptions.AskException;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.edge.service.EdgeServiceTimeoutException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cacheloaders.AskWithRetry;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;

import akka.actor.ActorRef;
import akka.actor.Scheduler;
import akka.pattern.AskTimeoutException;

/**
* Forwards commands from the edges to a specified ActorRef, waiting for a response. Uses retry mechanism if the
* response doesn't arrive.
*/
public final class AskWithRetryForwarder {

private final static ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(AskWithRetryForwarder.class);

private final Scheduler scheduler;
private final Executor executor;
private final AskWithRetryConfig askWithRetryConfig;

private AskWithRetryForwarder(final Scheduler scheduler, final Executor executor,
final AskWithRetryConfig askWithRetryConfig) {

this.scheduler = scheduler;
this.executor = executor;
this.askWithRetryConfig = askWithRetryConfig;
}

static AskWithRetryForwarder newInstance(final Scheduler scheduler, final Executor executor,
final AskWithRetryConfig askWithRetryConfig) {

return new AskWithRetryForwarder(scheduler, executor, askWithRetryConfig);
}

/**
* Asks the given {@code actorToAsk} for a response by telling {@code command}.
* This method uses {@link AskWithRetry}.
*
* @param actorToAsk the actor that should be asked.
* @param command the command that is used to ask.
* @return A completion stage which either completes with a filtered response of type {@link R} or fails with a
* {@link DittoRuntimeException}.
*/
<R extends Signal<?>, C extends Signal<?>> CompletionStage<R> ask(final ActorRef actorToAsk, final C command) {

return AskWithRetry.askWithRetry(actorToAsk, command, askWithRetryConfig, scheduler,
executor, getResponseCaster(command));
}

/**
* Returns a mapping function, which casts an Object response to the command response class.
*
* @return the mapping function.
*/
@SuppressWarnings("unchecked")
private <R extends Signal<?>, C extends Signal<?>> Function<Object, R> getResponseCaster(final C command) {
return response -> {
if (CommandResponse.class.isAssignableFrom(response.getClass())) {
return (R) response;
} else if (response instanceof AskException || response instanceof AskTimeoutException) {
final Optional<DittoRuntimeException> dittoRuntimeException =
handleAskTimeoutForCommand(command, (Throwable) response);
if (dittoRuntimeException.isPresent()) {
throw dittoRuntimeException.get();
} else {
return null;
}
} else {
throw reportErrorOrResponse(command, response, null);
}
};
}

/**
* Report unexpected error or unknown response.
*/
private <C extends Signal<?>> DittoRuntimeException reportErrorOrResponse(final C command,
@Nullable final Object response,
@Nullable final Throwable error) {

if (error != null) {
return reportError(command, error);
} else if (response instanceof Throwable) {
return reportError(command, (Throwable) response);
} else if (response != null) {
return reportUnknownResponse(command, response);
} else {
return reportError(command, new NullPointerException("Response and error were null."));
}
}

/**
* Reports an error differently based on type of the error. If the error is of type
* {@link DittoRuntimeException}, it is returned as is
* (without modification), otherwise it is wrapped inside a {@link DittoInternalErrorException}.
*
* @param throwable the error.
* @return DittoRuntimeException suitable for transmission of the error.
*/
private <C extends Signal<?>> DittoRuntimeException reportError(final C command,
@Nullable final Throwable throwable) {
final Throwable error = throwable == null
? new NullPointerException("Result and error are both null")
: throwable;
final var dre = DittoRuntimeException.asDittoRuntimeException(
error, t -> reportUnexpectedError(command, t));
LOGGER.info(" - {}: {}", dre.getClass().getSimpleName(), dre.getMessage());
return dre;
}


/**
* Report unexpected error.
*/
private <C extends Signal<?>> DittoRuntimeException reportUnexpectedError(final C command, final Throwable error) {
LOGGER.error("Unexpected error", error);

return DittoInternalErrorException.newBuilder()
.cause(error)
.dittoHeaders(command.getDittoHeaders())
.build();
}

/**
* Report unknown response.
*/
private <C extends Signal<?>> DittoInternalErrorException reportUnknownResponse(final C command,
final Object response) {

LOGGER.error("Unexpected response: <{}>", response);
return DittoInternalErrorException.newBuilder().dittoHeaders(command.getDittoHeaders()).build();
}

/**
* Report timeout.
*
* @param command the original command.
* @param askTimeout the timeout exception.
*/
private <C extends Signal<?>>Optional<DittoRuntimeException> handleAskTimeoutForCommand(final C command,
final Throwable askTimeout) {

LOGGER.withCorrelationId(command.getDittoHeaders()).error("Encountered timeout in edge forwarding", askTimeout);
return Optional.of(EdgeServiceTimeoutException.newBuilder()
.dittoHeaders(command.getDittoHeaders())
.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@
*/
package org.eclipse.ditto.edge.service.dispatching;

import java.util.concurrent.CompletionException;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.config.DefaultAskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.policies.model.signals.commands.PolicyCommand;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
Expand All @@ -31,6 +39,7 @@

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
Expand All @@ -40,6 +49,7 @@
* directly to the shard regions of the services the commands are targeted to.
* For "thing search" commands, it sends them via pub/sub to the SearchActor.
*/
//TODO CR-11315 Ask-retry for messages sent to PubSub?
public class EdgeCommandForwarderActor extends AbstractActor {

/**
Expand All @@ -52,6 +62,7 @@ public class EdgeCommandForwarderActor extends AbstractActor {
private final ActorRef pubSubMediator;
private final ShardRegions shardRegions;
private final SignalTransformer signalTransformer;
private final AskWithRetryForwarder askWithRetryForwarder;

@SuppressWarnings("unused")
private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegions shardRegions,
Expand All @@ -60,6 +71,13 @@ private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegi
this.pubSubMediator = pubSubMediator;
this.shardRegions = shardRegions;
this.signalTransformer = signalTransformer;
final AskWithRetryConfig config =
DefaultAskWithRetryConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()),
"ask-with-retry");
final ActorSystem actorSystem = getContext().getSystem();
askWithRetryForwarder = AskWithRetryForwarder.newInstance(actorSystem.getScheduler(),
actorSystem.dispatcher(), config);
}

/**
Expand All @@ -72,6 +90,7 @@ private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegi
*/
public static Props props(final ActorRef pubSubMediator, final ShardRegions shardRegions,
final SignalTransformer signalTransformer) {

return Props.create(EdgeCommandForwarderActor.class, pubSubMediator, shardRegions, signalTransformer);
}

Expand Down Expand Up @@ -108,8 +127,8 @@ private void forwardToThings(final MessageCommand<?, ?> messageCommand) {
log.withCorrelationId(transformedMessageCommand)
.info("Forwarding message command with ID <{}> and type <{}> to 'things' shard region",
transformedMessageCommand.getEntityId(), transformedMessageCommand.getType());
shardRegions.things()
.tell(transformedMessageCommand, sender);
askWithRetry(transformedMessageCommand, shardRegions.things(), sender);

});
}

Expand All @@ -121,8 +140,8 @@ private void forwardToThings(final ThingCommand<?> thingCommand) {
log.withCorrelationId(transformedThingCommand)
.info("Forwarding thing command with ID <{}> and type <{}> to 'things' shard region",
transformedThingCommand.getEntityId(), transformedThingCommand.getType());
shardRegions.things()
.tell(transformedThingCommand, sender);
askWithRetry(transformedThingCommand, shardRegions.things(), sender);

});
}

Expand All @@ -140,8 +159,7 @@ private void forwardToPolicies(final PolicyCommand<?> policyCommand) {
log.withCorrelationId(transformedPolicyCommand)
.info("Forwarding policy command with ID <{}> and type <{}> to 'policies' shard region",
transformedPolicyCommand.getEntityId(), transformedPolicyCommand.getType());
shardRegions.policies()
.tell(transformedPolicyCommand, sender);
askWithRetry(transformedPolicyCommand, shardRegions.policies(), sender);
});

}
Expand All @@ -157,8 +175,7 @@ private void forwardToConnectivity(final ConnectivityCommand<?> connectivityComm
.info("Forwarding connectivity command with ID <{}> and type <{}> to 'connections' " +
"shard region", withEntityId.getEntityId(),
transformedConnectivityCommand.getType());
shardRegions.connections()
.tell(transformedConnectivityCommand, sender);
askWithRetry(transformedConnectivityCommand, shardRegions.connections(), sender);
});
} else {
log.withCorrelationId(connectivityCommand)
Expand All @@ -183,4 +200,30 @@ private void handleUnknownSignal(final Signal<?> signal) {
});
}

private void askWithRetry(final Signal<?> command,
final ActorRef receiver,
final ActorRef sender) {

askWithRetryForwarder.ask(receiver, command)
.exceptionally(t -> handleException(t, sender))
.thenAccept(response -> handleResponse(response, sender));
}

@Nullable
private <T extends Signal<?>> T handleException(final Throwable t, final ActorRef sender) {
if (t instanceof CompletionException && t.getCause() instanceof DittoRuntimeException) {
sender.tell(t.getCause(), getSelf());
} else {
throw (RuntimeException) t;
}
return null;
}

private <T extends Signal<?>> void handleResponse(@Nullable final T response, final ActorRef sender) {
if (null != response) {
log.withCorrelationId(response.getDittoHeaders()).debug("Forwarding response: {}", response);
sender.tell(response, getSelf());
}
}

}
33 changes: 29 additions & 4 deletions edge/service/src/main/resources/ditto-edge-service.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,29 @@
ditto.edge-command-forwarder-extension = org.eclipse.ditto.edge.service.dispatching.NoOpEdgeCommandForwarderExtension
ditto.edge-command-forwarder-extension = ${?DITTO_EDGE_COMMAND_FORWARDER_EXTENSION}
ditto.signal-transformer = org.eclipse.ditto.edge.service.dispatching.DefaultNamespaceAppender
ditto.signal-transformer = ${?DITTO_SIGNAL_TRANSFORMER}
ditto {
edge-command-forwarder-extension = org.eclipse.ditto.edge.service.dispatching.NoOpEdgeCommandForwarderExtension
edge-command-forwarder-extension = ${?DITTO_EDGE_COMMAND_FORWARDER_EXTENSION}
signal-transformer = org.eclipse.ditto.edge.service.dispatching.DefaultNamespaceAppender
signal-transformer = ${?DITTO_SIGNAL_TRANSFORMER}

ask-with-retry {
# maximum duration to wait for answers from entity shard regions
ask-timeout = 3s
ask-timeout = ${?CONCIERGE_CACHES_ASK_TIMEOUT}

# one of: OFF, NO_DELAY, FIXED_DELAY, BACKOFF_DELAY
retry-strategy = BACKOFF_DELAY
retry-strategy = ${?CONCIERGE_CACHES_ASK_RETRY_STRATEGY}

retry-attempts = 3
retry-attempts = ${?CONCIERGE_CACHES_ASK_TIMEOUT_RETRIES}

fixed-delay = 5s
fixed-delay = ${?CONCIERGE_CACHES_ASK_FIXED_DELAY}

backoff-delay {
min = 100ms
max = 10s
# must be between 0.0 and 1.0:
random-factor = 0.5
}
}
}

0 comments on commit e4cc60e

Please sign in to comment.