Skip to content

Commit

Permalink
first draft of payload mapper extension
Browse files Browse the repository at this point in the history
Co-authored-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io>
Signed-off-by: Johannes Schneider <johannes.schneider@bosch.io>
  • Loading branch information
jokraehe and Stanchev Aleksandar committed Sep 26, 2022
1 parent 812621f commit b48190b
Show file tree
Hide file tree
Showing 18 changed files with 296 additions and 172 deletions.
Expand Up @@ -53,14 +53,12 @@
* the mapping configuration. The thingId must be set in the mapping configuration. It can either be a fixed Thing ID
* or it can be resolved from the message headers by using a placeholder e.g. {@code {{ header:device_id }}}.
*/
@PayloadMapper(
alias = "ConnectionStatus",
requiresMandatoryConfiguration = true // "thingId" is mandatory configuration
)
public class ConnectionStatusMessageMapper extends AbstractMessageMapper {
public class ConnectionStatusMessageMapper extends AbstractMessageMapper implements PayloadMapper {

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(ConnectionStatusMessageMapper.class);

private static final String PAYLOAD_MAPPER_ALIAS = "ConnectionStatus";

static final String HEADER_HONO_TTD = "ttd";
static final String HEADER_HONO_CREATION_TIME = "creation-time";
static final String DEFAULT_FEATURE_ID = "ConnectionStatus";
Expand All @@ -83,14 +81,27 @@ public class ConnectionStatusMessageMapper extends AbstractMessageMapper {
private String mappingOptionFeatureId;
private String mappingOptionThingId;

@Override
public String getAlias() {
return PAYLOAD_MAPPER_ALIAS;
}

/**
* "thingId" is mandatory.
*/
@Override
public boolean isConfigurationMandatory() {
return true;
}

@Override
public void doConfigure(final MappingConfig mappingConfig,
final MessageMapperConfiguration messageMapperConfiguration) {
mappingOptionThingId = messageMapperConfiguration.findProperty(MAPPING_OPTIONS_PROPERTIES_THING_ID)
.orElseThrow(
() -> MessageMapperConfigurationInvalidException.newBuilder(MAPPING_OPTIONS_PROPERTIES_THING_ID)
.build());
// Check if ThingId is valid when its not a placeholder
// Check if ThingId is valid when it's not a placeholder
if (!Placeholders.containsAnyPlaceholder(mappingOptionThingId)) {
try {
ThingId.of(mappingOptionThingId);
Expand Down
Expand Up @@ -15,22 +15,18 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.atteo.classindex.ClassIndex;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.MappingContext;
import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException;
import org.eclipse.ditto.connectivity.model.MessageMapperConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.PayloadMappingDefinition;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
Expand All @@ -40,12 +36,8 @@
import org.eclipse.ditto.json.JsonObject;

import akka.actor.ActorSystem;
import akka.actor.DynamicAccess;
import akka.actor.ExtendedActorSystem;
import akka.event.LoggingAdapter;
import scala.collection.immutable.List$;
import scala.reflect.ClassTag;
import scala.util.Try;

/**
* Encapsulates responsibility for instantiating {@link MessageMapper} objects.
Expand Down Expand Up @@ -76,7 +68,7 @@ public final class DefaultMessageMapperFactory implements MessageMapperFactory {
* The factory function that creates instances of {@link MessageMapper}.
*/
private final MessageMapperExtension messageMapperExtension;
private static final Map<String, Class<?>> REGISTERED_MAPPERS = tryToLoadPayloadMappers();
private final Set<PayloadMapperFactory> payloadMapperFactories;

private final LoggingAdapter log;

Expand All @@ -91,13 +83,19 @@ private DefaultMessageMapperFactory(final Connection connection,
this.log = checkNotNull(log);

messageMapperExtension = loadMessageMapperExtension(actorSystem);
payloadMapperFactories = loadPayloadMapperProvider(actorSystem).getPayloadMapperFactories();
}

private static MessageMapperExtension loadMessageMapperExtension(final ActorSystem actorSystem) {
final var extensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
return MessageMapperExtension.get(actorSystem, extensionsConfig);
}

private static PayloadMapperProvider loadPayloadMapperProvider(final ActorSystem actorSystem) {
final var extensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
return PayloadMapperProvider.get(actorSystem, extensionsConfig);
}

/**
* Creates a new factory and returns the instance
*
Expand Down Expand Up @@ -144,16 +142,16 @@ private static MergedJsonObjectMap mergeMappingOptions(final JsonObject defaultO
public MessageMapperRegistry registryOf(final MappingContext defaultContext,
final PayloadMappingDefinition payloadMappingDefinition) {

final MessageMapper defaultMapper = mapperOf("default", defaultContext)
final var defaultMapper = mapperOf("default", defaultContext)
.orElseThrow(() -> new IllegalArgumentException("No default mapper found: " + defaultContext));

final Map<String, MessageMapper> mappersFromConnectionConfig =
final var mappersFromConnectionConfig =
instantiateMappers(payloadMappingDefinition.getDefinitions().entrySet().stream());

final Map<String, MessageMapper> fallbackMappers =
instantiateMappers(REGISTERED_MAPPERS.entrySet().stream()
.filter(requiresNoMandatoryConfiguration())
.map(Map.Entry::getKey)
final var fallbackMappers = instantiateMappers(
payloadMapperFactories.stream()
.filter(pmf -> !pmf.isConfigurationMandatory())
.map(PayloadMapperFactory::getPayloadMapperAlias)
.map(DefaultMessageMapperFactory::getEmptyMappingContextForAlias));

return DefaultMessageMapperRegistry.of(defaultMapper, mappersFromConnectionConfig, fallbackMappers);
Expand All @@ -177,106 +175,18 @@ private Map<String, MessageMapper> instantiateMappers(final Stream<Map.Entry<Str
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private static Map<String, Class<?>> tryToLoadPayloadMappers() {
try {
final Iterable<Class<?>> payloadMappers = ClassIndex.getAnnotated(PayloadMapper.class);
final Map<String, Class<?>> mappers = new HashMap<>();
for (final Class<?> payloadMapper : payloadMappers) {
LOGGER.info("Loading payload mapper <{}>.", payloadMapper);
if (!MessageMapper.class.isAssignableFrom(payloadMapper)) {
throw new IllegalStateException("The class " + payloadMapper.getName() + " does not implement " +
MessageMapper.class.getName());
}
final PayloadMapper annotation = payloadMapper.getAnnotation(PayloadMapper.class);
if (annotation == null) {
throw new IllegalStateException("The mapper " + payloadMapper.getName() + " is not annotated with" +
" @PayloadMapper.");
}
final String[] aliases = annotation.alias();
if (aliases.length == 0) {
throw new IllegalStateException("No alias configured for " + payloadMapper.getName());
}

Stream.of(aliases).forEach(alias -> {
final Class<?> existingMapper = mappers.get(alias);
if (existingMapper == null) {
mappers.put(alias, payloadMapper);
LOGGER.info("Registered mapper {} for alias {}.", payloadMapper.getName(), alias);
} else if (annotation.priority() == existingMapper.getAnnotation(PayloadMapper.class).priority()) {
throw new IllegalStateException("Mapper alias <" + alias + "> was already registered and is " +
"tried to register again for " + payloadMapper.getName());
} else if (annotation.priority() >
existingMapper.getAnnotation(PayloadMapper.class).priority()) {
mappers.replace(alias, payloadMapper);
LOGGER.info("Replaced mapper {} by higher priority", payloadMapper.getName());
} else {
LOGGER.info("Skipped registration of mapper {} because of lower priority",
payloadMapper.getName());
}
});
}
return mappers;
} catch (final Exception e) {
final String message = e.getClass().getCanonicalName() + ": " + e.getMessage();
throw MessageMapperConfigurationFailedException.newBuilder(message).build();
}
}

/**
* Instantiates a mapper for the specified mapping context.
*
* @return the instantiated mapper if it can be instantiated from the configured factory class.
*/
Optional<MessageMapper> createMessageMapperInstance(final String mappingEngine) {
final Optional<MessageMapper> result;
final var connectionId = connection.getId();
if (REGISTERED_MAPPERS.containsKey(mappingEngine)) {
final Class<?> messageMapperClass = REGISTERED_MAPPERS.get(mappingEngine);
final MessageMapper mapper = createAnyMessageMapper(messageMapperClass,
actorSystem.dynamicAccess());
if (null == mapper) {
result = Optional.empty();
} else {
result = Optional.ofNullable(messageMapperExtension.apply(connectionId, mapper));
}
} else {
log.info("Mapper {} not found in {}.", mappingEngine, REGISTERED_MAPPERS);
result = Optional.empty();
}
return result;
}

@Nullable
private static MessageMapper createAnyMessageMapper(final Class<?> clazz,
final DynamicAccess dynamicAccess) {
final ClassTag<MessageMapper> tag = scala.reflect.ClassTag$.MODULE$.apply(MessageMapper.class);
final Try<MessageMapper> mapperTry = dynamicAccess.createInstanceFor(clazz, List$.MODULE$.empty(), tag);

if (mapperTry.isFailure()) {
final Throwable error = mapperTry.failed().get();
if (error instanceof ClassNotFoundException || error instanceof InstantiationException ||
error instanceof ClassCastException) {
LOGGER.warn("Could not instantiate message mapper.", error);
return null;
} else {
throw new IllegalStateException("There was an unknown error when trying to creating instance for '"
+ clazz + "'", error);
}
}
final MessageMapper messageMapper = mapperTry.get();
if (messageMapper == null) {
LOGGER.warn("Could not instantiate message mapper because result was null.");
}
return messageMapper;
}

private static Predicate<? super Map.Entry<String, Class<?>>> requiresNoMandatoryConfiguration() {
return e -> !getPayloadMapperAnnotation(e).requiresMandatoryConfiguration();
}

private static PayloadMapper getPayloadMapperAnnotation(final Map.Entry<String, Class<?>> entry) {
final Class<?> mapperClass = entry.getValue();
return mapperClass.getAnnotation(PayloadMapper.class);
return payloadMapperFactories.stream()
.filter(pmf -> pmf.getPayloadMapperAlias()
.equals(mappingEngine))
.findFirst()
.map(PayloadMapperFactory::createPayloadMapper)
.map(payloadMapper -> messageMapperExtension.apply(connection.getId(), payloadMapper));
}

private Optional<MessageMapper> configureInstance(final MessageMapper mapper,
Expand Down
Expand Up @@ -31,14 +31,14 @@

/**
* A message mapper implementation for the Ditto Protocol.
* Expects messages to contain a JSON serialized Ditto Protocol message.
* Expect messages to contain a JSON serialized Ditto Protocol message.
*/
@PayloadMapper(
alias = {"Ditto",
// legacy full qualified name
"org.eclipse.ditto.connectivity.service.mapping.DittoMessageMapper"})
public final class DittoMessageMapper extends AbstractMessageMapper {
public final class DittoMessageMapper extends AbstractMessageMapper implements PayloadMapper {

/**
* The alias of this mapper.
*/
public static final String ALIAS = "Ditto";

static final JsonObject DEFAULT_OPTIONS = JsonObject.newBuilder()
.set(MessageMapperConfiguration.CONTENT_TYPE_BLOCKLIST,
Expand All @@ -53,9 +53,14 @@ public final class DittoMessageMapper extends AbstractMessageMapper {
* The context representing this mapper
*/
public static final MappingContext CONTEXT = ConnectivityModelFactory.newMappingContextBuilder(
DittoMessageMapper.class.getCanonicalName(),
DEFAULT_OPTIONS
).build();
DittoMessageMapper.class.getCanonicalName(),
DEFAULT_OPTIONS)
.build();

@Override
public String getAlias() {
return ALIAS;
}

@Override
public List<Adaptable> map(final ExternalMessage message) {
Expand Down
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.mapping;

final class DittoPayloadMapperFactory implements PayloadMapperFactory {

@Override
public DittoMessageMapper createPayloadMapper() {
return new DittoMessageMapper();
}

@Override
public String getPayloadMapperAlias() {
return DittoMessageMapper.ALIAS;
}

}
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.Payload;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.things.model.Thing;
Expand All @@ -69,11 +70,9 @@
*
* @since 1.3.0
*/
@PayloadMapper(
alias = "ImplicitThingCreation",
requiresMandatoryConfiguration = true // "thing" is mandatory configuration
)
public final class ImplicitThingCreationMessageMapper extends AbstractMessageMapper {
public final class ImplicitThingCreationMessageMapper extends AbstractMessageMapper implements PayloadMapper {

private static final String PAYLOAD_MAPPER_ALIAS = "ImplicitThingCreation";

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(ImplicitThingCreationMessageMapper.class);

Expand Down Expand Up @@ -114,6 +113,19 @@ public ImplicitThingCreationMessageMapper(
this.resolverFactory = resolverFactory;
}

@Override
public String getAlias() {
return PAYLOAD_MAPPER_ALIAS;
}

/**
* "thing" is mandatory.
*/
@Override
public boolean isConfigurationMandatory() {
return true;
}

@Override
protected void doConfigure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) {
thingTemplate = configuration.findProperty(THING_TEMPLATE).orElseThrow(
Expand Down
Expand Up @@ -45,8 +45,9 @@
* Create-, modify- and merged-events are mapped to nested sparse JSON.
* All other signals and incoming messages are dropped.
*/
@PayloadMapper(alias = "Normalized")
public final class NormalizedMessageMapper extends AbstractMessageMapper {
public final class NormalizedMessageMapper extends AbstractMessageMapper implements PayloadMapper {

private static final String PAYLOAD_MAPPER_ALIAS = "Normalized";

/**
* Config property to project parts from the mapping result.
Expand All @@ -62,6 +63,11 @@ public final class NormalizedMessageMapper extends AbstractMessageMapper {
@Nullable
private JsonFieldSelector jsonFieldSelector;

@Override
public String getAlias() {
return PAYLOAD_MAPPER_ALIAS;
}

@Override
public String getId() {
return "normalized";
Expand Down

0 comments on commit b48190b

Please sign in to comment.