From 8f920ac5529b7261d417f6e15b186a7d166436b3 Mon Sep 17 00:00:00 2001 From: wuzhenhua <102498303+wuzhenhua01@users.noreply.github.com> Date: Mon, 5 Dec 2022 21:50:24 +0800 Subject: [PATCH] [minor][common] Improve the changelogMode expression (#1793) --- .../cdc/connectors/db2/table/Db2TableSource.java | 8 +------- .../cdc/connectors/mysql/table/MySqlTableSource.java | 8 +------- .../connectors/oceanbase/table/OceanBaseTableSource.java | 8 +------- .../cdc/connectors/oracle/table/OracleTableSource.java | 8 +------- .../connectors/sqlserver/table/SqlServerTableSource.java | 8 +------- 5 files changed, 5 insertions(+), 35 deletions(-) diff --git a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java index c47122d34a..733e222b1f 100644 --- a/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java +++ b/flink-connector-db2-cdc/src/main/java/com/ververica/cdc/connectors/db2/table/Db2TableSource.java @@ -26,7 +26,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.db2.Db2Source; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; @@ -93,12 +92,7 @@ public Db2TableSource( @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.DELETE) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.UPDATE_BEFORE) - .build(); + return ChangelogMode.all(); } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index d5ef447e2c..7736e9285e 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -27,7 +27,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; @@ -147,12 +146,7 @@ public MySqlTableSource( @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_BEFORE) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.DELETE) - .build(); + return ChangelogMode.all(); } @Override diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java index 32beb046bc..ac8a23c9f5 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java @@ -26,7 +26,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.oceanbase.OceanBaseSource; import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema; @@ -124,12 +123,7 @@ public OceanBaseTableSource( @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_BEFORE) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.DELETE) - .build(); + return ChangelogMode.all(); } @Override diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java index f216a97b86..0f5ccfe3a1 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java @@ -27,7 +27,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.base.options.StartupOptions; import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; @@ -138,12 +137,7 @@ public OracleTableSource( @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_BEFORE) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.DELETE) - .build(); + return ChangelogMode.all(); } @Override diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java index 37f0b189d4..ccc876e1fc 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java @@ -26,7 +26,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import com.ververica.cdc.connectors.sqlserver.SqlServerSource; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; @@ -102,12 +101,7 @@ public SqlServerTableSource( @Override public ChangelogMode getChangelogMode() { - return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.UPDATE_BEFORE) - .addContainedKind(RowKind.UPDATE_AFTER) - .addContainedKind(RowKind.DELETE) - .build(); + return ChangelogMode.all(); } @Override