diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 340aa467c10fe..de12e712bd5d6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -349,6 +349,16 @@ nifi-database-utils 1.10.0-SNAPSHOT + + com.github.jsurfer + jsurfer-core + 1.4.3 + + + com.github.jsurfer + jsurfer-jackson + 1.4.3 + org.apache.nifi nifi-standard-web-test-utils diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java index 0f310d7f964f0..0e4f3832fca82 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EvaluateJsonPath.java @@ -39,7 +39,10 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -86,6 +89,12 @@ value = "A JsonPath expression", description = "If ='flowfile-attribute' then that FlowFile attribute " + "will be set to any JSON objects that match the JsonPath. If ='flowfile-content' then the FlowFile " + "content will be updated to any JSON objects that match the JsonPath.") +@SystemResourceConsideration(resource = SystemResource.MEMORY, description = + "The entirety of the incoming FlowFile's content is kept in memory while processing, in addition to all " + + "of the generated FlowFiles or attributes resulting from the evaluation. If the JSON documents to be " + + "processed are large, then the SelectJson processor should be considered in order to avoid excessive " + + "memory usage.") +@SeeAlso(SelectJson.class) public class EvaluateJsonPath extends AbstractJsonPathProcessor { public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute"; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SelectJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SelectJson.java new file mode 100644 index 0000000000000..7a32fcdba5b1a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SelectJson.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.jsfr.json.JacksonParser; +import org.jsfr.json.JsonPathListener; +import org.jsfr.json.JsonSurfer; +import org.jsfr.json.ParsingContext; +import org.jsfr.json.compiler.JsonPathCompiler; +import org.jsfr.json.path.JsonPath; +import org.jsfr.json.provider.JacksonProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.copyAttributesToOriginal; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"json", "evaluate", "jsonpath"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Uses a JSON Path expression to select a portion (or portions) of an incoming JSON document " + + "and send it/them to outgoing FlowFile(s). The matched portion(s) will be sent to the 'selected' relation. " + + "If no portion of the incoming FlowFile is selected by the specified JSON Path, the incoming file will be " + + "sent to the 'failure' relation. The incoming JSON is processed in streaming fashion so as to avoid loading " + + "the entire document into memory. The amount of memory used by this processor will depend on the size of the " + + "selected JSON, but is independent of the size of the incoming document size. Lastly, note that JSON Paths " + + "containing paths after a filter clause are not supported. For example, a path like " + + "'$.store.book[?(@.price < 10)]' is supported, but not '$.store.book[?(@.price < 10)].price'.") +@WritesAttributes({ + @WritesAttribute(attribute = "fragment.identifier", + description = "All FlowFiles produced from the same parent FlowFile will have the same " + + "randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", + description = "A one-up number that indicates the ordering of the FlowFiles that were " + + "created from a single parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile") +}) +@SeeAlso(EvaluateJsonPath.class) +public class SelectJson extends AbstractProcessor { + + public static final PropertyDescriptor JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder() + .name("JsonPath Expression") + .description("A JsonPath expression to be selected from the incoming FlowFile.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // Full validation occurs in #customValidate + .required(true) + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile. If the FlowFile fails processing, " + + "nothing will be sent to this relationship") + .build(); + public static final Relationship REL_SELECTED = new Relationship.Builder() + .name("selected") + .description("All segments of the original FlowFile will be routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid " + + "JSON or the specified path does not exist), it will be routed to this relationship") + .build(); + + private List properties; + private Set relationships; + private volatile JsonPath jsonPath; + + @Override + protected Collection customValidate(ValidationContext validationContext) { + String value = validationContext.getProperty(JSON_PATH_EXPRESSION).getValue(); + try { + jsonPath = JsonPathCompiler.compile(value); + } catch (Exception e) { + ValidationResult result = new ValidationResult.Builder().subject(JSON_PATH_EXPRESSION.getName()) + .valid(false).explanation( + "the specified JSON path is either invalid or not supported by this processor").build(); + return Collections.singleton(result); + } + return new ArrayList<>(); + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(JSON_PATH_EXPRESSION); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_ORIGINAL); + relationships.add(REL_SELECTED); + relationships.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + private class JsonFragmentWriter implements JsonPathListener { + + private final ProcessSession processSession; + private final FlowFile original; + private final Relationship relSelected; + private int fragmentIndex; + private ComponentLog logger; + private Map attributes = new HashMap<>(); + + public int getCount() { + return fragmentIndex; + } + + public JsonFragmentWriter(ProcessSession processSession, FlowFile original, Relationship relSelected, + String groupId, ComponentLog logger){ + this.processSession = processSession; + this.original = original; + this.relSelected = relSelected; + this.logger = logger; + attributes.put(FRAGMENT_ID.key(), groupId); + attributes.put(SEGMENT_ORIGINAL_FILENAME.key(), original.getAttribute(CoreAttributes.FILENAME.key())); + } + + @Override + public void onValue(Object value, ParsingContext context) { + FlowFile fragment = processSession.create(original); + try (OutputStream out = processSession.write(fragment)) { + out.write(value.toString().getBytes(Charset.defaultCharset())); + } catch (IOException e) { + logger.error("Problem writing to flowfile", e); + } + attributes.put(FRAGMENT_INDEX.key(), String.valueOf(fragmentIndex++)); + processSession.transfer(processSession.putAllAttributes(fragment, attributes), relSelected); + } + } + + @Override + public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) { + FlowFile original = processSession.get(); + if (original == null) { + return; + } + + final ComponentLog logger = getLogger(); + + String groupId = UUID.randomUUID().toString(); + JsonFragmentWriter fragmentWriter = + new JsonFragmentWriter(processSession, original, REL_SELECTED, groupId, logger); + + try (InputStream is = processSession.read(original)) { + JsonSurfer surfer = new JsonSurfer(JacksonParser.INSTANCE, JacksonProvider.INSTANCE); + surfer.configBuilder().bind(jsonPath, fragmentWriter).buildAndSurf(is); + } catch (Exception e) { + logger.error("Problems with FlowFile {}: {}", new Object[]{original, e.getMessage()}); + processSession.transfer(original, REL_FAILURE); + return; + } + + if (fragmentWriter.getCount() == 0) { + logger.error("No object or array was found at the specified json path"); + processSession.transfer(original, REL_FAILURE); + return; + } + + logger.info("Selected {} portion(s) from FlowFile {}", new Object[]{fragmentWriter.getCount(), original}); + original = copyAttributesToOriginal(processSession, original, groupId, fragmentWriter.getCount()); + + processSession.transfer(original, REL_ORIGINAL); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java index ea118f9c7eafd..5c28c8b0ed695 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitJson.java @@ -39,6 +39,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; @@ -82,9 +83,12 @@ description = "The number of split FlowFiles generated from the parent FlowFile"), @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile") }) +@SeeAlso(SelectJson.class) @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content (as a JsonNode object) is read into memory, " + - "in addition to all of the generated FlowFiles representing the split JSON. If many splits are generated due to the size of the JSON, or how the JSON is " + - "configured to be split, a two-phase approach may be necessary to avoid excessive use of memory.") + "in addition to all of the generated FlowFiles representing the split JSON. If the JSON documents to be " + + "split are large, then the SelectJson processor should be considered in order to avoid excessive " + + "memory usage. If a complex JSON Path is required and the JSON documents are large, then a two-phase " + + "approach may be necessary.") public class SplitJson extends AbstractJsonPathProcessor { public static final PropertyDescriptor ARRAY_JSON_PATH_EXPRESSION = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSelectJson.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSelectJson.java new file mode 100644 index 0000000000000..48509ccb9121b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSelectJson.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; + +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_COUNT; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_ID; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.FRAGMENT_INDEX; +import static org.apache.nifi.flowfile.attributes.FragmentAttributes.SEGMENT_ORIGINAL_FILENAME; + +public class TestSelectJson { + + private static final Path WEATHER_JSON = Paths.get("src/test/resources/TestSelectJson/weather.json"); + private static final Path MISC_JSON = Paths.get("src/test/resources/TestSelectJson/misc.json"); + private static final Path SAMPLE_XML = Paths.get("src/test/resources/TestSelectJson/sample.xml"); + + @Test + public void testProcessorValidation() { + final TestRunner testRunner = TestRunners.newTestRunner(new SelectJson()); + testRunner.setProperty(SelectJson.JSON_PATH_EXPRESSION, "badpath!"); + testRunner.assertNotValid(); + } + + @Test + public void invalidJson() throws Exception { + final TestRunner testRunner = newTestRunner(SAMPLE_XML, "$"); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(SelectJson.REL_FAILURE, 1); + final MockFlowFile out = testRunner.getFlowFilesForRelationship(SelectJson.REL_FAILURE).get(0); + out.assertContentEquals(SAMPLE_XML); + } + + @Test + public void contiguousArrayOfObject() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[1].weather.*"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 2, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 2); + checkSplit(0, WEATHER_JSON, "{\"main\":\"Mist\",\"description\":\"mist\"}", testRunner); + checkSplit(1, WEATHER_JSON, "{\"main\":\"Fog\",\"description\":\"fog\"}", testRunner); + } + + @Test + public void contiguousObject() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[1].main.*"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 5, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 5); + checkSplit(1, WEATHER_JSON, "93", testRunner); + } + + @Test + public void rootSplitArray() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$.*"); + testRunner.run(); + checkOriginal(WEATHER_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + } + + @Test + public void rootSplitObject() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.*"); + testRunner.run(); + checkOriginal(MISC_JSON, 5, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 5); + checkSplit(0, MISC_JSON, "\"leaves\"", testRunner); + } + + @Test + public void impliedRootSplitArray() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[*]"); + testRunner.run(); + checkOriginal(WEATHER_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + } + + @Test + public void disjointSetOfObjects() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[*].main"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + checkSplit(1, WEATHER_JSON, + "{\"temp\":56.14,\"humidity\":93,\"temp_min\":50,\"temp_max\":62.6,\"something\":[4,5,6]}", + testRunner); + } + + @Test + public void disjointSetOfArrays() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[*].weather"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + checkSplit(0, WEATHER_JSON, "[{\"main\":\"Snow\",\"description\":\"light snow\"}]", testRunner); + } + + @Test + public void arrayExtract() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.two-d"); + testRunner.run(); + + checkOriginal(MISC_JSON, 1, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 1); + checkSplit(0, MISC_JSON, "[[3,7],[3,4,1],[2,0,1,8]]", testRunner); + } + + @Test + public void contiguousArrayOfArray() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.two-d[*]"); + testRunner.run(); + + checkOriginal(MISC_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + checkSplit(2, MISC_JSON, "[2,0,1,8]", testRunner); + } + + @Test + public void contiguousMixedArray() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.mixed-arr[*]"); + testRunner.run(); + + checkOriginal(MISC_JSON, 7, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 7); + checkSplit(0, MISC_JSON, "true", testRunner); + checkSplit(1, MISC_JSON, "false", testRunner); + checkSplit(2, MISC_JSON, "null", testRunner); + checkSplit(3, MISC_JSON, "23", testRunner); + checkSplit(4, MISC_JSON, "3.14", testRunner); + checkSplit(5, MISC_JSON, "\"pi\"", testRunner); + checkSplit(6, MISC_JSON, "{\"g\":3927}", testRunner); + } + + @Test + public void arrayOffsetFromWildcard() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.fence.*[1]"); + testRunner.run(); + + checkOriginal(MISC_JSON, 2, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 2); + checkSplit(0, MISC_JSON, "\"brown\"", testRunner); + checkSplit(1, MISC_JSON, "\"hinges\"", testRunner); + } + + @Test + public void disjointEmbeddedStar() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.nest.*.red"); + testRunner.run(); + + checkOriginal(MISC_JSON, 2, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 2); + checkSplit(0, MISC_JSON, "3", testRunner); + checkSplit(1, MISC_JSON, "7", testRunner); + } + + @Test + public void disjointEmbeddedStar2() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.nest[*]['red']"); + testRunner.run(); + + checkOriginal(MISC_JSON, 2, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 2); + checkSplit(0, MISC_JSON, "3", testRunner); + checkSplit(1, MISC_JSON, "7", testRunner); + } + + @Test + public void disjointEmbeddedStar3() throws Exception { + final TestRunner testRunner = newTestRunner(MISC_JSON, "$.nest.arr[*]['orange']"); + testRunner.run(); + + checkOriginal(MISC_JSON, 2, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 2); + checkSplit(0, MISC_JSON, "9", testRunner); + checkSplit(1, MISC_JSON, "2", testRunner); + } + + @Test + public void disjointSetOfScalarsArrContext() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[*].main.something[1]"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + checkSplit(0, WEATHER_JSON, "2", testRunner); + checkSplit(1, WEATHER_JSON, "5", testRunner); + checkSplit(2, WEATHER_JSON, "8", testRunner); + } + + @Test + public void disjointSetOfScalarsArrContext2() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[*]['main']['something'][1]"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + checkSplit(0, WEATHER_JSON, "2", testRunner); + checkSplit(1, WEATHER_JSON, "5", testRunner); + checkSplit(2, WEATHER_JSON, "8", testRunner); + } + + @Test + public void disjointSetOfScalarsObjContext() throws Exception { + final TestRunner testRunner = TestRunners.newTestRunner(new SelectJson()); + testRunner.setProperty(SelectJson.JSON_PATH_EXPRESSION, "$[*].name"); + final String filename = "test.json"; + + testRunner.enqueue(WEATHER_JSON, new HashMap() { + { + put(CoreAttributes.FILENAME.key(), filename); + } + }); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 3, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 3); + checkSplit(0, filename, "\"Seattle\"", testRunner); + checkSplit(1, filename, "\"Washington, DC\"", testRunner); + checkSplit(2, filename, "\"Miami\"", testRunner); + } + + @Test + public void pathNotFound() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$.nonexistent"); + testRunner.run(); + + testRunner.assertTransferCount(SelectJson.REL_FAILURE, 1); + testRunner.getFlowFilesForRelationship(SelectJson.REL_FAILURE).get(0).assertContentEquals(WEATHER_JSON); + } + + @Test + public void contiguousDotStarEnd() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[0].coord.*"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 2, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 2); + checkSplit(0, WEATHER_JSON, "-122.33", testRunner); + } + + @Test + public void contiguousBracketStarEnd() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[0].weather[*]"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 1, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 1); + checkSplit(0, WEATHER_JSON, "{\"main\":\"Snow\",\"description\":\"light snow\"}", testRunner); + } + + @Test + public void disjointDotStarEnd() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[*].coord.*"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 6, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 6); + checkSplit(0, WEATHER_JSON, "-122.33", testRunner); + } + + @Test + public void disjointBracketStarEnd() throws Exception { + final TestRunner testRunner = newTestRunner(WEATHER_JSON, "$[*].weather[*]"); + testRunner.run(); + + checkOriginal(WEATHER_JSON, 4, testRunner); + testRunner.assertTransferCount(SelectJson.REL_SELECTED, 4); + checkSplit(0, WEATHER_JSON, "{\"main\":\"Snow\",\"description\":\"light snow\"}", testRunner); + } + + private TestRunner newTestRunner(Path testFile, String path) throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new SelectJson()); + testRunner.setProperty(SelectJson.JSON_PATH_EXPRESSION, path); + testRunner.enqueue(testFile); + testRunner.assertValid(); + return testRunner; + } + + private void printSplit(TestRunner testRunner, int i) { + MockFlowFile split = testRunner.getFlowFilesForRelationship(SelectJson.REL_SELECTED).get(i); + System.out.printf("\nSplit %d:\n", i); + System.out.println(new String(testRunner.getContentAsByteArray(split))); + } + + private void checkOriginal(Path path, int numSplitsExpected, TestRunner testRunner) throws IOException { + testRunner.assertTransferCount(SelectJson.REL_ORIGINAL, 1); + final MockFlowFile orig = testRunner.getFlowFilesForRelationship(SelectJson.REL_ORIGINAL).get(0); + orig.assertAttributeExists(FRAGMENT_ID.key()); + orig.assertAttributeEquals(FRAGMENT_COUNT.key(), String.valueOf(numSplitsExpected)); + orig.assertContentEquals(path); + } + + private void checkSplit(int i, Path path, String content, TestRunner testRunner) { + checkSplit(i, path.getFileName().toString(), content, testRunner); + } + + private void checkSplit(int i, String fileName, String content, TestRunner testRunner) { + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(SelectJson.REL_SELECTED).get(i); + flowFile.assertContentEquals(content); + flowFile.assertAttributeEquals(FRAGMENT_INDEX.key(), String.valueOf(i)); + flowFile.assertAttributeEquals(SEGMENT_ORIGINAL_FILENAME.key(), fileName); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/misc.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/misc.json new file mode 100644 index 0000000000000..d076343a28dca --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/misc.json @@ -0,0 +1,39 @@ +{ + "autumn": "leaves", + "fence": { + "colors": ["black", "brown"], + "materials": ["wood", "hinges", "screws"] + }, + "mixed-arr": [ + true, + false, + null, + 23, + 3.14, + "pi", + {"g": 3927} + ], + "two-d": [ + [3,7], + [3,4,1], + [2,0,1,8] + ], + "nest": { + "obj1": { + "red": 3, + "green": 6 + }, + "obj2": { + "red": 7, + "green": 7 + }, + "arr": [ + { "orange": 9, + "white": 10 + }, + { "orange": 2, + "white": 3 + } + ] + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/sample.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/sample.xml new file mode 100644 index 0000000000000..ce34dd35ff02f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/sample.xml @@ -0,0 +1,7 @@ + + + Internet + W3Schools + Sample + This here is XML + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/weather.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/weather.json new file mode 100644 index 0000000000000..415e6fd2fdeaf --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestSelectJson/weather.json @@ -0,0 +1,69 @@ +[ + { + "id": 5809844, + "name": "Seattle", + "coord": { + "lon": -122.33, + "lat": 47.61 + }, + "weather": [ + { + "main": "Snow", + "description": "light snow" + } + ], + "main": { + "temp": 35.78, + "humidity": 100, + "temp_min": 33.8, + "temp_max": 39.2, + "something": [1, 2, 3] + } + }, + { + "id": 4140963, + "name": "Washington, DC", + "coord": { + "lon": -77.04, + "lat": 38.9 + }, + "weather": [ + { + "main": "Mist", + "description": "mist" + }, + { + "main": "Fog", + "description": "fog" + } + ], + "main": { + "temp": 56.14, + "humidity": 93, + "temp_min": 50, + "temp_max": 62.6, + "something": [4, 5, 6] + } + }, + { + "id": 4164138, + "name": "Miami", + "coord": { + "lon": -80.19, + "lat": 25.77 + }, + "weather": [ + { + "main": "Clouds", + "description": "few clouds" + } + ], + "main": { + "temp": 71.98, + "humidity": 73, + "temp_min": 69.8, + "temp_max": 75.2, + "something": [7, 8, 9] + } + } +]