Skip to content

Commit

Permalink
Expect null message in EnforcementScheduler due to pre-enforcer failu…
Browse files Browse the repository at this point in the history
…res.

Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Mar 23, 2020
1 parent 203c55e commit 053c692
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 19 deletions.
Expand Up @@ -130,6 +130,15 @@ Contextual<T> withAskFuture(final Supplier<CompletionStage<Object>> askFuture) {
startedTimer, receiver, receiverWrapperFunction, responseReceivers, askFuture, changesAuthorization);
}

/**
* Retrieve the message but tolerate that it may be null.
*
* @return the optional message.
*/
public Optional<T> getMessageOptional() {
return Optional.ofNullable(message);
}

@Override
public T getMessage() {
if (message == null) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
Expand Down Expand Up @@ -98,27 +99,34 @@ private void futureComplete(final FutureComplete futureComplete) {

private Void dispatchEnforcedMessage(final Contextual<?> enforcementResult) {
final DittoDiagnosticLoggingAdapter logger = enforcementResult.getLog();
logger.setCorrelationId(enforcementResult.getMessage());
final Optional<ActorRef> receiverOpt = enforcementResult.getReceiver();
final Optional<Supplier<CompletionStage<Object>>> askFutureOpt = enforcementResult.getAskFuture();
if (askFutureOpt.isPresent() && receiverOpt.isPresent()) {
final ActorRef receiver = receiverOpt.get();
logger.debug("About to pipe contextual message <{}> after ask-step to receiver: <{}>",
enforcementResult.getMessage(), receiver);
// It does not disrupt command order guarantee to run the ask-future here if the ask-future
// is initiated by a call to Patterns.ask(), because Patterns.ask() calls ActorRef.tell()
// in the calling thread.
Patterns.pipe(askFutureOpt.get().get(), getContext().dispatcher()).to(receiver);
} else if (receiverOpt.isPresent()) {
final ActorRef receiver = receiverOpt.get();
final Object wrappedMsg =
enforcementResult.getReceiverWrapperFunction().apply(enforcementResult.getMessage());
logger.debug("About to send contextual message <{}> to receiver: <{}>", wrappedMsg, receiver);
receiver.tell(wrappedMsg, enforcementResult.getSender());
final Optional<? extends WithDittoHeaders> messageOpt = enforcementResult.getMessageOptional();
if (messageOpt.isPresent()) {
final WithDittoHeaders<?> message = messageOpt.get();
logger.setCorrelationId(message);
final Optional<ActorRef> receiverOpt = enforcementResult.getReceiver();
final Optional<Supplier<CompletionStage<Object>>> askFutureOpt = enforcementResult.getAskFuture();
if (askFutureOpt.isPresent() && receiverOpt.isPresent()) {
final ActorRef receiver = receiverOpt.get();
logger.debug("About to pipe contextual message <{}> after ask-step to receiver: <{}>",
message, receiver);
// It does not disrupt command order guarantee to run the ask-future here if the ask-future
// is initiated by a call to Patterns.ask(), because Patterns.ask() calls ActorRef.tell()
// in the calling thread.
Patterns.pipe(askFutureOpt.get().get(), getContext().dispatcher()).to(receiver);
} else if (receiverOpt.isPresent()) {
final ActorRef receiver = receiverOpt.get();
final Object wrappedMsg =
enforcementResult.getReceiverWrapperFunction().apply(message);
logger.debug("About to send contextual message <{}> to receiver: <{}>", wrappedMsg, receiver);
receiver.tell(wrappedMsg, enforcementResult.getSender());
} else {
logger.debug("No receiver found in Contextual - as a result just ignoring it: <{}>", enforcementResult);
}
logger.discardCorrelationId();
} else {
logger.debug("No receiver found in Contextual - as a result just ignoring it: <{}>", enforcementResult);
// message does not exist; nothing to dispatch
logger.debug("Not dispatching due to lack of message: {}", enforcementResult);
}
logger.discardCorrelationId();
return null;
}

Expand Down

0 comments on commit 053c692

Please sign in to comment.