Skip to content

Commit

Permalink
Added newlines to files, removed inaccurate comments, removed unneces…
Browse files Browse the repository at this point in the history
…sary logs, and added back custom exceptions
  • Loading branch information
andrewjc2000 committed Aug 10, 2020
1 parent 33de111 commit 89b3745
Show file tree
Hide file tree
Showing 26 changed files with 339 additions and 295 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ to the [CLI options](https://daffodil.apache.org/cli/) for the Daffodil tool.
Here is a note about the __Stream__ option:

- __Stream Mode:__ This mode is disabled by default, but when enabled parsing will continue in the situation
that there is leftover data rather than erroring out; it is simply repeated, generating a Record per parse.
that there is leftover data rather than routing to failure; it is simply repeated, generating a Record per parse.
If they are all successful, a Set of Records will be generated.
When using this mode for the XML Reader and Writer components, the Writer component must be configured with a
name for the Root Tag, and the Reader component must be configured with "Expect Records as Array" set to true.
Expand All @@ -32,6 +32,10 @@ And here is a note about __Tunables__ and __External Variables__:
configuring the Processor. To add Tunables, do the same thing but add a "+" character in front of the name
of the tunable variable; i.e. +maxOccursCount would be the key and something like 10 would be the value.

## Note about the Controllers

Currently, when using the DaffodilReader and DaffodilWriter Controllers in this project, unparsing from XML is not fully supported due to the fact that the NiFi XMLReader Controller ignores empty XML elements. Unparsing from XML is only supported for XML infosets that do not contain any empty XML elements. However, unparsing via JSON is fully supported.

## Build Instructions

**This is a specific step for the development branch**:
Expand All @@ -40,7 +44,9 @@ Because this project depends on a snapshot version of Daffodil, in order to run
must first clone the latest version of [Apache Daffodil](https://github.com/apache/incubator-daffodil)
and run

`sbt publishM2`
sbt publishM2

This step will not be necessary once Daffodil 3.0.0 is released.

Then, the following should work as expected:

Expand All @@ -55,4 +61,4 @@ environment.

## License

NiFi Daffodil Processors is licensed under the Apache Software License v2.
NiFi Daffodil Processors is licensed under the Apache Software License v2.
2 changes: 1 addition & 1 deletion nifi-daffodil-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
</repositories>

<dependencies>
<!-- TODO: Replace this dependency with two others: the one for Daffodil Core and Daffodil JAPI once
<!-- TODO: Replace this dependency with two others: the one for Daffodil Core and Daffodil Java API
once a Daffodil release contains commit 92d2036e3d (2.7.0 does not, the first release candidate
for 3.0.0 likely will.)
-->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package com.tresys.nifi.controllers;

import com.tresys.nifi.properties.DaffodilProperties;
import static com.tresys.nifi.properties.DaffodilProperties.DataProcessorSchemaPair;
import static com.tresys.nifi.properties.DaffodilProperties.StreamMode;
import com.tresys.nifi.util.DaffodilCompileException;
import com.tresys.nifi.util.DaffodilResources;
import static com.tresys.nifi.util.DaffodilResources.DataProcessorSchemaPair;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -26,15 +25,19 @@ public void storeConfiguration(final ConfigurationContext context) {
this.context = context;
}

protected DataProcessorSchemaPair getPair(Map<String, String> attributes) throws IOException {
protected DataProcessorSchemaPair getPair(Map<String, String> attributes) throws DaffodilCompileException {
if (context == null) {
return null;
} else {
String dfdlFile = context.getProperty(DaffodilProperties.DFDL_SCHEMA_FILE).evaluateAttributeExpressions(attributes).getValue();
return DaffodilProperties.getDataProcessorSchemaPair(getLogger(), context, dfdlFile);
String dfdlFile = context.getProperty(DaffodilResources.DFDL_SCHEMA_FILE).evaluateAttributeExpressions(attributes).getValue();
return DaffodilResources.getDataProcessorSchemaPair(getLogger(), context, dfdlFile);
}
}

public enum StreamMode {
OFF, ALL_SUCCESSFUL, ONLY_SUCCESSFUL
}

public static final AllowableValue STREAM_MODE_OFF
= new AllowableValue(StreamMode.OFF.name(), StreamMode.OFF.name(), "Stream Mode is off.");
public static final AllowableValue STREAM_MODE_ALL_SUCCESSFUL
Expand Down Expand Up @@ -64,7 +67,7 @@ protected DataProcessorSchemaPair getPair(Map<String, String> attributes) throws
private static final List<PropertyDescriptor> controllerProperties;

static {
List<PropertyDescriptor> properties = new ArrayList<>(DaffodilProperties.daffodilProperties);
List<PropertyDescriptor> properties = new ArrayList<>(DaffodilResources.daffodilProperties);
properties.add(STREAM_MODE);
controllerProperties = Collections.unmodifiableList(properties);
}
Expand All @@ -76,7 +79,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return DaffodilProperties.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
return DaffodilResources.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.tresys.nifi.controllers;

import static com.tresys.nifi.properties.DaffodilProperties.DataProcessorSchemaPair;
import com.tresys.nifi.util.DaffodilResources.DataProcessorSchemaPair;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.logging.ComponentLog;
Expand All @@ -15,6 +15,7 @@
@Tags({"xml", "json", "daffodil", "dfdl", "schema", "xsd"})
@CapabilityDescription("Use Daffodil and a user-specified DFDL schema to transform data to an infoset into Records")
public class DaffodilReader extends AbstractDaffodilController implements RecordReaderFactory {

@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream,
long inputLength, ComponentLog logger) {
Expand All @@ -26,4 +27,5 @@ public RecordReader createRecordReader(Map<String, String> variables, InputStrea
throw new ProcessException("Unable to obtain DataProcessor and/or Schema", ioe);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package com.tresys.nifi.controllers;

import com.tresys.nifi.controllers.AbstractDaffodilController.StreamMode;
import com.tresys.nifi.infoset.InfosetNode;
import com.tresys.nifi.infoset.InfosetNodeOutputter;
import com.tresys.nifi.schema.OptionalRecordField;
import com.tresys.nifi.schema.RecordUtil;
import com.tresys.nifi.properties.DaffodilProperties.StreamMode;
import com.tresys.nifi.util.DaffodilProcessingException;
import org.apache.daffodil.japi.DataLocation;
import org.apache.daffodil.japi.DataProcessor;
import org.apache.daffodil.japi.ParseResult;
Expand Down Expand Up @@ -72,10 +72,6 @@ public DaffodilRecordReader(RecordSchema schema, InputStream inputStream, DataPr
this.streamMode = streamMode;
}

private void log(String message, Object... attributes) {
RecordUtil.log(logger, message, attributes);
}

@Override
public Record nextRecord(boolean shouldCoerceTypes, boolean ignored) throws IOException {
this.coerceTypes = shouldCoerceTypes;
Expand All @@ -87,12 +83,10 @@ public Record nextRecord(boolean shouldCoerceTypes, boolean ignored) throws IOEx
InfosetNode rootNode = parseToInfosetNode();
Record result = complexNodeToRecord(originalSchema, rootNode);
currentSchema = result.getSchema();
log("Infoset Root: {}", rootNode);
log("Final Result: {}", RecordUtil.printRecord(result, ""));
log("Current Schema: {}", RecordUtil.printRecordSchemaHelper(currentSchema, ""));
return result;
} catch (IllegalStateException possiblyIgnored) {
} catch (DaffodilProcessingException possiblyIgnored) {
if (streamMode != StreamMode.ONLY_SUCCESSFUL) {
logger.error("Error Performing Daffodil Parse: {}", new Object[]{possiblyIgnored});
throw possiblyIgnored;
}
return null;
Expand All @@ -107,20 +101,20 @@ public Record nextRecord(boolean shouldCoerceTypes, boolean ignored) throws IOEx
* the Infoset data in the form of an InfosetNode tree
* @return the InfosetNode result of the parse
*/
private InfosetNode parseToInfosetNode() {
private InfosetNode parseToInfosetNode() throws DaffodilProcessingException {
InfosetNodeOutputter outputter = new InfosetNodeOutputter();
ParseResult result = dataProcessor.parse(dataInputStream, outputter);
if (result.isError()) {
throw new IllegalStateException("Failed to parse: " + result.getDiagnostics());
throw new DaffodilProcessingException("Failed to parse: " + result.getDiagnostics());
}
if (!outputter.getResult().isDefined()) {
throw new IllegalStateException("NodeOutputter did not successfully parse infoset!");
throw new DaffodilProcessingException("NodeOutputter did not successfully parse infoset!");
}
// Currently, if all bytes are not consumed and the Processor is not set to stream mode, we
// throw an Exception and then, as a result, route to failure
DataLocation loc = result.location();
if (loc.bitPos1b() - 1 <= bitsRead) {
throw new IllegalStateException(
throw new DaffodilProcessingException(
String.format(
"No data consumed! The current parse started and ended with %s bit(s)"
+ " having been read when trying to parse %s",
Expand All @@ -131,7 +125,7 @@ private InfosetNode parseToInfosetNode() {
bitsRead = loc.bitPos1b() - 1;
}
if (streamMode == StreamMode.OFF && totalBits != bitsRead) {
throw new IllegalStateException(
throw new DaffodilProcessingException(
String.format(
"Left over data. Consumed %s bit(s) with %s bit(s) remaining when parsing %s",
bitsRead, totalBits - bitsRead, outputter.getResult().get()
Expand All @@ -147,11 +141,7 @@ private InfosetNode parseToInfosetNode() {
* @param parentNode the InfosetNode with data corresponding to this Record
* @return a NiFi Record with a possibly modified Schema correctly filled with the data from the Node
*/
private Record complexNodeToRecord(RecordSchema schema, InfosetNode parentNode) {
log(
"Attempting to construct Record with schema {} and parentNode {}",
RecordUtil.printRecordSchemaHelper(schema, ""), parentNode
);
private Record complexNodeToRecord(RecordSchema schema, InfosetNode parentNode) throws DaffodilProcessingException {
Map<String, Object> recordMap = new LinkedHashMap<>();
List<RecordField> modifiedFieldList = new ArrayList<>();
for (RecordField field: schema.getFields()) {
Expand All @@ -164,7 +154,7 @@ private Record complexNodeToRecord(RecordSchema schema, InfosetNode parentNode)
modifiedFieldList.add(pair.toRecordField());
recordMap.put(pair.name, pair.value);
}
} catch (IllegalStateException possiblyIgnored) {
} catch (DaffodilProcessingException possiblyIgnored) {
if (!(field instanceof OptionalRecordField)) {
throw possiblyIgnored;
}
Expand Down Expand Up @@ -211,8 +201,8 @@ public RecordField toRecordField() {
* @param parentNode the parent of the list that the current child Node is in
* @return a list of Field-Value Pairs to insert into the Record containing this field
*/
private List<FieldValuePair> getRecordValue(String fieldName, DataType dataType, InfosetNode parentNode) {
log("Attempting to construct Field {} of type {} with Node {}", fieldName, dataType, parentNode);
private List<FieldValuePair> getRecordValue(String fieldName, DataType dataType,
InfosetNode parentNode) throws DaffodilProcessingException {
List<FieldValuePair> fieldValuePairs = new ArrayList<>();
Optional<InfosetNode> optChild = parentNode.getChild(fieldName);
if (fieldName == null) {
Expand All @@ -221,7 +211,7 @@ private List<FieldValuePair> getRecordValue(String fieldName, DataType dataType,
optChild = Optional.of(parentNode);
}
if (!(dataType instanceof ChoiceDataType) && !optChild.isPresent()) {
throw new IllegalStateException(
throw new DaffodilProcessingException(
String.format(
"Required Schema field %s was not present in child list %s", fieldName, parentNode.childrenToString()
)
Expand Down Expand Up @@ -260,15 +250,15 @@ private List<FieldValuePair> getRecordValue(String fieldName, DataType dataType,
* are not all RecordDataTypes
* @param parentNode the parent InfosetNode whose child list should contain the Choice option to be selected
* @return a list of Pairs representing the Fields & Values that were selected for the Choice
* @throws IllegalStateException if one of the possibleSubTypes is not a RecordDataType, or if none
* @throws DaffodilProcessingException if one of the possibleSubTypes is not a RecordDataType, or if none
* of the choice options are successfully selected.
*/
private List<FieldValuePair> choiceToPairList(List<DataType> possibleSubTypes, InfosetNode parentNode) {
log("Attempting to construct Choice Record with possible sub types {} and parentNode {}", possibleSubTypes, parentNode);
private List<FieldValuePair> choiceToPairList(List<DataType> possibleSubTypes,
InfosetNode parentNode) throws DaffodilProcessingException {
List<FieldValuePair> fieldValPairs = new ArrayList<>();
for (DataType possibleType: possibleSubTypes) {
if (!(possibleType instanceof RecordDataType)) {
throw new IllegalStateException("Possible Type of Choice element was not a record!");
throw new DaffodilProcessingException("Possible Type of Choice element was not a record!");
} else {
RecordDataType possRecordType = (RecordDataType) possibleType;
// RecordFields belonging to the RecordSchema for the current Choice Option
Expand All @@ -293,7 +283,7 @@ private List<FieldValuePair> choiceToPairList(List<DataType> possibleSubTypes, I
}
}
if (fieldValPairs.isEmpty()) {
throw new IllegalStateException(
throw new DaffodilProcessingException(
String.format("InfosetNode Child List %s did not match any choice option of choice %s",
possibleSubTypes.toString(), parentNode.toString()
)
Expand All @@ -313,11 +303,8 @@ private List<FieldValuePair> choiceToPairList(List<DataType> possibleSubTypes, I
* @param arrayNode the InfosetNode representing the current Array
* @return a FieldValuePair as described above
*/
private FieldValuePair arrayToPair(String originalFieldName, DataType childType, InfosetNode arrayNode) {
log(
"Attempting to construct Array with field name {}, child type {}, and members {}",
originalFieldName, childType, arrayNode.childrenToString()
);
private FieldValuePair arrayToPair(String originalFieldName, DataType childType,
InfosetNode arrayNode) throws DaffodilProcessingException {
List<Object> valueArrList = new ArrayList<>();
List<DataType> possibleSubTypes = new ArrayList<>();
for (InfosetNode arrayMember : arrayNode) {
Expand All @@ -337,9 +324,9 @@ private FieldValuePair arrayToPair(String originalFieldName, DataType childType,
* @param type the type that the String is attempting to parse into
* @param simpleTypeValue the String to be parsed
* @return the parsed version of the String, if it succeeds
* @throws IllegalStateException if type is not one of the known types or the parse fails
* @throws DaffodilProcessingException if type is not one of the known types or the parse fails
*/
private Object coerceSimpleType(RecordFieldType type, String simpleTypeValue) {
private Object coerceSimpleType(RecordFieldType type, String simpleTypeValue) throws DaffodilProcessingException {
try {
switch (type) {
case STRING:
Expand All @@ -355,10 +342,10 @@ private Object coerceSimpleType(RecordFieldType type, String simpleTypeValue) {
case BIGINT:
return new BigInteger(simpleTypeValue);
default:
throw new IllegalStateException("Attempted coercing unsupported type " + type);
throw new DaffodilProcessingException("Attempted coercing unsupported type " + type);
}
} catch (NumberFormatException nfe) {
throw new IllegalStateException(String.format("Could not cast %s to a %s", simpleTypeValue, type.name()));
throw new DaffodilProcessingException(String.format("Could not cast %s to a %s", simpleTypeValue, type.name()));
}
}

Expand Down
Loading

0 comments on commit 89b3745

Please sign in to comment.