Skip to content

Commit

Permalink
[Feature][Connector]Jdbc] Add vertica connector.
Browse files Browse the repository at this point in the history
  • Loading branch information
FlechazoW committed Mar 12, 2023
1 parent add75d7 commit f2d5db3
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 1 deletion.
4 changes: 3 additions & 1 deletion docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ This option is used to support operations such as `insert`, `delete`, and `updat

### support_upsert_by_query_primary_key_exist [boolean]

Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax.
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupported upsert syntax.
**Note**: that this method has low performance

### connection_check_timeout_sec [int]
Expand Down Expand Up @@ -161,6 +161,7 @@ there are some reference value for params above.
| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |

## Example

Expand Down Expand Up @@ -237,4 +238,5 @@ sink {
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
- [Improve] Add config item enable upsert by query([#3708](https://github.com/apache/incubator-seatunnel/pull/3708))
- [Improve] Add database field to sink config([#4199](https://github.com/apache/incubator-seatunnel/pull/4199))
- [Improve] Add Vertica connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))

2 changes: 2 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ there are some reference value for params above.
| doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc |
| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 |
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |

## Example

Expand Down Expand Up @@ -174,4 +175,5 @@ Jdbc {
- [Feature] Support Doris JDBC Source ([3586](https://github.com/apache/incubator-seatunnel/pull/3586))
- [Feature] Support Redshift JDBC Sink([#3615](https://github.com/apache/incubator-seatunnel/pull/3615))
- [BugFix] Fix jdbc connection reset bug ([3670](https://github.com/apache/incubator-seatunnel/pull/3670))
- [Improve] Add Vertica connector([#4303](https://github.com/apache/incubator-seatunnel/pull/4303))

12 changes: 12 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<teradata.version>17.20.00.12</teradata.version>
<redshift.version>2.1.0.9</redshift.version>
<saphana.version>2.14.7</saphana.version>
<vertica.version>12.0.3-0</vertica.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -122,6 +123,12 @@
<version>${saphana.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.vertica.jdbc</groupId>
<artifactId>vertica-jdbc</artifactId>
<version>${vertica.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -184,5 +191,10 @@
<groupId>com.sap.cloud.db.jdbc</groupId>
<artifactId>ngdbc</artifactId>
</dependency>

<dependency>
<groupId>com.vertica.jdbc</groupId>
<artifactId>vertica-jdbc</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class VerticaDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Vertica";
}

@Override
public JdbcRowConverter getRowConverter() {
return new VerticaJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new VerticaTypeMapper();
}

@Override
public String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
List<String> nonUniqueKeyFields =
Arrays.stream(fieldNames)
.filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
String valuesBinding =
Arrays.stream(fieldNames)
.map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));

String usingClause = String.format("SELECT %s FROM DUAL", valuesBinding);
String onConditions =
Arrays.stream(uniqueKeyFields)
.map(
fieldName ->
String.format(
"TARGET.%s=SOURCE.%s",
quoteIdentifier(fieldName),
quoteIdentifier(fieldName)))
.collect(Collectors.joining(" AND "));
String updateSetClause =
nonUniqueKeyFields.stream()
.map(
fieldName ->
String.format(
"TARGET.%s=SOURCE.%s",
quoteIdentifier(fieldName),
quoteIdentifier(fieldName)))
.collect(Collectors.joining(", "));
String insertFields =
Arrays.stream(fieldNames)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String insertValues =
Arrays.stream(fieldNames)
.map(fieldName -> "SOURCE." + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));

String upsertSQL =
String.format(
" MERGE INTO %s.%s TARGET"
+ " USING (%s) SOURCE"
+ " ON (%s) "
+ " WHEN MATCHED THEN"
+ " UPDATE SET %s"
+ " WHEN NOT MATCHED THEN"
+ " INSERT (%s) VALUES (%s)",
database,
tableName,
usingClause,
onConditions,
updateSetClause,
insertFields,
insertValues);

return Optional.of(upsertSQL);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/** Factory for {@link VerticaDialect}. */
@AutoService(JdbcDialectFactory.class)
public class VerticaDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:vertica:");
}

@Override
public JdbcDialect create() {
return new VerticaDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

public class VerticaJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return "Vertica";
}
}

0 comments on commit f2d5db3

Please sign in to comment.