Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR;
}

@Override
public Map<String, String> getDBSpecificArguments() {
if (connectionTimeout != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public String getTransactionIsolationLevel() {
public String getEscapedTableName() {
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
}

@Override
public Map<String, String> getDBSpecificArguments() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ public abstract class AbstractDBSpecificSinkConfig extends PluginConfig implemen
@Macro
@Nullable
private String dbSchemaName;

@Name(OPERATION_NAME)
@Description("Operation for the query to perform. By default, the query performs INSERT operation")
@Macro
@Nullable
protected String operationName;

@Name(RELATION_TABLE_KEY)
@Macro
@Nullable
Expand Down Expand Up @@ -89,6 +91,10 @@ public String getEscapedTableName() {
return tableName;
}

public String getEscapedDbSchemaName() {
return dbSchemaName;
}

@Override
public boolean canConnect() {
return !containsMacro(TABLE_NAME) && getConnection().canConnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ public interface DatabaseSinkConfig extends DatabaseConnectionConfig {
*/
String getEscapedTableName();

/**
* Adds escape characters (back quotes, double quotes, etc.) to the database schema name for
* databases with case-sensitive identifiers.
*
* @return dBSchemaName with leading and trailing escape characters appended.
* Default implementation returns unchanged table name string.
*/
String getEscapedDbSchemaName();

/**
* Validate the sink config
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void prepareRun(BatchSinkContext context) {
configAccessor.getConfiguration().set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClass.getName());
configAccessor.getConfiguration().set(DBConfiguration.URL_PROPERTY, connectionString);
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, fullyQualifiedTableName);
configAccessor.getConfiguration().set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, dbColumns);
configAccessor.setOperationName(dbSinkConfig.getOperationName());
Expand Down Expand Up @@ -267,7 +267,7 @@ private Schema inferSchema(Class<? extends Driver> driverClass) {
List<Schema.Field> inferredFields = new ArrayList<>();
String dbSchemaName = dbSinkConfig.getDBSchemaName();
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
try {
DBUtils.ensureJDBCDriverIsAvailable(driverClass, dbSinkConfig.getConnectionString(),
dbSinkConfig.getJdbcPluginName());
Expand Down Expand Up @@ -318,7 +318,7 @@ private void setResultSetMetadata() throws Exception {
String connectionString = dbSinkConfig.getConnectionString();
String dbSchemaName = dbSinkConfig.getDBSchemaName();
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();

driverCleanup = DBUtils
.ensureJDBCDriverIsAvailable(driverClass, connectionString, dbSinkConfig.getJdbcPluginName());
Expand Down Expand Up @@ -381,7 +381,7 @@ private void validateSchema(FailureCollector collector, Class<? extends Driver>
Schema inputSchema, String dbSchemaName) {
String connectionString = dbSinkConfig.getConnectionString();
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
: dbSchemaName + "." + dbSinkConfig.getEscapedTableName();
: dbSinkConfig.getEscapedDbSchemaName() + "." + dbSinkConfig.getEscapedTableName();
try {
DBUtils.ensureJDBCDriverIsAvailable(jdbcDriverClass, connectionString, dbSinkConfig.getJdbcPluginName());
} catch (IllegalAccessException | InstantiationException | SQLException e) {
Expand Down Expand Up @@ -467,12 +467,14 @@ public abstract static class DBSinkConfig extends DBConfig implements DatabaseSi
@Description("Name of the database schema of table.")
@Macro
@Nullable
private String dbSchemaName;
public String dbSchemaName;

@Name(OPERATION_NAME)
@Description("Operation for the query to perform. By default, the query performs INSERT operation")
@Macro
@Nullable
protected String operationName;

@Name(RELATION_TABLE_KEY)
@Macro
@Nullable
Expand All @@ -486,6 +488,7 @@ public String getTableName() {
public String getDBSchemaName() {
return dbSchemaName;
}

@Override
public Operation getOperationName() {
return Strings.isNullOrEmpty(operationName) ? Operation.INSERT : Operation.valueOf(operationName.toUpperCase());
Expand All @@ -506,6 +509,17 @@ public String getEscapedTableName() {
return tableName;
}

/**
* Adds escape characters (back quotes, double quotes, etc.) to the database schema name for
* databases with case-sensitive identifiers.
*
* @return dbschemaName with leading and trailing escape characters appended.
* Default implementation returns unchanged table name string.
*/
public String getEscapedDbSchemaName() {
return dbSchemaName;
}

public boolean canConnect() {
return (!containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT) &&
!containsMacro(ConnectionConfig.DATABASE) && !containsMacro(TABLE_NAME) && !containsMacro(USER) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
}

@Override
protected OracleConnectorConfig getConnection() {
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + getTableName() + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + getDBSchemaName() + ESCAPE_CHAR;
}

@Override
public Map<String, String> getDBSpecificArguments() {
return ImmutableMap.of(PostgresConstants.CONNECTION_TIMEOUT, String.valueOf(connectionTimeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,10 @@ public String getEscapedTableName() {
return ESCAPE_CHAR + tableName + ESCAPE_CHAR;
}

@Override
public String getEscapedDbSchemaName() {
return ESCAPE_CHAR + dbSchemaName + ESCAPE_CHAR;
}

}
}