From dcc72baf1a5b0dff2aa8dfe2dc3d774c5ec73bba Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 1 Nov 2018 22:09:13 -0400 Subject: [PATCH 1/4] NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch --- .../standard/GenerateTableFetch.java | 93 +++++++++++-------- .../standard/TestGenerateTableFetch.java | 31 +++++++ 2 files changed, 87 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index a5473938fa0e..adacb3caea74 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -142,6 +142,18 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS = new PropertyDescriptor.Builder() + .name("gen-table-output-flowfile-on-zero-results") + .displayName("Output Empty FlowFile on Zero Results") + .description("Depending on the specified properties, an execution of this processor may not result in any SQL statements generated. When this property " + + "is true, an empty flow file will be generated (having the parent of the incoming flow file if present) and transferred to the 'success' relationship. " + + "When this property is false, no output flow files will be generated.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. " @@ -164,6 +176,7 @@ public GenerateTableFetch() { pds.add(PARTITION_SIZE); pds.add(COLUMN_FOR_VALUE_PARTITIONING); pds.add(WHERE_CLAUSE); + pds.add(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS); propDescriptors = Collections.unmodifiableList(pds); } @@ -247,6 +260,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue(); final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning); final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue(); + final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean(); final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -434,48 +448,53 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); } - // Generate SQL statements to read "pages" of data - Long limit = partitionSize == 0 ? null : (long) partitionSize; - final String fragmentIdentifier = UUID.randomUUID().toString(); - for (long i = 0; i < numberOfFetches; i++) { - // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) - if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) { - maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning); - limit = null; - } + // If there are no SQL statements to be generated, still output an empty flow file if specified by the user + if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) { + session.transfer((fileToProcess == null) ? session.create() : session.create(fileToProcess), REL_SUCCESS); + } else { + // Generate SQL statements to read "pages" of data + Long limit = partitionSize == 0 ? null : (long) partitionSize; + final String fragmentIdentifier = UUID.randomUUID().toString(); + for (long i = 0; i < numberOfFetches; i++) { + // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) + if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) { + maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning); + limit = null; + } - //Update WHERE list to include new right hand boundaries - whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); + //Update WHERE list to include new right hand boundaries + whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); - Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0); + Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0); - final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); - final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); - FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); - sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); - Map attributesToAdd = new HashMap<>(); + final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); + FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); + sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); + Map attributesToAdd = new HashMap<>(); - attributesToAdd.put("generatetablefetch.tableName", tableName); - if (columnNames != null) { - attributesToAdd.put("generatetablefetch.columnNames", columnNames); - } - if (StringUtils.isNotBlank(whereClause)) { - attributesToAdd.put("generatetablefetch.whereClause", whereClause); - } - if (StringUtils.isNotBlank(maxColumnNames)) { - attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames); - } - attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit)); - if (partitionSize != 0) { - attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset)); - } - // Add fragment attributes - attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); - attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i)); - attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + attributesToAdd.put("generatetablefetch.tableName", tableName); + if (columnNames != null) { + attributesToAdd.put("generatetablefetch.columnNames", columnNames); + } + if (StringUtils.isNotBlank(whereClause)) { + attributesToAdd.put("generatetablefetch.whereClause", whereClause); + } + if (StringUtils.isNotBlank(maxColumnNames)) { + attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames); + } + attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit)); + if (partitionSize != 0) { + attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset)); + } + // Add fragment attributes + attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); + attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i)); + attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); - sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd); - session.transfer(sqlFlowFile, REL_SUCCESS); + sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd); + session.transfer(sqlFlowFile, REL_SUCCESS); + } } if (fileToProcess != null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 6e0c3971e58b..d4aa46c6339a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -543,6 +543,37 @@ public void testOnePartition() throws ClassNotFoundException, SQLException, Init runner.clearTransferState(); } + @Test + public void testFlowFileGeneratedOnZeroResults() throws SQLException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); + // Set partition size to 0 so we can see that the flow file gets all rows + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1"); + runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "false"); + + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 0); + runner.clearTransferState(); + + runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); + } + @Test public void testMultiplePartitions() throws ClassNotFoundException, SQLException, InitializationException, IOException { From 039309702f234acb90baddb23ac21ee29ebce60c Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Tue, 13 Nov 2018 13:45:20 -0500 Subject: [PATCH 2/4] NIFI-5604: Incorporated review comments --- .../standard/GenerateTableFetch.java | 28 ++++++++++++++++++- .../standard/TestGenerateTableFetch.java | 11 ++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index adacb3caea74..c8ae7d895589 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -450,7 +450,33 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory // If there are no SQL statements to be generated, still output an empty flow file if specified by the user if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) { - session.transfer((fileToProcess == null) ? session.create() : session.create(fileToProcess), REL_SUCCESS); + FlowFile emptyFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); + Map attributesToAdd = new HashMap<>(); + + attributesToAdd.put("generatetablefetch.tableName", tableName); + if (columnNames != null) { + attributesToAdd.put("generatetablefetch.columnNames", columnNames); + } + whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); + if (StringUtils.isNotBlank(whereClause)) { + attributesToAdd.put("generatetablefetch.whereClause", whereClause); + } + final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); + if (StringUtils.isNotBlank(maxColumnNames)) { + attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames); + } + attributesToAdd.put("generatetablefetch.limit", null); + if (partitionSize != 0) { + attributesToAdd.put("generatetablefetch.offset", null); + } + // Add fragment attributes + final String fragmentIdentifier = UUID.randomUUID().toString(); + attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); + attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(0)); + attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + + emptyFlowFile = session.putAllAttributes(emptyFlowFile, attributesToAdd); + session.transfer(emptyFlowFile, REL_SUCCESS); } else { // Generate SQL statements to read "pages" of data Long limit = partitionSize == 0 ? null : (long) partitionSize; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index d4aa46c6339a..1b089a3e1bc9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -54,6 +54,7 @@ import static org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor.REL_SUCCESS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; @@ -560,6 +561,7 @@ public void testFlowFileGeneratedOnZeroResults() throws SQLException { runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "ID,BUCKET"); runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID"); // Set partition size to 0 so we can see that the flow file gets all rows runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "1"); @@ -572,6 +574,15 @@ public void testFlowFileGeneratedOnZeroResults() throws SQLException { runner.setProperty(GenerateTableFetch.OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS, "true"); runner.run(); runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0); + assertEquals("TEST_QUERY_DB_TABLE", flowFile.getAttribute("generatetablefetch.tableName")); + assertEquals("ID,BUCKET", flowFile.getAttribute("generatetablefetch.columnNames")); + assertEquals("1=1", flowFile.getAttribute("generatetablefetch.whereClause")); + assertEquals("ID", flowFile.getAttribute("generatetablefetch.maxColumnNames")); + assertNull(flowFile.getAttribute("generatetablefetch.limit")); + assertNull(flowFile.getAttribute("generatetablefetch.offset")); + assertEquals("0", flowFile.getAttribute("fragment.index")); + assertEquals("0", flowFile.getAttribute("fragment.count")); } @Test From f30fdd916f17190ccd3ae4c858f3d902c0667d98 Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Wed, 14 Nov 2018 09:33:43 -0700 Subject: [PATCH 3/4] NIFI-5604 Refactor code to for better code reuse. Signed-off-by: Peter Wicks --- .../standard/GenerateTableFetch.java | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index c8ae7d895589..493d80af9fc6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -448,39 +448,45 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); } + // Generate SQL statements to read "pages" of data + final String fragmentIdentifier = UUID.randomUUID().toString(); + List flowFilesToTransfer = new ArrayList<>(); + + Map baseAttributes = new HashMap<>(); + baseAttributes.put("generatetablefetch.tableName", tableName); + if (columnNames != null) { + baseAttributes.put("generatetablefetch.columnNames", columnNames); + } + + final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); + if (StringUtils.isNotBlank(maxColumnNames)) { + baseAttributes.put("generatetablefetch.maxColumnNames", maxColumnNames); + } + + baseAttributes.put(FRAGMENT_ID, fragmentIdentifier); + baseAttributes.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + // If there are no SQL statements to be generated, still output an empty flow file if specified by the user if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) { FlowFile emptyFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); Map attributesToAdd = new HashMap<>(); - attributesToAdd.put("generatetablefetch.tableName", tableName); - if (columnNames != null) { - attributesToAdd.put("generatetablefetch.columnNames", columnNames); - } whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); if (StringUtils.isNotBlank(whereClause)) { attributesToAdd.put("generatetablefetch.whereClause", whereClause); } - final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); - if (StringUtils.isNotBlank(maxColumnNames)) { - attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames); - } attributesToAdd.put("generatetablefetch.limit", null); if (partitionSize != 0) { attributesToAdd.put("generatetablefetch.offset", null); } // Add fragment attributes - final String fragmentIdentifier = UUID.randomUUID().toString(); - attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(0)); - attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + attributesToAdd.putAll(baseAttributes); emptyFlowFile = session.putAllAttributes(emptyFlowFile, attributesToAdd); - session.transfer(emptyFlowFile, REL_SUCCESS); + flowFilesToTransfer.add(emptyFlowFile); } else { - // Generate SQL statements to read "pages" of data Long limit = partitionSize == 0 ? null : (long) partitionSize; - final String fragmentIdentifier = UUID.randomUUID().toString(); for (long i = 0; i < numberOfFetches; i++) { // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) { @@ -490,39 +496,31 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory //Update WHERE list to include new right hand boundaries whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); - Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0); - final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", "); - final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); Map attributesToAdd = new HashMap<>(); - attributesToAdd.put("generatetablefetch.tableName", tableName); - if (columnNames != null) { - attributesToAdd.put("generatetablefetch.columnNames", columnNames); - } if (StringUtils.isNotBlank(whereClause)) { attributesToAdd.put("generatetablefetch.whereClause", whereClause); } - if (StringUtils.isNotBlank(maxColumnNames)) { - attributesToAdd.put("generatetablefetch.maxColumnNames", maxColumnNames); - } attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit)); if (partitionSize != 0) { attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset)); } // Add fragment attributes - attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i)); - attributesToAdd.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches)); + attributesToAdd.putAll(baseAttributes); sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd); - session.transfer(sqlFlowFile, REL_SUCCESS); + flowFilesToTransfer.add(sqlFlowFile); } } + session.transfer(flowFilesToTransfer, REL_SUCCESS); + if (fileToProcess != null) { session.remove(fileToProcess); } From e1ce90eeccea970fff7185a2f53b3c9b43f1935f Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 14 Nov 2018 15:21:50 -0500 Subject: [PATCH 4/4] NIFI-5604: more review comments --- .../processors/standard/GenerateTableFetch.java | 13 +++++-------- .../processors/standard/TestGenerateTableFetch.java | 5 ++++- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index 493d80af9fc6..b4ba9fee3061 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -472,9 +472,8 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory Map attributesToAdd = new HashMap<>(); whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); - if (StringUtils.isNotBlank(whereClause)) { - attributesToAdd.put("generatetablefetch.whereClause", whereClause); - } + attributesToAdd.put("generatetablefetch.whereClause", whereClause); + attributesToAdd.put("generatetablefetch.limit", null); if (partitionSize != 0) { attributesToAdd.put("generatetablefetch.offset", null); @@ -498,15 +497,13 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0); - final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset, columnForPartitioning); FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); Map attributesToAdd = new HashMap<>(); - if (StringUtils.isNotBlank(whereClause)) { - attributesToAdd.put("generatetablefetch.whereClause", whereClause); - } - attributesToAdd.put("generatetablefetch.limit", String.valueOf(limit)); + attributesToAdd.put("generatetablefetch.whereClause", whereClause); + attributesToAdd.put("generatetablefetch.limit", (limit == null) ? null : limit.toString()); if (partitionSize != 0) { attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset)); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 1b089a3e1bc9..8ccca2c4ec53 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -540,7 +540,10 @@ public void testOnePartition() throws ClassNotFoundException, SQLException, Init runner.run(); runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_SUCCESS, 1); - runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0).assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID"); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0); + flowFile.assertContentEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID <= 2 ORDER BY ID"); + flowFile.assertAttributeExists("generatetablefetch.limit"); + flowFile.assertAttributeEquals("generatetablefetch.limit", null); runner.clearTransferState(); }