Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
5 changes: 5 additions & 0 deletions cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@
"widget-attributes": {
"placeholder": "The table to write to"
}
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@
"widget-attributes": {
"placeholder": "The table to write to"
}
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
*/
public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implements DatabaseSinkConfig {
public static final String TABLE_NAME = "tableName";
public static final String DB_SCHEMA_NAME = "dbSchemaName";
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";

@Name(Constants.Reference.REFERENCE_NAME)
Expand All @@ -46,11 +47,22 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen
@Macro
private String tableName;

@Name(DB_SCHEMA_NAME)
@Description("Name of the database schema of table.")
@Macro
@Nullable
public String dbSchemaName;

@Override
public String getTableName() {
return tableName;
}

@Override
public String getDBSchemaName() {
return dbSchemaName;
}

/**
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
* databases with case-sensitive identifiers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public interface DatabaseSinkConfig extends DatabaseConnectionConfig {
*/
String getTableName();

/**
* @return the schema name
*/
String getDBSchemaName();

/**
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
* databases with case-sensitive identifiers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
* Sink that can be configured to export data to a database table.
Expand Down Expand Up @@ -108,16 +109,17 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
Class<? extends Driver> driverClass = DBUtils.getDriverClass(
pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
if (driverClass != null && dbSinkConfig.canConnect()) {
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema);
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema, dbSinkConfig.getDBSchemaName());
}
}

@Override
public void prepareRun(BatchSinkContext context) {
String connectionString = dbSinkConfig.getConnectionString();

LOG.debug("tableName = {}; pluginType = {}; pluginName = {}; connectionString = {};",
LOG.debug("tableName = {}; schemaName = {}, pluginType = {}; pluginName = {}; connectionString = {};",
dbSinkConfig.getTableName(),
dbSinkConfig.getDBSchemaName(),
ConnectionConfig.JDBC_PLUGIN_TYPE,
dbSinkConfig.getJdbcPluginName(),
connectionString);
Expand All @@ -130,7 +132,8 @@ public void prepareRun(BatchSinkContext context) {
try {
if (Objects.nonNull(outputSchema)) {
FailureCollector collector = context.getFailureCollector();
validateSchema(collector, driverClass, dbSinkConfig.getTableName(), outputSchema);
validateSchema(collector, driverClass, dbSinkConfig.getTableName(),
outputSchema, dbSinkConfig.getDBSchemaName());
collector.getOrThrowException();
} else {
outputSchema = inferSchema(driverClass);
Expand All @@ -148,8 +151,9 @@ public void prepareRun(BatchSinkContext context) {
configAccessor.setInitQueries(dbSinkConfig.getInitQueries());
configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName());
configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString);
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY,
dbSinkConfig.getEscapedTableName());
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName);
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns);
if (dbSinkConfig.getUser() != null) {
configAccessor.getConfiguration().set(DBConfiguration.USERNAME_PROPERTY, dbSinkConfig.getUser());
Expand Down Expand Up @@ -201,6 +205,8 @@ public void initialize(BatchRuntimeContext context) throws Exception {

private Schema inferSchema(Class<? extends Driver> driverClass) {
List<Schema.Field> inferredFields = new ArrayList<>();
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
try {
DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(),
dbSinkConfig.getJdbcPluginName());
Expand All @@ -211,7 +217,7 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());

try (Statement statement = connection.createStatement();
ResultSet rs = statement.executeQuery("SELECT * FROM " + dbSinkConfig.getEscapedTableName()
ResultSet rs = statement.executeQuery("SELECT * FROM " + fullyQualifiedTableName
+ " WHERE 1 = 0")) {
inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
}
Expand Down Expand Up @@ -249,6 +255,8 @@ public void destroy() {
private void setResultSetMetadata() throws Exception {
List<ColumnType> columnTypes = new ArrayList<>(columns.size());
String connectionString = dbSinkConfig.getConnectionString();
String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();

driverCleanup = DBUtils
.ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName());
Expand All @@ -261,8 +269,7 @@ private void setResultSetMetadata() throws Exception {
// Run a query against the DB table that returns 0 records, but returns valid ResultSetMetadata
// that can be used to construct DBRecord objects to sink to the database table.
ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM %s WHERE 1 = 0",
dbColumns,
dbSinkConfig.getEscapedTableName()))
dbColumns, fullyQualifiedTableName))
) {
ResultSetMetaData resultSetMetadata = rs.getMetaData();
columnTypes.addAll(getMatchedColumnTypeList(resultSetMetadata, columns));
Expand Down Expand Up @@ -296,45 +303,47 @@ static List<ColumnType> getMatchedColumnTypeList(ResultSetMetaData resultSetMeta
}

private void validateSchema(FailureCollector collector, Class<? extends Driver> jdbcDriverClass, String tableName,
Schema inputSchema) {
Schema inputSchema, String dbSchemaName) {
String connectionString = dbSinkConfig.getConnectionString();

String fullyQualifiedTableName = dbSinkConfig.getDBSchemaName() == null ? dbSinkConfig.getEscapedTableName()
: dbSinkConfig.getDBSchemaName() + "." + dbSinkConfig.getEscapedTableName();
try {
DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName());
} catch (IllegalAccessException | InstantiationException | SQLException e) {
collector.addFailure(String.format("Unable to load or register JDBC driver '%s' while checking for " +
"the existence of the database table '%s'.",
jdbcDriverClass, tableName), null).withStacktrace(e.getStackTrace());
jdbcDriverClass, fullyQualifiedTableName),
null).withStacktrace(e.getStackTrace());
throw collector.getOrThrowException();
}

Properties connectionProperties = new Properties();
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());
try (ResultSet tables = connection.getMetaData().getTables(null, null, tableName, null)) {
try (ResultSet tables = connection.getMetaData().getTables(null, dbSchemaName, tableName, null)) {
if (!tables.next()) {
collector.addFailure(
String.format("Table '%s' does not exist.", tableName),
String.format("Ensure table '%s' is set correctly and that the connection string '%s' points " +
"to a valid database.", tableName, connectionString))
String.format("Ensure table '%s' is set correctly and that the connection string '%s' " +
"points to a valid database.", fullyQualifiedTableName, connectionString))
.withConfigProperty(DBSinkConfig.TABLE_NAME);
return;
}
}
setColumnsInfo(inputSchema.getFields());
try (PreparedStatement pStmt = connection.prepareStatement(String.format("SELECT %s FROM %s WHERE 1 = 0",
dbColumns,
dbSinkConfig.getEscapedTableName()));
fullyQualifiedTableName));
ResultSet rs = pStmt.executeQuery()) {
getFieldsValidator().validateFields(inputSchema, rs, collector);
}
} catch (SQLException e) {
LOG.error("Exception while trying to validate schema of database table {} for connection {}.",
tableName, connectionString, e);
fullyQualifiedTableName, connectionString, e);
collector.addFailure(
String.format("Exception while trying to validate schema of database table '%s' for connection '%s' with %s",
tableName, connectionString, e.getMessage()),
fullyQualifiedTableName, connectionString, e.getMessage()),
null).withStacktrace(e.getStackTrace());
}
}
Expand Down Expand Up @@ -365,17 +374,28 @@ private void executeInitQueries(Connection connection, List<String> initQueries)
*/
public abstract static class DBSinkConfig extends DBConfig implements DatabaseSinkConfig {
public static final String TABLE_NAME = "tableName";
public static final String DB_SCHEMA_NAME = "dbSchemaName";
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";

@Name(TABLE_NAME)
@Description("Name of the database table to write to.")
@Macro
public String tableName;

@Name(DB_SCHEMA_NAME)
@Description("Name of the database schema of table.")
@Macro
@Nullable
public String dbSchemaName;

public String getTableName() {
return tableName;
}

public String getDBSchemaName() {
return dbSchemaName;
}

/**
* Adds escape characters (back quotes, double quotes, etc.) to the table name for
* databases with case-sensitive identifiers.
Expand Down
5 changes: 5 additions & 0 deletions db2-plugin/widgets/Db2-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
5 changes: 5 additions & 0 deletions generic-database-plugin/widgets/Database-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
5 changes: 5 additions & 0 deletions mariadb-plugin/widgets/Mariadb-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
5 changes: 5 additions & 0 deletions memsql-plugin/widgets/Memsql-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
5 changes: 5 additions & 0 deletions mssql-plugin/widgets/SqlServer-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
5 changes: 5 additions & 0 deletions mysql-plugin/widgets/Mysql-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "hidden",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down
5 changes: 5 additions & 0 deletions oracle-plugin/widgets/Oracle-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@
"widget-type": "textbox",
"label": "Table Name",
"name": "tableName"
},
{
"widget-type": "textbox",
"label": "Schema Name",
"name": "dbSchemaName"
}
]
},
Expand Down