Skip to content

Commit

Permalink
Set message annotations for outbound AMQP messages
Browse files Browse the repository at this point in the history
Co-authored-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io>
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk and Stanchev Aleksandar committed Oct 6, 2022
1 parent e1755c1 commit b55f186
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ static <T extends LoggingAdapter & MdcEntrySettable<T>> void setPropertiesAndApp
try {
if (isDefinedAmqpProperty(headerName)) {
setDefinedAmqpProperty(message, headerName, headerValue);
} else if (isMessageAnnotation(headerName)) {
setMessageAnnotation(message, headerName, headerValue);
} else {
setAmqpApplicationProperty(message, headerName, headerValue);
}
Expand Down Expand Up @@ -286,6 +288,10 @@ private static boolean isDefinedAmqpProperty(final String name) {
return AMQP_PROPERTY_SETTER.containsKey(name);
}

private static boolean isMessageAnnotation(final String name) {
return name.startsWith(AMQP.MESSAGE_ANNOTATION_PREFIX);
}

private static Consumer<String> set(final Map<String, String> modifiableHeaders, final String name) {
return value -> modifiableHeaders.put(name, value);
}
Expand All @@ -295,6 +301,14 @@ private static void setDefinedAmqpProperty(final Message message, final String n
AMQP_PROPERTY_SETTER.get(name).accept(message, value);
}

// precondition: isMessageAnnotation(name)
private static void setMessageAnnotation(final Message message, final String name, final String value) {
final String applicationPropertyName = name.substring(AMQP.MESSAGE_ANNOTATION_PREFIX.length());

wrapFacadeBiConsumer(message, value, (facade, v) -> wrap((x, y) ->
facade.setTracingAnnotation(applicationPropertyName, value)).accept(message, value));
}

private static void setAmqpApplicationProperty(final Message message, final String name, final String value) {
final String applicationPropertyName = name.startsWith(AMQP.APPLICATION_PROPERTY_PREFIX)
? name.substring(AMQP.APPLICATION_PROPERTY_PREFIX.length())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public void testMsgPublishedOntoFullQueueShallBeDropped() throws Exception {
.build();
final DittoHeaders withAckRequest = dittoHeaders.toBuilder()
.acknowledgementRequest(AcknowledgementRequest.of(AcknowledgementLabel.of(ack)))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
getRef().path().toSerializationFormat())
.build();

final Props props = getPublisherActorProps();
Expand Down Expand Up @@ -269,7 +270,7 @@ public void testRecoverPublisher() throws Exception {
final int wantedNumberOfInvocations = i + 2;
final long millis =
1_000 // initial backoff
* (long) (Math.pow(2, i)) // backoff doubles with each retry
* (long) Math.pow(2, i) // backoff doubles with each retry
+ 500; // give the producer some time to recover
LOGGER.info("Want {} invocations after {}ms.", wantedNumberOfInvocations, millis);
verify(session, after(millis)
Expand Down Expand Up @@ -429,6 +430,7 @@ public void testPublishMessageWithAmqpProperties() throws Exception {
.set("application-property-with-dash", "value0")
.set("amqp.application.property:to", "value1")
.set("amqp.application.property:anotherApplicationProperty", "value2")
.set("amqp.message.annotation:message-annotation", "value3")
.build()
))
.build()
Expand All @@ -446,7 +448,9 @@ public void testPublishMessageWithAmqpProperties() throws Exception {

final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
verify(messageProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class));
final Message message = messageCaptor.getValue();
final JmsMessage message = messageCaptor.getValue();
assertThat(message.getFacade().getTracingAnnotation("message-annotation")).as("Sets message annotation " +
"as tracing annotation").isEqualTo("value3");
final Map<String, String> receivedHeaders =
JMSPropertyMapper.getHeadersFromProperties(message);

Expand All @@ -462,6 +466,7 @@ public void testPublishMessageWithAmqpProperties() throws Exception {
assertThat(receivedHeaders).containsEntry("amqp.application.property:to", "value1");
assertThat(receivedHeaders).containsEntry("anotherApplicationProperty", "value2");
// group-sequence is an AMQP prop of type "int", therefore it must not be contained in the headers here
assertThat(receivedHeaders).containsEntry("message-annotation", "value3");
assertThat(receivedHeaders).doesNotContainKey("group-sequence");
}};

Expand Down

0 comments on commit b55f186

Please sign in to comment.