Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.camel.component.azure.storage.queue;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -111,10 +112,24 @@ public int processBatch(Queue<Object> exchanges) {
// update pending number of exchanges
pendingExchanges = total - index - 1;

// copy messageId, popReceipt, timeout for fix exchange override case
// azure storage blob can override this headers
final String messageId = exchange.getIn()
.getHeader(QueueConstants.MESSAGE_ID, String.class);
final String popReceipt = exchange.getIn()
.getHeader(QueueConstants.POP_RECEIPT, String.class);
final Duration timeout = exchange.getIn()
.getHeader(QueueConstants.TIMEOUT, Duration.class);

// add on completion to handle after work when the exchange is done
exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
// past messageId, popReceipt, timeout for fix exchange override case
exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, messageId);
exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, popReceipt);
exchange.getIn().setHeader(QueueConstants.TIMEOUT, timeout);

processCommit(exchange);
}

Expand Down