Skip to content

Commit

Permalink
[#1228] provided new "UpdateTwinWithLiveResponse" MessageMapper for c…
Browse files Browse the repository at this point in the history
…reating "MergeThing" commands for updating the twin when a "live response" was consumed via a connection source

* made headers to use configurable and supported using placeholders in those headers
* added a new MiscPlaceholder providing the current timestamp, e.g. as ISO-8601
* enhanced documentation for the new UpdateTwinWithLiveResponseMessageMapper and MiscPlaceholder

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Nov 18, 2021
1 parent 4058e35 commit 977b95e
Show file tree
Hide file tree
Showing 29 changed files with 895 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ public final class ConnectivityMessagingConstants {
*/
public static final String CLUSTER_ROLE = "connectivity";

/**
* Path of the connectivity-stream-provider actor.
*/
public static final String STREAM_PROVIDER_ACTOR_PATH = "/user/connectivityRoot/persistenceStreamingActor";

/*
* Inhibit instantiation of this utility class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
*/
package org.eclipse.ditto.connectivity.api.placeholders;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.protocol.placeholders.MiscPlaceholder;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;

Expand Down Expand Up @@ -54,7 +53,7 @@ public static EntityIdPlaceholder newEntityPlaceholder() {
/**
* @return the singleton instance of the placeholder with prefix {@code request}.
*/
public static Placeholder<AuthorizationContext> newRequestPlaceholder() {
public static RequestPlaceholder newRequestPlaceholder() {
return ImmutableRequestPlaceholder.INSTANCE;
}

Expand All @@ -72,6 +71,13 @@ public static ResourcePlaceholder newResourcePlaceholder() {
return ResourcePlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link MiscPlaceholder}
*/
public static MiscPlaceholder newMiscPlaceholder() {
return MiscPlaceholder.getInstance();
}

/**
* @return the singleton instance of {@link ConnectionIdPlaceholder}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.placeholders.Placeholder;

/**
* Placeholder implementation that replaces {@code request} related things based on an {@link AuthorizationContext}.
*/
@Immutable
final class ImmutableRequestPlaceholder implements Placeholder<AuthorizationContext> {
final class ImmutableRequestPlaceholder implements RequestPlaceholder {

/**
* Singleton instance of the ImmutableHeadersPlaceholder.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.api.placeholders;

import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.placeholders.Placeholder;

/**
* A {@link Placeholder} implementation that replaces {@code request} related things based on an
* {@link AuthorizationContext}.
*/
public interface RequestPlaceholder extends Placeholder<AuthorizationContext> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@
*/
package org.eclipse.ditto.connectivity.service.mapping;

import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.MessageMappingFailedException;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.TopicPath;

import akka.actor.ActorSystem;

Expand Down Expand Up @@ -75,4 +84,50 @@ public final void configure(final Connection connection,
protected void doConfigure(final MappingConfig mappingConfig, final MessageMapperConfiguration configuration) {
// noop default
}

/**
* Extracts the payload of the passed in {@code message} as string.
*
* @param message the external message to extract the payload from.
* @return the payload of the passed in {@code message} as string
* @throws MessageMappingFailedException if no payload was present or if it was empty.
*/
protected static String extractPayloadAsString(final ExternalMessage message) {
final Optional<String> payload;
if (message.isTextMessage()) {
payload = message.getTextPayload();
} else if (message.isBytesMessage()) {
final Charset charset = determineCharset(message.getHeaders());
payload = message.getBytePayload().map(charset::decode).map(CharBuffer::toString);
} else {
payload = Optional.empty();
}

return payload.filter(s -> !s.isEmpty()).orElseThrow(() ->
MessageMappingFailedException.newBuilder(message.findContentType().orElse(""))
.description(
"As payload was absent or empty, please make sure to send payload in your messages.")
.dittoHeaders(DittoHeaders.of(message.getHeaders()))
.build());
}

protected static Charset determineCharset(final Map<String, String> messageHeaders) {
return CharsetDeterminer.getInstance().apply(messageHeaders.get(ExternalMessage.CONTENT_TYPE_HEADER));
}

protected static boolean isResponse(final Adaptable adaptable) {
final var payload = adaptable.getPayload();
final var httpStatus = payload.getHttpStatus();
return httpStatus.isPresent();
}

protected static boolean isError(final Adaptable adaptable) {
final var topicPath = adaptable.getTopicPath();
return topicPath.isCriterion(TopicPath.Criterion.ERRORS);
}

protected static boolean isLiveSignal(final Adaptable adaptable) {
return adaptable.getTopicPath().isChannel(TopicPath.Channel.LIVE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ private DefaultMessageMapperConfiguration(final String id, final MergedJsonObjec
*/
public static DefaultMessageMapperConfiguration of(final String id, final Map<String, JsonValue> configuration,
final Map<String, String> incomingConditions, final Map<String, String> outgoingConditions) {
return of(id, MergedJsonObjectMap.of(configuration),
incomingConditions, outgoingConditions);
return of(id, MergedJsonObjectMap.of(configuration), incomingConditions, outgoingConditions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,20 @@

import static java.util.Collections.singletonList;

import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.common.DittoConstants;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.MappingContext;
import org.eclipse.ditto.connectivity.model.MessageMappingFailedException;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;

/**
* A message mapper implementation for the Ditto Protocol.
Expand Down Expand Up @@ -96,43 +89,9 @@ private static String getJsonString(final Adaptable adaptable) {
return jsonifiableAdaptable.toJsonString();
}

private static boolean isResponse(final Adaptable adaptable) {
final var payload = adaptable.getPayload();
final var httpStatus = payload.getHttpStatus();
return httpStatus.isPresent();
}

private static boolean isError(final Adaptable adaptable) {
final var topicPath = adaptable.getTopicPath();
return topicPath.isCriterion(TopicPath.Criterion.ERRORS);
}

@Override
public JsonObject getDefaultOptions() {
return DEFAULT_OPTIONS;
}

private static String extractPayloadAsString(final ExternalMessage message) {
final Optional<String> payload;
if (message.isTextMessage()) {
payload = message.getTextPayload();
} else if (message.isBytesMessage()) {
final Charset charset = determineCharset(message.getHeaders());
payload = message.getBytePayload().map(charset::decode).map(CharBuffer::toString);
} else {
payload = Optional.empty();
}

return payload.filter(s -> !s.isEmpty()).orElseThrow(() ->
MessageMappingFailedException.newBuilder(message.findContentType().orElse(""))
.description(
"As payload was absent or empty, please make sure to send payload in your messages.")
.dittoHeaders(DittoHeaders.of(message.getHeaders()))
.build());
}

private static Charset determineCharset(final Map<String, String> messageHeaders) {
return CharsetDeterminer.getInstance().apply(messageHeaders.get(ExternalMessage.CONTENT_TYPE_HEADER));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public interface MessageMapper {
* @throws org.eclipse.ditto.connectivity.model.MessageMapperConfigurationFailedException if the configuration
* failed for a mapper specific reason.
*/
void configure(Connection connection, final ConnectivityConfig connectivityConfig,
MessageMapperConfiguration configuration, final ActorSystem actorSystem);
void configure(Connection connection, ConnectivityConfig connectivityConfig,
MessageMapperConfiguration configuration, ActorSystem actorSystem);

/**
* Maps an {@link ExternalMessage} to an {@link Adaptable}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.api.ExternalMessageBuilder;
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.MappingContext;
import org.eclipse.ditto.connectivity.service.config.mapping.MappingConfig;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
Expand Down Expand Up @@ -117,14 +115,6 @@ public final class RawMessageMapper extends AbstractMessageMapper {

private final DittoMessageMapper dittoMessageMapper = new DittoMessageMapper();

/**
* The context representing this mapper.
*/
public static final MappingContext CONTEXT = ConnectivityModelFactory.newMappingContextBuilder(
RawMessageMapper.class.getCanonicalName(),
DEFAULT_CONFIG
).build();

@Override
public List<Adaptable> map(final ExternalMessage externalMessage) {
final Optional<MessageHeaders> messageHeadersOptional =
Expand Down

0 comments on commit 977b95e

Please sign in to comment.