Skip to content

Commit

Permalink
Issue #106: Handle IllegalAdaptableException for live responses via C…
Browse files Browse the repository at this point in the history
…onnectivity.

* Adjusted InboundDispatchingSink to handle IllegalAdaptableExceptions differently.
* Had to extend IllegalAdaptableException to convey signal type, too, in order to check whether the exception is about a response.
* Setting the signal type to IllegalAdaptableException made it necessary to adjust related mapping strategies as well.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Dec 20, 2021
1 parent fb9f818 commit ac832bd
Show file tree
Hide file tree
Showing 18 changed files with 1,491 additions and 625 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,28 @@
*/
package org.eclipse.ditto.connectivity.api.messaging.monitoring.logs;

import static org.eclipse.ditto.internal.models.signal.SignalInformationPoint.getCorrelationId;
import static org.eclipse.ditto.internal.models.signal.SignalInformationPoint.getEntityId;

import java.time.Instant;
import java.util.Optional;
import java.util.function.Predicate;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.LogCategory;
import org.eclipse.ditto.connectivity.model.LogEntry;
import org.eclipse.ditto.connectivity.model.LogLevel;
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.internal.models.signal.SignalInformationPoint;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.mappingstrategies.IllegalAdaptableException;
import org.eclipse.ditto.things.model.ThingId;

/**
* Factory for creating instances of {@link LogEntry}.
Expand All @@ -49,30 +55,91 @@ private LogEntryFactory() {
* @param commandResponse the response of the round-trip.
* @param detailMessage describes the reason for the failed round-trip.
* @throws NullPointerException if any argument is {@code null}.
* @throws IllegalArgumentException if {@code detailMessage} is blank.
* @throws IllegalArgumentException if {@code detailMessage} is empty or blank.
*/
public static LogEntry getLogEntryForFailedCommandResponseRoundTrip(final Command<?> command,
final CommandResponse<?> commandResponse,
final String detailMessage) {

ConditionChecker.checkNotNull(command, "command");
ConditionChecker.checkNotNull(commandResponse, "commandResponse");
ConditionChecker.checkArgument(ConditionChecker.checkNotNull(detailMessage, "detailMessage"),
Predicate.not(String::isBlank),
() -> "The detailMessage must not be blank.");

final var logEntryBuilder = ConnectivityModelFactory.newLogEntryBuilder(
getCorrelationId(command).or(() -> getCorrelationId(commandResponse)).orElse("n/a"),
Instant.now(),
LogCategory.RESPONSE,
LogType.DROPPED,
LogLevel.FAILURE,
detailMessage
validateDetailMessage(detailMessage)
);

getEntityId(command).or(() -> getEntityId(commandResponse)).ifPresent(logEntryBuilder::entityId);

return logEntryBuilder.build();
}

private static String validateDetailMessage(final String detailMessage) {
return ConditionChecker.checkArgument(ConditionChecker.checkNotNull(detailMessage, "detailMessage"),
Predicate.not(String::isBlank),
() -> "The detailMessage must not be blank.");
}

private static Optional<String> getCorrelationId(final WithDittoHeaders withDittoHeaders) {
final var dittoHeaders = withDittoHeaders.getDittoHeaders();
return dittoHeaders.getCorrelationId();
}

private static Optional<EntityId> getEntityId(final Signal<?> signal) {
return SignalInformationPoint.getEntityId(signal);
}

/**
* Returns a {@code LogEntry} for an invalid {@code CommandResponse} which led to
* an {@code IllegalAdaptableException}.
* The failure is described by the specified detail message string argument.
*
* @param illegalAdaptableException the exception that indicates that an {@code Adaptable} of a
* {@code CommandResponse} was invalid for some reason.
* @param detailMessage describes why the {@code CommandResponse} {@code Adaptable} is illegal.
* @throws NullPointerException if {@code illegalAdaptableException} is {@code null}.
* @throws IllegalArgumentException if {@code detailMessage} is empty or blank.
*/
public static LogEntry getLogEntryForIllegalCommandResponseAdaptable(
final IllegalAdaptableException illegalAdaptableException,
final String detailMessage
) {
ConditionChecker.checkNotNull(illegalAdaptableException, "illegalAdaptableException");

final var logEntryBuilder = ConnectivityModelFactory.newLogEntryBuilder(
getCorrelationId(illegalAdaptableException).orElse("n/a"),
Instant.now(),
LogCategory.RESPONSE,
LogType.DROPPED,
LogLevel.FAILURE,
validateDetailMessage(detailMessage)
);

getEntityId(illegalAdaptableException).ifPresent(logEntryBuilder::entityId);

return logEntryBuilder.build();
}

