From 46b2c22a6f059881793412d65fd6f0c9df989224 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 25 Aug 2017 16:22:43 -0400 Subject: [PATCH] NIFI-4116: Allow fields of Record returned from Lookup Service to be placed into record in the input, instead of requiring that the 'wrapper record' returned from Lookup be included --- .../processors/standard/LookupRecord.java | 56 +++++- .../processors/standard/TestLookupRecord.java | 173 ++++++++++++++++++ 2 files changed, 224 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 10539bcfa38f..286f7eee7922 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -95,6 +95,11 @@ public class LookupRecord extends AbstractRouteRecord getSupportedPropertyDescriptors() { properties.add(LOOKUP_SERVICE); properties.add(RESULT_RECORD_PATH); properties.add(ROUTING_STRATEGY); + properties.add(RESULT_CONTENTS); return properties; } @@ -272,14 +288,14 @@ protected Set route(final Record record, final RecordSchema writeS lookupCoordinates.put(coordinateKey, coordinateValue); } - final Optional lookupValue; + final Optional lookupValueOption; try { - lookupValue = lookupService.lookup(lookupCoordinates); + lookupValueOption = lookupService.lookup(lookupCoordinates); } catch (final Exception e) { throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } - if (!lookupValue.isPresent()) { + if (!lookupValueOption.isPresent()) { final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; return rels; } @@ -289,9 +305,39 @@ protected Set route(final Record record, final RecordSchema writeS if (resultPath != null) { record.incorporateSchema(writeSchema); - final Object replacementValue = lookupValue.get(); + final Object lookupValue = lookupValueOption.get(); final RecordPathResult resultPathResult = flowFileContext.getValue().evaluate(record); - resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue)); + + final String resultContentsValue = context.getProperty(RESULT_CONTENTS).getValue(); + if (RESULT_RECORD_FIELDS.getValue().equals(resultContentsValue) && lookupValue instanceof Record) { + final Record lookupRecord = (Record) lookupValue; + + // Use wants to add all fields of the resultant Record to the specified Record Path. + // If the destination Record Path returns to us a Record, then we will add all field values of + // the Lookup Record to the destination Record. However, if the destination Record Path returns + // something other than a Record, then we can't add the fields to it. We can only replace it, + // because it doesn't make sense to add fields to anything but a Record. + resultPathResult.getSelectedFields().forEach(fieldVal -> { + final Object destinationValue = fieldVal.getValue(); + + if (destinationValue instanceof Record) { + final Record destinationRecord = (Record) destinationValue; + + for (final String fieldName : lookupRecord.getRawFieldNames()) { + final Object value = lookupRecord.getValue(fieldName); + destinationRecord.setValue(fieldName, value); + } + } else { + final Optional parentOption = fieldVal.getParentRecord(); + + if (parentOption.isPresent()) { + parentOption.get().setValue(fieldVal.getField().getFieldName(), lookupRecord); + } + } + }); + } else { + resultPathResult.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(lookupValue)); + } } final Set rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index b84f518fd7fb..29966e7c516f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -17,19 +17,30 @@ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -227,6 +238,137 @@ public void testInvalidUnlessAllRequiredPropertiesAdded() throws InitializationE } + @Test + public void testAddFieldsToExistingRecord() throws InitializationException, IOException { + final RecordLookup lookupService = new RecordLookup(); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("least", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record sports = new MapRecord(schema, new HashMap()); + + sports.setValue("favorite", "basketball"); + sports.setValue("least", "soccer"); + + lookupService.addValue("John Doe", sports); + + recordReader = new MockRecordParser(); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("favorite", RecordFieldType.STRING); + recordReader.addSchemaField("least", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, null, "baseball"); + + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + + runner.setProperty("lookup", "/name"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/"); + runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS); + + runner.enqueue(""); + runner.run(); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + out.assertContentEquals("John Doe,48,basketball,soccer\n"); + } + + /** + * If the output fields are added to a record that doesn't exist, the result should be that a Record is + * created and the results added to it. + */ + @Test + public void testAddFieldsToNonExistentRecord() throws InitializationException { + final RecordLookup lookupService = new RecordLookup(); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("least", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record sports = new MapRecord(schema, new HashMap()); + + sports.setValue("favorite", "basketball"); + sports.setValue("least", "soccer"); + + lookupService.addValue("John Doe", sports); + + recordReader = new MockRecordParser(); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.RECORD); + + recordReader.addRecord("John Doe", 48, null); + + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + + runner.setProperty("lookup", "/name"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport"); + runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS); + + runner.enqueue(""); + runner.run(); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + + // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first + final String outputContents = new String(out.toByteArray()); + assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n") + || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n")); + } + + /** + * If the output fields are added to a non-record field, then the result should be that the field + * becomes a UNION that does allow the Record and the value is set to a Record. + */ + @Test + public void testAddFieldsToNonRecordField() throws InitializationException { + final RecordLookup lookupService = new RecordLookup(); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + final List fields = new ArrayList<>(); + fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("least", RecordFieldType.STRING.getDataType())); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record sports = new MapRecord(schema, new HashMap()); + + sports.setValue("favorite", "basketball"); + sports.setValue("least", "soccer"); + + lookupService.addValue("John Doe", sports); + + recordReader = new MockRecordParser(); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, null); + + runner.addControllerService("reader", recordReader); + runner.enableControllerService(recordReader); + + runner.setProperty("lookup", "/name"); + runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport"); + runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS); + + runner.enqueue(""); + runner.run(); + + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0); + + // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first + final String outputContents = new String(out.toByteArray()); + assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n") + || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n")); + } + private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map values = new HashMap<>(); @@ -260,4 +402,35 @@ public Set getRequiredKeys() { } } + private static class RecordLookup extends AbstractControllerService implements RecordLookupService { + private final Map values = new HashMap<>(); + + public void addValue(final String key, final Record value) { + values.put(key, value); + } + + @Override + public Class getValueType() { + return String.class; + } + + @Override + public Optional lookup(final Map coordinates) { + if (coordinates == null) { + return Optional.empty(); + } + + final String key = coordinates.get("lookup"); + if (key == null) { + return Optional.empty(); + } + + return Optional.ofNullable(values.get(key)); + } + + @Override + public Set getRequiredKeys() { + return Collections.singleton("lookup"); + } + } }