From ed0192246fd66b74424f5d98e610d97ab9f85eaf Mon Sep 17 00:00:00 2001 From: Justin Taras Date: Fri, 1 Apr 2022 09:28:50 -0400 Subject: [PATCH] add schema support for oracle: PLUGIN-1146 --- .../widgets/AuroraMysql-batchsink.json | 5 ++ .../widgets/AuroraPostgres-batchsink.json | 5 ++ .../widgets/CloudSQLMySQL-batchsink.json | 5 ++ .../widgets/CloudSQLPostgreSQL-batchsink.json | 5 ++ .../config/AbstractDBSpecificSinkConfig.java | 12 +++++ .../db/batch/config/DatabaseSinkConfig.java | 5 ++ .../plugin/db/batch/sink/AbstractDBSink.java | 54 +++++++++++++------ db2-plugin/widgets/Db2-batchsink.json | 5 ++ .../widgets/Database-batchsink.json | 5 ++ mariadb-plugin/widgets/Mariadb-batchsink.json | 5 ++ memsql-plugin/widgets/Memsql-batchsink.json | 5 ++ mssql-plugin/widgets/SqlServer-batchsink.json | 5 ++ mysql-plugin/widgets/Mysql-batchsink.json | 5 ++ oracle-plugin/widgets/Oracle-batchsink.json | 5 ++ 14 files changed, 109 insertions(+), 17 deletions(-) diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json b/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json index cbe2e2df6..175df275b 100644 --- a/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json +++ b/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json @@ -45,6 +45,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json index ae1a6bac7..14a8200f7 100644 --- a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json +++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json @@ -45,6 +45,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json index c4531d1f6..1355b0c9f 100644 --- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json +++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json @@ -121,6 +121,11 @@ "widget-attributes": { "placeholder": "The table to write to" } + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json index e9afc4f86..f55b6b7f4 100644 --- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json +++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json @@ -121,6 +121,11 @@ "widget-attributes": { "placeholder": "The table to write to" } + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSinkConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSinkConfig.java index f4a2e49eb..fb9804144 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSinkConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/config/AbstractDBSpecificSinkConfig.java @@ -35,6 +35,7 @@ */ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implements DatabaseSinkConfig { public static final String TABLE_NAME = "tableName"; + public static final String DB_SCHEMA_NAME = "dbSchemaName"; public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel"; @Name(Constants.Reference.REFERENCE_NAME) @@ -46,11 +47,22 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen @Macro private String tableName; + @Name(DB_SCHEMA_NAME) + @Description("Name of the database schema of table.") + @Macro + @Nullable + public String dbSchemaName; + @Override public String getTableName() { return tableName; } + @Override + public String getDBSchemaName() { + return dbSchemaName; + } + /** * Adds escape characters (back quotes, double quotes, etc.) to the table name for * databases with case-sensitive identifiers. diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/config/DatabaseSinkConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/config/DatabaseSinkConfig.java index 645b9e14f..a98340a49 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/config/DatabaseSinkConfig.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/config/DatabaseSinkConfig.java @@ -50,6 +50,11 @@ public interface DatabaseSinkConfig extends DatabaseConnectionConfig { */ String getTableName(); + /** + * @return the schema name + */ + String getDBSchemaName(); + /** * Adds escape characters (back quotes, double quotes, etc.) to the table name for * databases with case-sensitive identifiers. diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java index 9ca18f4ee..bbd2529e6 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/sink/AbstractDBSink.java @@ -68,6 +68,7 @@ import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; +import javax.annotation.Nullable; /** * Sink that can be configured to export data to a database table. @@ -108,7 +109,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { Class driverClass = DBUtils.getDriverClass( pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE); if (driverClass != null && dbSinkConfig.canConnect()) { - validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema); + validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema, dbSinkConfig.getDBSchemaName()); } } @@ -116,8 +117,9 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { public void prepareRun(BatchSinkContext context) { String connectionString = dbSinkConfig.getConnectionString(); - LOG.debug("tableName = {}; pluginType = {}; pluginName = {}; connectionString = {};", + LOG.debug("tableName = {}; schemaName = {}, pluginType = {}; pluginName = {}; connectionString = {};", dbSinkConfig.getTableName(), + dbSinkConfig.getDBSchemaName(), ConnectionConfig.JDBC_PLUGIN_TYPE, dbSinkConfig.getJdbcPluginName(), connectionString); @@ -130,7 +132,8 @@ public void prepareRun(BatchSinkContext context) { try { if (Objects.nonNull(outputSchema)) { FailureCollector collector = context.getFailureCollector(); - validateSchema(collector, driverClass, dbSinkConfig.getTableName(), outputSchema); + validateSchema(collector, driverClass, dbSinkConfig.getTableName(), + outputSchema, dbSinkConfig.getDBSchemaName()); collector.getOrThrowException(); } else { outputSchema = inferSchema(driverClass); @@ -148,8 +151,9 @@ public void prepareRun(BatchSinkContext context) { configAccessor.setInitQueries(dbSinkConfig.getInitQueries()); configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName()); configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString); - configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, - dbSinkConfig.getEscapedTableName()); + String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName() + : dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName(); + configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName); configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns); if (dbSinkConfig.getUser() != null) { configAccessor.getConfiguration().set(DBConfiguration.USERNAME_PROPERTY, dbSinkConfig.getUser()); @@ -201,6 +205,8 @@ public void initialize(BatchRuntimeContext context) throws Exception { private Schema inferSchema(Class driverClass) { List inferredFields = new ArrayList<>(); + String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName() + : dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName(); try { DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(), dbSinkConfig.getJdbcPluginName()); @@ -211,7 +217,7 @@ private Schema inferSchema(Class driverClass) { executeInitQueries(connection, dbSinkConfig.getInitQueries()); try (Statement statement = connection.createStatement(); - ResultSet rs = statement.executeQuery("SELECT * FROM " + dbSinkConfig.getEscapedTableName() + ResultSet rs = statement.executeQuery("SELECT * FROM " + fullyQualifiedTableName + " WHERE 1 = 0")) { inferredFields.addAll(getSchemaReader().getSchemaFields(rs)); } @@ -249,6 +255,8 @@ public void destroy() { private void setResultSetMetadata() throws Exception { List columnTypes = new ArrayList<>(columns.size()); String connectionString = dbSinkConfig.getConnectionString(); + String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName() + : dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName(); driverCleanup = DBUtils .ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName()); @@ -261,8 +269,7 @@ private void setResultSetMetadata() throws Exception { // Run a query against the DB table that returns 0 records, but returns valid ResultSetMetadata // that can be used to construct DBRecord objects to sink to the database table. ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM %s WHERE 1 = 0", - dbColumns, - dbSinkConfig.getEscapedTableName())) + dbColumns, fullyQualifiedTableName)) ) { ResultSetMetaData resultSetMetadata = rs.getMetaData(); columnTypes.addAll(getMatchedColumnTypeList(resultSetMetadata, columns)); @@ -296,15 +303,17 @@ static List getMatchedColumnTypeList(ResultSetMetaData resultSetMeta } private void validateSchema(FailureCollector collector, Class jdbcDriverClass, String tableName, - Schema inputSchema) { + Schema inputSchema, String dbSchemaName) { String connectionString = dbSinkConfig.getConnectionString(); - + String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName() + : dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName(); try { DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName()); } catch (IllegalAccessException | InstantiationException | SQLException e) { collector.addFailure(String.format("Unable to load or register JDBC driver '%s' while checking for " + "the existence of the database table '%s'.", - jdbcDriverClass, tableName), null).withStacktrace(e.getStackTrace()); + jdbcDriverClass, fullyQualifiedTableName), + null).withStacktrace(e.getStackTrace()); throw collector.getOrThrowException(); } @@ -312,12 +321,12 @@ private void validateSchema(FailureCollector collector, Class connectionProperties.putAll(dbSinkConfig.getConnectionArguments()); try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) { executeInitQueries(connection, dbSinkConfig.getInitQueries()); - try (ResultSet tables = connection.getMetaData().getTables(null, null, tableName, null)) { + try (ResultSet tables = connection.getMetaData().getTables(null, dbSchemaName, tableName, null)) { if (!tables.next()) { collector.addFailure( String.format("Table '%s' does not exist.", tableName), - String.format("Ensure table '%s' is set correctly and that the connection string '%s' points " + - "to a valid database.", tableName, connectionString)) + String.format("Ensure table '%s' is set correctly and that the connection string '%s' " + + "points to a valid database.", fullyQualifiedTableName, connectionString)) .withConfigProperty(DBSinkConfig.TABLE_NAME); return; } @@ -325,16 +334,16 @@ private void validateSchema(FailureCollector collector, Class setColumnsInfo(inputSchema.getFields()); try (PreparedStatement pStmt = connection.prepareStatement(String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, - dbSinkConfig.getEscapedTableName())); + fullyQualifiedTableName)); ResultSet rs = pStmt.executeQuery()) { getFieldsValidator().validateFields(inputSchema, rs, collector); } } catch (SQLException e) { LOG.error("Exception while trying to validate schema of database table {} for connection {}.", - tableName, connectionString, e); + fullyQualifiedTableName, connectionString, e); collector.addFailure( String.format("Exception while trying to validate schema of database table '%s' for connection '%s' with %s", - tableName, connectionString, e.getMessage()), + fullyQualifiedTableName, connectionString, e.getMessage()), null).withStacktrace(e.getStackTrace()); } } @@ -365,6 +374,7 @@ private void executeInitQueries(Connection connection, List initQueries) */ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSinkConfig { public static final String TABLE_NAME = "tableName"; + public static final String DB_SCHEMA_NAME = "dbSchemaName"; public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel"; @Name(TABLE_NAME) @@ -372,10 +382,20 @@ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSi @Macro public String tableName; + @Name(DB_SCHEMA_NAME) + @Description("Name of the database schema of table.") + @Macro + @Nullable + public String dbSchemaName; + public String getTableName() { return tableName; } + public String getDBSchemaName() { + return dbSchemaName; + } + /** * Adds escape characters (back quotes, double quotes, etc.) to the table name for * databases with case-sensitive identifiers. diff --git a/db2-plugin/widgets/Db2-batchsink.json b/db2-plugin/widgets/Db2-batchsink.json index f0e13cd4a..3f2d2ab92 100644 --- a/db2-plugin/widgets/Db2-batchsink.json +++ b/db2-plugin/widgets/Db2-batchsink.json @@ -48,6 +48,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/generic-database-plugin/widgets/Database-batchsink.json b/generic-database-plugin/widgets/Database-batchsink.json index 495cd5eb2..6b85fa583 100644 --- a/generic-database-plugin/widgets/Database-batchsink.json +++ b/generic-database-plugin/widgets/Database-batchsink.json @@ -29,6 +29,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/mariadb-plugin/widgets/Mariadb-batchsink.json b/mariadb-plugin/widgets/Mariadb-batchsink.json index 5ef662436..2aa5ede2b 100644 --- a/mariadb-plugin/widgets/Mariadb-batchsink.json +++ b/mariadb-plugin/widgets/Mariadb-batchsink.json @@ -40,6 +40,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/memsql-plugin/widgets/Memsql-batchsink.json b/memsql-plugin/widgets/Memsql-batchsink.json index 7067af5dc..a4975aea2 100644 --- a/memsql-plugin/widgets/Memsql-batchsink.json +++ b/memsql-plugin/widgets/Memsql-batchsink.json @@ -40,6 +40,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/mssql-plugin/widgets/SqlServer-batchsink.json b/mssql-plugin/widgets/SqlServer-batchsink.json index bbeb25212..98ce28d15 100644 --- a/mssql-plugin/widgets/SqlServer-batchsink.json +++ b/mssql-plugin/widgets/SqlServer-batchsink.json @@ -126,6 +126,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/mysql-plugin/widgets/Mysql-batchsink.json b/mysql-plugin/widgets/Mysql-batchsink.json index 34b0c77de..b360daa11 100644 --- a/mysql-plugin/widgets/Mysql-batchsink.json +++ b/mysql-plugin/widgets/Mysql-batchsink.json @@ -107,6 +107,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "hidden", + "label": "Schema Name", + "name": "dbSchemaName" } ] }, diff --git a/oracle-plugin/widgets/Oracle-batchsink.json b/oracle-plugin/widgets/Oracle-batchsink.json index 17eb150bd..ee9b93572 100644 --- a/oracle-plugin/widgets/Oracle-batchsink.json +++ b/oracle-plugin/widgets/Oracle-batchsink.json @@ -154,6 +154,11 @@ "widget-type": "textbox", "label": "Table Name", "name": "tableName" + }, + { + "widget-type": "textbox", + "label": "Schema Name", + "name": "dbSchemaName" } ] },