From 113b6ceec2ebdba8459bee30cf05568b6e5718a9 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 18 Aug 2017 12:19:11 +0200 Subject: [PATCH 1/2] NIFI-1923 - AttributesToJson regex property --- .../processors/standard/AttributesToJSON.java | 47 +++++++++++++++---- .../standard/TestAttributesToJSON.java | 33 +++++++++++++ 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java index cfa4cfe00ff8..cd163a3655b3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.ArrayList; import java.util.Set; +import java.util.regex.Pattern; import java.util.HashSet; import java.util.Map; import java.util.Collections; @@ -79,6 +80,17 @@ public class AttributesToJSON extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + + public static final PropertyDescriptor ATTRIBUTES_REGEX = new PropertyDescriptor.Builder() + .name("attributes-to-json-regex") + .displayName("Attributes Regular Expression") + .description("Regular expression that will be evaluated against the flow file attributes to select " + + "the matching attributes. This property can be used in combination with the attributes " + + "list property.") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() .name("Destination") .description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " + @@ -120,11 +132,13 @@ public class AttributesToJSON extends AbstractProcessor { private volatile Set attributes; private volatile Boolean nullValueForEmptyString; private volatile boolean destinationContent; + private volatile Pattern pattern; @Override protected void init(final ProcessorInitializationContext context) { final List properties = new ArrayList<>(); properties.add(ATTRIBUTES_LIST); + properties.add(ATTRIBUTES_REGEX); properties.add(DESTINATION); properties.add(INCLUDE_CORE_ATTRIBUTES); properties.add(NULL_VALUE_FOR_EMPTY_STRING); @@ -153,17 +167,27 @@ public Set getRelationships() { * @return * Map of values that are feed to a Jackson ObjectMapper */ - protected Map buildAttributesMapForFlowFile(FlowFile ff, Set attributes, Set attributesToRemove, boolean nullValForEmptyString) { + protected Map buildAttributesMapForFlowFile(FlowFile ff, Set attributes, Set attributesToRemove, + boolean nullValForEmptyString, Pattern attPattern) { Map result; //If list of attributes specified get only those attributes. Otherwise write them all - if (attributes != null) { - result = new HashMap<>(attributes.size()); - for (String attribute : attributes) { - String val = ff.getAttribute(attribute); - if (val != null || nullValForEmptyString) { - result.put(attribute, val); - } else { - result.put(attribute, ""); + if (attributes != null || attPattern != null) { + result = new HashMap<>(); + if(attributes != null) { + for (String attribute : attributes) { + String val = ff.getAttribute(attribute); + if (val != null || nullValForEmptyString) { + result.put(attribute, val); + } else { + result.put(attribute, ""); + } + } + } + if(attPattern != null) { + for (Map.Entry e : ff.getAttributes().entrySet()) { + if(attPattern.matcher(e.getKey()).matches()) { + result.put(e.getKey(), e.getValue()); + } } } } else { @@ -204,6 +228,9 @@ public void onScheduled(ProcessContext context) { attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove); nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue()); + if(context.getProperty(ATTRIBUTES_REGEX).isSet()) { + pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).getValue()); + } } @Override @@ -213,7 +240,7 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro return; } - final Map atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString); + final Map atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove, nullValueForEmptyString, pattern); try { if (destinationContent) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java index 5c8df9bdc890..20478d5a3494 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java @@ -388,4 +388,37 @@ public void testAttribute_includeCoreAttributesAttribute() throws IOException { Set coreAttributes = Arrays.stream(CoreAttributes.values()).map(CoreAttributes::key).collect(Collectors.toSet()); val.keySet().forEach(k -> assertTrue(coreAttributes.contains(k))); } + + @Test + public void testAttributesRegex() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_REGEX, "delimited\\.header\\.column\\.[0-9]+"); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "test, test1"); + + Map attributes = new HashMap(); + attributes.put("delimited.header.column.1", "Registry"); + attributes.put("delimited.header.column.2", "Assignment"); + attributes.put("delimited.header.column.3", "Organization Name"); + attributes.put("delimited.header.column.4", "Organization Address"); + attributes.put("delimited.footer.column.1", "not included"); + attributes.put("test", "test"); + attributes.put("test1", "test1"); + testRunner.enqueue("".getBytes(), attributes); + + testRunner.run(); + + testRunner.assertTransferCount(AttributesToJSON.REL_FAILURE, 0); + testRunner.assertTransferCount(AttributesToJSON.REL_SUCCESS, 1); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(AttributesToJSON.REL_SUCCESS).get(0); + + Map val = new ObjectMapper().readValue(flowFile.getAttribute(AttributesToJSON.JSON_ATTRIBUTE_NAME), HashMap.class); + assertTrue(val.keySet().contains("delimited.header.column.1")); + assertTrue(val.keySet().contains("delimited.header.column.2")); + assertTrue(val.keySet().contains("delimited.header.column.3")); + assertTrue(val.keySet().contains("delimited.header.column.4")); + assertTrue(!val.keySet().contains("delimited.footer.column.1")); + assertTrue(val.keySet().contains("test")); + assertTrue(val.keySet().contains("test1")); + } } From db707a8c761a11a61633fbbd16b44bdcc121ead0 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 22 Aug 2017 22:59:02 +0200 Subject: [PATCH 2/2] added EL support --- .../apache/nifi/processors/standard/AttributesToJSON.java | 6 ++++-- .../nifi/processors/standard/TestAttributesToJSON.java | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java index cd163a3655b3..6e00619a61ed 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AttributesToJSON.java @@ -88,7 +88,9 @@ public class AttributesToJSON extends AbstractProcessor { + "the matching attributes. This property can be used in combination with the attributes " + "list property.") .required(false) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() @@ -229,7 +231,7 @@ public void onScheduled(ProcessContext context) { nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean(); destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue()); if(context.getProperty(ATTRIBUTES_REGEX).isSet()) { - pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).getValue()); + pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue()); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java index 20478d5a3494..84f5c7d52cea 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAttributesToJSON.java @@ -392,7 +392,8 @@ public void testAttribute_includeCoreAttributesAttribute() throws IOException { @Test public void testAttributesRegex() throws IOException { final TestRunner testRunner = TestRunners.newTestRunner(new AttributesToJSON()); - testRunner.setProperty(AttributesToJSON.ATTRIBUTES_REGEX, "delimited\\.header\\.column\\.[0-9]+"); + testRunner.setVariable("regex", "delimited\\.header\\.column\\.[0-9]+"); + testRunner.setProperty(AttributesToJSON.ATTRIBUTES_REGEX, "${regex}"); testRunner.setProperty(AttributesToJSON.ATTRIBUTES_LIST, "test, test1"); Map attributes = new HashMap();