Skip to content

Commit

Permalink
[minor][common] Improve the changelogMode expression (#1793)
Browse files Browse the repository at this point in the history
  • Loading branch information
wuzhenhua01 committed Dec 5, 2022
1 parent 8bf6160 commit 8f920ac
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import com.ververica.cdc.connectors.db2.Db2Source;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
Expand Down Expand Up @@ -93,12 +92,7 @@ public Db2TableSource(

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.UPDATE_BEFORE)
.build();
return ChangelogMode.all();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
Expand Down Expand Up @@ -147,12 +146,7 @@ public MySqlTableSource(

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
return ChangelogMode.all();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
Expand Down Expand Up @@ -124,12 +123,7 @@ public OceanBaseTableSource(

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
return ChangelogMode.all();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
Expand Down Expand Up @@ -138,12 +137,7 @@ public OracleTableSource(

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
return ChangelogMode.all();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
Expand Down Expand Up @@ -102,12 +101,7 @@ public SqlServerTableSource(

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
return ChangelogMode.all();
}

@Override
Expand Down

0 comments on commit 8f920ac

Please sign in to comment.