Added 2 new Controllers that perform Daffodil Parse and Unparse using Records, implemented nearly all CLI options for both existing Processors and new Controllers, and moved all shared properties and methods to a new global file for Processors and Controllers to use
andrewjc2000 authored and stevedlawrence committed Aug 27, 2020
1 parent 3da5ccb commit 8795400
Showing 43 changed files with 3,138 additions and 430 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -1 +1,3 @@
target
target/*
nifi-daffodil-nar/target/
nifi-daffodil-processors/target/
44 changes: 40 additions & 4 deletions README.md
@@ -6,14 +6,50 @@ This repository contains the source for two NiFi processors which use
[Apache Daffodil (incubating)](https://daffodil.apache.org),
an open source implementation of the [Data Format Description Language
(DFDL)](https://www.ogf.org/ogf/doku.php/standards/dfdl/dfdl) to parse/unparse
data to/from an XML infoset. The two processor included are:
data to/from NiFi Records, which are then transformed into an Infoset by the supplied NiFi Controller.
The two processors included are:

* DaffodilParse: Reads a FlowFile and parses the data into an XML infoset
* DaffodilUnparse: Reads a FlowFile, in the form of an XML infoset, and
unparses the infoset to the original file format
* DaffodilParse: Reads a FlowFile and parses the data into a NiFi Record,
which is then converted into an Infoset by a NiFi RecordSetWriter component.
* DaffodilUnparse: Reads a FlowFile containing an infoset in some form, reads it using the correct NiFi RecordReader
component and converts it into Records, and then unparses these Records to the original file format.

## Processor Properties

Each Processor has a number of configurable properties intended to be analogous
to the [CLI options](https://daffodil.apache.org/cli/) for the Daffodil tool.
Here is a note about the __Stream Mode__ option:

- __Stream Mode:__ This mode is disabled by default. When enabled, leftover data after a parse is not routed to failure;
instead, the parse is repeated on the remaining data, generating one Record per parse.
If all parses are successful, a set of Records is generated.
When using this mode for the XML Reader and Writer components, the Writer component must be configured with a
name for the Root Tag, and the Reader component must be configured with "Expect Records as Array" set to true.
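
The repeated-parse behavior described above can be sketched as follows. This is a minimal illustration using a made-up fixed-width format, not the real Daffodil parse API; in the actual Processor a failed parse still routes to failure.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of stream-mode parsing: each parse consumes a prefix of the
 * input, and in stream mode the parse is repeated on the leftover data
 * until none remains, producing one Record per parse.
 */
public class StreamModeSketch {

    // Hypothetical "format": every record is exactly 4 characters.
    static String parseOne(String data) {
        return data.substring(0, 4);
    }

    static List<String> parseStream(String data) {
        List<String> records = new ArrayList<>();
        while (!data.isEmpty()) {
            String record = parseOne(data);          // one parse call
            records.add(record);                     // one Record per parse
            data = data.substring(record.length());  // continue with leftover data
        }
        return records;
    }

    public static void main(String[] args) {
        // Three successful parses yield a set of three Records.
        System.out.println(parseStream("aaaabbbbcccc")); // [aaaa, bbbb, cccc]
    }
}
```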

And here is a note about __Tunables__ and __External Variables__:

- To add External Variables to the Processor, simply add custom key/value pairs as custom Properties when
configuring the Processor. To add Tunables, do the same thing but add a "+" character in front of the name
of the tunable variable; e.g., `+maxOccursCount` would be the key and something like `10` would be the value.
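
The "+"-prefix convention can be sketched like this (illustrative only; the actual wiring of these values into Daffodil's API is not shown):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the dynamic-property convention described above: keys that
 * start with "+" are treated as Daffodil tunables, all others as
 * external variables.
 */
public class DynamicPropertySketch {
    static Map<String, String> tunables = new HashMap<>();
    static Map<String, String> externalVariables = new HashMap<>();

    static void addDynamicProperty(String key, String value) {
        if (key.startsWith("+")) {
            tunables.put(key.substring(1), value); // strip the "+" marker
        } else {
            externalVariables.put(key, value);
        }
    }

    public static void main(String[] args) {
        addDynamicProperty("+maxOccursCount", "10"); // tunable
        addDynamicProperty("bitOrder", "bigEndian"); // external variable
        System.out.println(tunables);                // {maxOccursCount=10}
        System.out.println(externalVariables);       // {bitOrder=bigEndian}
    }
}
```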

## Note about the Controllers

Currently, when using the DaffodilReader and DaffodilWriter Controllers in this project, unparsing from XML is not fully supported, because the NiFi XMLReader Controller ignores empty XML elements. Unparsing from XML therefore works only for XML infosets that contain no empty XML elements. Unparsing via JSON, however, is fully supported.
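
As a hypothetical illustration of this limitation (the sample data and the crude reader simulation below are not from this repository):

```java
/**
 * Crudely simulate a reader that ignores empty XML elements, as the
 * note above describes for NiFi's XMLReader. Not real NiFi code.
 */
public class EmptyElementSketch {

    static String readIgnoringEmptyElements(String xml) {
        // Drop self-closing (empty) elements such as <middle/>.
        return xml.replaceAll("<[^/>]+/>", "");
    }

    public static void main(String[] args) {
        String original = "<record><first>a</first><middle/><last>b</last></record>";
        String seenByReader = readIgnoringEmptyElements(original);
        // The empty <middle/> element is gone, so a subsequent unparse
        // cannot reproduce the original data:
        System.out.println(seenByReader.equals(original)); // false
    }
}
```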

## Build Instructions

**This is a specific step for the development branch**:

Because this project depends on a snapshot version of Daffodil, in order to run `mvn install` you
must first clone the latest version of [Apache Daffodil](https://github.com/apache/incubator-daffodil)
and run

sbt publishM2

This step will not be necessary once Daffodil 3.0.0 is released.

Then, the following should work as expected:

This repository uses the Maven build environment. To create a NAR file for use
in Apache NiFi, run

8 changes: 4 additions & 4 deletions nifi-daffodil-nar/pom.xml
@@ -13,17 +13,17 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.tresys</groupId>
<artifactId>nifi-daffodil</artifactId>
<version>1.5</version>
<version>2.0</version>
</parent>

<artifactId>nifi-daffodil-nar</artifactId>
<version>1.5</version>
<version>2.0</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
@@ -34,7 +34,7 @@
<dependency>
<groupId>com.tresys</groupId>
<artifactId>nifi-daffodil-processors</artifactId>
<version>1.5</version>
<version>2.0</version>
</dependency>
</dependencies>

116 changes: 104 additions & 12 deletions nifi-daffodil-processors/pom.xml
@@ -13,53 +13,145 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.tresys</groupId>
<artifactId>nifi-daffodil</artifactId>
<version>1.5</version>
<version>2.0</version>
</parent>

<artifactId>nifi-daffodil-processors</artifactId>
<packaging>jar</packaging>

<!-- TODO: Delete this once the TODO below is taken care of-->
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

<dependencies>
<!-- TODO: Replace this dependency with two others: one for Daffodil Core and one for the Daffodil Java API,
once a Daffodil release contains commit 92d2036e3d (2.7.0 does not; the first release candidate
for 3.0.0 likely will.)
-->
<dependency>
<groupId>com.github.apache</groupId>
<artifactId>incubator-daffodil</artifactId>
<version>92d2036e3d</version>
</dependency>
<!-- For most of the NIFI imports -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi.version}</version>
</dependency>
<!-- For Record, RecordSchema, etc. classes -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>${nifi.version}</version>
</dependency>
<!-- For RecordReaderFactory -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<version>${nifi.version}</version>
<scope>compile</scope>
</dependency>
<!-- To make the NAR actually build with references to RecordReaderFactory without completely failing -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services-nar</artifactId>
<version>${nifi.version}</version>
<type>nar</type>
</dependency>

<!-- For StandardValidators -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>${nifi.version}</version>
</dependency>

<!-- For the caching done in AbstractDFDLRecordComponent -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<!-- For Setting up a Simulated NiFi Environment in Unit Testing-->
<dependency>
<groupId>org.apache.daffodil</groupId>
<artifactId>daffodil-japi_2.12</artifactId>
<version>2.5.0</version>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>

<!-- For ConvertRecord in Unit Tests-->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<artifactId>nifi-standard-processors</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
<!-- For The Json and XML Reader/Writer Components in Unit Tests-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
<!-- For SchemaAccessUtils so that Reader/Writer components can be instantiated in Unit Tests-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<!-- This entire setup is necessary in order to mix Scala and Java code in the same Maven project-->
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,85 @@
package com.tresys.nifi.controllers;

import com.tresys.nifi.util.DaffodilCompileException;
import com.tresys.nifi.util.DaffodilResources;
import static com.tresys.nifi.util.DaffodilResources.DataProcessorSchemaPair;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public abstract class AbstractDaffodilController extends AbstractControllerService {

private volatile ConfigurationContext context;
protected volatile StreamMode streamMode;

@OnEnabled
public void storeConfiguration(final ConfigurationContext context) {
this.streamMode = StreamMode.valueOf(context.getProperty(STREAM_MODE).getValue());
this.context = context;
}

protected DataProcessorSchemaPair getPair(Map<String, String> attributes) throws DaffodilCompileException {
if (context == null) {
return null;
} else {
String dfdlFile = context.getProperty(DaffodilResources.DFDL_SCHEMA_FILE).evaluateAttributeExpressions(attributes).getValue();
return DaffodilResources.getDataProcessorSchemaPair(getLogger(), context, dfdlFile);
}
}

public enum StreamMode {
OFF, ALL_SUCCESSFUL, ONLY_SUCCESSFUL
}

public static final AllowableValue STREAM_MODE_OFF
= new AllowableValue(StreamMode.OFF.name(), StreamMode.OFF.name(), "Stream Mode is off.");
public static final AllowableValue STREAM_MODE_ALL_SUCCESSFUL
= new AllowableValue(StreamMode.ALL_SUCCESSFUL.name(), StreamMode.ALL_SUCCESSFUL.name(),
"Multiple records are parsed until there is no remaining data. If there is a failure, stop and discard all records.");
public static final AllowableValue STREAM_MODE_ONLY_SUCCESSFUL
= new AllowableValue(StreamMode.ONLY_SUCCESSFUL.name(), StreamMode.ONLY_SUCCESSFUL.name(),
"Multiple records are parsed until there is no remaining data. If there is a failure, stop, and keep all successful records.");

/**
* If this Property is set to true, then multiple Records will be produced when there is leftover data, with each one beginning where
* the last one left off. Normally leftover data just errors out. We will still route to failure if *any* of these Records
* are not successfully produced. Making this option true does not cause any issues for unparsing, as the unparse Record component is
* a RecordSetWriterFactory, which is able to handle the data containing a set of Records rather than just one Record.
*/
static final PropertyDescriptor STREAM_MODE = new PropertyDescriptor.Builder()
.name("stream-mode")
.displayName("Stream Mode")
.description("Rather than throwing an error when leftover data exists after a parse, one can repeat the parse with the remaining data. "
+ "With the 'All Successful' Mode, an error is thrown if any of the parses fail, whereas with 'Only Successful', the parse will succeed,"
+ " and only successful parses show up in the output.")
.required(false)
.defaultValue(STREAM_MODE_OFF.getValue())
.allowableValues(STREAM_MODE_OFF, STREAM_MODE_ALL_SUCCESSFUL, STREAM_MODE_ONLY_SUCCESSFUL)
.build();

private static final List<PropertyDescriptor> controllerProperties;

static {
List<PropertyDescriptor> properties = new ArrayList<>(DaffodilResources.daffodilProperties);
properties.add(STREAM_MODE);
controllerProperties = Collections.unmodifiableList(properties);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return controllerProperties;
}

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return DaffodilResources.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
}

}
@@ -0,0 +1,31 @@
package com.tresys.nifi.controllers;

import com.tresys.nifi.util.DaffodilResources.DataProcessorSchemaPair;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

@Tags({"xml", "json", "daffodil", "dfdl", "schema", "xsd"})
@CapabilityDescription("Use Daffodil and a user-specified DFDL schema to parse data into an infoset represented as Records")
public class DaffodilReader extends AbstractDaffodilController implements RecordReaderFactory {

@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream,
long inputLength, ComponentLog logger) {
try {
DataProcessorSchemaPair pair = getPair(variables);
return new DaffodilRecordReader(pair.getSchema(), inputStream, pair.getDataProcessor(), streamMode, logger);
} catch (IOException ioe) {
getLogger().error("Unable to obtain DataProcessor and/or Schema due to {}", new Object[]{ioe.getMessage()});
throw new ProcessException("Unable to obtain DataProcessor and/or Schema", ioe);
}
}

}
