Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector][Jdbc] Add vertica connector. #4303

Merged
merged 2 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ This option is used to support operations such as `insert`, `delete`, and `updat

### support_upsert_by_query_primary_key_exist [boolean]

Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax.
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupported upsert syntax.
**Note**: that this method has low performance

### connection_check_timeout_sec [int]
Expand Down Expand Up @@ -161,6 +161,7 @@ there are some reference value for params above.
| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |

## Example

Expand Down Expand Up @@ -238,4 +239,5 @@ sink {
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
- [Improve] Add config item enable upsert by query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
- [Improve] Add database field to sink config([#4199](https://github.com/apache/incubator-seatunnel/pull/4199))
- [Improve] Add Vertica connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))

2 changes: 2 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ there are some reference value for params above.
| doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |

## Example

Expand Down Expand Up @@ -174,4 +175,5 @@ Jdbc {
- [Feature] Support Doris JDBC Source ([3586](https://github.com/apache/incubator-seatunnel/pull/3586))
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
- [BugFix] Fix jdbc connection reset bug ([3670](https://github.com/apache/incubator-seatunnel/pull/3670))
- [Improve] Add Vertica connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))

1 change: 1 addition & 0 deletions release-note.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- [Github] Add Github source connector #4155
- [CDC] Support export debezium-json format to kafka #4339
- [RocketMQ] Add RocketMQ source and sink connector #4007
- [Jdbc] Add vertica connector #4303
### Formats
- [Canal]Support read canal format message #3950

Expand Down
12 changes: 12 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<teradata.version>17.20.00.12</teradata.version>
<redshift.version>2.1.0.9</redshift.version>
<saphana.version>2.14.7</saphana.version>
<vertica.version>12.0.3-0</vertica.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -122,6 +123,12 @@
<version>${saphana.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.vertica.jdbc</groupId>
<artifactId>vertica-jdbc</artifactId>
<version>${vertica.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -184,5 +191,10 @@
<groupId>com.sap.cloud.db.jdbc</groupId>
<artifactId>ngdbc</artifactId>
</dependency>

<dependency>
<groupId>com.vertica.jdbc</groupId>
<artifactId>vertica-jdbc</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class VerticaDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Vertica";
}

@Override
public JdbcRowConverter getRowConverter() {
return new VerticaJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new VerticaTypeMapper();
}

@Override
public String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
List<String> nonUniqueKeyFields =
Arrays.stream(fieldNames)
.filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
String valuesBinding =
Arrays.stream(fieldNames)
.map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));

String usingClause = String.format("SELECT %s FROM DUAL", valuesBinding);
String onConditions =
Arrays.stream(uniqueKeyFields)
.map(
fieldName ->
String.format(
"TARGET.%s=SOURCE.%s",
quoteIdentifier(fieldName),
quoteIdentifier(fieldName)))
.collect(Collectors.joining(" AND "));
String updateSetClause =
nonUniqueKeyFields.stream()
.map(
fieldName ->
String.format(
"TARGET.%s=SOURCE.%s",
quoteIdentifier(fieldName),
quoteIdentifier(fieldName)))
.collect(Collectors.joining(", "));
String insertFields =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String insertValues =
Arrays.stream(fieldNames)
.map(fieldName -> "SOURCE." + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));

String upsertSQL =
String.format(
" MERGE INTO %s.%s TARGET"
+ " USING (%s) SOURCE"
+ " ON (%s) "
+ " WHEN MATCHED THEN"
+ " UPDATE SET %s"
+ " WHEN NOT MATCHED THEN"
+ " INSERT (%s) VALUES (%s)",
database,
tableName,
usingClause,
onConditions,
updateSetClause,
insertFields,
insertValues);

return Optional.of(upsertSQL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/** Factory for {@link VerticaDialect}. */
@AutoService(JdbcDialectFactory.class)
public class VerticaDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:vertica:");
}

@Override
public JdbcDialect create() {
return new VerticaDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

public class VerticaJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return "Vertica";
}
}