Skip to content

Commit

Permalink
Remove implicit header mapping in WrappingMessageMapper.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Nov 14, 2019
1 parent 5fce7bc commit 42b311d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.connectivity.MessageMappingFailedException;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
Expand All @@ -33,11 +30,8 @@
import org.eclipse.ditto.services.models.connectivity.ExternalMessageFactory;

/**
* Does wrap any {@link MessageMapper}.
* <p>
* adds headers to ExternalMessage and Adaptable in mappings even when the wrapped {@link MessageMapper} does
* forget to do so by himself.
* </p>
* Enforce message size limits on a {@link MessageMapper} and adds random correlation IDs should they not be present
* in the mapped message.
*/
final class WrappingMessageMapper implements MessageMapper {

Expand Down Expand Up @@ -87,37 +81,9 @@ public void configure(final MappingConfig mappingConfig, final MessageMapperConf

@Override
public List<Adaptable> map(final ExternalMessage message) {
final ExternalMessage enhancedMessage;
final String correlationId;
if (!message.getHeaders().containsKey(DittoHeaderDefinition.CORRELATION_ID.getKey())) {
// if no correlation-id was provided in the ExternalMessage, generate one here:
correlationId = UUID.randomUUID().toString();
enhancedMessage = ExternalMessageFactory.newExternalMessageBuilder(message)
.withAdditionalHeaders(DittoHeaderDefinition.CORRELATION_ID.getKey(), correlationId)
.build();
} else {
correlationId = message.getHeaders().get(DittoHeaderDefinition.CORRELATION_ID.getKey());
enhancedMessage = message;
}

final List<Adaptable> mappedAdaptables =
checkMaxMappedMessagesLimit(delegate.map(enhancedMessage), inboundMessageLimit);

return mappedAdaptables.stream().map(mapped -> {
final DittoHeadersBuilder headersBuilder = DittoHeaders.newBuilder();
headersBuilder.correlationId(correlationId);

Optional.ofNullable(message.getHeaders().get(ExternalMessage.REPLY_TO_HEADER)).ifPresent(replyTo ->
headersBuilder.putHeader(ExternalMessage.REPLY_TO_HEADER, replyTo)
);

final Optional<DittoHeaders> headersOpt = mapped.getHeaders();
headersOpt.ifPresent(headersBuilder::putHeaders); // overwrite with mapped headers (if any)

return ProtocolFactory.newAdaptableBuilder(mapped)
.withHeaders(headersBuilder.build())
.build();
}).collect(Collectors.toList());
return checkMaxMappedMessagesLimit(delegate.map(message), inboundMessageLimit).stream()
.map(WrappingMessageMapper::addRandomCorrelationId)
.collect(Collectors.toList());
}

@Override
Expand All @@ -127,10 +93,6 @@ public List<ExternalMessage> map(final Adaptable adaptable) {
return mappedMessages.stream().map(mapped -> {
final ExternalMessageBuilder messageBuilder = ExternalMessageFactory.newExternalMessageBuilder(mapped);
messageBuilder.asResponse(adaptable.getPayload().getStatus().isPresent());
adaptable.getHeaders()
.map(h -> h.get(ExternalMessage.REPLY_TO_HEADER))
.ifPresent(
replyTo -> messageBuilder.withAdditionalHeaders(ExternalMessage.REPLY_TO_HEADER, replyTo));
return messageBuilder.build();
}).collect(Collectors.toList());
}
Expand Down Expand Up @@ -173,4 +135,17 @@ public String toString() {
"]";
}

private static Adaptable addRandomCorrelationId(final Adaptable adaptable) {
if (adaptable.getHeaders().flatMap(DittoHeaders::getCorrelationId).isPresent()) {
return adaptable;
} else {
return ProtocolFactory.newAdaptableBuilder(adaptable)
.withHeaders(DittoHeaders.newBuilder()
.correlationId(UUID.randomUUID().toString())
.putHeaders(adaptable.getHeaders().orElse(DittoHeaders.empty()))
.build())
.build();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public void mapAdaptable() {

underTest.configure(mapperLimitsConfig, mockConfiguration);
underTest.map(mockAdaptable);
verify(mockAdaptable, VerificationModeFactory.atLeastOnce()).getHeaders();
verify(mockMapper).map(mockAdaptable);
}

Expand Down

0 comments on commit 42b311d

Please sign in to comment.