Skip to content

Commit 384ece1

Browse files
authored
[Feature][Connector-V2] oracle connector (#2550)
* [Feature][Connector-V2] support oracle connector * [Feature][Connector-V2] modify oracle connector
1 parent a843792 commit 384ece1

File tree

12 files changed

+687
-14
lines changed

12 files changed

+687
-14
lines changed

docs/en/connector-v2/sink/Jdbc.md

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,14 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires
107107
## appendix
108108

109109
there are some reference value for params above.
110-
| datasource | driver | url | xa_data_source_class_name | maven |
111-
|------------|----------------------------------------------|-----------------------------------------------------------------------|--------------------------------------------------------|---------------------------------------------------------------------------------------|
112-
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
113-
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
114-
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
115-
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
116-
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
110+
| datasource | driver | url | xa_data_source_class_name | maven |
111+
| ---------- | -------------------------------------------- | ------------------------------------------------------------ | -------------------------------------------------- | ------------------------------------------------------------ |
112+
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
113+
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql |
114+
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
115+
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
116+
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
117+
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
117118

118119
## Example
119120

docs/en/connector-v2/source/Jdbc.md

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,14 @@ in parallel according to the concurrency of tasks.
9090

9191
there are some reference value for params above.
9292

93-
| datasource | driver | url | maven |
94-
|------------|--------------------------|----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|
95-
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
96-
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql |
97-
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
98-
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
99-
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
93+
| datasource | driver | url | maven |
94+
| ---------- | -------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
95+
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
96+
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql |
97+
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
98+
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
99+
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
100+
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
100101

101102
## Example
102103

seatunnel-connectors-v2/connector-jdbc/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
<dm-jdbc.version>8.1.2.141</dm-jdbc.version>
3636
<sqlserver.version>9.2.1.jre8</sqlserver.version>
3737
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
38+
<oracle.version>12.2.0.1</oracle.version>
3839
</properties>
3940

4041
<dependencyManagement>
@@ -69,6 +70,13 @@
6970
<version>${sqlserver.version}</version>
7071
<scope>provided</scope>
7172
</dependency>
73+
<dependency>
74+
<groupId>com.oracle.database.jdbc</groupId>
75+
<artifactId>ojdbc8</artifactId>
76+
<version>${oracle.version}</version>
77+
<scope>provided</scope>
78+
</dependency>
79+
7280
</dependencies>
7381
</dependencyManagement>
7482

@@ -96,6 +104,10 @@
96104
<groupId>com.microsoft.sqlserver</groupId>
97105
<artifactId>mssql-jdbc</artifactId>
98106
</dependency>
107+
<dependency>
108+
<groupId>com.oracle.database.jdbc</groupId>
109+
<artifactId>ojdbc8</artifactId>
110+
</dependency>
99111
</dependencies>
100112

101113
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
21+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
22+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
23+
24+
public class OracleDialect implements JdbcDialect {
25+
@Override
26+
public String dialectName() {
27+
return "Oracle";
28+
}
29+
30+
@Override
31+
public JdbcRowConverter getRowConverter() {
32+
return new OracleJdbcRowConverter();
33+
}
34+
35+
@Override
36+
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
37+
return new OracleTypeMapper();
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
21+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
22+
23+
import com.google.auto.service.AutoService;
24+
25+
/**
26+
* Factory for {@link OracleDialect}.
27+
*/
28+
29+
@AutoService(JdbcDialectFactory.class)
30+
public class OracleDialectFactory implements JdbcDialectFactory {
31+
@Override
32+
public boolean acceptsURL(String url) {
33+
return url.startsWith("jdbc:oracle:thin:");
34+
}
35+
36+
@Override
37+
public JdbcDialect create() {
38+
return new OracleDialect();
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
19+
20+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
22+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
23+
24+
import java.sql.ResultSet;
25+
import java.sql.ResultSetMetaData;
26+
import java.sql.SQLException;
27+
28+
public class OracleJdbcRowConverter extends AbstractJdbcRowConverter {
29+
30+
@Override
31+
public String converterName() {
32+
return "Oracle";
33+
}
34+
35+
@Override
36+
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
37+
return super.toInternal(rs, metaData, typeInfo);
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
19+
20+
import org.apache.seatunnel.api.table.type.BasicType;
21+
import org.apache.seatunnel.api.table.type.DecimalType;
22+
import org.apache.seatunnel.api.table.type.LocalTimeType;
23+
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
25+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
26+
27+
import lombok.extern.slf4j.Slf4j;
28+
29+
import java.sql.ResultSetMetaData;
30+
import java.sql.SQLException;
31+
32+
@Slf4j
33+
public class OracleTypeMapper implements JdbcDialectTypeMapper {
34+
35+
// ============================data types=====================
36+
37+
private static final String ORACLE_UNKNOWN = "UNKNOWN";
38+
39+
// -------------------------number----------------------------
40+
private static final String ORACLE_BINARY_DOUBLE = "BINARY_DOUBLE";
41+
private static final String ORACLE_BINARY_FLOAT = "BINARY_FLOAT";
42+
private static final String ORACLE_NUMBER = "NUMBER";
43+
private static final String ORACLE_FLOAT = "FLOAT";
44+
private static final String ORACLE_REAL = "REAL";
45+
private static final String ORACLE_INTEGER = "INTEGER";
46+
47+
// -------------------------string----------------------------
48+
private static final String ORACLE_CHAR = "CHAR";
49+
private static final String ORACLE_VARCHAR2 = "VARCHAR2";
50+
private static final String ORACLE_NCHAR = "NCHAR";
51+
private static final String ORACLE_NVARCHAR2 = "NVARCHAR2";
52+
private static final String ORACLE_LONG = "LONG";
53+
private static final String ORACLE_ROWID = "ROWID";
54+
private static final String ORACLE_CLOB = "CLOB";
55+
private static final String ORACLE_NCLOB = "NCLOB";
56+
57+
// ------------------------------time-------------------------
58+
private static final String ORACLE_DATE = "DATE";
59+
private static final String ORACLE_TIMESTAMP = "TIMESTAMP";
60+
private static final String ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = "TIMESTAMP WITH LOCAL TIME ZONE";
61+
62+
// ------------------------------blob-------------------------
63+
private static final String ORACLE_BLOB = "BLOB";
64+
private static final String ORACLE_BFILE = "BFILE";
65+
private static final String ORACLE_RAW = "RAW";
66+
private static final String ORACLE_LONG_RAW = "LONG RAW";
67+
68+
@SuppressWarnings("checkstyle:MagicNumber")
69+
@Override
70+
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
71+
String oracleType = metadata.getColumnTypeName(colIndex).toUpperCase();
72+
String columnName = metadata.getColumnName(colIndex);
73+
int precision = metadata.getPrecision(colIndex);
74+
int scale = metadata.getScale(colIndex);
75+
switch (oracleType) {
76+
case ORACLE_INTEGER:
77+
return BasicType.INT_TYPE;
78+
case ORACLE_NUMBER:
79+
if (precision < 38) {
80+
return new DecimalType(precision, scale);
81+
}
82+
return new DecimalType(38, 18);
83+
case ORACLE_FLOAT:
84+
case ORACLE_BINARY_DOUBLE:
85+
return BasicType.DOUBLE_TYPE;
86+
case ORACLE_BINARY_FLOAT:
87+
case ORACLE_REAL:
88+
return BasicType.FLOAT_TYPE;
89+
case ORACLE_CHAR:
90+
case ORACLE_NCHAR:
91+
case ORACLE_NVARCHAR2:
92+
case ORACLE_VARCHAR2:
93+
case ORACLE_LONG:
94+
case ORACLE_ROWID:
95+
case ORACLE_NCLOB:
96+
case ORACLE_CLOB:
97+
return BasicType.STRING_TYPE;
98+
case ORACLE_DATE:
99+
case ORACLE_TIMESTAMP:
100+
case ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
101+
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
102+
case ORACLE_BLOB:
103+
case ORACLE_RAW:
104+
case ORACLE_LONG_RAW:
105+
case ORACLE_BFILE:
106+
return PrimitiveByteArrayType.INSTANCE;
107+
//Doesn't support yet
108+
case ORACLE_UNKNOWN:
109+
default:
110+
final String jdbcColumnName = metadata.getColumnName(colIndex);
111+
throw new UnsupportedOperationException(
112+
String.format(
113+
"Doesn't support ORACLE type '%s' on column '%s' yet.",
114+
oracleType, jdbcColumnName));
115+
}
116+
}
117+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@
6565
<artifactId>DmJdbcDriver18</artifactId>
6666
<scope>test</scope>
6767
</dependency>
68+
<dependency>
69+
<groupId>com.oracle.database.jdbc</groupId>
70+
<artifactId>ojdbc8</artifactId>
71+
<scope>test</scope>
72+
</dependency>
6873
</dependencies>
6974

7075
</project>

0 commit comments

Comments
 (0)