Skip to content

Commit

Permalink
Extract RequestedAcksFilter from InboundDispatchingActor; switch to e…
Browse files Browse the repository at this point in the history
…xternal message dispatcher.

Reason: typical QoS headers are available in external message headers
and not in the signal headers.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 7, 2020
1 parent 61818ea commit eafbfd0
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ private Set<AcknowledgementLabel> getDeclaredAckLabels(final InboundMappingOutco
}

@Override
public Optional<Signal<?>> onMapped(final String mapperId, final MappedInboundExternalMessage mappedInboundMessage) {
public Optional<Signal<?>> onMapped(final String mapperId,
final MappedInboundExternalMessage mappedInboundMessage) {
final ExternalMessage incomingMessage = mappedInboundMessage.getSource();
final String source = getAddress(incomingMessage);
final Signal<?> signal = mappedInboundMessage.getSignal();
Expand Down Expand Up @@ -484,7 +485,8 @@ private <T> Stream<T> forwardAcknowledgements(final Acknowledgements acks,
.map(label -> AcknowledgementLabelNotDeclaredException.of(label, acks.getDittoHeaders()))
.findAny();
if (ackLabelNotDeclaredException.isPresent()) {
onError(UNKNOWN_MAPPER_ID, ackLabelNotDeclaredException.get(), getTopicPath(acks), outcomes.getExternalMessage());
onError(UNKNOWN_MAPPER_ID, ackLabelNotDeclaredException.get(), getTopicPath(acks),
outcomes.getExternalMessage());
return Stream.empty();
}
return forwardToConnectionActor(acks, outboundMessageMappingProcessorActor);
Expand Down Expand Up @@ -538,19 +540,19 @@ private Signal<?> appendConnectionAcknowledgementsToSignal(final ExternalMessage
if (additionalAcknowledgementRequests.isEmpty()) {
// do not change the signal's header if no additional acknowledgementRequests are defined in the Source
// to preserve the default behavior for signals without the header 'requested-acks'
return filterAcknowledgements(signal, filter, connectionId);
return RequestedAcksFilter.filterAcknowledgements(signal, message, filter, connectionId);
} else {
// The Source's acknowledgementRequests get appended to the requested-acks DittoHeader of the mapped signal
final Set<AcknowledgementRequest> combinedRequestedAcks =
new HashSet<>(signal.getDittoHeaders().getAcknowledgementRequests());
combinedRequestedAcks.addAll(additionalAcknowledgementRequests);

return filterAcknowledgements(signal.setDittoHeaders(
return RequestedAcksFilter.filterAcknowledgements(signal.setDittoHeaders(
signal.getDittoHeaders()
.toBuilder()
.acknowledgementRequests(combinedRequestedAcks)
.build()),
filter,
message, filter,
connectionId);
}
}
Expand Down Expand Up @@ -633,49 +635,6 @@ private DittoHeaders applyInboundHeaderMapping(final Signal<?> signal,
return builder;
}

/**
* Only used for testing
* TODO: Extract logic into separate class and test it there.
*
* @param signal signal to filter requested acknowledges for
* @param filter the filter string
* @param connectionId the connection ID receiving the signal.
* @return the filtered signal.
*/
static Signal<?> filterAcknowledgements(final Signal<?> signal, final @Nullable String filter,
final ConnectionId connectionId) {
if (filter != null) {
final String requestedAcks = DittoHeaderDefinition.REQUESTED_ACKS.getKey();
final boolean headerDefined = signal.getDittoHeaders().containsKey(requestedAcks);
final String fullFilter = "header:" + requestedAcks + "|fn:default('[]')|" + filter;
final ExpressionResolver resolver = Resolvers.forSignal(signal, connectionId);
final Optional<String> resolverResult = resolver.resolveAsPipelineElement(fullFilter).toOptional();
if (resolverResult.isEmpty()) {
// filter tripped: set requested-acks to []
return signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.acknowledgementRequests(Collections.emptySet())
.build());
} else if (headerDefined) {
// filter not tripped, header defined
return signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.putHeader(requestedAcks, resolverResult.orElseThrow())
.build());
} else {
// filter not tripped, header not defined:
// - evaluate filter again against unresolved and set requested-acks accordingly
// - if filter is not resolved, then keep requested-acks undefined for the default behavior
final Optional<String> unsetFilterResult =
resolver.resolveAsPipelineElement(filter).toOptional();
return unsetFilterResult.<Signal<?>>map(newAckRequests ->
signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.putHeader(requestedAcks, newAckRequests)
.build()))
.orElse(signal);
}
}
return signal;
}

/**
* Appends the ConnectionId to the processed {@code commandResponse} payload.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.ditto.services.connectivity.messaging.mappingoutcome.MappingOutcome;
import org.eclipse.ditto.services.connectivity.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.models.connectivity.ExternalMessageFactory;
import org.eclipse.ditto.services.models.connectivity.MappedInboundExternalMessage;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.signals.base.Signal;
Expand Down Expand Up @@ -158,8 +159,15 @@ private Stream<MappingOutcome<MappedInboundExternalMessage>> runMapper(final Mes
final DittoHeaders headersWithMapper =
dittoHeaders.toBuilder().inboundPayloadMapper(mapper.getId()).build();
final Signal<?> signalWithMapperHeader = signal.setDittoHeaders(headersWithMapper);
// TODO
final ExternalMessage messageWithMappingInfo =
message;
// ExternalMessageFactory.newExternalMessageBuilder(message)
// .withTopicPath(adaptable.getTopicPath())
// .withAuthorizationContext(dittoHeaders.getAuthorizationContext())
// .build();
final MappedInboundExternalMessage mappedMessage =
MappedInboundExternalMessage.of(message, adaptable.getTopicPath(),
MappedInboundExternalMessage.of(messageWithMappingInfo, adaptable.getTopicPath(),
signalWithMapperHeader);
mappedMessages.add(mappedMessage);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.messaging;

import java.util.Collections;
import java.util.Optional;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.model.placeholders.ExpressionResolver;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.signals.base.Signal;

/**
* Execute filter for requested-acks configured in connection sources.
*/
final class RequestedAcksFilter {

/**
* Apply the configured requested-acks filter to a signal mapped from an external message.
* - If the filter is tripped, set requested-acks to []
* - If the filter is not tripped and requested-acks is defined, leave it
* - If the filter is not tripped, requested-acks is not defined, fallback to the resolved value of the filter.
*
* @param signal signal to filter requested acknowledges for
* @param externalMessage incoming external message that mapped to the signal
* @param filter the filter string
* @param connectionId the connection ID receiving the signal.
* @return the filtered signal.
*/
static Signal<?> filterAcknowledgements(final Signal<?> signal,
final ExternalMessage externalMessage,
final @Nullable String filter,
final ConnectionId connectionId) {
if (filter != null) {
final String requestedAcks = DittoHeaderDefinition.REQUESTED_ACKS.getKey();
final boolean headerDefined = signal.getDittoHeaders().containsKey(requestedAcks);
final String requestedAcksValue = getDefaultRequestedAcks(headerDefined, signal);
final String fullFilter = "fn:default('" + requestedAcksValue + "')|" + filter;
final ExpressionResolver resolver = Resolvers.forExternalMessage(externalMessage, connectionId);
final Optional<String> resolverResult = resolver.resolveAsPipelineElement(fullFilter).toOptional();
if (resolverResult.isEmpty()) {
// filter tripped: set requested-acks to []
return signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.acknowledgementRequests(Collections.emptySet())
.build());
} else if (headerDefined) {
// filter not tripped, header defined
return signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.putHeader(requestedAcks, resolverResult.orElseThrow())
.build());
} else {
// filter not tripped, header not defined:
// - evaluate filter again against unresolved and set requested-acks accordingly
// - if filter is not resolved, then keep requested-acks undefined for the default behavior
final Optional<String> unsetFilterResult =
resolver.resolveAsPipelineElement(filter).toOptional();
return unsetFilterResult.<Signal<?>>map(newAckRequests ->
signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.putHeader(requestedAcks, newAckRequests)
.build()))
.orElse(signal);
}
}
return signal;
}

