Skip to content

Commit

Permalink
[#1081] Make connection context available for message mappers.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 21, 2021
1 parent 82acee3 commit b131186
Show file tree
Hide file tree
Showing 38 changed files with 584 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ public Collection<String> getContentTypeBlocklist() {
}

@Override
public final void configure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) {
public final void configure(final ConnectionContext connectionContext,
final MessageMapperConfiguration configuration) {
this.id = configuration.getId();
this.incomingConditions = configuration.getIncomingConditions();
this.outgoingConditions = configuration.getOutgoingConditions();
this.contentTypeBlocklist = configuration.getContentTypeBlocklist();
doConfigure(mappingConfig, configuration);
doConfigure(connectionContext.getConnectivityConfig().getMappingConfig(), configuration);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.mapping;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;

/**
* Connection-related information relevant to a message mapper.
*/
public interface ConnectionContext {

/**
* @return the connection in which the mapper is defined.
*/
Connection getConnection();

/**
* @return the connectivity config for the connection in which the mapper is defined.
*/
ConnectivityConfig getConnectivityConfig();

/**
* Create a copy of this context with a modified connectivity config.
*
* @param modifiedConfig the modified config.
* @return the new context.
*/
ConnectionContext withConnectivityConfig(ConnectivityConfig modifiedConfig);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@
import javax.annotation.concurrent.Immutable;

import org.atteo.classindex.ClassIndex;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.MappingContext;
import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException;
import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonObject;
Expand Down Expand Up @@ -66,8 +64,7 @@ public final class DefaultMessageMapperFactory implements MessageMapperFactory {

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(DefaultMessageMapperFactory.class);

private final ConnectionId connectionId;
private final MappingConfig mappingConfig;
private final ConnectionContext connectionContext;

/**
* The actor system used for dynamic class instantiation.
Expand All @@ -85,14 +82,12 @@ public final class DefaultMessageMapperFactory implements MessageMapperFactory {

private final LoggingAdapter log;

private DefaultMessageMapperFactory(final ConnectionId connectionId,
final MappingConfig mappingConfig,
private DefaultMessageMapperFactory(final ConnectionContext connectionContext,
final ExtendedActorSystem actorSystem,
final List<MessageMapperExtension> messageMapperExtensions,
final LoggingAdapter log) {

this.connectionId = checkNotNull(connectionId);
this.mappingConfig = checkNotNull(mappingConfig, "MappingConfig");
this.connectionContext = checkNotNull(connectionContext, "connectionContext");
this.actorSystem = checkNotNull(actorSystem);
this.messageMapperExtensions = checkNotNull(messageMapperExtensions);
this.log = checkNotNull(log);
Expand All @@ -101,22 +96,20 @@ private DefaultMessageMapperFactory(final ConnectionId connectionId,
/**
* Creates a new factory and returns the instance
*
* @param connectionId ID of the connection.
* @param connectionContext context of the connection for which this factory is instantiated.
* @param actorSystem the actor system to use for mapping config + dynamicAccess.
* @param mappingConfig the configuration of the mapping behaviour.
* @param log the log adapter used for debug and warning logs.
* @return the new instance.
* @throws NullPointerException if any argument is {@code null}.
*/
public static DefaultMessageMapperFactory of(final ConnectionId connectionId,
public static DefaultMessageMapperFactory of(final ConnectionContext connectionContext,
final ActorSystem actorSystem,
final MappingConfig mappingConfig,
final LoggingAdapter log) {

final ExtendedActorSystem extendedActorSystem = (ExtendedActorSystem) actorSystem;
final List<MessageMapperExtension> messageMapperExtensions =
tryToLoadMessageMappersExtensions(extendedActorSystem);
return new DefaultMessageMapperFactory(connectionId, mappingConfig, extendedActorSystem,
return new DefaultMessageMapperFactory(connectionContext, extendedActorSystem,
messageMapperExtensions, log);
}

Expand Down Expand Up @@ -246,6 +239,7 @@ private static List<Class<? extends MessageMapperExtension>> loadMessageMapperEx
* @return the instantiated mapper if it can be instantiated from the configured factory class.
*/
Optional<MessageMapper> createMessageMapperInstance(final String mappingEngine) {
final var connectionId = connectionContext.getConnection().getId();
if (registeredMappers.containsKey(mappingEngine)) {
final Class<?> messageMapperClass = registeredMappers.get(mappingEngine);
MessageMapper result = createAnyMessageMapper(messageMapperClass,
Expand Down Expand Up @@ -295,7 +289,7 @@ private static PayloadMapper getPayloadMapperAnnotation(final Map.Entry<String,
private Optional<MessageMapper> configureInstance(final MessageMapper mapper,
final MessageMapperConfiguration options) {
try {
mapper.configure(mappingConfig, options);
mapper.configure(connectionContext, options);
return Optional.of(mapper);
} catch (final MessageMapperConfigurationInvalidException e) {
log.warning("Failed to apply configuration <{}> to mapper instance <{}>: {}", options, mapper,
Expand All @@ -309,15 +303,14 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultMessageMapperFactory that = (DefaultMessageMapperFactory) o;
return Objects.equals(connectionId, that.connectionId) &&
Objects.equals(mappingConfig, that.mappingConfig) &&
return Objects.equals(connectionContext, that.connectionContext) &&
Objects.equals(actorSystem, that.actorSystem) &&
Objects.equals(messageMapperExtensions, that.messageMapperExtensions) &&
Objects.equals(log, that.log);
}

@Override
public int hashCode() {
return Objects.hash(connectionId, mappingConfig, actorSystem, messageMapperExtensions, log);
return Objects.hash(connectionContext, actorSystem, messageMapperExtensions, log);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.mapping;

import java.util.Objects;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;

/**
* Implementation of {@link ConnectionContext}.
*/
public final class DittoConnectionContext implements ConnectionContext {

private final Connection connection;
private final ConnectivityConfig connectivityConfig;

private DittoConnectionContext(final Connection connection, final ConnectivityConfig connectivityConfig) {
this.connection = connection;
this.connectivityConfig = connectivityConfig;
}

/**
* Create a connection context from a connection and its connectivity config.
*
* @param connection the connection.
* @param config the connectivity config.
* @return the connection context.
*/
public static DittoConnectionContext of(final Connection connection, final ConnectivityConfig config) {
return new DittoConnectionContext(connection, config);
}

@Override
public Connection getConnection() {
return connection;
}

@Override
public ConnectivityConfig getConnectivityConfig() {
return connectivityConfig;
}

@Override
public ConnectionContext withConnectivityConfig(final ConnectivityConfig modifiedConfig) {
return new DittoConnectionContext(connection, modifiedConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[connection=" + connection +
",connectivityConfig=" + connectivityConfig +
"]";
}

@Override
public boolean equals(final Object other) {
if (other instanceof DittoConnectionContext) {
final var that = (DittoConnectionContext) other;
return Objects.equals(connection, that.connection) &&
Objects.equals(connectivityConfig, that.connectivityConfig);
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(connection, connectivityConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
import java.util.Map;
import java.util.Optional;

import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.connectivity.api.ExternalMessage;

/**
* Defines a message mapper which maps a {@link ExternalMessage} to a {@link Adaptable} and vice versa.
Expand All @@ -45,7 +44,7 @@ public interface MessageMapper {
/**
* Returns a blocklist of content-types which shall not be handled by this message mapper.
* Is determined from the passed in {@code MessageMapperConfiguration} in
* {@link #configure(org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig, MessageMapperConfiguration)}.
* {@link #configure(ConnectionContext, MessageMapperConfiguration)}.
*
* @return a blocklist of content-types which shall not be handled by this message mapper.
*/
Expand All @@ -54,13 +53,13 @@ public interface MessageMapper {
/**
* Applies configuration for this MessageMapper.
*
* @param mappingConfig the config scoped to the mapping section "ditto.connectivity.mapping".
* @param connectionContext the connection and its contextual information including its connectivity config.
* @param configuration the configuration to apply.
* @throws MessageMapperConfigurationInvalidException if configuration is invalid.
* @throws org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException if the configuration
* failed for a mapper specific reason.
*/
void configure(MappingConfig mappingConfig, MessageMapperConfiguration configuration);
void configure(ConnectionContext connectionContext, MessageMapperConfiguration configuration);

/**
* Maps an {@link ExternalMessage} to an {@link Adaptable}
Expand Down Expand Up @@ -91,16 +90,16 @@ default JsonObject getDefaultOptions() {

/**
* Returns the conditions to be checked before mapping incoming messages.
* @return the conditions.
*
* @return the conditions.
* @since 1.3.0
*/
Map<String, String> getIncomingConditions();

/**
* Returns the conditions to be checked before mapping outgoing messages.
* @return the conditions.
*
* @return the conditions.
* @since 1.3.0
*/
Map<String, String> getOutgoingConditions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.MappingContext;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.internal.models.placeholders.ExpressionResolver;
import org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
Expand All @@ -52,8 +54,6 @@
import org.eclipse.ditto.protocol.PayloadBuilder;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.internal.models.placeholders.ExpressionResolver;
import org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;

Expand Down Expand Up @@ -176,7 +176,7 @@ public JsonObject getDefaultOptions() {

@Override
protected void doConfigure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) {
dittoMessageMapper.configure(mappingConfig, configuration);
dittoMessageMapper.doConfigure(mappingConfig, configuration);
fallbackOutgoingContentType = configuration.findProperty(OUTGOING_CONTENT_TYPE_KEY)
.map(ContentType::of)
.orElse(fallbackOutgoingContentType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.MessageMappingFailedException;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.MessageMappingFailedException;
import org.eclipse.ditto.connectivity.service.config.mapping.MapperLimitsConfig;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.protocol.Adaptable;

/**
* Enforce message size limits on a {@link MessageMapper} and adds random correlation IDs should they not be present
Expand Down Expand Up @@ -88,11 +87,12 @@ public MessageMapper getDelegate() {
}

@Override
public void configure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) {
final MapperLimitsConfig mapperLimitsConfig = mappingConfig.getMapperLimitsConfig();
public void configure(final ConnectionContext connectionContext, final MessageMapperConfiguration configuration) {
final MapperLimitsConfig mapperLimitsConfig =
connectionContext.getConnectivityConfig().getMappingConfig().getMapperLimitsConfig();
inboundMessageLimit = mapperLimitsConfig.getMaxMappedInboundMessages();
outboundMessageLimit = mapperLimitsConfig.getMaxMappedOutboundMessages();
delegate.configure(mappingConfig, configuration);
delegate.configure(connectionContext, configuration);
}

@Override
Expand Down
Loading

0 comments on commit b131186

Please sign in to comment.