Skip to content

Commit

Permalink
Issue #559 remove workaround and build response corrently
Browse files Browse the repository at this point in the history
Since live retrieve responses are now routed to concierge for filtering,
headers like the auth context are needed. Instead of adding the original
headers afterwards, build the response correctly on the first go.

Signed-off-by: Joel Bartelheimer <joel.bartelheimer@bosch.io>
  • Loading branch information
jbartelh committed Oct 21, 2021
1 parent 8ab9278 commit 1bd5711
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,15 @@ private void sendBackResponses(final OutboundSignal.MultiMapped multiMapped, @Nu
final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(multiMapped.getSource());
if (!nonAcknowledgementsResponses.isEmpty() && sender != null) {
nonAcknowledgementsResponses.forEach(response -> {
// TODO remove header merging when mergeWithResponseHeaders in HttpPublisherActor is fixed
// and headers are added to the response
final var sourceDittoHeaders = multiMapped.getSource().getDittoHeaders();
final var responseDittoHeaders = response.getDittoHeaders();
final var combinedHeaders = DittoHeaders.newBuilder(sourceDittoHeaders)
.putHeaders(responseDittoHeaders)
.build();

final var responseWithPreservedHeaders =
response.setDittoHeaders(combinedHeaders);
if (responseWithPreservedHeaders instanceof ThingQueryCommandResponse
&& isLiveResponse(responseWithPreservedHeaders)) {
if (response instanceof ThingQueryCommandResponse && isLiveResponse(response)) {
l.debug("LiveQueryCommandResponse created from HTTP response. " +
"Sending response <{}> to concierge for filtering", responseWithPreservedHeaders);
"Sending response <{}> to concierge for filtering", response);

proxyActor.tell(responseWithPreservedHeaders, sender);
proxyActor.tell(response, sender);
} else {
l.debug("CommandResponse created from HTTP response. Replying to <{}>: <{}>", sender,
responseWithPreservedHeaders);
sender.tell(responseWithPreservedHeaders, getSelf());
response);
sender.tell(response, getSelf());
}
});
} else if (nonAcknowledgementsResponses.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.eclipse.ditto.messages.model.signals.commands.SendFeatureMessageResponse;
import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage;
import org.eclipse.ditto.messages.model.signals.commands.SendThingMessageResponse;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
Expand Down Expand Up @@ -185,7 +184,8 @@ private static Uri setPathAndQuery(final Uri uri, @Nullable final String path, @
return newUri;
}

private static Pair<Iterable<HttpHeader>, ContentType> getHttpHeadersPair(final Map<String, String> messageHeaders) {
private static Pair<Iterable<HttpHeader>, ContentType> getHttpHeadersPair(
final Map<String, String> messageHeaders) {
final Collection<HttpHeader> headers = new ArrayList<>(messageHeaders.size());
ContentType contentType = null;
for (final var entry : messageHeaders.entrySet()) {
Expand Down Expand Up @@ -465,7 +465,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
httpStatus);
} else if (sentSignal instanceof ThingCommand &&
SignalInformationPoint.isChannelLive(sentSignal)) {
result = toLiveCommandResponse(mergedDittoHeaders, body);
result = toLiveCommandResponse(sentSignal, mergedDittoHeaders, body);
} else {
result = null;
}
Expand All @@ -481,7 +481,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
.filter(org.eclipse.ditto.base.model.headers.contenttype.ContentType::isDittoProtocol)
.isPresent();
if (isDittoProtocolMessage && body.isObject()) {
final CommandResponse<?> parsedResponse = toCommandResponse(body.asObject());
final CommandResponse<?> parsedResponse = toCommandResponse(body.asObject(), sentSignal);
if (parsedResponse instanceof Acknowledgement) {
result = parsedResponse;
} else if (SignalInformationPoint.isLiveCommandResponse(parsedResponse)) {
Expand Down Expand Up @@ -529,7 +529,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
}

@Nullable
private MessageCommandResponse<?, ?> toMessageCommandResponse(final MessageCommand<?, ?> messageCommand,
private MessageCommandResponse<?, ?> toMessageCommandResponse(final MessageCommand<?, ?> sentMessageCommand,
final DittoHeaders dittoHeaders,
final JsonValue jsonValue,
final HttpStatus status) {
Expand All @@ -538,7 +538,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
.filter(org.eclipse.ditto.base.model.headers.contenttype.ContentType::isDittoProtocol)
.isPresent();
if (isDittoProtocolMessage && jsonValue.isObject()) {
final CommandResponse<?> commandResponse = toCommandResponse(jsonValue.asObject());
final CommandResponse<?> commandResponse = toCommandResponse(jsonValue.asObject(), sentMessageCommand);
if (commandResponse == null) {
return null;
} else if (commandResponse instanceof MessageCommandResponse) {
Expand All @@ -550,7 +550,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
return null;
}
} else {
final var commandMessage = messageCommand.getMessage();
final var commandMessage = sentMessageCommand.getMessage();
final var messageHeaders = MessageHeadersBuilder.of(commandMessage.getHeaders())
.httpStatus(status)
.putHeaders(dittoHeaders)
Expand All @@ -559,34 +559,35 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
.payload(jsonValue)
.build();

switch (messageCommand.getType()) {
switch (sentMessageCommand.getType()) {
case SendClaimMessage.TYPE:
return SendClaimMessageResponse.of(messageCommand.getEntityId(), message, status,
return SendClaimMessageResponse.of(sentMessageCommand.getEntityId(), message, status,
dittoHeaders);
case SendThingMessage.TYPE:
return SendThingMessageResponse.of(messageCommand.getEntityId(), message, status,
return SendThingMessageResponse.of(sentMessageCommand.getEntityId(), message, status,
dittoHeaders);
case SendFeatureMessage.TYPE:
final SendFeatureMessage<?> sendFeatureMessage = (SendFeatureMessage<?>) messageCommand;
return SendFeatureMessageResponse.of(messageCommand.getEntityId(),
final SendFeatureMessage<?> sendFeatureMessage = (SendFeatureMessage<?>) sentMessageCommand;
return SendFeatureMessageResponse.of(sentMessageCommand.getEntityId(),
sendFeatureMessage.getFeatureId(), message, status, dittoHeaders);
default:
connectionLogger.failure("Initial message command type <{0}> is unknown.",
messageCommand.getType());
sentMessageCommand.getType());
return null;
}
}
}

@Nullable
private CommandResponse<?> toLiveCommandResponse(final DittoHeaders dittoHeaders,
private CommandResponse<?> toLiveCommandResponse(final Signal<?> sentSignal,
final DittoHeaders dittoHeaders,
final JsonValue jsonValue) {

final boolean isDittoProtocolMessage = dittoHeaders.getDittoContentType()
.filter(org.eclipse.ditto.base.model.headers.contenttype.ContentType::isDittoProtocol)
.isPresent();
if (isDittoProtocolMessage && jsonValue.isObject()) {
final CommandResponse<?> commandResponse = toCommandResponse(jsonValue.asObject());
final var commandResponse = toCommandResponse(jsonValue.asObject(), sentSignal);
if (commandResponse == null) {
return null;
} else if (commandResponse instanceof ThingCommandResponse &&
Expand All @@ -604,11 +605,18 @@ private CommandResponse<?> toLiveCommandResponse(final DittoHeaders dittoHeaders
}

@Nullable
private CommandResponse<?> toCommandResponse(final JsonObject jsonObject) {
final JsonifiableAdaptable jsonifiableAdaptable = ProtocolFactory.jsonifiableAdaptableFromJson(jsonObject);
final Signal<?> signal = DITTO_PROTOCOL_ADAPTER.fromAdaptable(jsonifiableAdaptable);
private CommandResponse<?> toCommandResponse(final JsonObject jsonObject, final Signal<?> sentSignal) {
final var jsonifiableAdaptable = ProtocolFactory.jsonifiableAdaptableFromJson(jsonObject);
final var signal = DITTO_PROTOCOL_ADAPTER.fromAdaptable(jsonifiableAdaptable);
if (signal instanceof CommandResponse) {
return (CommandResponse<?>) signal;
// use ditto headers from original sent signal as base, so that headers like the ditto-auth-context are set,
// they might be needed by concierge for filtering,
final var commandResponse = (CommandResponse<?>) signal;
final var fromSourceHeaders = sentSignal.getDittoHeaders().toBuilder()
.putHeaders(commandResponse.getDittoHeaders())
.build();

return commandResponse.setDittoHeaders(fromSourceHeaders);
} else {
connectionLogger.exception("Expected <{}> to be of type <{}> but was of type <{}>.",
jsonObject, CommandResponse.class.getSimpleName(), signal.getClass().getSimpleName());
Expand Down

0 comments on commit 1bd5711

Please sign in to comment.