Skip to content

Commit

Permalink
[#1228] moved restoration from connectivity command headers from Ackn…
Browse files Browse the repository at this point in the history
…owledgementAggregatorActor.restoreCommandConnectivityHeaders

to own utility class CommandHeaderRestoration and also used in ThingCommandEnforcement

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Dec 13, 2021
1 parent b1be814 commit 3a0417c
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ default JsonSchemaVersion getImplementedSchemaVersion() {

/**
* Indicates whether this response is of a type contained in
* {@link org.eclipse.ditto.base.model.headers.DittoHeaderDefinition#EXPECTED_RESPONSE_TYPES} header.
* {@link org.eclipse.ditto.base.model.headers.DittoHeaderDefinition#EXPECTED_RESPONSE_TYPES} header or whether it
* was a {@code "live"} response which always is expected.
*
* @return true if this response is expected, false if not.
* @since 1.2.0
*/
default boolean isOfExpectedResponseType() {
return getDittoHeaders().getExpectedResponseTypes().contains(getResponseType());
return getDittoHeaders().getExpectedResponseTypes().contains(getResponseType()) ||
getDittoHeaders().getChannel().map("live"::equals).orElse(false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.eclipse.ditto.concierge.service.actors.LiveResponseAndAcknowledgementForwarder;
import org.eclipse.ditto.concierge.service.enforcement.placeholders.references.PolicyIdReferencePlaceholderResolver;
import org.eclipse.ditto.concierge.service.enforcement.placeholders.references.ReferencePlaceholder;
import org.eclipse.ditto.internal.models.signal.CommandHeaderRestoration;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
Expand Down Expand Up @@ -329,7 +330,9 @@ private Contextual<WithDittoHeaders> enforceThingCommandByPolicyEnforcer(
private CompletionStage<ThingQueryCommandResponse<?>> doSmartChannelSelection(final ThingQueryCommand<?> command,
final ThingQueryCommandResponse<?> response, final Instant startTime, final Enforcer enforcer) {

final var twinResponse = setTwinChannel(response);
final ThingQueryCommandResponse<?> twinResponseWithTwinChannel = setTwinChannel(response);
final ThingQueryCommandResponse<?> twinResponse = CommandHeaderRestoration.restoreCommandConnectivityHeaders(
twinResponseWithTwinChannel, command.getDittoHeaders());
if (command.getDittoHeaders().getLiveChannelCondition().isEmpty() ||
!twinResponse.getDittoHeaders().didLiveChannelConditionMatch()) {
return CompletableFuture.completedStage(twinResponse);
Expand All @@ -349,10 +352,11 @@ private Function<Object, CompletionStage<ThingQueryCommandResponse<?>>> getFallb
return response -> {
if (response instanceof ThingQueryCommandResponse) {
return CompletableFuture.completedStage(
setAdditionalHeaders((ThingQueryCommandResponse<?>) response));
setAdditionalHeaders((ThingQueryCommandResponse<?>) response, liveCommand.getDittoHeaders()));
} else if (response instanceof ErrorResponse) {
return CompletableFuture.failedStage(
setAdditionalHeaders(((ErrorResponse<?>) response).getDittoRuntimeException()));
setAdditionalHeaders(((ErrorResponse<?>) response),
liveCommand.getDittoHeaders()).getDittoRuntimeException());
} else if (response instanceof AskException || response instanceof AskTimeoutException) {
return applyTimeoutStrategy(liveCommand, twinResponse);
} else {
Expand All @@ -364,11 +368,14 @@ private Function<Object, CompletionStage<ThingQueryCommandResponse<?>>> getFallb
};
}

private static <T extends DittoHeadersSettable<T>> T setAdditionalHeaders(final DittoHeadersSettable<T> settable) {
private static <T extends DittoHeadersSettable<T>> T setAdditionalHeaders(final DittoHeadersSettable<T> settable,
final DittoHeaders commandHeaders) {
// TODO: ensure pre-enforcer headers in responses
return settable.setDittoHeaders(settable.getDittoHeaders()
final T dittoHeadersSettable =
CommandHeaderRestoration.restoreCommandConnectivityHeaders(settable, commandHeaders);
return dittoHeadersSettable.setDittoHeaders(dittoHeadersSettable.getDittoHeaders()
.toBuilder()
.putHeaders(getAdditionalLiveResponseHeaders(settable.getDittoHeaders()))
.putHeaders(getAdditionalLiveResponseHeaders(dittoHeadersSettable.getDittoHeaders()))
.build());
}

Expand All @@ -380,14 +387,17 @@ private static CompletionStage<ThingQueryCommandResponse<?>> applyTimeoutStrateg
return CompletableFuture.completedStage(twinResponse);
} else {
final var timeout = LiveSignalEnforcement.getLiveSignalTimeout(command);
final var timeoutException = GatewayCommandTimeoutException.newBuilder(timeout)
final GatewayCommandTimeoutException timeoutException = GatewayCommandTimeoutException.newBuilder(timeout)
.dittoHeaders(twinResponse.getDittoHeaders()
.toBuilder()
.channel(TopicPath.Channel.LIVE.getName())
.putHeaders(getAdditionalLiveResponseHeaders(twinResponse.getDittoHeaders()))
.build())
.build();
return CompletableFuture.failedStage(timeoutException);
final GatewayCommandTimeoutException timeoutExceptionWithConnectivityHeaders =
CommandHeaderRestoration.restoreCommandConnectivityHeaders(timeoutException,
command.getDittoHeaders());
return CompletableFuture.failedStage(timeoutExceptionWithConnectivityHeaders);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.eclipse.ditto.base.model.common.Placeholders;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
Expand Down Expand Up @@ -83,10 +82,8 @@
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;

import akka.actor.AbstractActor;
Expand Down Expand Up @@ -257,10 +254,6 @@ private void sendBackResponses(final OutboundSignal.MultiMapped multiMapped, @Nu
}
}

private boolean isLiveResponse(final WithDittoHeaders response) {
return response.getDittoHeaders().getChannel().filter(TopicPath.Channel.LIVE.getName()::equals).isPresent();
}

/**
* Gets the converter from publisher exceptions to Acknowledgements.
* Override to handle client-specific exceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.models.acks.AcknowledgementAggregatorActor;
import org.eclipse.ditto.internal.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.signal.CommandHeaderRestoration;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand Down Expand Up @@ -443,7 +443,7 @@ private int dispatchIncomingSignal(final IncomingSignal incomingSignal) {
.orElse(acknowledgementConfig.getForwarderFallbackTimeout())
).thenApply(response -> {
if (response instanceof WithDittoHeaders) {
return AcknowledgementAggregatorActor.restoreCommandConnectivityHeaders(
return CommandHeaderRestoration.restoreCommandConnectivityHeaders(
(DittoHeadersSettable<?>) response,
originalHeaders);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private Consumer<MatchingValidationResult.Failure> getResponseValidationFailureC
() -> logger.withCorrelationId(failure.getCommand())
.warning("Discarding invalid response as connection ID of sender could not be determined.");
failure.getConnectionId()
.map(connectionId -> getAddConnectionLogEntry(connectionId, failure))
.map(connectionId -> getAddConnectionLogEntry(ConnectionId.of(connectionId), failure))
.ifPresentOrElse(addConnectionLogEntry, logMissingConnectionId);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayCommandTimeoutException;
import org.eclipse.ditto.connectivity.model.ConnectionIdInvalidException;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.signal.CommandHeaderRestoration;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
Expand Down Expand Up @@ -397,25 +398,9 @@ private void completeAcknowledgements(@Nullable final CommandResponse<?> respons
getContext().stop(getSelf());
}

public static DittoHeadersSettable<?> restoreCommandConnectivityHeaders(final DittoHeadersSettable<?> signal,
final DittoHeaders requestCommandHeaders) {

final var signalDittoHeaders = signal.getDittoHeaders();
final var enhancedHeadersBuilder = signalDittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES.getKey())
.removeHeader(DittoHeaderDefinition.INBOUND_PAYLOAD_MAPPER.getKey())
.removeHeader(DittoHeaderDefinition.REPLY_TARGET.getKey());
if (requestCommandHeaders.containsKey(DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES.getKey())) {
enhancedHeadersBuilder.expectedResponseTypes(requestCommandHeaders.getExpectedResponseTypes());
}
requestCommandHeaders.getInboundPayloadMapper().ifPresent(enhancedHeadersBuilder::inboundPayloadMapper);
requestCommandHeaders.getReplyTarget().ifPresent(enhancedHeadersBuilder::replyTarget);

return signal.setDittoHeaders(enhancedHeadersBuilder.build());
}

private void handleSignal(final DittoHeadersSettable<?> signal) {
responseSignalConsumer.accept(restoreCommandConnectivityHeaders(signal, originatingSignal.getDittoHeaders()));
responseSignalConsumer.accept(
CommandHeaderRestoration.restoreCommandConnectivityHeaders(signal, originatingSignal.getDittoHeaders()));
}

private static boolean containsOnlyTwinPersistedOrLiveResponse(final Acknowledgements aggregatedAcknowledgements) {
Expand Down Expand Up @@ -446,7 +431,7 @@ private static Duration getTimeout(final Signal<?> originatingSignal, final Dura
}

private enum Control {
WAITING_FOR_ACKS_TIMED_OUT;
WAITING_FOR_ACKS_TIMED_OUT
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementCorrelationIdMissingException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -98,8 +98,8 @@ public Receive createReceive() {

private void forwardCommandResponse(final WithDittoHeaders acknowledgement) {
log.withCorrelationId(acknowledgement)
.debug("Received Acknowledgement / live CommandResponse, forwarding to original requester: " +
"<{}>", acknowledgement);
.debug("Received Acknowledgement / live CommandResponse, forwarding to original requester <{}>: " +
"<{}>", acknowledgementRequester, acknowledgement);
acknowledgementRequester.tell(acknowledgement, getSender());
}

Expand Down
5 changes: 0 additions & 5 deletions internal/models/signal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@
<artifactId>ditto-messages-model</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-connectivity-model</artifactId>
</dependency>

<!-- Testing -->
<dependency>
<groupId>org.eclipse.ditto</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.models.signal;

import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersSettable;

/**
* Helpers for restoring command headers, e.g. for connectivity.
*/
public final class CommandHeaderRestoration {

private CommandHeaderRestoration() {
throw new AssertionError();
}

/**
* Restores the connectivity relevant headers for commands and/or command responses.
*
* @param signal the signal to adjust the headers in.
* @param headersToRestoreFrom the original headers to restore connectivity headers from.
* @return the potentially adjusted signal with restored connectivity headers.
*/
@SuppressWarnings("unchecked")
public static <T extends DittoHeadersSettable<?>> T restoreCommandConnectivityHeaders(
final DittoHeadersSettable<?> signal,
final DittoHeaders headersToRestoreFrom) {

final var signalDittoHeaders = signal.getDittoHeaders();
final var enhancedHeadersBuilder = signalDittoHeaders.toBuilder()
.removeHeader(DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES.getKey())
.removeHeader(DittoHeaderDefinition.INBOUND_PAYLOAD_MAPPER.getKey())
.removeHeader(DittoHeaderDefinition.REPLY_TARGET.getKey());
if (headersToRestoreFrom.containsKey(DittoHeaderDefinition.EXPECTED_RESPONSE_TYPES.getKey())) {
enhancedHeadersBuilder.expectedResponseTypes(headersToRestoreFrom.getExpectedResponseTypes());
}
headersToRestoreFrom.getInboundPayloadMapper().ifPresent(enhancedHeadersBuilder::inboundPayloadMapper);
headersToRestoreFrom.getReplyTarget().ifPresent(enhancedHeadersBuilder::replyTarget);

return (T) signal.setDittoHeaders(enhancedHeadersBuilder.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.model.ConnectionId;

/**
* Represents the result of validating whether two particular signals are correlated to each other.
Expand Down Expand Up @@ -64,7 +65,7 @@ public static MatchingValidationResult success() {
* @param detailMessage the detail message of the failure.
* @return the instance.
* @throws NullPointerException if any argument is {@code null}.
* @throws org.eclipse.ditto.connectivity.model.ConnectionIdInvalidException if the {@code DittoHeaders} of
* @throws org.eclipse.ditto.base.model.entity.id.EntityIdInvalidException if the {@code DittoHeaders} of
* {@code commandResponse} contain an invalid value for {@link DittoHeaderDefinition#CONNECTION_ID}.
* @throws IllegalArgumentException if {@code detailMessage} is empty or blank.
*/
Expand Down Expand Up @@ -126,7 +127,7 @@ public static final class Failure extends MatchingValidationResult {

private final Command<?> command;
private final CommandResponse<?> commandResponse;
private final Optional<ConnectionId> connectionId;
@Nullable private final EntityId connectionId;
private final String detailMessage;

private Failure(final Command<?> command,
Expand All @@ -143,18 +144,18 @@ private Failure(final Command<?> command,
* Connection ID in DittoHeaders could be invalid as it gets not
* validated by DittoHeaders.
*/
connectionId = getConnectionId(commandResponse);
connectionId = getConnectionId(commandResponse).orElse(null);
this.detailMessage = checkArgument(checkNotNull(detailMessage, "detailMessage"),
Predicate.not(String::isBlank),
() -> "The detailMessage must not be blank.");
}

private static Optional<ConnectionId> getConnectionId(final CommandResponse<?> commandResponse) {
final Optional<ConnectionId> result;
private static Optional<EntityId> getConnectionId(final CommandResponse<?> commandResponse) {
final Optional<EntityId> result;
final var responseDittoHeaders = commandResponse.getDittoHeaders();
final var connectionIdString = responseDittoHeaders.get(DittoHeaderDefinition.CONNECTION_ID.getKey());
if (null != connectionIdString) {
result = Optional.of(ConnectionId.of(connectionIdString));
result = Optional.of(EntityId.of(EntityType.of("connection"), connectionIdString));
} else {
result = Optional.empty();
}
Expand Down Expand Up @@ -204,8 +205,8 @@ public CommandResponse<?> getCommandResponse() {
*
* @return the connection ID.
*/
public Optional<ConnectionId> getConnectionId() {
return connectionId;
public Optional<EntityId> getConnectionId() {
return Optional.ofNullable(connectionId);
}

@Override
Expand Down

0 comments on commit 3a0417c

Please sign in to comment.