From 82b4fb06338aa1019f70950b0d467352d36d66b0 Mon Sep 17 00:00:00 2001 From: r65535 <56300962+r65535@users.noreply.github.com> Date: Wed, 18 Dec 2019 10:07:55 +0000 Subject: [PATCH] NIFI-6957 - Added REGEX header property, and option to allow illegal chars in header names Signed-off-by: Pierre Villard This closes #3942. --- .../jms/processors/AbstractJMSProcessor.java | 21 ++++++++ .../nifi/jms/processors/ConsumeJMS.java | 2 + .../nifi/jms/processors/JMSPublisher.java | 7 +-- .../nifi/jms/processors/PublishJMS.java | 31 +++++++++++- .../processors/JMSPublisherConsumerIT.java | 9 ++-- .../nifi/jms/processors/PublishJMSIT.java | 49 +++++++++++++++++++ 6 files changed, 106 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index e51238dd791c..e7a29d350e17 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -118,6 +118,25 @@ abstract class AbstractJMSProcessor extends AbstractProcess .defaultValue(Charset.defaultCharset().name()) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); + static final PropertyDescriptor ALLOW_ILLEGAL_HEADER_CHARS = new PropertyDescriptor.Builder() + .name("allow-illegal-chars-in-jms-header-names") + .displayName("Allow Illegal Characters in Header Names") + .description("Specifies whether illegal characters in header names should be sent to the JMS broker. " + + "Usually hyphens and full-stops.") + .required(true) + .defaultValue("false") + .allowableValues("true", "false") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new PropertyDescriptor.Builder() + .name("attributes-to-send-as-jms-headers-regex") + .displayName("Attributes to Send as JMS Headers (Regex)") + .description("Specifies the Regular Expression that determines the names of FlowFile attributes that" + + " should be sent as JMS Headers") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .defaultValue(".*") + .required(true) + .build(); static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder() @@ -141,6 +160,8 @@ abstract class AbstractJMSProcessor extends AbstractProcess propertyDescriptors.add(SESSION_CACHE_SIZE); propertyDescriptors.add(MESSAGE_BODY); propertyDescriptors.add(CHARSET); + propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS); + propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX); } @Override diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index 4b149e2652a3..eb4597cd10a1 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -151,6 +151,8 @@ public class ConsumeJMS extends AbstractJMSProcessor { List _propertyDescriptors = new ArrayList<>(); _propertyDescriptors.addAll(propertyDescriptors); _propertyDescriptors.remove(MESSAGE_BODY); + _propertyDescriptors.remove(ALLOW_ILLEGAL_HEADER_CHARS); + _propertyDescriptors.remove(ATTRIBUTES_AS_HEADERS_REGEX); // change the validator on CHARSET property _propertyDescriptors.remove(CHARSET); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index 1ea61b67ee0c..17d5690e7e4a 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -35,7 +35,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.stream.Collectors; /** * Generic publisher of messages to JMS compliant messaging system. @@ -77,11 +76,7 @@ public Message createMessage(Session session) throws JMSException { void setMessageHeaderAndProperties(final Message message, final Map flowFileAttributes) throws JMSException { if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) { - Map flowFileAttributesToSend = flowFileAttributes.entrySet().stream() - .filter(entry -> !entry.getKey().contains("-") && !entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property names - .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); - - for (Entry entry : flowFileAttributesToSend.entrySet()) { + for (Entry entry : flowFileAttributes.entrySet()) { try { if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) { this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue())); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 12451cf3dd5b..c95ec9d440bf 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -43,8 +43,11 @@ import java.io.StringWriter; import java.nio.charset.Charset; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; /** * An implementation of JMS Message publishing {@link Processor} which upon each @@ -121,10 +124,34 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS try { String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue(); String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue(); + Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean(); + String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue(); + + Map attributesToSend = new HashMap<>(); + // REGEX Attributes + final Pattern pattern = Pattern.compile(attributeHeaderRegex); + for (final Map.Entry entry : flowFile.getAttributes().entrySet()) { + final String key = entry.getKey(); + if (pattern.matcher(key).matches()) { + attributesToSend.put(key, flowFile.getAttribute(key)); + } + } + + // Optionally remove illegal headers names apart from .type attributes for JMS variable types + if (!allowIllegalChars) { + for (final Map.Entry entry : attributesToSend.entrySet()) { + if (!entry.getKey().endsWith(".type")){ + if (entry.getKey().contains("-") || entry.getKey().contains(".")) { + attributesToSend.remove(entry.getKey()); + } + } + } + } + switch (context.getProperty(MESSAGE_BODY).getValue()) { case TEXT_MESSAGE: try { - publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes()); + publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend); } catch(Exception e) { publisher.setValid(false); throw e; @@ -133,7 +160,7 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS case BYTES_MESSAGE: default: try { - publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); + publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend); } catch(Exception e) { publisher.setValid(false); throw e; diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java index 12474ecd80e3..bc480a205674 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java @@ -17,7 +17,6 @@ package org.apache.nifi.jms.processors; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -74,8 +73,8 @@ public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() thro JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put("foo", "foo"); - flowFileAttributes.put("illegal-property", "value"); - flowFileAttributes.put("another.illegal", "value"); + flowFileAttributes.put("hyphen-property", "value"); + flowFileAttributes.put("fullstop.property", "value"); flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic"); flowFileAttributes.put(JmsHeaders.DELIVERY_MODE, "1"); flowFileAttributes.put(JmsHeaders.PRIORITY, "1"); @@ -85,8 +84,8 @@ public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() thro Message receivedMessage = jmsTemplate.receive(destinationName); assertTrue(receivedMessage instanceof BytesMessage); assertEquals("foo", receivedMessage.getStringProperty("foo")); - assertFalse(receivedMessage.propertyExists("illegal-property")); - assertFalse(receivedMessage.propertyExists("another.illegal")); + assertTrue(receivedMessage.propertyExists("hyphen-property")); + assertTrue(receivedMessage.propertyExists("fullstop.property")); assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic); assertEquals(1, receivedMessage.getJMSDeliveryMode()); assertEquals(1, receivedMessage.getJMSPriority()); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java index fa0bd7a6b916..ad3febdaa524 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSIT.java @@ -36,6 +36,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,6 +63,7 @@ public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { Map attributes = new HashMap<>(); attributes.put("foo", "foo"); attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); + attributes.put("test-attribute", "value"); runner.enqueue("Hey dude!".getBytes(), attributes); runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. @@ -75,6 +77,7 @@ public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { assertEquals("Hey dude!", new String(messageBytes)); assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); assertEquals("foo", message.getStringProperty("foo")); + assertNull(message.getStringProperty("test-attribute")); runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory } @@ -253,4 +256,50 @@ public void validatePublishPropertyTypes() throws Exception { runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory } + + @Test(timeout = 10000) + public void validateRegexAndIllegalHeaders() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + final String destinationName = "validatePublishTextMessage"; + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, destinationName); + runner.setProperty(PublishJMS.MESSAGE_BODY, "text"); + runner.setProperty(PublishJMS.ATTRIBUTES_AS_HEADERS_REGEX, "^((?!bar).)*$"); + runner.setProperty(PublishJMS.ALLOW_ILLEGAL_HEADER_CHARS, "true"); + + Map attributes = new HashMap<>(); + attributes.put("foo", "foo"); + attributes.put("bar", "bar"); + attributes.put("test-header-with-hyphen", "value"); + attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); + runner.enqueue("Hey dude!".getBytes(), attributes); + runner.run(1, false); + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + + JmsTemplate jmst = new JmsTemplate(cf); + Message message = jmst.receive(destinationName); + assertTrue(message instanceof TextMessage); + TextMessage textMessage = (TextMessage) message; + + byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage); + assertEquals("Hey dude!", new String(messageBytes)); + assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); + assertEquals("foo", message.getStringProperty("foo")); + assertEquals("value", message.getStringProperty("test-header-with-hyphen")); + assertNull(message.getStringProperty("bar")); + + runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory + } }