From ddc411baf53f281a2a9f4e309d57aca1c6231129 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 7 Jun 2018 15:03:16 +0200 Subject: [PATCH] NIFI-5264 - Added attribute for validation error message in ValidateCSV --- .../apache/nifi/processors/standard/ValidateCsv.java | 12 +++++++++++- .../nifi/processors/standard/TestValidateCsv.java | 4 ++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java index 667d4f941fba..9796822aff6f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java @@ -92,7 +92,8 @@ @WritesAttributes({ @WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"), @WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"), - @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data") + @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data"), + @WritesAttribute(attribute="validation.error.message", description="For flow files routed to invalid, message of the first validation error") }) public class ValidateCsv extends AbstractProcessor { @@ -455,6 +456,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final AtomicReference totalCount = new AtomicReference(0); final AtomicReference invalidFF = new AtomicReference(null); final AtomicReference validFF = new AtomicReference(null); + final AtomicReference validationError = new AtomicReference(null); if(!isWholeFFValidation) { invalidFF.set(session.create(flowFile)); @@ -514,6 +516,7 @@ public void process(OutputStream out) throws IOException { } catch (final SuperCsvException e) { valid.set(false); if(isWholeFFValidation) { + validationError.set(e.getLocalizedMessage()); logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e); break; } else { @@ -528,6 +531,10 @@ public void process(OutputStream out) throws IOException { if(isFirstLineInvalid.get()) { isFirstLineInvalid.set(false); } + + if(validationError.get() == null) { + validationError.set(e.getLocalizedMessage()); + } } } finally { if(!isWholeFFValidation) { @@ -554,6 +561,7 @@ public void process(OutputStream out) throws IOException { session.transfer(flowFile, REL_VALID); } else { session.getProvenanceReporter().route(flowFile, REL_INVALID); + session.putAttribute(flowFile, "validation.error.message", validationError.get()); session.transfer(flowFile, REL_INVALID); } } else { @@ -578,6 +586,7 @@ public void process(OutputStream out) throws IOException { session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)"); session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get()))); session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get())); + session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get()); session.transfer(invalidFF.get(), REL_INVALID); session.remove(flowFile); } else { @@ -585,6 +594,7 @@ public void process(OutputStream out) throws IOException { session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid"); session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get())); session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get())); + session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get()); session.transfer(invalidFF.get(), REL_INVALID); session.remove(validFF.get()); session.remove(flowFile); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java index 097aad879aee..b03aed4718b3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java @@ -121,6 +121,8 @@ public void testValidDateOptionalDouble() { runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0"); runner.run(); runner.assertTransferCount(ValidateCsv.REL_INVALID, 1); + runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message", + "'22/111954' could not be parsed as a Date"); } @Test @@ -197,6 +199,8 @@ public void testStrlenStrMinMaxStrRegex() { runner.enqueue("test,test,testapache.org"); runner.run(); runner.assertTransferCount(ValidateCsv.REL_INVALID, 1); + runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("validation.error.message", + "'testapache.org' does not match the regular expression '[a-z0-9\\._]+@[a-z0-9\\.]+'"); } @Test