private static Optional<EntityId> getEntityId(final IllegalAdaptableException illegalAdaptableException) {
final var adaptable = illegalAdaptableException.getAdaptable();
return getEntityIdFromTopicPath(adaptable.getTopicPath());
}

private static Optional<EntityId> getEntityIdFromTopicPath(final TopicPath topicPath) {
final Optional<EntityId> result;
final var group = topicPath.getGroup();
if (TopicPath.Group.THINGS == group) {
result = Optional.of(ThingId.of(topicPath.getNamespace(), topicPath.getEntityName()));
} else if (TopicPath.Group.POLICIES == group) {
result = Optional.of(PolicyId.of(topicPath.getNamespace(), topicPath.getEntityName()));
} else {
result = Optional.empty();
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.Instant;

import org.assertj.core.api.SoftAssertions;
import org.eclipse.ditto.base.model.correlationid.TestNameCorrelationId;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
Expand All @@ -29,6 +30,10 @@
import org.eclipse.ditto.connectivity.model.LogCategory;
import org.eclipse.ditto.connectivity.model.LogLevel;
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.mappingstrategies.IllegalAdaptableException;
import org.eclipse.ditto.things.model.ThingId;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -54,6 +59,9 @@ public final class LogEntryFactoryTest {
@Mock
private CommandResponse<?> commandResponse;

@Mock
private Adaptable adaptable;

private DittoHeaders dittoHeadersWithCorrelationId;

@Before
Expand All @@ -64,6 +72,7 @@ public void before() {
final var emptyDittoHeaders = DittoHeaders.empty();
Mockito.when(command.getDittoHeaders()).thenReturn(emptyDittoHeaders);
Mockito.when(commandResponse.getDittoHeaders()).thenReturn(emptyDittoHeaders);
Mockito.when(adaptable.getDittoHeaders()).thenReturn(dittoHeadersWithCorrelationId);
}

@Test
Expand Down Expand Up @@ -234,4 +243,59 @@ public void getLogEntryForFailedCommandResponseRoundTripReturnsLogEntryWithEntit
assertThat(logEntry.getEntityId()).hasValue(entityId);
}

@Test
public void getLogEntryForIllegalCommandResponseAdaptableWithNullIllegalAdaptableExceptionThrowsException() {
assertThatNullPointerException()
.isThrownBy(() -> LogEntryFactory.getLogEntryForIllegalCommandResponseAdaptable(null,
DETAIL_MESSAGE_FAILURE))
.withMessage("The illegalAdaptableException must not be null!")
.withNoCause();
}

@Test
public void getLogEntryForIllegalCommandResponseAdaptableWithNullDetailMessageThrowsException() {
final var illegalAdaptableException = IllegalAdaptableException.newInstance(DETAIL_MESSAGE_FAILURE, adaptable);

assertThatNullPointerException()
.isThrownBy(() -> LogEntryFactory.getLogEntryForIllegalCommandResponseAdaptable(
illegalAdaptableException,
null))
.withMessage("The detailMessage must not be null!")
.withNoCause();
}

@Test
public void getLogEntryForIllegalCommandResponseAdaptableWithBlankDetailMessageThrowsException() {
final var illegalAdaptableException = IllegalAdaptableException.newInstance(DETAIL_MESSAGE_FAILURE, adaptable);

assertThatIllegalArgumentException()
.isThrownBy(() -> LogEntryFactory.getLogEntryForIllegalCommandResponseAdaptable(
illegalAdaptableException,
" "))
.withMessage("The detailMessage must not be blank.")
.withNoCause();
}

@Test
public void getLogEntryForIllegalCommandResponseAdaptableReturnsExpected() {
final var thingId = ThingId.generateRandom();
final var topicPath = TopicPath.newBuilder(thingId).live().things().commands().modify().build();
Mockito.when(adaptable.getTopicPath()).thenReturn(topicPath);
final var illegalAdaptableException = IllegalAdaptableException.newInstance(DETAIL_MESSAGE_FAILURE, adaptable);
final var logEntry = LogEntryFactory.getLogEntryForIllegalCommandResponseAdaptable(illegalAdaptableException,
illegalAdaptableException.getMessage());

final var instantNow = Instant.now();
final var softly = new SoftAssertions();
softly.assertThat(logEntry.getCorrelationId())
.as("correlation ID")
.isEqualTo(String.valueOf(testNameCorrelationId.getCorrelationId()));
softly.assertThat(logEntry.getTimestamp()).as("timestamp").isBetween(instantNow.minusMillis(500L), instantNow);
softly.assertThat(logEntry.getLogCategory()).as("log category").isEqualTo(LogCategory.RESPONSE);
softly.assertThat(logEntry.getLogType()).as("log type").isEqualTo(LogType.DROPPED);
softly.assertThat(logEntry.getLogLevel()).as("log level").isEqualTo(LogLevel.FAILURE);
softly.assertThat(logEntry.getEntityId()).as("entity ID").contains(thingId);
softly.assertAll();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.pubsub.DistributedPubSub;
import akka.dispatch.MessageDispatcher;
import akka.http.javadsl.ConnectionContext;
import akka.japi.Pair;
import akka.japi.pf.DeciderBuilder;
Expand Down Expand Up @@ -1757,16 +1756,13 @@ private Pair<ActorRef, ActorRef> startOutboundActors(final ProtocolAdapter proto
* @throws DittoRuntimeException when mapping processor could not get started.
*/
private Sink<Object, NotUsed> getInboundDispatchingSink(final ActorRef outboundMappingProcessorActor) {
final var actorContext = getContext();
final var actorSystem = actorContext.getSystem();

return InboundDispatchingSink.createSink(connection,
protocolAdapter.headerTranslator(),
proxyActorSelection,
connectionActor,
outboundMappingProcessorActor,
getSelf(),
actorContext,
getContext(),
connectivityConfig,
getResponseValidationFailureConsumer());
}
Expand All @@ -1787,16 +1783,16 @@ private Consumer<MatchingValidationResult.Failure> getResponseValidationFailureC
* @return the Sink.
* @throws DittoRuntimeException when mapping processor could not get started.
*/
private Sink<Object, NotUsed> getInboundMappingSink(
final ProtocolAdapter protocolAdapter,
private Sink<Object, NotUsed> getInboundMappingSink(final ProtocolAdapter protocolAdapter,
final Sink<Object, NotUsed> inboundDispatchingSink) {

final InboundMappingProcessor inboundMappingProcessor;
final var context = getContext();
final var actorSystem = context.getSystem();
try {
// this one throws DittoRuntimeExceptions when the mapper could not be configured
inboundMappingProcessor =
InboundMappingProcessor.of(connection, connectivityConfig, getContext().getSystem(),
protocolAdapter, logger);
InboundMappingProcessor.of(connection, connectivityConfig, actorSystem, protocolAdapter, logger);
} catch (final DittoRuntimeException dre) {
connectionLogger.failure("Failed to start message mapping processor due to: {0}", dre.getMessage());
logger.info("Got DittoRuntimeException during initialization of MessageMappingProcessor: {} {} - desc: {}",
Expand All @@ -1807,16 +1803,13 @@ private Sink<Object, NotUsed> getInboundMappingSink(
final int processorPoolSize = connection.getProcessorPoolSize();
logger.debug("Starting inbound mapping processor actors with pool size of <{}>.", processorPoolSize);

final var mappingConfig = connectivityConfig().getMappingConfig();
final MessageDispatcher messageMappingProcessorDispatcher =
getContext().system().dispatchers().lookup(MESSAGE_MAPPING_PROCESSOR_DISPATCHER);
return InboundMappingSink.createSink(inboundMappingProcessor,
connection.getId(),
processorPoolSize,
inboundDispatchingSink,
mappingConfig,
connectivityConfig.getMappingConfig(),
getThrottlingConfig().orElse(null),
messageMappingProcessorDispatcher);
actorSystem.dispatchers().lookup(MESSAGE_MAPPING_PROCESSOR_DISPATCHER));
}

protected Optional<ConnectionThrottlingConfig> getThrottlingConfig() {
Expand Down

0 comments on commit ac832bd

Please sign in to comment.