Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2][JDBC] support sqlite Source & Sink #3089

Merged
merged 32 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
04e2395
refactor JDBC[SqLite] connector
nutsjian Oct 13, 2022
cf1ddc4
[Improve] [JDBC] [SQLite] Connector test more field types
nutsjian Oct 14, 2022
2981c1a
add [JDBC] [SQLite] doc
nutsjian Oct 14, 2022
0643e4a
fix [JDBC] [SQLite] e2e test case method name
nutsjian Oct 14, 2022
f3de959
[fix] class name error
nutsjian Oct 15, 2022
6f3b792
[Hotfix] [JDBC] [SQLite] connector e2e test path problem
nutsjian Oct 15, 2022
e4bf558
[fix] [JDBC] [SQLite] flink e2e test docker container db file cannot …
nutsjian Oct 18, 2022
45701f3
[fix] [JDBC] [SQLite] flink e2e test
nutsjian Oct 19, 2022
7bd5125
Merge branch 'dev' into jdbc-sqlite
nutsjian Oct 19, 2022
e587104
[fix] [JDBC] [SQLite] flink e2e test database closed error
nutsjian Oct 20, 2022
049f6c7
Merge branch 'dev' into jdbc-sqlite
nutsjian Oct 20, 2022
803dec9
Merge branch 'dev' into jdbc-sqlite
nutsjian Oct 20, 2022
ce5ddf0
[fix] [JDBC] [SQLite] e2e test, rerun workflows commit
nutsjian Oct 24, 2022
eb321b0
Merge branch 'dev' into jdbc-sqlite
nutsjian Oct 28, 2022
0118847
update [JDBC] [SQLite] add change log
nutsjian Nov 1, 2022
7ef5bfe
Merge branch 'dev' into jdbc-sqlite
nutsjian Nov 1, 2022
d5ed310
update [JDBC] [SQLite] change log doc
nutsjian Nov 1, 2022
a861767
update [JDBC] [SQLite] connector map tinyint smallint to SeaTunnel Sh…
nutsjian Nov 8, 2022
8fbaad8
refactor JDBC[SqLite] connector
nutsjian Oct 13, 2022
dfd660c
Merge branch 'dev' into jdbc-sqlite
nutsjian Nov 11, 2022
93468b5
update, merge upstream dev, add e2e in flink/spark e2e module so that…
nutsjian Nov 11, 2022
18cf506
update checkstyle
nutsjian Nov 11, 2022
70494ae
Merge branch 'dev' into jdbc-sqlite
nutsjian Nov 15, 2022
03fba9f
[checkstyle] remove unused import
nutsjian Nov 15, 2022
976c5a6
Merge remote-tracking branch 'upstream' into jdbc-sqlite
nutsjian Nov 15, 2022
6197196
Merge branch 'dev' of https://github.com/apache/incubator-seatunnel i…
nutsjian Nov 16, 2022
d03bf4d
update [JDBC] [SQLite] connector
nutsjian Nov 16, 2022
5ad4b4c
Merge branch 'dev' into jdbc-sqlite
nutsjian Nov 18, 2022
fa00b72
Merge branch 'dev' into jdbc-sqlite
nutsjian Nov 18, 2022
5c0b7e8
Merge branch 'dev' into jdbc-sqlite
nutsjian Nov 22, 2022
f4d4d33
fix [JDBC] [SQLite] connector, remove root pom.xml dup net.alchim31.m…
nutsjian Nov 22, 2022
3204610
update [Connector] [JDBC] SQLite, remove typeAffinity option, impleme…
nutsjian Nov 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<sqlserver.version>9.2.1.jre8</sqlserver.version>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<oracle.version>12.2.0.1</oracle.version>
<sqlite.version>3.39.3.0</sqlite.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -76,7 +77,12 @@
<version>${oracle.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>${sqlite.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -108,6 +114,10 @@
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
</dependency>
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class JdbcConfig implements Serializable {

public static final String TRANSACTION_TIMEOUT_SEC = "transaction_timeout_sec";

public static final String TYPE_AFFINITY = "type_affinity";

//source config
public static final String PARTITION_COLUMN = "partition_column";
Expand Down Expand Up @@ -95,6 +96,10 @@ public static JdbcConnectionOptions buildJdbcConnectionOptions(Config config) {
jdbcOptions.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC);
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert it.

if (config.hasPath(JdbcConfig.TYPE_AFFINITY)) {
jdbcOptions.typeAffinity = config.getBoolean(JdbcConfig.TYPE_AFFINITY);
}
return jdbcOptions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

public class SqliteDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Sqlite";
}

@Override
public JdbcRowConverter getRowConverter() {
return new SqliteJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new SqliteTypeMapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/**
* Factory for {@link SqliteDialect}.
*/

@AutoService(JdbcDialectFactory.class)
public class SqliteDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:sqlite:");
}

@Override
public JdbcDialect create() {
return new SqliteDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class SqliteJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "Sqlite";
}

@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;

@Slf4j
public class SqliteTypeMapper implements JdbcDialectTypeMapper {

private static final String OBJECT_CLASS = "java.lang.Object";
private static final String CHARACTER_CLASS = "java.lang.Character";
private static final String STRING_CLASS = "java.lang.String";
private static final String INTEGER_CLASS = "java.lang.Integer";
private static final String SHORT_CLASS = "java.lang.Short";
private static final String LONG_CLASS = "java.lang.Long";
private static final String FLOAT_CLASS = "java.lang.Float";
private static final String DOUBLE_CLASS = "java.lang.Double";
private static final String BOOLEAN_CLASS = "java.lang.Boolean";
private static final String BYTE_CLASS = "java.lang.Byte";

/**
* because of sqlite's dynamic data type and affinity, use columnType(java.sql.Types) determine the SeaTunnel Data Types
*/
@SuppressWarnings("checkstyle:MagicNumber")
@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
int columnType = metadata.getColumnType(colIndex);
String columnClassName = metadata.getColumnClassName(colIndex);
// sqlite data type is dynamic and type affinity, see https://www.sqlite.org/datatype3.html
// 1. if columnClassName is java.lang.Object, this column has no data, use columnType determine the SeaTunnel Data Types
// 2. otherwise we just use columnClassName to determine the SeaTunnel Data Types
if (columnClassName.equalsIgnoreCase(OBJECT_CLASS)) { // case 1.
switch (columnType) {
case Types.CHAR:
case Types.NCHAR:
case Types.VARCHAR:
case Types.NVARCHAR:
case Types.LONGVARCHAR:
case Types.LONGNVARCHAR:
return BasicType.STRING_TYPE;
case Types.INTEGER:
return BasicType.INT_TYPE;
case Types.TINYINT:
case Types.SMALLINT:
return BasicType.SHORT_TYPE;
case Types.BIGINT:
case Types.DATE:
case Types.TIME:
case Types.TIMESTAMP:
case Types.TIME_WITH_TIMEZONE:
case Types.TIMESTAMP_WITH_TIMEZONE:
return BasicType.LONG_TYPE;
case Types.FLOAT:
return BasicType.FLOAT_TYPE;
case Types.DOUBLE:
case Types.REAL:
case Types.NUMERIC:
case Types.DECIMAL:
return BasicType.DOUBLE_TYPE;
case Types.BOOLEAN:
case Types.BIT:
return BasicType.BOOLEAN_TYPE;
case Types.BLOB:
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
return PrimitiveByteArrayType.INSTANCE;
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support sql type '%s' on column '%s' yet.",
columnType, jdbcColumnName));
}
} else { // case 2
switch (columnClassName) {
case INTEGER_CLASS:
return BasicType.INT_TYPE;
case SHORT_CLASS:
return BasicType.SHORT_TYPE;
case LONG_CLASS:
return BasicType.LONG_TYPE;
case FLOAT_CLASS:
return BasicType.FLOAT_TYPE;
case DOUBLE_CLASS:
return BasicType.DOUBLE_TYPE;
case STRING_CLASS:
case CHARACTER_CLASS:
return BasicType.STRING_TYPE;
case BOOLEAN_CLASS:
return BasicType.BOOLEAN_TYPE;
case BYTE_CLASS:
return BasicType.BYTE_TYPE;
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support sql type '%s' on column '%s' yet.",
columnType, jdbcColumnName));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class JdbcConnectionOptions
public String username;
public String password;
public String query;
// since sqlite data type affinity, the specific data type cannot be determined only by column type name
public boolean typeAffinity;

public int batchSize = DEFAULT_BATCH_SIZE;
public int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
Expand Down Expand Up @@ -99,6 +101,10 @@ public Optional<Integer> getTransactionTimeoutSec() {
return transactionTimeoutSec < 0 ? Optional.empty() : Optional.of(transactionTimeoutSec);
}

public boolean isTypeAffinity() {
return typeAffinity;
}

public static JdbcConnectionOptionsBuilder builder() {
return new JdbcConnectionOptionsBuilder();
}
Expand All @@ -116,6 +122,7 @@ public static final class JdbcConnectionOptionsBuilder {
private String xaDataSourceClassName;
private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
private boolean typeAffinity;

private JdbcConnectionOptionsBuilder() {
}
Expand Down Expand Up @@ -180,6 +187,11 @@ public JdbcConnectionOptionsBuilder withTransactionTimeoutSec(int transactionTim
return this;
}

public JdbcConnectionOptionsBuilder withTypeAffinity(boolean affinity) {
this.typeAffinity = affinity;
return this;
}

public JdbcConnectionOptions build() {
JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions();
jdbcConnectionOptions.batchSize = this.batchSize;
Expand All @@ -194,6 +206,7 @@ public JdbcConnectionOptions build() {
jdbcConnectionOptions.transactionTimeoutSec = this.transactionTimeoutSec;
jdbcConnectionOptions.maxCommitAttempts = this.maxCommitAttempts;
jdbcConnectionOptions.xaDataSourceClassName = this.xaDataSourceClassName;
jdbcConnectionOptions.typeAffinity = this.typeAffinity;
return jdbcConnectionOptions;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite.SqliteDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
Expand All @@ -51,6 +52,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

@AutoService(SeaTunnelSource.class)
public class JdbcSource implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit, JdbcSourceState> {
Expand Down Expand Up @@ -137,15 +139,31 @@ private SeaTunnelRowType initTableField(Connection conn) {
JdbcDialectTypeMapper jdbcDialectTypeMapper = jdbcDialect.getJdbcDialectTypeMapper();
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
ResultSet resultSet = null;
try {
PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
ResultSetMetaData resultSetMetaData = ps.getMetaData();
ResultSetMetaData resultSetMetaData;
if (jdbcSourceOptions.getJdbcConnectionOptions().isTypeAffinity() && jdbcDialect instanceof SqliteDialect) {
PreparedStatement ps = conn.prepareStatement("select t.* from (" + jdbcSourceOptions.getJdbcConnectionOptions().getQuery() + ") as t limit 1, 1");
resultSet = ps.executeQuery();
resultSetMetaData = resultSet.getMetaData();
} else {
PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
resultSetMetaData = ps.getMetaData();
}
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
fieldNames.add(resultSetMetaData.getColumnName(i));
seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i));
}
} catch (Exception e) {
LOG.warn("get row type info exception", e);
} finally {
if (Objects.nonNull(resultSet)) {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the role of this code?

resultSet.close();
} catch (SQLException e) {
LOG.warn("get row type info exception", e);
}
}
}
return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
}
Expand Down