Skip to content

Commit

Permalink
Issue #106 add auth context of target to sending-context
Browse files Browse the repository at this point in the history
* used to route live-responses from httpPush
* disable replyTarget unit-tests for httpPush, because it's not possible via httpPush

Signed-off-by: Joel Bartelheimer <joel.bartelheimer@bosch.io>
  • Loading branch information
jbartelh committed Nov 2, 2021
1 parent 3be40a1 commit 407985c
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.common.Placeholders;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
Expand Down Expand Up @@ -439,6 +440,7 @@ private SendingContext getSendingContextForTarget(final OutboundSignal.Mapped ou
.droppedMonitor(droppedMonitor)
.acknowledgedMonitor(acknowledgedMonitor)
.autoAckTarget(autoAckTarget)
.targetAuthorizationContext(target.getAuthorizationContext())
.build();
}

Expand Down Expand Up @@ -478,6 +480,7 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
l.debug("Publishing mapped message of type <{}> to PublishTarget <{}>: {}", outboundSource.getType(),
publishTarget, sendingContext.getExternalMessage());
@Nullable final Target autoAckTarget = sendingContext.getAutoAckTarget().orElse(null);

final HeaderMapping headerMapping = genericTarget.getHeaderMapping();
final ExternalMessage mappedMessage = applyHeaderMapping(resolver, outbound, headerMapping);
final Context context = DittoTracing.extractTraceContext(mappedMessage.getHeaders());
Expand All @@ -489,12 +492,14 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
final ExternalMessage mappedMessageWithTraceContext =
DittoTracing.propagateContext(trace.getContext(), mappedMessage,
(msg, entry) -> msg.withHeader(entry.getKey(), entry.getValue()));

final CompletionStage<SendResult> responsesFuture = publishMessage(outboundSource,
autoAckTarget,
publishTarget,
mappedMessageWithTraceContext,
maxTotalMessageSize,
quota
quota,
sendingContext.getTargetAuthorizationContext().orElse(null)
);
responsesFuture.whenComplete((sr, throwable) -> trace.finish());
// set the external message after header mapping for the result of header mapping to show up in log
Expand Down Expand Up @@ -558,7 +563,8 @@ protected abstract CompletionStage<SendResult> publishMessage(Signal<?> signal,
T publishTarget,
ExternalMessage message,
int maxTotalMessageSize,
int ackSizeQuota);
int ackSizeQuota,
@Nullable AuthorizationContext targetAuthorizationContext);

