From 8e6b4ce352fe3c3f2576e0cf773172ce387b480c Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 9 Jun 2018 13:43:24 -0400 Subject: [PATCH 1/3] NIFI-5287 Made LookupRecord able to take in flowfile attributes and combine them with lookup keys. --- .../processors/standard/LookupAttribute.java | 3 +- .../processors/standard/LookupRecord.java | 37 ++++++++++++- .../standard/TestLookupAttribute.java | 52 +++++++++++++++++++ .../processors/standard/TestLookupRecord.java | 49 +++++++++++++++++ .../org/apache/nifi/lookup/LookupService.java | 13 +++++ 5 files changed, 152 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java index 56ad58fce2e8..20ea00fcdafb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java @@ -222,7 +222,8 @@ private void onTrigger(ComponentLog logger, LookupService lookupService, final PropertyValue lookupKeyExpression = e.getValue(); final String lookupKey = lookupKeyExpression.evaluateAttributeExpressions(flowFile).getValue(); final String attributeName = e.getKey().getName(); - final Optional attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey)); + final Optional attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey), + flowFile.getAttributes()); matched = putAttribute(attributeName, attributeValue, attributes, includeEmptyValues, logger) || matched; if (!matched && logger.isDebugEnabled()) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 589272ff3fad..c5f852d235a7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.nifi.annotation.behavior.DynamicProperty; @@ -51,6 +52,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathResult; @@ -120,6 +122,18 @@ public class LookupRecord extends AbstractRouteRecord getSupportedPropertyDescriptors() { properties.addAll(super.getSupportedPropertyDescriptors()); properties.add(LOOKUP_SERVICE); properties.add(RESULT_RECORD_PATH); + properties.add(ATTRIBUTES_REGEX); properties.add(ROUTING_STRATEGY); properties.add(RESULT_CONTENTS); return properties; @@ -255,6 +270,25 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String } } + private Map getMatchingFlowfileAttributes(FlowFile input, ProcessContext context) { + Map retVal = new HashMap<>(); + + if (context.getProperty(ATTRIBUTES_REGEX).isSet()) { + final String regex = context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions(input).getValue(); + final Pattern pattern = Pattern.compile(regex); + retVal.putAll(input.getAttributes() + .entrySet().stream() + .filter(e -> pattern.matcher(e.getKey()).matches()) + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue() + )) + ); + } + + return retVal; + } + @Override protected Set route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context, final Tuple, RecordPath> flowFileContext) { @@ -292,7 +326,8 @@ protected Set route(final Record record, final RecordSchema writeS final Optional lookupValueOption; try { - lookupValueOption = lookupService.lookup(lookupCoordinates); + Map filteredAttributes = getMatchingFlowfileAttributes(flowFile, context); + lookupValueOption = lookupService.lookup(lookupCoordinates, filteredAttributes); } catch (final Exception e) { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java index 1cf41b89b0e5..3f9d2fa2f031 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java @@ -24,6 +24,8 @@ import java.util.Set; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.LookupService; import org.apache.nifi.lookup.SimpleKeyValueLookupService; import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.reporting.InitializationException; @@ -31,6 +33,7 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertNotNull; @@ -140,6 +143,27 @@ public void testCustomValidateMissingDynamicProps() throws InitializationExcepti runner.assertNotValid(); } + @Test + public void testLookupServicePassFlowfileAttributes() throws InitializationException { + final LookupService service = new TestService(); + + final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute()); + runner.addControllerService("simple-key-value-lookup-service", service); + runner.enableControllerService(service); + runner.assertValid(service); + runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service"); + runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "false"); + runner.setProperty("baz", "${attr1}"); + runner.assertValid(); + + final Map attributes = new HashMap<>(); + attributes.put("user_defined", "key4"); + + runner.enqueue("some content".getBytes(), attributes); + runner.run(1, false); + runner.assertAllFlowFilesTransferred(LookupAttribute.REL_MATCHED, 1); + } + private static class InvalidLookupService extends AbstractControllerService implements StringLookupService { @Override public Optional lookup(Map coordinates) { @@ -155,4 +179,32 @@ public Set getRequiredKeys() { } } + static class TestService extends AbstractControllerService implements StringLookupService { + @Override + public Optional lookup(Map coordinates, Map context) throws LookupFailureException { + Assert.assertNotNull(coordinates); + Assert.assertNotNull(context); + Assert.assertEquals(1, coordinates.size()); + Assert.assertTrue(context.containsKey("user_defined")); + + return Optional.of("Test!"); + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + return Optional.empty(); + } + + @Override + public Class getValueType() { + return String.class; + } + + @Override + public Set getRequiredKeys() { + Set set = new HashSet(); + set.add("key"); + return set; + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index 8cdce71499df..5dcb85af5277 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -44,6 +44,7 @@ import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -84,6 +85,34 @@ public void setup() throws InitializationException { recordReader.addRecord("Jimmy Doe", 14, null); } + @Test + public void testAttributeRegex() { + Map attrs = new HashMap<>(); + attrs.put("schema.name", "person"); + attrs.put("something_something", "test"); + + runner.setProperty(LookupRecord.ATTRIBUTES_REGEX, "(schema.*|.*_something$)"); + + Map expected = new HashMap<>(); + expected.putAll(attrs); + + lookupService.setRequiredCoordinates(expected); + + lookupService.addValue("John Doe", "Soccer"); + lookupService.addValue("Jane Doe", "Basketball"); + lookupService.addValue("Jimmy Doe", "Football"); + + runner.enqueue("", attrs); + runner.run(); + + runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + + out.assertAttributeEquals("record.count", "3"); + out.assertAttributeEquals("mime.type", "text/plain"); + out.assertContentEquals("John Doe,48,Soccer\nJane Doe,47,Basketball\nJimmy Doe,14,Football\n"); + } + @Test public void testAllMatch() throws InitializationException { lookupService.addValue("John Doe", "Soccer"); @@ -372,6 +401,7 @@ public void testAddFieldsToNonRecordField() throws InitializationException { private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map values = new HashMap<>(); + private Map expectedCoordinates; public void addValue(final String key, final String value) { values.put(key, value); @@ -382,6 +412,11 @@ public Class getValueType() { return String.class; } + public Optional lookup(final Map coordinates, Map context) { + enforceRequiredCoordinates(context); + return lookup(coordinates); + } + @Override public Optional lookup(final Map coordinates) { if (coordinates == null || coordinates.get("lookup") == null) { @@ -400,6 +435,20 @@ public Optional lookup(final Map coordinates) { public Set getRequiredKeys() { return Collections.singleton("lookup"); } + + public void setRequiredCoordinates(Map expectedCoordinates) { + this.expectedCoordinates = expectedCoordinates; + } + + private void enforceRequiredCoordinates(Map context) { + if (expectedCoordinates != null) { + for (Map.Entry entry : expectedCoordinates.entrySet()) { + Assert.assertTrue(String.format("%s was not in coordinates.", entry.getKey()), + context.containsKey(entry.getKey())); + Assert.assertEquals("Wrong value", entry.getValue(), context.get(entry.getKey())); + } + } + } } private static class RecordLookup extends AbstractControllerService implements RecordLookupService { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java index a1f904d80f2d..8d9b537fdba3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java @@ -35,6 +35,19 @@ public interface LookupService extends ControllerService { */ Optional lookup(Map coordinates) throws LookupFailureException; + /** + * Looks up a value that corresponds to the given map, coordinates. FlowFile attributes will also be passed into the + * map labeled context. + * + * @param coordinates a Map of key/value pairs that indicate the information that should be looked up + * @param context a Map of FlowFile attributes + * @return a value that corresponds to the given coordinates + * @throws LookupFailureException if unable to lookup a value for the given coordinates + */ + default Optional lookup(Map coordinates, Map context) throws LookupFailureException { + return lookup(coordinates); + } + /** * @return the Class that represents the type of value that will be returned by {@link #lookup(Map)} */ From 6624dee3b4329bb8cfc41afa607ed39f3102bc05 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Tue, 12 Jun 2018 13:48:19 -0400 Subject: [PATCH 2/3] NIFI-5287 Removed unneeded property descriptor. --- .../processors/standard/LookupRecord.java | 61 ++++--------------- .../processors/standard/TestLookupRecord.java | 4 +- 2 files changed, 14 insertions(+), 51 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index c5f852d235a7..c687f74641a4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -17,19 +17,6 @@ package org.apache.nifi.processors.standard; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -52,7 +39,6 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathResult; @@ -63,6 +49,18 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.Tuple; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + @EventDriven @SideEffectFree @@ -122,18 +120,6 @@ public class LookupRecord extends AbstractRouteRecord getSupportedPropertyDescriptors() { properties.addAll(super.getSupportedPropertyDescriptors()); properties.add(LOOKUP_SERVICE); properties.add(RESULT_RECORD_PATH); - properties.add(ATTRIBUTES_REGEX); properties.add(ROUTING_STRATEGY); properties.add(RESULT_CONTENTS); return properties; @@ -270,25 +255,6 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String } } - private Map getMatchingFlowfileAttributes(FlowFile input, ProcessContext context) { - Map retVal = new HashMap<>(); - - if (context.getProperty(ATTRIBUTES_REGEX).isSet()) { - final String regex = context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions(input).getValue(); - final Pattern pattern = Pattern.compile(regex); - retVal.putAll(input.getAttributes() - .entrySet().stream() - .filter(e -> pattern.matcher(e.getKey()).matches()) - .collect(Collectors.toMap( - e -> e.getKey(), - e -> e.getValue() - )) - ); - } - - return retVal; - } - @Override protected Set route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context, final Tuple, RecordPath> flowFileContext) { @@ -326,8 +292,7 @@ protected Set route(final Record record, final RecordSchema writeS final Optional lookupValueOption; try { - Map filteredAttributes = getMatchingFlowfileAttributes(flowFile, context); - lookupValueOption = lookupService.lookup(lookupCoordinates, filteredAttributes); + lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes()); } catch (final Exception e) { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index 5dcb85af5277..758a9f21dc84 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -86,13 +86,11 @@ public void setup() throws InitializationException { } @Test - public void testAttributeRegex() { + public void testFlowfileAttributesPassed() { Map attrs = new HashMap<>(); attrs.put("schema.name", "person"); attrs.put("something_something", "test"); - runner.setProperty(LookupRecord.ATTRIBUTES_REGEX, "(schema.*|.*_something$)"); - Map expected = new HashMap<>(); expected.putAll(attrs); From 4255716e2c4a16f35262da592984f122d40035e5 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Wed, 13 Jun 2018 20:06:13 -0400 Subject: [PATCH 3/3] NIFI-5287 Added additional changes from a code review. --- .../processors/standard/TestLookupRecord.java | 16 ++++++++-------- .../org/apache/nifi/lookup/LookupService.java | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index 758a9f21dc84..30b2b249b301 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -94,7 +94,7 @@ public void testFlowfileAttributesPassed() { Map expected = new HashMap<>(); expected.putAll(attrs); - lookupService.setRequiredCoordinates(expected); + lookupService.setExpectedContext(expected); lookupService.addValue("John Doe", "Soccer"); lookupService.addValue("Jane Doe", "Basketball"); @@ -399,7 +399,7 @@ public void testAddFieldsToNonRecordField() throws InitializationException { private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map values = new HashMap<>(); - private Map expectedCoordinates; + private Map expectedContext; public void addValue(final String key, final String value) { values.put(key, value); @@ -411,7 +411,7 @@ public Class getValueType() { } public Optional lookup(final Map coordinates, Map context) { - enforceRequiredCoordinates(context); + validateContext(context); return lookup(coordinates); } @@ -434,13 +434,13 @@ public Set getRequiredKeys() { return Collections.singleton("lookup"); } - public void setRequiredCoordinates(Map expectedCoordinates) { - this.expectedCoordinates = expectedCoordinates; + public void setExpectedContext(Map expectedContext) { + this.expectedContext = expectedContext; } - private void enforceRequiredCoordinates(Map context) { - if (expectedCoordinates != null) { - for (Map.Entry entry : expectedCoordinates.entrySet()) { + private void validateContext(Map context) { + if (expectedContext != null) { + for (Map.Entry entry : expectedContext.entrySet()) { Assert.assertTrue(String.format("%s was not in coordinates.", entry.getKey()), context.containsKey(entry.getKey())); Assert.assertEquals("Wrong value", entry.getValue(), context.get(entry.getKey())); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java index 8d9b537fdba3..6ef1a5a92088 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java @@ -36,11 +36,11 @@ public interface LookupService extends ControllerService { Optional lookup(Map coordinates) throws LookupFailureException; /** - * Looks up a value that corresponds to the given map, coordinates. FlowFile attributes will also be passed into the - * map labeled context. + * Looks up a value that corresponds to the given map, coordinates. Additional contextual information will also be passed into the + * map labeled context from sources such as flowfile attributes. * * @param coordinates a Map of key/value pairs that indicate the information that should be looked up - * @param context a Map of FlowFile attributes + * @param context a Map of additional information * @return a value that corresponds to the given coordinates * @throws LookupFailureException if unable to lookup a value for the given coordinates */