Skip to content

Commit

Permalink
Issue #106: Moved command-response-round-trip validation to 'Acknowle…
Browse files Browse the repository at this point in the history
…dgementAggregationActor'.

This was necessary to avoid unpredictable behaviour regarding timeouts.
To prevent unnecessary dependencies, actual handling of validation failures is delegated to a Consumer.
Extended 'MatchingValidationResult.Failure' to contain all information required for meaningful handling of validation failures.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Nov 17, 2021
1 parent a6aecc6 commit ad49942
Show file tree
Hide file tree
Showing 18 changed files with 1,039 additions and 506 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand All @@ -61,6 +62,7 @@
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.api.InboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.api.messaging.monitoring.logs.LogEntryFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
Expand Down Expand Up @@ -106,6 +108,7 @@
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
Expand Down Expand Up @@ -326,7 +329,7 @@ protected void init() {
final var actorPair = startOutboundActors(protocolAdapter);
outboundDispatchingActor = actorPair.first();

final var inboundDispatchingSink = getInboundDispatchingSink(connection, protocolAdapter, actorPair.second());
final var inboundDispatchingSink = getInboundDispatchingSink(actorPair.second());
inboundMappingSink = getInboundMappingSink(protocolAdapter, inboundDispatchingSink);
subscriptionManager = startSubscriptionManager(proxyActorSelection, connectivityConfig().getClientConfig());

Expand Down Expand Up @@ -1703,8 +1706,7 @@ private CompletionStage<Status.Status> tryToConfigureMessageMappingProcessor() {
return CompletableFuture.completedFuture(new Status.Success("mapping"));
}

private Pair<ActorRef, ActorRef> startOutboundActors(
final ProtocolAdapter protocolAdapter) {
private Pair<ActorRef, ActorRef> startOutboundActors(final ProtocolAdapter protocolAdapter) {
final OutboundMappingSettings settings;
final OutboundMappingProcessor outboundMappingProcessor;
try {
Expand Down Expand Up @@ -1741,19 +1743,27 @@ private Pair<ActorRef, ActorRef> startOutboundActors(
* @return the ref to the started {@link InboundDispatchingSink}
* @throws DittoRuntimeException when mapping processor could not get started.
*/
private Sink<Object, NotUsed> getInboundDispatchingSink(final Connection connection,
final ProtocolAdapter protocolAdapter,
final ActorRef outboundMappingProcessorActor) {

private Sink<Object, NotUsed> getInboundDispatchingSink(final ActorRef outboundMappingProcessorActor) {
final var actorContext = getContext();
final var actorSystem = actorContext.getSystem();

return InboundDispatchingSink.createSink(connection,
protocolAdapter.headerTranslator(),
proxyActorSelection,
connectionActor,
outboundMappingProcessorActor,
getSelf(),
actorContext,
actorContext.system().settings().config());
actorSystem.settings().config(),
getResponseValidationFailureConsumer());
}

private Consumer<MatchingValidationResult.Failure> getResponseValidationFailureConsumer() {
return failure -> connectionLogger.logEntry(
LogEntryFactory.getLogEntryForFailedCommandResponseRoundTrip(failure.getCommand(),
failure.getCommandResponse(),
failure.getDetailMessage())
);
}

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@
import javax.annotation.concurrent.NotThreadSafe;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.UnsupportedSignalException;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.api.messaging.monitoring.logs.LogEntryFactory;
import org.eclipse.ditto.connectivity.model.ConnectionIdInvalidException;
import org.eclipse.ditto.connectivity.model.LogEntry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.internal.models.signal.correlation.CommandAndCommandResponseMatchingValidator;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

/**
* Validates that a specified {@link Command} and {@link CommandResponse} are associated with each other, i.e. that the
Expand All @@ -47,6 +53,8 @@
@NotThreadSafe
final class HttpPushRoundTripSignalsValidator implements BiConsumer<Command<?>, CommandResponse<?>> {

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(HttpPushRoundTripSignalsValidator.class);

private final ConnectionLogger connectionLogger;
private final CommandAndCommandResponseMatchingValidator validator;

Expand All @@ -61,16 +69,38 @@ static HttpPushRoundTripSignalsValidator newInstance(final ConnectionLogger conn

@Override
public void accept(final Command<?> command, final CommandResponse<?> commandResponse) {
final var validationResult = validator.apply(command, commandResponse);
final var validationResult = tryToValidate(command, commandResponse);
if (!validationResult.isSuccess()) {
final var detailMessage = validationResult.getDetailMessageOrThrow();
final var failure = validationResult.asFailureOrThrow();
final var detailMessage = failure.getDetailMessage();
connectionLogger.logEntry(LogEntryFactory.getLogEntryForFailedCommandResponseRoundTrip(command,
commandResponse,
detailMessage));
throw newUnsupportedSignalException(command, commandResponse, detailMessage);
}
}

private MatchingValidationResult tryToValidate(final Command<?> command, final CommandResponse<?> commandResponse) {
try {
return validator.apply(command, commandResponse);
} catch (final ConnectionIdInvalidException e) {

/*
* In this case an invalid connection ID is no problem at all.
* As the ConnectionLogger is already known, a log entry can be
* directly added.
* Besides, an invalid connection ID in response headers is very
* unlikely as it gets set by Ditto itself.
*/
LOGGER.withCorrelationId(command)
.warn("Headers of command response contain an invalid connection ID: {}", e.getMessage());
final var headersWithoutConnectionId = DittoHeaders.newBuilder(commandResponse.getDittoHeaders())
.removeHeader(DittoHeaderDefinition.CONNECTION_ID.getKey())
.build();
return validator.apply(command, commandResponse.setDittoHeaders(headersWithoutConnectionId));
}
}

private static UnsupportedSignalException newUnsupportedSignalException(final Command<?> command,
final CommandResponse<?> commandResponse,
final String detailMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ protected void stopConsumerActor(final ActorRef underTest) {
// implement in subclasses if required
}


protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientActor, final ActorRef proxyActor) {
final Map<String, MappingContext> mappings = new HashMap<>();
mappings.put("ditto", DittoMessageMapper.CONTEXT);
Expand Down Expand Up @@ -256,13 +255,22 @@ protected Sink<Object, NotUsed> setupInboundMappingSink(final ActorRef clientAct
final ActorRef outboundProcessorActor = actorSystem.actorOf(props,
OutboundMappingProcessorActor.ACTOR_NAME + "-" + name.getMethodName());

final Sink<Object, NotUsed> inboundDispatchingSink =
InboundDispatchingSink.createSink(CONNECTION, protocolAdapter.headerTranslator(),
ActorSelection.apply(proxyActor, ""), connectionActorProbe.ref(), outboundProcessorActor,
TestProbe.apply(actorSystem).ref(), actorSystem, actorSystem.settings().config());

return InboundMappingSink.createSink(inboundMappingProcessor, CONNECTION_ID, 99,
inboundDispatchingSink, connectivityConfig.getMappingConfig(), null,
final Sink<Object, NotUsed> inboundDispatchingSink = InboundDispatchingSink.createSink(CONNECTION,
protocolAdapter.headerTranslator(),
ActorSelection.apply(proxyActor, ""),
connectionActorProbe.ref(),
outboundProcessorActor,
TestProbe.apply(actorSystem).ref(),
actorSystem,
actorSystem.settings().config(),
null);

return InboundMappingSink.createSink(inboundMappingProcessor,
CONNECTION_ID,
99,
inboundDispatchingSink,
connectivityConfig.getMappingConfig(),
null,
actorSystem.dispatchers().defaultGlobalDispatcher());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -34,9 +33,11 @@
import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.common.ResponseType;
import org.eclipse.ditto.base.model.correlationid.TestNameCorrelationId;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.contenttype.ContentType;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
Expand Down Expand Up @@ -70,19 +71,18 @@
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttributeResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.mockito.Mockito;

import com.typesafe.config.ConfigValueFactory;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.scaladsl.Source;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
Expand Down Expand Up @@ -118,6 +118,9 @@ public abstract class AbstractMessageMappingProcessorActorTest {
.set("qos", "{{ header:qos }}")
.build());

@Rule
public final TestNameCorrelationId testNameCorrelationId = TestNameCorrelationId.newInstance();

ActorSystem actorSystem;
ProtocolAdapterProvider protocolAdapterProvider;
TestProbe connectionActorProbe;
Expand Down Expand Up @@ -328,13 +331,13 @@ ActorRef createInboundMappingProcessorActor(final TestKit kit, final ActorRef ou
ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor,
final ActorRef outboundMappingProcessorActor,
final TestKit testKit) {

final Map<String, MappingContext> mappingDefinitions = new HashMap<>();
mappingDefinitions.put(FAULTY_MAPPER, FaultyMessageMapper.CONTEXT);
mappingDefinitions.put(ADD_HEADER_MAPPER, AddHeaderMessageMapper.CONTEXT);
mappingDefinitions.put(DUPLICATING_MAPPER, DuplicatingMessageMapper.CONTEXT);
final PayloadMappingDefinition payloadMappingDefinition =
ConnectivityModelFactory.newPayloadMappingDefinition(mappingDefinitions);
final ThreadSafeDittoLoggingAdapter logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
final var payloadMappingDefinition = ConnectivityModelFactory.newPayloadMappingDefinition(mappingDefinitions);
final var logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class);
Mockito.when(logger.withCorrelationId(Mockito.any(DittoHeaders.class)))
.thenReturn(logger);
Mockito.when(logger.withCorrelationId(Mockito.nullable(CharSequence.class)))
Expand All @@ -343,22 +346,31 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor,
.thenReturn(logger);
Mockito.when(logger.withMdcEntry(Mockito.any(CharSequence.class), Mockito.nullable(CharSequence.class)))
.thenReturn(logger);
final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);
final InboundMappingProcessor inboundMappingProcessor = InboundMappingProcessor.of(
final var protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null);
final var inboundMappingProcessor = InboundMappingProcessor.of(
CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build(),
TestConstants.CONNECTIVITY_CONFIG,
actorSystem,
protocolAdapter,
logger);
final Sink<Object, NotUsed> inboundDispatchingSink =
InboundDispatchingSink.createSink(CONNECTION, protocolAdapter.headerTranslator(),
ActorSelection.apply(proxyActor, ""), connectionActorProbe.ref(), outboundMappingProcessorActor,
testKit.getRef(), actorSystem, actorSystem.settings().config());
logger
);
final var inboundDispatchingSink = InboundDispatchingSink.createSink(CONNECTION,
protocolAdapter.headerTranslator(),
ActorSelection.apply(proxyActor, ""),
connectionActorProbe.ref(),
outboundMappingProcessorActor,
testKit.getRef(),
actorSystem,
actorSystem.settings().config(),
null);

final Sink<Object, NotUsed> inboundMappingSink =
InboundMappingSink.createSink(inboundMappingProcessor, CONNECTION_ID, 99,
inboundDispatchingSink, TestConstants.CONNECTIVITY_CONFIG.getMappingConfig(), null,
actorSystem.dispatchers().defaultGlobalDispatcher());
final var inboundMappingSink = InboundMappingSink.createSink(inboundMappingProcessor,
CONNECTION_ID,
99,
inboundDispatchingSink,
TestConstants.CONNECTIVITY_CONFIG.getMappingConfig(),
null,
actorSystem.dispatchers().defaultGlobalDispatcher());

return Source.actorRef(99, OverflowStrategy.dropNew())
.to(inboundMappingSink)
Expand Down Expand Up @@ -389,10 +401,8 @@ void setUpProxyActor(final ActorRef recipient) {
}

ExternalMessage toExternalMessage(final Signal<?> signal, Consumer<SourceBuilder<?>> sourceModifier) {
final AuthorizationContext context =
AuthorizationModelFactory.newAuthContext(
DittoAuthorizationContextType.UNSPECIFIED,
AuthorizationModelFactory.newAuthSubject("ditto:ditto"));
final var context = AuthorizationModelFactory.newAuthContext(DittoAuthorizationContextType.UNSPECIFIED,
AuthorizationModelFactory.newAuthSubject("ditto:ditto"));
final JsonifiableAdaptable adaptable = ProtocolFactory
.wrapAsJsonifiableAdaptable(DITTO_PROTOCOL_ADAPTER.toAdaptable(signal));
final SourceBuilder<?> sourceBuilder = ConnectivityModelFactory.newSourceBuilder()
Expand Down Expand Up @@ -421,12 +431,14 @@ ExternalMessage toExternalMessage(final Signal<?> signal) {
});
}

static ModifyAttribute createModifyAttributeCommand() {
final Map<String, String> headers = new HashMap<>();
final String correlationId = UUID.randomUUID().toString();
headers.put("correlation-id", correlationId);
headers.put("content-type", "application/json");
return ModifyAttribute.of(KNOWN_THING_ID, JsonPointer.of("foo"), JsonValue.of(42), DittoHeaders.of(headers));
private ModifyAttribute createModifyAttributeCommand() {
return ModifyAttribute.of(KNOWN_THING_ID,
JsonPointer.of("foo"),
JsonValue.of(42),
DittoHeaders.newBuilder()
.correlationId(testNameCorrelationId.getCorrelationId())
.contentType(ContentType.APPLICATION_JSON)
.build());
}

static ThreadSafeDittoLoggingAdapter mockLoggingAdapter() {
Expand Down Expand Up @@ -465,4 +477,5 @@ public Optional<String> resolve(final String placeholderSource, final String nam
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
Expand All @@ -94,7 +93,6 @@
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.MessageDispatcher;
import akka.stream.javadsl.Sink;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
Expand Down Expand Up @@ -321,21 +319,28 @@ private Sink<Object, NotUsed> setupMappingSink(final ActorRef testRef,
AbstractConsumerActorTest.actorSystem,
protocolAdapter,
logger);
final Sink<Object, NotUsed> inboundDispatchingSink =
InboundDispatchingSink.createSink(CONNECTION, protocolAdapter.headerTranslator(),
ActorSelection.apply(testRef, ""), connectionActorProbe.ref(),
testRef,
TestProbe.apply(actorSystem).ref(), actorSystem, actorSystem.settings().config());

final MessageDispatcher messageDispatcher = actorSystem.dispatchers().defaultGlobalDispatcher();
return InboundMappingSink.createSink(inboundMappingProcessor, CONNECTION_ID, 99, inboundDispatchingSink,
TestConstants.MAPPING_CONFIG, null, messageDispatcher);
final var inboundDispatchingSink = InboundDispatchingSink.createSink(CONNECTION,
protocolAdapter.headerTranslator(),
ActorSelection.apply(testRef, ""),
connectionActorProbe.ref(),
testRef,
TestProbe.apply(actorSystem).ref(),
actorSystem,
actorSystem.settings().config(),
null);

return InboundMappingSink.createSink(inboundMappingProcessor,
CONNECTION_ID,
99,
inboundDispatchingSink,
TestConstants.MAPPING_CONFIG,
null,
actorSystem.dispatchers().defaultGlobalDispatcher());
}

@Test
public void jmsMessageWithNullPropertyAndNullContentTypeTest() throws JMSException {
new TestKit(actorSystem) {{

final ActorRef testActor = getTestActor();
final Sink<Object, NotUsed> mappingSink = setupMappingSink(testActor, null, actorSystem);

Expand Down

0 comments on commit ad49942

Please sign in to comment.