From 6949b448316de6613c8d3281a13c1eb454193d94 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 14 Mar 2018 14:04:03 -0400 Subject: [PATCH] NIFI-4976: If unable to retrieve message content, warn an error but acknowledge message. --- .../nifi/jms/processors/JMSConsumer.java | 34 +++++++++++++------ .../MessageBodyToBytesConverter.java | 31 +++-------------- .../processors/JMSPublisherConsumerIT.java | 2 +- 3 files changed, 30 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java index 07aee320c486..809227779daf 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -32,6 +32,7 @@ import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.nifi.jms.processors.MessageBodyToBytesConverter.MessageConversionException; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.springframework.jms.connection.CachingConnectionFactory; @@ -96,13 +97,22 @@ public Void doInJms(final Session session) throws JMSException { if (message != null) { byte[] messageBody = null; - if (message instanceof TextMessage) { - messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset)); - } else if (message instanceof BytesMessage) { - messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); - } else { - throw new IllegalStateException("Message type other then TextMessage and BytesMessage are " - + "not supported at the moment"); + + try { + if (message instanceof TextMessage) { + messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset)); + } else if (message instanceof BytesMessage) { + messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); + } else { + processLog.error("Received a JMS Message that was neither a TextMessage nor a BytesMessage [{}]; will skip this message.", new Object[] {message}); + acknowledge(message, session); + return null; + } + } catch (final MessageConversionException mce) { + processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", + new Object[] {message}, mce); + acknowledge(message, session); + return null; } final Map messageHeaders = extractMessageHeaders(message); @@ -115,9 +125,7 @@ public Void doInJms(final Session session) throws JMSException { // and ACK message *only* after its successful invocation // and if CLIENT_ACKNOWLEDGE is set. consumerCallback.accept(response); - if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { - message.acknowledge(); - } + acknowledge(message, session); } finally { JmsUtils.closeMessageConsumer(msgConsumer); } @@ -127,6 +135,12 @@ public Void doInJms(final Session session) throws JMSException { }, true); } + private void acknowledge(final Message message, final Session session) throws JMSException { + if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { + message.acknowledge(); + } + } + @SuppressWarnings("unchecked") private Map extractMessageProperties(final Message message) { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java index 47827cc54c7a..e23e795750e7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java @@ -72,22 +72,14 @@ public static byte[] toBytes(BytesMessage message){ } } - /** - * - */ + private static class BytesMessageInputStream extends InputStream { private BytesMessage message; - /** - * - */ public BytesMessageInputStream(BytesMessage message) { this.message = message; } - /** - * - */ @Override public int read() throws IOException { try { @@ -97,24 +89,19 @@ public int read() throws IOException { } } - /** - * - */ @Override public int read(byte[] buffer, int offset, int length) throws IOException { try { - if (offset == 0) + if (offset == 0) { return this.message.readBytes(buffer, length); - else + } else { return super.read(buffer, offset, length); + } } catch (JMSException e) { throw new IOException(e.toString()); } } - /** - * - */ @Override public int read(byte[] buffer) throws IOException { try { @@ -125,22 +112,14 @@ public int read(byte[] buffer) throws IOException { } } - /** - * - */ + static class MessageConversionException extends RuntimeException { private static final long serialVersionUID = -1464448549601643887L; - /** - * - */ public MessageConversionException(String msg) { super(msg); } - /** - * - */ public MessageConversionException(String msg, Throwable cause) { super(msg, cause); } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java index 83cd32030ec1..7812e7185aa8 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java @@ -99,7 +99,7 @@ public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() thro * used. The may change to the point where all message types are supported * at which point this test will no be longer required. */ - @Test(expected = IllegalStateException.class) + @Test public void validateFailOnUnsupportedMessageType() throws Exception { final String destinationName = "validateFailOnUnsupportedMessageType"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);