Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-9677 Fixed LookupRecord issue that an empty JSON array caused mismatch #8266

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -487,9 +488,19 @@ public Set<Relationship> lookup(final Record record, final ProcessContext contex
final RecordPath recordPath = entry.getValue();

final RecordPathResult pathResult = recordPath.evaluate(record);
AtomicLong selectedFieldsCount = new AtomicLong(0);
final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
.filter(fieldVal -> fieldVal.getValue() != null)
.collect(Collectors.toList());
.filter(fieldVal -> {
selectedFieldsCount.incrementAndGet();
return fieldVal.getValue() != null;
})
.collect(Collectors.toList());

if (selectedFieldsCount.get() == 0) {
// When selectedFieldsCount == 0; then an empty array was found which counts as a match.
// Since the array is empty, no further processing is needed, so continue to next recordPath.
continue;
}

if (lookupFieldValues.isEmpty()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,100 @@ public void testLookupArray() throws InitializationException, IOException {
out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output.json").toPath());
}

@Test
public void testLookupEmptyArray() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
final MapLookup lookupService = new MapLookupForInPlaceReplacement();

final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);

final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);

runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.addControllerService("writer", jsonWriter);
runner.enableControllerService(jsonWriter);
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);

runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
runner.setProperty("lookupLanguage", "/locales[*]/language");
runner.setProperty("lookupRegion", "/locales[*]/region");
runner.setProperty("lookupCurrency", "/currencies[*]/currency");
runner.setProperty("lookupFoo", "/foo/foo");
runner.setProperty("lookupBar", "/bar");

lookupService.addValue("CA", "Canada");
lookupService.addValue("CAD", "Canadian dollar");
lookupService.addValue("en", "English");
lookupService.addValue("EUR", "Euro");
lookupService.addValue("ja", "Japanese");
lookupService.addValue("JP", "Japan");
lookupService.addValue("JPY", "Japanese yen");
lookupService.addValue("US", "United States");
lookupService.addValue("USD", "United States Dollar");
lookupService.addValue("fr", "French");
lookupService.addValue("FR", "France");
lookupService.addValue("original", "updated");
lookupService.addValue("orgValue", "newValue");
lookupService.addValue("orgValue2", "newValue2");

runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input-empty-array.json").toPath());
runner.run();

runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output-empty-array.json").toPath());
}

@Test
public void testLookupMissingJsonField() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
final MapLookup lookupService = new MapLookupForInPlaceReplacement();

final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);

final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
runner.setProperty(jsonWriter, JsonRecordSetWriter.SUPPRESS_NULLS, JsonRecordSetWriter.ALWAYS_SUPPRESS);

runner.enableControllerService(jsonReader);
runner.enableControllerService(jsonWriter);
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);

runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
runner.setProperty("lookupFoo", "/foo/foo");

lookupService.addValue("original", "updated");

runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input-missing.json").toPath());
runner.run();

runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 1);
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
outUnmatched.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output-missing-unmatched.json").toPath());

runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
final MockFlowFile outMatched = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
outMatched.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output-missing-matched.json").toPath());
}

@Test
public void testLookupArrayKeyNotInLRS() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[
{
"bar" : "orgValue",
"foo" : {
"foo" : "original"
},
"locales": [
{
"language": "fr",
"region": "FR"
}, {
"language": "en",
"region": "US"
}
],
"currencies": []
}, {
"bar" : "orgValue2",
"foo" : {
"foo" : "original"
},
"locales": [
{
"language": "fr",
"region": "CA"
},
{
"language": "ja",
"region": "JP"
}
],
"currencies": [
{
"currency": "CAD"
}, {
"currency": "JPY"
}
]
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"foo" : {
"foo" : "original"
},
"unmentioned" : {
"foo" : "original"
}
}, {
"unmentioned" : {
"foo" : "original"
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"bar":"newValue","foo":{"foo":"updated"},"locales":[{"language":"French","region":"France"},{"language":"English","region":"United States"}],"currencies":[]},{"bar":"newValue2","foo":{"foo":"updated"},"locales":[{"language":"French","region":"Canada"},{"language":"Japanese","region":"Japan"}],"currencies":[{"currency":"Canadian dollar"},{"currency":"Japanese yen"}]}]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"foo":{"foo":"updated"},"unmentioned":{"foo":"original"}}]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"unmentioned":{"foo":"original"}}]