Skip to content

Commit

Permalink
made ExternalMessage.getMessageType optional as this only applies for…
Browse files Browse the repository at this point in the history
… outgoing messages where we know the type

* added getContentType() to DittoHeaders as it's already in the definition

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Mar 1, 2018
1 parent ddea732 commit 07d2043
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,18 @@ public static ExternalMessageBuilder newExternalMessageBuilderForEvent(final Map
}

/**
* Creates a new ExternalMessageBuilder for the passed {@code messageType}.
* Creates a new ExternalMessageBuilder initialized with the passed {@code headers}.
*
* @param headers the headers to initialize the builder with.
* @return the builder.
*/
public static ExternalMessageBuilder newExternalMessageBuilder(final Map<String, String> headers) {
return new MutableExternalMessageBuilder(headers);
}

/**
* Creates a new ExternalMessageBuilder for the passed {@code messageType} initialized with the passed
* {@code headers}.
*
* @param headers the headers to initialize the builder with.
* @param messageType the MessageType to initialize the builder with.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ default Optional<String> findContentType() {
Optional<ByteBuffer> getBytePayload();

/**
* @return the MessageType of this ExternalMessage
* @return the MessageType of this ExternalMessage, only makes sense for outgoing messages where the type was
* already known.
*/
MessageType getMessageType();
Optional<MessageType> getMessageType();

/**
* @return the PayloadType of this ExternalMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
final class ImmutableExternalMessage implements ExternalMessage {

private final Map<String, String> headers;
@Nullable
private final MessageType messageType;
private final PayloadType payloadType;

Expand All @@ -37,7 +38,7 @@ final class ImmutableExternalMessage implements ExternalMessage {
private final ByteBuffer bytePayload;

ImmutableExternalMessage(final Map<String, String> headers,
final MessageType messageType,
@Nullable final MessageType messageType,
final PayloadType payloadType,
@Nullable final String textPayload,
@Nullable final ByteBuffer bytePayload) {
Expand Down Expand Up @@ -96,8 +97,8 @@ public Optional<ByteBuffer> getBytePayload() {
}

@Override
public MessageType getMessageType() {
return messageType;
public Optional<MessageType> getMessageType() {
return Optional.ofNullable(messageType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
final class MutableExternalMessageBuilder implements ExternalMessageBuilder {

private final Map<String, String> headers;
@Nullable
private final ExternalMessage.MessageType messageType;
private ExternalMessage.PayloadType payloadType = ExternalMessage.PayloadType.UNKNOWN;
@Nullable
Expand All @@ -40,17 +41,27 @@ final class MutableExternalMessageBuilder implements ExternalMessageBuilder {
this.headers = new HashMap<>(message.getHeaders());
this.bytePayload = message.getBytePayload().orElse(null);
this.textPayload = message.getTextPayload().orElse(null);
this.messageType = message.getMessageType();
this.messageType = message.getMessageType().orElse(null);
this.payloadType = message.getPayloadType();
}

/**
* Constructs a new MutableExternalMessageBuilder initialized with the passed {@code headers}.
*
* @param headers the headers to use for initialization.
*/
MutableExternalMessageBuilder(final Map<String, String> headers) {
this(headers, null);
}

