Skip to content

Commit

Permalink
Issue #852: change how connectivity issues weak acks.
Browse files Browse the repository at this point in the history
- ConnectionPersistenceActor: issue weak acks for all requested
  source-declared or target-issued custom acks if signal is dropped
  due to authorization or RQL filter before enrichment.

- OutboundMappingProcessorActor: issue weak acks for all requested
  source-declared or target-issued custom acks if signal is dropped
  at all targets due to payload mapping or RQL filter after
  enrichment.

- OutboundMappingProcessorActor: issue weak acks for requested
  target-issued acknowledgements if some but not all targets
  dropped the signal due to payload mapping or RQL filter after
  enrichment.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 4, 2020
1 parent 90e682b commit 2aa45e6
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.entity.id.EntityIdWithType;
Expand All @@ -57,7 +59,6 @@
import org.eclipse.ditto.model.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.model.query.things.ThingPredicateVisitor;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
import org.eclipse.ditto.services.base.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.services.base.config.limits.LimitsConfig;
import org.eclipse.ditto.services.connectivity.mapping.ConnectivitySignalEnrichmentProvider;
Expand All @@ -71,6 +72,7 @@
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.services.connectivity.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
Expand All @@ -82,6 +84,7 @@
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithId;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
Expand Down Expand Up @@ -123,7 +126,7 @@ public final class OutboundMappingProcessorActor
private final ActorRef clientActor;
private final OutboundMappingProcessor outboundMappingProcessor;
private final ConnectionId connectionId;
private final ActorRef connectionActor;
private final ActorRef connectionActor; // TODO: remove field
private final MappingConfig mappingConfig;
private final DefaultConnectionMonitorRegistry connectionMonitorRegistry;
private final ConnectionMonitor responseDispatchedMonitor;
Expand All @@ -132,6 +135,8 @@ public final class OutboundMappingProcessorActor
private final SignalEnrichmentFacade signalEnrichmentFacade;
private final int processorPoolSize;
private final DittoRuntimeExceptionToErrorResponseFunction toErrorResponseFunction;
private final Predicate<AcknowledgementLabel> isSourceDeclaredAck;
private final Predicate<AcknowledgementLabel> isTargetIssuedAck;

@SuppressWarnings("unused")
private OutboundMappingProcessorActor(final ActorRef clientActor,
Expand Down Expand Up @@ -166,6 +171,48 @@ private OutboundMappingProcessorActor(final ActorRef clientActor,
ConnectivitySignalEnrichmentProvider.get(getContext().getSystem()).getFacade(connectionId);
this.processorPoolSize = determinePoolSize(processorPoolSize, mappingConfig.getMaxPoolSize());
toErrorResponseFunction = DittoRuntimeExceptionToErrorResponseFunction.of(limitsConfig.getHeadersMaxSize());

isSourceDeclaredAck =
ConnectionValidator.getSourceDeclaredAcknowledgementLabels(connectionId, connection.getSources())
.collect(Collectors.toSet())::contains;
isTargetIssuedAck =
ConnectionValidator.getTargetIssuedAcknowledgementLabels(connectionId, connection.getTargets())
// live response does not require a weak ack
.filter(ackLabel -> !DittoAcknowledgementLabel.LIVE_RESPONSE.equals(ackLabel))
.collect(Collectors.toSet())::contains;
}

/**
* Issue weak acknowledgements to the sender of a signal.
*
* @param signal the signal with 0 or more acknowledgement requests.
* @param isWeakAckLabel the predicate to test if a requested acknowledgement label should generate a weak ack.
* @param sender the actor who send the signal and who should receive the weak acknowledgements.
*/
public static void issueWeakAcknowledgements(final Signal<?> signal,
final Predicate<AcknowledgementLabel> isWeakAckLabel,
final ActorRef sender) {
final Set<AcknowledgementRequest> requestedAcks = signal.getDittoHeaders().getAcknowledgementRequests();
final boolean customAckRequested = requestedAcks.stream()
.anyMatch(request -> !DittoAcknowledgementLabel.contains(request.getLabel()));
final EntityId entityId = signal.getEntityId();
if (customAckRequested && entityId instanceof EntityIdWithType) {
final List<AcknowledgementLabel> weakAckLabels = requestedAcks.stream()
.map(AcknowledgementRequest::getLabel)
.filter(isWeakAckLabel)
.collect(Collectors.toList());
if (!weakAckLabels.isEmpty()) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final JsonValue ackBody = JsonValue.of("Acknowledgement was issued automatically, " +
"because the connection targets are unauthorized, " +
"or because signal was dropped by a configured RQL filter or by the payload mapper.");
final List<Acknowledgement> ackList = weakAckLabels.stream()
.map(label -> Acknowledgement.weak(label, (EntityIdWithType) entityId, dittoHeaders, ackBody))
.collect(Collectors.toList());
final Acknowledgements weakAcks = Acknowledgements.of(ackList, dittoHeaders);
sender.tell(weakAcks, ActorRef.noSender());
}
}
}

