From c33da1d7f3635225f74ae4a722fbb49440e5e72a Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 6 Jul 2016 22:34:37 -0400 Subject: [PATCH 1/2] NIFI-2156: Add ListDatabaseTables processor --- .../standard/ListDatabaseTables.java | 304 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestListDatabaseTables.java | 168 ++++++++++ 3 files changed, 473 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java new file mode 100644 index 000000000000..3e58434922cf --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A processor to retrieve a list of tables (and their metadata) from a database connection + */ +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "list", "jdbc", "table", 
"database"}) +@CapabilityDescription("Generates a set of flow files, each containing attributes corresponding to metadata about a table from a database connection.") +@WritesAttributes({ + @WritesAttribute(attribute = "db.table.name", description = "Contains the name of a database table from the connection"), + @WritesAttribute(attribute = "db.table.catalog", description = "Contains the name of the catalog to which the table belongs (may be null)"), + @WritesAttribute(attribute = "db.table.schema", description = "Contains the name of the schema to which the table belongs (may be null)"), + @WritesAttribute(attribute = "db.table.fullname", description = "Contains the fully-qualifed table name (possibly including catalog, schema, etc.)"), + @WritesAttribute(attribute = "db.table.type", + description = "Contains the type of the database table from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", " + + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\", \"ALIAS\", \"SYNONYM\""), + @WritesAttribute(attribute = "db.table.remarks", description = "Contains the name of a database table from the connection"), + @WritesAttribute(attribute = "db.table.count", description = "Contains the number of rows in the table") +}) +@Stateful(scopes = {Scope.LOCAL}, description = "After performing a listing of tables, the timestamp of the query is stored. " + + "This allows the Processor to not re-list tables the next time that the Processor is run. Changing any of the processor properties will " + + "indicate that the processor should reset state and thus re-list the tables using the new configuration. 
This processor is meant to be " + + "run on the primary node only.") +public class ListDatabaseTables extends AbstractProcessor { + + // Attribute names + public static final String DB_TABLE_NAME = "db.table.name"; + public static final String DB_TABLE_CATALOG = "db.table.catalog"; + public static final String DB_TABLE_SCHEMA = "db.table.schema"; + public static final String DB_TABLE_FULLNAME = "db.table.fullname"; + public static final String DB_TABLE_TYPE = "db.table.type"; + public static final String DB_TABLE_REMARKS = "db.table.remarks"; + public static final String DB_TABLE_COUNT = "db.table.count"; + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are received are routed to success") + .build(); + + // Property descriptors + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("list-db-tables-db-connection") + .displayName("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection to database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder() + .name("list-db-tables-catalog") + .displayName("Catalog") + .description("The name of a catalog from which to list database tables. The name must match the catalog name as it is stored in the database. " + + "If the property is not set, the catalog name will not be used to narrow the search for tables. If the property is set to an empty string, " + + "tables without a catalog will be listed.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor SCHEMA_PATTERN = new PropertyDescriptor.Builder() + .name("list-db-tables-schema-pattern") + .displayName("Schema Pattern") + .description("A pattern for matching schemas in the database. 
Within a pattern, \"%\" means match any substring of 0 or more characters, " + + "and \"_\" means match any one character. The pattern must match the schema name as it is stored in the database. " + + "If the property is not set, the schema name will not be used to narrow the search for tables. If the property is set to an empty string, " + + "tables without a schema will be listed.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder() + .name("list-db-tables-name-pattern") + .displayName("Table Name Pattern") + .description("A pattern for matching tables in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, " + + "and \"_\" means match any one character. The pattern must match the table name as it is stored in the database. " + + "If the property is not set, all tables will be retrieved.") + .required(false) + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor TABLE_TYPES = new PropertyDescriptor.Builder() + .name("list-db-tables-types") + .displayName("Table Types") + .description("A comma-separated list of table types to include. For example, some databases support TABLE and VIEW types. If the property is not set, " + + "tables of all types will be returned.") + .required(false) + .defaultValue("TABLE") + .addValidator(Validator.VALID) + .build(); + + public static final PropertyDescriptor INCLUDE_COUNT = new PropertyDescriptor.Builder() + .name("list-db-include-count") + .displayName("Include Count") + .description("Whether to include the table's row count as a flow file attribute. 
This affects performance as a database query will be generated " + + "for each table in the retrieved list.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + private static final List propertyDescriptors; + private static final Set relationships; + + private boolean resetState = false; + + /* + * Will ensure that the list of property descriptors is build only once. + * Will also create a Set of relationships + */ + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(DBCP_SERVICE); + _propertyDescriptors.add(CATALOG); + _propertyDescriptors.add(SCHEMA_PATTERN); + _propertyDescriptors.add(TABLE_NAME_PATTERN); + _propertyDescriptors.add(TABLE_TYPES); + _propertyDescriptors.add(INCLUDE_COUNT); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @OnScheduled + public void setup(ProcessContext context) { + try { + if (resetState) { + context.getStateManager().clear(Scope.LOCAL); + resetState = false; + } + } catch (IOException ioe) { + throw new ProcessException(ioe); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final String catalog = context.getProperty(CATALOG).getValue(); + final String schemaPattern = context.getProperty(SCHEMA_PATTERN).getValue(); + final String tableNamePattern = context.getProperty(TABLE_NAME_PATTERN).getValue(); + final String[] tableTypes = context.getProperty(TABLE_TYPES).isSet() + ? 
context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*") + : null; + final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean(); + + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + final Map stateMapProperties; + try { + stateMap = stateManager.getState(Scope.LOCAL); + stateMapProperties = new HashMap<>(stateMap.toMap()); + } catch (IOException ioe) { + throw new ProcessException(ioe); + } + + try (final Connection con = dbcpService.getConnection()) { + + DatabaseMetaData dbMetaData = con.getMetaData(); + ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes); + while (rs.next()) { + final String tableCatalog = rs.getString(1); + final String tableSchema = rs.getString(2); + final String tableName = rs.getString(3); + final String tableType = rs.getString(4); + final String tableRemarks = rs.getString(5); + + // Build fully-qualified name + String fqn = Stream.of(tableCatalog, tableSchema, tableName) + .filter(segment -> !StringUtils.isEmpty(segment)) + .collect(Collectors.joining(".")); + + String fqTableName = stateMap.get(fqn); + if (fqTableName == null) { + FlowFile flowFile = session.create(); + logger.info("Found {}: {}", new Object[]{tableType, fqn}); + if (includeCount) { + try (Statement st = con.createStatement()) { + final String countQuery = "SELECT COUNT(1) FROM " + fqn; + + logger.debug("Executing query: {}", new Object[]{countQuery}); + ResultSet countResult = st.executeQuery(countQuery); + if (countResult.next()) { + flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1))); + } + } catch (SQLException se) { + logger.error("Couldn't get row count for {}", new Object[]{fqn}); + session.remove(flowFile); + continue; + } + } + if (tableCatalog != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog); + } + if (tableSchema != null) { + flowFile = session.putAttribute(flowFile, 
DB_TABLE_SCHEMA, tableSchema); + } + flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName); + flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn); + flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType); + if (tableRemarks != null) { + flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks); + } + + String transitUri; + try { + transitUri = dbMetaData.getURL(); + } catch (SQLException sqle) { + transitUri = ""; + } + session.getProvenanceReporter().receive(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); + stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis())); + } + } + stateManager.replace(stateMap, stateMapProperties, Scope.LOCAL); + + } catch (final SQLException | IOException e) { + throw new ProcessException(e); + } + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + // If any of the properties that define the retrieved list have changed, then reset the state + if (DBCP_SERVICE.equals(descriptor) + || CATALOG.equals(descriptor) + || SCHEMA_PATTERN.equals(descriptor) + || TABLE_NAME_PATTERN.equals(descriptor) + || TABLE_TYPES.equals(descriptor) + || INCLUDE_COUNT.equals(descriptor)) { + resetState = true; + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 6ed7f5b25f6a..6399d3bb941b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -45,6 +45,7 @@ 
org.apache.nifi.processors.standard.InvokeHTTP org.apache.nifi.processors.standard.JoltTransformJSON org.apache.nifi.processors.standard.GetJMSQueue org.apache.nifi.processors.standard.GetJMSTopic +org.apache.nifi.processors.standard.ListDatabaseTables org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenRELP diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java new file mode 100644 index 000000000000..005cb40179d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + + +/** + * Unit tests for ListDatabaseTables processor. + */ +public class TestListDatabaseTables { + + TestRunner runner; + ListDatabaseTables processor; + + private final static String DB_LOCATION = "target/db_ldt"; + + @BeforeClass + public static void setupBeforeClass() throws IOException { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + } + + @AfterClass + public static void cleanUpAfterClass() throws Exception { + try { + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); + } catch (SQLNonTransientConnectionException e) { + // Do nothing, this is what happens at Derby shutdown + } + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + } + + @Before + public void setUp() throws Exception { + processor = new 
ListDatabaseTables(); + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map dbcpProperties = new HashMap<>(); + + runner = TestRunners.newTestRunner(ListDatabaseTables.class); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(ListDatabaseTables.DBCP_SERVICE, "dbcp"); + } + + @Test + public void testListTablesNoCount() throws Exception { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_TABLE1"); + stmt.execute("drop table TEST_TABLE2"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))"); + stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))"); + + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2); + // Already got these tables, shouldn't get them again + runner.clearTransferState(); + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 0); + } + + @Test + public void testListTablesWithCount() throws Exception { + runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true"); + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_TABLE1"); + stmt.execute("drop table TEST_TABLE2"); + } catch (final SQLException sqle) { + } + + stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)"); + stmt.execute("create 
table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))"); + + runner.run(); + runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2); + List results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS); + assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + assertEquals("0", results.get(1).getAttribute(ListDatabaseTables.DB_TABLE_COUNT)); + + } + + /** + * Simple implementation only for ListDatabaseTables processor testing. + */ + class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + final Connection con = DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + return con; + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + } +} \ No newline at end of file From 4037dbe540df0d3f98927cd66296f1310c53b9ef Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 6 Jul 2016 22:35:55 -0400 Subject: [PATCH 2/2] NIFI-2157: Add GenerateTableFetch processor --- .../standard/GenerateTableFetch.java | 207 ++++++++++++++++++ .../standard/db/DatabaseAdapter.java | 38 ++++ .../db/impl/GenericDatabaseAdapter.java | 66 ++++++ .../db/impl/OracleDatabaseAdapter.java | 83 +++++++ .../org.apache.nifi.processor.Processor | 1 + ...ifi.processors.standard.db.DatabaseAdapter | 16 ++ .../standard/TestGenerateTableFetch.java | 157 +++++++++++++ 7 files changed, 568 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java create mode 100644 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java new file mode 100644 index 000000000000..fd90f5319da8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; + + +@EventDriven +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"sql", "select", "jdbc", "query", "database", "fetch"}) +@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. 
The partition size property, along with the db.table.count " + + "FlowFile attribute, determine the size and number of pages and generated FlowFiles.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "db.table.fullname", description = "Contains the fully-qualified name of the table to fetch rows from."), + @ReadsAttribute(attribute = "db.table.count", description = "Contains the number of rows in the specified table") +}) +public class GenerateTableFetch extends AbstractProcessor { + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile containing SQL query.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Failed to process the incoming FlowFile.") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile input") + .build(); + + private static final Set relationships; + + public static final PropertyDescriptor DB_TYPE; + + public static final PropertyDescriptor COLUMN_NAMES = new PropertyDescriptor.Builder() + .name("gen-table-fetch-column-names") + .displayName("Columns to Return") + .description("A comma-separated list of column names to be used in the query. If your database requires " + + "special treatment of the names (quoting, e.g.), each name should include such treatment. If no " + + "column names are supplied, all columns in the specified table will be returned.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder() + .name("gen-table-fetch-partition-size") + .displayName("Partition Size") + .description("The number of result rows to be fetched by each generated SQL statement. 
The total number of rows in " + + "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A " + + "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows " + + "in the table.") + .defaultValue("10000") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + + private static final List propDescriptors; + protected final static Map dbAdapters = new HashMap<>(); + + static { + final Set r = new HashSet<>(); + r.add(REL_SUCCESS); + r.add(REL_FAILURE); + r.add(REL_ORIGINAL); + relationships = Collections.unmodifiableSet(r); + + + // Load the DatabaseAdapters + ServiceLoader dbAdapterLoader = ServiceLoader.load(DatabaseAdapter.class); + dbAdapterLoader.forEach(it -> dbAdapters.put(it.getName(), it)); + + DB_TYPE = new PropertyDescriptor.Builder() + .name("gen-table-fetch-db-type") + .displayName("Database Type") + .description("The type/flavor of database, used for generating database-specific code. In many cases the Generic type " + + "should suffice, but some databases (such as Oracle) require custom SQL clauses. 
") + .allowableValues(dbAdapters.keySet()) + .defaultValue(dbAdapters.values().stream().findFirst().get().getName()) + .required(true) + .build(); + + + final List pds = new ArrayList<>(); + pds.add(COLUMN_NAMES); + pds.add(DB_TYPE); + pds.add(PARTITION_SIZE); + propDescriptors = Collections.unmodifiableList(pds); + + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ComponentLog logger = getLogger(); + + final String tableName = flowFile.getAttribute(ListDatabaseTables.DB_TABLE_FULLNAME); + if (StringUtils.isEmpty(tableName)) { + logger.error("FlowFile attribute " + ListDatabaseTables.DB_TABLE_FULLNAME + " not set, routing to failure"); + session.transfer(flowFile, REL_FAILURE); + return; + } + final String rowCountAttribute = flowFile.getAttribute(ListDatabaseTables.DB_TABLE_COUNT); + if (StringUtils.isEmpty(rowCountAttribute)) { + logger.error("FlowFile attribute " + ListDatabaseTables.DB_TABLE_COUNT + " not set, routing to failure"); + session.transfer(flowFile, REL_FAILURE); + return; + } + final int rowCount = Integer.parseInt(rowCountAttribute); + final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); + final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(flowFile).asInteger(); + final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 
0 : 1); + + final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); + + // If the partition size is zero, get everything + if (partitionSize == 0) { + String query = dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null); + FlowFile sqlFlowFile = session.create(flowFile); + sqlFlowFile = session.write(sqlFlowFile, out -> { + out.write(query.getBytes()); + }); + session.transfer(sqlFlowFile, REL_SUCCESS); + } else { + + // Generate SQL statements to read "pages" of data + for (int i = 0; i < numberOfFetches; i++) { + FlowFile sqlFlowFile = null; + try { + String query = dbAdapter.getSelectStatement(tableName, columnNames, null, null, partitionSize, i * partitionSize); + sqlFlowFile = session.create(flowFile); + sqlFlowFile = session.write(sqlFlowFile, out -> { + out.write(query.getBytes()); + }); + session.transfer(sqlFlowFile, REL_SUCCESS); + + } catch (Exception e) { + logger.error("Error while generating SQL statement", e); + if (sqlFlowFile != null) { + session.remove(sqlFlowFile); + } + } + } + } + session.transfer(flowFile, REL_ORIGINAL); + + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java new file mode 100644 index 000000000000..21ab3318db88 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/DatabaseAdapter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard.db;

