NIFI-5900 Add a SplitLargeJson processor #3414

Closed
wants to merge 1 commit into from

Conversation

@arenger arenger commented Apr 6, 2019

Overview

The current SplitJson processor works great when the targeted documents are small. However, if a given flow needs to handle large JSON documents, the SplitJson processor can consume a lot of memory. This is already noted as a "System Resource Consideration" in the documentation for the SplitJson processor.

I created a SplitLargeJson processor to address this concern. It uses an event-based streaming API to process the JSON without loading the whole document into memory, similar to SAX implementations for XML processing. This allows for near-constant memory usage, independent of file size, as shown in the following test results:

[Image: MemoryUsage chart]

The trade-off is between heap space usage and JSON Path functionality. The SplitLargeJson processor supports a basic subset of JSON Path, excluding constructs that would require "backtracking" in the file. See below for examples. For full JSON Path support, the SplitJson processor should be used. When memory conservation is important, the SplitLargeJson processor should be used. SplitLargeJson runs about 20 to 70 percent faster than SplitJson because it doesn't have to build a DOM before processing the JSON Path. See the "Test Methodology" section, below.
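To make the streaming idea concrete, here is a minimal hand-rolled sketch. It is not the code from this PR, which relies on the javax.json event parser; the class name `TopLevelArraySplitter` is hypothetical, and the sketch only handles the equivalent of `$[*]` on a top-level array, without validating the input. What it illustrates is that only one fragment is buffered at a time, so memory depends on the largest element rather than on the document size.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical illustration only: splits the elements of a top-level JSON array
// while reading the input one character at a time.
final class TopLevelArraySplitter {

    /** Passes the raw text of each top-level array element to the sink, in document order. */
    static void split(Reader reader, Consumer<String> sink) {
        StringBuilder element = new StringBuilder(); // holds a single element, never the whole document
        int depth = 0;            // nesting depth of [] and {} outside string literals
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string
        try {
            int c;
            while ((c = reader.read()) != -1) {
                char ch = (char) c;
                if (inString) {
                    element.append(ch);
                    if (escaped) {
                        escaped = false;
                    } else if (ch == '\\') {
                        escaped = true;
                    } else if (ch == '"') {
                        inString = false;
                    }
                    continue;
                }
                if (ch == '"') {
                    inString = true;
                } else if (ch == '[' || ch == '{') {
                    depth++;
                    if (depth == 1) {
                        continue; // opening bracket of the outer array
                    }
                } else if (ch == ']' || ch == '}') {
                    depth--;
                    if (depth == 0) {
                        emit(element, sink); // closing bracket of the outer array
                        continue;
                    }
                } else if (ch == ',' && depth == 1) {
                    emit(element, sink); // separator between top-level elements
                    continue;
                } else if (Character.isWhitespace(ch)) {
                    continue; // whitespace outside strings is insignificant
                }
                element.append(ch);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void emit(StringBuilder element, Consumer<String> sink) {
        if (element.length() > 0) {
            sink.accept(element.toString());
            element.setLength(0);
        }
    }
}
```

Calling `TopLevelArraySplitter.split(new StringReader(json), fragment -> ...)` hands each element to the callback as soon as its closing character is read. Expressions that need to look back at earlier parts of the document cannot be served this way, which is the "backtracking" limitation described above.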

Licensing

The SplitLargeJson processor depends on javax.json, which is dual-licensed under the GNU GPL v2 or CDDL. According to Apache legal docs, there is a "weak copyleft" provision allowing for compatibility between CDDL and ASF licensing. Furthermore, Apache NiFi already includes javax.json in other modules:

$ find . -type f | fgrep pom.xml | xargs fgrep -l javax.json
./nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
./nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
./nifi-nar-bundles/nifi-slack-bundle/nifi-slack-processors/pom.xml
./nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml
./nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml

The LICENSE and NOTICE files were not changed with this PR since javax.json is already included.

Testing

This PR includes 30 new unit tests to verify the changes, primarily exercising different permutations of JSON Path expressions. In addition, memory and timing metrics were gathered in order to confirm the trade-offs between SplitJson and SplitLargeJson. See the memory usage chart above and the "Test Methodology" section, below.

How to use SplitLargeJson Processor

Given an incoming FlowFile and a valid JSON Path setting, SplitLargeJson will send one or more FlowFiles to the split relation, and the original FlowFile will be sent to the original relation. If the JSON Path does not match any object or array in the document, then the document will be passed to the failure relation.

JSON Path Examples

Here is a sample JSON file, followed by JSON Path expressions and the content of the FlowFiles that would be output from the SplitLargeJson processor.

Sample JSON:

[
  {
    "name": "Seattle",
    "weather": [
      {
        "main": "Snow",
        "description": "light snow"
      }
    ]
  },
  {
    "name": "Washington, DC",
    "weather": [
      {
        "main": "Mist",
        "description": "mist"
      },
      {
        "main": "Fog",
        "description": "fog"
      }
    ]
  }
]
  • JSON Path Expression: $[1].weather
    • FlowFile 0: {"main":"Mist","description":"mist"}
    • FlowFile 1: {"main":"Fog","description":"fog"}
  • JSON Path Expression: $[1].weather[0]
    • FlowFile 0: {"main":"Mist"}
    • FlowFile 1: {"description":"mist"}
  • JSON Path Expression: $[*].name
    • FlowFile 0: ["Seattle"]
    • FlowFile 1: ["Washington, DC"]
  • JSON Path Expression: $[*]['weather'][*]['main']
    • FlowFile 0: {"main":"Snow"}
    • FlowFile 1: {"main":"Mist"}
    • FlowFile 2: {"main":"Fog"}

Array slices (:), recursive descent (..), and script expressions (()) are not supported. However, a fragment.index attribute is set on every outgoing FlowFile to the split relation. This, in combination with a RouteOnAttribute processor, can be used in place of JSON Path array slices.
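As a rough illustration of the slice workaround, the check that a downstream routing step would perform can be sketched as below. This is an assumption-laden sketch: the class `FragmentSliceFilter` is hypothetical, the attribute map stands in for FlowFile attributes, and `fragment.index` is assumed to be zero-based (as the "FlowFile 0" labels above suggest). In a real flow the same condition would be written as a RouteOnAttribute rule rather than Java.

```java
import java.util.Map;

// Hypothetical helper: emulates a JSON Path slice [start:end] using fragment.index.
final class FragmentSliceFilter {

    /** Returns true when the fragment.index attribute lies in [start, endExclusive). */
    static boolean inSlice(Map<String, String> attributes, int start, int endExclusive) {
        String raw = attributes.get("fragment.index");
        if (raw == null) {
            return false; // not a split fragment
        }
        try {
            int index = Integer.parseInt(raw.trim());
            return index >= start && index < endExclusive;
        } catch (NumberFormatException e) {
            return false; // malformed attribute: treat as outside the slice
        }
    }
}
```

For example, keeping only the fragments for which `inSlice(attrs, 1, 3)` is true would approximate the slice `[1:3]` over the split results.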

Test Methodology

Measuring the memory usage of a NiFi processor can be tricky because these processors aren't meant to run in isolation. They're designed to run in the NiFi JVM, of course, but in that context a processor will share heap space with all the other processors, controller services, and "housekeeping" infrastructure required to support the data flow. This section describes how memory and timing metrics were obtained for evaluation of the SplitLargeJson processor.

An extensive testing framework exists for Apache NiFi that allows for processors to be exercised outside of a NiFi data flow. The framework provides the ability to send FlowFiles to a processor and then assert the presence and content of FlowFiles generated by the processor. This framework is built using JUnit and Maven, and can be invoked within an IDE or via command line interface (CLI). Most Java IDEs are written in Java, and tests run from within the IDE share heap space with the IDE itself. Therefore, memory measurements were taken outside NiFi and outside an IDE, via CLI invocation of the NiFi testing framework.

The goal was to compare the SplitLargeJson processor with the existing SplitJson processor. To do this, NiFi was cloned, built locally, and instrumented to log its memory usage. The following four lines were added at the end of the onTrigger method of org.apache.nifi.processors.standard.SplitJson:

        Runtime rt = Runtime.getRuntime();
        rt.gc();
        logger.info(String.format("Memory used: %.3f",
              (double)(rt.totalMemory() - rt.freeMemory())/(1024 * 1024)));

At this point in the code, resources used to split the original FlowFile are still in scope. A similar change was made to SplitLargeJson, to log memory before its JsonParser falls out of scope (after the while loop yet inside the try block of the onTrigger method). A simple test was then created that runs an arbitrary JSON file through the processor:

    @Test
    public void test() throws Exception {
        { // invoke static loading and "prime the pumps" by running a small file
            TestRunner testRunner = TestRunners.newTestRunner(new SplitLargeJson());
            testRunner.setProperty(SplitLargeJson.JSON_PATH_EXPRESSION, "$.*[0][0][0]");

            testRunner.enqueue(Paths.get("/tmp/little.json"));
            testRunner.run();

            testRunner.assertTransferCount(SplitLargeJson.REL_SPLIT, 3);
        }

        TestRunner testRunner = TestRunners.newTestRunner(new SplitLargeJson());
        testRunner.setProperty(SplitLargeJson.JSON_PATH_EXPRESSION, "$.*[2][2][2]");
        testRunner.enqueue(Paths.get(System.getProperty("testFile")));

        long start = System.currentTimeMillis();
        testRunner.run();
        long stop  = System.currentTimeMillis();

        testRunner.getLogger().info(String.format("Process time: %.3f sec", (double)(stop - start)/1000));
    }

Five JSON files were generated using a Python script, starting with 8 MiB and doubling until 128 MiB. Each file was tested with each processor via CLI invocation of the NiFi testing framework, as mentioned above. The resulting log output then contains memory and timing information. For example, this command runs a 32 MiB file through SplitLargeJson and shows the memory and timing results:

mvn test -Dtest=gov.ic.cte.nifi.processors.PerformanceTest -DtestFile=/tmp/s32.json && \
tail -3 target/surefire-reports/gov.ic.cte.nifi.processors.PerformanceTest-output.txt | cut -c70-
...
argeJson - SplitLargeJson[id=5a60a1ab-912c-4d2a-aa4d-ce63badc8af0] Memory used: 34.382
argeJson - SplitLargeJson[id=5a60a1ab-912c-4d2a-aa4d-ce63badc8af0] Split FlowFile[0,s32.json,33741503B] into 3 FlowFile(s)
SplitLargeJson[id=5a60a1ab-912c-4d2a-aa4d-ce63badc8af0] Process time: 0.261 sec

Here is the corresponding test and output for SplitJson:

mvn test -Dtest=org.apache.nifi.processors.standard.PerformanceTest -DtestFile=/tmp/s32.json && \
tail -3 target/surefire-reports/org.apache.nifi.processors.standard.PerformanceTest-output.txt | cut -c70-
...
tJson - SplitJson[id=33b08e70-c38e-4674-9b2d-0ffc087503f2] Split FlowFile[0,s32.json,33741503B] into 3 FlowFiles
tJson - SplitJson[id=33b08e70-c38e-4674-9b2d-0ffc087503f2] Memory used: 164.947
itJson[id=33b08e70-c38e-4674-9b2d-0ffc087503f2] Process time: 0.549 sec

The attentive reader may notice that the above memory usage statistics don't match those presented in the graph from the "Overview" section. This is because the raw output needs to be adjusted to account for the fact that FlowFiles are kept in memory while Apache's NiFi testing framework is running. When these processors are running in an actual NiFi instance (under default configuration), the FlowFile content would be stored on disk.

It's worth noting that FlowFiles provided to and generated by processors in the testing framework are kept in memory (see the importFrom method and usage of the transferMap in MockProcessSession). This is why a deep and specific JSON Path was selected for the tests: to reduce the size and count of the resulting FlowFile splits. Otherwise, if a more inclusive JSON Path were to be used, the memory reading would need further correction to account for data generated by the processor in question. Each of the three FlowFiles generated in these tests contained only 41 bytes.
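The exact correction applied to the raw readings is not spelled out here. As a sketch only, assuming the adjustment is a plain subtraction of the input FlowFile size (the content the mock framework holds in memory), the arithmetic could look like this. The class and method names are hypothetical.

```java
// Hypothetical helper: subtracts the in-memory input content from a raw heap reading.
final class MemoryAdjustment {
    static final double BYTES_PER_MIB = 1024.0 * 1024.0;

    /** Raw heap reading in MiB minus the input FlowFile size, assuming a simple subtraction. */
    static double adjustedMiB(double rawMiB, long inputBytes) {
        return rawMiB - inputBytes / BYTES_PER_MIB;
    }
}
```

With the 33741503-byte s32.json from the log lines above, `adjustedMiB(34.382, 33741503L)` comes to roughly 2.2 MiB for SplitLargeJson and `adjustedMiB(164.947, 33741503L)` to roughly 132.8 MiB for SplitJson. These figures only illustrate the assumed subtraction and are not taken from the chart.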

As for the timing results, SplitLargeJson completed between 20 and 70 percent faster than SplitJson, for the same five generated JSON files:

[Image: Timing chart]

The timing metrics will vary based on the structure of the JSON document being split. As an additional time test, a JSON file representing the City and County of San Francisco's Subdivision parcels was split into fifteen thousand FlowFiles using both processors. A Perl script was used to create five different subsets from this file, at the same size increments as above (8, 16, 32, 64, and 128 MiB). These tests had run times between 2 seconds and 5 minutes, and SplitLargeJson ranged between 20 and 50 percent faster.

Checklist

  • Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit?

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    (Note: mvn clean install completes without error after disabling FileBasedClusterNodeFirewallTest and DBCPServiceTest.
    Adding -Pcontrib-check fails, but it appears to fail on the master branch too.)

  • Have you written or updated unit tests to verify your changes?

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?

  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?

  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?

  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

  • Have you ensured that format looks appropriate for the output in which it is rendered?

@arenger
Author

arenger commented Apr 6, 2019

When I run mvn install -Pdocker in the nifi-docker directory, start the resulting image, login to the container, and extract nifi-standard-processors-1.10.0-SNAPSHOT.jar, I can see org/apache/nifi/processors/standard/SplitLargeJson.class ... but it does not show up in the UI. I wonder if I missed a step. Please let me know how I can use the processor in the resulting docker image.

I've tested the code as a separated/custom nar file (and with the unit tests, etc), but I would like to test it via the UI, as a bundled standard nar.

@ottobackwards
Contributor

How does your approach compare with https://github.com/jsurfer/JsonSurfer? In other Jira issues we have talked about possibly using that, although I'm not sure we have thought about a new processor, as opposed to improving splitjson.
Adding a new processor vs. improving may be an issue, but I can't speak to that.

@arenger
Author

arenger commented Apr 9, 2019

@ottobackwards I originally sought to improve SplitJson instead of adding a new processor. I could certainly submit a different PR targeting an improvement to SplitJson, but there were a few reasons I thought a different processor might be better:

  1. The SplitLargeJson processor is designed to always output complete JSON documents. This differs from the SplitJson behavior. For example, when splitting an array of strings, SplitJson would output String1, String2, etc, but SplitLargeJson would output ["String1"], ["String2"], etc. This can be advantageous when the output relation (the split-relation) is directed to another processor that expects JSON.
  2. The SplitJson processor can only split arrays. The JSON Path must target an array in the document. However, SplitLargeJson can split arrays and objects. If the JSON Path points to an object then it will output all the key-value pairs of that object in separate flowfiles.
  3. The SplitJson processor sets a fragment.count attribute on outgoing flowfiles to indicate the total number of documents that were split from the designated JSON Path. This is by nature impossible when using a SAX-like (streaming) approach to reading the JSON, because the total is not known until the end of the document has been read, by which point earlier fragments have already been emitted. Therefore, in order to preserve the current function, a setting would need to be added to optionally engage the optimized handling for large files -- with a stated caveat that the fragment.count attribute would be unavailable.

Again, I could submit a different pull request that targets an optimization of SplitJson rather than an addition of a new SplitLargeJson processor. I started down that path originally, with a boolean setting to optionally activate large file processing (and in that mode it could also split objects, provided the JSON Path was not "overly complex" [i.e. require backtracking, etc]) -- but then I had to change the processor to occasionally output non-json documents which made the code less elegant. That said, I could see the value in sticking with one processor.

As for JsonSurfer, I had honestly never heard of it. My code here was from a work project I did a couple years ago that was finally approved for release to the public. I could probably make a change to SplitJson that employs JsonSurfer... I'm bummed my code isn't as novel as I'd hoped, but I know that's how things go!

Let me know what you think is best.

@ottobackwards
Contributor

@arenger, I did not mean to trivialize what you did, and I am certainly not saying you are wrong in what you did, so don't worry.
It is only that I have thought about this issue before myself, and I had:

  1. Not thought of doing a new processor
  2. Found JSON Surfer
    I only wanted to ask about it in that context :)

@arenger
Author

arenger commented Apr 9, 2019

Haha thanks Otto! I didn't feel trivialized. : ) My point was just that I was bummed to learn that a project like JsonSurfer already exists, but at the same time it looks pretty solid. Maybe we should work that in instead, now or in the future. I'm fine with whatever's decided about NiFi's JSON-splitting behaviors -- it'd be fun to contribute to NiFi but we should obviously do whatever's best for the code base / end users.

@pvillard31
Contributor

Hey there, I don't have time for a review right now (traveling quite a bit) but wanted to take the time to thank you @arenger for this AMAZING pull request. It is incredibly well documented, with a lot of test cases and explanations. Really glad to see such contribution in the NiFi community! A big thank you for taking the time to do it.

Just a quick remark so that my comment is not totally useless :) You'll have to update the NOTICE file of the standard NAR bundle to mention the javax.json dependency.

@arenger
Author

arenger commented Apr 11, 2019

@pvillard31 Thanks Pierre! I updated the NOTICE file as you suggested; good catch.

@ottobackwards and @pvillard31 (and anyone else): From here, I think there are two main questions:

  • Should there be an update to the SplitJson processor to provide for optionally-optimized large-file processing, or should there be a separate processor for this purpose?
  • Should JsonSurfer be used for the large-file processing, or should JSON-P (javax.json) be used (in conjunction with the utility classes in this PR)?

On the first question, I would still vote for a separate processor for the reasons I mentioned earlier: consistent output format (always emit a JSON doc instead of sometimes) and the ability to split objects instead of arrays only.

I could help with any of the four permutations (one processor or two; with javax.json or without). I plan to look into the feasibility of employing JsonSurfer in splitting json, when I can make more time.

@ottobackwards
Contributor

My only question with JSONSurfer would be if the jsonpath support is equivalent

@ottobackwards
Contributor

Just a quick review, I would like some javadoc so I can understand the intended purpose and implementation of the classes, so I can review for more than style

@arenger
Author

arenger commented Apr 14, 2019

Hello Otto. Yeah, no problem, I'll add some more code comments and push them up sometime soon. Probably by Monday morning.

@arenger
Author

arenger commented Apr 16, 2019

Hello @ottobackwards, I added more javadoc comments. Maybe GitHub already notifies you when I push a commit; not sure.

@arenger changed the title from "NIFI-5900 Added SplitLargeJson processor" to "NIFI-5900 Add a SplitLargeJson processor" on Apr 16, 2019
@ottobackwards
Contributor

Sorry for the delay, I have been on vacation. I'll take a look at the new javadoc.
Another high-level comment: I think there will be confusion with the SplitJson processor. I think the PR description you have written here, along with concrete documentation of the JSONPath support, should be added as additional-details HTML documentation.

@arenger
Author

arenger commented Apr 20, 2019

No problem on delays, I hope you had a good vacation. As for additional detail to the html documentation you mentioned: Yes I could do that, but I don't know where to add it. Did you take a look at the @CapabilityDescription that I added for SplitLargeJson? I tried to make that wording clear but also succinct. I could add more detail there, or is there a better place where I should expound on the function of the processor?

Also, as I mentioned in an above comment, I think there are four roads we could take from here:

  1. Create a new SplitJsonProcessor that uses javax.json (this PR)
  2. Create a new SplitJsonProcessor that uses JsonSurfer
  3. Keep only SplitJson and optionally employ a streaming approach, backed by javax.json, when a new property is set
  4. Keep only SplitJson and optionally employ a streaming approach, backed by JsonSurfer, when a new property is set

I looked briefly into the 2nd and 4th option but have yet to confirm whether the memory usage is comparable. In order to use JsonSurfer in NiFi it looks like we'd need to suppress the version of ANTLR that is pulled from the nifi-syslog-utils module (via simple-syslog-5424) and explicitly replace it with 4.7.2 of antlr4-runtime. After I did that, I was able to run JsonSurfer without a runtime error.

JsonSurfer does have wider support for the JSON Path specification. If we went that route, I'd suggest we create a new processor called "JsonExtract", or something, that would simply receive a JSON file and a JSON Path. It would output zero, one, or more JSON documents from the incoming document. The notion of "splitting" isn't really the best description at that point, since the full JSON Path specification can be used to specify any part -- or set of parts -- of a JSON document.

@ottobackwards
Contributor

@arenger how did you suppress the antlr runtime?

@arenger
Author

arenger commented Apr 22, 2019

Well, jsurfer-core uses ANTLR 4.7 so I was getting a runtime error from conflicting versions of ANTLR. After tracking it down I found that simple-syslog-5424 was importing version 4.5. After I made this change JsonSurfer worked fine:

--- a/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-syslog-utils/pom.xml
@@ -23,10 +23,21 @@
     <artifactId>nifi-syslog-utils</artifactId>
     <packaging>jar</packaging>
     <dependencies>
+        <dependency>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+          <version>4.7.2</version>
+        </dependency>
         <dependency>
             <groupId>com.github.palindromicity</groupId>
             <artifactId>simple-syslog-5424</artifactId>
             <version>0.0.11</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr4-runtime</artifactId>
+              </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>

... I did not yet run unit/regression tests in nifi-syslog-utils because it was experimental.

@ottobackwards
Contributor

Interesting, thanks

@ottobackwards
Contributor

I maintain simple syslog *, and I tried the new antlr runtime this morning, and I'm having issues compiling. I wonder if it just works if you use a different runtime... at runtime

@ottobackwards
Contributor

@arenger I have upgraded simple-syslog5424 to antlr 4.7.2. I have not cut the release yet.
If we go forward with JsonSurfer, we can do another jira/pr and pick up the newer release so you don't have to do the work around.

@arenger
Author

arenger commented Apr 23, 2019

@ottobackwards Yeah that sounds good. On which branch did you make the updates to simple-syslog5424? I could pull those into a new branch that adds a new FilterJson processor backed by JsonSurfer. I created a NIFI-5900-JsonSurfer branch with a prototype (messy approximation) of what it could look like. It's still called "SplitLargeJson" there, but I'd rename it. I also updated the unit tests so you could see how the processor would work.

I ran the same memory tests on that processor and it behaves as advertised (i.e. doesn't create an in-memory DOM while processing):
[Image: json-surfer-memory chart]

... so that's good. How about I create a simpler PR for adding a new FilterJson processor, and we can discuss this further as part of that PR?

@ottobackwards
Contributor

I actually was able to cut the release this morning. So, if in your branch you update the version to 0.0.12 you will get the new version. I think it should have been distributed out by now

No problem with FilterJson -> PR's are free ;)

I would like to get some committer level feedback at some point as well on the high level questions here.

@arenger arenger mentioned this pull request Apr 25, 2019
11 tasks
Contributor

@ottobackwards ottobackwards left a comment

I think you should have tests for the fragment writer and basically all the classes used in here.

* match a range of array elements OR a range of object fields. For example, see the disjointEmbeddedStar
* unit tests. This ambiguity ends up muddying the code. Accounting for a possible "alternative path" (altPath)
* is probably the cleanest (though maybe not the most efficient) way to handle the oddity. */
public static SimpleJsonPath of(String str) {
Contributor

Could these inline regexes be replaced by static compiled patterns?
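For context, the suggestion amounts to the following kind of change. This is a generic sketch, not the PR's actual parsing code: the class `PathTokenPatterns`, the pattern, and the method are hypothetical stand-ins for whatever expressions `SimpleJsonPath.of` evaluates inline.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical example of hoisting an inline regex into a precompiled constant.
final class PathTokenPatterns {

    // Compiled once when the class loads; Pattern instances are immutable and thread-safe,
    // so the same object can be reused for every call instead of recompiling the regex.
    private static final Pattern ARRAY_INDEX = Pattern.compile("\\[(\\d{1,9})\\]");

    /** Returns the index inside a token such as "[12]", or -1 if the token is not an array index. */
    static int arrayIndex(String token) {
        Matcher m = ARRAY_INDEX.matcher(token);
        return m.matches() ? Integer.parseInt(m.group(1)) : -1;
    }
}
```

Calls such as `String.matches` or `String.replaceAll` compile their regex on every invocation, so a static `Pattern` mainly helps code that runs repeatedly.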

@ottobackwards
Contributor

What happens if an unsupported JSONPath is entered?

@arenger
Author

arenger commented May 21, 2019

Hi @ottobackwards the processor will report an invalid state when the JSON path is not supported... and yeah I bet the regex handling could be optimized. I'd be happy to do that if folks think that this or the other PR have some promise/likelihood of being accepted. No problem either way, but I shouldn't spend any more time here until we have a more authoritative "trajectory"/guidance regarding the questions I raise above.

See also:
#3455 (comment)
#3455 (comment)

@MikeThomsen
Contributor

@arenger

but it does not show up in the UI

You need to add an entry to the org.apache.nifi.processor.Processor file under src/main/resources/META-INF/services in the standard processors bundle.
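For reference, that file is a standard Java ServiceLoader provider list: one fully qualified class name per line. A sketch of the entry, assuming the class name seen in the jar listing earlier in this thread (the module path in the comment is an assumption about the usual bundle layout):

```
# nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
org.apache.nifi.processors.standard.SplitLargeJson
```

Without this line the class is packaged in the jar but never registered, which would match the symptom of the processor being present on disk yet missing from the UI.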

FWIW, I think going to JsonSurfer is the right call, in part because it wraps known quantities like Gson and Jackson. When I did that streaming json parser service, it was pretty easy once I got the hang of it.

The only thing in your implementation I am worried about is the 1:1 flowfile/record thing. I think you should have batching built in as an option. The use case that caused my initial use of JsonSurfer was a 16GB JSON file that would have put about 80M flowfiles into the queue. Not typical, but even putting a few hundred thousand tiny flowfiles at once is not necessarily a good thing either.
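A minimal sketch of what such a batching option could look like, independent of NiFi's API: fragments are collected and handed on as one JSON array per batch, so one outgoing document carries many records. The class `FragmentBatcher` and its sink callback are hypothetical; a real processor would write each batch to a FlowFile instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: groups individual JSON fragments into array-wrapped batches.
final class FragmentBatcher {
    private final int batchSize;
    private final Consumer<String> sink;
    private final List<String> pending = new ArrayList<>();

    FragmentBatcher(int batchSize, Consumer<String> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one JSON fragment and emits a batch once batchSize fragments are pending. */
    void add(String jsonFragment) {
        pending.add(jsonFragment);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Emits any remaining fragments as a final, possibly smaller, batch. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept("[" + String.join(",", pending) + "]");
        pending.clear();
    }
}
```

With a batch size of 1 this degenerates to the current one-fragment-per-FlowFile behavior; memory use grows with the batch size, so the setting trades queue pressure against heap.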

@MikeThomsen
Contributor

@arenger

Recommend you consider closing this out and putting up a PR for the JsonSurfer branch.

@arenger
Author

arenger commented Jul 2, 2019

Thanks @MikeThomsen . I already have a different PR for the alternate JsonSurfer implementation: #3455 We'll see what comes of it!

@arenger arenger closed this Jul 2, 2019
4 participants