From e678b7674af251091b2b651c7c32571416aeb781 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Mon, 9 Jan 2017 13:37:03 -0500 Subject: [PATCH 1/4] NIFI-2881: Added EL support to DB Fetch processors, allow incoming flowfiles to GenerateTableFetch --- .../AbstractDatabaseFetchProcessor.java | 16 +++- .../standard/GenerateTableFetch.java | 89 +++++++++++++------ .../standard/QueryDatabaseTable.java | 17 ++-- .../standard/QueryDatabaseTableTest.java | 9 +- .../standard/TestGenerateTableFetch.java | 84 +++++++++++++++++ 5 files changed, 179 insertions(+), 36 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index eda93287e3b1..c0d6c10dfa81 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -24,6 +24,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.util.StringUtils; import java.io.IOException; import java.math.BigDecimal; @@ -97,6 +98,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact .description("The name of the database table to be queried.") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder() @@ -106,6 +108,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact + "column names are supplied, all columns in the specified table will be returned.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_VALUE_COLUMN_NAMES = new PropertyDescriptor.Builder() @@ -120,6 +123,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact + "are provided, all rows from the table will be considered, which could have a performance impact.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() @@ -129,6 +133,7 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact .defaultValue("0 seconds") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() @@ -167,10 +172,17 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact } public void setup(final ProcessContext context) { + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + + // If there are no max-value column names specified, we don't need to perform this processing + if (StringUtils.isEmpty(maxValueColumnNames)) { + return; + } + // Try to fill the columnTypeMap with the types of the 
desired max-value columns final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); - final String tableName = context.getProperty(TABLE_NAME).getValue(); - final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); try (final Connection con = dbcpService.getConnection(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index bff1024ae112..d27a23c35967 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -60,13 +60,19 @@ @TriggerSerially -@InputRequirement(Requirement.INPUT_FORBIDDEN) +@InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"}) -@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class}) +@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class}) @CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, " + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, " + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This " - + "processor is intended to be run on the Primary Node only.") + + "processor is intended to be run on the Primary Node only.\n\n" + + "This processor can accept incoming connections; the behavior of the processor differs slightly whether incoming connections are provided:\n" + + " - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many " + + "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n" + + " - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n" + + " - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields " + + "as Table Name and others. However, the Max-Value Columns field must be empty, or an error will be reported when the processor is scheduled to run.") @Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for " + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor " + "to fetch only those records that have max values greater than the retained values. 
This can be used for " @@ -83,13 +89,20 @@ public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { + "in the table.") .defaultValue("10000") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. " + + "If no incoming connection(s) are specified, this relationship is unused.") + .build(); + public GenerateTableFetch() { final Set r = new HashSet<>(); r.add(REL_SUCCESS); + r.add(REL_FAILURE); relationships = Collections.unmodifiableSet(r); final List pds = new ArrayList<>(); @@ -115,20 +128,36 @@ protected List getSupportedPropertyDescriptors() { @OnScheduled public void setup(final ProcessContext context) { + // The processor is invalid if there is an incoming connection and max-value columns are defined + if (context.getProperty(MAX_VALUE_COLUMN_NAMES).isSet() && context.hasIncomingConnection()) { + throw new ProcessException("If an incoming connection is supplied, no max-value column names may be specified"); + } super.setup(context); } @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { ProcessSession session = sessionFactory.createSession(); + + FlowFile fileToProcess = null; + if (context.hasIncomingConnection()) { + fileToProcess = session.get(); + + if (fileToProcess == null) { + // Incoming connection with no flow file available, do no work (see capability description) + return; + } + } + final ComponentLog logger = getLogger(); final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); - final String tableName = context.getProperty(TABLE_NAME).getValue(); - final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); - final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); - final int partitionSize = context.getProperty(PARTITION_SIZE).asInteger(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue(); + final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue(); + // Max-value column names can only be specified if there is no incoming connection, so fileToProcess must be null + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger(); final StateManager stateManager = context.getStateManager(); final StateMap stateMap; @@ -186,7 +215,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { - final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in 
seconds logger.debug("Executing {}", new Object[]{selectQuery}); @@ -217,25 +246,35 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory // Something is very wrong here, one row (even if count is zero) should be returned throw new SQLException("No rows returned from metadata query: " + selectQuery); } - } catch (SQLException e) { - logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); - throw new ProcessException(e); - } - final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); + final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); + + // Generate SQL statements to read "pages" of data + for (int i = 0; i < numberOfFetches; i++) { + FlowFile sqlFlowFile; + + Integer limit = partitionSize == 0 ? null : partitionSize; + Integer offset = partitionSize == 0 ? null : i * partitionSize; + final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); + sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); + sqlFlowFile = session.write(sqlFlowFile, out -> { + out.write(query.getBytes()); + }); + session.transfer(sqlFlowFile, REL_SUCCESS); + } - // Generate SQL statements to read "pages" of data - for (int i = 0; i < numberOfFetches; i++) { - FlowFile sqlFlowFile; + if (fileToProcess != null) { + session.remove(fileToProcess); + } + } catch (SQLException e) { + if (fileToProcess != null) { + logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess}); + session.transfer(fileToProcess, REL_FAILURE); - Integer limit = partitionSize == 0 ? null : partitionSize; - Integer offset = partitionSize == 0 ? 
null : i * partitionSize; - final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); - sqlFlowFile = session.create(); - sqlFlowFile = session.write(sqlFlowFile, out -> { - out.write(query.getBytes()); - }); - session.transfer(sqlFlowFile, REL_SUCCESS); + } else { + logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); + throw new ProcessException(e); + } } session.commit(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 2fddb87dd545..cb8fd022205a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -107,6 +107,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .defaultValue("0") .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() @@ -117,6 +118,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .defaultValue("0") .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder() @@ -127,6 +129,7 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor { .defaultValue("0") .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) .build(); public QueryDatabaseTable() { @@ -184,13 +187,13 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); - final String tableName = context.getProperty(TABLE_NAME).getValue(); - final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); - final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); - final Integer fetchSize = context.getProperty(FETCH_SIZE).asInteger(); - final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).asInteger(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() - ? context.getProperty(MAX_FRAGMENTS).asInteger() + ? 
context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger() : 0; final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean(); @@ -247,7 +250,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly } - final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); st.setQueryTimeout(queryTimeout); // timeout in seconds try { logger.debug("Executing query {}", new Object[]{selectQuery}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 3353a8725f0d..77a87a4e0ed1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -72,6 +72,8 @@ public class QueryDatabaseTableTest { private final static String DB_LOCATION = "target/db_qdt"; private DatabaseAdapter dbAdapter; private HashMap origDbAdapters; + private final static String TABLE_NAME_KEY = "tableName"; + private final static String MAX_ROWS_KEY = "maxRows"; @BeforeClass @@ -460,6 +462,7 @@ public void testWithRuntimeException() throws SQLException { runner.setIncomingConnection(false); runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT"); + runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id"); QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() { @Override @@ -521,7 +524,8 @@ public void testMaxRowsPerFlowFile() throws ClassNotFoundException, SQLException runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); runner.setIncomingConnection(false); runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "ID"); - runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "9");//Using a non-round number to make sure the last file is ragged + runner.setProperty(QueryDatabaseTable.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}"); + runner.setVariable(MAX_ROWS_KEY, "9"); runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 12); @@ -675,7 +679,8 @@ public void testInitialMaxValue() throws ClassNotFoundException, SQLException, I cal.add(Calendar.MINUTE, 1); } - runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setProperty(QueryDatabaseTable.TABLE_NAME, "${" + TABLE_NAME_KEY + "}"); + runner.setVariable(TABLE_NAME_KEY, "TEST_QUERY_DB_TABLE"); runner.setIncomingConnection(false); runner.setProperty(QueryDatabaseTable.MAX_VALUE_COLUMN_NAMES, "created_on"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 8a8aa01220ce..9f0206ec68e7 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -251,6 +251,90 @@ public void testMultiplePartitions() throws ClassNotFoundException, SQLException runner.clearTransferState(); } + @Test(expected = AssertionError.class) + public void testErrorWhenIncomingConnectionAndMaxValueColumns() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET"); + runner.run(); + } + + @Test + public void testMultiplePartitionsIncomingFlowFiles() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE1"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE1 (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE1 (id, bucket) VALUES (1, 0)"); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE2 (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE2 (id, bucket) VALUES (0, 0)"); + + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "${partSize}"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE1"); + put("partSize", "1"); + }}); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE2"); + put("partSize", "2"); + }}); + + // The table does not exist, expect the original flow file to be routed to failure + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE3"); + put("partSize", "1"); + }}); + + runner.run(3); + runner.assertTransferCount(AbstractDatabaseFetchProcessor.REL_SUCCESS, 3); + + // Two records from table 1 + assertEquals(runner.getFlowFilesForRelationship(AbstractDatabaseFetchProcessor.REL_SUCCESS).stream().filter( + (ff) -> "TEST_QUERY_DB_TABLE1".equals(ff.getAttribute("tableName"))).count(), + 2); + + // One record from table 2 + assertEquals(runner.getFlowFilesForRelationship(AbstractDatabaseFetchProcessor.REL_SUCCESS).stream().filter( + (ff) -> "TEST_QUERY_DB_TABLE2".equals(ff.getAttribute("tableName"))).count(), + 1); + + // Table 3 doesn't exist, should be routed to failure + runner.assertTransferCount(GenerateTableFetch.REL_FAILURE, 1); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE1"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF 
EXISTS [DERBY-4842] + } + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE2"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + } + /** * Simple implementation only for ListDatabaseTables processor testing. From bce79c412c94d5155e18fc0169e9460116150fdf Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Mon, 23 Jan 2017 14:24:46 -0500 Subject: [PATCH 2/4] NIFI-2881: Incorporated review comments/discussions --- .../AbstractDatabaseFetchProcessor.java | 48 ++++- .../standard/GenerateTableFetch.java | 63 ++++-- .../standard/QueryDatabaseTable.java | 49 ++++- .../standard/QueryDatabaseTableTest.java | 10 +- .../standard/TestGenerateTableFetch.java | 201 +++++++++++++++++- 5 files changed, 321 insertions(+), 50 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java index c0d6c10dfa81..b2f23d74c8d0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java @@ -17,6 +17,8 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; @@ -37,6 +39,7 @@ import java.text.DecimalFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -105,7 +108,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact .name("Columns to Return") .description("A comma-separated list of column names to be used in the query. If your database requires " + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no " - + "column names are supplied, all columns in the specified table will be returned.") + + "column names are supplied, all columns in the specified table will be returned.\nNOTE: If Expression Language is present " + + "for this property and it refers to flow file attribute(s), then the Table Name property must also contain Expression Language.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -120,7 +124,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact + "can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some " + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these " + "types should not be listed in this property, and will result in error(s) during processing. 
If no columns " - + "are provided, all rows from the table will be considered, which could have a performance impact.") + + "are provided, all rows from the table will be considered, which could have a performance impact.\nNOTE: If Expression Language is " + + "present for this property and it refers to flow file attribute(s), then the Table Name property must also contain Expression Language.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -148,11 +153,24 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact protected List propDescriptors; + // The delimiter to use when referencing qualified names (such as table@!@column in the state map) + protected static final String NAMESPACE_DELIMITER = "@!@"; + public static final PropertyDescriptor DB_TYPE; protected final static Map dbAdapters = new HashMap<>(); protected final Map columnTypeMap = new HashMap<>(); + // This value is set when the processor is scheduled and indicates whether the Table Name property contains Expression Language. + // It is used for backwards-compatibility purposes; if the value is false and the fully-qualified state key (table + column) is not found, + // the processor will look for a state key with just the column name. + protected volatile boolean isDynamicTableName = false; + + // This value is set when the processor is scheduled and indicates whether the Maximum Value Columns property contains Expression Language. + // It is used for backwards-compatibility purposes; if the table name and max-value columns are static, then the column types can be + // pre-fetched when the processor is scheduled, rather than having to populate them on-the-fly. + protected volatile boolean isDynamicMaxValues = false; + private static SimpleDateFormat TIME_TYPE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS"); static { @@ -171,6 +189,15 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact .build(); } + // A common validation procedure for DB fetch processors, it stores whether the Table Name and/or Max Value Column properties have expression language + protected Collection customValidate(ValidationContext validationContext) { + // For backwards-compatibility, keep track of whether the table name and max-value column properties are dynamic (i.e. 
has expression language) + isDynamicTableName = validationContext.isExpressionLanguagePresent(validationContext.getProperty(TABLE_NAME).getValue()); + isDynamicMaxValues = validationContext.isExpressionLanguagePresent(validationContext.getProperty(MAX_VALUE_COLUMN_NAMES).getValue()); + + return super.customValidate(validationContext); + } + public void setup(final ProcessContext context) { final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); @@ -184,7 +211,6 @@ public void setup(final ProcessContext context) { final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); - try (final Connection con = dbcpService.getConnection(); final Statement st = con.createStatement()) { @@ -199,10 +225,10 @@ public void setup(final ProcessContext context) { columnTypeMap.clear(); for (int i = 1; i <= numCols; i++) { String colName = resultSetMetaData.getColumnName(i).toLowerCase(); + String colKey = getStateKey(tableName, colName); int colType = resultSetMetaData.getColumnType(i); - columnTypeMap.put(colName, colType); + columnTypeMap.putIfAbsent(colKey, colType); } - } else { throw new ProcessException("No columns found in table from those specified: " + maxValueColumnNames); } @@ -390,4 +416,16 @@ protected static String getLiteralByType(int type, String value, String database return value; } } + + protected static String getStateKey(String prefix, String columnName) { + StringBuilder sb = new StringBuilder(); + if (prefix != null) { + sb.append(prefix.toLowerCase()); + sb.append(NAMESPACE_DELIMITER); + } + if (columnName != null) { + sb.append(columnName.toLowerCase()); + } + return sb.toString(); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java index d27a23c35967..df03d15c7bc7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -26,6 +26,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; @@ -49,6 +51,7 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -67,12 +70,12 @@ + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, " + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. 
This " + "processor is intended to be run on the Primary Node only.\n\n" - + "This processor can accept incoming connections; the behavior of the processor differs slightly whether incoming connections are provided:\n" + + "This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n" + " - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many " + "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n" + " - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n" + " - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields " - + "as Table Name and others. However, the Max-Value Columns field must be empty, or an error will be reported when the processor is scheduled to run.") + + "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.") @Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for " + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor " + "to fetch only those records that have max values greater than the retained values. This can be used for " @@ -126,13 +129,17 @@ protected List getSupportedPropertyDescriptors() { return propDescriptors; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + return super.customValidate(validationContext); + } + @OnScheduled public void setup(final ProcessContext context) { - // The processor is invalid if there is an incoming connection and max-value columns are defined - if (context.getProperty(MAX_VALUE_COLUMN_NAMES).isSet() && context.hasIncomingConnection()) { - throw new ProcessException("If an incoming connection is supplied, no max-value column names may be specified"); + // Pre-fetch the column types if using a static table name and max-value columns + if (!isDynamicTableName && !isDynamicMaxValues) { + super.setup(context); } - super.setup(context); } @Override @@ -155,8 +162,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue(); final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue(); - // Max-value column names can only be specified if there is no incoming connection, so fileToProcess must be null - final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue(); final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger(); final StateManager stateManager = context.getStateManager(); @@ -193,11 +199,20 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory IntStream.range(0, 
maxValueColumnNameList.size()).forEach((index) -> { String colName = maxValueColumnNameList.get(index); maxValueSelectColumns.add("MAX(" + colName + ") " + colName); - String maxValue = statePropertyMap.get(colName.toLowerCase()); + final String fullyQualifiedStateKey = getStateKey(tableName, colName); + String maxValue = statePropertyMap.get(fullyQualifiedStateKey); + if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) { + // If the table name is static and the fully-qualified key was not found, try just the column name + maxValue = statePropertyMap.get(getStateKey(null, colName)); + } if (!StringUtils.isEmpty(maxValue)) { - Integer type = columnTypeMap.get(colName.toLowerCase()); + Integer type = columnTypeMap.get(fullyQualifiedStateKey); + if (type == null && !isDynamicTableName) { + // If the table name is static and the fully-qualified key was not found, try just the column name + type = columnTypeMap.get(getStateKey(null, colName)); + } if (type == null) { - // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. + // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed throw new IllegalArgumentException("No column type found for: " + colName); } // Add a condition for the WHERE clause @@ -231,16 +246,30 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory ResultSetMetaData rsmd = resultSet.getMetaData(); for (int i = 2; i <= rsmd.getColumnCount(); i++) { String resultColumnName = rsmd.getColumnName(i).toLowerCase(); + String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName); + String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey); + if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) { + // If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. + resultColumnCurrentMax = statePropertyMap.get(resultColumnName); + } + int type = rsmd.getColumnType(i); + if (isDynamicTableName) { + // We haven't pre-populated the column type map if the table name is dynamic, so do it here + columnTypeMap.put(fullyQualifiedStateKey, type); + } try { - String newMaxValue = getMaxValueFromRow(resultSet, i, type, statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName()); + String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName()); if (newMaxValue != null) { - statePropertyMap.put(resultColumnName, newMaxValue); + statePropertyMap.put(fullyQualifiedStateKey, newMaxValue); } } catch (ParseException | IOException pie) { // Fail the whole thing here before we start creating flow files and such throw new ProcessException(pie); } + } } else { // Something is very wrong here, one row (even if count is zero) should be returned @@ -251,15 +280,11 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory // Generate SQL statements to read "pages" of data for (int i = 0; i < numberOfFetches; i++) { - FlowFile sqlFlowFile; - Integer limit = partitionSize == 0 ? null : partitionSize; Integer offset = partitionSize == 0 ? 
null : i * partitionSize; final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, StringUtils.join(maxValueColumnNameList, ", "), limit, offset); - sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); - sqlFlowFile = session.write(sqlFlowFile, out -> { - out.write(query.getBytes()); - }); + FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); + sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes())); session.transfer(sqlFlowFile, REL_SUCCESS); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index cb8fd022205a..7ca7a2c3a494 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -215,9 +215,21 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory final Map statePropertyMap = new HashMap<>(stateMap.toMap()); //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map - for(final Map.Entry maxProp : maxValueProperties.entrySet()){ - if (!statePropertyMap.containsKey(maxProp.getKey().toLowerCase())) { - statePropertyMap.put(maxProp.getKey().toLowerCase(), maxProp.getValue()); + for (final Map.Entry maxProp : maxValueProperties.entrySet()) { + String maxPropKey = maxProp.getKey().toLowerCase(); + String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey); + if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) { + String newMaxPropValue; + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name, + // but store the new initial max value under the fully-qualified key. + if (statePropertyMap.containsKey(maxPropKey)) { + newMaxPropValue = statePropertyMap.get(maxPropKey); + } else { + newMaxPropValue = maxProp.getValue(); + } + statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue); + } } @@ -307,7 +319,10 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory for (int i = 0; i < resultSetFlowFiles.size(); i++) { // Add maximum values as attributes for (Map.Entry entry : statePropertyMap.entrySet()) { - resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + entry.getKey(), entry.getValue())); + // Get just the column name from the key + String key = entry.getKey(); + String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length()); + resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." 
+ colName, entry.getValue())); } //set count on all FlowFiles @@ -352,9 +367,16 @@ protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String co List whereClauses = new ArrayList<>(maxValColumnNames.size()); IntStream.range(0, maxValColumnNames.size()).forEach((index) -> { String colName = maxValColumnNames.get(index); - String maxValue = stateMap.get(colName.toLowerCase()); + String maxValueKey = getStateKey(tableName, colName); + String maxValue = stateMap.get(maxValueKey); + if (StringUtils.isEmpty(maxValue)) { + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. + maxValue = stateMap.get(colName.toLowerCase()); + } if (!StringUtils.isEmpty(maxValue)) { - Integer type = columnTypeMap.get(colName.toLowerCase()); + Integer type = columnTypeMap.get(maxValueKey); if (type == null) { // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. throw new IllegalArgumentException("No column type found for: " + colName); @@ -374,7 +396,7 @@ protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String co protected Map getDefaultMaxValueProperties(final Map properties){ - final Map defaultMaxValues = new HashMap(); + final Map defaultMaxValues = new HashMap<>(); for (final Map.Entry entry : properties.entrySet()) { final String key = entry.getKey().getName(); @@ -410,15 +432,22 @@ public void processRow(ResultSet resultSet) throws IOException { if (nrOfColumns > 0) { for (int i = 1; i <= nrOfColumns; i++) { String colName = meta.getColumnName(i).toLowerCase(); - Integer type = columnTypeMap.get(colName); + String fullyQualifiedMaxValueKey = getStateKey(meta.getTableName(i), colName); + Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey); // Skip any columns we're not keeping track of or whose value is null if (type == null || resultSet.getObject(i) == null) { continue; } - String maxValueString = newColMap.get(colName); + String maxValueString = newColMap.get(fullyQualifiedMaxValueKey); + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. 
+ if (StringUtils.isEmpty(maxValueString)) { + maxValueString = newColMap.get(colName); + } String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName()); if (newMaxValueString != null) { - newColMap.put(colName, newMaxValueString); + newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java index 77a87a4e0ed1..92f4757f4e5e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/QueryDatabaseTableTest.java @@ -144,13 +144,13 @@ public void testGetQuery() throws Exception { maxValues.put("id", "509"); StateManager stateManager = runner.getStateManager(); stateManager.setState(maxValues, Scope.CLUSTER); - processor.putColumnType("id", Types.INTEGER); + processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "id", Types.INTEGER); query = processor.getQuery(dbAdapter, "myTable", null, Collections.singletonList("id"), stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509", query); maxValues.put("date_created", "2016-03-07 12:34:56"); stateManager.setState(maxValues, Scope.CLUSTER); - processor.putColumnType("date_created", Types.TIMESTAMP); + processor.putColumnType("mytable" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "date_created", Types.TIMESTAMP); query = processor.getQuery(dbAdapter, "myTable", null, Arrays.asList("id", "DATE_CREATED"), stateManager.getState(Scope.CLUSTER).toMap()); assertEquals("SELECT * FROM myTable WHERE id > 509 AND DATE_CREATED >= '2016-03-07 12:34:56'", query); @@ -692,14 +692,14 @@ public void testInitialMaxValue() throws ClassNotFoundException, SQLException, I runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); assertEquals(4, getNumberOfRecordsFromStream(in)); - runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); runner.clearTransferState(); // Run again, this time no flowfiles/rows should be transferred // Validate Max Value doesn't change also runner.run(); runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 0); - runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:09:00.0", Scope.CLUSTER); runner.clearTransferState(); // Append a new row, expect 1 flowfile one row @@ -712,7 +712,7 @@ public void testInitialMaxValue() throws ClassNotFoundException, SQLException, I runner.assertAllFlowFilesTransferred(QueryDatabaseTable.REL_SUCCESS, 1); in = new ByteArrayInputStream(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).get(0).toByteArray()); 
assertEquals(1, getNumberOfRecordsFromStream(in)); - runner.getStateManager().assertStateEquals("created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER); + runner.getStateManager().assertStateEquals("test_query_db_table" + AbstractDatabaseFetchProcessor.NAMESPACE_DELIMITER + "created_on", "1970-01-01 00:10:00.0", Scope.CLUSTER); runner.clearTransferState(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java index 9f0206ec68e7..f79f96cbc8c0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -18,6 +18,7 @@ import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.processor.exception.ProcessException; @@ -95,7 +96,7 @@ public void setUp() throws Exception { final DBCPService dbcp = new DBCPServiceSimpleImpl(); final Map dbcpProperties = new HashMap<>(); - runner = TestRunners.newTestRunner(GenerateTableFetch.class); + runner = TestRunners.newTestRunner(processor); runner.addControllerService("dbcp", dbcp, dbcpProperties); runner.enableControllerService(dbcp); runner.setProperty(GenerateTableFetch.DBCP_SERVICE, "dbcp"); @@ -251,15 +252,6 @@ public void testMultiplePartitions() throws ClassNotFoundException, SQLException runner.clearTransferState(); } - @Test(expected = AssertionError.class) - public void testErrorWhenIncomingConnectionAndMaxValueColumns() throws ClassNotFoundException, SQLException, InitializationException, IOException { - - runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); - runner.setIncomingConnection(true); - runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "ID, BUCKET"); - runner.run(); - } - @Test public void testMultiplePartitionsIncomingFlowFiles() throws ClassNotFoundException, SQLException, InitializationException, IOException { @@ -335,9 +327,196 @@ public void testMultiplePartitionsIncomingFlowFiles() throws ClassNotFoundExcept } } + @Test + public void testBackwardsCompatibilityStateKeyStaticTableDynamicMaxValues() throws Exception { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "TEST_QUERY_DB_TABLE"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + runner.enqueue("".getBytes(), new HashMap() {{ + put("maxValueCol", "id"); + }}); + + // Pre-populate the state 
with a key for column name (not fully-qualified) + StateManager stateManager = runner.getStateManager(); + stateManager.setState(new HashMap() {{ + put("id", "0"); + }}, Scope.CLUSTER); + + // Pre-populate the column type map with an entry for id (not fully-qualified) + processor.columnTypeMap.put("id", 4); + + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 0 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + } + + @Test + public void testBackwardsCompatibilityStateKeyDynamicTableDynamicMaxValues() throws Exception { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + + // Pre-populate the state with a key for column name (not fully-qualified) + StateManager stateManager = runner.getStateManager(); + stateManager.setState(new HashMap() {{ + put("id", "0"); + }}, Scope.CLUSTER); + + // Pre-populate the column type map with an entry for id (not fully-qualified) + processor.columnTypeMap.put("id", 4); + + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + // Note there is no WHERE clause here. 
Because we are using dynamic tables, the old state key/value is not retrieved + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + + runner.clearTransferState(); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + } + + @Test + public void testBackwardsCompatibilityStateKeyDynamicTableStaticMaxValues() throws Exception { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(true); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "id"); + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + }}); + + // Pre-populate the state with a key for column name (not fully-qualified) + StateManager stateManager = runner.getStateManager(); + stateManager.setState(new HashMap() {{ + put("id", "0"); + }}, Scope.CLUSTER); + + // Pre-populate the column type map with an entry for id (not fully-qualified) + processor.columnTypeMap.put("id", 4); + + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + // Note there is no WHERE clause here. 
Because we are using dynamic tables, the old state key/value is not retrieved + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + + runner.clearTransferState(); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (2, 0)"); + + runner.enqueue("".getBytes(), new HashMap() {{ + put("tableName", "TEST_QUERY_DB_TABLE"); + put("maxValueCol", "id"); + }}); + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id > 1 ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + } + + @Test + public void testBackwardsCompatibilityStateKeyVariableRegistry() throws Exception { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, bucket integer not null)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (0, 0)"); + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, bucket) VALUES (1, 0)"); + + runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}"); + runner.setIncomingConnection(false); + runner.setProperty(GenerateTableFetch.MAX_VALUE_COLUMN_NAMES, "${maxValueCol}"); + + runner.setVariable("tableName", "TEST_QUERY_DB_TABLE"); + runner.setVariable("maxValueCol", "id"); + + // Pre-populate the state with a key for column name (not fully-qualified) + StateManager stateManager = runner.getStateManager(); + stateManager.setState(new HashMap() {{ + put("id", "0"); + }}, Scope.CLUSTER); + + // Pre-populate the column type map with an entry for id (not fully-qualified) + processor.columnTypeMap.put("id", 4); + + runner.run(); + + runner.assertAllFlowFilesTransferred(REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + // Note there is no WHERE clause here. Because we are using dynamic tables (i.e. Expression Language, + // even when not referring to flow file attributes), the old state key/value is not retrieved + assertEquals("SELECT * FROM TEST_QUERY_DB_TABLE ORDER BY id FETCH NEXT 10000 ROWS ONLY", new String(flowFile.toByteArray())); + } + /** - * Simple implementation only for ListDatabaseTables processor testing. + * Simple implementation only for GenerateTableFetch processor testing. 
From 406b6abf7f1841b7cc25074b9c63c37e671cd51d Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Wed, 1 Feb 2017 09:36:16 -0500
Subject: [PATCH 3/4] NIFI-2881: Updated documentation, added error attribute to GenerateTableFetch

---
 .../processors/standard/GenerateTableFetch.java |  7 +++++++
 .../processors/standard/QueryDatabaseTable.java | 13 +++++++++----
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
index df03d15c7bc7..966c20db4bc5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
@@ -21,6 +21,8 @@
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -81,6 +83,10 @@
         + "to fetch only those records that have max values greater than the retained values. This can be used for "
         + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
         + "per the State Management documentation")
+@WritesAttributes({
+        @WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
+                + "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message.")
+})
 public class GenerateTableFetch extends AbstractDatabaseFetchProcessor {
 
     public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
@@ -294,6 +300,7 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
         } catch (SQLException e) {
             if (fileToProcess != null) {
                 logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
+                fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
                 session.transfer(fileToProcess, REL_FAILURE);
 
             } else {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 7ca7a2c3a494..1d898b4b47a9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -25,6 +25,7 @@
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -70,9 +71,13 @@
 @EventDriven
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"sql", "select", "jdbc", "query", "database"})
-@CapabilityDescription("Execute provided SQL select query. Query result will be converted to Avro format."
-        + " Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
-        + "a timer or cron expression, using the standard scheduling methods. FlowFile attribute "
+@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
+@CapabilityDescription("Generates and executes a SQL select query to fetch all rows whose values in the specified Maximum Value column(s) are larger than the "
+        + "previously-seen maxima. Query result will be converted to Avro format. Expression Language is supported for several properties, but no incoming "
+        + "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
+        + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
+        + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+        + "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute "
         + "'querydbtable.row.count' indicates how many rows were selected.")
 @Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
         + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
@@ -80,7 +85,7 @@
         + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
         + "per the State Management documentation")
 @WritesAttributes({
-        @WritesAttribute(attribute = "querydbtable.row.count"),
+        @WritesAttribute(attribute = "querydbtable.row.count", description="The number of rows selected by the query"),
         @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
                 + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute="fragment.count", description="If 'Max Rows Per Flow File' is set then this is the total number of "
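As a usage note on the failure path added above (an illustrative sketch, not part of the patch series): with incoming connections enabled, a flow file whose attributes produce a query that fails, for example a hypothetical table name that does not resolve in the target database, should be routed to failure carrying the new attribute. Assuming the same TestRunner setup as in TestGenerateTableFetch:

    runner.setProperty(GenerateTableFetch.TABLE_NAME, "${tableName}");
    runner.setIncomingConnection(true);
    runner.enqueue("".getBytes(), new HashMap<String, String>() {{
        put("tableName", "NO_SUCH_TABLE");   // hypothetical, expected to trigger a SQLException
    }});
    runner.run();

    runner.assertAllFlowFilesTransferred(GenerateTableFetch.REL_FAILURE, 1);
    MockFlowFile failed = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_FAILURE).get(0);
    // The exception message is surfaced on the flow file for downstream handling/routing.
    failed.assertAttributeExists("generatetablefetch.sql.error");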
From 1af812ef8bfa7e736eb07673b45a064273391b37 Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Thu, 2 Feb 2017 14:10:46 -0500
Subject: [PATCH 4/4] NIFI-2881: Corrected notes for column properties in fetch processors

---
 .../standard/AbstractDatabaseFetchProcessor.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
index b2f23d74c8d0..7728af1545f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
@@ -108,8 +108,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             .name("Columns to Return")
             .description("A comma-separated list of column names to be used in the query. If your database requires "
                     + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no "
-                    + "column names are supplied, all columns in the specified table will be returned.\nNOTE: If Expression Language is present "
-                    + "for this property and it refers to flow file attribute(s), then the Table Name property must also contain Expression Language.")
+                    + "column names are supplied, all columns in the specified table will be returned. NOTE: It is important "
+                    + "to use consistent column names for a given table for incremental fetch to work properly.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
@@ -124,8 +124,8 @@ public abstract class AbstractDatabaseFetchProcessor extends AbstractSessionFact
             + "can be used to retrieve only those rows that have been added/updated since the last retrieval. Note that some "
             + "JDBC types such as bit/boolean are not conducive to maintaining maximum value, so columns of these "
             + "types should not be listed in this property, and will result in error(s) during processing. If no columns "
-            + "are provided, all rows from the table will be considered, which could have a performance impact.\nNOTE: If Expression Language is "
-            + "present for this property and it refers to flow file attribute(s), then the Table Name property must also contain Expression Language.")
+            + "are provided, all rows from the table will be considered, which could have a performance impact. NOTE: It is important "
+            + "to use consistent max-value column names for a given table for incremental fetch to work properly.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)