Daffodil Processors now use NiFi Records, and most other CLI Options have been implemented #1

Merged · 1 commit · Aug 27, 2020
4 changes: 3 additions & 1 deletion .gitignore
@@ -1 +1,3 @@
target
target/*
nifi-daffodil-nar/target/
nifi-daffodil-processors/target/
44 changes: 40 additions & 4 deletions README.md
@@ -6,14 +6,50 @@ This repository contains the source for two NiFi processors which use
[Apache Daffodil (incubating)](https://daffodil.apache.org),
an open source implementation of the [Data Format Description Language
(DFDL)](https://www.ogf.org/ogf/doku.php/standards/dfdl/dfdl) to parse/unparse
data to/from an XML infoset. The two processor included are:
data to/from NiFi Records, which are then transformed into an Infoset based on the supplied NiFi Controller.
The two processors included are:

* DaffodilParse: Reads a FlowFile and parses the data into an XML infoset
* DaffodilUnparse: Reads a FlowFile, in the form of an XML infoset, and
unparses the infoset to the original file format
* DaffodilParse: Reads a FlowFile and parses the data into a NiFi Record,
which is then converted into an Infoset by a NiFi RecordSetWriter component.
* DaffodilUnparse: Reads a FlowFile containing an infoset in some form using the correct NiFi RecordReader
component, converts it into Records, and then unparses these Records to the original file format.

Member:

Thoughts on keeping the DaffodilParse and DaffodilUnparse processors exactly the same as before? My concern here is that there are some odd edge cases due to how Records work in NiFi. In some cases, I can imagine we will want to avoid those, and just have Daffodil read/write the infoset directly, rather than going through Records.

We can still have the DaffodilRecordReader/Writer Controllers, which I think should be usable in any of the standard NiFi Processors that accept Record Readers/Writers.

This essentially gives users two options: one where they use Daffodil directly, limited to Daffodil's XML/JSON output but with no infoset quirks, and one where they use Daffodil via the new Record Readers/Writers, which may have some quirks to deal with.

Author:

I'm not a fan of users having these Controllers because Controllers, unlike Processors, have to be configured globally. If we want to keep what we had before, I say we make 4 processors available: the 2 processors from before and the 2 new ones that use Records. I think this can be done with quite a bit of code reuse if we want to go this route.

Member:

Isn't that how controllers work in NiFi? What is wrong with them being configured globally? We should follow NiFi conventions as closely as possible so people already familiar with using Records will be able to plug this new one in without having to learn something new.

Author (@andrewjc2000), Aug 5, 2020:

I'm saying that because Controllers work this way, Processors seem like the more logical representation of the Daffodil tool. But if you don't think it's a problem that every instance of the Controller would be set to the same schema at a given time (i.e. every controller would have to be set to parse JPEGs or PNGs; you couldn't have two running at the same time processing different things), then I suppose I can bring back the Controller solution.

Member:

Are you sure that NiFi can't create different instances of Controllers with different properties? That seems like a necessary capability.

Member:

To test, I created two SplitRecord processors; for each I configured the RecordReader property, selected "Create new service", and chose "CSV Reader" for both. This results in two CSV Readers that can be configured independently. It seems like this should allow multiple DaffodilReader/Writer controllers configured for different schemas.

Author:

Oh, it would appear you are correct. You can even rename them in order to distinguish them. I apologize for my ignorance, I did not think they were as flexible as Processors.

When I tried to develop controllers before, it required some serious restructuring of the project for it to build properly, but I can try to see if I can get away with keeping the current structure.

Member:

No problem. I know you have limited time left, so if there isn't enough time to make the change, it's not a big deal, especially if there are any other updates you have planned. If you do run out of time, please just make sure to document where you're at, so that if someone picks up the remaining work they have some idea where to start. Thanks for all your great work!

Author:

I would definitely be open to going the controller route during my work day today and tomorrow. Would you like to see something in the end like the project containing the original 2 processors, only modified with the new CLI options, and then the 2 new controllers in the same project? I think that would be very realistic. If we do go this route, for the sake of code reuse, would you mind if I moved every property out into a shared static class, so that both the Processors and Controllers could use them? It would stink to have to define all of them the same way in 2 places.

Member:

Yeah, I think the two original processors and two new controllers would probably be the ideal setup. That way people can use the native records (which are probably preferred) with the controllers, but can switch to the processors if they run into the various bugs/quirks you've discovered.

Some sort of shared class to reduce duplication is definitely a good idea 👍 I suspect many of the properties will be exactly the same.

## Processor Properties

Each Processor has a number of configurable properties intended to be analogous
to the [CLI options](https://daffodil.apache.org/cli/) for the Daffodil tool.
Here is a note about the __Stream__ option:

- __Stream Mode:__ This mode is disabled by default. When enabled, leftover data after a parse is not
routed to failure; instead, the parse is repeated on the remaining data, generating one Record per
parse (see the sketch below). If all parses succeed, a Set of Records is generated.
When using this mode with the XML Reader and Writer components, the Writer component must be
configured with a name for the Root Tag, and the Reader component must be configured with
"Expect Records as Array" set to true.
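
To make the repeated-parse behavior concrete, here is a minimal, self-contained sketch of the loop
described above. The `Parser` and `ParseResult` types are illustrative stand-ins, not this project's
actual classes:

    import java.util.ArrayList;
    import java.util.List;

    // Minimal sketch of the stream-mode loop: parse repeatedly until no
    // data remains, producing one Record per successful parse. Parser and
    // ParseResult are hypothetical stand-ins for Daffodil's DataProcessor
    // and its parse result.
    public class StreamModeSketch {

        interface ParseResult {
            boolean isError();
            boolean hasRemainingData();
            Object record();
        }

        interface Parser {
            ParseResult parseNext();
        }

        static List<Object> parseStream(Parser parser) {
            List<Object> records = new ArrayList<>();
            while (true) {
                ParseResult result = parser.parseNext();
                if (result.isError()) {
                    // with Stream Mode off, leftover data routes to failure;
                    // in Stream Mode, any failed repeat parse aborts the set
                    throw new IllegalStateException("parse failed");
                }
                records.add(result.record());
                if (!result.hasRemainingData()) {
                    return records; // all parses succeeded: a Set of Records
                }
            }
        }
    }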

And here is a note about __Tunables__ and __External Variables__:

- To add External Variables to the Processor, simply add custom key/value pairs as custom Properties
when configuring the Processor. To add Tunables, do the same thing but add a "+" character in front
of the name of the tunable variable; e.g. +maxOccursCount would be the key and something like 10
would be the value (see the sketch below).
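
As an illustration of this naming convention, dynamic properties might be split into External
Variables and Tunables like so. The helper below is hypothetical, not this project's actual code:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the "+" prefix convention: keys beginning with
    // "+" are treated as Tunables, all other keys as External Variables.
    public class DynamicPropertySketch {

        static void sort(String key, String value,
                         Map<String, String> externalVariables,
                         Map<String, String> tunables) {
            if (key.startsWith("+")) {
                // "+maxOccursCount" configures the tunable "maxOccursCount"
                tunables.put(key.substring(1), value);
            } else {
                externalVariables.put(key, value);
            }
        }

        public static void main(String[] args) {
            Map<String, String> vars = new HashMap<>();
            Map<String, String> tunables = new HashMap<>();
            sort("+maxOccursCount", "10", vars, tunables);
            sort("myVar", "someValue", vars, tunables);
            System.out.println(tunables); // {maxOccursCount=10}
            System.out.println(vars);     // {myVar=someValue}
        }
    }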

## Note about the Controllers

Currently, when using the DaffodilReader and DaffodilWriter Controllers in this project, unparsing from XML is not fully supported because the NiFi XMLReader Controller ignores empty XML elements; unparsing from XML works only for infosets that contain no empty XML elements. Unparsing via JSON, however, is fully supported.

## Build Instructions

Member:

Please wrap these lines. Technically it doesn't matter in markdown files, but it makes it easier to read when viewing them as raw text, which is still common.

Author:

Are you saying to split it up into multiple lines?

Member:

Yep, just wrap everything at 80 or 100 characters or something reasonable.

**This is a specific step for the development branch**:

Because this project depends on a snapshot version of Daffodil, in order to run `mvn install` you
must first clone the latest version of [Apache Daffodil](https://github.com/apache/incubator-daffodil)
and run

sbt publishM2

This step will not be necessary once Daffodil 3.0.0 is released.

Then, the following should work as expected:

This repository uses the maven build environment. To create a nar file for use
in Apache NiFi, run

8 changes: 4 additions & 4 deletions nifi-daffodil-nar/pom.xml
@@ -13,17 +13,17 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.tresys</groupId>
<artifactId>nifi-daffodil</artifactId>
<version>1.5</version>
<version>2.0</version>
</parent>

<artifactId>nifi-daffodil-nar</artifactId>
<version>1.5</version>
<version>2.0</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
@@ -34,7 +34,7 @@
<dependency>
<groupId>com.tresys</groupId>
<artifactId>nifi-daffodil-processors</artifactId>
<version>1.5</version>
<version>2.0</version>
</dependency>
</dependencies>

116 changes: 104 additions & 12 deletions nifi-daffodil-processors/pom.xml
@@ -13,53 +13,145 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.tresys</groupId>
<artifactId>nifi-daffodil</artifactId>
<version>1.5</version>
<version>2.0</version>
</parent>

<artifactId>nifi-daffodil-processors</artifactId>
<packaging>jar</packaging>

<!-- TODO: Delete this once the TODO below is taken care of-->
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

<dependencies>
<!-- TODO: Replace this dependency with two others: the one for Daffodil Core and the one for the Daffodil Java API,
once a Daffodil release contains commit 92d2036e3d (2.7.0 does not; the first release candidate
for 3.0.0 likely will.)
-->
<dependency>
<groupId>com.github.apache</groupId>
<artifactId>incubator-daffodil</artifactId>
<version>92d2036e3d</version>
</dependency>
<!-- For most of the NIFI imports -->
Member:

I would rather this just depend on 3.0.0-SNAPSHOT and require users to build and locally publish their own version of Daffodil.

jitpack looks like something that the Apache Software Foundation would frown upon. Code and packaged code must go through a pretty strict release process to verify that there are no licensing issues before being made publicly available, which this artifact skipped. This will need some clarification from the ASF IPMC on whether this is okay, but I'd prefer that it just rely on a user-built snapshot and not use jitpack.

Author:

I tried including this dependency and it didn't work:

<dependency>
    <groupId>org.apache.daffodil</groupId>
    <artifactId>daffodil-japi_2.12</artifactId>
    <version>3.0.0-SNAPSHOT</version>
</dependency>

Is there another special attribute needed because it is a snapshot version?

Member:

This is correct, but then you'll need to run sbt publishM2 from the daffodil repo to publish to a local maven repo where maven can find the dependency.

Author:

Ok, I see - should we add that step to the readme and then take it out once 3.0.0 is released?

Member:

Yeah, that's probably the right approach for the dev branch, which presumably will also depend on a snapshot of daffodil.

Author:

For some reason when I switched to this dependency I started getting the following build error:
[ERROR] C:\Users\achafos\Desktop\nifi-daffodil\nifi-daffodil-processors\src\main\java\com\tresys\nifi\processors\AbstractDaffodilProcessor.java:30:31: error: cannot access ValidationMode

Very bizarre, so for now I'll leave in what I had before, but I definitely would prefer to resolve this before the end of the review.

Member:

Your latest changes build for me if I set the dependency to the above and run sbt publishM2 from the daffodil repo. You might try clearing your maven cache (~/.m2), deleting all the target directories, and republishing the daffodil snapshot. Maybe the dependency switch caused some maven confusion or conflict.

Author:

I deleted .m2, I deleted .ivy, I deleted .sbt, and it still resulted in the same problem. Now Unit Tests aren't running right either, very bizarre.

Author:

I'll circle back to this as the last change to the project, since I need a working build environment in order to develop. Everything works if I keep the Jitpack dependency, as unprofessional/hacky as it is.

Member:

Not a problem. This is low hanging fruit that can be easily fixed at some point in the future.

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi.version}</version>
</dependency>
<!-- For Record, RecordSchema, etc. classes -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>${nifi.version}</version>
</dependency>
<!-- For RecordReaderFactory -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<version>${nifi.version}</version>
<scope>compile</scope>
</dependency>
<!-- To make the NAR actually build with references to RecordReaderFactory without completely failing -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services-nar</artifactId>
<version>${nifi.version}</version>
<type>nar</type>
</dependency>

<!-- For StandardValidators -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>${nifi.version}</version>
</dependency>

<!-- For the caching done in AbstractDFDLRecordComponent -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<!-- For Setting up a Simulated NiFi Environment in Unit Testing-->
<dependency>
<groupId>org.apache.daffodil</groupId>
<artifactId>daffodil-japi_2.12</artifactId>
<version>2.5.0</version>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>

<!-- For ConvertRecord in Unit Tests-->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<artifactId>nifi-standard-processors</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
<!-- For The Json and XML Reader/Writer Components in Unit Tests-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
<!-- For SchemaAccessUtils so that Reader/Writer components can be instantiated in Unit Tests-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<!-- This entire setup is necessary in order to mix Scala and Java code in the same Maven project-->
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,85 @@
package com.tresys.nifi.controllers;

import com.tresys.nifi.util.DaffodilCompileException;
import com.tresys.nifi.util.DaffodilResources;
import static com.tresys.nifi.util.DaffodilResources.DataProcessorSchemaPair;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public abstract class AbstractDaffodilController extends AbstractControllerService {

    private volatile ConfigurationContext context;
    protected volatile StreamMode streamMode;

    @OnEnabled
    public void storeConfiguration(final ConfigurationContext context) {
        this.streamMode = StreamMode.valueOf(context.getProperty(STREAM_MODE).getValue());
        this.context = context;
    }

    protected DataProcessorSchemaPair getPair(Map<String, String> attributes) throws DaffodilCompileException {
        if (context == null) {
            return null;
        } else {
            String dfdlFile = context.getProperty(DaffodilResources.DFDL_SCHEMA_FILE)
                    .evaluateAttributeExpressions(attributes).getValue();
            return DaffodilResources.getDataProcessorSchemaPair(getLogger(), context, dfdlFile);
        }
    }

    public enum StreamMode {
        OFF, ALL_SUCCESSFUL, ONLY_SUCCESSFUL
    }

    public static final AllowableValue STREAM_MODE_OFF
            = new AllowableValue(StreamMode.OFF.name(), StreamMode.OFF.name(), "Stream Mode is off.");
    public static final AllowableValue STREAM_MODE_ALL_SUCCESSFUL
            = new AllowableValue(StreamMode.ALL_SUCCESSFUL.name(), StreamMode.ALL_SUCCESSFUL.name(),
            "Multiple records are parsed until there is no remaining data. If there is a failure, stop and discard all records.");
    public static final AllowableValue STREAM_MODE_ONLY_SUCCESSFUL
            = new AllowableValue(StreamMode.ONLY_SUCCESSFUL.name(), StreamMode.ONLY_SUCCESSFUL.name(),
            "Multiple records are parsed until there is no remaining data. If there is a failure, stop, and keep all successful records.");
    /**
     * If this Property is set to a mode other than OFF, then multiple Records will be produced when there is
     * leftover data, with each one beginning where the last one left off. Normally leftover data just errors out.
     * In 'All Successful' mode, we still route to failure if *any* of these Records are not successfully produced.
     * Enabling streaming does not cause any issues for unparsing, as the unparse Record component is
     * a RecordSetWriterFactory, which is able to handle the data containing a set of Records rather than just one Record.
     */
    static final PropertyDescriptor STREAM_MODE = new PropertyDescriptor.Builder()
            .name("stream-mode")
            .displayName("Stream Mode")
            .description("Rather than throwing an error when left over data exists after a parse, one can repeat the parse with the remaining data. "
                    + "With the 'All Successful' Mode, an error is thrown if any of the parses fail, whereas with 'Only Successful', the parse will succeed,"
                    + " and only successful parses show up in the output.")
            .required(false)
            .defaultValue(STREAM_MODE_OFF.getValue())
            .allowableValues(STREAM_MODE_OFF, STREAM_MODE_ALL_SUCCESSFUL, STREAM_MODE_ONLY_SUCCESSFUL)
            .build();

    private static final List<PropertyDescriptor> controllerProperties;

    static {
        List<PropertyDescriptor> properties = new ArrayList<>(DaffodilResources.daffodilProperties);
        properties.add(STREAM_MODE);
        controllerProperties = Collections.unmodifiableList(properties);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return controllerProperties;
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return DaffodilResources.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
    }

}
@@ -0,0 +1,31 @@
package com.tresys.nifi.controllers;

import com.tresys.nifi.util.DaffodilResources.DataProcessorSchemaPair;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

@Tags({"xml", "json", "daffodil", "dfdl", "schema", "xsd"})
@CapabilityDescription("Use Daffodil and a user-specified DFDL schema to parse data into an infoset represented as NiFi Records")
public class DaffodilReader extends AbstractDaffodilController implements RecordReaderFactory {

    @Override
    public RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream,
                                           long inputLength, ComponentLog logger) {
        try {
            DataProcessorSchemaPair pair = getPair(variables);
            return new DaffodilRecordReader(pair.getSchema(), inputStream, pair.getDataProcessor(), streamMode, logger);
        } catch (IOException ioe) {
            getLogger().error("Unable to obtain DataProcessor and/or Schema due to {}", new Object[]{ioe.getMessage()});
            throw new ProcessException("Unable to obtain DataProcessor and/or Schema", ioe);
        }
    }

}
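
For context, a RecordReaderFactory controller like this can be wired into any standard record-based
processor, for example in a unit test with nifi-mock (which this project already depends on). The
sketch below is a rough, assumption-laden illustration: the property name strings
("dfdl-schema-file", "record-reader", "record-writer") and the schema path are guesses for the sake
of the example, not values verified against this project.

    import org.apache.nifi.json.JsonRecordSetWriter;
    import org.apache.nifi.processors.standard.ConvertRecord;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Hedged sketch: wiring the DaffodilReader controller into ConvertRecord
    // via nifi-mock. Property name strings here are assumptions.
    public class DaffodilReaderWiringSketch {

        public static void main(String[] args) throws Exception {
            TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);

            // register and configure the Daffodil record reader controller
            DaffodilReader reader = new DaffodilReader();
            runner.addControllerService("daffodil-reader", reader);
            runner.setProperty(reader, "dfdl-schema-file", "src/test/resources/example.dfdl.xsd");
            runner.enableControllerService(reader);

            // any RecordSetWriter can render the parsed infoset, e.g. JSON
            JsonRecordSetWriter writer = new JsonRecordSetWriter();
            runner.addControllerService("json-writer", writer);
            runner.enableControllerService(writer);

            runner.setProperty("record-reader", "daffodil-reader");
            runner.setProperty("record-writer", "json-writer");

            runner.enqueue(new byte[]{0x01, 0x02}); // some data matching the schema
            runner.run();
        }
    }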