Skip to content

Commit

Permalink
Issue failed acks for failed enrichments
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Feb 22, 2022
1 parent 09e297a commit 52ec182
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 18 deletions.
Expand Up @@ -204,6 +204,32 @@ public static void issueWeakAcknowledgements(final Signal<?> signal,
}
}

private static void issueFailedAcknowledgements(final Signal<?> signal,
final Predicate<AcknowledgementLabel> isFailedAckLabel,
final DittoRuntimeException dre,
final ActorRef sender) {

final Set<AcknowledgementRequest> requestedAcks = signal.getDittoHeaders().getAcknowledgementRequests();
final boolean customAckRequested = requestedAcks.stream()
.anyMatch(request -> !DittoAcknowledgementLabel.contains(request.getLabel()));

final Optional<EntityId> entityIdWithType = extractEntityId(signal);
if (customAckRequested && entityIdWithType.isPresent()) {
final List<AcknowledgementLabel> failedAckLabels = requestedAcks.stream()
.map(AcknowledgementRequest::getLabel)
.filter(isFailedAckLabel)
.collect(Collectors.toList());
if (!failedAckLabels.isEmpty()) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final List<Acknowledgement> ackList = failedAckLabels.stream()
.map(label -> failedAck(label, entityIdWithType.get(), dittoHeaders, dre))
.collect(Collectors.toList());
final Acknowledgements failedAcks = Acknowledgements.of(ackList, dittoHeaders);
sender.tell(failedAcks, ActorRef.noSender());
}
}
}

private int determinePoolSize(final int connectionPoolSize, final int maxPoolSize) {
if (connectionPoolSize > maxPoolSize) {
dittoLoggingAdapter.info("Configured pool size <{}> is greater than the configured max pool size <{}>." +
Expand Down Expand Up @@ -403,20 +429,19 @@ private CompletionStage<Collection<OutboundSignalWithSender>> enrichAndFilterSig
.warning("Could not retrieve extra data due to: {} {}", error.getClass().getSimpleName(),
error.getMessage());
// recover from all errors to keep message-mapping-stream running despite enrichment failures
return Collections.singletonList(recoverFromEnrichmentError(outboundSignal, target, error));
return recoverFromEnrichmentError(outboundSignal, target, error);
});
}

private static Optional<EntityId> extractEntityId(Signal<?> signal) {
private static Optional<EntityId> extractEntityId(final Signal<?> signal) {
return Optional.of(signal)
.filter(WithEntityId.class::isInstance)
.map(WithEntityId.class::cast)
.map(WithEntityId::getEntityId);
}

// Called inside future; must be thread-safe
@Nullable
private OutboundSignalWithSender recoverFromEnrichmentError(final OutboundSignalWithSender outboundSignal,
private List<OutboundSignalWithSender> recoverFromEnrichmentError(final OutboundSignalWithSender outboundSignal,
final Target target, final Throwable error) {

final var dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(error, t ->
Expand All @@ -443,9 +468,9 @@ private OutboundSignalWithSender recoverFromEnrichmentError(final OutboundSignal
clientActor.tell(connectionFailure, getSelf());
}
if (mappingConfig.getPublishFailedEnrichments()) {
return outboundSignal.setTargets(Collections.singletonList(target));
return Collections.singletonList(outboundSignal.setTargets(Collections.singletonList(target)));
} else {
return null;
return Collections.singletonList(outboundSignal.setFailedEnrichment(dittoRuntimeException, target));
}
}

Expand Down Expand Up @@ -629,17 +654,19 @@ private <T> CompletionStage<Collection<OutboundSignal.MultiMapped>> toMultiMappe
return List.of();
} else {
final ActorRef sender = outboundSignals.get(0).sender;
final List<Mapped> mappedSignals = outboundSignals.stream()
.map(OutboundSignalWithSender::asMapped)
.collect(Collectors.toList());
final List<Target> targetsToPublishAt = outboundSignals.stream()
.map(OutboundSignal::getTargets)
.flatMap(List::stream)
.collect(Collectors.toList());
final Predicate<AcknowledgementLabel> willPublish =
ConnectionValidator.getTargetIssuedAcknowledgementLabels(connection.getId(),
targetsToPublishAt)
targetsToPublishAt)
.collect(Collectors.toSet())::contains;
final var signalsWithoutEnrichmentFailures =
filterFailedEnrichments(outboundSignals, willPublish);
final List<Mapped> mappedSignals = signalsWithoutEnrichmentFailures
.map(OutboundSignalWithSender::asMapped)
.collect(Collectors.toList());
issueWeakAcknowledgements(outbound.getSource(),
willPublish.negate().and(outboundMappingProcessor::isTargetIssuedAck),
sender);
Expand All @@ -648,6 +675,29 @@ private <T> CompletionStage<Collection<OutboundSignal.MultiMapped>> toMultiMappe
});
}

