Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-20026][Connector][Jdbc] Jdbc connector support regular expression #14535

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,6 +26,7 @@

import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -34,13 +35,15 @@ public class JdbcOptions extends JdbcConnectionOptions {

private static final long serialVersionUID = 1L;

private String tableName;
private @Nullable String tableName;
private @Nullable Pattern tablePattern;
private JdbcDialect dialect;
private final @Nullable Integer parallelism;

private JdbcOptions(
String dbURL,
String tableName,
Pattern tablePattern,
String driverName,
String username,
String password,
Expand All @@ -49,6 +52,7 @@ private JdbcOptions(
int connectionCheckTimeoutSeconds) {
super(dbURL, driverName, username, password, connectionCheckTimeoutSeconds);
this.tableName = tableName;
this.tablePattern = tablePattern;
this.dialect = dialect;
this.parallelism = parallelism;
}
Expand All @@ -57,6 +61,10 @@ public String getTableName() {
return tableName;
}

public Pattern getTablePattern() {
return tablePattern;
}

public JdbcDialect getDialect() {
return dialect;
}
Expand All @@ -75,6 +83,7 @@ public boolean equals(Object o) {
JdbcOptions options = (JdbcOptions) o;
return Objects.equals(url, options.url)
&& Objects.equals(tableName, options.tableName)
&& Objects.equals(String.valueOf(tablePattern), String.valueOf(options.tablePattern))
&& Objects.equals(driverName, options.driverName)
&& Objects.equals(username, options.username)
&& Objects.equals(password, options.password)
Expand All @@ -93,6 +102,7 @@ public int hashCode() {
return Objects.hash(
url,
tableName,
String.valueOf(tablePattern),
driverName,
username,
password,
Expand All @@ -105,19 +115,26 @@ public int hashCode() {
public static class Builder {
private String dbURL;
private String tableName;
private Pattern tablePattern;
private String driverName;
private String username;
private String password;
private JdbcDialect dialect;
private Integer parallelism;
private int connectionCheckTimeoutSeconds = 60;

/** required, table name. */
/** optional, table name, Either 'table-name' or 'table-pattern' must be set. */
public Builder setTableName(String tableName) {
this.tableName = tableName;
return this;
}

/** optional, table pattern, Either 'table-name' or 'table-pattern' must be set. */
public Builder setTablePattern(Pattern tablePattern) {
this.tablePattern = tablePattern;
return this;
}

/** optional, user name. */
public Builder setUsername(String username) {
this.username = username;
Expand Down Expand Up @@ -167,7 +184,6 @@ public Builder setParallelism(Integer parallelism) {

public JdbcOptions build() {
checkNotNull(dbURL, "No dbURL supplied.");
checkNotNull(tableName, "No tableName supplied.");
if (this.dialect == null) {
Optional<JdbcDialect> optional = JdbcDialects.get(dbURL);
this.dialect =
Expand All @@ -189,6 +205,7 @@ public JdbcOptions build() {
return new JdbcOptions(
dbURL,
tableName,
tablePattern,
driverName,
username,
password,
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcReadOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
Expand All @@ -43,6 +44,7 @@
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkState;

Expand All @@ -63,7 +65,13 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("the jdbc table name.");
.withDescription("the jdbc table name. Either 'table-name' or 'table-pattern' must be set.");
public static final ConfigOption<String> TABLE_PATTERN =
ConfigOptions.key("table-pattern")
.stringType()
.noDefaultValue()
.withDescription(
"Optional table pattern from which the table is read for source. Either 'table-name' or 'table-pattern' must be set.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
Expand Down Expand Up @@ -173,6 +181,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {

helper.validate();
validateConfigOptions(config);
validateSinkAndLookUpTable(config);
JdbcOptions jdbcOptions = getJdbcOptions(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
Expand All @@ -192,6 +201,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {

helper.validate();
validateConfigOptions(config);
validateSourceTable(config);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new JdbcDynamicTableSource(
Expand All @@ -207,6 +217,7 @@ private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
JdbcOptions.builder()
.setDBUrl(url)
.setTableName(readableConfig.get(TABLE_NAME))
.setTablePattern(readableConfig.getOptional(TABLE_PATTERN).map(Pattern::compile).orElse(null))
.setDialect(JdbcDialects.get(url).get())
.setParallelism(
readableConfig
Expand Down Expand Up @@ -274,7 +285,6 @@ public String factoryIdentifier() {
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> requiredOptions = new HashSet<>();
requiredOptions.add(URL);
requiredOptions.add(TABLE_NAME);
return requiredOptions;
}

Expand All @@ -284,6 +294,8 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(DRIVER);
optionalOptions.add(USERNAME);
optionalOptions.add(PASSWORD);
optionalOptions.add(TABLE_NAME);
optionalOptions.add(TABLE_PATTERN);
optionalOptions.add(SCAN_PARTITION_COLUMN);
optionalOptions.add(SCAN_PARTITION_LOWER_BOUND);
optionalOptions.add(SCAN_PARTITION_UPPER_BOUND);
Expand Down Expand Up @@ -374,4 +386,30 @@ private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptio
"Either all or none of the following options should be provided:\n"
+ String.join("\n", propertyNames));
}

public static void validateSourceTable(ReadableConfig config) {
Optional<String> table = config.getOptional(TABLE_NAME);
Optional<String> pattern = config.getOptional(TABLE_PATTERN);

if (table.isPresent() && pattern.isPresent()) {
throw new ValidationException(
"Option 'table-name' and 'table-pattern' shouldn't be set together.");
}

if (!table.isPresent() && !pattern.isPresent()) {
throw new ValidationException("Either 'table-name' or 'table-pattern' must be set.");
}
}

public static void validateSinkAndLookUpTable(ReadableConfig config) {
String errorMessageTemp =
"Flink Jdbc sink currently only supports table name, but got %s: %s.";
if (config.getOptional(TABLE_PATTERN).isPresent()) {
throw new ValidationException(
String.format(
errorMessageTemp,
"'table-pattern'",
config.get(TABLE_PATTERN)));
}
}
}