From 633e8c36ed18a6c0f691258a678ce99d03bd2d8d Mon Sep 17 00:00:00 2001
From: patricker
Date: Fri, 29 Jun 2018 14:10:14 -0600
Subject: [PATCH] NIFI-1251 ExecuteSQL Max Rows and Output Batching

---
 .../nifi/processors/standard/ExecuteSQL.java  | 173 ++++++++++++++----
 .../standard/QueryDatabaseTable.java          |  13 +-
 .../processors/standard/util/JdbcCommon.java  |   1 -
 .../processors/standard/TestExecuteSQL.java   |  93 ++++++++++
 4 files changed, 236 insertions(+), 44 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 66c23389f049..97763c2d8a7f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -27,6 +27,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

@@ -46,6 +47,7 @@
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -96,7 +98,17 @@
     @WritesAttribute(attribute="executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
     @WritesAttribute(attribute="executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
     @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, "
-            + "the zero based index of this result set.")
+            + "the zero based index of this result set."),
+    @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+            + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+    @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+            + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+            + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+            + "attribute will not be populated."),
+    @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+            + "outgoing FlowFiles that were all derived from the same result set. This can be "
+            + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+            + "FlowFiles were produced")
 })
 public class ExecuteSQL extends AbstractProcessor {
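The three fragment.* attributes documented above follow NiFi's standard fragmentation convention, so downstream components can correlate all of the FlowFiles produced from one result set. A minimal consumer sketch (the flowFile variable is assumed; attribute keys are as documented above):

    // Correlate fragments of one query result set
    final String fragmentId = flowFile.getAttribute("fragment.identifier");
    final int fragmentIndex = Integer.parseInt(flowFile.getAttribute("fragment.index"));
    // Only present when 'Max Rows Per Flow File' is set and 'Output Batch Size' is 0
    final String fragmentCount = flowFile.getAttribute("fragment.count");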
This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced") }) public class ExecuteSQL extends AbstractProcessor { @@ -106,6 +118,10 @@ public class ExecuteSQL extends AbstractProcessor { public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime"; public static final String RESULTSET_INDEX = "executesql.resultset.index"; + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -146,6 +162,31 @@ public class ExecuteSQL extends AbstractProcessor { .sensitive(false) .build(); + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("esql-max-rows") + .displayName("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large " + + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("esql-output-batch-size") + .displayName("Output Batch Size") + .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows " + + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles " + + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will " + + "be committed, thus releasing the FlowFiles to the downstream relationship. 
@@ -162,6 +203,8 @@ public ExecuteSQL() {
         pds.add(USE_AVRO_LOGICAL_TYPES);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);
+        pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
         propDescriptors = Collections.unmodifiableList(pds);
     }

@@ -200,11 +243,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             }
         }

+        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
         final ComponentLog logger = getLogger();
         final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
         final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
         final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
         final String selectQuery;
@@ -227,6 +275,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 JdbcCommon.setParameters(st, fileToProcess.getAttributes());
             }
             logger.debug("Executing query {}", new Object[]{selectQuery});
+
+            int fragmentIndex = 0;
+            final String fragmentId = UUID.randomUUID().toString();
+
             final StopWatch executionTime = new StopWatch(true);

             boolean hasResults = st.execute();
@@ -238,47 +290,84 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             while(hasResults || hasUpdateCount) {
                 //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
                 if (hasResults) {
-                    FlowFile resultSetFF;
-                    if (fileToProcess == null) {
-                        resultSetFF = session.create();
-                    } else {
-                        resultSetFF = session.create(fileToProcess);
-                        resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
-                    }
-
-                    final StopWatch fetchTime = new StopWatch(true);
-                    final AtomicLong nrOfRows = new AtomicLong(0L);
-                    resultSetFF = session.write(resultSetFF, out -> {
-                        try {
-
-                            final ResultSet resultSet = st.getResultSet();
-                            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
-                                    .convertNames(convertNamesForAvro)
-                                    .useLogicalTypes(useAvroLogicalTypes)
-                                    .defaultPrecision(defaultPrecision)
-                                    .defaultScale(defaultScale)
-                                    .build();
-                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                        } catch (final SQLException e) {
-                            throw new ProcessException(e);
-                        }
-                    });
-
-                    long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-                    // set attribute how many rows were selected
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-                    resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
-                    resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                    resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
-                    logger.info("{} contains {} Avro records; transferring to 'success'",
-                            new Object[]{resultSetFF, nrOfRows.get()});
-                    session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                    session.transfer(resultSetFF, REL_SUCCESS);
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
+
+                    try {
+                        final ResultSet resultSet = st.getResultSet();
+                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                                .convertNames(convertNamesForAvro)
+                                .useLogicalTypes(useAvroLogicalTypes)
+                                .defaultPrecision(defaultPrecision)
+                                .defaultScale(defaultScale)
+                                .maxRows(maxRowsPerFlowFile)
+                                .build();
+
+                        do {
+                            final StopWatch fetchTime = new StopWatch(true);
+
+                            FlowFile resultSetFF;
+                            if (fileToProcess == null) {
+                                resultSetFF = session.create();
+                            } else {
+                                resultSetFF = session.create(fileToProcess);
+                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+                            }
+
+                            resultSetFF = session.write(resultSetFF, out -> {
+                                try {
+                                    nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
+                                } catch (SQLException e) {
+                                    throw new ProcessException(e);
+                                }
+                            });
+
+                            long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+                            // set attribute how many rows were selected
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                            resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+                            resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+                            resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
+
+                            // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+                            if (maxRowsPerFlowFile > 0) {
+                                // if row count is zero and this is not the first fragment, drop it instead of committing it.
+                                if (nrOfRows.get() == 0 && fragmentIndex > 0) {
+                                    session.remove(resultSetFF);
+                                    break;
+                                }
+
+                                resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+                                resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                            }
+
+                            logger.info("{} contains {} Avro records; transferring to 'success'",
+                                    new Object[]{resultSetFF, nrOfRows.get()});
+                            session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                            resultSetFlowFiles.add(resultSetFF);
+
+                            // If we've reached the batch size, send out the flow files
+                            if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                                session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                session.commit();
+                                resultSetFlowFiles.clear();
+                            }
+
+                            fragmentIndex++;
+                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
+
+                        // If we are splitting results but not outputting batches, set count on all FlowFiles
+                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
+                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                                resultSetFlowFiles.set(i,
+                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
+                            }
+                        }
+                    } catch (final SQLException e) {
+                        throw new ProcessException(e);
+                    }

                     resultCount++;
                 }
@@ -293,6 +382,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             }
         }

+        // Transfer any remaining files to SUCCESS
+        session.transfer(resultSetFlowFiles, REL_SUCCESS);
+        resultSetFlowFiles.clear();
+
         //If we had at least one result then it's OK to drop the original file, but if we had no results then
         // pass the original flow file down the line to trigger downstream processors
         if(fileToProcess != null){
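The do/while above is the core of the change: each pass writes at most maxRowsPerFlowFile rows, and the loop continues only while a full fragment was produced, so a short (or empty) fragment signals that the result set is exhausted. A condensed sketch of the pattern, with a hypothetical writeNextFragment helper standing in for the session.write(...) plumbing:

    // Condensed paging-and-batching pattern (writeNextFragment is a stand-in helper)
    final List<FlowFile> pending = new ArrayList<>();
    long rowsWritten;
    do {
        rowsWritten = writeNextFragment(pending);        // writes at most maxRows rows
        if (batchSize > 0 && pending.size() >= batchSize) {
            session.transfer(pending, REL_SUCCESS);
            session.commit();                            // releases FlowFiles downstream early
            pending.clear();
        }
    } while (maxRows > 0 && rowsWritten == maxRows);     // short fragment => done
    session.transfer(pending, REL_SUCCESS);              // remainder goes out at the end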
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index b245f5750d89..5c210c4907d1 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -38,6 +38,7 @@
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -116,6 +117,9 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     public static final String RESULT_TABLENAME = "tablename";
     public static final String RESULT_ROW_COUNT = "querydbtable.row.count";

+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+
     public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
             .name("Fetch Size")
             .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
@@ -355,8 +359,8 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
                         fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
                         fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
                         if(maxRowsPerFlowFile > 0) {
-                            fileToProcess = session.putAttribute(fileToProcess, "fragment.identifier", fragmentIdentifier);
-                            fileToProcess = session.putAttribute(fileToProcess, "fragment.index", String.valueOf(fragmentIndex));
+                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier);
+                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                         }

                         logger.info("{} contains {} Avro records; transferring to 'success'",
@@ -373,7 +377,10 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
                     } else {
                         // If there were no rows returned, don't send the flowfile
                         session.remove(fileToProcess);
-                        context.yield();
+                        // If no rows were returned and this was the first FlowFile, yield
+                        if (fragmentIndex == 0) {
+                            context.yield();
+                        }
                         break;
                     }
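Both processors now resolve the attribute names through FragmentAttributes rather than string literals, keeping them consistent with the other splitting processors in the standard bundle. For reference, the keys the enum resolves to:

    FragmentAttributes.FRAGMENT_ID.key();    // "fragment.identifier"
    FragmentAttributes.FRAGMENT_INDEX.key(); // "fragment.index"
    FragmentAttributes.FRAGMENT_COUNT.key(); // "fragment.count"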
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
index 260a94ae7805..166a81c30386 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java
@@ -176,7 +176,6 @@ public class JdbcCommon {
             .required(true)
             .build();

-
     public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException {
         return convertToAvroStream(rs, outStream, null, null, convertNames);
     }
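ExecuteSQL leans on the maxRows conversion option that JdbcCommon.AvroConversionOptions already exposes: convertToAvroStream writes at most maxRows rows per call and returns the number of rows written, which is what makes the loop's termination test work. As consumed in the patch:

    // A return value below maxRowsPerFlowFile means the ResultSet is exhausted,
    // so the do/while in ExecuteSQL stops producing fragments
    nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));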
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index a5d8f45f09ca..85d98d7215ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;

 import java.io.File;
 import java.io.IOException;
@@ -35,7 +36,9 @@
 import org.apache.avro.io.DatumReader;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
 import org.apache.nifi.processors.standard.util.TestJdbcHugeStream;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -192,6 +195,96 @@ public void testWithNullIntColumn() throws SQLException {
         runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0).assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
     }

+    @Test
+    public void testWithOutputBatching() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // ignore; the table may not exist yet
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "5");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeNotExists(FragmentAttributes.FRAGMENT_COUNT.key());
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
+    @Test
+    public void testMaxRowsPerFlowFile() throws SQLException {
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_NULL_INT");
+        } catch (final SQLException sqle) {
+            // ignore; the table may not exist yet
+        }
+
+        stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
+
+        for (int i = 0; i < 1000; i++) {
+            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (" + i + ", 1, 1)");
+        }
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(ExecuteSQL.MAX_ROWS_PER_FLOW_FILE, "5");
+        runner.setProperty(ExecuteSQL.OUTPUT_BATCH_SIZE, "0");
+        runner.setProperty(ExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 200);
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_INDEX.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_ID.key());
+        runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, FragmentAttributes.FRAGMENT_COUNT.key());
+
+        MockFlowFile firstFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
+
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        firstFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "0");
+        firstFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+
+        MockFlowFile lastFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(199);
+
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULT_ROW_COUNT, "5");
+        lastFlowFile.assertAttributeEquals(FragmentAttributes.FRAGMENT_INDEX.key(), "199");
+        lastFlowFile.assertAttributeEquals(ExecuteSQL.RESULTSET_INDEX, "0");
+    }
+
     @Test
     public void testInsertStatementCreatesFlowFile() throws SQLException {
         // remove previous test database, if any
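One consequence worth noting for downstream flows: because fragment.count is omitted whenever 'Output Batch Size' is set, count-based reassembly (for example MergeContent's Defragment strategy, which requires fragment.count) only works when 'Output Batch Size' is left at 0. A hypothetical grouping sketch for consumers that only need correlation, not completeness (none of these names come from the patch):

    // Group fragments of each query result set by their shared identifier
    final Map<String, List<FlowFile>> byResultSet = new HashMap<>();
    for (final FlowFile fragment : fragments) {
        byResultSet.computeIfAbsent(fragment.getAttribute("fragment.identifier"),
                k -> new ArrayList<>()).add(fragment);
    }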