private static Stream<OutboundSignalWithSender> filterFailedEnrichments(
final Collection<OutboundSignalWithSender> signals,
final Predicate<AcknowledgementLabel> predicate) {

return signals.stream().filter(signal -> {
if (null != signal.enrichmentFailure) {
final var optionalAcknowledgementLabel = signal.getTargets()
.get(signal.getTargets().indexOf(signal.enrichmentFailure.second()))
.getIssuedAcknowledgementLabel();
if (optionalAcknowledgementLabel.isPresent()) {
final Predicate<AcknowledgementLabel> perTargetPredicate =
optionalAcknowledgementLabel.get()::equals;
final var combinedPredicate = predicate.and(perTargetPredicate);
issueFailedAcknowledgements(signal.getSource(), combinedPredicate, signal.enrichmentFailure.first(),
signal.sender);
}
return false;
} else {
return true;
}
});
}

private Collection<OutboundSignalWithSender> applyFilter(final OutboundSignalWithSender outboundSignalWithExtra,
final FilteredTopic filteredTopic) {

Expand Down Expand Up @@ -745,31 +795,44 @@ private static Acknowledgement weakAck(final AcknowledgementLabel label,
return Acknowledgement.weak(label, entityId, dittoHeaders, payload);
}

private static Acknowledgement failedAck(final AcknowledgementLabel label,
final EntityId entityId,
final DittoHeaders dittoHeaders,
final DittoRuntimeException dre) {
final JsonValue payload = JsonValue.of("Acknowledgement was issued automatically as failed ack, " +
"because the signal enrichment failed: " + dre.getMessage());
return Acknowledgement.of(label, entityId, dre.getHttpStatus(), dittoHeaders, payload);
}

static final class OutboundSignalWithSender implements OutboundSignal {

private final OutboundSignal delegate;
private final ActorRef sender;

@Nullable
private final Pair<DittoRuntimeException, Target> enrichmentFailure;
@Nullable
private final JsonObject extra;

private OutboundSignalWithSender(final OutboundSignal delegate,
final ActorRef sender,
@Nullable final Pair<DittoRuntimeException, Target> enrichmentFailure,
@Nullable final JsonObject extra) {

this.delegate = delegate;
this.sender = sender;
this.enrichmentFailure = enrichmentFailure;
this.extra = extra;
}

static OutboundSignalWithSender of(final Signal<?> signal, final ActorRef sender) {
final OutboundSignal outboundSignal =
OutboundSignalFactory.newOutboundSignal(signal, Collections.emptyList());
return new OutboundSignalWithSender(outboundSignal, sender, null);
return new OutboundSignalWithSender(outboundSignal, sender, null, null);
}

static OutboundSignalWithSender of(final OutboundSignal outboundSignal, final ActorRef sender) {
return new OutboundSignalWithSender(outboundSignal, sender, null);
return new OutboundSignalWithSender(outboundSignal, sender, null, null);
}

@Override
Expand All @@ -794,18 +857,25 @@ public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<

private OutboundSignalWithSender setTargets(final List<Target> targets) {
return new OutboundSignalWithSender(OutboundSignalFactory.newOutboundSignal(delegate.getSource(), targets),
sender, extra);
sender, enrichmentFailure, extra);
}

// Also set target, because enrichment can fail per target.
private OutboundSignalWithSender setFailedEnrichment(final DittoRuntimeException e, final Target t) {
return new OutboundSignalWithSender(
OutboundSignalFactory.newOutboundSignal(delegate.getSource(), getTargets()),
sender, Pair.apply(e, t), extra);
}

private OutboundSignalWithSender setExtra(final JsonObject extra) {
return new OutboundSignalWithSender(
OutboundSignalFactory.newOutboundSignal(delegate.getSource(), getTargets()),
sender, extra
sender, enrichmentFailure, extra
);
}

private OutboundSignalWithSender mapped(final Mapped mapped) {
return new OutboundSignalWithSender(mapped, sender, extra);
return new OutboundSignalWithSender(mapped, sender, enrichmentFailure, extra);
}

private Mapped asMapped() {
Expand All @@ -817,6 +887,7 @@ public String toString() {
return getClass().getSimpleName() + " [" +
"delegate=" + delegate +
", sender=" + sender +
", enrichmentFailure=" + enrichmentFailure +
", extra=" + extra +
"]";
}
Expand All @@ -832,12 +903,13 @@ public boolean equals(final Object o) {
final OutboundSignalWithSender that = (OutboundSignalWithSender) o;
return Objects.equals(delegate, that.delegate) &&
Objects.equals(sender, that.sender) &&
Objects.equals(enrichmentFailure, that.enrichmentFailure) &&
Objects.equals(extra, that.extra);
}

@Override
public int hashCode() {
return Objects.hash(delegate, sender, extra);
return Objects.hash(delegate, sender, enrichmentFailure, extra);
}

}
Expand Down
Expand Up @@ -52,14 +52,16 @@
import org.eclipse.ditto.things.model.signals.events.ThingModified;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;

@FixMethodOrder(MethodSorters.DEFAULT)
/**
* Tests in addition to {@link MessageMappingProcessorActorTest}
* for {@link OutboundMappingProcessorActor} only.
Expand Down Expand Up @@ -121,6 +123,31 @@ public void sendWeakAckForAllSourcesAndTargetsWhenDroppedByAllTargets() {
}};
}

@Test
public void eventsWithFailedEnrichmentIssueFailedAcks() {
new TestKit(actorSystem) {{
final Props props =
OutboundMappingProcessorActor.props(clientActorProbe.ref(), getProcessors(), CONNECTION,
TestConstants.CONNECTIVITY_CONFIG, 3);
final ActorRef underTest = actorSystem.actorOf(props);

final OutboundSignal outboundSignal = outboundTwinEvent(
Attributes.newBuilder().set("target2", "wayne").build(),
List.of("source1", "target1", "target2", "unknown"),
List.of(target1(), target2())
);
underTest.tell(outboundSignal, getRef());
proxyActorProbe.expectMsgClass(RetrieveThing.class);

final Acknowledgements acks = expectMsgClass(Acknowledgements.class);
final List<String> fackLabels = acks.getFailedAcknowledgements()
.stream()
.map(ack -> ack.getLabel().toString())
.collect(Collectors.toList());
assertThat(fackLabels).containsExactlyInAnyOrder("target2");
}};
}

@Test
public void sendWeakAckWhenDroppedBySomeTarget() {
new TestKit(actorSystem) {{
Expand Down
2 changes: 1 addition & 1 deletion connectivity/service/src/test/resources/test.conf
Expand Up @@ -196,7 +196,7 @@ ditto {
signal-enrichment {
provider = "org.eclipse.ditto.connectivity.service.mapping.ConnectivityByRoundTripSignalEnrichmentProvider"
provider-config {
ask-timeout = 10s
ask-timeout = 2s
// configure cache here in case signal enrichment provider gets swapped to the caching implementation
cache {
maximum-size = 1000
Expand Down

0 comments on commit 52ec182

Please sign in to comment.