From c33da1d7f3635225f74ae4a722fbb49440e5e72a Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Wed, 6 Jul 2016 22:34:37 -0400
Subject: [PATCH] NIFI-2156: Add ListDatabaseTables processor

---
 .../standard/ListDatabaseTables.java          | 342 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor       |   1 +
 .../standard/TestListDatabaseTables.java      | 168 +++++++++
 3 files changed, 511 insertions(+)
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
new file mode 100644
index 000000000000..3e58434922cf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A processor to retrieve a list of tables (and their metadata) from a database connection.
+ * <p>
+ * One attribute-only flow file is created for each table returned by {@link DatabaseMetaData#getTables} that has not
+ * been listed before. The fully-qualified names of listed tables are kept in local state; modifying any property
+ * marks that state to be cleared the next time the processor is scheduled, which causes a full re-listing.
+ */
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "list", "jdbc", "table", "database"})
+@CapabilityDescription("Generates a set of flow files, each containing attributes corresponding to metadata about a table from a database connection.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "db.table.name", description = "Contains the name of a database table from the connection"),
+        @WritesAttribute(attribute = "db.table.catalog", description = "Contains the name of the catalog to which the table belongs (may be null)"),
+        @WritesAttribute(attribute = "db.table.schema", description = "Contains the name of the schema to which the table belongs (may be null)"),
+        @WritesAttribute(attribute = "db.table.fullname", description = "Contains the fully-qualified table name (possibly including catalog, schema, etc.)"),
+        @WritesAttribute(attribute = "db.table.type",
+                description = "Contains the type of the database table from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", "
+                        + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\", \"ALIAS\", \"SYNONYM\""),
+        @WritesAttribute(attribute = "db.table.remarks", description = "Contains the remarks (explanatory comment) recorded for the database table, if any"),
+        @WritesAttribute(attribute = "db.table.count", description = "Contains the number of rows in the table (only written when Include Count is true)")
+})
+@Stateful(scopes = {Scope.LOCAL}, description = "After performing a listing of tables, the timestamp of the query is stored. "
+        + "This allows the Processor to not re-list tables the next time that the Processor is run. Changing any of the processor properties will "
+        + "indicate that the processor should reset state and thus re-list the tables using the new configuration. This processor is meant to be "
+        + "run on the primary node only.")
+public class ListDatabaseTables extends AbstractProcessor {
+
+    // Attribute names
+    public static final String DB_TABLE_NAME = "db.table.name";
+    public static final String DB_TABLE_CATALOG = "db.table.catalog";
+    public static final String DB_TABLE_SCHEMA = "db.table.schema";
+    public static final String DB_TABLE_FULLNAME = "db.table.fullname";
+    public static final String DB_TABLE_TYPE = "db.table.type";
+    public static final String DB_TABLE_REMARKS = "db.table.remarks";
+    public static final String DB_TABLE_COUNT = "db.table.count";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are received are routed to success")
+            .build();
+
+    // Property descriptors
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("list-db-tables-db-connection")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("list-db-tables-catalog")
+            .displayName("Catalog")
+            .description("The name of a catalog from which to list database tables. The name must match the catalog name as it is stored in the database. "
+                    + "If the property is not set, the catalog name will not be used to narrow the search for tables. If the property is set to an empty string, "
+                    + "tables without a catalog will be listed.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_PATTERN = new PropertyDescriptor.Builder()
+            .name("list-db-tables-schema-pattern")
+            .displayName("Schema Pattern")
+            .description("A pattern for matching schemas in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, "
+                    + "and \"_\" means match any one character. The pattern must match the schema name as it is stored in the database. "
+                    + "If the property is not set, the schema name will not be used to narrow the search for tables. If the property is set to an empty string, "
+                    + "tables without a schema will be listed.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder()
+            .name("list-db-tables-name-pattern")
+            .displayName("Table Name Pattern")
+            .description("A pattern for matching tables in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, "
+                    + "and \"_\" means match any one character. The pattern must match the table name as it is stored in the database. "
+                    + "If the property is not set, all tables will be retrieved.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor TABLE_TYPES = new PropertyDescriptor.Builder()
+            .name("list-db-tables-types")
+            .displayName("Table Types")
+            .description("A comma-separated list of table types to include. For example, some databases support TABLE and VIEW types. If the property is not set, "
+                    + "tables of all types will be returned.")
+            .required(false)
+            .defaultValue("TABLE")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_COUNT = new PropertyDescriptor.Builder()
+            .name("list-db-include-count")
+            .displayName("Include Count")
+            .description("Whether to include the table's row count as a flow file attribute. This affects performance as a database query will be generated "
+                    + "for each table in the retrieved list.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    private static final List<PropertyDescriptor> propertyDescriptors;
+    private static final Set<Relationship> relationships;
+
+    // Raised by onPropertyModified() and consumed by setup(). Declared volatile because the framework may invoke
+    // those two callbacks from different threads.
+    private volatile boolean resetState = false;
+
+    /*
+     * Will ensure that the list of property descriptors is built only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(DBCP_SERVICE);
+        _propertyDescriptors.add(CATALOG);
+        _propertyDescriptors.add(SCHEMA_PATTERN);
+        _propertyDescriptors.add(TABLE_NAME_PATTERN);
+        _propertyDescriptors.add(TABLE_TYPES);
+        _propertyDescriptors.add(INCLUDE_COUNT);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Clears the stored listing state if a listing-related property was modified since the last time it was cleared.
+     *
+     * @param context provides access to the state manager
+     * @throws ProcessException if the state cannot be cleared
+     */
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        try {
+            if (resetState) {
+                context.getStateManager().clear(Scope.LOCAL);
+                resetState = false;
+            }
+        } catch (IOException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    /**
+     * Lists the tables matching the configured filters and transfers one flow file per table that is not already
+     * recorded in state. A table whose row count was requested but could not be retrieved is skipped and not
+     * recorded, so it is tried again on the next run.
+     *
+     * @param context provides the property values and the state manager
+     * @param session used to create, populate and transfer the flow files
+     * @throws ProcessException if state cannot be read or written, or if the table listing itself fails
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final ComponentLog logger = getLogger();
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final String catalog = context.getProperty(CATALOG).getValue();
+        final String schemaPattern = context.getProperty(SCHEMA_PATTERN).getValue();
+        final String tableNamePattern = context.getProperty(TABLE_NAME_PATTERN).getValue();
+        final String[] tableTypes = context.getProperty(TABLE_TYPES).isSet()
+                ? context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
+                : null;
+        final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+        final Map<String, String> stateMapProperties;
+        try {
+            stateMap = stateManager.getState(Scope.LOCAL);
+            stateMapProperties = new HashMap<>(stateMap.toMap());
+        } catch (IOException ioe) {
+            throw new ProcessException(ioe);
+        }
+
+        try (final Connection con = dbcpService.getConnection()) {
+            final DatabaseMetaData dbMetaData = con.getMetaData();
+
+            // The connection URL does not change between tables, so resolve it once instead of once per flow file
+            String transitUri;
+            try {
+                transitUri = dbMetaData.getURL();
+            } catch (SQLException sqle) {
+                transitUri = "";
+            }
+
+            try (final ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes)) {
+                while (rs.next()) {
+                    final String tableCatalog = rs.getString(1);
+                    final String tableSchema = rs.getString(2);
+                    final String tableName = rs.getString(3);
+                    final String tableType = rs.getString(4);
+                    final String tableRemarks = rs.getString(5);
+
+                    // Build fully-qualified name
+                    final String fqn = Stream.of(tableCatalog, tableSchema, tableName)
+                            .filter(segment -> !StringUtils.isEmpty(segment))
+                            .collect(Collectors.joining("."));
+
+                    // Skip tables that are already recorded in state, i.e. that were listed by a previous run
+                    if (stateMap.get(fqn) != null) {
+                        continue;
+                    }
+
+                    FlowFile flowFile = session.create();
+                    logger.info("Found {}: {}", new Object[]{tableType, fqn});
+                    if (includeCount) {
+                        // Table names cannot be bound as JDBC parameters, so the name reported by the driver is concatenated as-is
+                        final String countQuery = "SELECT COUNT(1) FROM " + fqn;
+                        logger.debug("Executing query: {}", new Object[]{countQuery});
+                        try (final Statement st = con.createStatement();
+                             final ResultSet countResult = st.executeQuery(countQuery)) {
+                            if (countResult.next()) {
+                                flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
+                            }
+                        } catch (SQLException se) {
+                            logger.error("Couldn't get row count for {}", new Object[]{fqn}, se);
+                            session.remove(flowFile);
+                            continue;
+                        }
+                    }
+                    if (tableCatalog != null) {
+                        flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog);
+                    }
+                    if (tableSchema != null) {
+                        flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema);
+                    }
+                    flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName);
+                    flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn);
+                    flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType);
+                    if (tableRemarks != null) {
+                        flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks);
+                    }
+
+                    session.getProvenanceReporter().receive(flowFile, transitUri);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis()));
+                }
+            }
+
+            if (stateMap.getVersion() == -1) {
+                // Nothing has been stored yet, so there is no previous version for replace() to compare against
+                stateManager.setState(stateMapProperties, Scope.LOCAL);
+            } else if (!stateManager.replace(stateMap, stateMapProperties, Scope.LOCAL)) {
+                logger.warn("Could not update state after listing tables; tables may be listed again on the next run");
+            }
+        } catch (final SQLException | IOException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    /**
+     * Marks the stored state for reset whenever a property that determines the retrieved list is modified.
+     *
+     * @param descriptor the property that was modified
+     * @param oldValue the value of the property before the modification
+     * @param newValue the value of the property after the modification
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+        // If any of the properties that define the retrieved list have changed, then reset the state
+        if (DBCP_SERVICE.equals(descriptor)
+                || CATALOG.equals(descriptor)
+                || SCHEMA_PATTERN.equals(descriptor)
+                || TABLE_NAME_PATTERN.equals(descriptor)
+                || TABLE_TYPES.equals(descriptor)
+                || INCLUDE_COUNT.equals(descriptor)) {
+            resetState = true;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6ed7f5b25f6a..6399d3bb941b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -45,6 +45,7 @@ org.apache.nifi.processors.standard.InvokeHTTP
 org.apache.nifi.processors.standard.JoltTransformJSON
 org.apache.nifi.processors.standard.GetJMSQueue
 org.apache.nifi.processors.standard.GetJMSTopic
+org.apache.nifi.processors.standard.ListDatabaseTables
 org.apache.nifi.processors.standard.ListFile
 org.apache.nifi.processors.standard.ListenHTTP
 org.apache.nifi.processors.standard.ListenRELP
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
new file mode 100644
index 000000000000..005cb40179d0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Unit tests for ListDatabaseTables processor.
+ */
+public class TestListDatabaseTables {
+
+    private TestRunner runner;
+    private ListDatabaseTables processor;
+
+    private final static String DB_LOCATION = "target/db_ldt";
+
+    @BeforeClass
+    public static void setupBeforeClass() throws IOException {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+        // remove previous test database, if any
+        deleteTestDatabase();
+    }
+
+    @AfterClass
+    public static void cleanUpAfterClass() throws Exception {
+        try {
+            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+        } catch (SQLNonTransientConnectionException e) {
+            // Do nothing, this is what happens at Derby shutdown
+        }
+        // remove previous test database, if any
+        deleteTestDatabase();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new ListDatabaseTables();
+        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(ListDatabaseTables.DBCP_SERVICE, "dbcp");
+    }
+
+    /** Tables are listed on the first run and, because they are then recorded in state, not again on the second run. */
+    @Test
+    public void testListTablesNoCount() throws Exception {
+        // load test data to database
+        try (final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+             final Statement stmt = con.createStatement()) {
+            dropTestTables(stmt);
+            stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))");
+            stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+        }
+
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
+        // Already got these tables, shouldn't get them again
+        runner.clearTransferState();
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 0);
+    }
+
+    /** With Include Count enabled, each flow file carries the row count of its table in db.table.count. */
+    @Test
+    public void testListTablesWithCount() throws Exception {
+        runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
+
+        // load test data to database
+        try (final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+             final Statement stmt = con.createStatement()) {
+            dropTestTables(stmt);
+            stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))");
+            stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)");
+            stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)");
+            stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+        }
+
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
+        // getTables() results are ordered by table name within a type/catalog/schema, so TEST_TABLE1 (2 rows) comes first
+        assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+        assertEquals("0", results.get(1).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+    }
+
+    /** Removes the Derby database directory used by these tests; a failure to delete it is deliberately ignored. */
+    private static void deleteTestDatabase() {
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ioe) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    /** Drops both test tables, each on its own so that one missing table does not prevent the other from being dropped. */
+    private static void dropTestTables(final Statement stmt) {
+        for (final String table : new String[]{"TEST_TABLE1", "TEST_TABLE2"}) {
+            try {
+                stmt.execute("drop table " + table);
+            } catch (final SQLException sqle) {
+                // Do nothing, the table may not have existed
+            }
+        }
+    }
+
+    /**
+     * Simple implementation only for ListDatabaseTables processor testing.
+     */
+    static class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+            } catch (final ClassNotFoundException | SQLException e) {
+                throw new ProcessException("getConnection failed: " + e, e);
+            }
+        }
+    }
+}
\ No newline at end of file