From 34ab2d8df08af00c1363c2f23c442e1ce1d5419a Mon Sep 17 00:00:00 2001 From: Martin Mucha Date: Sun, 7 Jan 2018 11:34:30 +0100 Subject: [PATCH] NIFI-4745 : configuration property allowing failure description processing We need to pass description of validation failure further in processing chain, and eventually pass it back to calling system. Therefore having failure desc logged in logs and issued as provenance route event is not sufficient for us. This patch adds new property, specifying name of attribute to be created and filled with validation failure description. Signed-off-by: Martin Mucha --- .../processors/standard/ValidateRecord.java | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index 5dbc305661fd..92f0500045e2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -48,6 +48,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; @@ -166,6 +167,16 @@ public class ValidateRecord extends AbstractProcessor { .required(true) .build(); + + static final PropertyDescriptor ATTRIBUTE_NAME_TO_STORE_FAILURE_DESCRIPTION = new PropertyDescriptor.Builder() + .name("emit-failure-description-property") + .displayName("Variable Describing Parse Failure") + .description("If validation fails, validation failure will be stored into variable named according to this setting.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(createAttributeNameValidator()) + .build(); + static final Relationship REL_VALID = new Relationship.Builder() .name("valid") .description("Records that are valid according to the schema will be routed to this relationship") @@ -179,6 +190,24 @@ public class ValidateRecord extends AbstractProcessor { .description("If the records cannot be read, validated, or written, for any reason, the original FlowFile will be routed to this relationship") .build(); + private static Validator createAttributeNameValidator() { + return (subject, input, context) -> { + String validAttributeRegex = "^[a-zA-Z_$][a-zA-Z_$0-9]*$"; + boolean validAttributeName = input == null || input.matches(validAttributeRegex); + + ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.valid(validAttributeName); + builder.input(input); + if (!validAttributeName) { + builder + .subject("Attribute name") + .explanation(String.format("Attribute '%s' has to have format '%s'", + ATTRIBUTE_NAME_TO_STORE_FAILURE_DESCRIPTION.getDisplayName(), + validAttributeRegex)); + } + return builder.build(); + }; + } @Override protected List getSupportedPropertyDescriptors() { @@ -191,6 +220,7 @@ protected List getSupportedPropertyDescriptors() { properties.add(SCHEMA_TEXT); properties.add(ALLOW_EXTRA_FIELDS); properties.add(STRICT_TYPE_CHECKING); + properties.add(ATTRIBUTE_NAME_TO_STORE_FAILURE_DESCRIPTION); return properties; } @@ -330,7 +360,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } if (validWriter != null) { - completeFlowFile(session, validFlowFile, validWriter, REL_VALID, null); + completeFlowFile(session, validFlowFile, validWriter, REL_VALID, null, null); } if (invalidWriter != null) { @@ -369,7 +399,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } final String validationErrorString = errorBuilder.toString(); - completeFlowFile(session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString); + final String attributeNameToStoreFailureDescription = context.getProperty(ATTRIBUTE_NAME_TO_STORE_FAILURE_DESCRIPTION).getValue(); + completeFlowFile(session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString, attributeNameToStoreFailureDescription); } } finally { closeQuietly(validWriter); @@ -404,7 +435,12 @@ private void closeQuietly(final RecordSetWriter writer) { } } - private void completeFlowFile(final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, final Relationship relationship, final String details) throws IOException { + private void completeFlowFile(final ProcessSession session, + final FlowFile flowFile, + final RecordSetWriter writer, + final Relationship relationship, + final String details, + String attributeNameToStoreFailureDescription) throws IOException { final WriteResult writeResult = writer.finishRecordSet(); writer.close(); @@ -412,6 +448,9 @@ private void completeFlowFile(final ProcessSession session, final FlowFile flowF attributes.putAll(writeResult.getAttributes()); attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + if (attributeNameToStoreFailureDescription != null) { + attributes.put(attributeNameToStoreFailureDescription, details); + } session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, relationship);