/**
* Decode a byte buffer according to the charset specified in an external message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAc

final var context = getContext();
final var proxyActor = settings.getProxyActor();
final Consumer<ActorRef> action = acknowledgementForwarder -> {
final Consumer<ActorRef> forwardAck = acknowledgementForwarder -> {
if (responseOrAck instanceof ThingQueryCommandResponse && isLiveResponse(responseOrAck)) {
// forward live command responses to concierge to filter response
proxyActor.tell(responseOrAck, getSender());
Expand All @@ -187,7 +187,7 @@ private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAc
}
};

final Runnable emptyAction = () -> {
final Runnable forwardToConcierge = () -> {
final var forwarderActorClassName = AcknowledgementForwarderActor.class.getSimpleName();
final var template = "No {} found. Forwarding signal to concierge. <{}>";
if (logger.isDebugEnabled()) {
Expand All @@ -200,7 +200,7 @@ private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAc
};

context.findChild(AcknowledgementForwarderActor.determineActorName(responseOrAck.getDittoHeaders()))
.ifPresentOrElse(action, emptyAction);
.ifPresentOrElse(forwardAck, forwardToConcierge);
}

private void denyNonSourceDeclaredAck(final Acknowledgement ack) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.connectivity.model.GenericTarget;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
Expand All @@ -38,14 +39,16 @@ final class SendingContext {
@Nullable private final ConnectionMonitor acknowledgedMonitor;
private final ConnectionMonitor droppedMonitor;
@Nullable private final Target autoAckTarget;
@Nullable private final AuthorizationContext targetAuthorizationContext;

private SendingContext(final OutboundSignal.Mapped outboundSignal,
final ExternalMessage externalMessage,
final GenericTarget genericTarget,
final ConnectionMonitor publishedMonitor,
@Nullable final ConnectionMonitor acknowledgedMonitor,
final ConnectionMonitor droppedMonitor,
@Nullable final Target autoAckTarget) {
@Nullable final Target autoAckTarget,
@Nullable final AuthorizationContext targetAuthorizationContext) {

this.outboundSignal = outboundSignal;
this.externalMessage = externalMessage;
Expand All @@ -54,6 +57,7 @@ private SendingContext(final OutboundSignal.Mapped outboundSignal,
this.acknowledgedMonitor = acknowledgedMonitor;
this.droppedMonitor = droppedMonitor;
this.autoAckTarget = autoAckTarget;
this.targetAuthorizationContext = targetAuthorizationContext;
}

private SendingContext(final Builder builder) {
Expand All @@ -64,6 +68,7 @@ private SendingContext(final Builder builder) {
acknowledgedMonitor = builder.acknowledgedMonitor;
droppedMonitor = checkNotNull(builder.droppedMonitor, "droppedMonitor");
autoAckTarget = builder.autoAckTarget;
targetAuthorizationContext = builder.targetAuthorizationContext;
}

static Builder newBuilder() {
Expand Down Expand Up @@ -98,6 +103,10 @@ Optional<Target> getAutoAckTarget() {
return Optional.ofNullable(autoAckTarget);
}

Optional<AuthorizationContext> getTargetAuthorizationContext() {
return Optional.ofNullable(targetAuthorizationContext);
}

boolean shouldAcknowledge() {
return null != autoAckTarget && null != acknowledgedMonitor;
}
Expand All @@ -109,7 +118,8 @@ SendingContext setExternalMessage(final ExternalMessage externalMessage) {
publishedMonitor,
acknowledgedMonitor,
droppedMonitor,
autoAckTarget);
autoAckTarget,
targetAuthorizationContext);
}

/**
Expand All @@ -125,6 +135,7 @@ static final class Builder {
@Nullable private ConnectionMonitor acknowledgedMonitor;
private ConnectionMonitor droppedMonitor;
@Nullable private Target autoAckTarget;
@Nullable private AuthorizationContext targetAuthorizationContext;

private Builder() {
outboundSignal = null;
Expand All @@ -134,6 +145,7 @@ private Builder() {
acknowledgedMonitor = null;
droppedMonitor = null;
autoAckTarget = null;
targetAuthorizationContext = null;
}

SendingContext build() {
Expand Down Expand Up @@ -175,6 +187,11 @@ Builder autoAckTarget(@Nullable final Target autoAckTarget) {
return this;
}

Builder targetAuthorizationContext(@Nullable final AuthorizationContext targetAuthorizationContext) {
this.targetAuthorizationContext = targetAuthorizationContext;
return this;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import org.apache.qpid.jms.message.JmsMessage;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.common.Placeholders;
import org.eclipse.ditto.base.model.entity.id.EntityId;
Expand Down Expand Up @@ -331,7 +332,8 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
final AmqpTarget publishTarget,
final ExternalMessage message,
final int maxTotalMessageSize,
final int ackSizeQuota) {
final int ackSizeQuota,
@Nullable final AuthorizationContext targetAuthorizationContext) {

if (!isInBackOffMode) {
final CompletableFuture<SendResult> resultFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.common.HttpStatusCodeOutOfRangeException;
import org.eclipse.ditto.base.model.entity.id.EntityId;
Expand All @@ -45,6 +46,7 @@
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityInternalErrorException;
import org.eclipse.ditto.connectivity.model.GenericTarget;
import org.eclipse.ditto.connectivity.model.MessageSendingFailedException;
import org.eclipse.ditto.connectivity.model.Target;
Expand Down Expand Up @@ -322,12 +324,21 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
final HttpPublishTarget publishTarget,
final ExternalMessage message,
final int maxTotalMessageSize,
final int ackSizeQuota) {
final int ackSizeQuota,
@Nullable final AuthorizationContext targetAuthorizationContext) {

if (null == targetAuthorizationContext) {
// only for reply-targets this context is empty, but this can never be the case for http.
throw ConnectivityInternalErrorException.fromMessage("Authorization context for target is missing",
message.getInternalHeaders());
}

final var resultFuture = new CompletableFuture<SendResult>();
final var request = createRequest(publishTarget, message);
final var context =
newContext(signal, autoAckTarget, request, message, maxTotalMessageSize, ackSizeQuota, resultFuture);
newContext(signal, autoAckTarget, request, message, maxTotalMessageSize, ackSizeQuota,
targetAuthorizationContext, resultFuture);

sourceQueue.offer(Pair.create(request, context))
.handle(handleQueueOfferResult(message, resultFuture));

Expand Down Expand Up @@ -392,6 +403,7 @@ private HttpPushContext newContext(final Signal<?> signal,
final ExternalMessage message,
final int maxTotalMessageSize,
final int ackSizeQuota,
final AuthorizationContext targetAuthorizationContext,
final CompletableFuture<SendResult> resultFuture) {

return tryResponse -> {
Expand All @@ -404,7 +416,8 @@ private HttpPushContext newContext(final Signal<?> signal,
l.debug("Got response <{} {} {}>", response.status(), response.getHeaders(),
response.entity().getContentType());

toCommandResponseOrAcknowledgement(signal, autoAckTarget, response, maxTotalMessageSize, ackSizeQuota)
toCommandResponseOrAcknowledgement(signal, autoAckTarget, response, maxTotalMessageSize, ackSizeQuota,
targetAuthorizationContext)
.thenAccept(resultFuture::complete)
.exceptionally(e -> {
resultFuture.completeExceptionally(e);
Expand Down Expand Up @@ -434,7 +447,8 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
@Nullable final Target autoAckTarget,
final HttpResponse response,
final int maxTotalMessageSize,
final int ackSizeQuota) {
final int ackSizeQuota,
final AuthorizationContext targetAuthorizationContext) {

final var autoAckLabel = getAcknowledgementLabel(autoAckTarget);

Expand Down Expand Up @@ -464,10 +478,10 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
// Live-Response is declared as issued ack => parse live response from response
if (sentSignal instanceof MessageCommand) {
result = toMessageCommandResponse((MessageCommand<?, ?>) sentSignal, mergedDittoHeaders, body,
httpStatus);
httpStatus, targetAuthorizationContext);
} else if (sentSignal instanceof ThingCommand &&
SignalInformationPoint.isChannelLive(sentSignal)) {
result = toLiveCommandResponse(sentSignal, mergedDittoHeaders, body);
result = toLiveCommandResponse(mergedDittoHeaders, body, targetAuthorizationContext);
} else {
result = null;
}
Expand All @@ -483,7 +497,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
.filter(org.eclipse.ditto.base.model.headers.contenttype.ContentType::isDittoProtocol)
.isPresent();
if (isDittoProtocolMessage && body.isObject()) {
final CommandResponse<?> parsedResponse = toCommandResponse(body.asObject(), sentSignal);
final CommandResponse<?> parsedResponse = toCommandResponse(body.asObject(), targetAuthorizationContext);
if (parsedResponse instanceof Acknowledgement) {
result = parsedResponse;
} else if (SignalInformationPoint.isLiveCommandResponse(parsedResponse)) {
Expand Down Expand Up @@ -543,13 +557,14 @@ private static Optional<SignalWithEntityId<?>> tryToGetAsLiveCommandWithEntityId
private MessageCommandResponse<?, ?> toMessageCommandResponse(final MessageCommand<?, ?> sentMessageCommand,
final DittoHeaders dittoHeaders,
final JsonValue jsonValue,
final HttpStatus status) {
final HttpStatus status,
final AuthorizationContext targetAuthorizationContext) {

final boolean isDittoProtocolMessage = dittoHeaders.getDittoContentType()
.filter(org.eclipse.ditto.base.model.headers.contenttype.ContentType::isDittoProtocol)
.isPresent();
if (isDittoProtocolMessage && jsonValue.isObject()) {
final CommandResponse<?> commandResponse = toCommandResponse(jsonValue.asObject(), sentMessageCommand);
final CommandResponse<?> commandResponse = toCommandResponse(jsonValue.asObject(), targetAuthorizationContext);
if (commandResponse == null) {
return null;
} else if (commandResponse instanceof MessageCommandResponse) {
Expand Down Expand Up @@ -590,15 +605,15 @@ private static Optional<SignalWithEntityId<?>> tryToGetAsLiveCommandWithEntityId
}

@Nullable
private CommandResponse<?> toLiveCommandResponse(final Signal<?> sentSignal,
final DittoHeaders dittoHeaders,
final JsonValue jsonValue) {
private CommandResponse<?> toLiveCommandResponse(final DittoHeaders dittoHeaders,
final JsonValue jsonValue,
final AuthorizationContext targetAuthorizationContext) {

final boolean isDittoProtocolMessage = dittoHeaders.getDittoContentType()
.filter(org.eclipse.ditto.base.model.headers.contenttype.ContentType::isDittoProtocol)
.isPresent();
if (isDittoProtocolMessage && jsonValue.isObject()) {
final var commandResponse = toCommandResponse(jsonValue.asObject(), sentSignal);
final var commandResponse = toCommandResponse(jsonValue.asObject(), targetAuthorizationContext);
if (commandResponse == null) {
return null;
} else if (commandResponse instanceof ThingCommandResponse &&
Expand All @@ -616,18 +631,20 @@ private CommandResponse<?> toLiveCommandResponse(final Signal<?> sentSignal,
}

@Nullable
private CommandResponse<?> toCommandResponse(final JsonObject jsonObject, final Signal<?> sentSignal) {
private CommandResponse<?> toCommandResponse(final JsonObject jsonObject,
final AuthorizationContext targetAuthorizationContext) {

final var jsonifiableAdaptable = ProtocolFactory.jsonifiableAdaptableFromJson(jsonObject);
final var signal = DITTO_PROTOCOL_ADAPTER.fromAdaptable(jsonifiableAdaptable);

if (signal instanceof CommandResponse) {
// use ditto headers from original sent signal as base, so that headers like the ditto-auth-context are set,
// they might be needed by concierge for filtering,
final var commandResponse = (CommandResponse<?>) signal;
final var fromSourceHeaders = sentSignal.getDittoHeaders().toBuilder()
.putHeaders(commandResponse.getDittoHeaders())
final var dittoHeadersWithTargetAuthorization = signal.getDittoHeaders().toBuilder()
.authorizationContext(targetAuthorizationContext)
.build();

return commandResponse.setDittoHeaders(fromSourceHeaders);
return commandResponse.setDittoHeaders(dittoHeadersWithTargetAuthorization);

} else {
connectionLogger.exception("Expected <{}> to be of type <{}> but was of type <{}>.",
jsonObject, CommandResponse.class.getSimpleName(), signal.getClass().getSimpleName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
Expand Down Expand Up @@ -161,7 +162,8 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
final KafkaPublishTarget publishTarget,
final ExternalMessage message,
final int maxTotalMessageSize,
final int ackSizeQuota) {
final int ackSizeQuota,
@Nullable final AuthorizationContext targetAuthorizationContext) {

@Nullable final AcknowledgementLabel autoAckLabel = getAcknowledgementLabel(autoAckTarget).orElse(null);
final Function<RecordMetadata, SendResult> callback =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
Expand Down Expand Up @@ -165,7 +166,8 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
final MqttPublishTarget publishTarget,
final ExternalMessage message,
final int maxTotalMessageSize,
final int ackSizeQuota) {
final int ackSizeQuota,
@Nullable final AuthorizationContext targetAuthorizationContext) {

try {
final var messageHeaders = message.getHeaders();
Expand Down

0 comments on commit 407985c

Please sign in to comment.