Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-20444 - Camel-Azure-Servicebus: Support setting of CorrelationId on producer #13237

Merged
merged 2 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public final class ServiceBusConstants {
public static final String APPLICATION_PROPERTIES = HEADER_PREFIX + "ApplicationProperties";
@Metadata(label = "consumer", description = "Gets the content type of the message.", javaType = "String")
public static final String CONTENT_TYPE = HEADER_PREFIX + "ContentType";
@Metadata(label = "consumer", description = "Gets a correlation identifier.", javaType = "String")
public static final String CORRELATION_ID = HEADER_PREFIX + "CorrelationId";
@Metadata(label = "consumer", description = "Gets the description for a message that has been dead-lettered.",
javaType = "String")
public static final String DEAD_LETTER_ERROR_DESCRIPTION = HEADER_PREFIX + "DeadLetterErrorDescription";
Expand Down Expand Up @@ -98,6 +96,10 @@ public final class ServiceBusConstants {
javaType = "org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition")
public static final String PRODUCER_OPERATION = HEADER_PREFIX + "ProducerOperation";

// headers evaluated by the producer and consumer
@Metadata(label = "common", description = "Gets or Sets a correlation identifier.", javaType = "String")
public static final String CORRELATION_ID = HEADER_PREFIX + "CorrelationId";

private ServiceBusConstants() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,23 @@ private BiConsumer<Exchange, AsyncCallback> sendMessages() {
final Object inputBody = exchange.getMessage().getBody();
final Map<String, Object> applicationProperties
= exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
final String correlationId = exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);

Mono<Void> sendMessageAsync;

if (inputBody instanceof Iterable<?>) {
sendMessageAsync
= serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties,
correlationId);
} else {
Object convertedBody = inputBody instanceof BinaryData ? inputBody
: getConfiguration().isBinary() ? convertBodyToBinary(exchange)
: exchange.getMessage().getBody(String.class);

sendMessageAsync = serviceBusSenderOperations.sendMessages(convertedBody,
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties,
correlationId);
}

subscribeToMono(sendMessageAsync, exchange, noop -> {
Expand All @@ -176,6 +179,7 @@ private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
final Object inputBody = exchange.getMessage().getBody();
final Map<String, Object> applicationProperties
= exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
final String correlationId = exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID, String.class);

Mono<List<Long>> scheduleMessagesAsync;

Expand All @@ -184,7 +188,8 @@ private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
= serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
applicationProperties,
correlationId);
} else {
Object convertedBody = inputBody instanceof BinaryData ? inputBody
: getConfiguration().isBinary() ? convertBodyToBinary(exchange)
Expand All @@ -193,7 +198,8 @@ private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
= serviceBusSenderOperations.scheduleMessages(convertedBody,
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
applicationProperties,
correlationId);
}