/**
 * Interface for RDBMS/JDBC-specific code. Implementations generate vendor-appropriate SQL
 * for the processors that build SELECT statements (e.g. GenerateTableFetch).
 */
public interface DatabaseAdapter {

    /**
     * @return the display name of this adapter; used by the processor as the allowable value
     *         for selecting which adapter generates the SQL
     */
    String getName();

    /**
     * Returns a SQL SELECT statement with the given clauses applied.
     *
     * @param tableName     The name of the table to fetch rows from; must not be null or empty
     * @param columnNames   The names of the columns to fetch from the table; null, empty, or "*" selects all columns
     * @param whereClause   The filter to apply to the statement; should not include the WHERE keyword; may be null
     * @param orderByClause The columns/clause used for ordering the result rows; should not include the ORDER BY keywords; may be null
     * @param limit         The value for the LIMIT clause (i.e. the number of rows to return); may be null for no limit
     * @param offset        The value for the OFFSET clause (i.e. the number of rows to skip); may be null for no offset
     * @return A String containing a SQL SELECT statement with the given clauses applied
     */
    String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
new file mode 100644
index 000000000000..b5a606d48c96
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/GenericDatabaseAdapter.java
@@ -0,0 +1,66 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard.db.impl;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;

/**
 * A generic database adapter that generates ANSI SQL.
+ */ +public class GenericDatabaseAdapter implements DatabaseAdapter { + @Override + public String getName() { + return "Generic"; + } + + @Override + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException("Table name cannot be null or empty"); + } + final StringBuilder query = new StringBuilder("SELECT "); + if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) { + query.append("*"); + } else { + query.append(columnNames); + } + query.append(" FROM "); + query.append(tableName); + + if (!StringUtils.isEmpty(whereClause)) { + query.append(" WHERE "); + query.append(whereClause); + } + if (!StringUtils.isEmpty(orderByClause)) { + query.append(" ORDER BY "); + query.append(whereClause); + } + if (limit != null) { + query.append(" LIMIT "); + query.append(limit); + } + if (offset != null) { + query.append(" OFFSET "); + query.append(offset); + } + + return query.toString(); + } + + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java new file mode 100644 index 000000000000..905960fbcf55 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/OracleDatabaseAdapter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.db.impl; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; + +/** + * A DatabaseAdapter that generates Oracle-compliant SQL. + */ +public class OracleDatabaseAdapter implements DatabaseAdapter { + @Override + public String getName() { + return "Oracle"; + } + + @Override + public String getSelectStatement(String tableName, String columnNames, String whereClause, String orderByClause, Integer limit, Integer offset) { + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException("Table name cannot be null or empty"); + } + + final StringBuilder query = new StringBuilder(); + boolean nestedSelect = (limit != null || offset != null); + if (nestedSelect) { + // Need a nested SELECT query here in order to use ROWNUM to limit the results + query.append("SELECT "); + if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) { + query.append("*"); + } else { + query.append(columnNames); + } + query.append(" FROM (SELECT a.*, ROWNUM rnum FROM ("); + } + + query.append("SELECT "); + if (StringUtils.isEmpty(columnNames) || columnNames.trim().equals("*")) { + query.append("*"); + } else { + query.append(columnNames); + } + query.append(" FROM "); + query.append(tableName); + + + if (!StringUtils.isEmpty(whereClause)) { + query.append(" WHERE "); + 
query.append(whereClause); + } + if (!StringUtils.isEmpty(orderByClause)) { + query.append(" ORDER BY "); + query.append(whereClause); + } + if (nestedSelect) { + query.append(") a"); + int offsetVal = 0; + if (offset != null) { + offsetVal = offset; + } + if (limit != null) { + query.append(" WHERE ROWNUM <= "); + query.append(offsetVal + limit); + } + query.append(") WHERE rnum > "); + query.append(offsetVal); + } + return query.toString(); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 6399d3bb941b..d23d88dd1991 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -43,6 +43,7 @@ org.apache.nifi.processors.standard.HashContent org.apache.nifi.processors.standard.IdentifyMimeType org.apache.nifi.processors.standard.InvokeHTTP org.apache.nifi.processors.standard.JoltTransformJSON +org.apache.nifi.processors.standard.GenerateTableFetch org.apache.nifi.processors.standard.GetJMSQueue org.apache.nifi.processors.standard.GetJMSTopic org.apache.nifi.processors.standard.ListDatabaseTables diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter new file mode 100644 index 000000000000..0e3685a5ca57 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processors.standard.db.DatabaseAdapter @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter +org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java new file mode 100644 index 000000000000..f9da71009961 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateTableFetch.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.db.impl.GenericDatabaseAdapter; +import org.apache.nifi.processors.standard.db.impl.OracleDatabaseAdapter; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + + +/** + * Unit tests for the GenerateTableFetch processor. 
+ */ +public class TestGenerateTableFetch { + + private GenerateTableFetch processor; + private TestRunner runner; + private DatabaseAdapter dbAdapter; + + @Before + public void setUp() throws Exception { + processor = new GenerateTableFetch(); + runner = TestRunners.newTestRunner(processor); + dbAdapter = new GenericDatabaseAdapter(); + } + + @Test + public void getSelectFromClauseGeneric() throws Exception { + assertEquals("SELECT * FROM myTable", dbAdapter.getSelectStatement("myTable", null, null, null, null, null)); + assertEquals("SELECT myCol1, myCol2 FROM myTable", dbAdapter.getSelectStatement("myTable", "myCol1, myCol2", null, null, null, null)); + assertEquals("SELECT * FROM myTable LIMIT 10", dbAdapter.getSelectStatement("myTable", null, null, null, 10, null)); + assertEquals("SELECT * FROM myTable OFFSET 10", dbAdapter.getSelectStatement("myTable", null, null, null, null, 10)); + assertEquals("SELECT * FROM myTable LIMIT 2 OFFSET 10", dbAdapter.getSelectStatement("myTable", null, null, null, 2, 10)); + } + + @Test + public void getSelectFromClauseOracle() throws Exception { + dbAdapter = new OracleDatabaseAdapter(); + assertEquals("SELECT * FROM myTable", + dbAdapter.getSelectStatement("myTable", null, null, null, null, null)); + assertEquals("SELECT myCol1, myCol2 FROM myTable", + dbAdapter.getSelectStatement("myTable", "myCol1, myCol2", null, null, null, null)); + assertEquals("SELECT * FROM (SELECT a.*, ROWNUM rnum FROM (SELECT * FROM myTable) a WHERE ROWNUM <= 10) WHERE rnum > 0", + dbAdapter.getSelectStatement("myTable", null, null, null, 10, null)); + assertEquals("SELECT * FROM (SELECT a.*, ROWNUM rnum FROM (SELECT * FROM myTable) a) WHERE rnum > 10", + dbAdapter.getSelectStatement("myTable", null, null, null, null, 10)); + assertEquals("SELECT * FROM (SELECT a.*, ROWNUM rnum FROM (SELECT * FROM myTable) a WHERE ROWNUM <= 12) WHERE rnum > 10", + dbAdapter.getSelectStatement("myTable", null, null, null, 2, 10)); + } + + @Test(expected = 
IllegalArgumentException.class) + public void getSelectFromClauseNullTableName() throws Exception { + assertEquals("SELECT * FROM myTable", dbAdapter.getSelectStatement(null, null, null, null, null, null)); + } + + @Test(expected = IllegalArgumentException.class) + public void getSelectFromClauseEmptyTableName() throws Exception { + assertEquals("SELECT * FROM myTable", dbAdapter.getSelectStatement("", null, null, null, null, null)); + } + + @Test + public void testOnTrigger() throws Exception { + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "5"); + runner.enqueue(new byte[0], new HashMap() {{ + put("db.table.fullname", "person"); + put("db.table.count", "100"); + }}); + + runner.run(); + runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 20); + runner.assertTransferCount(GenerateTableFetch.REL_ORIGINAL, 1); + } + + @Test + public void testOnTriggerExtraPartition() throws Exception { + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "3"); + runner.enqueue(new byte[0], new HashMap() {{ + put("db.table.fullname", "person"); + put("db.table.count", "100"); + }}); + + runner.run(); + runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 34); + runner.assertTransferCount(GenerateTableFetch.REL_ORIGINAL, 1); + } + + @Test + public void testOnTriggerFetchAll() throws Exception { + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "0"); + runner.setProperty(GenerateTableFetch.COLUMN_NAMES, "id, col1"); + runner.enqueue(new byte[0], new HashMap() {{ + put("db.table.fullname", "person"); + put("db.table.count", "100"); + }}); + + runner.run(); + runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 1); + runner.assertTransferCount(GenerateTableFetch.REL_ORIGINAL, 1); + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(GenerateTableFetch.REL_SUCCESS).get(0); + assertNotNull(resultFlowFile); + resultFlowFile.assertContentEquals("SELECT id, col1 FROM person"); + } + + @Test + public void testOnTriggerNoCount() throws Exception 
{ + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "10"); + runner.enqueue(new byte[0], new HashMap() {{ + put("db.table.fullname", "person"); + }}); + + runner.run(); + runner.assertTransferCount(GenerateTableFetch.REL_FAILURE, 1); + runner.assertTransferCount(GenerateTableFetch.REL_ORIGINAL, 0); + runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 0); + } + + @Test + public void testOnTriggerNoTableName() throws Exception { + runner.setProperty(GenerateTableFetch.PARTITION_SIZE, "10"); + runner.enqueue(new byte[0]); + + runner.run(); + runner.assertTransferCount(GenerateTableFetch.REL_FAILURE, 1); + runner.assertTransferCount(GenerateTableFetch.REL_ORIGINAL, 0); + runner.assertTransferCount(GenerateTableFetch.REL_SUCCESS, 0); + } + + @Test + public void testOnTriggerNoIncomingFlowFile() throws Exception { + runner.setIncomingConnection(false); + runner.run(); + processor.getRelationships().forEach(rel -> runner.assertTransferCount(rel, 0)); + } +} \ No newline at end of file