Skip to content

Commit aaef22b

Browse files
authored
[Feature][Connector-V2][Oracle-cdc]Support for oracle cdc (#5196)
1 parent e3f6d2f commit aaef22b

File tree

41 files changed

+6430
-5
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+6430
-5
lines changed

docs/en/connector-v2/source/Oracle-CDC.md

Lines changed: 266 additions & 0 deletions
Large diffs are not rendered by default.

plugin-mapping.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,5 @@ seatunnel.source.AmazonSqs = connector-amazonsqs
115115
seatunnel.sink.AmazonSqs = connector-amazonsqs
116116
seatunnel.source.Paimon = connector-paimon
117117
seatunnel.sink.Paimon = connector-paimon
118-
seatunnel.sink.Pulsar = connector-pulsar
118+
seatunnel.source.Oracle-CDC = connector-cdc-oracle
119+
seatunnel.sink.Pulsar = connector-pulsar
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<modelVersion>4.0.0</modelVersion>
23+
<parent>
24+
<groupId>org.apache.seatunnel</groupId>
25+
<artifactId>connector-cdc</artifactId>
26+
<version>${revision}</version>
27+
</parent>
28+
<artifactId>connector-cdc-oracle</artifactId>
29+
<name>SeaTunnel : Connectors V2 : CDC : Oracle</name>
30+
31+
<properties>
32+
<oracle.version>19.18.0.0</oracle.version>
33+
</properties>
34+
35+
<dependencyManagement>
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.seatunnel</groupId>
39+
<artifactId>connector-cdc-base</artifactId>
40+
<version>${project.version}</version>
41+
<scope>compile</scope>
42+
</dependency>
43+
44+
<dependency>
45+
<groupId>io.debezium</groupId>
46+
<artifactId>debezium-connector-oracle</artifactId>
47+
<version>${debezium.version}</version>
48+
<scope>compile</scope>
49+
</dependency>
50+
51+
<dependency>
52+
<groupId>org.apache.seatunnel</groupId>
53+
<artifactId>connector-jdbc</artifactId>
54+
<version>${project.version}</version>
55+
</dependency>
56+
57+
<dependency>
58+
<groupId>com.oracle.database.jdbc</groupId>
59+
<artifactId>ojdbc8</artifactId>
60+
<version>${oracle.version}</version>
61+
<scope>provided</scope>
62+
</dependency>
63+
<dependency>
64+
<groupId>com.oracle.database.xml</groupId>
65+
<artifactId>xdb</artifactId>
66+
<version>${oracle.version}</version>
67+
<scope>provided</scope>
68+
</dependency>
69+
</dependencies>
70+
</dependencyManagement>
71+
72+
<dependencies>
73+
74+
<dependency>
75+
<groupId>org.apache.seatunnel</groupId>
76+
<artifactId>connector-cdc-base</artifactId>
77+
</dependency>
78+
79+
<dependency>
80+
<groupId>io.debezium</groupId>
81+
<artifactId>debezium-connector-oracle</artifactId>
82+
</dependency>
83+
84+
<dependency>
85+
<groupId>org.apache.seatunnel</groupId>
86+
<artifactId>connector-jdbc</artifactId>
87+
</dependency>
88+
89+
<dependency>
90+
<groupId>com.oracle.database.jdbc</groupId>
91+
<artifactId>ojdbc8</artifactId>
92+
</dependency>
93+
<dependency>
94+
<groupId>com.oracle.database.xml</groupId>
95+
<artifactId>xdb</artifactId>
96+
</dependency>
97+
</dependencies>
98+
99+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.connector.oracle;
19+
20+
import io.debezium.connector.base.ChangeEventQueue;
21+
import io.debezium.pipeline.ErrorHandler;
22+
23+
import java.io.IOException;
24+
import java.sql.SQLRecoverableException;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
28+
/**
29+
* Copied from https://github.com/debezium/debezium project to fix
30+
* https://issues.redhat.com/browse/DBZ-4536 for 1.6.4.Final version.
31+
*
32+
* <p>This file is override to fix logger mining session stopped due to 'No more data to read from
33+
* socket' exception. please see more discussion under
34+
* https://github.com/debezium/debezium/pull/3118, We should remove this class since we bumped
35+
* higher debezium version after 1.8.1.Final where the issue has been fixed.
36+
*/
37+
public class OracleErrorHandler extends ErrorHandler {
38+
39+
private static final List<String> RETRY_ORACLE_ERRORS = new ArrayList<>();
40+
private static final List<String> RETRY_ORACLE_MESSAGE_CONTAINS_TEXTS = new ArrayList<>();
41+
42+
static {
43+
// Contents of this list should only be ORA-xxxxx errors
44+
// The error check uses starts-with semantics
45+
RETRY_ORACLE_ERRORS.add("ORA-03135"); // connection lost
46+
RETRY_ORACLE_ERRORS.add("ORA-12543"); // TNS:destination host unreachable
47+
RETRY_ORACLE_ERRORS.add("ORA-00604"); // error occurred at recursive SQL level 1
48+
RETRY_ORACLE_ERRORS.add("ORA-01089"); // Oracle immediate shutdown in progress
49+
RETRY_ORACLE_ERRORS.add("ORA-01333"); // Failed to establish LogMiner dictionary
50+
RETRY_ORACLE_ERRORS.add("ORA-01284"); // Redo/Archive log cannot be opened, likely locked
51+
RETRY_ORACLE_ERRORS.add(
52+
"ORA-26653"); // Apply DBZXOUT did not start properly and is currently in state
53+
// INITIAL
54+
RETRY_ORACLE_ERRORS.add("ORA-01291"); // missing logfile
55+
RETRY_ORACLE_ERRORS.add(
56+
"ORA-01327"); // failed to exclusively lock system dictionary as required BUILD
57+
RETRY_ORACLE_ERRORS.add("ORA-04030"); // out of process memory
58+
59+
// Contents of this list should be any type of error message text
60+
// The error check uses case-insensitive contains semantics
61+
RETRY_ORACLE_MESSAGE_CONTAINS_TEXTS.add("No more data to read from socket");
62+
}
63+
64+
public OracleErrorHandler(String logicalName, ChangeEventQueue<?> queue) {
65+
super(OracleConnector.class, logicalName, queue);
66+
}
67+
68+
@Override
69+
protected boolean isRetriable(Throwable throwable) {
70+
while (throwable != null) {
71+
// Always retry any recoverable error
72+
if (throwable instanceof SQLRecoverableException) {
73+
return true;
74+
}
75+
76+
// If message is provided, run checks against it
77+
final String message = throwable.getMessage();
78+
if (message != null && message.length() > 0) {
79+
// Check Oracle error codes
80+
for (String errorCode : RETRY_ORACLE_ERRORS) {
81+
if (message.startsWith(errorCode)) {
82+
return true;
83+
}
84+
}
85+
// Check Oracle error message texts
86+
for (String messageText : RETRY_ORACLE_MESSAGE_CONTAINS_TEXTS) {
87+
if (message.toUpperCase().contains(messageText.toUpperCase())) {
88+
return true;
89+
}
90+
}
91+
}
92+
93+
if (throwable.getCause() != null) {
94+
// We explicitly check this below the top-level error as we only want
95+
// certain nested exceptions to be retried, not if they're at the top
96+
final Throwable cause = throwable.getCause();
97+
if (cause instanceof IOException) {
98+
return true;
99+
}
100+
}
101+
102+
throwable = throwable.getCause();
103+
}
104+
return false;
105+
}
106+
}

0 commit comments

Comments
 (0)