Skip to content

Commit

Permalink
Remove legacy default header mapping and reply-to address for responses.
Browse files Browse the repository at this point in the history
Responses and errors are now only published at reply-targets.

Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Nov 11, 2019
1 parent d563dd8 commit 90a939a
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,6 @@ private static String doApply(final String template, final ExpressionResolver ex
.build());
}

static String checkAllPlaceholdersResolved(final String input) {
if (Placeholders.containsAnyPlaceholder(input)) {
throw UnresolvedPlaceholderException.newBuilder(input).build();
}
return input;
}

private PlaceholderFilter() {
throw new AssertionError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionId;
Expand Down Expand Up @@ -73,16 +72,6 @@
*/
public abstract class BasePublisherActor<T extends PublishTarget> extends AbstractActor {

/**
* Legacy header mapping for responses to "reply-to" addresses without header mapping.
*/
protected static final HeaderMapping LEGACY_DEFAULT_HEADER_MAPPER =
ConnectivityModelFactory.newHeaderMapping(JsonObject.newBuilder()
.set("correlation-id", "{{header:correlation-id|fn:default(fn:delete())}}")
.set("reply-to", "{{header:correlation-id|fn:default(fn:delete())}}")
.set("content-type", "{{header:content-type|fn:default(fn:delete())}}")
.build());

private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();
private static final ThingPlaceholder THING_PLACEHOLDER = PlaceholderFactory.newThingPlaceholder();
private static final TopicPathPlaceholder TOPIC_PLACEHOLDER = PlaceholderFactory.newTopicPathPlaceholder();
Expand Down Expand Up @@ -137,28 +126,31 @@ public Receive createReceive() {
final Optional<ReplyTarget> replyTargetOptional = response.getInternalHeaders()
.getReplyTarget()
.flatMap(this::getReplyTargetByIndex);
final Optional<String> replyAddressFromHeader =
Optional.ofNullable(response.getHeaders().get(ExternalMessage.REPLY_TO_HEADER));
if (replyTargetOptional.isPresent()) {
final ReplyTarget replyTarget = replyTargetOptional.get();
final ExpressionResolver expressionResolver =
getExpressionResolver(outbound.getExternalMessage(), outbound.getSource());
final T replyTargetAddress =
toPublishTarget(applyForTargetAddress(expressionResolver, replyTarget.getAddress()));
final HeaderMapping headerMapping =
replyTargetOptional.flatMap(ReplyTarget::getHeaderMapping)
.orElse(getDefaultHeaderMapping());
final ExternalMessage responseWithMappedHeaders =
applyHeaderMapping(expressionResolver, outbound, headerMapping, log());
publishResponseOrError(replyTargetAddress, outbound, responseWithMappedHeaders);
} else if (replyAddressFromHeader.isPresent()) {
final T replyToAddress = toReplyToTarget(replyAddressFromHeader.get());
final ExternalMessage responseWithMappedHeaders = applyHeaderMappingForReplyToAddress(response);
publishResponseOrError(replyToAddress, outbound, responseWithMappedHeaders);
final String address = replyTarget.getAddress();
final Optional<T> resolvedAddress =
applyForReplyTargetAddress(expressionResolver, address).map(this::toPublishTarget);

if (resolvedAddress.isPresent()) {
final HeaderMapping headerMapping =
replyTargetOptional.flatMap(ReplyTarget::getHeaderMapping).orElse(null);
final ExternalMessage responseWithMappedHeaders =
applyHeaderMapping(expressionResolver, outbound, headerMapping, log());
publishResponseOrError(resolvedAddress.get(), outbound, responseWithMappedHeaders);
} else {
log().debug("Response dropped, reply-target address unresolved: <{}>", address);
responseDroppedMonitor.failure(outbound.getSource(),
"Response dropped since its reply-target's address " +
"cannot be resolved to a value: {0}",
address);
}
} else {
log().info("Response dropped, missing reply-to or reply-target address: {}", response);
log().debug("Response dropped, missing reply-target: {}", response);
responseDroppedMonitor.failure(outbound.getSource(),
"Response dropped since it was missing a reply-to or reply-target address.");
"Response dropped since it was missing a reply-target.");
}
})
.match(OutboundSignal.WithExternalMessage.class, outbound -> {
Expand All @@ -170,7 +162,7 @@ public Receive createReceive() {
log().debug("Publishing mapped message of type <{}> to targets <{}>: {}",
outboundSource.getType(), outbound.getTargets(), message);
outbound.getTargets().forEach(target -> {
log().info("Publishing mapped message of type <{}> to target address <{}>",
log().debug("Publishing mapped message of type <{}> to target address <{}>",
outboundSource.getType(), target.getAddress());

final ConnectionMonitor publishedMonitor =
Expand All @@ -179,8 +171,7 @@ public Receive createReceive() {
try {
final T publishTarget = toPublishTarget(target.getAddress());
final ExternalMessage messageWithMappedHeaders =
applyHeaderMapping(outbound,
target.getHeaderMapping().orElse(getDefaultHeaderMapping()), log());
applyHeaderMapping(outbound, target.getHeaderMapping().orElse(null), log());
publishMessage(target, publishTarget, messageWithMappedHeaders, publishedMonitor);
} catch (final DittoRuntimeException e) {
// TODO: might there be private information in the exception message so we shouldn't be allowed to see them?
Expand All @@ -204,21 +195,6 @@ public Receive createReceive() {
return receiveBuilder.build();
}

@Nullable
protected HeaderMapping getDefaultHeaderMapping() {
return LEGACY_DEFAULT_HEADER_MAPPER;
}

/**
* By default, keep all headers in the mapped outbound signal.
* Override this method for protocol-specific handling.
*
* @param response the outbound signal.
*/
protected ExternalMessage applyHeaderMappingForReplyToAddress(final ExternalMessage response) {
return response;
}

private void publishResponseOrError(final T address, final OutboundSignal outbound,
final ExternalMessage response) {

Expand Down Expand Up @@ -347,9 +323,11 @@ private static Optional<String> applyExpressionResolver(final ExpressionResolver
return PlaceholderFilter.applyWithDeletion(value, resolver);
}

// For target address: Leave it as-is if resolution fails or results in deletion.
private static String applyForTargetAddress(final ExpressionResolver resolver, final String value) {
return PlaceholderFilter.applyOrElseRetain(value, resolver);
/**
* Resolve target address. If not resolvable,
*/
private static Optional<String> applyForReplyTargetAddress(final ExpressionResolver resolver, final String value) {
return resolver.resolve(value, false).toOptional();
}

private static ExpressionResolver getExpressionResolver(final ExternalMessage originalMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,21 @@ private void handleInboundMessage(final ExternalMessage externalMessage) {
try {
mapExternalMessageToSignalAndForwardToConcierge(externalMessage);
} catch (final Exception e) {
handleException(e, externalMessage.getHeaders());
handleException(e, externalMessage);
}
}

private void handleException(final Exception e, final Map<String, String> headers) {
private void handleException(final Exception e, final ExternalMessage message) {
if (e instanceof DittoRuntimeException) {
final DittoRuntimeException dittoRuntimeException = (DittoRuntimeException) e;
responseMappedMonitor.getLogger()
.failure("Got exception {0} when processing external message: {1}",
dittoRuntimeException.getErrorCode(),
e.getMessage());
handleDittoRuntimeException(dittoRuntimeException, headers);
handleDittoRuntimeException(dittoRuntimeException, DittoHeaders.newBuilder()
.putHeaders(message.getHeaders())
.putHeaders(message.getInternalHeaders())
.build());
} else {
responseMappedMonitor.getLogger()
.failure("Got unknown exception when processing external message: {1}", e.getMessage());
Expand Down Expand Up @@ -242,7 +245,7 @@ private void mapExternalMessageToSignalAndForwardToConcierge(final ExternalMessa
conciergeForwarder.tell(adjustedSignal, getSelf());
},
() -> log.debug("Message mapping returned null, message is dropped."),
exception -> this.handleException(exception, externalMessage.getHeaders()),
exception -> this.handleException(exception, externalMessage),
inboundMapped, inboundDropped, InfoProviderFactory.forExternalMessage(externalMessage));

messageMappingProcessor.process(messageWithAuthSubject, inboundMappingHandlers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,6 @@ protected void publishMessage(@Nullable final Target target, final AmqpTarget pu
}
}

// Override header-mapping for reply-to addresses by mapping all headers to application properties.
@Override
protected ExternalMessage applyHeaderMappingForReplyToAddress(final ExternalMessage response) {
return ExternalMessageFactory.newExternalMessageBuilder(response)
.withHeaders(JMSPropertyMapper.mapAsApplicationProperties(
response.getHeaders(),
LEGACY_DEFAULT_HEADER_MAPPER.getMapping().keySet()
))
.build();
}

private void tryToPublishMessage(final AmqpTarget publishTarget,
final ExternalMessage message, final ConnectionMonitor publishedMonitor) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.HeaderMapping;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.connectivity.messaging.BasePublisherActor;
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectionConfig;
Expand Down Expand Up @@ -149,12 +148,6 @@ protected DiagnosticLoggingAdapter log() {
return log;
}

@Override
protected HeaderMapping getDefaultHeaderMapping() {
// HTTP-push connections do not include any headers by default.
return null;
}

private HttpRequest createRequest(final HttpPublishTarget publishTarget, final ExternalMessage message) {
final Pair<Iterable<HttpHeader>, ContentType> headersPair = getHttpHeadersPair(message);
final HttpRequest requestWithoutEntity = factory.newRequest(publishTarget).addHeaders(headersPair.first());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ConnectivityStatus;
import org.eclipse.ditto.model.connectivity.ReplyTarget;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.Topic;
Expand Down Expand Up @@ -238,10 +239,8 @@ public void testConsumeFromTopicWithIdEnforcement() {
public void testConsumeFromTopicWithIdEnforcementExpectErrorResponse() {
disableLogging(actorSystem);

final Source mqttSource = newFilteredMqttSource(
"eclipse/{{ thing:namespace }}/{{ thing:name }}", // enforcement filter
"eclipse/+/+" // subscribed topic
);
final Source mqttSource =
newFilteredMqttSource("eclipse/{{ thing:namespace }}/{{ thing:name }}", "eclipse/+/+");

final Connection connectionWithEnforcement =
ConnectivityModelFactory.newConnectionBuilder(connectionId, ConnectionType.MQTT,
Expand Down Expand Up @@ -271,10 +270,12 @@ private TestKit testConsumeModifyThing(final Connection connection, final String
private static Source newFilteredMqttSource(final String filter, final String... sources) {
return ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(AUTHORIZATION_CONTEXT)
.index(1)
.consumerCount(1)
.addresses(TestConstants.asSet(sources))
.enforcement(ConnectivityModelFactory.newSourceAddressEnforcement(filter))
.replyTarget(ReplyTarget.newBuilder()
.address("{{ header:reply-to }}")
.build())
.qos(1)
.build();
}
Expand Down

0 comments on commit 90a939a

Please sign in to comment.