subscribeToMono(scheduleMessagesAsync, exchange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@

import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusMessage;
import org.apache.camel.util.ObjectHelper;

public final class ServiceBusUtils {

private ServiceBusUtils() {
}

public static ServiceBusMessage createServiceBusMessage(
final Object data, final Map<String, Object> applicationProperties) {
final Object data, final Map<String, Object> applicationProperties, final String correlationId) {
ServiceBusMessage serviceBusMessage;
if (data instanceof String) {
serviceBusMessage = new ServiceBusMessage((String) data);
Expand All @@ -43,13 +44,16 @@ public static ServiceBusMessage createServiceBusMessage(
if (applicationProperties != null) {
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().putAll(applicationProperties);
}
if (ObjectHelper.isNotEmpty(correlationId)) {
serviceBusMessage.setCorrelationId(correlationId);
}
return serviceBusMessage;
}

public static Iterable<ServiceBusMessage> createServiceBusMessages(
final Iterable<?> data, final Map<String, Object> applicationProperties) {
final Iterable<?> data, final Map<String, Object> applicationProperties, final String correlationId) {
return StreamSupport.stream(data.spliterator(), false)
.map(obj -> createServiceBusMessage(obj, applicationProperties))
.map(obj -> createServiceBusMessage(obj, applicationProperties, correlationId))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,39 @@ public ServiceBusSenderOperations(ServiceBusSenderAsyncClientWrapper client) {
public Mono<Void> sendMessages(
final Object data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Map<String, Object> applicationProperties,
final String correlationId) {
if (data instanceof Iterable<?>) {
return sendMessages((Iterable<?>) data, context, applicationProperties);
return sendMessages((Iterable<?>) data, context, applicationProperties, correlationId);
}

return sendMessage(data, context, applicationProperties);
return sendMessage(data, context, applicationProperties, correlationId);
}

public Mono<List<Long>> scheduleMessages(
final Object data,
final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Map<String, Object> applicationProperties,
final String correlationId) {
if (ObjectHelper.isEmpty(scheduledEnqueueTime)) {
throw new IllegalArgumentException("To schedule a message, you need to set scheduledEnqueueTime.");
}

if (data instanceof Iterable<?>) {
return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties);
return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties, correlationId);
}

return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties);
return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties, correlationId);
}

private Mono<Void> sendMessages(
final Iterable<?> data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
final Map<String, Object> applicationProperties,
final String correlationId) {
final Iterable<ServiceBusMessage> messages
= ServiceBusUtils.createServiceBusMessages(data, applicationProperties, correlationId);

if (ObjectHelper.isEmpty(context)) {
return client.sendMessages(messages);
Expand All @@ -81,8 +85,9 @@ private Mono<Void> sendMessages(
private Mono<Void> sendMessage(
final Object data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties);
final Map<String, Object> applicationProperties,
final String correlationId) {
final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties, correlationId);

if (ObjectHelper.isEmpty(context)) {
return client.sendMessage(message);
Expand All @@ -95,8 +100,9 @@ private Mono<List<Long>> scheduleMessage(
final Object data,
final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties);
final Map<String, Object> applicationProperties,
final String correlationId) {
final ServiceBusMessage message = ServiceBusUtils.createServiceBusMessage(data, applicationProperties, correlationId);

if (ObjectHelper.isEmpty(context)) {
return client.scheduleMessage(message, scheduledEnqueueTime)
Expand All @@ -110,8 +116,10 @@ private Mono<List<Long>> scheduleMessage(
private Mono<List<Long>> scheduleMessages(
final Iterable<?> data, final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
final Map<String, Object> applicationProperties,
final String correlationId) {
final Iterable<ServiceBusMessage> messages
= ServiceBusUtils.createServiceBusMessages(data, applicationProperties, correlationId);

if (ObjectHelper.isEmpty(context)) {
return client.scheduleMessages(messages, scheduledEnqueueTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ public class ServiceBusUtilsTest {
@Test
void testCreateServiceBusMessage() {
// test string
final ServiceBusMessage message1 = ServiceBusUtils.createServiceBusMessage("test string", null);
final ServiceBusMessage message1 = ServiceBusUtils.createServiceBusMessage("test string", null, null);

assertEquals("test string", message1.getBody().toString());

// test int
final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null);
final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null, null);

assertEquals("12345", message2.getBody().toString());

//test bytes
byte[] testByteBody = "test string".getBytes(StandardCharsets.UTF_8);
final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null);
final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null, null);
assertArrayEquals(testByteBody, message3.getBody().toBytes());
}

Expand All @@ -53,7 +53,7 @@ void testCreateServiceBusMessages() {
inputMessages.add("test data");
inputMessages.add(String.valueOf(12345));

final Iterable<ServiceBusMessage> busMessages = ServiceBusUtils.createServiceBusMessages(inputMessages, null);
final Iterable<ServiceBusMessage> busMessages = ServiceBusUtils.createServiceBusMessages(inputMessages, null, null);

assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
.anyMatch(record -> record.getBody().toString().equals("test data")));
Expand All @@ -67,7 +67,7 @@ void testCreateServiceBusMessages() {
inputMessages2.add(byteBody1);
inputMessages2.add(byteBody2);

final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null);
final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null, null);

assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void testReceiveMessages() throws InterruptedException {
inputBatch.add("test batch 3");

new ServiceBusSenderOperations(new ServiceBusSenderAsyncClientWrapper(senderAsyncClient))
.sendMessages(inputBatch, null, null)
.sendMessages(inputBatch, null, null, null)
.block();

// test the data now
Expand Down Expand Up @@ -83,7 +83,7 @@ void testPeekMessages() throws InterruptedException {
inputBatch.add("peek test batch 3");

new ServiceBusSenderOperations(new ServiceBusSenderAsyncClientWrapper(senderAsyncClient))
.sendMessages(inputBatch, null, null)
.sendMessages(inputBatch, null, null, null)
.block();

// test the data now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void closeSubscriber() {
void testSendSingleMessage() {
final ServiceBusSenderOperations operations = new ServiceBusSenderOperations(clientSenderWrapper);

operations.sendMessages("test data", null, Map.of("customKey", "customValue")).block();
operations.sendMessages("test data", null, Map.of("customKey", "customValue"), null).block();

final boolean exists = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString().equals("test data"));
Expand All @@ -97,15 +97,15 @@ void testSendSingleMessage() {

//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue")).block();
operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue"), null).block();
final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
testByteBody));
assertTrue(exists2, "test byte body");

// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
operations.sendMessages(12345, null, null).block();
operations.sendMessages(12345, null, null, null).block();
});
}

Expand All @@ -118,7 +118,7 @@ void testSendingBatchMessages() {
inputBatch.add("test batch 2");
inputBatch.add("test batch 3");

operations.sendMessages(inputBatch, null, null).block();
operations.sendMessages(inputBatch, null, null, null).block();

final Spliterator<ServiceBusReceivedMessage> receivedMessages
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();
Expand All @@ -143,7 +143,7 @@ void testSendingBatchMessages() {
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);

operations.sendMessages(inputBatch2, null, null).block();
operations.sendMessages(inputBatch2, null, null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();

Expand All @@ -161,7 +161,7 @@ void testSendingBatchMessages() {
void testScheduleMessage() {
final ServiceBusSenderOperations operations = new ServiceBusSenderOperations(clientSenderWrapper);

operations.scheduleMessages("testScheduleMessage", OffsetDateTime.now(), null, null).block();
operations.scheduleMessages("testScheduleMessage", OffsetDateTime.now(), null, null, null).block();

final boolean exists = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString()
Expand All @@ -171,15 +171,15 @@ void testScheduleMessage() {

//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null).block();
operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null, null).block();
final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(),
testByteBody));
assertTrue(exists2, "test byte body");

// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
operations.scheduleMessages(12345, OffsetDateTime.now(), null, null).block();
operations.scheduleMessages(12345, OffsetDateTime.now(), null, null, null).block();
});
}

Expand All @@ -192,7 +192,7 @@ void testSchedulingBatchMessages() {
inputBatch.add("testSchedulingBatchMessages 2");
inputBatch.add("testSchedulingBatchMessages 3");

operations.scheduleMessages(inputBatch, OffsetDateTime.now(), null, null).block();
operations.scheduleMessages(inputBatch, OffsetDateTime.now(), null, null, null).block();

final Spliterator<ServiceBusReceivedMessage> receivedMessages
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();
Expand Down Expand Up @@ -220,7 +220,7 @@ void testSchedulingBatchMessages() {
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);

operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null).block();
operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();

Expand Down