Skip to content

Commit

Permalink
fixed that signal enrichment errors were not handled correctly as the…
Browse files Browse the repository at this point in the history
…y were wrapped in

a CompletionException

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 2, 2021
1 parent 32bbf7a commit b48865b
Showing 1 changed file with 42 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,6 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.connectivity.service.mapping.ConnectivitySignalEnrichmentProvider;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
Expand All @@ -57,35 +42,50 @@
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignal.Mapped;
import org.eclipse.ditto.connectivity.api.OutboundSignalFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.FilteredTopic;
import org.eclipse.ditto.connectivity.model.LogCategory;
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.connectivity.model.MetricDirection;
import org.eclipse.ditto.connectivity.model.MetricType;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.rql.query.criteria.Criteria;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.connectivity.service.mapping.ConnectivitySignalEnrichmentProvider;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
import org.eclipse.ditto.connectivity.api.OutboundSignal.Mapped;
import org.eclipse.ditto.connectivity.api.OutboundSignalFactory;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.criteria.Criteria;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.rql.query.things.ThingPredicateVisitor;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.things.model.signals.events.ThingEventToThingConverter;

Expand Down Expand Up @@ -410,35 +410,33 @@ private static Optional<EntityId> extractEntityId(Signal<?> signal) {
private OutboundSignalWithSender recoverFromEnrichmentError(final OutboundSignalWithSender outboundSignal,
final Target target, final Throwable error) {

final var dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(error, t ->
SignalEnrichmentFailedException.newBuilder()
.dittoHeaders(outboundSignal.getSource().getDittoHeaders())
.cause(t)
.build());
// show enrichment failure in the connection logs
logEnrichmentFailure(outboundSignal, error);
logEnrichmentFailure(outboundSignal, dittoRuntimeException);
// show enrichment failure in service logs according to severity
if (error instanceof ThingNotAccessibleException) {
if (dittoRuntimeException instanceof ThingNotAccessibleException) {
// This error should be rare but possible due to user action; log on INFO level
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource()).info("Enrichment of <{}> failed due to <{}>.",
outboundSignal.getSource().getClass(), error);
outboundSignal.getSource().getClass(), dittoRuntimeException);
} else {
// This error should not have happened during normal operation.
// There is a (possibly transient) problem with the Ditto cluster. Request parent to restart.
dittoLoggingAdapter.withCorrelationId(outboundSignal.getSource())
.error("Enrichment of <{}> failed due to <{}>.", outboundSignal, error);
.error(dittoRuntimeException, "Enrichment of <{}> failed due to <{}>.", outboundSignal, dittoRuntimeException);
final ConnectionFailure connectionFailure =
new ImmutableConnectionFailure(getSelf(), error, "Signal enrichment failed");
new ImmutableConnectionFailure(getSelf(), dittoRuntimeException, "Signal enrichment failed");
clientActor.tell(connectionFailure, getSelf());
}
return outboundSignal.setTargets(Collections.singletonList(target));
}

private void logEnrichmentFailure(final OutboundSignal outboundSignal, final Throwable error) {
private void logEnrichmentFailure(final OutboundSignal outboundSignal, final DittoRuntimeException error) {

final DittoRuntimeException errorToLog;
if (error instanceof DittoRuntimeException) {
errorToLog = SignalEnrichmentFailedException.dueTo((DittoRuntimeException) error);
} else {
errorToLog = SignalEnrichmentFailedException.newBuilder()
.dittoHeaders(outboundSignal.getSource().getDittoHeaders())
.build();
}
final DittoRuntimeException errorToLog = SignalEnrichmentFailedException.dueTo(error);
getMonitorsForMappedSignal(outboundSignal)
.forEach(monitor -> monitor.failure(outboundSignal.getSource(), errorToLog));
}
Expand Down

0 comments on commit b48865b

Please sign in to comment.