Skip to content

Commit

Permalink
[FLINK-34639] Support DebeziumDeserializationSchema in OceanBase sour…
Browse files Browse the repository at this point in the history
…ce connector
  • Loading branch information
whhe committed Apr 1, 2024
1 parent d099603 commit 8bd6539
Show file tree
Hide file tree
Showing 34 changed files with 2,581 additions and 1,748 deletions.
242 changes: 142 additions & 100 deletions docs/content.zh/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md

Large diffs are not rendered by default.

184 changes: 108 additions & 76 deletions docs/content/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ limitations under the License.
<packaging>jar</packaging>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Debezium dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -47,12 +54,6 @@ limitations under the License.
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- OceanBase Log Client -->
<dependency>
<groupId>com.oceanbase</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package org.apache.flink.cdc.connectors.oceanbase;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction;
import org.apache.flink.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import org.apache.flink.cdc.connectors.oceanbase.table.StartupMode;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.config.ObReaderConfig;
import com.oceanbase.clogproxy.client.util.ClientIdGenerator;
import com.oceanbase.clogproxy.client.util.ClientUtil;
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
Expand All @@ -47,7 +47,7 @@ public static <T> Builder<T> builder() {
public static class Builder<T> {

// common config
private StartupMode startupMode;
private StartupOptions startupOptions;
private String username;
private String password;
private String tenantName;
Expand All @@ -73,11 +73,12 @@ public static class Builder<T> {
private String configUrl;
private String workingMode;
private Properties obcdcProperties;
private Properties debeziumProperties;

private OceanBaseDeserializationSchema<T> deserializer;
private DebeziumDeserializationSchema<T> deserializer;

public Builder<T> startupMode(StartupMode startupMode) {
this.startupMode = startupMode;
public Builder<T> startupOptions(StartupOptions startupOptions) {
this.startupOptions = startupOptions;
return this;
}

Expand Down Expand Up @@ -151,7 +152,7 @@ public Builder<T> logProxyHost(String logProxyHost) {
return this;
}

public Builder<T> logProxyPort(int logProxyPort) {
public Builder<T> logProxyPort(Integer logProxyPort) {
this.logProxyPort = logProxyPort;
return this;
}
Expand Down Expand Up @@ -186,23 +187,44 @@ public Builder<T> obcdcProperties(Properties obcdcProperties) {
return this;
}

public Builder<T> deserializer(OceanBaseDeserializationSchema<T> deserializer) {
public Builder<T> debeziumProperties(Properties debeziumProperties) {
this.debeziumProperties = debeziumProperties;
return this;
}

public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
this.deserializer = deserializer;
return this;
}

public SourceFunction<T> build() {
switch (startupMode) {
case INITIAL:
checkNotNull(hostname, "hostname shouldn't be null on startup mode 'initial'");
checkNotNull(port, "port shouldn't be null on startup mode 'initial'");
checkNotNull(
compatibleMode,
"compatibleMode shouldn't be null on startup mode 'initial'");
checkNotNull(
jdbcDriver, "jdbcDriver shouldn't be null on startup mode 'initial'");
startupTimestamp = 0L;
checkNotNull(username, "username shouldn't be null");
checkNotNull(password, "password shouldn't be null");
checkNotNull(hostname, "hostname shouldn't be null");
checkNotNull(port, "port shouldn't be null");

if (startupOptions == null) {
startupOptions = StartupOptions.initial();
}
if (compatibleMode == null) {
compatibleMode = "mysql";
}
if (jdbcDriver == null) {
jdbcDriver = "com.mysql.jdbc.Driver";
}

if (connectTimeout == null) {
connectTimeout = Duration.ofSeconds(30);
}

if (serverTimeZone == null) {
serverTimeZone = "+00:00";
}

switch (startupOptions.startupMode) {
case SNAPSHOT:
break;
case INITIAL:
case LATEST_OFFSET:
startupTimestamp = 0L;
break;
Expand All @@ -213,15 +235,9 @@ public SourceFunction<T> build() {
break;
default:
throw new UnsupportedOperationException(
startupMode + " mode is not supported.");
startupOptions.startupMode + " mode is not supported.");
}

if (!startupMode.equals(StartupMode.INITIAL)
&& (StringUtils.isNotEmpty(databaseName)
|| StringUtils.isNotEmpty(tableName))) {
throw new IllegalArgumentException(
"If startup mode is not 'INITIAL', 'database-name' and 'table-name' must not be configured");
}
if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) {
if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(
Expand All @@ -233,57 +249,61 @@ public SourceFunction<T> build() {
"'database-name', 'table-name' or 'table-list' should be configured");
}

if (serverTimeZone == null) {
serverTimeZone = "+00:00";
}
ClientConf clientConf = null;
ObReaderConfig obReaderConfig = null;

if (connectTimeout == null) {
connectTimeout = Duration.ofSeconds(30);
}
if (!startupOptions.isSnapshotOnly()) {

if (logProxyClientId == null) {
logProxyClientId =
String.format(
"%s_%s_%s",
ClientIdGenerator.generate(),
Thread.currentThread().getId(),
checkNotNull(tenantName));
}
ClientConf clientConf =
ClientConf.builder()
.clientId(logProxyClientId)
.connectTimeoutMs((int) connectTimeout.toMillis())
.build();

ObReaderConfig obReaderConfig = new ObReaderConfig();
if (StringUtils.isNotEmpty(rsList)) {
obReaderConfig.setRsList(rsList);
}
if (StringUtils.isNotEmpty(configUrl)) {
obReaderConfig.setClusterUrl(configUrl);
}
if (StringUtils.isNotEmpty(workingMode)) {
obReaderConfig.setWorkingMode(workingMode);
}
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(serverTimeZone);

if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
Map<String, String> extraConfigs = new HashMap<>();
obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
obReaderConfig.setExtraConfigs(extraConfigs);
checkNotNull(logProxyHost);
checkNotNull(logProxyPort);
checkNotNull(tenantName);

if (logProxyClientId == null) {
logProxyClientId =
String.format(
"%s_%s_%s",
ClientUtil.generateClientId(),
Thread.currentThread().getId(),
tenantName);
}
clientConf =
ClientConf.builder()
.clientId(logProxyClientId)
.maxReconnectTimes(0)
.connectTimeoutMs((int) connectTimeout.toMillis())
.build();

obReaderConfig = new ObReaderConfig();
if (StringUtils.isNotEmpty(rsList)) {
obReaderConfig.setRsList(rsList);
}
if (StringUtils.isNotEmpty(configUrl)) {
obReaderConfig.setClusterUrl(configUrl);
}
if (StringUtils.isNotEmpty(workingMode)) {
obReaderConfig.setWorkingMode(workingMode);
}
obReaderConfig.setUsername(username);
obReaderConfig.setPassword(password);
obReaderConfig.setStartTimestamp(startupTimestamp);
obReaderConfig.setTimezone(serverTimeZone);

if (obcdcProperties != null && !obcdcProperties.isEmpty()) {
Map<String, String> extraConfigs = new HashMap<>();
obcdcProperties.forEach((k, v) -> extraConfigs.put(k.toString(), v.toString()));
obReaderConfig.setExtraConfigs(extraConfigs);
}
}

return new OceanBaseRichSourceFunction<>(
StartupMode.INITIAL.equals(startupMode),
startupOptions,
username,
password,
tenantName,
databaseName,
tableName,
tableList,
serverTimeZone,
connectTimeout,
hostname,
port,
Expand All @@ -294,6 +314,7 @@ public SourceFunction<T> build() {
logProxyPort,
clientConf,
obReaderConfig,
debeziumProperties,
deserializer);
}
}
Expand Down
Loading

0 comments on commit 8bd6539

Please sign in to comment.