Skip to content

Commit ae4240c

Browse files
authored
[Feature][Oracle-CDC] Support custom table primary key (#6216)
1 parent a9bc5ca commit ae4240c

File tree

7 files changed

+186
-7
lines changed

7 files changed

+186
-7
lines changed

docs/en/connector-v2/source/Oracle-CDC.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ exit;
211211
| database-names | List | No | - | Database name of the database to monitor. |
212212
| schema-names | List | No | - | Schema name of the database to monitor. |
213213
| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: `database_name.table_name` |
214+
| table-names-config | List | No | - | List of per-table configurations (such as custom primary keys), for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
214215
| startup.mode | Enum | No | INITIAL | Optional startup mode for Oracle CDC consumer, valid enumerations are `initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize historical data at startup, and then synchronize incremental data.<br/> `earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup from the latest offset.<br/> `specific`: Startup from user-supplied specific offsets. |
215216
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note: this option is required when the `startup.mode` option is set to `specific`.** |
216217
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. **Note: this option is required when the `startup.mode` option is set to `specific`.** |
@@ -254,6 +255,31 @@ source {
254255
}
255256
```
256257

258+
### Support custom primary key for table
259+
260+
```
261+
262+
source {
263+
Oracle-CDC {
264+
result_table_name = "customers"
265+
base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
266+
source.reader.close.timeout = 120000
267+
username = "system"
268+
password = "oracle"
269+
database-names = ["XE"]
270+
schema-names = ["DEBEZIUM"]
271+
table-names = ["XE.DEBEZIUM.FULL_TYPES"]
272+
table-names-config = [
273+
{
274+
table = "XE.DEBEZIUM.FULL_TYPES"
275+
primaryKeys = ["ID"]
276+
}
277+
]
278+
}
279+
}
280+
281+
```
282+
257283
### Support debezium-compatible format send to kafka
258284

259285
> Must be used with kafka connector sink, see [compatible debezium format](../formats/cdc-compatible-debezium-json.md) for details

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleDialect.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source;
1919

20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
22+
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
2023
import org.apache.seatunnel.common.utils.SeaTunnelException;
2124
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
2225
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
2326
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
2427
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
2528
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
2629
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
30+
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
2731
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfig;
2832
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.config.OracleSourceConfigFactory;
2933
import org.apache.seatunnel.connectors.seatunnel.cdc.oracle.source.eumerator.OracleChunkSplitter;
@@ -40,6 +44,8 @@
4044

4145
import java.sql.SQLException;
4246
import java.util.List;
47+
import java.util.Map;
48+
import java.util.Optional;
4349

4450
import static org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleConnectionUtils.createOracleConnection;
4551

@@ -49,10 +55,13 @@ public class OracleDialect implements JdbcDataSourceDialect {
4955
private final OracleSourceConfigFactory configFactory;
5056
private final OracleSourceConfig sourceConfig;
5157
private transient OracleSchema oracleSchema;
58+
private final Map<TableId, CatalogTable> tableMap;
5259

53-
public OracleDialect(OracleSourceConfigFactory configFactory) {
60+
public OracleDialect(
61+
OracleSourceConfigFactory configFactory, List<CatalogTable> catalogTables) {
5462
this.configFactory = configFactory;
5563
this.sourceConfig = configFactory.create(0);
64+
this.tableMap = CatalogTableUtils.convertTables(catalogTables);
5665
}
5766

5867
@Override
@@ -102,7 +111,7 @@ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
102111
@Override
103112
public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
104113
if (oracleSchema == null) {
105-
oracleSchema = new OracleSchema(sourceConfig.getDbzConnectorConfig());
114+
oracleSchema = new OracleSchema(sourceConfig.getDbzConnectorConfig(), tableMap);
106115
}
107116
return oracleSchema.getTableSchema(jdbc, tableId);
108117
}
@@ -121,4 +130,14 @@ public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBas
121130
return new OracleRedoLogFetchTask(sourceSplitBase.asIncrementalSplit());
122131
}
123132
}
133+
134+
@Override
135+
public Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId) {
136+
return Optional.ofNullable(tableMap.get(tableId).getTableSchema().getPrimaryKey());
137+
}
138+
139+
@Override
140+
public List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) {
141+
return tableMap.get(tableId).getTableSchema().getConstraintKeys();
142+
}
124143
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
121121

122122
@Override
123123
public DataSourceDialect<JdbcSourceConfig> createDataSourceDialect(ReadonlyConfig config) {
124-
return new OracleDialect((OracleSourceConfigFactory) configFactory);
124+
return new OracleDialect((OracleSourceConfigFactory) configFactory, catalogTables);
125125
}
126126

127127
@Override
@@ -132,7 +132,8 @@ public OffsetFactory createOffsetFactory(ReadonlyConfig config) {
132132

133133
private Map<TableId, Struct> tableChanges() {
134134
JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
135-
OracleDialect dialect = new OracleDialect((OracleSourceConfigFactory) configFactory);
135+
OracleDialect dialect =
136+
new OracleDialect((OracleSourceConfigFactory) configFactory, catalogTables);
136137
List<TableId> discoverTables = dialect.discoverDataCollections(jdbcSourceConfig);
137138
ConnectTableChangeSerializer connectTableChangeSerializer =
138139
new ConnectTableChangeSerializer();

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceFactory.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,26 @@
2323
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
2424
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2525
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
26+
import org.apache.seatunnel.api.table.catalog.TablePath;
2627
import org.apache.seatunnel.api.table.connector.TableSource;
2728
import org.apache.seatunnel.api.table.factory.Factory;
2829
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
2930
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
3031
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3132
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
33+
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceTableConfig;
3234
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
3335
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
3436
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
3537
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
38+
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
3639
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
3740

3841
import com.google.auto.service.AutoService;
3942

4043
import java.io.Serializable;
4144
import java.util.List;
45+
import java.util.Optional;
4246

4347
@AutoService(Factory.class)
4448
public class OracleIncrementalSourceFactory implements TableSourceFactory {
@@ -65,7 +69,8 @@ public OptionRule optionRule() {
6569
JdbcSourceOptions.CONNECTION_POOL_SIZE,
6670
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
6771
JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
68-
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD)
72+
JdbcSourceOptions.SAMPLE_SHARDING_THRESHOLD,
73+
JdbcSourceOptions.TABLE_NAMES_CONFIG)
6974
.optional(OracleSourceOptions.STARTUP_MODE, OracleSourceOptions.STOP_MODE)
7075
.conditional(
7176
OracleSourceOptions.STARTUP_MODE,
@@ -102,6 +107,13 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
102107
List<CatalogTable> catalogTables =
103108
CatalogTableUtil.getCatalogTables(
104109
context.getOptions(), context.getClassLoader());
110+
Optional<List<JdbcSourceTableConfig>> tableConfigs =
111+
context.getOptions().getOptional(JdbcSourceOptions.TABLE_NAMES_CONFIG);
112+
if (tableConfigs.isPresent()) {
113+
catalogTables =
114+
CatalogTableUtils.mergeCatalogTableConfig(
115+
catalogTables, tableConfigs.get(), s -> TablePath.of(s, true));
116+
}
105117
SeaTunnelDataType<SeaTunnelRow> dataType =
106118
CatalogTableUtil.convertToMultipleRowType(catalogTables);
107119
return new OracleIncrementalSource(context.getOptions(), dataType, catalogTables);

seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils;
1919

20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2021
import org.apache.seatunnel.common.utils.SeaTunnelException;
22+
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
2123

2224
import io.debezium.connector.oracle.OracleConnection;
2325
import io.debezium.connector.oracle.OracleConnectorConfig;
@@ -37,10 +39,13 @@ public class OracleSchema {
3739

3840
private final OracleConnectorConfig connectorConfig;
3941
private final Map<TableId, TableChange> schemasByTableId;
42+
private final Map<TableId, CatalogTable> tableMap;
4043

41-
public OracleSchema(OracleConnectorConfig connectorConfig) {
44+
public OracleSchema(
45+
OracleConnectorConfig connectorConfig, Map<TableId, CatalogTable> tableMap) {
4246
this.connectorConfig = connectorConfig;
4347
this.schemasByTableId = new HashMap<>();
48+
this.tableMap = tableMap;
4449
}
4550

4651
/**
@@ -71,7 +76,9 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
7176
null,
7277
false);
7378

74-
Table table = tables.forTable(tableId);
79+
Table table =
80+
CatalogTableUtils.mergeCatalogTableConfig(
81+
tables.forTable(tableId), tableMap.get(tableId));
7582
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
7683
tableChangeMap.put(tableId, tableChange);
7784
} catch (SQLException e) {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,52 @@ public void testOracleCdcCheckDataWithNoPrimaryKey(TestContainer container) thro
221221
});
222222
}
223223

224+
@TestTemplate
225+
public void testOracleCdcCheckDataWithCustomPrimaryKey(TestContainer container)
226+
throws Exception {
227+
228+
clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
229+
clearTable(DATABASE, SINK_TABLE1);
230+
231+
insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
232+
233+
CompletableFuture.supplyAsync(
234+
() -> {
235+
try {
236+
container.executeJob("/oraclecdc_to_oracle_with_custom_primary_key.conf");
237+
} catch (Exception e) {
238+
log.error("Commit task exception :" + e.getMessage());
239+
throw new RuntimeException(e);
240+
}
241+
return null;
242+
});
243+
244+
// snapshot stage
245+
await().atMost(600000, TimeUnit.MILLISECONDS)
246+
.untilAsserted(
247+
() -> {
248+
Assertions.assertIterableEquals(
249+
querySql(
250+
getSourceQuerySQL(
251+
DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY)),
252+
querySql(getSourceQuerySQL(DATABASE, SINK_TABLE1)));
253+
});
254+
255+
// insert update delete
256+
updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY);
257+
258+
// stream stage
259+
await().atMost(600000, TimeUnit.MILLISECONDS)
260+
.untilAsserted(
261+
() -> {
262+
Assertions.assertIterableEquals(
263+
querySql(
264+
getSourceQuerySQL(
265+
DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY)),
266+
querySql(getSourceQuerySQL(DATABASE, SINK_TABLE1)));
267+
});
268+
}
269+
224270
@TestTemplate
225271
@DisabledOnContainer(
226272
value = {},
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
# You can set engine configuration here
23+
execution.parallelism = 1
24+
job.mode = "STREAMING"
25+
checkpoint.interval = 5000
26+
}
27+
28+
source {
29+
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
30+
Oracle-CDC {
31+
result_table_name = "customers"
32+
username = "system"
33+
password = "oracle"
34+
database-names = ["XE"]
35+
schema-names = ["DEBEZIUM"]
36+
base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
37+
source.reader.close.timeout = 120000
38+
connection.pool.size = 1
39+
debezium {
40+
# log.mining.strategy = "online_catalog"
41+
# log.mining.continuous.mine = true
42+
database.oracle.jdbc.timezoneAsRegion = "false"
43+
}
44+
45+
table-names = ["XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"]
46+
table-names-config = [
47+
{
48+
table = "XE.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"
49+
primaryKeys = ["ID"]
50+
}
51+
]
52+
}
53+
}
54+
55+
sink {
56+
Jdbc {
57+
source_table_name = "customers"
58+
driver = "oracle.jdbc.driver.OracleDriver"
59+
url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
60+
user = "system"
61+
password = "oracle"
62+
generate_sink_sql = true
63+
database = "XE"
64+
table = "DEBEZIUM.SINK_FULL_TYPES"
65+
batch_size = 1
66+
primary_keys = ["ID"]
67+
}
68+
}

0 commit comments

Comments
 (0)