Skip to content

Commit

Permalink
NIFI-6957 - Added REGEX header property, and option to allow illegal …
Browse files Browse the repository at this point in the history
…chars in header names

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3942.
  • Loading branch information
r65535 authored and pvillard31 committed Dec 30, 2019
1 parent ad63678 commit 82b4fb0
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 13 deletions.
Expand Up @@ -118,6 +118,25 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.defaultValue(Charset.defaultCharset().name())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor ALLOW_ILLEGAL_HEADER_CHARS = new PropertyDescriptor.Builder()
.name("allow-illegal-chars-in-jms-header-names")
.displayName("Allow Illegal Characters in Header Names")
.description("Specifies whether illegal characters in header names should be sent to the JMS broker. " +
"Usually hyphens and full-stops.")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new PropertyDescriptor.Builder()
.name("attributes-to-send-as-jms-headers-regex")
.displayName("Attributes to Send as JMS Headers (Regex)")
.description("Specifies the Regular Expression that determines the names of FlowFile attributes that" +
" should be sent as JMS Headers")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.defaultValue(".*")
.required(true)
.build();


static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder()
Expand All @@ -141,6 +160,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
propertyDescriptors.add(SESSION_CACHE_SIZE);
propertyDescriptors.add(MESSAGE_BODY);
propertyDescriptors.add(CHARSET);
propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
}

@Override
Expand Down
Expand Up @@ -151,6 +151,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(propertyDescriptors);
_propertyDescriptors.remove(MESSAGE_BODY);
_propertyDescriptors.remove(ALLOW_ILLEGAL_HEADER_CHARS);
_propertyDescriptors.remove(ATTRIBUTES_AS_HEADERS_REGEX);

// change the validator on CHARSET property
_propertyDescriptors.remove(CHARSET);
Expand Down
Expand Up @@ -35,7 +35,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

/**
* Generic publisher of messages to JMS compliant messaging system.
Expand Down Expand Up @@ -77,11 +76,7 @@ public Message createMessage(Session session) throws JMSException {
void setMessageHeaderAndProperties(final Message message, final Map<String, String> flowFileAttributes) throws JMSException {
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {

Map<String, String> flowFileAttributesToSend = flowFileAttributes.entrySet().stream()
.filter(entry -> !entry.getKey().contains("-") && !entry.getKey().contains(".")) // '-' and '.' are illegal chars in JMS property names
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));

for (Entry<String, String> entry : flowFileAttributesToSend.entrySet()) {
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
try {
if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue()));
Expand Down
Expand Up @@ -43,8 +43,11 @@
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/**
* An implementation of JMS Message publishing {@link Processor} which upon each
Expand Down Expand Up @@ -121,10 +124,34 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS
try {
String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
Boolean allowIllegalChars = context.getProperty(ALLOW_ILLEGAL_HEADER_CHARS).asBoolean();
String attributeHeaderRegex = context.getProperty(ATTRIBUTES_AS_HEADERS_REGEX).getValue();

Map<String,String> attributesToSend = new HashMap<>();
// REGEX Attributes
final Pattern pattern = Pattern.compile(attributeHeaderRegex);
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
final String key = entry.getKey();
if (pattern.matcher(key).matches()) {
attributesToSend.put(key, flowFile.getAttribute(key));
}
}

// Optionally remove illegal headers names apart from .type attributes for JMS variable types
if (!allowIllegalChars) {
for (final Map.Entry<String,String> entry : attributesToSend.entrySet()) {
if (!entry.getKey().endsWith(".type")){
if (entry.getKey().contains("-") || entry.getKey().contains(".")) {
attributesToSend.remove(entry.getKey());
}
}
}
}

switch (context.getProperty(MESSAGE_BODY).getValue()) {
case TEXT_MESSAGE:
try {
publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes());
publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), attributesToSend);
} catch(Exception e) {
publisher.setValid(false);
throw e;
Expand All @@ -133,7 +160,7 @@ protected void rendezvousWithJms(ProcessContext context, ProcessSession processS
case BYTES_MESSAGE:
default:
try {
publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), attributesToSend);
} catch(Exception e) {
publisher.setValid(false);
throw e;
Expand Down
Expand Up @@ -17,7 +17,6 @@
package org.apache.nifi.jms.processors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -74,8 +73,8 @@ public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() thro
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("foo", "foo");
flowFileAttributes.put("illegal-property", "value");
flowFileAttributes.put("another.illegal", "value");
flowFileAttributes.put("hyphen-property", "value");
flowFileAttributes.put("fullstop.property", "value");
flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
flowFileAttributes.put(JmsHeaders.DELIVERY_MODE, "1");
flowFileAttributes.put(JmsHeaders.PRIORITY, "1");
Expand All @@ -85,8 +84,8 @@ public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() thro
Message receivedMessage = jmsTemplate.receive(destinationName);
assertTrue(receivedMessage instanceof BytesMessage);
assertEquals("foo", receivedMessage.getStringProperty("foo"));
assertFalse(receivedMessage.propertyExists("illegal-property"));
assertFalse(receivedMessage.propertyExists("another.illegal"));
assertTrue(receivedMessage.propertyExists("hyphen-property"));
assertTrue(receivedMessage.propertyExists("fullstop.property"));
assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
assertEquals(1, receivedMessage.getJMSDeliveryMode());
assertEquals(1, receivedMessage.getJMSPriority());
Expand Down
Expand Up @@ -36,6 +36,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -62,6 +63,7 @@ public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
attributes.put("test-attribute", "value");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it.

Expand All @@ -75,6 +77,7 @@ public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
assertNull(message.getStringProperty("test-attribute"));

runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
Expand Down Expand Up @@ -253,4 +256,50 @@ public void validatePublishPropertyTypes() throws Exception {

runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}

@Test(timeout = 10000)
public void validateRegexAndIllegalHeaders() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

final String destinationName = "validatePublishTextMessage";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);

runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);

runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationName);
runner.setProperty(PublishJMS.MESSAGE_BODY, "text");
runner.setProperty(PublishJMS.ATTRIBUTES_AS_HEADERS_REGEX, "^((?!bar).)*$");
runner.setProperty(PublishJMS.ALLOW_ILLEGAL_HEADER_CHARS, "true");

Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put("bar", "bar");
attributes.put("test-header-with-hyphen", "value");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);

final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);

JmsTemplate jmst = new JmsTemplate(cf);
Message message = jmst.receive(destinationName);
assertTrue(message instanceof TextMessage);
TextMessage textMessage = (TextMessage) message;

byte[] messageBytes = MessageBodyToBytesConverter.toBytes(textMessage);
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
assertEquals("value", message.getStringProperty("test-header-with-hyphen"));
assertNull(message.getStringProperty("bar"));

runner.run(1, true, false); // Run once just so that we can trigger the shutdown of the Connection Factory
}
}

0 comments on commit 82b4fb0

Please sign in to comment.