Skip to content

Commit

Permalink
[Feature][Connector-V2] add sqlserver connector (#2646)
Browse files Browse the repository at this point in the history
* refactor sqlserver connector

* merge dev and fix some conflict

* add sqlserver doc

* [Feature][Connector-V2] add sqlserver jdbc driver into container.

* [Feature][Connector-V2] fix some bug

* Update docs/en/connector-v2/sink/Jdbc.md

Co-authored-by: Hisoka <fanjiaeminem@qq.com>

* [Feature][Connector-V2]jdbc-sqlserver change seatunnel home path

* fix some merge error

* fix some merge error

* fix `log has private access`

* revert

* fix some review problem

Co-authored-by: Hisoka <fanjiaeminem@qq.com>
  • Loading branch information
liugddx and Hisoka-X committed Sep 30, 2022
1 parent 7a8a076 commit 05d105d
Show file tree
Hide file tree
Showing 15 changed files with 778 additions and 15 deletions.
18 changes: 9 additions & 9 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
## Description

Write data through jdb c. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once
Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once
semantics (using XA transaction guarantee).

## Key features
Expand Down Expand Up @@ -37,7 +37,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
### driver [string]

The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver.
Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy mysql-connector-java-xxx.jar to
Warn: for license compliance, you have to provide any driver yourself like MySQL JDBC Driver, e.g. copy mysql-connector-java-xxx.jar to
$SEATNUNNEL_HOME/lib for Standalone.

### user [string]
Expand Down Expand Up @@ -102,13 +102,13 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires
## appendix

there are some reference value for params above.

| datasource | driver | url | xa_data_source_class_name | maven |
|------------|----------------------------------------------|-----------------------------------------------------------------------|-------------------------------------|---------------------------------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| datasource | driver | url | xa_data_source_class_name | maven |
|------------|----------------------------------------------|-----------------------------------------------------------------------|--------------------------------------------------------|---------------------------------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |

## Example

Expand Down
11 changes: 6 additions & 5 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ in parallel according to the concurrency of tasks.

there are some reference value for params above.

| datasource | driver | url | maven |
|------------|----------------------------------------------|--------------------------------------------------------------------|---------------------------------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
| datasource | driver | url | maven |
|------------|--------------------------|----------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |

## Example

Expand Down
13 changes: 12 additions & 1 deletion seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<mysql.version>8.0.16</mysql.version>
<postgresql.version>42.3.3</postgresql.version>
<dm-jdbc.version>8.1.2.141</dm-jdbc.version>
<sqlserver.version>9.2.1.jre8</sqlserver.version>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
</properties>

Expand Down Expand Up @@ -62,6 +63,12 @@
<version>${dm-jdbc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${sqlserver.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand All @@ -70,7 +77,6 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>

<dependency>
Expand All @@ -85,6 +91,11 @@
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
</dependency>

<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

public class SqlServerDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Sqlserver";
}

@Override
public JdbcRowConverter getRowConverter() {
return new SqlserverJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new SqlserverTypeMapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/**
* Factory for {@link SqlServerDialect}.
*/

@AutoService(JdbcDialectFactory.class)
public class SqlServerDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:sqlserver:");
}

@Override
public JdbcDialect create() {
return new SqlServerDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class SqlserverJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "Sqlserver";
}

@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

@Slf4j
public class SqlserverTypeMapper implements JdbcDialectTypeMapper {


// ============================data types=====================

private static final String SQLSERVER_UNKNOWN = "UNKNOWN";

// -------------------------number----------------------------
private static final String SQLSERVER_BIT = "BIT";
private static final String SQLSERVER_TINYINT = "TINYINT";
private static final String SQLSERVER_SMALLINT = "SMALLINT";
private static final String SQLSERVER_INTEGER = "INTEGER";
private static final String SQLSERVER_INT = "INT";
private static final String SQLSERVER_BIGINT = "BIGINT";
private static final String SQLSERVER_DECIMAL = "DECIMAL";
private static final String SQLSERVER_FLOAT = "FLOAT";
private static final String SQLSERVER_REAL = "REAL";
private static final String SQLSERVER_NUMERIC = "NUMERIC";
private static final String SQLSERVER_MONEY = "MONEY";
private static final String SQLSERVER_SMALLMONEY = "SMALLMONEY";
// -------------------------string----------------------------
private static final String SQLSERVER_CHAR = "CHAR";
private static final String SQLSERVER_VARCHAR = "VARCHAR";
private static final String SQLSERVER_NTEXT = "NTEXT";
private static final String SQLSERVER_NCHAR = "NCHAR";
private static final String SQLSERVER_NVARCHAR = "NVARCHAR";
private static final String SQLSERVER_TEXT = "TEXT";

// ------------------------------time-------------------------
private static final String SQLSERVER_DATE = "DATE";
private static final String SQLSERVER_TIME = "TIME";
private static final String SQLSERVER_DATETIME = "DATETIME";
private static final String SQLSERVER_DATETIME2 = "DATETIME2";
private static final String SQLSERVER_SMALLDATETIME = "SMALLDATETIME";
private static final String SQLSERVER_DATETIMEOFFSET = "DATETIMEOFFSET";
private static final String SQLSERVER_TIMESTAMP = "TIMESTAMP";

// ------------------------------blob-------------------------
private static final String SQLSERVER_BINARY = "BINARY";
private static final String SQLSERVER_VARBINARY = "VARBINARY";
private static final String SQLSERVER_IMAGE = "IMAGE";

@SuppressWarnings("checkstyle:MagicNumber")
@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String sqlServerType = metadata.getColumnTypeName(colIndex).toUpperCase();
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (sqlServerType) {
case SQLSERVER_BIT:
return BasicType.BOOLEAN_TYPE;
case SQLSERVER_TINYINT:
case SQLSERVER_SMALLINT:
return BasicType.SHORT_TYPE;
case SQLSERVER_INTEGER:
case SQLSERVER_INT:
return BasicType.INT_TYPE;
case SQLSERVER_BIGINT:
return BasicType.LONG_TYPE;
case SQLSERVER_DECIMAL:
case SQLSERVER_NUMERIC:
case SQLSERVER_MONEY:
case SQLSERVER_SMALLMONEY:
return new DecimalType(precision, scale);
case SQLSERVER_REAL:
return BasicType.FLOAT_TYPE;
case SQLSERVER_FLOAT:
return BasicType.DOUBLE_TYPE;
case SQLSERVER_CHAR:
case SQLSERVER_NCHAR:
case SQLSERVER_VARCHAR:
case SQLSERVER_NTEXT:
case SQLSERVER_NVARCHAR:
case SQLSERVER_TEXT:
return BasicType.STRING_TYPE;
case SQLSERVER_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case SQLSERVER_TIME:
return LocalTimeType.LOCAL_TIME_TYPE;
case SQLSERVER_DATETIME:
case SQLSERVER_DATETIME2:
case SQLSERVER_TIMESTAMP:
case SQLSERVER_SMALLDATETIME:
case SQLSERVER_DATETIMEOFFSET:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case SQLSERVER_BINARY:
case SQLSERVER_VARBINARY:
case SQLSERVER_IMAGE:
return PrimitiveByteArrayType.INSTANCE;
//Doesn't support yet
case SQLSERVER_UNKNOWN:
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support SQLSERVER type '%s' on column '%s' yet.",
sqlServerType, jdbcColumnName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mssqlserver</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 05d105d

Please sign in to comment.