private static String getDefaultRequestedAcks(final boolean headerDefined, final Signal<?> signal) {
if (headerDefined) {
final String headerValue = signal.getDittoHeaders().get(DittoHeaderDefinition.REQUESTED_ACKS.getKey());
if (!headerValue.contains("'")) {
return headerValue;
}
}
return "[]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
package org.eclipse.ditto.services.connectivity.messaging;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.eclipse.ditto.services.connectivity.messaging.InboundDispatchingActor.filterAcknowledgements;
import static org.eclipse.ditto.services.connectivity.messaging.TestConstants.disableLogging;

import java.time.Duration;
Expand All @@ -37,7 +35,6 @@
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.FilteredAcknowledgementRequest;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.auth.AuthorizationModelFactory;
Expand All @@ -47,7 +44,6 @@
import org.eclipse.ditto.model.base.common.ResponseType;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.model.connectivity.ConnectionSignalIdEnforcementFailedException;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.Enforcement;
Expand All @@ -56,7 +52,6 @@
import org.eclipse.ditto.model.connectivity.MessageMappingFailedException;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.Topic;
import org.eclipse.ditto.model.placeholders.PlaceholderFunctionSignatureInvalidException;
import org.eclipse.ditto.model.placeholders.UnresolvedPlaceholderException;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingId;
Expand All @@ -74,7 +69,6 @@
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.commands.things.modify.DeleteThing;
import org.eclipse.ditto.signals.commands.things.modify.DeleteThingResponse;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttribute;
import org.eclipse.ditto.signals.commands.things.modify.ModifyAttributeResponse;
Expand All @@ -93,46 +87,6 @@
*/
public final class MessageMappingProcessorActorTest extends AbstractMessageMappingProcessorActorTest {

private static final ConnectionId connectionId = ConnectionId.generateRandom();

@Test
public void testRequestedAcknowledgementFilter() {
// GIVEN
final String requestedAcks = DittoHeaderDefinition.REQUESTED_ACKS.getKey();
final AcknowledgementRequest twinPersistedAckRequest =
AcknowledgementRequest.of(DittoAcknowledgementLabel.TWIN_PERSISTED);
final Signal<?> signal = DeleteThing.of(ThingId.of("thing:id"), DittoHeaders.empty());
final Signal<?> signalWithRequestedAcks = DeleteThing.of(ThingId.of("thing:id"), DittoHeaders.newBuilder()
.acknowledgementRequest(twinPersistedAckRequest)
.build());

// WHEN/THEN

final Signal<?> notFilteredSignal =
filterAcknowledgements(signal, "fn:filter('2+2','ne','5')", connectionId);
assertThat(notFilteredSignal.getDittoHeaders()).doesNotContainKey(requestedAcks);

final Signal<?> filteredSignal =
filterAcknowledgements(signal, "fn:filter('2+2','eq','5')", connectionId);
assertThat(filteredSignal.getDittoHeaders()).contains(Map.entry(requestedAcks, "[]"));

assertThatExceptionOfType(PlaceholderFunctionSignatureInvalidException.class).isThrownBy(() ->
filterAcknowledgements(signal, "fn:filter('2','+','2','eq','5')", connectionId)
);

final Signal<?> defaultValueSetSignal =
filterAcknowledgements(signal, "fn:default('[\"twin-persisted\"]')", connectionId);
assertThat(defaultValueSetSignal.getDittoHeaders().getAcknowledgementRequests())
.containsExactly(twinPersistedAckRequest);

final Signal<?> transformedSignal =
filterAcknowledgements(signalWithRequestedAcks, "fn:filter('2+2','eq','5')|fn:default('[\"custom\"]')",
connectionId);
assertThat(transformedSignal.getDittoHeaders().getAcknowledgementRequests())
.containsExactly(AcknowledgementRequest.parseAcknowledgementRequest("custom"));
}


@Test
public void testExternalMessageInDittoProtocolIsProcessedWithDefaultMapper() {
testExternalMessageInDittoProtocolIsProcessed(null);
Expand Down
Loading

0 comments on commit eafbfd0

Please sign in to comment.