Skip to content

Commit

Permalink
For live signals it's required to restore command connectivity headers
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Oct 14, 2020
1 parent 87de0c5 commit 205e732
Showing 1 changed file with 37 additions and 1 deletion.
Expand Up @@ -14,6 +14,7 @@

import static org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator.resolveConnectionIdPlaceholder;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -39,6 +40,7 @@
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.model.connectivity.ConnectivityInternalErrorException;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.placeholders.ExpressionResolver;
import org.eclipse.ditto.model.placeholders.PlaceholderFactory;
Expand All @@ -55,6 +57,7 @@
import org.eclipse.ditto.services.connectivity.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.services.connectivity.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.services.models.acks.AcknowledgementAggregatorActor;
import org.eclipse.ditto.services.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.models.connectivity.MappedInboundExternalMessage;
Expand All @@ -67,7 +70,10 @@
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityErrorResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.acks.MessageCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.commands.things.acks.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.things.acks.ThingModifyCommandAckRequestSetter;
Expand All @@ -78,6 +84,7 @@
import akka.actor.Props;
import akka.japi.pf.PFBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import scala.PartialFunction;
import scala.util.Either;
import scala.util.Left;
Expand Down Expand Up @@ -354,11 +361,40 @@ private int dispatchIncomingSignal(final IncomingSignal incomingSignal) {
}
return 1;
} else {
proxyActor.tell(signal, sender);
if (sender != null && isLive(signal)) {
final DittoHeaders originalHeaders = signal.getDittoHeaders();
Patterns.ask(proxyActor, signal, originalHeaders.getTimeout().orElse(Duration.ofSeconds(10)))
.thenApply(response -> {
if (response instanceof WithDittoHeaders<?>) {
return AcknowledgementAggregatorActor.restoreCommandConnectivityHeaders(
(WithDittoHeaders<?>) response,
originalHeaders);
} else {
final String messageTemplate =
"Expected response <%s> to be of type <%s> but was of type <%s>.";
final String errorMessage =
String.format(messageTemplate, response, WithDittoHeaders.class.getName(),
response.getClass().getName());
final ConnectivityInternalErrorException dre =
ConnectivityInternalErrorException.newBuilder()
.cause(new ClassCastException(errorMessage))
.build();
return ConnectivityErrorResponse.of(dre, originalHeaders);
}
})
.thenAccept(response -> sender.tell(response, ActorRef.noSender()));
} else {
proxyActor.tell(signal, sender);
}
return 0;
}
}

private static boolean isLive(final Signal<?> signal) {
return (signal instanceof MessageCommand ||
(signal instanceof ThingCommand && ProtocolAdapter.isLiveSignal(signal)));
}

private void startAckregatorAndForwardSignal(final Signal<?> signal, @Nullable final ActorRef sender) {
ackregatorStarter.doStart(signal,
responseSignal -> {
Expand Down

0 comments on commit 205e732

Please sign in to comment.