From d5a2423ce792eea5fbc0037e36ba7dd741db472c Mon Sep 17 00:00:00 2001 From: Mark Bean Date: Fri, 5 Oct 2018 14:08:28 +0000 Subject: [PATCH 1/2] NIFI-5660: JMSPublisher should not set header properties directly in the message --- .../org/apache/nifi/jms/processors/JMSPublisher.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index c13f4b71b0a9..7711d6052895 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -84,11 +84,14 @@ void setMessageHeaderAndProperties(final Message message, final Map entry : flowFileAttributesToSend.entrySet()) { try { if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) { - message.setJMSDeliveryMode(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setExplicitQosEnabled(true); + this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue())); } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) { - message.setJMSExpiration(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setExplicitQosEnabled(true); + this.jmsTemplate.setTimeToLive(Integer.parseInt(entry.getValue())); } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) { - message.setJMSPriority(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setExplicitQosEnabled(true); + this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue())); } else if (entry.getKey().equals(JmsHeaders.REDELIVERED)) { message.setJMSRedelivered(Boolean.parseBoolean(entry.getValue())); } else if (entry.getKey().equals(JmsHeaders.TIMESTAMP)) { From 98cdfc096fb3bb7163c217db74decac3a31b91b8 Mon Sep 17 00:00:00 2001 From: Mark Bean Date: Tue, 12 Feb 2019 20:42:27 +0000 Subject: [PATCH 2/2] NIFI-5660: update to handle threading issue with JmsTemplate usage --- .../org/apache/nifi/jms/processors/AbstractJMSProcessor.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 0094eaf78869..19ee28302719 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -33,6 +33,7 @@ import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; +import javax.jms.Message; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -158,6 +159,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro try { rendezvousWithJms(context, session, worker); } finally { + worker.jmsTemplate.setExplicitQosEnabled(false); + worker.jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE); + worker.jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); + worker.jmsTemplate.setPriority(Message.DEFAULT_PRIORITY); workerPool.offer(worker); } }