From 6367110cf620ac765ba5f556e6f11fbe0545e639 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 14 Mar 2017 23:19:05 +0100 Subject: [PATCH 1/2] NIFI-3596 - added attributes to GenerateTableFetch processor --- .../standard/GenerateTableFetch.java | 27 +++++++++++++++++-- .../standard/TestGenerateTableFetch.java | 18 +++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index a850e74e9211..a192d83fc122 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -85,7 +85,14 @@ + "per the State Management documentation") @WritesAttributes({ @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes " - + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message.") + + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."), + @WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."), + @WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."), + @WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."), + @WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data " + + "that has been returned since the processor started running."), + @WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."), + @WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.") }) public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { @@ -140,6 +147,7 @@ protected Collection customValidate(ValidationContext validati return super.customValidate(validationContext); } + @Override @OnScheduled public void setup(final ProcessContext context) { // Pre-fetch the column types if using a static table name and max-value columns @@ -291,9 +299,24 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory for (long i = 0; i < numberOfFetches; i++) { long limit = partitionSize == 0 ? null : partitionSize; long offset = partitionSize == 0 ? null : i * partitionSize; - final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); + final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); + sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName); + if (columnNames != null) { + sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames); + } + if (StringUtils.isNotBlank(whereClause)) { + sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause); + } + if (StringUtils.isNotBlank(maxColumnNames)) { + sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames); + } + sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit)); + if (partitionSize != 0) { + sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset)); + } session.transfer(sqlFlowFile, REL_SUCCESS); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 79093b2cf294..93468087dc9d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -207,6 +207,12 @@ public void testAddedRows() throws ClassNotFoundException, SQLException, Initial assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3); assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName")); + assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames")); + assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause")); + assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames")); + assertEquals("2", flowFile.getAttribute("generatetablefetch.limit")); + assertEquals("6", flowFile.getAttribute("generatetablefetch.offset")); runner.clearTransferState(); } @@ -415,6 +421,12 @@ public void testBackwardsCompatibilityStateKeyDynamicTableDynamicMaxValues() thr MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); // Note there is no WHERE clause here. Because we are using dynamic tables, the old state key/value is not retrieved assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName")); + assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames")); + assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause")); + assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames")); + assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit")); + assertEquals("0", flowFile.getAttribute("generatetablefetch.offset")); runner.clearTransferState(); stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); @@ -428,6 +440,12 @@ public void testBackwardsCompatibilityStateKeyDynamicTableDynamicMaxValues() thr runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName")); + assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames")); + assertEquals("id > 1", flowFile.getAttribute("generatetablefetch.whereClause")); + assertEquals("id", flowFile.getAttribute("generatetablefetch.maxColumnNames")); + assertEquals("10000", flowFile.getAttribute("generatetablefetch.limit")); + assertEquals("0", flowFile.getAttribute("generatetablefetch.offset")); } @Test From f1a5ac10702ef6d51666ce553d36a085f1f6023d Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 24 Mar 2017 22:09:34 +0100 Subject: [PATCH 2/2] Updated test to check selected column names --- .../processors/standard/TestGenerateTableFetch.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 93468087dc9d..a9b675c8bfbd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -197,18 +197,19 @@ public void testAddedRows() throws ClassNotFoundException, SQLException, Initial // Set name as the max value column name (and clear the state), all rows should be returned since the max value for name has not been set runner.getStateManager().clear(Scope.CLUSTER); runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "name"); + runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "id, name, scale, created_on"); runner.run(); runner.assertAllFlowFilesTransferred(REL_SUCCESS, 4); // 7 records with partition size 2 means 4 generated FlowFiles flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); - assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); + assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(1); - assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); + assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(2); - assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); + assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(3); - assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); + assertEquals("SELECT id, name, scale, created_on FROM TEST_QUERY_DB_TABLE ORDER BY name OFFSET 6 ROWS FETCH NEXT 2 ROWS ONLY", new String(flowFile.toByteArray())); assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName")); - assertEquals(null, flowFile.getAttribute("generatetablefetch.columnNames")); + assertEquals("id, name, scale, created_on", flowFile.getAttribute("generatetablefetch.columnNames")); assertEquals(null, flowFile.getAttribute("generatetablefetch.whereClause")); assertEquals("name", flowFile.getAttribute("generatetablefetch.maxColumnNames")); assertEquals("2", flowFile.getAttribute("generatetablefetch.limit"));