Skip to content

Commit

Permalink
forward "live" CommandResponses received in InboundDispatchingSink to…
Browse files Browse the repository at this point in the history
… edge commandForwarder

* let headers by Signal take precedence from inbound payload mapping

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 13, 2022
1 parent d25d348 commit f4dafd0
Showing 1 changed file with 8 additions and 5 deletions.
Expand Up @@ -306,7 +306,10 @@ public Optional<Signal<?>> onMapped(final String mapperId,

final DittoHeaders mappedHeaders =
applyInboundHeaderMapping(signal, incomingMessage, authorizationContext,
mappedInboundMessage.getTopicPath(), incomingMessage.getInternalHeaders());
mappedInboundMessage.getTopicPath(), incomingMessage.getInternalHeaders())
.toBuilder()
.putHeaders(signal.getDittoHeaders()) // headers from signal take precedence
.build();

logger.withCorrelationId(mappedHeaders).info("onMapped mappedHeaders {}", mappedHeaders);

Expand Down Expand Up @@ -501,9 +504,10 @@ private PartialFunction<Signal<?>, Stream<IncomingSignal>> dispatchResponsesAndS
forwardAcknowledgement(ack, declaredAckLabels, outcomes))
.match(Acknowledgements.class, acks ->
forwardAcknowledgements(acks, declaredAckLabels, outcomes))
.match(CommandResponse.class, ProtocolAdapter::isLiveSignal, liveResponse ->
forwardToClientActor(liveResponse, ActorRef.noSender())
)
.match(CommandResponse.class, ProtocolAdapter::isLiveSignal, liveResponse -> {
proxyActor.tell(liveResponse, ActorRef.noSender());
return Stream.empty();
})
.match(CreateSubscription.class, cmd -> forwardToConnectionActor(cmd, sender))
.match(WithSubscriptionId.class, cmd -> forwardToClientActor(cmd, sender))
.matchAny(baseSignal -> ackregatorStarter.preprocess(baseSignal,
Expand Down Expand Up @@ -638,7 +642,6 @@ private void handleErrorDuringStartingOfAckregator(final DittoRuntimeException e
* Only special Signals must be forwarded to the {@code ClientActor}:
* <ul>
* <li>{@code Acknowledgement}s which were received via an incoming connection source</li>
* <li>live {@code CommandResponse}s which were received via an incoming connection source</li>
* <li>{@code SearchCommand}s which were received via an incoming connection source</li>
* </ul>
*
Expand Down

0 comments on commit f4dafd0

Please sign in to comment.