diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java index 8d782245a4..0fafcd18db 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/httppush/HttpPushValidator.java @@ -12,8 +12,6 @@ */ package org.eclipse.ditto.connectivity.service.messaging.httppush; -import static org.eclipse.ditto.placeholders.PlaceholderFactory.newHeadersPlaceholder; - import java.util.Collection; import java.util.List; import java.util.Map; @@ -22,7 +20,6 @@ import java.util.stream.Collectors; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException; import org.eclipse.ditto.connectivity.model.ConnectionType; @@ -31,6 +28,7 @@ import org.eclipse.ditto.connectivity.model.Target; import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; import org.eclipse.ditto.connectivity.service.config.HttpPushConfig; +import org.eclipse.ditto.connectivity.service.messaging.Resolvers; import org.eclipse.ditto.connectivity.service.messaging.validation.AbstractProtocolValidator; import akka.actor.ActorSystem; @@ -102,14 +100,7 @@ protected void validateSource(final Source source, final DittoHeaders dittoHeade protected void validateTarget(final Target target, final DittoHeaders dittoHeaders, final Supplier targetDescription) { validateHeaderMapping(target.getHeaderMapping(), dittoHeaders); - validateTemplate(target.getAddress(), dittoHeaders, - ConnectivityPlaceholders.newEntityPlaceholder(), - ConnectivityPlaceholders.newThingPlaceholder(), - ConnectivityPlaceholders.newTopicPathPlaceholder(), - ConnectivityPlaceholders.newResourcePlaceholder(), - ConnectivityPlaceholders.newTimePlaceholder(), - newHeadersPlaceholder(), - ConnectivityPlaceholders.newFeaturePlaceholder()); + validateTemplate(target.getAddress(), dittoHeaders, Resolvers.getPlaceholders()); validateTargetAddress(target.getAddress(), dittoHeaders, targetDescription); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java index 32e5988083..7f98acb0bf 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/AbstractMqttValidator.java @@ -13,14 +13,9 @@ package org.eclipse.ditto.connectivity.service.messaging.mqtt; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newEntityPlaceholder; -import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newFeaturePlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newPolicyPlaceholder; -import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newResourcePlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newSourceAddressPlaceholder; import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newThingPlaceholder; -import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newTimePlaceholder; -import static org.eclipse.ditto.connectivity.api.placeholders.ConnectivityPlaceholders.newTopicPathPlaceholder; -import static org.eclipse.ditto.placeholders.PlaceholderFactory.newHeadersPlaceholder; import java.text.MessageFormat; import java.util.AbstractMap; @@ -51,6 +46,7 @@ import org.eclipse.ditto.connectivity.model.Source; import org.eclipse.ditto.connectivity.model.Target; import org.eclipse.ditto.connectivity.service.config.MqttConfig; +import org.eclipse.ditto.connectivity.service.messaging.Resolvers; import org.eclipse.ditto.connectivity.service.messaging.validation.AbstractProtocolValidator; import org.eclipse.ditto.placeholders.ExpressionResolver; import org.eclipse.ditto.placeholders.Placeholder; @@ -114,8 +110,7 @@ protected void validateTarget(final Target target, final DittoHeaders dittoHeade } validateTargetQoS(qos.get(), dittoHeaders, targetDescription); - validateTemplate(target.getAddress(), dittoHeaders, newThingPlaceholder(), newTopicPathPlaceholder(), - newResourcePlaceholder(), newTimePlaceholder(), newHeadersPlaceholder(), newFeaturePlaceholder()); + validateTemplate(target.getAddress(), dittoHeaders, Resolvers.getPlaceholders()); } /** diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java index 03734adab0..6456126172 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java @@ -226,6 +226,7 @@ public final class ConnectionPersistenceActor private ConnectivityConfig getConnectivityConfigWithOverwrites(final Config connectivityConfigOverwrites) { final Config defaultConfig = getContext().getSystem().settings().config(); final Config withOverwrites = connectivityConfigOverwrites.withFallback(defaultConfig); + return ConnectivityConfig.of(withOverwrites); } @@ -414,6 +415,7 @@ protected ConnectivityEvent modifyEventBeforePersist(final ConnectivityEvent< .build(); return superEvent.setDittoHeaders(headersWithJournalTags); } + return superEvent; } @@ -776,6 +778,7 @@ private void testConnection(final StagedCommand command) { command.withResponse( toDittoRuntimeException(error, entityId, command.getDittoHeaders())), ActorRef.noSender()); + return null; }); } @@ -885,6 +888,7 @@ private void respondWithEmptyLogs(final WithDittoHeaders command, final ActorRef private CompletionStage startAndAskClientActors(final SignalWithEntityId cmd, final int clientCount) { startClientActorsIfRequired(clientCount, cmd.getDittoHeaders()); final Object msg = consistentHashableEnvelope(cmd, cmd.getEntityId().toString()); + return processClientAskResult(Patterns.ask(clientActorRouter, msg, clientActorAskTimeout)); } @@ -905,6 +909,7 @@ private CompletionStage broadcastToClientActorsIfStarted(final Command on connection <{}> failed due to {}: {}.", action, entityId, dre.getClass().getSimpleName(), dre.getMessage()); + return null; }