Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] oracle connector #2550

Merged
merged 23 commits into from
Oct 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-jdbc</artifactId>

<properties>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<pg.version>42.3.3</pg.version>
<mysql.version>8.0.16</mysql.version>
<oracle.version>12.2.0.1</oracle.version>
</properties>

<dependencies>
Expand All @@ -55,6 +56,13 @@
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<version>${phoenix.version}</version>
</dependency>

<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${oracle.version}</version>
<scope>test</scope>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use provided?

</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

public class OracleDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Oracle";
}

@Override
public JdbcRowConverter getRowConverter() {
return new OracleJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new OracleTypeMapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/**
* Factory for {@link OracleDialect}.
*/

@AutoService(JdbcDialectFactory.class)
public class OracleDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:oracle:thin:");
}

@Override
public JdbcDialect create() {
return new OracleDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class OracleJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "Oracle";
}

@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

@Slf4j
public class OracleTypeMapper implements JdbcDialectTypeMapper {

// ============================data types=====================

private static final String ORACLE_UNKNOWN = "UNKNOWN";

// -------------------------number----------------------------
private static final String ORACLE_BINARY_DOUBLE = "BINARY_DOUBLE";
private static final String ORACLE_BINARY_FLOAT = "BINARY_FLOAT";
private static final String ORACLE_NUMBER = "NUMBER";
private static final String ORACLE_FLOAT = "FLOAT";

// -------------------------string----------------------------
private static final String ORACLE_CHAR = "CHAR";
private static final String ORACLE_VARCHAR2 = "VARCHAR2";
private static final String ORACLE_NCHAR = "NCHAR";
private static final String ORACLE_NVARCHAR2 = "NVARCHAR2";
private static final String ORACLE_LONG = "LONG";
private static final String ORACLE_ROWID = "ROWID";
private static final String ORACLE_CLOB = "CLOB";
private static final String ORACLE_NCLOB = "NCLOB";

// ------------------------------time-------------------------
private static final String ORACLE_DATE = "DATE";
private static final String ORACLE_TIMESTAMP = "TIMESTAMP";
private static final String ORACLE_TIME_WITHOUT_TIME_ZONE = "TIME WITHOUT TIME ZONE";
private static final String ORACLE_TIMESTAMP_WITHOUT_TIME_ZONE = "TIMESTAMP WITHOUT TIME ZONE";

// ------------------------------blob-------------------------
private static final String ORACLE_BLOB = "BLOB";
private static final String ORACLE_BFILE = "BFILE";
private static final String ORACLE_RAW = "RAW";
private static final String ORACLE_LONG_RAW = "LONG RAW";

@SuppressWarnings("checkstyle:MagicNumber")
@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String oracleType = metadata.getColumnTypeName(colIndex).toUpperCase();
String columnName = metadata.getColumnName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (oracleType) {
case ORACLE_NUMBER:
return new DecimalType(precision, scale);
case ORACLE_FLOAT:
case ORACLE_BINARY_DOUBLE:
return BasicType.DOUBLE_TYPE;
case ORACLE_BINARY_FLOAT:
return BasicType.FLOAT_TYPE;
case ORACLE_CHAR:
case ORACLE_NCHAR:
case ORACLE_NVARCHAR2:
case ORACLE_VARCHAR2:
case ORACLE_LONG:
case ORACLE_ROWID:
case ORACLE_NCLOB:
case ORACLE_CLOB:
return BasicType.STRING_TYPE;
case ORACLE_DATE:
case ORACLE_TIMESTAMP:
case ORACLE_TIME_WITHOUT_TIME_ZONE:
case ORACLE_TIMESTAMP_WITHOUT_TIME_ZONE:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case ORACLE_BLOB:
case ORACLE_RAW:
case ORACLE_LONG_RAW:
case ORACLE_BFILE:
return PrimitiveByteArrayType.INSTANCE;
//Doesn't support yet
case ORACLE_UNKNOWN:
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support ORACLE type '%s' on column '%s' yet.",
oracleType, jdbcColumnName));
}
}
}
10 changes: 7 additions & 3 deletions seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@
<version>1.17.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>
<version>1.17.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>

</dependencies>

</project>
</project>