Skip to content

Commit

Permalink
Issue #878: move auth and acknowledgement forwarding from ConnectionP…
Browse files Browse the repository at this point in the history
…ersistenceActor to OutboundDispatchingActor; add InboundSignal; fix deserialization of null event metadata.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 24, 2020
1 parent 6da2ccc commit 542384f
Show file tree
Hide file tree
Showing 22 changed files with 808 additions and 327 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -85,7 +86,11 @@ public List<ActorRef> getOtherActors(final ActorRef clientActor) {
* @param entityId the entity ID.
* @return the client actor responsible for it.
*/
public ActorRef lookUp(final EntityId entityId) {
return sortedRefs.get(PubSubFactory.hashForPubSub(entityId) % sortedRefs.size());
public Optional<ActorRef> lookup(final EntityId entityId) {
if (sortedRefs.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(sortedRefs.get(PubSubFactory.hashForPubSub(entityId) % sortedRefs.size()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.eclipse.ditto.services.models.acks.AcknowledgementAggregatorActor;
import org.eclipse.ditto.services.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.models.connectivity.InboundSignal;
import org.eclipse.ditto.services.models.connectivity.MappedInboundExternalMessage;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
Expand Down Expand Up @@ -334,9 +335,9 @@ private PartialFunction<Signal<?>, Stream<IncomingSignal>> dispatchResponsesAndS
.match(Acknowledgements.class, acks ->
forwardAcknowledgements(acks, declaredAckLabels, outcomes))
.match(CommandResponse.class, ProtocolAdapter::isLiveSignal, liveResponse ->
forwardToConnectionActor(liveResponse, ActorRef.noSender())
forwardToClientActor(liveResponse, ActorRef.noSender())
)
.match(ThingSearchCommand.class, cmd -> forwardToConnectionActor(cmd, sender))
.match(ThingSearchCommand.class, cmd -> forwardToClientActor(cmd, sender))
.matchAny(baseSignal -> ackregatorStarter.preprocess(baseSignal,
(signal, isAckRequesting) -> Stream.of(new IncomingSignal(signal,
getReturnAddress(sender, isAckRequesting, signal),
Expand Down Expand Up @@ -457,17 +458,23 @@ private void handleErrorDuringStartingOfAckregator(final DittoRuntimeException e
* @param <T> type of elements for the next step..
* @return an empty source of Signals
*/
private <T> Stream<T> forwardToConnectionActor(final Signal<?> signal,
private <T> Stream<T> forwardToClientActor(final Signal<?> signal,
@Nullable final ActorRef sender) {
connectionActor.tell(signal, sender);
// TODO: only forward CreateSubscription to connection actor; dispatch commands with prefix directly
if (signal instanceof ThingSearchCommand) {
connectionActor.tell(signal, sender);
} else {
// wrap response or search command for dispatching by entity ID
getContext().parent().tell(InboundSignal.of(signal), sender);
}
return Stream.empty();
}

private <T> Stream<T> forwardAcknowledgement(final Acknowledgement ack,
final Set<AcknowledgementLabel> declaredAckLabels,
final InboundMappingOutcomes outcomes) {
if (declaredAckLabels.contains(ack.getLabel())) {
return forwardToConnectionActor(ack, outboundMessageMappingProcessorActor);
return forwardToClientActor(ack, outboundMessageMappingProcessorActor);
} else {
final AcknowledgementLabelNotDeclaredException exception =
AcknowledgementLabelNotDeclaredException.of(ack.getLabel(), ack.getDittoHeaders());
Expand All @@ -489,7 +496,7 @@ private <T> Stream<T> forwardAcknowledgements(final Acknowledgements acks,
outcomes.getExternalMessage());
return Stream.empty();
}
return forwardToConnectionActor(acks, outboundMessageMappingProcessorActor);
return forwardToClientActor(acks, outboundMessageMappingProcessorActor);
}

private TopicPath getTopicPath(final Acknowledgement ack) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.messaging;

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.model.things.WithThingId;
import org.eclipse.ditto.services.models.acks.AcknowledgementForwarderActor;
import org.eclipse.ditto.services.models.connectivity.InboundSignal;
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
import org.eclipse.ditto.services.models.connectivity.OutboundSignalFactory;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.CommandResponse;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;

/**
* This Actor makes the decision whether to dispatch outbound signals and their acknowledgements or to drop them.
*/
final class OutboundDispatchingActor extends AbstractActor {

/**
* The name of this Actor in the ActorSystem.
*/
public static final String ACTOR_NAME = "outboundDispatching";

private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final OutboundMappingSettings settings;
private final ActorRef outboundMappingProcessorActor;

@SuppressWarnings("unused")
private OutboundDispatchingActor(final OutboundMappingSettings settings,
final ActorRef outboundMappingProcessorActor) {
this.settings = settings;
this.outboundMappingProcessorActor = outboundMappingProcessorActor;
}

static Props props(final OutboundMappingSettings settings, final ActorRef outboundMappingProcessorActor) {
return Props.create(OutboundDispatchingActor.class, settings, outboundMappingProcessorActor);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(InboundSignal.class, this::inboundSignal)
.match(CommandResponse.class, this::commandResponse)
.match(Signal.class, this::handleSignal)
.matchAny(message -> logger.warning("Unknown message: <{}>", message))
.build();
}

private void inboundSignal(final InboundSignal inboundSignal) {
handleInboundResponseOrAcknowledgement(inboundSignal.getSignal());
}

private void commandResponse(final Object message) {
outboundMappingProcessorActor.tell(message, getSender());
}

private void handleSignal(final Signal<?> signal) {
if (settings.getConnectionId().toString().equals(signal.getDittoHeaders().getOrigin().orElse(null))) {
logDroppedSignal(signal, signal.getType(), "Was sent by myself.");
return;
}
final List<Target> subscribedAndAuthorizedTargets = settings.getSignalFilter().filter(signal);

if (subscribedAndAuthorizedTargets.isEmpty()) {
logDroppedSignal(signal, signal.getType(), "No subscribed and authorized targets present");
// issue weak acks here as the signal will not reach OutboundMappingProcessorActor
issueWeakAcknowledgements(signal, getSender());
return;
}

final Signal<?> signalToForward;
if (signal instanceof WithThingId) {
final WithThingId thingEvent = (WithThingId) signal;
signalToForward = adjustSignalAndStartAckForwarder(signal, thingEvent.getThingEntityId());
} else {
signalToForward = signal;
}

logger.debug("Forwarding signal <{}> to client actor with targets: {}.", signalToForward.getType(),
subscribedAndAuthorizedTargets);

final OutboundSignal outbound =
OutboundSignalFactory.newOutboundSignal(signalToForward, subscribedAndAuthorizedTargets);

outboundMappingProcessorActor.tell(outbound, getSender());
}

private void logDroppedSignal(final WithDittoHeaders<?> withDittoHeaders, final String type, final String reason) {
logger.withCorrelationId(withDittoHeaders).debug("Signal ({}) dropped: {}", type, reason);
}

private void issueWeakAcknowledgements(final Signal<?> signal, final ActorRef sender) {
OutboundMappingProcessorActor.issueWeakAcknowledgements(signal,
this::isSourceDeclaredOrTargetIssuedAck,
sender);
}

private void denyNonSourceDeclaredAck(final Acknowledgement ack) {
getSender().tell(AcknowledgementLabelNotDeclaredException.of(ack.getLabel(), ack.getDittoHeaders()),
ActorRef.noSender());
}

private boolean isNotSourceDeclaredAck(final Acknowledgement acknowledgement) {
return !settings.getSourceDeclaredAcks().contains(acknowledgement.getLabel());
}

private boolean isSourceDeclaredOrTargetIssuedAck(final AcknowledgementLabel label) {
return settings.getSourceDeclaredAcks().contains(label) || settings.getTargetIssuedAcks().contains(label);
}

private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final ThingId thingId) {
final Collection<AcknowledgementRequest> ackRequests = signal.getDittoHeaders().getAcknowledgementRequests();
if (ackRequests.isEmpty()) {
return signal;
}
final Predicate<AcknowledgementLabel> isSourceDeclaredAck = settings.getSourceDeclaredAcks()::contains;
final Set<AcknowledgementLabel> targetIssuedAcks = settings.getTargetIssuedAcks();
final boolean hasSourceDeclaredAcks = ackRequests.stream()
.map(AcknowledgementRequest::getLabel)
.anyMatch(isSourceDeclaredAck);
if (hasSourceDeclaredAcks) {
// start ackregator for source declared acks
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
thingId,
signal,
settings.getAcknowledgementConfig(),
this::isSourceDeclaredOrTargetIssuedAck
);
} else {
// no need to start ackregator for target-issued acks; they go to the sender directly
return signal.setDittoHeaders(signal.getDittoHeaders().toBuilder()
.acknowledgementRequests(ackRequests.stream()
.filter(request -> targetIssuedAcks.contains(request.getLabel()))
.collect(Collectors.toList()))
.build());
}
}

private void handleInboundResponseOrAcknowledgement(final WithDittoHeaders<?> responseOrAck) {
if (responseOrAck instanceof Acknowledgement) {
final Acknowledgement ack = (Acknowledgement) responseOrAck;
if (isNotSourceDeclaredAck(ack)) {
denyNonSourceDeclaredAck(ack);
return;
}
}

final ActorContext context = getContext();
final Consumer<ActorRef> action = forwarder -> forwarder.forward(responseOrAck, context);
final Runnable emptyAction = () -> {
final String template = "No AcknowledgementForwarderActor found, forwarding to concierge: <{}>";
if (logger.isDebugEnabled()) {
logger.withCorrelationId(responseOrAck).debug(template, responseOrAck);
} else {
logger.withCorrelationId(responseOrAck).info(template, responseOrAck.getClass().getCanonicalName());
}
settings.getProxyActor().tell(responseOrAck, ActorRef.noSender());
};

context.findChild(AcknowledgementForwarderActor.determineActorName(responseOrAck.getDittoHeaders()))
.ifPresentOrElse(action, emptyAction);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@
*/
package org.eclipse.ditto.services.connectivity.messaging;

import static org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator.resolveConnectionIdPlaceholder;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -26,7 +23,6 @@

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
Expand All @@ -35,27 +31,22 @@
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.PayloadMapping;
import org.eclipse.ditto.model.connectivity.PayloadMappingDefinition;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.services.connectivity.mapping.DefaultMessageMapperFactory;
import org.eclipse.ditto.services.connectivity.mapping.DittoMessageMapper;
import org.eclipse.ditto.services.connectivity.mapping.MessageMapper;
import org.eclipse.ditto.services.connectivity.mapping.MessageMapperFactory;
import org.eclipse.ditto.services.connectivity.mapping.MessageMapperRegistry;
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectivityConfig;
import org.eclipse.ditto.services.connectivity.messaging.mappingoutcome.MappingOutcome;
import org.eclipse.ditto.services.connectivity.messaging.validation.ConnectionValidator;
import org.eclipse.ditto.services.connectivity.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.models.connectivity.ExternalMessageFactory;
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
import org.eclipse.ditto.services.models.connectivity.OutboundSignalFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.signals.base.Signal;

import akka.actor.ActorSelection;
import akka.actor.ActorSystem;

/**
Expand Down Expand Up @@ -103,29 +94,21 @@ public static OutboundMappingProcessor of(final Connection connection,
final ProtocolAdapter protocolAdapter,
final ThreadSafeDittoLoggingAdapter logger) {

final ConnectionId connectionId = connection.getId();
final ConnectionType connectionType = connection.getConnectionType();
final PayloadMappingDefinition mappingDefinition = connection.getPayloadMappingDefinition();
final Set<AcknowledgementLabel> sourceDeclaredAcks =
ConnectionValidator.getSourceDeclaredAcknowledgementLabels(connectionId, connection.getSources())
.collect(Collectors.toSet());
final Set<AcknowledgementLabel> targetIssuedAcks =
ConnectionValidator.getTargetIssuedAcknowledgementLabels(connectionId, connection.getTargets())
// live response does not require a weak ack
.filter(ackLabel -> !DittoAcknowledgementLabel.LIVE_RESPONSE.equals(ackLabel))
.collect(Collectors.toSet());

final ThreadSafeDittoLoggingAdapter loggerWithConnectionId =
logger.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId);

final MessageMapperFactory messageMapperFactory =
DefaultMessageMapperFactory.of(connectionId, actorSystem, connectivityConfig.getMappingConfig(),
loggerWithConnectionId);
final MessageMapperRegistry registry =
messageMapperFactory.registryOf(DittoMessageMapper.CONTEXT, mappingDefinition);
final ActorSelection deadLetterSelection = actorSystem.actorSelection(actorSystem.deadLetters().path());
return of(OutboundMappingSettings.of(connection, actorSystem, deadLetterSelection, connectivityConfig,
protocolAdapter, logger));
}

return new OutboundMappingProcessor(connectionId, connectionType, registry, loggerWithConnectionId,
protocolAdapter, sourceDeclaredAcks, targetIssuedAcks);
/**
* Create an {@code OutboundMappingProcessor} from its settings.
*
* @param settings Settings of an outbound mapping processor.
* @return the processor.
*/
public static OutboundMappingProcessor of(final OutboundMappingSettings settings) {
return new OutboundMappingProcessor(settings.getConnectionId(), settings.getConnectionType(),
settings.getRegistry(), settings.getLogger(), settings.getProtocolAdapter(),
settings.getSourceDeclaredAcks(), settings.getTargetIssuedAcks());
}

boolean isSourceDeclaredAck(final AcknowledgementLabel label) {
Expand Down
Loading

0 comments on commit 542384f

Please sign in to comment.