Skip to content

Commit

Permalink
[FLINK-34639]Support DebeziumDeserializationSchema in OceanBase sourc…
Browse files Browse the repository at this point in the history
…e connector
  • Loading branch information
whhe committed Mar 12, 2024
1 parent 26eb6a9 commit 2787a4c
Show file tree
Hide file tree
Showing 30 changed files with 2,212 additions and 1,574 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ limitations under the License.
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Debezium dependencies -->
<dependency>
<groupId>com.ververica</groupId>
Expand All @@ -47,12 +54,6 @@ limitations under the License.
</exclusions>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- OceanBase Log Client -->
<dependency>
<groupId>com.oceanbase</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.flink.cdc.connectors.oceanbase;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction;
import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import org.apache.flink.cdc.connectors.oceanbase.table.StartupMode;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.oceanbase.clogproxy.client.config.ClientConf;
Expand All @@ -47,7 +47,7 @@ public static <T> Builder<T> builder() {
public static class Builder<T> {

// common config
private StartupMode startupMode;
private StartupOptions startupOptions;
private String username;
private String password;
private String tenantName;
Expand All @@ -73,11 +73,12 @@ public static class Builder<T> {
private String configUrl;
private String workingMode;
private Properties obcdcProperties;
private Properties debeziumProperties;

private OceanBaseDeserializationSchema<T> deserializer;
private DebeziumDeserializationSchema<T> deserializer;

public Builder<T> startupMode(StartupMode startupMode) {
this.startupMode = startupMode;
public Builder<T> startupOptions(StartupOptions startupOptions) {
this.startupOptions = startupOptions;
return this;
}

Expand Down Expand Up @@ -151,7 +152,7 @@ public Builder<T> logProxyHost(String logProxyHost) {
return this;
}

public Builder<T> logProxyPort(int logProxyPort) {
public Builder<T> logProxyPort(Integer logProxyPort) {
this.logProxyPort = logProxyPort;
return this;
}
Expand Down Expand Up @@ -186,23 +187,44 @@ public Builder<T> obcdcProperties(Properties obcdcProperties) {
return this;
}

public Builder<T> deserializer(OceanBaseDeserializationSchema<T> deserializer) {
public Builder<T> debeziumProperties(Properties debeziumProperties) {
this.debeziumProperties = debeziumProperties;
return this;
}

public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
}

public SourceFunction<T> build() {
switch (startupMode) {
case INITIAL:
checkNotNull(hostname, "hostname shouldn't be null on startup mode 'initial'");
checkNotNull(port, "port shouldn't be null on startup mode 'initial'");
checkNotNull(
compatibleMode,
"compatibleMode shouldn't be null on startup mode 'initial'");
checkNotNull(
jdbcDriver, "jdbcDriver shouldn't be null on startup mode 'initial'");
startupTimestamp = 0L;
checkNotNull(username, "username shouldn't be null");
checkNotNull(password, "password shouldn't be null");
checkNotNull(hostname, "hostname shouldn't be null");
checkNotNull(port, "port shouldn't be null");

if (startupOptions == null) {
startupOptions = StartupOptions.initial();
}
if (compatibleMode == null) {
compatibleMode = "mysql";
}
if (jdbcDriver == null) {
jdbcDriver = "com.mysql.jdbc.Driver";
}

if (connectTimeout == null) {
connectTimeout = Duration.ofSeconds(30);
}

if (serverTimeZone == null) {
serverTimeZone = "+00:00";
}

switch (startupOptions.startupMode) {
case SNAPSHOT:
break;
case INITIAL:
case LATEST_OFFSET:
startupTimestamp = 0L;
break;
Expand All @@ -213,15 +235,9 @@ public SourceFunction<T> build() {
break;
default:
throw new UnsupportedOperationException(
startupMode + " mode is not supported.");
startupOptions.startupMode + " mode is not supported.");
}

if (!startupMode.equals(StartupMode.INITIAL)
&& (StringUtils.isNotEmpty(databaseName)
|| StringUtils.isNotEmpty(tableName))) {
throw new IllegalArgumentException(
"If startup mode is not 'INITIAL', 'database-name' and 'table-name' must not be configured");
}
if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) {
if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(
Expand All @@ -233,57 +249,61 @@ public SourceFunction<T> build() {
"'database-name', 'table-name' or 'table-list' should be configured");
}

if (serverTimeZone == null) {
serverTimeZone = "+00:00";
}
ClientConf clientConf = null;
ObReaderConfig obReaderConfig = null;

if (connectTimeout == null) {
connectTimeout = Duration.ofSeconds(30);
}
if (!startupOptions.isSnapshotOnly()) {

if (logProxyClientId == null) {
logProxyClientId =
String.format(
"%s_%s_%s",
ClientIdGenerator.generate(),
Thread.currentThread().getId(),
checkNotNull(tenantName));
}
ClientConf clientConf =
ClientConf.builder()
.clientId(logProxyClientId)
.connectTimeoutMs((int) connectTimeout.toMillis())
.build();

ObReaderConfig obReaderConfig = new ObReaderConfig();
if (StringUtils.isNotEmpty(rsList)) {
obReaderConfig.setRsList(rsList);
}
if (StringUtils.isNotEmpty(configUrl)) {
obReaderConfig.setClusterUrl(configUrl);
}
if (StringUtils.isNotEmpty(workingMode)) {
obReaderConfig.setWorkingMode(workingMode);
}
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(serverTimeZone);

if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
Map<String, String> extraConfigs = new HashMap<>();
obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
obReaderConfig.setExtraConfigs(extraConfigs);
checkNotNull(logProxyHost);
checkNotNull(logProxyPort);
checkNotNull(tenantName);

if (logProxyClientId == null) {
logProxyClientId =
String.format(
"%s_%s_%s",
ClientIdGenerator.generate(),
Thread.currentThread().getId(),
tenantName);
}
clientConf =
ClientConf.builder()
.clientId(logProxyClientId)
.connectTimeoutMs((int) connectTimeout.toMillis())
.build();

obReaderConfig = new ObReaderConfig();
if (StringUtils.isNotEmpty(rsList)) {
obReaderConfig.setRsList(rsList);
}
if (StringUtils.isNotEmpty(configUrl)) {
obReaderConfig.setClusterUrl(configUrl);
}
if (StringUtils.isNotEmpty(workingMode)) {
obReaderConfig.setWorkingMode(workingMode);
}
obReaderConfig.setTableWhiteList(tenantName + ".*.*");
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(serverTimeZone);

if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
Map<String, String> extraConfigs = new HashMap<>();
obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
obReaderConfig.setExtraConfigs(extraConfigs);
}
}

return new OceanBaseRichSourceFunction<>(
StartupMode.INITIAL.equals(startupMode),
startupOptions,
username,
password,
tenantName,
databaseName,
tableName,
tableList,
serverTimeZone,
connectTimeout,
hostname,
port,
Expand All @@ -294,6 +314,7 @@ public SourceFunction<T> build() {
logProxyPort,
clientConf,
obReaderConfig,
debeziumProperties,
deserializer);
}
}
Expand Down
Loading

0 comments on commit 2787a4c

Please sign in to comment.