/**
* Constructs a new MutableExternalMessageBuilder initialized with the passed {@code headers} and {@code messageType}.
*
* @param headers the headers to use for initialization.
* @param messageType the messageType to use for initialization.
*/
MutableExternalMessageBuilder(final Map<String, String> headers, final ExternalMessage.MessageType messageType) {
MutableExternalMessageBuilder(final Map<String, String> headers,
@Nullable final ExternalMessage.MessageType messageType) {
this.headers = new HashMap<>(headers);
this.messageType = messageType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -65,6 +64,11 @@ protected Optional<String> getStringForDefinition(final HeaderDefinition definit
return Optional.ofNullable(headers.get(definition.getKey()));
}

@Override
public Optional<String> getContentType() {
return getStringForDefinition(DittoHeaderDefinition.CONTENT_TYPE);
}

@Override
public Optional<String> getSource() {
return getStringForDefinition(DittoHeaderDefinition.SOURCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ static Set<String> readSubjectsFromString(final String readSubjectsString) {
*/
Optional<String> getCorrelationId();

/**
* Returns the content-type of the entity.
*
* @return the content-type.
*/
Optional<String> getContentType();

/**
* Returns the source which caused the command, e.g. a "clientId".
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
import java.text.MessageFormat;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.AbstractDittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.HeaderDefinition;

Expand Down Expand Up @@ -80,11 +77,6 @@ public Optional<String> getFeatureId() {
return getStringForDefinition(MessageHeaderDefinition.FEATURE_ID);
}

@Override
public Optional<String> getContentType() {
return getStringForDefinition(DittoHeaderDefinition.CONTENT_TYPE);
}

@Override
public Optional<Duration> getTimeout() {
return getStringForDefinition(MessageHeaderDefinition.TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.eclipse.ditto.json.JsonFactory;
Expand Down Expand Up @@ -140,20 +139,4 @@ private ExternalMessage.MessageType determineMessageType(final Adaptable adaptab
}
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
return true;
}

@Override
public int hashCode() {
return Objects.hash();
}

@Override
public String toString() {
return getClass().getSimpleName() + "[]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public interface MessageMapperConfiguration {

/**
*
* TODO TJ doc
* @return
*/
Map<String, String> getProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public ExternalMessage map(final Adaptable adaptable) {

final Map<String, String> headers = !(mappingHeaders instanceof Undefined) ? null : Collections.emptyMap();
//TODO pm evaulate if bytes or text payload
return AmqpBridgeModelFactory.newExternalMessageBuilder(headers, ExternalMessage.MessageType.RESPONSE) // TODO it is not always response! could be also event or error
// TODO TJ it is not always response! could be also event or error
return AmqpBridgeModelFactory.newExternalMessageBuilder(headers, ExternalMessage.MessageType.RESPONSE)
.withAdditionalHeaders(ExternalMessage.CONTENT_TYPE_HEADER, contentType)
.withText(mappingString)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private Map<Adaptable, ExternalMessage> createValidOutgoingMappings() {
.build());

ExternalMessage message =
AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers).withText(adaptable.toJsonString()).build();
AmqpBridgeModelFactory.newExternalMessageBuilder(headers).withText(adaptable.toJsonString()).build();
mappings.put(adaptable, message);

final JsonObject json = JsonFactory.newObjectBuilder()
Expand All @@ -172,7 +172,7 @@ private Map<Adaptable, ExternalMessage> createValidOutgoingMappings() {
adaptable = ProtocolFactory.wrapAsJsonifiableAdaptable(ProtocolFactory.newAdaptableBuilder(adaptable)
.withHeaders(DittoHeaders.of(headers)).build());

message = AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers).withText(adaptable.toJsonString()).build();
message = AmqpBridgeModelFactory.newExternalMessageBuilder(headers).withText(adaptable.toJsonString()).build();
mappings.put(adaptable, message);

return mappings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private CommandProcessorActor(final ActorRef pubSubMediator, final ActorRef comm
* @param mappingContexts the mapping contexts to apply for different content-types.
* @return the Akka configuration Props object
*/
static Props props(final ActorRef pubSubMediator, final ActorRef commandProducer,
public static Props props(final ActorRef pubSubMediator, final ActorRef commandProducer,
final AuthorizationSubject authorizationSubject,
final List<MappingContext> mappingContexts) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ public void postStop() throws Exception {
public void onMessage(final Message message) {
try {
final Map<String, String> headers = extractHeadersMapFromJmsMessage(message);
// TODO TJ how can we be sure that the message is a command at this point? could be anything ..
final ExternalMessageBuilder builder = AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers);
final ExternalMessageBuilder builder = AmqpBridgeModelFactory.newExternalMessageBuilder(headers);
extractPayloadFromMessage(message, builder);
final ExternalMessage externalMessage = builder.build();
log.debug("Forwarding to processor: {}, {}", externalMessage.getHeaders(),
Expand Down Expand Up @@ -160,14 +159,17 @@ private Map<String, String> extractHeadersMapFromJmsMessage(final Message messag

final String replyTo = message.getJMSReplyTo() != null ? String.valueOf(message.getJMSReplyTo()) : null;
if (replyTo != null) {
headersFromJmsProperties.put("reply-to", replyTo);
headersFromJmsProperties.put("replyTo", replyTo);
}

final String jmsCorrelationId = message.getJMSCorrelationID() != null ? message.getJMSCorrelationID() :
message.getJMSMessageID();
if (jmsCorrelationId != null) {
headersFromJmsProperties.put(CORRELATION_ID_HEADER, jmsCorrelationId);
}

// message. // TODO TJ extract contentType

return headersFromJmsProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ private void handleDelivery(final Delivery delivery) {
final String correlationId = properties.getCorrelationId();
LogUtil.enhanceLogWithCorrelationId(log, correlationId);
final Map<String, String> headers = extractHeadersFromMessage(properties, envelope);
// TODO TJ how can we be sure that the message is a command at this point? could be anything ..
final ExternalMessageBuilder externalMessageBuilder =
AmqpBridgeModelFactory.newExternalMessageBuilderForCommand(headers);
AmqpBridgeModelFactory.newExternalMessageBuilder(headers);
final String contentType = properties.getContentType();
if (shouldBeInterpretedAsText(contentType)) {
final String text = new String(body, determineCharset(contentType));
Expand Down

0 comments on commit 07d2043

Please sign in to comment.