From e07c3c8be22fe5120e2252e4714afd298bfe0074 Mon Sep 17 00:00:00 2001 From: ambud Date: Thu, 23 Feb 2017 12:51:58 -0600 Subject: [PATCH] Add table attributes to flow files https://issues.apache.org/jira/browse/NIFI-3525 --- .../apache/nifi/processors/standard/QueryDatabaseTable.java | 5 +++-- .../nifi/processors/standard/QueryDatabaseTableTest.java | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 1d898b4b47a9..0b54a5b3f9ca 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -85,6 +85,7 @@ + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor " + "per the State Management documentation") @WritesAttributes({ + @WritesAttribute(attribute = "tablename", description="Name of the table being queried"), @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"), @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @@ -101,6 +102,7 @@ + "max value for max value columns. Properties should be added in the format `initial.maxvalue.{max_value_column}`.") public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { + public static final String RESULT_TABLENAME = "tablename"; public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; public static final String INTIIAL_MAX_VALUE_PROP_START = "initial.maxvalue."; @@ -277,7 +279,6 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory final AtomicLong nrOfRows = new AtomicLong(0L); FlowFile fileToProcess = session.create(); - try { fileToProcess = session.write(fileToProcess, out -> { // Max values will be updated in the state property map by the callback @@ -297,7 +298,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory if (nrOfRows.get() > 0) { // set attribute how many rows were selected fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); - + fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName); if(maxRowsPerFlowFile > 0) { fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier); fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 92f4757f4e5e..6af59ebab689 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -192,6 +192,7 @@ public void testAddedRows() throws ClassNotFoundException, SQLException, Initial runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 2); MockFlowFile flowFile = runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute(QueryDatabaseTable.RESULT_TABLENAME)); assertEquals(flowFile.getAttribute("maxvalue.id"), "2"); InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); runner.setProperty(QueryDatabaseTable.FETCH_SIZE, "2");