private int determinePoolSize(final int connectionPoolSize, final int maxPoolSize) {
Expand Down Expand Up @@ -255,8 +302,10 @@ protected OutboundSignalWithId mapMessage(final OutboundSignal message) {
@Override
protected Sink<OutboundSignalWithId, ?> createSink() {
// Enrich outbound signals by extra fields if necessary.
// Targets attached to the OutboundSignal are pre-selected by authorization, topic and filter sans enrichment.
final Flow<OutboundSignalWithId, OutboundSignal.MultiMapped, ?> flow = Flow.<OutboundSignalWithId>create()
.mapAsync(processorPoolSize, outbound -> toMultiMappedOutboundSignal(
outbound,
Source.single(outbound)
.via(splitByTargetExtraFieldsFlow())
.mapAsync(mappingConfig.getParallelism(), this::enrichAndFilterSignal)
Expand Down Expand Up @@ -330,7 +379,7 @@ private CompletionStage<Collection<OutboundSignalWithId>> enrichAndFilterSignal(
final DittoHeaders headers = DittoHeaders.newBuilder()
.authorizationContext(target.getAuthorizationContext())
// the correlation-id MUST NOT be set! as the DittoHeaders are used as a caching key in the Caffeine
// cache this would break the cache loading
// cache this would break the cache loading
// schema version is always the latest for connectivity signal enrichment.
.schemaVersion(JsonSchemaVersion.LATEST)
.build();
Expand Down Expand Up @@ -543,17 +592,33 @@ private Set<ConnectionMonitor> getMonitorsForOutboundSignal(final OutboundSignal
}

private <T> CompletionStage<Collection<OutboundSignal.MultiMapped>> toMultiMappedOutboundSignal(
final OutboundSignalWithId outbound,
final Source<OutboundSignalWithId, T> source) {

return source.runWith(Sink.seq(), materializer)
.thenApply(outboundSignals -> {
if (outboundSignals.isEmpty()) {
// signal dropped; issue weak acks for all requested acks belonging to this connection
issueWeakAcknowledgements(outbound.getSource(),
isSourceDeclaredAck.or(isTargetIssuedAck),
outbound.sender);
return List.of();
} else {
final ActorRef sender = outboundSignals.get(0).sender;
final List<Mapped> mappedSignals = outboundSignals.stream()
.map(OutboundSignalWithId::asMapped)
.collect(Collectors.toList());
final List<Target> targetsToPublishAt = outboundSignals.stream()
.map(OutboundSignal::getTargets)
.flatMap(List::stream)
.collect(Collectors.toList());
final Predicate<AcknowledgementLabel> willPublish =
ConnectionValidator.getTargetIssuedAcknowledgementLabels(connectionId,
targetsToPublishAt)
.collect(Collectors.toSet())::contains;
issueWeakAcknowledgements(outbound.getSource(),
isTargetIssuedAck.and(willPublish.negate()),
sender);
return List.of(OutboundSignalFactory.newMultiMappedOutboundSignal(mappedSignals, sender));
}
});
Expand All @@ -576,30 +641,13 @@ private Collection<OutboundSignalWithId> applyFilter(final OutboundSignalWithId
.filter(ThingPredicateVisitor.apply(criteria))
.map(thing -> outboundSignalWithExtra))
.map(Collections::singletonList)
.orElseGet(() -> {
issuePotentialWeakAcknowledgements(signal);
return List.of();
});
.orElse(List.of());
} else {
// no signal enrichment: filtering is already done in SignalFilter since there is no ignored field
return Collections.singletonList(outboundSignalWithExtra);
}
}

private void issuePotentialWeakAcknowledgements(final Signal<?> signal) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final EntityId entityId = signal.getEntityId();
if (entityId instanceof EntityIdWithType) {
final JsonValue ackBody = JsonValue.of("Acknowledgement was issued automatically, " +
"because the event was filtered due to a configured RQL filter.");
dittoHeaders.getAcknowledgementRequests()
.stream()
.map(AcknowledgementRequest::getLabel)
.map(label -> Acknowledgement.weak(label, (EntityIdWithType) entityId, dittoHeaders, ackBody))
.forEach(weakAck -> connectionActor.tell(weakAck, ActorRef.noSender()));
}
}

private static String stackTraceAsString(final DittoRuntimeException exception) {
final StringWriter stringWriter = new StringWriter();
exception.printStackTrace(new PrintWriter(stringWriter));
Expand Down Expand Up @@ -649,7 +697,7 @@ private static Pair<List<Target>, List<Pair<Target, FilteredTopic>>> splitTarget
private static boolean isTwinCommandResponseWithReplyTarget(final Signal<?> signal) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
return signal instanceof CommandResponse &&
!ProtocolAdapter.isLiveSignal(signal) &&
// !ProtocolAdapter.isLiveSignal(signal) && // TODO: confirm live signal check not needed; rename method
dittoHeaders.getReplyTarget().isPresent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.services.connectivity.messaging.persistence.stages.ConnectionAction.UPDATE_SUBSCRIPTIONS;
import static org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator.resolveConnectionIdPlaceholder;
import static org.eclipse.ditto.services.models.connectivity.ConnectivityMessagingConstants.CLUSTER_ROLE;

import java.time.Duration;
Expand All @@ -23,26 +22,21 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.entity.id.EntityIdWithType;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
Expand All @@ -58,11 +52,10 @@
import org.eclipse.ditto.model.connectivity.FilteredTopic;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.Topic;
import org.eclipse.ditto.model.placeholders.ExpressionResolver;
import org.eclipse.ditto.model.placeholders.PlaceholderFactory;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.model.things.WithThingId;
import org.eclipse.ditto.services.connectivity.messaging.ClientActorPropsFactory;
import org.eclipse.ditto.services.connectivity.messaging.OutboundMappingProcessorActor;
import org.eclipse.ditto.services.connectivity.messaging.amqp.AmqpValidator;
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectionConfig;
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectivityConfig;
Expand Down Expand Up @@ -192,7 +185,6 @@ public final class ConnectionPersistenceActor
private final Duration loggingEnabledDuration;
private final ConnectionConfig config;
private final MonitoringConfig monitoringConfig;
private final ExpressionResolver connectionIdResolver;

private int subscriptionCounter = 0;
private ConnectivityStatus pubSubStatus = ConnectivityStatus.UNKNOWN;
Expand Down Expand Up @@ -259,9 +251,6 @@ public final class ConnectionPersistenceActor
this.loggingEnabledDuration = monitoringConfig.logger().logDuration();
this.checkLoggingActiveInterval = monitoringConfig.logger().loggingActiveCheckInterval();
this.clientActorsPerNode = clientActorsPerNode;

connectionIdResolver = PlaceholderFactory.newExpressionResolver(PlaceholderFactory.newConnectionIdPlaceholder(),
connectionId);
}

@Override
Expand Down Expand Up @@ -562,10 +551,13 @@ private void handleSignal(final Signal<?> signal) {
logDroppedSignal(signal, signal.getType(), "Was sent by myself.");
return;
}
final List<Target> subscribedAndAuthorizedTargets =
signalFilter.filter(signal, target -> issuePotentialWeakAcknowledgements(target, signal));
// TODO: second argument of SignalFilter#filter always the empty consumer?
final List<Target> subscribedAndAuthorizedTargets = signalFilter.filter(signal, target -> {});

if (subscribedAndAuthorizedTargets.isEmpty()) {
logDroppedSignal(signal, signal.getType(), "No subscribed and authorized targets present");
// issue weak acks here as the signal will not reach OutboundMappingProcessorActor
issueWeakAcknowledgements(signal, getSender());
return;
}

Expand All @@ -586,23 +578,10 @@ private void handleSignal(final Signal<?> signal) {
clientActorRouter.tell(msg, getSender());
}

private void issuePotentialWeakAcknowledgements(final Target filteringTarget, final Signal<?> signal) {
final EntityId entityId = signal.getEntityId();
if (entityId instanceof EntityIdWithType) {
final ActorRef sender = getSender();
final JsonValue ackBody = JsonValue.of("Acknowledgement was issued automatically, " +
"because the event was filtered due to a configured RQL filter.");
final Set<AcknowledgementLabel> sourceDeclaredAcks = getSourceDeclaredAcks();
final Optional<AcknowledgementLabel> optionalIssuedAck = filteringTarget.getIssuedAcknowledgementLabel();
final Set<AcknowledgementLabel> ackLabelsToAcknowledgeWeakly =
Stream.concat(sourceDeclaredAcks.stream(), optionalIssuedAck.stream()).collect(Collectors.toSet());
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
dittoHeaders.getAcknowledgementRequests().stream()
.map(AcknowledgementRequest::getLabel)
.filter(ackLabelsToAcknowledgeWeakly::contains)
.map(label -> Acknowledgement.weak(label, (EntityIdWithType) entityId, dittoHeaders, ackBody))
.forEach(weakAck -> sender.tell(weakAck, ActorRef.noSender()));
}
private void issueWeakAcknowledgements(final Signal<?> signal, final ActorRef sender) {
OutboundMappingProcessorActor.issueWeakAcknowledgements(signal,
getAckLabelsToDeclare()::contains,
sender);
}

private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final ThingId thingId) {
Expand Down Expand Up @@ -641,12 +620,7 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
private Set<AcknowledgementLabel> getSourceDeclaredAcks() {

if (entity != null && ackLabelsDeclared) {
return entity.getSources()
.stream()
.flatMap(source -> source.getDeclaredAcknowledgementLabels().stream())
.map(ackLabel -> resolveConnectionIdPlaceholder(connectionIdResolver, ackLabel))
.filter(Optional::isPresent)
.map(Optional::get)
return ConnectionValidator.getSourceDeclaredAcknowledgementLabels(entityId, entity.getSources())
.collect(Collectors.toSet());
} else {
return Set.of();
Expand All @@ -656,13 +630,7 @@ private Set<AcknowledgementLabel> getSourceDeclaredAcks() {
private Set<AcknowledgementLabel> getTargetIssuedAcks() {

if (entity != null && ackLabelsDeclared) {
return entity.getTargets()
.stream()
.map(Target::getIssuedAcknowledgementLabel)
.flatMap(Optional::stream)
.map(ackLabel -> resolveConnectionIdPlaceholder(connectionIdResolver, ackLabel))
.filter(Optional::isPresent)
.flatMap(Optional::stream)
return ConnectionValidator.getTargetIssuedAcknowledgementLabels(entityId, entity.getTargets())
.collect(Collectors.toSet());
} else {
return Set.of();
Expand Down
Loading

0 comments on commit 2aa45e6

Please sign in to comment.