diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/AbstractMessageMapper.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/AbstractMessageMapper.java index 774da6687a..725c14e985 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/AbstractMessageMapper.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/AbstractMessageMapper.java @@ -49,12 +49,13 @@ public Collection getContentTypeBlocklist() { } @Override - public final void configure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) { + public final void configure(final ConnectionContext connectionContext, + final MessageMapperConfiguration configuration) { this.id = configuration.getId(); this.incomingConditions = configuration.getIncomingConditions(); this.outgoingConditions = configuration.getOutgoingConditions(); this.contentTypeBlocklist = configuration.getContentTypeBlocklist(); - doConfigure(mappingConfig, configuration); + doConfigure(connectionContext.getConnectivityConfig().getMappingConfig(), configuration); } /** diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/ConnectionContext.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/ConnectionContext.java new file mode 100644 index 0000000000..f04755ed3d --- /dev/null +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/ConnectionContext.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.connectivity.service.mapping; + +import org.eclipse.ditto.connectivity.model.Connection; +import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; + +/** + * Connection-related information relevant to a message mapper. + */ +public interface ConnectionContext { + + /** + * @return the connection in which the mapper is defined. + */ + Connection getConnection(); + + /** + * @return the connectivity config for the connection in which the mapper is defined. + */ + ConnectivityConfig getConnectivityConfig(); + + /** + * Create a copy of this context with a modified connectivity config. + * + * @param modifiedConfig the modified config. + * @return the new context. + */ + ConnectionContext withConnectivityConfig(ConnectivityConfig modifiedConfig); + +} diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactory.java index 5b56e8a19e..da44fe2bb5 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactory.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactory.java @@ -29,13 +29,11 @@ import javax.annotation.concurrent.Immutable; import org.atteo.classindex.ClassIndex; -import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; import org.eclipse.ditto.connectivity.model.MappingContext; import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException; import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException; import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; import org.eclipse.ditto.json.JsonObject; @@ -66,8 +64,7 @@ public final class DefaultMessageMapperFactory implements MessageMapperFactory { private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(DefaultMessageMapperFactory.class); - private final ConnectionId connectionId; - private final MappingConfig mappingConfig; + private final ConnectionContext connectionContext; /** * The actor system used for dynamic class instantiation. @@ -85,14 +82,12 @@ public final class DefaultMessageMapperFactory implements MessageMapperFactory { private final LoggingAdapter log; - private DefaultMessageMapperFactory(final ConnectionId connectionId, - final MappingConfig mappingConfig, + private DefaultMessageMapperFactory(final ConnectionContext connectionContext, final ExtendedActorSystem actorSystem, final List messageMapperExtensions, final LoggingAdapter log) { - this.connectionId = checkNotNull(connectionId); - this.mappingConfig = checkNotNull(mappingConfig, "MappingConfig"); + this.connectionContext = checkNotNull(connectionContext, "connectionContext"); this.actorSystem = checkNotNull(actorSystem); this.messageMapperExtensions = checkNotNull(messageMapperExtensions); this.log = checkNotNull(log); @@ -101,22 +96,20 @@ private DefaultMessageMapperFactory(final ConnectionId connectionId, /** * Creates a new factory and returns the instance * - * @param connectionId ID of the connection. + * @param connectionContext context of the connection for which this factory is instantiated. * @param actorSystem the actor system to use for mapping config + dynamicAccess. - * @param mappingConfig the configuration of the mapping behaviour. * @param log the log adapter used for debug and warning logs. * @return the new instance. * @throws NullPointerException if any argument is {@code null}. */ - public static DefaultMessageMapperFactory of(final ConnectionId connectionId, + public static DefaultMessageMapperFactory of(final ConnectionContext connectionContext, final ActorSystem actorSystem, - final MappingConfig mappingConfig, final LoggingAdapter log) { final ExtendedActorSystem extendedActorSystem = (ExtendedActorSystem) actorSystem; final List messageMapperExtensions = tryToLoadMessageMappersExtensions(extendedActorSystem); - return new DefaultMessageMapperFactory(connectionId, mappingConfig, extendedActorSystem, + return new DefaultMessageMapperFactory(connectionContext, extendedActorSystem, messageMapperExtensions, log); } @@ -246,6 +239,7 @@ private static List> loadMessageMapperEx * @return the instantiated mapper if it can be instantiated from the configured factory class. */ Optional createMessageMapperInstance(final String mappingEngine) { + final var connectionId = connectionContext.getConnection().getId(); if (registeredMappers.containsKey(mappingEngine)) { final Class messageMapperClass = registeredMappers.get(mappingEngine); MessageMapper result = createAnyMessageMapper(messageMapperClass, @@ -295,7 +289,7 @@ private static PayloadMapper getPayloadMapperAnnotation(final Map.Entry configureInstance(final MessageMapper mapper, final MessageMapperConfiguration options) { try { - mapper.configure(mappingConfig, options); + mapper.configure(connectionContext, options); return Optional.of(mapper); } catch (final MessageMapperConfigurationInvalidException e) { log.warning("Failed to apply configuration <{}> to mapper instance <{}>: {}", options, mapper, @@ -309,8 +303,7 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final DefaultMessageMapperFactory that = (DefaultMessageMapperFactory) o; - return Objects.equals(connectionId, that.connectionId) && - Objects.equals(mappingConfig, that.mappingConfig) && + return Objects.equals(connectionContext, that.connectionContext) && Objects.equals(actorSystem, that.actorSystem) && Objects.equals(messageMapperExtensions, that.messageMapperExtensions) && Objects.equals(log, that.log); @@ -318,6 +311,6 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(connectionId, mappingConfig, actorSystem, messageMapperExtensions, log); + return Objects.hash(connectionContext, actorSystem, messageMapperExtensions, log); } } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/DittoConnectionContext.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/DittoConnectionContext.java new file mode 100644 index 0000000000..5d81e77beb --- /dev/null +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/DittoConnectionContext.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.connectivity.service.mapping; + +import java.util.Objects; + +import org.eclipse.ditto.connectivity.model.Connection; +import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; + +/** + * Implementation of {@link ConnectionContext}. + */ +public final class DittoConnectionContext implements ConnectionContext { + + private final Connection connection; + private final ConnectivityConfig connectivityConfig; + + private DittoConnectionContext(final Connection connection, final ConnectivityConfig connectivityConfig) { + this.connection = connection; + this.connectivityConfig = connectivityConfig; + } + + /** + * Create a connection context from a connection and its connectivity config. + * + * @param connection the connection. + * @param config the connectivity config. + * @return the connection context. + */ + public static DittoConnectionContext of(final Connection connection, final ConnectivityConfig config) { + return new DittoConnectionContext(connection, config); + } + + @Override + public Connection getConnection() { + return connection; + } + + @Override + public ConnectivityConfig getConnectivityConfig() { + return connectivityConfig; + } + + @Override + public ConnectionContext withConnectivityConfig(final ConnectivityConfig modifiedConfig) { + return new DittoConnectionContext(connection, modifiedConfig); + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "[connection=" + connection + + ",connectivityConfig=" + connectivityConfig + + "]"; + } + + @Override + public boolean equals(final Object other) { + if (other instanceof DittoConnectionContext) { + final var that = (DittoConnectionContext) other; + return Objects.equals(connection, that.connection) && + Objects.equals(connectivityConfig, that.connectivityConfig); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(connection, connectivityConfig); + } +} diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/MessageMapper.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/MessageMapper.java index a786eb4ce4..8010869a19 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/MessageMapper.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/MessageMapper.java @@ -19,11 +19,10 @@ import java.util.Map; import java.util.Optional; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; -import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException; +import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.connectivity.api.ExternalMessage; /** * Defines a message mapper which maps a {@link ExternalMessage} to a {@link Adaptable} and vice versa. @@ -45,7 +44,7 @@ public interface MessageMapper { /** * Returns a blocklist of content-types which shall not be handled by this message mapper. * Is determined from the passed in {@code MessageMapperConfiguration} in - * {@link #configure(org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig, MessageMapperConfiguration)}. + * {@link #configure(ConnectionContext, MessageMapperConfiguration)}. * * @return a blocklist of content-types which shall not be handled by this message mapper. */ @@ -54,13 +53,13 @@ public interface MessageMapper { /** * Applies configuration for this MessageMapper. * - * @param mappingConfig the config scoped to the mapping section "ditto.connectivity.mapping". + * @param connectionContext the connection and its contextual information including its connectivity config. * @param configuration the configuration to apply. * @throws MessageMapperConfigurationInvalidException if configuration is invalid. * @throws org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException if the configuration * failed for a mapper specific reason. */ - void configure(MappingConfig mappingConfig, MessageMapperConfiguration configuration); + void configure(ConnectionContext connectionContext, MessageMapperConfiguration configuration); /** * Maps an {@link ExternalMessage} to an {@link Adaptable} @@ -91,16 +90,16 @@ default JsonObject getDefaultOptions() { /** * Returns the conditions to be checked before mapping incoming messages. - * @return the conditions. * + * @return the conditions. * @since 1.3.0 */ Map getIncomingConditions(); /** * Returns the conditions to be checked before mapping outgoing messages. - * @return the conditions. * + * @return the conditions. * @since 1.3.0 */ Map getOutgoingConditions(); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapper.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapper.java index e3c23f25cb..7290f5340c 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapper.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapper.java @@ -32,6 +32,8 @@ import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; import org.eclipse.ditto.connectivity.model.MappingContext; import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; +import org.eclipse.ditto.internal.models.placeholders.ExpressionResolver; +import org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory; import org.eclipse.ditto.json.JsonArray; import org.eclipse.ditto.json.JsonCollectors; import org.eclipse.ditto.json.JsonFactory; @@ -52,8 +54,6 @@ import org.eclipse.ditto.protocol.PayloadBuilder; import org.eclipse.ditto.protocol.ProtocolFactory; import org.eclipse.ditto.protocol.TopicPath; -import org.eclipse.ditto.internal.models.placeholders.ExpressionResolver; -import org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingId; @@ -176,7 +176,7 @@ public JsonObject getDefaultOptions() { @Override protected void doConfigure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) { - dittoMessageMapper.configure(mappingConfig, configuration); + dittoMessageMapper.doConfigure(mappingConfig, configuration); fallbackOutgoingContentType = configuration.findProperty(OUTGOING_CONTENT_TYPE_KEY) .map(ContentType::of) .orElse(fallbackOutgoingContentType); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapper.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapper.java index ea46844971..e4a2c92f1b 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapper.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapper.java @@ -21,14 +21,13 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; -import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.connectivity.model.MessageMappingFailedException; -import org.eclipse.ditto.protocol.Adaptable; import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; +import org.eclipse.ditto.connectivity.model.MessageMappingFailedException; +import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.protocol.Adaptable; /** * Enforce message size limits on a {@link MessageMapper} and adds random correlation IDs should they not be present @@ -88,11 +87,12 @@ public MessageMapper getDelegate() { } @Override - public void configure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) { - final MapperLimitsConfig mapperLimitsConfig = mappingConfig.getMapperLimitsConfig(); + public void configure(final ConnectionContext connectionContext, final MessageMapperConfiguration configuration) { + final MapperLimitsConfig mapperLimitsConfig = + connectionContext.getConnectivityConfig().getMappingConfig().getMapperLimitsConfig(); inboundMessageLimit = mapperLimitsConfig.getMaxMappedInboundMessages(); outboundMessageLimit = mapperLimitsConfig.getMaxMappedOutboundMessages(); - delegate.configure(mappingConfig, configuration); + delegate.configure(connectionContext, configuration); } @Override diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index e34dd596d4..c2b3b7d13f 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -97,6 +97,8 @@ import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; import org.eclipse.ditto.connectivity.service.config.MonitoringConfig; import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected; import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected; import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure; @@ -168,6 +170,8 @@ public abstract class BaseClientActor extends AbstractFSMWithStash tryToConfigureMessageMappingProcessor() { final ActorSystem actorSystem = getContext().getSystem(); // this one throws DittoRuntimeExceptions when the mapper could not be configured - InboundMappingProcessor.of(connection.getId(), - connection.getConnectionType(), - connection.getPayloadMappingDefinition(), - actorSystem, - connectivityConfig, - protocolAdapter, - logger); - OutboundMappingProcessor.of(connection, - actorSystem, - connectivityConfig, - protocolAdapter, - logger); + InboundMappingProcessor.of(connectionContext, actorSystem, protocolAdapter, logger); + OutboundMappingProcessor.of(connectionContext, actorSystem, protocolAdapter, logger); return CompletableFuture.completedFuture(new Status.Success("mapping")); } @@ -1616,11 +1612,11 @@ private Pair startOutboundActors(final Connection connection try { ConnectivityConfig retrievedConnectivityConfig = connectivityConfigProvider.getConnectivityConfig(connection.getId()); + final var newConnectionContext = connectionContext.withConnectivityConfig(retrievedConnectivityConfig); // this one throws DittoRuntimeExceptions when the mapper could not be configured - settings = OutboundMappingSettings.of(connection, + settings = OutboundMappingSettings.of(connectionContext, getContext().getSystem(), proxyActorSelection, - retrievedConnectivityConfig, protocolAdapter, logger); outboundMappingProcessor = OutboundMappingProcessor.of(settings); @@ -1683,11 +1679,9 @@ private ActorRef startInboundMappingProcessorActor(final Connection connection, connectivityConfigProvider.getConnectivityConfig(connection.getId()); // this one throws DittoRuntimeExceptions when the mapper could not be configured - inboundMappingProcessor = InboundMappingProcessor.of(connection.getId(), - connection.getConnectionType(), - connection.getPayloadMappingDefinition(), + inboundMappingProcessor = InboundMappingProcessor.of( + connectionContext.withConnectivityConfig(retrievedConnectivityConfig), getContext().getSystem(), - retrievedConnectivityConfig, protocolAdapter, logger); } catch (final DittoRuntimeException dre) { @@ -2169,6 +2163,7 @@ private boolean shutdownAfterDisconnect() { } private static final class CloseConnectionAndShutdown { + private static final CloseConnectionAndShutdown INSTANCE = new CloseConnectionAndShutdown(); private CloseConnectionAndShutdown() { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java index e0031750b2..4bb2fffa8d 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessor.java @@ -29,25 +29,24 @@ import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder; import org.eclipse.ditto.base.model.headers.DittoHeadersSizeChecker; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.base.service.config.limits.LimitsConfig; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.MappedInboundExternalMessage; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionType; -import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.DefaultMessageMapperFactory; import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperFactory; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperRegistry; import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome; -import org.eclipse.ditto.things.model.ThingConstants; -import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; -import org.eclipse.ditto.base.service.config.limits.LimitsConfig; import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.MappedInboundExternalMessage; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; -import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.protocol.Adaptable; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; +import org.eclipse.ditto.things.model.ThingConstants; import akka.actor.ActorSystem; @@ -77,11 +76,8 @@ private InboundMappingProcessor(final ConnectionId connectionId, * Initializes a new command processor with mappers defined in mapping mappingContext. * The dynamic access is needed to instantiate message mappers for an actor system. * - * @param connectionId the connection that the processor works for. - * @param connectionType the type of the connection that the processor works for. - * @param mappingDefinition the configured mappings used by this processor + * @param connectionContext the context of the connection that the processor works for. * @param actorSystem the dynamic access used for message mapper instantiation. - * @param connectivityConfig the configuration settings of the Connectivity service. * @param protocolAdapter the ProtocolAdapter to be used. * @param logger the logging adapter to be used for log statements. * @return the processor instance. @@ -90,20 +86,19 @@ private InboundMappingProcessor(final ConnectionId connectionId, * @throws org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException if the configuration of * one of the {@code mappingContext} failed for a mapper specific reason. */ - public static InboundMappingProcessor of(final ConnectionId connectionId, - final ConnectionType connectionType, - final PayloadMappingDefinition mappingDefinition, + public static InboundMappingProcessor of(final ConnectionContext connectionContext, final ActorSystem actorSystem, - final ConnectivityConfig connectivityConfig, final ProtocolAdapter protocolAdapter, final ThreadSafeDittoLoggingAdapter logger) { + final var connectionId = connectionContext.getConnection().getId(); + final var mappingDefinition = connectionContext.getConnection().getPayloadMappingDefinition(); + final var connectivityConfig = connectionContext.getConnectivityConfig(); final ThreadSafeDittoLoggingAdapter loggerWithConnectionId = logger.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId); final MessageMapperFactory messageMapperFactory = - DefaultMessageMapperFactory.of(connectionId, actorSystem, connectivityConfig.getMappingConfig(), - loggerWithConnectionId); + DefaultMessageMapperFactory.of(connectionContext, actorSystem, loggerWithConnectionId); final MessageMapperRegistry registry = messageMapperFactory.registryOf(DittoMessageMapper.CONTEXT, mappingDefinition); @@ -111,13 +106,14 @@ public static InboundMappingProcessor of(final ConnectionId connectionId, final DittoHeadersSizeChecker dittoHeadersSizeChecker = DittoHeadersSizeChecker.of(limitsConfig.getHeadersMaxSize(), limitsConfig.getAuthSubjectsMaxCount()); - return of(connectionId, connectionType, registry, loggerWithConnectionId, protocolAdapter, - dittoHeadersSizeChecker); + return of(connectionContext, registry, loggerWithConnectionId, protocolAdapter, dittoHeadersSizeChecker); } - static InboundMappingProcessor of(final ConnectionId connectionId, final ConnectionType connectionType, + static InboundMappingProcessor of(final ConnectionContext connectionContext, final MessageMapperRegistry registry, final ThreadSafeDittoLoggingAdapter logger, final ProtocolAdapter adapter, final DittoHeadersSizeChecker sizeChecker) { + final var connectionId = connectionContext.getConnection().getId(); + final var connectionType = connectionContext.getConnection().getConnectionType(); return new InboundMappingProcessor(connectionId, connectionType, registry, logger, adapter, sizeChecker); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessor.java index 4d76d94576..da264c2958 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessor.java @@ -26,25 +26,24 @@ import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; -import org.eclipse.ditto.connectivity.model.Connection; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; +import org.eclipse.ditto.connectivity.api.OutboundSignal; +import org.eclipse.ditto.connectivity.api.OutboundSignalFactory; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; import org.eclipse.ditto.connectivity.model.PayloadMapping; import org.eclipse.ditto.connectivity.model.Target; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperRegistry; import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome; +import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.eclipse.ditto.protocol.ProtocolFactory; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; -import org.eclipse.ditto.connectivity.api.OutboundSignal; -import org.eclipse.ditto.connectivity.api.OutboundSignalFactory; -import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; -import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -77,9 +76,8 @@ private OutboundMappingProcessor(final ConnectionId connectionId, * Initializes a new command processor with mappers defined in mapping mappingContext. * The dynamic access is needed to instantiate message mappers for an actor system. * - * @param connection the connection that the processor works for. + * @param connectionContext the context of the connection that the processor works for. * @param actorSystem the dynamic access used for message mapper instantiation. - * @param connectivityConfig the configuration settings of the Connectivity service. * @param protocolAdapter the ProtocolAdapter to be used. * @param logger the logging adapter to be used for log statements. * @return the processor instance. @@ -88,15 +86,14 @@ private OutboundMappingProcessor(final ConnectionId connectionId, * @throws org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException if the configuration of * one of the {@code mappingContext} failed for a mapper specific reason. */ - public static OutboundMappingProcessor of(final Connection connection, + public static OutboundMappingProcessor of(final ConnectionContext connectionContext, final ActorSystem actorSystem, - final ConnectivityConfig connectivityConfig, final ProtocolAdapter protocolAdapter, final ThreadSafeDittoLoggingAdapter logger) { final ActorSelection deadLetterSelection = actorSystem.actorSelection(actorSystem.deadLetters().path()); - return of(OutboundMappingSettings.of(connection, actorSystem, deadLetterSelection, connectivityConfig, - protocolAdapter, logger)); + return of(OutboundMappingSettings.of(connectionContext, actorSystem, deadLetterSelection, protocolAdapter, + logger)); } /** diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingSettings.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingSettings.java index 1a155466bb..2b175785d1 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingSettings.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingSettings.java @@ -19,11 +19,10 @@ import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; -import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.DefaultMessageMapperFactory; import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperFactory; @@ -33,10 +32,10 @@ import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry; import org.eclipse.ditto.connectivity.service.messaging.persistence.SignalFilter; import org.eclipse.ditto.connectivity.service.messaging.validation.ConnectionValidator; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey; import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -81,13 +80,14 @@ private OutboundMappingSettings(final ConnectionId connectionId, this.proxyActor = proxyActor; } - static OutboundMappingSettings of(final Connection connection, + static OutboundMappingSettings of(final ConnectionContext connectionContext, final ActorSystem actorSystem, final ActorSelection proxyActor, - final ConnectivityConfig connectivityConfig, final ProtocolAdapter protocolAdapter, final ThreadSafeDittoLoggingAdapter logger) { + final var connection = connectionContext.getConnection(); + final var connectivityConfig = connectionContext.getConnectivityConfig(); final ConnectionId connectionId = connection.getId(); final ConnectionType connectionType = connection.getConnectionType(); final PayloadMappingDefinition mappingDefinition = connection.getPayloadMappingDefinition(); @@ -104,8 +104,7 @@ static OutboundMappingSettings of(final Connection connection, logger.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connectionId); final MessageMapperFactory messageMapperFactory = - DefaultMessageMapperFactory.of(connectionId, actorSystem, connectivityConfig.getMappingConfig(), - loggerWithConnectionId); + DefaultMessageMapperFactory.of(connectionContext, actorSystem, loggerWithConnectionId); final MessageMapperRegistry registry = messageMapperFactory.registryOf(DittoMessageMapper.CONTEXT, mappingDefinition); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/AbstractProtocolValidator.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/AbstractProtocolValidator.java index 3c8872a586..13a4fe8584 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/AbstractProtocolValidator.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/validation/AbstractProtocolValidator.java @@ -28,14 +28,14 @@ import org.eclipse.ditto.connectivity.model.Target; import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigProvider; import org.eclipse.ditto.connectivity.service.config.ConnectivityConfigProviderFactory; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; import org.eclipse.ditto.connectivity.service.mapping.DefaultMessageMapperFactory; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperFactory; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperRegistry; +import org.eclipse.ditto.connectivity.service.messaging.Resolvers; import org.eclipse.ditto.internal.models.placeholders.Placeholder; import org.eclipse.ditto.internal.models.placeholders.PlaceholderFilter; -import org.eclipse.ditto.connectivity.service.messaging.Resolvers; import akka.actor.ActorSystem; @@ -147,10 +147,10 @@ protected void validatePayloadMappings(final Connection connection, final ActorS final DittoHeaders dittoHeaders) { final ConnectivityConfigProvider connectivityConfigProvider = ConnectivityConfigProviderFactory.getInstance(actorSystem); - final MappingConfig mappingConfig = - connectivityConfigProvider.getConnectivityConfig(connection.getId()).getMappingConfig(); + final var connectivityConfig = connectivityConfigProvider.getConnectivityConfig(connection.getId()); + final var connectionContext = DittoConnectionContext.of(connection, connectivityConfig); final MessageMapperFactory messageMapperFactory = - DefaultMessageMapperFactory.of(connection.getId(), actorSystem, mappingConfig, actorSystem.log()); + DefaultMessageMapperFactory.of(connectionContext, actorSystem, actorSystem.log()); try { final MessageMapperRegistry messageMapperRegistry = diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ConnectionStatusMessageMapperTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ConnectionStatusMessageMapperTest.java index 130190dd6c..284bc22d33 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ConnectionStatusMessageMapperTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ConnectionStatusMessageMapperTest.java @@ -21,20 +21,19 @@ import java.util.Map; import java.util.Optional; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; -import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; +import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.protocol.Adaptable; +import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; import org.eclipse.ditto.things.model.DefinitionIdentifier; import org.eclipse.ditto.things.model.FeatureDefinition; import org.eclipse.ditto.things.model.ThingId; -import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; -import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyFeature; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyFeatureProperty; import org.junit.Before; @@ -42,8 +41,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import com.typesafe.config.ConfigFactory; - /** * Tests {@link ConnectionStatusMessageMapper}. */ @@ -56,7 +53,7 @@ public class ConnectionStatusMessageMapperTest { private static final DittoProtocolAdapter DITTO_PROTOCOL_ADAPTER = DittoProtocolAdapter.newInstance(); private static final String EXPECTED_READY_UNTIL_IN_DISTANT_FUTURE = "9999-12-31T23:59:59Z"; - private static MappingConfig mappingConfig; + private static ConnectionContext connectionContext; private Map validHeader; private Map validConfigProps; @@ -66,7 +63,8 @@ public class ConnectionStatusMessageMapperTest { @Before public void setUp() { - mappingConfig = DefaultMappingConfig.of(ConfigFactory.empty()); + connectionContext = + DittoConnectionContext.of(TestConstants.createConnection(), TestConstants.CONNECTIVITY_CONFIG); underTest = new ConnectionStatusMessageMapper(); validConfigProps = Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, @@ -87,7 +85,7 @@ public void setUp() { @Test public void doForwardMapWithValidUseCase() { // GIVEN - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); // WHEN @@ -109,7 +107,7 @@ public void doForwardMapWithValidUseCase() { @Test public void emittedCommandShouldExplicitlyRequestNoAcknowledgements() { // GIVEN - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); // WHEN @@ -127,7 +125,7 @@ public void emittedCommandShouldExplicitlyRequestNoAcknowledgements() { @Test public void doForwardMapWithValidUseCaseTtdZero() { // GIVEN - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); validHeader.put(ConnectionStatusMessageMapper.HEADER_HONO_TTD, "0"); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); @@ -147,7 +145,7 @@ public void doForwardMapWithValidUseCaseTtdZero() { @Test public void doForwardMapWithValidUseCaseTtdMinusOne() { // GIVEN - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); validHeader.put(ConnectionStatusMessageMapper.HEADER_HONO_TTD, "-1"); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); @@ -170,7 +168,7 @@ public void doForwardMapWithValidUseCaseTtdMinusOne() { @Test public void doForwardMapWithMissingHeaderTTD() { // GIVEN - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final Map invalidHeader = validHeader; invalidHeader.remove(ConnectionStatusMessageMapper.HEADER_HONO_TTD); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(invalidHeader).build(); @@ -182,7 +180,7 @@ public void doForwardMapWithMissingHeaderTTD() { @Test public void doForwardMapWithMissingHeaderCreationTime() { - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final Map invalidHeader = validHeader; invalidHeader.remove(ConnectionStatusMessageMapper.HEADER_HONO_CREATION_TIME); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(invalidHeader).build(); @@ -193,10 +191,11 @@ public void doForwardMapWithMissingHeaderCreationTime() { @Test public void doForwardMapWithMissingHeader() { final Map props = - Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, JsonValue.of("{{ header:thing-id }}")); + Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, + JsonValue.of("{{ header:thing-id }}")); final MessageMapperConfiguration thingIdWithPlaceholder = DefaultMessageMapperConfiguration.of("placeholder", props, validConditions, validConditions); - underTest.configure(mappingConfig, thingIdWithPlaceholder); + underTest.configure(connectionContext, thingIdWithPlaceholder); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); final List mappingResult = underTest.map(externalMessage); assertThat(mappingResult).isEmpty(); @@ -204,7 +203,7 @@ public void doForwardMapWithMissingHeader() { @Test public void doForwardMapWithInvalidHeaderTTD() { - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final Map invalidHeader = validHeader; invalidHeader.replace(ConnectionStatusMessageMapper.HEADER_HONO_TTD, "Invalid Value"); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(invalidHeader).build(); @@ -214,7 +213,7 @@ public void doForwardMapWithInvalidHeaderTTD() { @Test public void doForwardMapWithInvalidHeaderCreationTime() { - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final Map invalidHeader = validHeader; invalidHeader.replace(ConnectionStatusMessageMapper.HEADER_HONO_CREATION_TIME, "Invalid Value"); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(invalidHeader).build(); @@ -224,7 +223,7 @@ public void doForwardMapWithInvalidHeaderCreationTime() { @Test public void doForwardMapWithInvalidTTDValue() { - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final Map invalidHeader = validHeader; final String invalidTTDValue = "-5625"; invalidHeader.replace(ConnectionStatusMessageMapper.HEADER_HONO_TTD, invalidTTDValue); @@ -236,7 +235,7 @@ public void doForwardMapWithInvalidTTDValue() { //Validate mapping context options @Test public void doForwardMappingContextWithoutFeatureId() { - underTest.configure(mappingConfig, validMapperConfig); + underTest.configure(connectionContext, validMapperConfig); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); final List mappingResult = underTest.map(externalMessage); assertThat(mappingResult.get(0).getPayload().getPath().getFeatureId()) @@ -247,10 +246,11 @@ public void doForwardMappingContextWithoutFeatureId() { public void doForwardMappingContextWithIndividualFeatureId() { final String individualFeatureId = "individualFeatureId"; final Map props = new HashMap<>(validConfigProps); - props.put(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_FEATURE_ID, JsonValue.of(individualFeatureId)); + props.put(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_FEATURE_ID, + JsonValue.of(individualFeatureId)); final MessageMapperConfiguration individualFeatureIdConfig = DefaultMessageMapperConfiguration.of("placeholder", props, validConditions, validConditions); - underTest.configure(mappingConfig, individualFeatureIdConfig); + underTest.configure(connectionContext, individualFeatureIdConfig); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); final List mappingResult = underTest.map(externalMessage); assertThat(mappingResult.get(0).getPayload().getPath().getFeatureId()) @@ -260,7 +260,7 @@ public void doForwardMappingContextWithIndividualFeatureId() { @Test public void doForwardMappingContextWithoutThingId() { exception.expect(MessageMapperConfigurationInvalidException.class); - underTest.configure(mappingConfig, + underTest.configure(connectionContext, DefaultMessageMapperConfiguration.of("valid", Collections.emptyMap(), validConditions, validConditions)); } @@ -269,19 +269,21 @@ public void doForwardMappingContextWithoutThingId() { public void doForwardMappingContextWithWrongThingId() { exception.expect(DittoRuntimeException.class); final Map props = - Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, JsonValue.of("Invalid Value")); + Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, + JsonValue.of("Invalid Value")); final MessageMapperConfiguration wrongThingId = DefaultMessageMapperConfiguration.of("invalidThingId", props, validConditions, validConditions); - underTest.configure(mappingConfig, wrongThingId); + underTest.configure(connectionContext, wrongThingId); } @Test public void doForwardMappingContextWithThingIdMapping() { final Map props = - Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, JsonValue.of("{{ header:device_id }}")); + Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, + JsonValue.of("{{ header:device_id }}")); final MessageMapperConfiguration thingIdWithPlaceholder = DefaultMessageMapperConfiguration.of("placeholder", props, validConditions, validConditions); - underTest.configure(mappingConfig, thingIdWithPlaceholder); + underTest.configure(connectionContext, thingIdWithPlaceholder); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); final List mappingResult = underTest.map(externalMessage); assertThat(mappingResult.get(0).getTopicPath().getEntityName()) @@ -291,10 +293,11 @@ public void doForwardMappingContextWithThingIdMapping() { @Test public void doForwardMappingContextWithWrongThingIdMapping() { final Map props = - Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, JsonValue.of("{{ header:device_id }}")); + Map.of(ConnectionStatusMessageMapper.MAPPING_OPTIONS_PROPERTIES_THING_ID, + JsonValue.of("{{ header:device_id }}")); final MessageMapperConfiguration thingIdWithPlaceholder = DefaultMessageMapperConfiguration.of("placeholder", props, validConditions, validConditions); - underTest.configure(mappingConfig, thingIdWithPlaceholder); + underTest.configure(connectionContext, thingIdWithPlaceholder); final Map invalidHeader = validHeader; invalidHeader.replace(HEADER_HONO_DEVICE_ID, "Invalid Value"); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(validHeader).build(); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactoryTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactoryTest.java index 5697ff9a3f..be94ef1f5d 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactoryTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/DefaultMessageMapperFactoryTest.java @@ -19,13 +19,11 @@ import java.util.List; import java.util.Map; -import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; import org.eclipse.ditto.connectivity.model.MappingContext; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; import org.eclipse.ditto.connectivity.service.mapping.test.MappingContexts; import org.eclipse.ditto.connectivity.service.mapping.test.MockMapper; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -48,7 +46,7 @@ @RunWith(MockitoJUnitRunner.class) public final class DefaultMessageMapperFactoryTest { - private static MappingConfig mappingConfig; + private static ConnectionContext connectionContext; private static ActorSystem system; @Mock @@ -60,7 +58,8 @@ public final class DefaultMessageMapperFactoryTest { public static void initTestFixture() { final Config testConfig = ConfigFactory.parseMap( Collections.singletonMap("ditto.connectivity.mapping.dummy", "")); - mappingConfig = DefaultMappingConfig.of(testConfig.getConfig("ditto.connectivity")); + connectionContext = + DittoConnectionContext.of(TestConstants.createConnection(), TestConstants.CONNECTIVITY_CONFIG); system = ActorSystem.create("test", testConfig); } @@ -73,7 +72,7 @@ public static void shutDownActorSystem() { @Before public void setUp() { - underTest = DefaultMessageMapperFactory.of(ConnectionId.of("connectionId"), system, mappingConfig, log); + underTest = DefaultMessageMapperFactory.of(connectionContext, system, log); } @After diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ImplicitThingCreationMessageMapperTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ImplicitThingCreationMessageMapperTest.java index 52c9f6f96a..6c1359cee6 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ImplicitThingCreationMessageMapperTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/ImplicitThingCreationMessageMapperTest.java @@ -24,25 +24,24 @@ import java.util.List; import java.util.Map; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; -import org.eclipse.ditto.json.JsonObject; -import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.entitytag.EntityTagMatchers; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; import org.eclipse.ditto.internal.models.placeholders.UnresolvedPlaceholderException; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.policies.model.PoliciesResourceType; import org.eclipse.ditto.policies.model.Policy; import org.eclipse.ditto.policies.model.SubjectIssuer; +import org.eclipse.ditto.protocol.Adaptable; +import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.ThingsModelFactory; -import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; -import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse; import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingConflictException; import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing; @@ -50,8 +49,6 @@ import org.junit.Before; import org.junit.Test; -import com.typesafe.config.ConfigFactory; - /** * Unit test for {@link ImplicitThingCreationMessageMapper}. */ @@ -111,19 +108,20 @@ public final class ImplicitThingCreationMessageMapperTest { public static final String GATEWAY_ID = "headerNamespace:headerGatewayId"; public static final String DEVICE_ID = "headerNamespace:headerDeviceId"; - private static MappingConfig mappingConfig; + private static ConnectionContext connectionContext; private MessageMapper underTest; @Before public void setUp() { - mappingConfig = DefaultMappingConfig.of(ConfigFactory.empty()); + connectionContext = + DittoConnectionContext.of(TestConstants.createConnection(), TestConstants.CONNECTIVITY_CONFIG); underTest = new ImplicitThingCreationMessageMapper(); } @Test public void doForwardMappingContextWithCommandHeaderPlaceholder() { final Map headers = createValidHeaders(); - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); + underTest.configure(connectionContext, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(headers).build(); final List mappingResult = underTest.map(externalMessage); @@ -146,7 +144,7 @@ public void doForwardMappingContextWithCommandHeaderPlaceholder() { @Test public void doForwardMappingContextWithDeviceIdPlaceholder() { final Map headers = createValidHeaders(); - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); + underTest.configure(connectionContext, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(headers).build(); final List mappingResult = underTest.map(externalMessage); @@ -167,7 +165,7 @@ public void doForwardMappingContextWithDeviceIdPlaceholder() { @Test public void doForwardMappingContextWithPolicyPlaceholder() { final Map headers = createValidHeaders(); - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE_WITH_POLICY, COMMAND_HEADERS)); + underTest.configure(connectionContext, createMapperConfig(THING_TEMPLATE_WITH_POLICY, COMMAND_HEADERS)); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(headers).build(); final List mappingResult = underTest.map(externalMessage); @@ -187,7 +185,7 @@ public void doForwardMappingContextWithPolicyPlaceholder() { @Test public void doForwardMappingTwice() { - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE_WITH_POLICY, COMMAND_HEADERS)); + underTest.configure(connectionContext, createMapperConfig(THING_TEMPLATE_WITH_POLICY, COMMAND_HEADERS)); final Map headers1 = new HashMap<>(); headers1.put(HEADER_HONO_DEVICE_ID, "headerNamespace:headerDeviceId1"); @@ -232,7 +230,8 @@ public void doForwardMappingTwice() { @Test public void doForwardWithoutPlaceholders() { final Map headers = createValidHeaders(); - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE_WITHOUT_PLACEHOLDERS, COMMAND_HEADERS)); + underTest.configure(connectionContext, + createMapperConfig(THING_TEMPLATE_WITHOUT_PLACEHOLDERS, COMMAND_HEADERS)); final ExternalMessage externalMessage = ExternalMessageFactory.newExternalMessageBuilder(headers).build(); final List mappingResult = underTest.map(externalMessage); @@ -253,7 +252,7 @@ public void throwErrorIfMappingConfigIsMissing() { COMMAND_HEADERS); assertThatExceptionOfType(MessageMapperConfigurationInvalidException.class) - .isThrownBy(() -> underTest.configure(mappingConfig, invalidMapperConfig)); + .isThrownBy(() -> underTest.configure(connectionContext, invalidMapperConfig)); } @Test @@ -272,12 +271,12 @@ public void throwErrorIfThingIdIsMissingInConfig() { COMMAND_HEADERS); assertThatExceptionOfType(MessageMapperConfigurationInvalidException.class) - .isThrownBy(() -> underTest.configure(mappingConfig, invalidMapperConfig)); + .isThrownBy(() -> underTest.configure(connectionContext, invalidMapperConfig)); } @Test public void throwErrorIfHeaderForPlaceholderIsMissing() { - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); + underTest.configure(connectionContext, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); final Map missingEntityHeader = new HashMap<>(); missingEntityHeader.put(HEADER_HONO_DEVICE_ID, DEVICE_ID); @@ -291,7 +290,7 @@ public void throwErrorIfHeaderForPlaceholderIsMissing() { @Test public void throwExceptionOnErrorResponse() { - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); + underTest.configure(connectionContext, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); final ThingConflictException conflictException = ThingConflictException.newBuilder(ThingId.generateRandom()).build(); final Signal thingErrorResponse = ThingErrorResponse.of(conflictException); @@ -302,7 +301,7 @@ public void throwExceptionOnErrorResponse() { @Test public void regularCommandResponsesAreNotMapped() { - underTest.configure(mappingConfig, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); + underTest.configure(connectionContext, createMapperConfig(THING_TEMPLATE, COMMAND_HEADERS)); final Signal thingResponse = CreateThingResponse.of(Thing.newBuilder().setId(ThingId.generateRandom()).build(), DittoHeaders.empty()); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/NormalizedMessageMapperTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/NormalizedMessageMapperTest.java index 2a8477e5fa..bea0bc281e 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/NormalizedMessageMapperTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/NormalizedMessageMapperTest.java @@ -18,31 +18,34 @@ import java.util.Map; import org.assertj.core.api.Assertions; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; +import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonParseOptions; import org.eclipse.ditto.json.JsonPointer; import org.eclipse.ditto.json.JsonValue; -import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.messages.model.Message; import org.eclipse.ditto.messages.model.MessageDirection; import org.eclipse.ditto.messages.model.MessageHeaders; +import org.eclipse.ditto.messages.model.signals.commands.SendClaimMessage; import org.eclipse.ditto.policies.model.PolicyId; +import org.eclipse.ditto.protocol.Adaptable; +import org.eclipse.ditto.protocol.ProtocolFactory; +import org.eclipse.ditto.protocol.TopicPath; +import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.eclipse.ditto.things.model.Attributes; import org.eclipse.ditto.things.model.Feature; import org.eclipse.ditto.things.model.Features; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.ThingsModelFactory; -import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; -import org.eclipse.ditto.protocol.ProtocolFactory; -import org.eclipse.ditto.protocol.TopicPath; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.base.model.signals.Signal; -import org.eclipse.ditto.messages.model.signals.commands.SendClaimMessage; import org.eclipse.ditto.things.model.signals.commands.modify.DeleteThing; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThingResponse; import org.eclipse.ditto.things.model.signals.events.AttributeDeleted; @@ -56,6 +59,7 @@ import org.junit.Before; import org.junit.Test; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; /** @@ -66,9 +70,16 @@ public final class NormalizedMessageMapperTest { private static final ProtocolAdapter ADAPTER = DittoProtocolAdapter.newInstance(); private MessageMapper underTest; + private ConnectionContext connectionContext; @Before public void setUp() { + final Config config = ConfigFactory.load("mapping-test") + .atKey("ditto.connectivity.mapping") + .withFallback(ConfigFactory.load("test")); + final ConnectivityConfig connectivityConfig = + DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(config)); + connectionContext = DittoConnectionContext.of(TestConstants.createConnection(), connectivityConfig); underTest = new NormalizedMessageMapper(); } @@ -299,7 +310,7 @@ public void withFieldSelection() { final Map options = Map.of(NormalizedMessageMapper.FIELDS, JsonValue.of( "_modified,_context/topic,_context/headers/content-type,nonexistent/json/pointer")); - underTest.configure(DefaultMappingConfig.of(ConfigFactory.load("mapping-test")), + underTest.configure(connectionContext, DefaultMessageMapperConfiguration.of("normalizer", options, Map.of(), Map.of())); final Adaptable adaptable = ADAPTER.toAdaptable(event); @@ -331,9 +342,8 @@ public void withFullThingPayloadFieldSelection() { final Map options = Map.of(NormalizedMessageMapper.FIELDS, JsonValue.of( "thingId,policyId,attributes,features,_modified,_revision,_context(topic,path)," + "_context/headers/correlation-id")); - underTest.configure(DefaultMappingConfig.of(ConfigFactory.load("mapping-test")), - DefaultMessageMapperConfiguration.of("normalizer", - options, Map.of(), Map.of())); + underTest.configure(connectionContext, + DefaultMessageMapperConfiguration.of("normalizer", options, Map.of(), Map.of())); final Adaptable adaptable = ADAPTER.toAdaptable(event); @@ -361,9 +371,8 @@ public void withExtraFieldsBeingIncluded() { final Map options = Map.of(NormalizedMessageMapper.FIELDS, JsonValue.of( "thingId,policyId,attributes/foo,features,_modified,_revision")); - underTest.configure(DefaultMappingConfig.of(ConfigFactory.load("mapping-test")), - DefaultMessageMapperConfiguration.of("normalizer", - options, Map.of(), Map.of())); + underTest.configure(connectionContext, + DefaultMessageMapperConfiguration.of("normalizer", options, Map.of(), Map.of())); final Thing thing = ThingsModelFactory.newThingBuilder() .setId(thingId) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapperTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapperTest.java index c73b77deae..88d8420fe6 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapperTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/RawMessageMapperTest.java @@ -22,33 +22,31 @@ import javax.annotation.Nullable; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; -import org.eclipse.ditto.json.JsonObject; -import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.base.model.common.HttpStatus; import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.messages.model.Message; import org.eclipse.ditto.messages.model.MessageBuilder; import org.eclipse.ditto.messages.model.MessageDirection; import org.eclipse.ditto.messages.model.MessageFormatInvalidException; import org.eclipse.ditto.messages.model.MessagesModelFactory; -import org.eclipse.ditto.things.model.ThingId; +import org.eclipse.ditto.messages.model.signals.commands.SendFeatureMessageResponse; +import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage; import org.eclipse.ditto.protocol.Adaptable; +import org.eclipse.ditto.protocol.ProtocolFactory; import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; -import org.eclipse.ditto.protocol.ProtocolFactory; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; -import org.eclipse.ditto.base.model.signals.Signal; -import org.eclipse.ditto.messages.model.signals.commands.SendFeatureMessageResponse; -import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage; +import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.commands.modify.DeleteThingResponse; import org.eclipse.ditto.things.model.signals.events.ThingDeleted; import org.junit.Before; import org.junit.Test; -import com.typesafe.config.ConfigFactory; - /** * Tests {@link RawMessageMapper}. */ @@ -58,10 +56,13 @@ public final class RawMessageMapperTest { private static final ProtocolAdapter ADAPTER = DittoProtocolAdapter.newInstance(); private MessageMapper underTest; + private ConnectionContext connectionContext; @Before public void setUp() { underTest = new RawMessageMapper(); + connectionContext = + DittoConnectionContext.of(TestConstants.createConnection(), TestConstants.CONNECTIVITY_CONFIG); } @Test @@ -139,7 +140,8 @@ public void mapFromTextMessageWithBinaryContentType() { @Test public void mapFromNonMessageCommand() { - final Signal signal = ThingDeleted.of(ThingId.of("thing:id"), 25L, Instant.EPOCH, DittoHeaders.empty(), null); + final Signal signal = + ThingDeleted.of(ThingId.of("thing:id"), 25L, Instant.EPOCH, DittoHeaders.empty(), null); final Adaptable adaptable = ADAPTER.toAdaptable(signal); final List actualExternalMessages = underTest.map(adaptable); final List expectedExternalMessages = new DittoMessageMapper().map(adaptable); @@ -225,7 +227,7 @@ public void mapToTextMessageWithContentTypeOverride() { "ditto-message-thing-id", "thing:id" ); final String payload = "lorem ipsum dolor sit amet"; - underTest.configure(DefaultMappingConfig.of(ConfigFactory.empty()), + underTest.configure(connectionContext, DefaultMessageMapperConfiguration.of("RawMessage", Map.of("incomingMessageHeaders", JsonObject.newBuilder() .set("content-type", "text/plain") @@ -251,7 +253,7 @@ public void mapToBinaryMessageWithContentTypeOverride() { "ditto-message-thing-id", "thing:id" ); final String payload = "lorem ipsum dolor sit amet"; - underTest.configure(DefaultMappingConfig.of(ConfigFactory.empty()), + underTest.configure(connectionContext, DefaultMessageMapperConfiguration.of("RawMessage", Map.of("incomingMessageHeaders", JsonObject.newBuilder() .set("content-type", "application/octet-stream") diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapperTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapperTest.java index 216cc53c2f..f7f3e96f5c 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapperTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/WrappingMessageMapperTest.java @@ -26,8 +26,10 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.model.MessageMappingFailedException; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; +import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.protocol.Adaptable; import org.eclipse.ditto.protocol.ProtocolFactory; @@ -37,6 +39,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; /** @@ -52,7 +55,7 @@ public class WrappingMessageMapperTest { private ExternalMessage mockMessage; private Adaptable mockAdaptable; - private MappingConfig mapperLimitsConfig; + private ConnectionContext connectionContext; @Rule public final ExpectedException exception = ExpectedException.none(); @@ -73,7 +76,12 @@ public void setUp() { when(mockAdaptable.getPayload()).thenReturn(ProtocolFactory.newPayload("{\"path\":\"/\"}")); when(mockMessage.getInternalHeaders()).thenReturn(DittoHeaders.empty()); - mapperLimitsConfig = DefaultMappingConfig.of(ConfigFactory.load("mapping-test")); + final Config config = ConfigFactory.load("mapping-test") + .atKey("ditto.connectivity.mapping") + .withFallback(ConfigFactory.load("test")); + final ConnectivityConfig connectivityConfig = + DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(config)); + connectionContext = DittoConnectionContext.of(TestConstants.createConnection(), connectivityConfig); underTest = WrappingMessageMapper.wrap(mockMapper); } @@ -81,13 +89,13 @@ public void setUp() { public void configure() { when(mockConfiguration.getContentTypeBlocklist()).thenReturn(Collections.singletonList("blockedContentType")); - underTest.configure(mapperLimitsConfig, mockConfiguration); - verify(mockMapper).configure(mapperLimitsConfig, mockConfiguration); + underTest.configure(connectionContext, mockConfiguration); + verify(mockMapper).configure(connectionContext, mockConfiguration); } @Test public void mapMessage() { - underTest.configure(mapperLimitsConfig, mockConfiguration); + underTest.configure(connectionContext, mockConfiguration); underTest.map(mockMessage); verify(mockMapper).map(any(ExternalMessage.class)); verify(mockMapper).map(any(ExternalMessage.class)); @@ -98,7 +106,7 @@ public void mapAdaptable() { final var headers = DittoHeaders.newBuilder().contentType("contentType").build(); when(mockAdaptable.getDittoHeaders()).thenReturn(headers); - underTest.configure(mapperLimitsConfig, mockConfiguration); + underTest.configure(connectionContext, mockConfiguration); underTest.map(mockAdaptable); verify(mockMapper).map(mockAdaptable); } @@ -107,10 +115,13 @@ public void mapAdaptable() { public void mapMessageWithInvalidNumberOfMessages() { exception.expect(MessageMappingFailedException.class); final List adaptables = listOfElements(mockAdaptable, - mapperLimitsConfig.getMapperLimitsConfig().getMaxMappedInboundMessages()); + connectionContext.getConnectivityConfig() + .getMappingConfig() + .getMapperLimitsConfig() + .getMaxMappedInboundMessages()); when(mockMapper.map(any(ExternalMessage.class))).thenReturn(adaptables); - underTest.configure(mapperLimitsConfig, mockConfiguration); + underTest.configure(connectionContext, mockConfiguration); underTest.map(mockMessage); } @@ -119,10 +130,13 @@ public void mapAdaptableWithInvalidNumberOfMessages() { exception.expect(MessageMappingFailedException.class); final List externalMessages = listOfElements(mockMessage, - mapperLimitsConfig.getMapperLimitsConfig().getMaxMappedOutboundMessages()); + connectionContext.getConnectivityConfig() + .getMappingConfig() + .getMapperLimitsConfig() + .getMaxMappedOutboundMessages()); when(mockMapper.map(any(Adaptable.class))).thenReturn(externalMessages); - underTest.configure(mapperLimitsConfig, mockConfiguration); + underTest.configure(connectionContext, mockConfiguration); underTest.map(mockAdaptable); } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoSandboxingTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoSandboxingTest.java index 40fec0076f..f12e3e1f08 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoSandboxingTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoSandboxingTest.java @@ -20,13 +20,20 @@ import org.assertj.core.api.Assertions; import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException; import org.eclipse.ditto.connectivity.model.MessageMappingFailedException; +import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.junit.Test; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; /** @@ -92,14 +99,18 @@ public void ensureTooBigMappingScriptIsNotLoaded() { private static MessageMapper createMapper(final String maliciousStuff) { final MessageMapper mapper = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - final MappingConfig mappingConfig = - DefaultMappingConfig.of(ConfigFactory.parseString("javascript {\n" + + final Config config = + ConfigFactory.parseString("ditto.connectivity.mapping.javascript {\n" + " maxScriptSizeBytes = 50000 # 50kB\n" + " maxScriptExecutionTime = 500ms\n" + " maxScriptStackDepth = 10\n" + - " }")); + " }").withFallback(ConfigFactory.load("test")); + final ConnectivityConfig connectivityConfig = + DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped(config)); + final ConnectionContext connectionContext = + DittoConnectionContext.of(TestConstants.createConnection(), connectivityConfig); - mapper.configure(mappingConfig, + mapper.configure(connectionContext, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("malicious", Collections.emptyMap()) .incomingScript(getMappingWrapperScript(maliciousStuff)) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoTest.java index 6f13c9347b..53566915ea 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/JavaScriptMessageMapperRhinoTest.java @@ -37,9 +37,12 @@ import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectionOpenedAnnouncement; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; +import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.json.JsonPointer; @@ -86,15 +89,20 @@ public final class JavaScriptMessageMapperRhinoTest { private static final ByteBuffer MAPPING_INCOMING_PAYLOAD_BYTES = ByteBuffer.wrap( MAPPING_INCOMING_PAYLOAD_STRING.getBytes(StandardCharsets.UTF_8)); - private static final MappingConfig MAPPING_CONFIG = - DefaultMappingConfig.of(ConfigFactory.parseString( - "mapping {\n" + - " javascript {\n" + - " maxScriptSizeBytes = 50000 # 50kB\n" + - " maxScriptExecutionTime = 500ms\n" + - " maxScriptStackDepth = 10\n" + - " }\n" + - "}")); + private static final ConnectionContext CONNECTION_CONTEXT = DittoConnectionContext.of( + TestConstants.createConnection(), + DittoConnectivityConfig.of(DefaultScopedConfig.dittoScoped( + ConfigFactory.parseString( + "mapping {\n" + + " javascript {\n" + + " maxScriptSizeBytes = 50000 # 50kB\n" + + " maxScriptExecutionTime = 500ms\n" + + " maxScriptStackDepth = 10\n" + + " }\n" + + "}") + .atKey("ditto.connectivity") + .withFallback(ConfigFactory.load("test")) + ))); private static final String MAPPING_INCOMING_PLAIN = "function mapToDittoProtocolMsg(\n" + @@ -363,11 +371,13 @@ public final class JavaScriptMessageMapperRhinoTest { " );\n" + "}"; - private static final String MAPPING_INCOMING_DEFAULT = new BufferedReader(new InputStreamReader(JavaScriptMessageMapperRhinoTest.class.getResourceAsStream(JavaScriptMessageMapperRhino.INCOMING_SCRIPT))) + private static final String MAPPING_INCOMING_DEFAULT = new BufferedReader(new InputStreamReader( + JavaScriptMessageMapperRhinoTest.class.getResourceAsStream(JavaScriptMessageMapperRhino.INCOMING_SCRIPT))) .lines() .collect(Collectors.joining("\n")); - private static final String MAPPING_OUTGOING_DEFAULT = new BufferedReader(new InputStreamReader(JavaScriptMessageMapperRhinoTest.class.getResourceAsStream(JavaScriptMessageMapperRhino.OUTGOING_SCRIPT))) + private static final String MAPPING_OUTGOING_DEFAULT = new BufferedReader(new InputStreamReader( + JavaScriptMessageMapperRhinoTest.class.getResourceAsStream(JavaScriptMessageMapperRhino.OUTGOING_SCRIPT))) .lines() .collect(Collectors.joining("\n")); @@ -384,7 +394,7 @@ public final class JavaScriptMessageMapperRhinoTest { @BeforeClass public static void setup() { javaScriptRhinoMapperNoop = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperNoop.configure(MAPPING_CONFIG, + javaScriptRhinoMapperNoop.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("noop", Collections.emptyMap()) .incomingScript("") @@ -393,7 +403,7 @@ public static void setup() { ); javaScriptRhinoMapperPlain = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlain.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlain.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("plain", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_PLAIN) @@ -402,7 +412,7 @@ public static void setup() { ); javaScriptRhinoMapperPlainWithStatus = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlainWithStatus.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlainWithStatus.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("plainStatus", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_WITH_STATUS) @@ -412,7 +422,7 @@ public static void setup() { javaScriptRhinoMapperPlainWithStatusAndExtra = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlainWithStatusAndExtra.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlainWithStatusAndExtra.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("plainStatus", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_WITH_STATUS_AND_EXTRA) @@ -421,7 +431,7 @@ public static void setup() { ); javaScriptRhinoMapperEmpty = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperEmpty.configure(MAPPING_CONFIG, + javaScriptRhinoMapperEmpty.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("empty", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_EMPTY) @@ -430,7 +440,7 @@ public static void setup() { ); javaScriptRhinoMapperBinary = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperBinary.configure(MAPPING_CONFIG, + javaScriptRhinoMapperBinary.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("binary", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_BINARY) @@ -439,7 +449,7 @@ public static void setup() { ); javaScriptRhinoMapperChannelAsValue = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperChannelAsValue.configure(MAPPING_CONFIG, + javaScriptRhinoMapperChannelAsValue.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("channelAsValue", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_BINARY) @@ -448,7 +458,7 @@ public static void setup() { ); javaScriptRhinoMapperDefault = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperDefault.configure(MAPPING_CONFIG, + javaScriptRhinoMapperDefault.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("default", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_DEFAULT) @@ -516,7 +526,8 @@ public void testDefaultJavascriptIncomingMappingForDittoProtocol() { final Adaptable mappedAdaptable = adaptables.get(0); System.out.println(mappedAdaptable); System.out.println( - "testDefaultJavascriptIncomingMappingForDittoProtocol Duration: " + (System.nanoTime() - startTs) / 1000000.0 + "ms"); + "testDefaultJavascriptIncomingMappingForDittoProtocol Duration: " + + (System.nanoTime() - startTs) / 1000000.0 + "ms"); assertThat(mappedAdaptable).isEqualTo(jsonifiableInputAdaptable); } @@ -539,7 +550,8 @@ public void testDefaultJavascriptIncomingMappingForByteDittoProtocol() { final Adaptable inputAdaptable = DittoProtocolAdapter.newInstance().toAdaptable(modifyAttribute); final JsonifiableAdaptable jsonifiableInputAdaptable = ProtocolFactory.wrapAsJsonifiableAdaptable(inputAdaptable); - final ByteBuffer bytes = ByteBuffer.wrap(jsonifiableInputAdaptable.toJsonString().getBytes(StandardCharsets.UTF_8)); + final ByteBuffer bytes = + ByteBuffer.wrap(jsonifiableInputAdaptable.toJsonString().getBytes(StandardCharsets.UTF_8)); final ExternalMessage message = ExternalMessageFactory.newExternalMessageBuilder(headers) .withBytes(bytes) .build(); @@ -549,7 +561,8 @@ public void testDefaultJavascriptIncomingMappingForByteDittoProtocol() { final Adaptable mappedAdaptable = adaptables.get(0); System.out.println(mappedAdaptable); System.out.println( - "testDefaultJavascriptIncomingMappingForByteDittoProtocol Duration: " + (System.nanoTime() - startTs) / 1000000.0 + "ms"); + "testDefaultJavascriptIncomingMappingForByteDittoProtocol Duration: " + + (System.nanoTime() - startTs) / 1000000.0 + "ms"); assertThat(mappedAdaptable).isEqualTo(jsonifiableInputAdaptable); } @@ -681,8 +694,9 @@ public void testDefaultJavascriptOutgoingMappingWithStatus() { public void testDefaultJavascriptOutgoingMappingForPolicyAnnouncements() { final PolicyId policyId = PolicyId.of("org.eclipse.ditto:foo-bar-policy"); final String correlationId = UUID.randomUUID().toString(); - final SubjectDeletionAnnouncement announcement = SubjectDeletionAnnouncement.of(policyId, Instant.now(), List.of(), - DittoHeaders.newBuilder().correlationId(correlationId).build()); + final SubjectDeletionAnnouncement announcement = + SubjectDeletionAnnouncement.of(policyId, Instant.now(), List.of(), + DittoHeaders.newBuilder().correlationId(correlationId).build()); final Adaptable adaptable = DittoProtocolAdapter.newInstance().toAdaptable(announcement); final JsonifiableAdaptable jsonifiableAdaptable = ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable); @@ -691,7 +705,8 @@ public void testDefaultJavascriptOutgoingMappingForPolicyAnnouncements() { final long startTs = System.nanoTime(); System.out.println( - "testDefaultJavascriptOutgoingMappingForPolicyAnnouncements Duration: " + (System.nanoTime() - startTs) / 1_000_000.0 + + "testDefaultJavascriptOutgoingMappingForPolicyAnnouncements Duration: " + + (System.nanoTime() - startTs) / 1_000_000.0 + "ms"); assertThat(rawMessage.findContentType()).contains(DittoConstants.DITTO_PROTOCOL_CONTENT_TYPE); @@ -699,7 +714,8 @@ public void testDefaultJavascriptOutgoingMappingForPolicyAnnouncements() { assertThat(rawMessage.isTextMessage()).isTrue(); final String textPayload = rawMessage.getTextPayload().get(); final JsonObject jsonPayload = JsonFactory.readFrom(textPayload).asObject(); - DittoJsonAssertions.assertThat(jsonPayload).isEqualToIgnoringFieldDefinitions(jsonifiableAdaptable.toJson()); + DittoJsonAssertions.assertThat(jsonPayload) + .isEqualToIgnoringFieldDefinitions(jsonifiableAdaptable.toJson()); }); } @@ -708,7 +724,8 @@ public void testDefaultJavascriptOutgoingMappingForConnectionAnnouncements() { final ConnectionId connectionId = ConnectionId.of("foo-bar-connection"); final String correlationId = UUID.randomUUID().toString(); final ConnectionOpenedAnnouncement announcement = - ConnectionOpenedAnnouncement.of(connectionId, Instant.now(), DittoHeaders.newBuilder().correlationId(correlationId).build()); + ConnectionOpenedAnnouncement.of(connectionId, Instant.now(), + DittoHeaders.newBuilder().correlationId(correlationId).build()); final Adaptable adaptable = DittoProtocolAdapter.newInstance().toAdaptable(announcement); final JsonifiableAdaptable jsonifiableAdaptable = ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable); @@ -717,7 +734,8 @@ public void testDefaultJavascriptOutgoingMappingForConnectionAnnouncements() { final long startTs = System.nanoTime(); System.out.println( - "testDefaultJavascriptOutgoingMappingForConnectionAnnouncements Duration: " + (System.nanoTime() - startTs) / 1_000_000.0 + + "testDefaultJavascriptOutgoingMappingForConnectionAnnouncements Duration: " + + (System.nanoTime() - startTs) / 1_000_000.0 + "ms"); assertThat(rawMessage.findContentType()).contains(DittoConstants.DITTO_PROTOCOL_CONTENT_TYPE); @@ -725,7 +743,8 @@ public void testDefaultJavascriptOutgoingMappingForConnectionAnnouncements() { assertThat(rawMessage.isTextMessage()).isTrue(); final String textPayload = rawMessage.getTextPayload().get(); final JsonObject jsonPayload = JsonFactory.readFrom(textPayload).asObject(); - DittoJsonAssertions.assertThat(jsonPayload).isEqualToIgnoringFieldDefinitions(jsonifiableAdaptable.toJson()); + DittoJsonAssertions.assertThat(jsonPayload) + .isEqualToIgnoringFieldDefinitions(jsonifiableAdaptable.toJson()); }); } @@ -978,8 +997,9 @@ private static String byteBuffer2String(@Nullable final ByteBuffer buf) { public void testCorrectChannelForPolicyAnnouncementsJavascriptOutgoingMapping() { final PolicyId policyId = PolicyId.of("org.eclipse.ditto:foo-bar-policy"); final String correlationId = UUID.randomUUID().toString(); - final SubjectDeletionAnnouncement announcement = SubjectDeletionAnnouncement.of(policyId, Instant.now(), List.of(), - DittoHeaders.newBuilder().correlationId(correlationId).build()); + final SubjectDeletionAnnouncement announcement = + SubjectDeletionAnnouncement.of(policyId, Instant.now(), List.of(), + DittoHeaders.newBuilder().correlationId(correlationId).build()); final Adaptable adaptable = DittoProtocolAdapter.newInstance().toAdaptable(announcement); assertThat(javaScriptRhinoMapperChannelAsValue.map(adaptable)).allSatisfy(rawMessage -> { @@ -987,7 +1007,8 @@ public void testCorrectChannelForPolicyAnnouncementsJavascriptOutgoingMapping() final long startTs = System.nanoTime(); System.out.println( - "testCorrectChannelForPolicyAnnouncementsJavascriptOutgoingMapping Duration: " + (System.nanoTime() - startTs) / 1_000_000.0 + + "testCorrectChannelForPolicyAnnouncementsJavascriptOutgoingMapping Duration: " + + (System.nanoTime() - startTs) / 1_000_000.0 + "ms"); assertThat(rawMessage.findContentType()).contains(CONTENT_TYPE_PLAIN); @@ -1003,7 +1024,8 @@ public void testCorrectChannelForConnectionAnnouncementsJavascriptOutgoingMappin final ConnectionId connectionId = ConnectionId.of("foo-bar-connection"); final String correlationId = UUID.randomUUID().toString(); final ConnectionOpenedAnnouncement announcement = - ConnectionOpenedAnnouncement.of(connectionId, Instant.now(), DittoHeaders.newBuilder().correlationId(correlationId).build()); + ConnectionOpenedAnnouncement.of(connectionId, Instant.now(), + DittoHeaders.newBuilder().correlationId(correlationId).build()); final Adaptable adaptable = DittoProtocolAdapter.newInstance().toAdaptable(announcement); assertThat(javaScriptRhinoMapperChannelAsValue.map(adaptable)).allSatisfy(rawMessage -> { @@ -1011,7 +1033,8 @@ public void testCorrectChannelForConnectionAnnouncementsJavascriptOutgoingMappin final long startTs = System.nanoTime(); System.out.println( - "testCorrectChannelForConnectionAnnouncementsJavascriptOutgoingMapping Duration: " + (System.nanoTime() - startTs) / 1_000_000.0 + + "testCorrectChannelForConnectionAnnouncementsJavascriptOutgoingMapping Duration: " + + (System.nanoTime() - startTs) / 1_000_000.0 + "ms"); assertThat(rawMessage.findContentType()).contains(CONTENT_TYPE_PLAIN); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/MapToDittoProtocolScenario.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/MapToDittoProtocolScenario.java index b169eaa126..055d1f606d 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/MapToDittoProtocolScenario.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/MapToDittoProtocolScenario.java @@ -12,10 +12,13 @@ */ package org.eclipse.ditto.connectivity.service.mapping.javascript.benchmark; -import org.eclipse.ditto.connectivity.service.config.mapping.DefaultMappingConfig; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; -import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; +import org.eclipse.ditto.connectivity.service.messaging.TestConstants; +import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import com.typesafe.config.ConfigFactory; @@ -24,12 +27,14 @@ */ public interface MapToDittoProtocolScenario { - MappingConfig MAPPING_CONFIG = - DefaultMappingConfig.of(ConfigFactory.parseString("javascript {\n" + - " maxScriptSizeBytes = 50000 # 50kB\n" + - " maxScriptExecutionTime = 500ms\n" + - " maxScriptStackDepth = 10\n" + - " }")); + ConnectionContext CONNECTION_CONTEXT = + DittoConnectionContext.of(TestConstants.createConnection(), DittoConnectivityConfig.of( + DefaultScopedConfig.dittoScoped(ConfigFactory.parseString("javascript {\n" + + " maxScriptSizeBytes = 50000 # 50kB\n" + + " maxScriptExecutionTime = 500ms\n" + + " maxScriptStackDepth = 10\n" + + " }").atKey("ditto.connectivity.mapping") + .withFallback(ConfigFactory.load("test"))))); MessageMapper getMessageMapper(); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/SimpleMapTextPayloadToDitto.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/SimpleMapTextPayloadToDitto.java index 36ba422150..f54f4159a2 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/SimpleMapTextPayloadToDitto.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/SimpleMapTextPayloadToDitto.java @@ -17,10 +17,10 @@ import java.util.Map; import java.util.UUID; -import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; -import org.eclipse.ditto.connectivity.service.mapping.javascript.JavaScriptMessageMapperFactory; import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; +import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; +import org.eclipse.ditto.connectivity.service.mapping.javascript.JavaScriptMessageMapperFactory; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; @@ -81,7 +81,7 @@ public SimpleMapTextPayloadToDitto() { public MessageMapper getMessageMapper() { final MessageMapper javaScriptRhinoMapperPlain = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlain.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlain.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("text", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_PLAIN) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test1DecodeBinaryPayloadToDitto.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test1DecodeBinaryPayloadToDitto.java index 2f682e7443..dccc41504a 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test1DecodeBinaryPayloadToDitto.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test1DecodeBinaryPayloadToDitto.java @@ -95,7 +95,7 @@ public Test1DecodeBinaryPayloadToDitto() { public MessageMapper getMessageMapper() { final MessageMapper javaScriptRhinoMapperPlain = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlain.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlain.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("binary", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_PLAIN) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test2ParseJsonPayloadToDitto.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test2ParseJsonPayloadToDitto.java index 6494486881..d3ea6cc760 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test2ParseJsonPayloadToDitto.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test2ParseJsonPayloadToDitto.java @@ -140,7 +140,7 @@ public Test2ParseJsonPayloadToDitto() { public MessageMapper getMessageMapper() { final MessageMapper javaScriptRhinoMapperPlain = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlain.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlain.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("ditto", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_PLAIN) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test3FormatJsonPayloadToDitto.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test3FormatJsonPayloadToDitto.java index 5944f3f3f2..e7b1a5968f 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test3FormatJsonPayloadToDitto.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test3FormatJsonPayloadToDitto.java @@ -139,7 +139,7 @@ public Test3FormatJsonPayloadToDitto() { public MessageMapper getMessageMapper() { final MessageMapper javaScriptRhinoMapperPlain = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlain.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlain.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("format", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_PLAIN) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test4ConstructJsonPayloadToDitto.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test4ConstructJsonPayloadToDitto.java index df7003ad70..4db5c6f174 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test4ConstructJsonPayloadToDitto.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test4ConstructJsonPayloadToDitto.java @@ -93,7 +93,7 @@ public Test4ConstructJsonPayloadToDitto() { public MessageMapper getMessageMapper() { final MessageMapper javaScriptRhinoMapperPlain = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlain.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlain.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("construct", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_PLAIN) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test5DecodeBinaryToDitto.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test5DecodeBinaryToDitto.java index a5e9bb8a9e..c7377d7a35 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test5DecodeBinaryToDitto.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/javascript/benchmark/Test5DecodeBinaryToDitto.java @@ -88,7 +88,7 @@ public Test5DecodeBinaryToDitto() { public MessageMapper getMessageMapper() { final MessageMapper javaScriptRhinoMapperPlain = JavaScriptMessageMapperFactory.createJavaScriptMessageMapperRhino(); - javaScriptRhinoMapperPlain.configure(MAPPING_CONFIG, + javaScriptRhinoMapperPlain.configure(CONNECTION_CONTEXT, JavaScriptMessageMapperFactory .createJavaScriptMessageMapperConfigurationBuilder("decode", Collections.emptyMap()) .incomingScript(MAPPING_INCOMING_PLAIN) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/test/MockMapper.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/test/MockMapper.java index 82ab0ea6f6..084d8a422f 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/test/MockMapper.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/mapping/test/MockMapper.java @@ -21,13 +21,13 @@ import javax.annotation.Nonnull; +import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperConfiguration; import org.eclipse.ditto.connectivity.service.mapping.PayloadMapper; import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.connectivity.api.ExternalMessage; @PayloadMapper(alias = "test") public final class MockMapper implements MessageMapper { @@ -49,7 +49,7 @@ public Collection getContentTypeBlocklist() { } @Override - public void configure(@Nonnull final MappingConfig mappingConfig, + public void configure(@Nonnull final ConnectionContext connectionContext, @Nonnull final MessageMapperConfiguration configuration) { configuration.findProperty(OPT_IS_VALID).map(Boolean::valueOf).filter(Boolean.TRUE::equals).orElseThrow diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java index 834d548ad2..840c72ca4d 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractConsumerActorTest.java @@ -32,6 +32,10 @@ import org.eclipse.ditto.base.model.common.HttpStatus; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; +import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestTimeoutException; +import org.eclipse.ditto.connectivity.api.OutboundSignal; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionSignalIdEnforcementFailedException; @@ -41,15 +45,12 @@ import org.eclipse.ditto.connectivity.model.PayloadMapping; import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; -import org.eclipse.ditto.connectivity.api.OutboundSignal; import org.eclipse.ditto.internal.models.placeholders.UnresolvedPlaceholderException; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; -import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; -import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestTimeoutException; -import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException; import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingUnavailableException; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing; @@ -79,7 +80,8 @@ public abstract class AbstractConsumerActorTest { private static final Connection CONNECTION = TestConstants.createConnection(); private static final ConnectionId CONNECTION_ID = CONNECTION.getId(); private static final FiniteDuration ONE_SECOND = FiniteDuration.apply(1, TimeUnit.SECONDS); - protected static final Map.Entry REPLY_TO_HEADER = TestConstants.header("reply-to", "reply-to-address"); + protected static final Map.Entry REPLY_TO_HEADER = + TestConstants.header("reply-to", "reply-to-address"); protected static final Enforcement ENFORCEMENT = ConnectivityModelFactory.newEnforcement("{{ header:device_id }}", "{{ thing:id }}"); @@ -366,18 +368,12 @@ private ActorRef setupMessageMappingProcessorActor(final ActorRef clientActor, f when(logger.withCorrelationId(Mockito.any(WithDittoHeaders.class))) .thenReturn(logger); final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null); - final InboundMappingProcessor inboundMappingProcessor = InboundMappingProcessor.of(CONNECTION_ID, - CONNECTION.getConnectionType(), - payloadMappingDefinition, - actorSystem, - connectivityConfig, - protocolAdapter, - logger); - final OutboundMappingProcessor outboundMappingProcessor = OutboundMappingProcessor.of(CONNECTION, - actorSystem, - connectivityConfig, - protocolAdapter, - logger); + final var connection = CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build(); + final var connectionContext = DittoConnectionContext.of(connection, connectivityConfig); + final InboundMappingProcessor inboundMappingProcessor = InboundMappingProcessor.of(connectionContext, + actorSystem, protocolAdapter, logger); + final OutboundMappingProcessor outboundMappingProcessor = OutboundMappingProcessor.of(connectionContext, + actorSystem, protocolAdapter, logger); final Props props = OutboundMappingProcessorActor.props(clientActor, outboundMappingProcessor, CONNECTION, 43); final ActorRef outboundProcessorActor = actorSystem.actorOf(props, OutboundMappingProcessorActor.ACTOR_NAME + "-" + name.getMethodName()); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java index f17c0d20db..93ce87b35c 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AbstractMessageMappingProcessorActorTest.java @@ -30,10 +30,6 @@ import javax.annotation.Nullable; import org.assertj.core.api.AutoCloseableSoftAssertions; -import org.eclipse.ditto.connectivity.service.mapping.ConnectivityCachingSignalEnrichmentProvider; -import org.eclipse.ditto.json.JsonObject; -import org.eclipse.ditto.json.JsonPointer; -import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.base.model.auth.AuthorizationContext; import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory; import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType; @@ -41,6 +37,12 @@ import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; +import org.eclipse.ditto.base.model.signals.acks.Acknowledgements; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; +import org.eclipse.ditto.connectivity.api.OutboundSignal; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; @@ -51,20 +53,20 @@ import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; import org.eclipse.ditto.connectivity.model.SourceBuilder; import org.eclipse.ditto.connectivity.model.Target; -import org.eclipse.ditto.things.model.ThingId; -import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; -import org.eclipse.ditto.protocol.JsonifiableAdaptable; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; -import org.eclipse.ditto.protocol.ProtocolFactory; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; -import org.eclipse.ditto.connectivity.api.OutboundSignal; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.ConnectivityCachingSignalEnrichmentProvider; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; import org.eclipse.ditto.internal.models.placeholders.Placeholder; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; -import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; -import org.eclipse.ditto.base.model.signals.acks.Acknowledgements; -import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.json.JsonPointer; +import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.protocol.JsonifiableAdaptable; +import org.eclipse.ditto.protocol.ProtocolFactory; +import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; +import org.eclipse.ditto.things.model.ThingId; import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttribute; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttributeResponse; @@ -239,7 +241,8 @@ void testExternalMessageInDittoProtocolIsProcessed( assertThat(headers).contains(AddHeaderMessageMapper.OUTBOUND_HEADER); } } else { - final OutboundSignal errorResponse = expectMsgClass(BaseClientActor.PublishMappedMessage.class).getOutboundSignal(); + final OutboundSignal errorResponse = + expectMsgClass(BaseClientActor.PublishMappedMessage.class).getOutboundSignal(); assertThat(errorResponse.getSource()).isInstanceOf(ThingErrorResponse.class); final ThingErrorResponse response = (ThingErrorResponse) errorResponse.getSource(); verifyErrorResponse.accept(response); @@ -332,13 +335,11 @@ ActorRef createInboundMappingProcessorActor(final ActorRef proxyActor, Mockito.when(logger.withMdcEntry(Mockito.any(CharSequence.class), Mockito.nullable(CharSequence.class))) .thenReturn(logger); final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null); - final InboundMappingProcessor inboundMappingProcessor = InboundMappingProcessor.of(CONNECTION_ID, - CONNECTION.getConnectionType(), - payloadMappingDefinition, - actorSystem, - TestConstants.CONNECTIVITY_CONFIG, - protocolAdapter, - logger); + final ConnectionContext connectionContext = DittoConnectionContext.of( + CONNECTION.toBuilder().payloadMappingDefinition(payloadMappingDefinition).build(), + TestConstants.CONNECTIVITY_CONFIG); + final InboundMappingProcessor inboundMappingProcessor = InboundMappingProcessor.of(connectionContext, + actorSystem, protocolAdapter, logger); final Props inboundDispatchingActorProps = InboundDispatchingActor.props(CONNECTION, protocolAdapter.headerTranslator(), ActorSelection.apply(proxyActor, ""), connectionActorProbe.ref(), outboundMappingProcessorActor); @@ -360,9 +361,8 @@ ActorRef createOutboundMappingProcessorActor(final TestKit kit) { final ThreadSafeDittoLoggingAdapter logger = mockLoggingAdapter(); final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null); final OutboundMappingProcessor outboundMappingProcessor = - OutboundMappingProcessor.of(CONNECTION, - actorSystem, - TestConstants.CONNECTIVITY_CONFIG, protocolAdapter, logger); + OutboundMappingProcessor.of(DittoConnectionContext.of(CONNECTION, TestConstants.CONNECTIVITY_CONFIG), + actorSystem, protocolAdapter, logger); final Props props = OutboundMappingProcessorActor.props(kit.getRef(), outboundMappingProcessor, CONNECTION, 99); return actorSystem.actorOf(props); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AddHeaderMessageMapper.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AddHeaderMessageMapper.java index 952db18cf7..5e96f389db 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AddHeaderMessageMapper.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/AddHeaderMessageMapper.java @@ -21,15 +21,15 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder; +import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; import org.eclipse.ditto.connectivity.model.MappingContext; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperConfiguration; import org.eclipse.ditto.connectivity.service.mapping.PayloadMapper; import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.connectivity.api.ExternalMessage; /** * Implementation of {@link org.eclipse.ditto.connectivity.service.mapping.MessageMapper} that delegates to @@ -63,7 +63,7 @@ public Collection getContentTypeBlocklist() { } @Override - public void configure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) { + public void configure(final ConnectionContext connectionContext, final MessageMapperConfiguration configuration) { // ignore } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorActorTest.java index 1986fa00d5..f497fca67b 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorActorTest.java @@ -21,19 +21,21 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.DittoHeadersSizeChecker; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; +import org.eclipse.ditto.connectivity.api.MappedInboundExternalMessage; +import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.model.PayloadMapping; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperRegistry; import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome; +import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.protocol.HeaderTranslator; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.eclipse.ditto.protocol.TopicPath; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; -import org.eclipse.ditto.connectivity.api.MappedInboundExternalMessage; -import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -112,8 +114,9 @@ private static InboundMappingProcessor createThrowingProcessor() { final ThreadSafeDittoLoggingAdapter logger = Mockito.mock(ThreadSafeDittoLoggingAdapter.class); Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.any()); Mockito.doAnswer(inv -> logger).when(logger).withCorrelationId(Mockito.any()); - return InboundMappingProcessor.of(ConnectionId.of("connectionId"), - ConnectionType.MQTT, + final Connection connection = + TestConstants.createConnection(ConnectionId.of("connectionId"), ConnectionType.MQTT); + return InboundMappingProcessor.of(DittoConnectionContext.of(connection, TestConstants.CONNECTIVITY_CONFIG), registry, logger, Mockito.mock(ProtocolAdapter.class), diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorTest.java index ccc3a1e6af..e3effd6745 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/InboundMappingProcessorTest.java @@ -22,29 +22,30 @@ import java.util.HashMap; import java.util.Map; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; -import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; -import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; -import org.eclipse.ditto.connectivity.service.mapping.MessageMapperConfiguration; -import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome; -import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; -import org.eclipse.ditto.connectivity.model.ConnectionId; -import org.eclipse.ditto.connectivity.model.ConnectionType; -import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; -import org.eclipse.ditto.connectivity.model.MappingContext; -import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; +import org.eclipse.ditto.base.model.signals.SignalWithEntityId; import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; import org.eclipse.ditto.connectivity.api.MappedInboundExternalMessage; +import org.eclipse.ditto.connectivity.model.Connection; +import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; +import org.eclipse.ditto.connectivity.model.MappingContext; +import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; +import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; +import org.eclipse.ditto.connectivity.service.mapping.MessageMapperConfiguration; +import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.internal.utils.protocol.DittoProtocolAdapterProvider; import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; import org.eclipse.ditto.internal.utils.protocol.config.ProtocolConfig; -import org.eclipse.ditto.base.model.signals.SignalWithEntityId; +import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing; import org.junit.AfterClass; import org.junit.Before; @@ -128,10 +129,14 @@ public void init() { final PayloadMappingDefinition payloadMappingDefinition = ConnectivityModelFactory.newPayloadMappingDefinition(mappings); + final Connection connection = TestConstants.createConnection() + .toBuilder() + .payloadMappingDefinition(payloadMappingDefinition) + .build(); + final ConnectionContext connectionContext = DittoConnectionContext.of(connection, connectivityConfig); - underTest = InboundMappingProcessor.of(ConnectionId.of("theConnection"), ConnectionType.AMQP_10, - payloadMappingDefinition, actorSystem, - connectivityConfig, protocolAdapterProvider.getProtocolAdapter(null), logger); + underTest = InboundMappingProcessor.of(connectionContext, actorSystem, + protocolAdapterProvider.getProtocolAdapter(null), logger); } @Test diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActorTest.java index 9e43c3ae73..3624636582 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActorTest.java @@ -18,8 +18,6 @@ import java.util.Set; import java.util.stream.Collectors; -import org.eclipse.ditto.json.JsonPointer; -import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotDeclaredException; import org.eclipse.ditto.base.model.acks.AcknowledgementRequest; @@ -27,16 +25,19 @@ import org.eclipse.ditto.base.model.auth.AuthorizationSubject; import org.eclipse.ditto.base.model.common.HttpStatus; import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.base.model.signals.Signal; +import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; +import org.eclipse.ditto.connectivity.api.InboundSignal; +import org.eclipse.ditto.connectivity.api.OutboundSignal; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; import org.eclipse.ditto.connectivity.model.Target; import org.eclipse.ditto.connectivity.model.Topic; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; +import org.eclipse.ditto.json.JsonPointer; +import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter; -import org.eclipse.ditto.connectivity.api.InboundSignal; -import org.eclipse.ditto.connectivity.api.OutboundSignal; -import org.eclipse.ditto.base.model.signals.acks.Acknowledgement; -import org.eclipse.ditto.base.model.signals.Signal; import org.eclipse.ditto.things.model.signals.events.AttributeModified; import org.junit.After; import org.junit.Before; @@ -294,9 +295,10 @@ private void testForwardThingEvent(final Connection connection, final boolean is final ActorSelection proxyActorSelection = ActorSelection.apply(proxyActor.ref(), ""); final TestProbe mappingActor = TestProbe.apply("mapping", actorSystem); + final var connectionContext = DittoConnectionContext.of(connection, TestConstants.CONNECTIVITY_CONFIG); final OutboundMappingSettings settings = - OutboundMappingSettings.of(connection, actorSystem, proxyActorSelection, - TestConstants.CONNECTIVITY_CONFIG, DittoProtocolAdapter.newInstance(), + OutboundMappingSettings.of(connectionContext, actorSystem, proxyActorSelection, + DittoProtocolAdapter.newInstance(), MockActor.getThreadSafeDittoLoggingAdapter(actorSystem)); final ActorRef underTest = childActorOf(OutboundDispatchingActor.props(settings, mappingActor.ref())); @@ -320,10 +322,10 @@ private ActorRef getOutboundDispatchingActor(final Connection connection, final final TestProbe proxyActor = TestProbe.apply("proxy", actorSystem); final ActorSelection proxyActorSelection = ActorSelection.apply(proxyActor.ref(), ""); + final var connectionContext = DittoConnectionContext.of(connection, TestConstants.CONNECTIVITY_CONFIG); final OutboundMappingSettings settings = - OutboundMappingSettings.of(connection, actorSystem, proxyActorSelection, - TestConstants.CONNECTIVITY_CONFIG, DittoProtocolAdapter.newInstance(), - MockActor.getThreadSafeDittoLoggingAdapter(actorSystem)); + OutboundMappingSettings.of(connectionContext, actorSystem, proxyActorSelection, + DittoProtocolAdapter.newInstance(), MockActor.getThreadSafeDittoLoggingAdapter(actorSystem)); return actorSystem.actorOf(OutboundDispatchingActor.props(settings, mappingActor)); } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java index 53f5d67058..d602173cbb 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; import org.eclipse.ditto.base.model.acks.AcknowledgementRequest; import org.eclipse.ditto.base.model.auth.AuthorizationContext; @@ -30,6 +29,9 @@ import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType; import org.eclipse.ditto.base.model.entity.metadata.Metadata; import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.base.model.signals.acks.Acknowledgements; +import org.eclipse.ditto.connectivity.api.OutboundSignal; +import org.eclipse.ditto.connectivity.api.OutboundSignalFactory; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionType; @@ -38,15 +40,14 @@ import org.eclipse.ditto.connectivity.model.Source; import org.eclipse.ditto.connectivity.model.Target; import org.eclipse.ditto.connectivity.model.Topic; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; +import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.protocol.TopicPath; import org.eclipse.ditto.things.model.Attributes; import org.eclipse.ditto.things.model.Thing; import org.eclipse.ditto.things.model.ThingFieldSelector; import org.eclipse.ditto.things.model.ThingId; -import org.eclipse.ditto.protocol.TopicPath; -import org.eclipse.ditto.connectivity.api.OutboundSignal; -import org.eclipse.ditto.connectivity.api.OutboundSignalFactory; -import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; -import org.eclipse.ditto.base.model.signals.acks.Acknowledgements; import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing; import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse; import org.eclipse.ditto.things.model.signals.events.ThingModified; @@ -229,7 +230,8 @@ public void expectNoTargetIssuedAckRequestInPublishedSignals() { } private OutboundMappingProcessor getProcessor() { - return OutboundMappingProcessor.of(CONNECTION, actorSystem, TestConstants.CONNECTIVITY_CONFIG, + final var connectionContext = DittoConnectionContext.of(CONNECTION, TestConstants.CONNECTIVITY_CONFIG); + return OutboundMappingProcessor.of(connectionContext, actorSystem, protocolAdapterProvider.getProtocolAdapter("test"), AbstractMessageMappingProcessorActorTest.mockLoggingAdapter()); } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java index d5ec29a021..8a48c15e50 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorTest.java @@ -29,12 +29,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; -import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; -import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; -import org.eclipse.ditto.connectivity.service.mapping.MessageMapperConfiguration; -import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome; -import org.eclipse.ditto.json.JsonObject; import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; import org.eclipse.ditto.base.model.acks.AcknowledgementRequest; import org.eclipse.ditto.base.model.auth.AuthorizationContext; @@ -42,6 +36,8 @@ import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.connectivity.api.OutboundSignal; +import org.eclipse.ditto.connectivity.api.OutboundSignalFactory; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectionType; @@ -51,19 +47,24 @@ import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition; import org.eclipse.ditto.connectivity.model.Target; import org.eclipse.ditto.connectivity.model.Topic; -import org.eclipse.ditto.messages.model.Message; -import org.eclipse.ditto.messages.model.MessageDirection; -import org.eclipse.ditto.messages.model.MessageHeaders; -import org.eclipse.ditto.protocol.TopicPath; -import org.eclipse.ditto.connectivity.api.OutboundSignal; -import org.eclipse.ditto.connectivity.api.OutboundSignalFactory; +import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; +import org.eclipse.ditto.connectivity.service.config.DittoConnectivityConfig; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper; +import org.eclipse.ditto.connectivity.service.mapping.MessageMapperConfiguration; +import org.eclipse.ditto.connectivity.service.messaging.mappingoutcome.MappingOutcome; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig; import org.eclipse.ditto.internal.utils.protocol.DittoProtocolAdapterProvider; import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider; import org.eclipse.ditto.internal.utils.protocol.config.ProtocolConfig; +import org.eclipse.ditto.json.JsonObject; +import org.eclipse.ditto.messages.model.Message; +import org.eclipse.ditto.messages.model.MessageDirection; +import org.eclipse.ditto.messages.model.MessageHeaders; import org.eclipse.ditto.messages.model.signals.commands.MessageCommand; import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage; +import org.eclipse.ditto.protocol.TopicPath; import org.eclipse.ditto.things.model.signals.events.ThingModifiedEvent; import org.junit.AfterClass; import org.junit.Before; @@ -167,8 +168,9 @@ public void init() { .build())) .build(); - underTest = OutboundMappingProcessor.of(connection, actorSystem, - connectivityConfig, protocolAdapterProvider.getProtocolAdapter(null), logger); + final var connectionContext = DittoConnectionContext.of(connection, connectivityConfig); + underTest = OutboundMappingProcessor.of(connectionContext, actorSystem, + protocolAdapterProvider.getProtocolAdapter(null), logger); } @Test diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/ThrowingMapper.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/ThrowingMapper.java index caf2156164..0e5ea9cef5 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/ThrowingMapper.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/ThrowingMapper.java @@ -16,12 +16,12 @@ import java.util.List; import java.util.Map; +import org.eclipse.ditto.connectivity.api.ExternalMessage; import org.eclipse.ditto.connectivity.model.MessageMappingFailedException; -import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; import org.eclipse.ditto.connectivity.service.mapping.MessageMapper; import org.eclipse.ditto.connectivity.service.mapping.MessageMapperConfiguration; import org.eclipse.ditto.protocol.Adaptable; -import org.eclipse.ditto.connectivity.api.ExternalMessage; final class ThrowingMapper implements MessageMapper { @@ -42,7 +42,7 @@ public Collection getContentTypeBlocklist() { } @Override - public void configure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) { + public void configure(final ConnectionContext connectionContext, final MessageMapperConfiguration configuration) { // nothing to configure } diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java index 17e2908864..a618b10546 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActorTest.java @@ -13,8 +13,8 @@ package org.eclipse.ditto.connectivity.service.messaging.amqp; import static org.assertj.core.api.Assertions.fail; -import static org.eclipse.ditto.json.assertions.DittoJsonAssertions.assertThat; import static org.eclipse.ditto.connectivity.service.messaging.TestConstants.header; +import static org.eclipse.ditto.json.assertions.DittoJsonAssertions.assertThat; import java.util.Arrays; import java.util.Collections; @@ -39,17 +39,14 @@ import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.message.AmqpJmsTextMessageFacade; import org.apache.qpid.proton.amqp.Symbol; -import org.eclipse.ditto.connectivity.service.mapping.javascript.JavaScriptMessageMapperFactory; -import org.eclipse.ditto.connectivity.service.messaging.InboundDispatchingActor; -import org.eclipse.ditto.connectivity.service.messaging.InboundMappingProcessor; -import org.eclipse.ditto.connectivity.service.messaging.InboundMappingProcessorActor; -import org.eclipse.ditto.json.JsonPointer; -import org.eclipse.ditto.json.JsonValue; import org.eclipse.ditto.base.model.acks.AcknowledgementRequest; import org.eclipse.ditto.base.model.acks.FilteredAcknowledgementRequest; import org.eclipse.ditto.base.model.common.ResponseType; import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.base.model.headers.WithDittoHeaders; +import org.eclipse.ditto.base.model.signals.commands.Command; +import org.eclipse.ditto.connectivity.api.ExternalMessage; +import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionId; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; @@ -57,13 +54,18 @@ import org.eclipse.ditto.connectivity.model.PayloadMapping; import org.eclipse.ditto.connectivity.model.ReplyTarget; import org.eclipse.ditto.connectivity.model.Source; -import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; +import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.DittoConnectionContext; +import org.eclipse.ditto.connectivity.service.mapping.javascript.JavaScriptMessageMapperFactory; import org.eclipse.ditto.connectivity.service.messaging.AbstractConsumerActorTest; +import org.eclipse.ditto.connectivity.service.messaging.InboundDispatchingActor; +import org.eclipse.ditto.connectivity.service.messaging.InboundMappingProcessor; +import org.eclipse.ditto.connectivity.service.messaging.InboundMappingProcessorActor; import org.eclipse.ditto.connectivity.service.messaging.TestConstants; -import org.eclipse.ditto.connectivity.api.ExternalMessage; -import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter; -import org.eclipse.ditto.base.model.signals.commands.Command; +import org.eclipse.ditto.json.JsonPointer; +import org.eclipse.ditto.json.JsonValue; +import org.eclipse.ditto.protocol.adapter.ProtocolAdapter; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttribute; import org.eclipse.ditto.things.model.signals.commands.modify.ModifyFeatureProperty; import org.junit.Test; @@ -281,11 +283,14 @@ private ActorRef setupActor(final ActorRef testRef, @Nullable final MappingConte Mockito.when(logger.withMdcEntry(Mockito.any(CharSequence.class), Mockito.nullable(CharSequence.class))) .thenReturn(logger); final ProtocolAdapter protocolAdapter = protocolAdapterProvider.getProtocolAdapter(null); - final InboundMappingProcessor inboundMappingProcessor = InboundMappingProcessor.of(CONNECTION_ID, - CONNECTION.getConnectionType(), - ConnectivityModelFactory.newPayloadMappingDefinition(mappings), + final ConnectionContext connectionContext = + DittoConnectionContext.of(CONNECTION.toBuilder() + .payloadMappingDefinition(ConnectivityModelFactory.newPayloadMappingDefinition(mappings)) + .build(), + TestConstants.CONNECTIVITY_CONFIG); + final InboundMappingProcessor inboundMappingProcessor = InboundMappingProcessor.of( + connectionContext, actorSystem, - TestConstants.CONNECTIVITY_CONFIG, protocolAdapter, logger); final Props inboundDispatchingActorProps = InboundDispatchingActor.props(CONNECTION,