diff --git a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/AmqpBridgeModelFactory.java b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/AmqpBridgeModelFactory.java index 696835c1e2..e3ed6e37bf 100644 --- a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/AmqpBridgeModelFactory.java +++ b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/AmqpBridgeModelFactory.java @@ -126,7 +126,18 @@ public static ExternalMessageBuilder newExternalMessageBuilderForEvent(final Map } /** - * Creates a new ExternalMessageBuilder for the passed {@code messageType}. + * Creates a new ExternalMessageBuilder initialized with the passed {@code headers}. + * + * @param headers the headers to initialize the builder with. + * @return the builder. + */ + public static ExternalMessageBuilder newExternalMessageBuilder(final Map headers) { + return new MutableExternalMessageBuilder(headers); + } + + /** + * Creates a new ExternalMessageBuilder for the passed {@code messageType} initialized with the passed + * {@code headers}. * * @param headers the headers to initialize the builder with. * @param messageType the MessageType to initialize the builder with. diff --git a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ExternalMessage.java b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ExternalMessage.java index 332191978b..78e20eeb75 100644 --- a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ExternalMessage.java +++ b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ExternalMessage.java @@ -88,9 +88,10 @@ default Optional findContentType() { Optional getBytePayload(); /** - * @return the MessageType of this ExternalMessage + * @return the MessageType of this ExternalMessage, only makes sense for outgoing messages where the type was + * already known. */ - MessageType getMessageType(); + Optional getMessageType(); /** * @return the PayloadType of this ExternalMessage diff --git a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ImmutableExternalMessage.java b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ImmutableExternalMessage.java index cbb952728a..4fb6f88d44 100644 --- a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ImmutableExternalMessage.java +++ b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/ImmutableExternalMessage.java @@ -28,6 +28,7 @@ final class ImmutableExternalMessage implements ExternalMessage { private final Map headers; + @Nullable private final MessageType messageType; private final PayloadType payloadType; @@ -37,7 +38,7 @@ final class ImmutableExternalMessage implements ExternalMessage { private final ByteBuffer bytePayload; ImmutableExternalMessage(final Map headers, - final MessageType messageType, + @Nullable final MessageType messageType, final PayloadType payloadType, @Nullable final String textPayload, @Nullable final ByteBuffer bytePayload) { @@ -96,8 +97,8 @@ public Optional getBytePayload() { } @Override - public MessageType getMessageType() { - return messageType; + public Optional getMessageType() { + return Optional.ofNullable(messageType); } @Override diff --git a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/MutableExternalMessageBuilder.java b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/MutableExternalMessageBuilder.java index 91dc079b7d..3daea0f100 100644 --- a/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/MutableExternalMessageBuilder.java +++ b/model/amqp-bridge/src/main/java/org/eclipse/ditto/model/amqpbridge/MutableExternalMessageBuilder.java @@ -24,6 +24,7 @@ final class MutableExternalMessageBuilder implements ExternalMessageBuilder { private final Map headers; + @Nullable private final ExternalMessage.MessageType messageType; private ExternalMessage.PayloadType payloadType = ExternalMessage.PayloadType.UNKNOWN; @Nullable @@ -40,17 +41,27 @@ final class MutableExternalMessageBuilder implements ExternalMessageBuilder { this.headers = new HashMap<>(message.getHeaders()); this.bytePayload = message.getBytePayload().orElse(null); this.textPayload = message.getTextPayload().orElse(null); - this.messageType = message.getMessageType(); + this.messageType = message.getMessageType().orElse(null); this.payloadType = message.getPayloadType(); } + /** + * Constructs a new MutableExternalMessageBuilder initialized with the passed {@code headers}. + * + * @param headers the headers to use for initialization. + */ + MutableExternalMessageBuilder(final Map headers) { + this(headers, null); + } + /** * Constructs a new MutableExternalMessageBuilder initialized with the passed {@code headers} and {@code messageType}. * * @param headers the headers to use for initialization. * @param messageType the messageType to use for initialization. */ - MutableExternalMessageBuilder(final Map headers, final ExternalMessage.MessageType messageType) { + MutableExternalMessageBuilder(final Map headers, + @Nullable final ExternalMessage.MessageType messageType) { this.headers = new HashMap<>(headers); this.messageType = messageType; } diff --git a/model/base/src/main/java/org/eclipse/ditto/model/base/headers/AbstractDittoHeaders.java b/model/base/src/main/java/org/eclipse/ditto/model/base/headers/AbstractDittoHeaders.java index 19ee27e5a4..2b517dea76 100755 --- a/model/base/src/main/java/org/eclipse/ditto/model/base/headers/AbstractDittoHeaders.java +++ b/model/base/src/main/java/org/eclipse/ditto/model/base/headers/AbstractDittoHeaders.java @@ -14,7 +14,6 @@ import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull; import java.util.AbstractMap; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -65,6 +64,11 @@ protected Optional getStringForDefinition(final HeaderDefinition definit return Optional.ofNullable(headers.get(definition.getKey())); } + @Override + public Optional getContentType() { + return getStringForDefinition(DittoHeaderDefinition.CONTENT_TYPE); + } + @Override public Optional getSource() { return getStringForDefinition(DittoHeaderDefinition.SOURCE); diff --git a/model/base/src/main/java/org/eclipse/ditto/model/base/headers/DittoHeaders.java b/model/base/src/main/java/org/eclipse/ditto/model/base/headers/DittoHeaders.java index 3ce5facd2b..b9ec7fe15d 100755 --- a/model/base/src/main/java/org/eclipse/ditto/model/base/headers/DittoHeaders.java +++ b/model/base/src/main/java/org/eclipse/ditto/model/base/headers/DittoHeaders.java @@ -129,6 +129,13 @@ static Set readSubjectsFromString(final String readSubjectsString) { */ Optional getCorrelationId(); + /** + * Returns the content-type of the entity. + * + * @return the content-type. + */ + Optional getContentType(); + /** * Returns the source which caused the command, e.g. a "clientId". * diff --git a/model/messages/src/main/java/org/eclipse/ditto/model/messages/ImmutableMessageHeaders.java b/model/messages/src/main/java/org/eclipse/ditto/model/messages/ImmutableMessageHeaders.java index b8a3ce7350..219dd3afe7 100755 --- a/model/messages/src/main/java/org/eclipse/ditto/model/messages/ImmutableMessageHeaders.java +++ b/model/messages/src/main/java/org/eclipse/ditto/model/messages/ImmutableMessageHeaders.java @@ -16,8 +16,6 @@ import java.text.MessageFormat; import java.time.Duration; import java.time.OffsetDateTime; -import java.util.Arrays; -import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -25,7 +23,6 @@ import org.eclipse.ditto.model.base.common.HttpStatusCode; import org.eclipse.ditto.model.base.headers.AbstractDittoHeaders; -import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition; import org.eclipse.ditto.model.base.headers.DittoHeaders; import org.eclipse.ditto.model.base.headers.HeaderDefinition; @@ -80,11 +77,6 @@ public Optional getFeatureId() { return getStringForDefinition(MessageHeaderDefinition.FEATURE_ID); } - @Override - public Optional getContentType() { - return getStringForDefinition(DittoHeaderDefinition.CONTENT_TYPE); - } - @Override public Optional getTimeout() { return getStringForDefinition(MessageHeaderDefinition.TIMEOUT) diff --git a/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapper.java b/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapper.java index d30243274a..162e6dac19 100644 --- a/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapper.java +++ b/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapper.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Objects; import java.util.Optional; import org.eclipse.ditto.json.JsonFactory; @@ -140,20 +139,4 @@ private ExternalMessage.MessageType determineMessageType(final Adaptable adaptab } } - @Override - public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - return true; - } - - @Override - public int hashCode() { - return Objects.hash(); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "[]"; - } } diff --git a/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/MessageMapperConfiguration.java b/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/MessageMapperConfiguration.java index 5f092ea892..5620724aef 100644 --- a/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/MessageMapperConfiguration.java +++ b/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/MessageMapperConfiguration.java @@ -24,7 +24,7 @@ public interface MessageMapperConfiguration { /** - * + * TODO TJ doc * @return */ Map getProperties(); diff --git a/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/javascript/JavaScriptMessageMapperRhino.java b/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/javascript/JavaScriptMessageMapperRhino.java index 264d9109d5..f49a5326ba 100644 --- a/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/javascript/JavaScriptMessageMapperRhino.java +++ b/services/amqp-bridge/mapping/src/main/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/javascript/JavaScriptMessageMapperRhino.java @@ -164,7 +164,8 @@ public ExternalMessage map(final Adaptable adaptable) { final Map headers = !(mappingHeaders instanceof Undefined) ? null : Collections.emptyMap(); //TODO pm evaulate if bytes or text payload - return AmqpBridgeModelFactory.newExternalMessageBuilder(headers, ExternalMessage.MessageType.RESPONSE) // TODO it is not always response! could be also event or error + // TODO TJ it is not always response! could be also event or error + return AmqpBridgeModelFactory.newExternalMessageBuilder(headers, ExternalMessage.MessageType.RESPONSE) .withAdditionalHeaders(ExternalMessage.CONTENT_TYPE_HEADER, contentType) .withText(mappingString) .build(); diff --git a/services/amqp-bridge/mapping/src/test/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapperTest.java b/services/amqp-bridge/mapping/src/test/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapperTest.java index 9431fe23aa..0485cc0f36 100644 --- a/services/amqp-bridge/mapping/src/test/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapperTest.java +++ b/services/amqp-bridge/mapping/src/test/java/org/eclipse/ditto/services/amqpbridge/mapping/mapper/DittoMessageMapperTest.java @@ -161,7 +161,7 @@ private Map createValidOutgoingMappings() { .build()); ExternalMessage message = - AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers).withText(adaptable.toJsonString()).build(); + AmqpBridgeModelFactory.newExternalMessageBuilder(headers).withText(adaptable.toJsonString()).build(); mappings.put(adaptable, message); final JsonObject json = JsonFactory.newObjectBuilder() @@ -172,7 +172,7 @@ private Map createValidOutgoingMappings() { adaptable = ProtocolFactory.wrapAsJsonifiableAdaptable(ProtocolFactory.newAdaptableBuilder(adaptable) .withHeaders(DittoHeaders.of(headers)).build()); - message = AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers).withText(adaptable.toJsonString()).build(); + message = AmqpBridgeModelFactory.newExternalMessageBuilder(headers).withText(adaptable.toJsonString()).build(); mappings.put(adaptable, message); return mappings; diff --git a/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/CommandProcessorActor.java b/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/CommandProcessorActor.java index 82fc87d001..fedfe47d06 100644 --- a/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/CommandProcessorActor.java +++ b/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/CommandProcessorActor.java @@ -101,7 +101,7 @@ private CommandProcessorActor(final ActorRef pubSubMediator, final ActorRef comm * @param mappingContexts the mapping contexts to apply for different content-types. * @return the Akka configuration Props object */ - static Props props(final ActorRef pubSubMediator, final ActorRef commandProducer, + public static Props props(final ActorRef pubSubMediator, final ActorRef commandProducer, final AuthorizationSubject authorizationSubject, final List mappingContexts) { diff --git a/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/amqp/CommandConsumerActor.java b/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/amqp/CommandConsumerActor.java index 1a8b261c46..294a74e98c 100644 --- a/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/amqp/CommandConsumerActor.java +++ b/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/amqp/CommandConsumerActor.java @@ -116,8 +116,7 @@ public void postStop() throws Exception { public void onMessage(final Message message) { try { final Map headers = extractHeadersMapFromJmsMessage(message); - // TODO TJ how can we be sure that the message is a command at this point? could be anything .. - final ExternalMessageBuilder builder = AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers); + final ExternalMessageBuilder builder = AmqpBridgeModelFactory.newExternalMessageBuilder(headers); extractPayloadFromMessage(message, builder); final ExternalMessage externalMessage = builder.build(); log.debug("Forwarding to processor: {}, {}", externalMessage.getHeaders(), @@ -160,7 +159,7 @@ private Map extractHeadersMapFromJmsMessage(final Message messag final String replyTo = message.getJMSReplyTo() != null ? String.valueOf(message.getJMSReplyTo()) : null; if (replyTo != null) { - headersFromJmsProperties.put("reply-to", replyTo); + headersFromJmsProperties.put("replyTo", replyTo); } final String jmsCorrelationId = message.getJMSCorrelationID() != null ? message.getJMSCorrelationID() : @@ -168,6 +167,9 @@ private Map extractHeadersMapFromJmsMessage(final Message messag if (jmsCorrelationId != null) { headersFromJmsProperties.put(CORRELATION_ID_HEADER, jmsCorrelationId); } + +// message. // TODO TJ extract contentType + return headersFromJmsProperties; } diff --git a/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/rabbitmq/CommandConsumerActor.java b/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/rabbitmq/CommandConsumerActor.java index decd3d479d..9d811bf173 100644 --- a/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/rabbitmq/CommandConsumerActor.java +++ b/services/amqp-bridge/messaging/src/main/java/org/eclipse/ditto/services/amqpbridge/messaging/rabbitmq/CommandConsumerActor.java @@ -95,9 +95,8 @@ private void handleDelivery(final Delivery delivery) { final String correlationId = properties.getCorrelationId(); LogUtil.enhanceLogWithCorrelationId(log, correlationId); final Map headers = extractHeadersFromMessage(properties, envelope); - // TODO TJ how can we be sure that the message is a command at this point? could be anything .. final ExternalMessageBuilder externalMessageBuilder = - AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers); + AmqpBridgeModelFactory.newExternalMessageBuilder(headers); final String contentType = properties.getContentType(); if (shouldBeInterpretedAsText(contentType)) { final String text = new String(body, determineCharset(contentType));