Skip to content

Commit

Permalink
feat: jdbc support copy in statement.
Browse files Browse the repository at this point in the history
  • Loading branch information
mosence committed Mar 6, 2024
1 parent e1a81ac commit 14ccb30
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 3 deletions.
7 changes: 7 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| data_save_mode | Enum | No | APPEND_DATA |
| custom_sql | String | No | - |
| enable_upsert | Boolean | No | true |
| use_copy_statement | Boolean | No | false |

### driver [string]

Expand Down Expand Up @@ -197,6 +198,12 @@ When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL

Enable upsert when `primary_keys` exist. If the task has no duplicate-key data, setting this parameter to `false` can speed up data import.

### use_copy_statement [boolean]

Use the `COPY ${table} FROM STDIN` statement to import data. Only drivers whose connections provide a `getCopyAPI()` method are supported, e.g. the PostgreSQL driver `org.postgresql.Driver`.

NOTICE: `MAP`, `ARRAY`, `ROW` types are not supported.

## tips

In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup :
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
<commons-lang3.version>3.5</commons-lang3.version>
<commons-io.version>2.11.0</commons-io.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-csv.version>1.10.0</commons-csv.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<protostuff.version>1.8.0</protostuff.version>
<spark.scope>provided</spark.scope>
Expand Down Expand Up @@ -329,6 +330,11 @@
<artifactId>commons-collections4</artifactId>
<version>${commons-collections4.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>${commons-csv.version}</version>
</dependency>

<dependency>
<groupId>com.beust</groupId>
Expand Down
5 changes: 4 additions & 1 deletion seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@
</dependencyManagement>

<dependencies>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
Expand All @@ -205,6 +204,10 @@
<version>${hikari.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ public interface JdbcOptions {
.defaultValue(false)
.withDescription("support upsert by insert only");

/**
 * Sink option {@code use_copy_statement}: boolean flag, {@code false} by default, that switches
 * the sink to the COPY-statement based write path.
 */
Option<Boolean> USE_COPY_STATEMENT =
Options.key("use_copy_statement")
.booleanType()
.defaultValue(false)
.withDescription("support copy in statement (postgresql)");

/** source config */
Option<String> PARTITION_COLUMN =
Options.key("partition_column")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class JdbcSinkConfig implements Serializable {
private boolean enableUpsert;
@Builder.Default private boolean isPrimaryKeyUpdated = true;
private boolean supportUpsertByInsertOnly;
private boolean useCopyStatement;

public static JdbcSinkConfig of(ReadonlyConfig config) {
JdbcSinkConfigBuilder builder = JdbcSinkConfig.builder();
Expand All @@ -55,6 +56,7 @@ public static JdbcSinkConfig of(ReadonlyConfig config) {
builder.isPrimaryKeyUpdated(config.get(IS_PRIMARY_KEY_UPDATED));
builder.supportUpsertByInsertOnly(config.get(SUPPORT_UPSERT_BY_INSERT_ONLY));
builder.simpleSql(config.get(JdbcOptions.QUERY));
builder.useCopyStatement(config.get(JdbcOptions.USE_COPY_STATEMENT));
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public enum JdbcConnectorErrorCode implements SeaTunnelErrorCode {
"JDBC-05", "transaction operation failed, such as (commit, rollback) etc.."),
NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found"),
DONT_SUPPORT_SINK("JDBC-07", "The jdbc type don't support sink"),
KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed");
KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed"),
NO_SUPPORT_OPERATION_FAILED("JDBC-09", "The jdbc driver not support operation.");
private final String code;

private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.CopyManagerBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
Expand Down Expand Up @@ -63,7 +64,14 @@ public JdbcOutputFormat build() {
jdbcSinkConfig.getDatabase() + "." + jdbcSinkConfig.getTable()));

final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
if (jdbcSinkConfig.isUseCopyStatement()) {
CopyManagerBatchStatementExecutor.copyManagerProxyChecked(connectionProvider);
statementExecutorFactory =
() ->
createCopyInBufferStatementExecutor(
createCopyInBatchStatementExecutor(
dialect, table, tableSchema));
} else if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
statementExecutorFactory =
() ->
createSimpleBufferedExecutor(
Expand Down Expand Up @@ -185,6 +193,22 @@ private static JdbcBatchStatementExecutor<SeaTunnelRow> createUpsertExecutor(
dialect, database, table, tableSchema, pkNames, isPrimaryKeyUpdated);
}

/**
 * Wraps the given COPY executor in a {@link BufferedBatchStatementExecutor}.
 *
 * @param copyManagerBatchStatementExecutor the COPY executor to delegate to
 * @return the buffering executor, configured with the identity function so rows are handed
 *     over unchanged
 */
private static JdbcBatchStatementExecutor<SeaTunnelRow> createCopyInBufferStatementExecutor(
        CopyManagerBatchStatementExecutor copyManagerBatchStatementExecutor) {
    JdbcBatchStatementExecutor<SeaTunnelRow> bufferedExecutor =
            new BufferedBatchStatementExecutor(
                    copyManagerBatchStatementExecutor, Function.identity());
    return bufferedExecutor;
}

/**
 * Builds the {@code COPY <table> (<columns>) FROM STDIN WITH CSV} statement for the given
 * table and creates the executor that runs it.
 *
 * @param dialect dialect used to quote each column name
 * @param table target table name, inserted into the statement as given
 * @param tableSchema schema providing the column names, in order
 * @return a new COPY executor bound to the generated statement and the schema
 */
private static CopyManagerBatchStatementExecutor createCopyInBatchStatementExecutor(
        JdbcDialect dialect, String table, TableSchema tableSchema) {
    String[] fieldNames = tableSchema.getFieldNames();
    StringBuilder columnList = new StringBuilder("(");
    for (int i = 0; i < fieldNames.length; i++) {
        if (i > 0) {
            columnList.append(",");
        }
        columnList.append(dialect.quoteIdentifier(fieldNames[i]));
    }
    columnList.append(")");

    String copyInSql = "COPY " + table + " " + columnList + " FROM STDIN WITH CSV";
    return new CopyManagerBatchStatementExecutor(copyInSql, tableSchema);
}

private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor(
JdbcDialect dialect, String database, String table, TableSchema tableSchema) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;

import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;

import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

/**
 * Batch executor that serializes rows to CSV text in memory and loads them through the driver's
 * copy API using the configured {@code COPY ... FROM STDIN} statement.
 *
 * <p>Rows added with {@link #addToBatch(SeaTunnelRow)} are appended to an in-memory CSV buffer;
 * {@link #executeBatch()} sends the buffer in a single copy call and then starts a fresh buffer.
 */
public class CopyManagerBatchStatementExecutor implements JdbcBatchStatementExecutor<SeaTunnelRow> {

    private final String copySql;
    private final TableSchema tableSchema;
    CopyManagerProxy copyManagerProxy;
    CSVFormat csvFormat = CSVFormat.POSTGRESQL_CSV;
    CSVPrinter csvPrinter;

    /**
     * Physical row type derived from {@link #tableSchema}, resolved on first use so it is not
     * re-derived for every record. Assumes the schema's physical columns do not change during
     * the lifetime of this executor.
     */
    private SeaTunnelRowType physicalRowType;

    /**
     * @param copySql the COPY statement passed to the driver on every batch
     * @param tableSchema schema describing the fields of the rows to be written
     */
    public CopyManagerBatchStatementExecutor(String copySql, TableSchema tableSchema) {
        this.copySql = copySql;
        this.tableSchema = tableSchema;
    }

    /**
     * Verifies up front that a connection from the given provider can be wrapped in a {@link
     * CopyManagerProxy}. The connection obtained for the check is closed when the check ends.
     *
     * @param connectionProvider provider used to obtain the connection to probe
     * @throws JdbcConnectorException if the copy API cannot be resolved on the connection, or if
     *     the connection itself cannot be established
     */
    public static void copyManagerProxyChecked(JdbcConnectionProvider connectionProvider) {
        try (Connection connection = connectionProvider.getOrEstablishConnection()) {
            new CopyManagerProxy(connection);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new JdbcConnectorException(
                    JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
                    "unable to open CopyManager Operation in this JDBC writer. Please configure option use_copy_statement = false.",
                    e);
        } catch (ClassNotFoundException | SQLException e) {
            throw new JdbcConnectorException(
                    JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to open JDBC writer", e);
        }
    }

    /**
     * Binds this executor to the given connection and creates an empty CSV buffer.
     *
     * @param connection connection whose copy API will be used
     * @throws JdbcConnectorException if the copy API cannot be resolved or the buffer cannot be
     *     created
     */
    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        try {
            this.copyManagerProxy = new CopyManagerProxy(connection);
            this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat);
        } catch (NoSuchMethodException
                | IllegalAccessException
                | InvocationTargetException
                | IOException e) {
            throw new JdbcConnectorException(
                    JdbcConnectorErrorCode.NO_SUPPORT_OPERATION_FAILED,
                    "unable to open CopyManager Operation in this JDBC writer. Please configure option use_copy_statement = false.",
                    e);
        } catch (SQLException e) {
            throw new JdbcConnectorException(
                    JdbcConnectorErrorCode.CREATE_DRIVER_FAILED, "unable to open JDBC writer", e);
        }
    }

    /**
     * Appends one row to the in-memory CSV buffer.
     *
     * @param record the row to append
     * @throws JdbcConnectorException if the row contains a field of an unsupported type
     */
    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        try {
            this.csvPrinter.printRecord(toExtract(record));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the physical row type of the schema, deriving it only on the first call. */
    private SeaTunnelRowType physicalRowType() {
        if (physicalRowType == null) {
            physicalRowType = tableSchema.toPhysicalRowDataType();
        }
        return physicalRowType;
    }

    /**
     * Converts a row into the list of values printed as one CSV record.
     *
     * <p>Each non-null value is cast to the Java type matching its declared SQL type, so a value
     * of an unexpected runtime type fails with a {@link ClassCastException} instead of being
     * written silently.
     *
     * @param record the row to convert
     * @return one entry per physical field, {@code null} for null fields
     * @throws JdbcConnectorException for {@code MAP}, {@code ARRAY}, {@code ROW} and any other
     *     unhandled type
     */
    private List<Object> toExtract(SeaTunnelRow record) {
        SeaTunnelRowType rowType = physicalRowType();
        int fieldCount = rowType.getTotalFields();
        List<Object> csvRecord = new ArrayList<>(fieldCount);
        for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
            SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
            Object fieldValue = record.getField(fieldIndex);
            if (fieldValue == null) {
                csvRecord.add(null);
                continue;
            }
            switch (seaTunnelDataType.getSqlType()) {
                case STRING:
                    csvRecord.add((String) fieldValue);
                    break;
                case BOOLEAN:
                    csvRecord.add((Boolean) fieldValue);
                    break;
                case TINYINT:
                    csvRecord.add((Byte) fieldValue);
                    break;
                case SMALLINT:
                    csvRecord.add((Short) fieldValue);
                    break;
                case INT:
                    csvRecord.add((Integer) fieldValue);
                    break;
                case BIGINT:
                    csvRecord.add((Long) fieldValue);
                    break;
                case FLOAT:
                    csvRecord.add((Float) fieldValue);
                    break;
                case DOUBLE:
                    csvRecord.add((Double) fieldValue);
                    break;
                case DECIMAL:
                    csvRecord.add((BigDecimal) fieldValue);
                    break;
                case DATE:
                    csvRecord.add(java.sql.Date.valueOf((LocalDate) fieldValue));
                    break;
                case TIME:
                    // java.sql.Time.valueOf(LocalTime) discards the nanosecond field, which
                    // silently truncated fractional seconds. The ISO local-time form keeps
                    // them and is identical (HH:mm:ss) when there is no fraction.
                    csvRecord.add(
                            java.time.format.DateTimeFormatter.ISO_LOCAL_TIME.format(
                                    (LocalTime) fieldValue));
                    break;
                case TIMESTAMP:
                    csvRecord.add(java.sql.Timestamp.valueOf((LocalDateTime) fieldValue));
                    break;
                case BYTES:
                    // NOTE(review): bytes are written as Base64 text. Confirm the target column
                    // is meant to receive the Base64 string rather than the raw bytes.
                    csvRecord.add(
                            org.apache.commons.codec.binary.Base64.encodeBase64String(
                                    (byte[]) fieldValue));
                    break;
                case NULL:
                    csvRecord.add(null);
                    break;
                case MAP:
                case ARRAY:
                case ROW:
                default:
                    throw new JdbcConnectorException(
                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                            "Unexpected value: " + seaTunnelDataType);
            }
        }
        return csvRecord;
    }

    /**
     * Sends the buffered CSV text to the database with the configured COPY statement.
     *
     * <p>The buffer is replaced by an empty one whether or not the copy succeeded, so rows added
     * before a failed call are not kept by this executor.
     */
    @Override
    public void executeBatch() throws SQLException {
        try {
            this.csvPrinter.flush();
            this.copyManagerProxy.doCopy(
                    copySql, new StringReader(this.csvPrinter.getOut().toString()));
        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                this.csvPrinter.close();
                this.csvPrinter = new CSVPrinter(new StringBuilder(), csvFormat);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Releases the copy API handle and the CSV buffer. Safe to call when the executor was never
     * prepared or has already been closed.
     */
    @Override
    public void closeStatements() throws SQLException {
        this.copyManagerProxy = null;
        if (this.csvPrinter == null) {
            // Nothing was opened (or it was already closed); avoid a NullPointerException.
            return;
        }
        try {
            this.csvPrinter.close();
            this.csvPrinter = null;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;

import java.io.Reader;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;

/**
 * Reflection-based handle on a driver's copy API, so that no driver class is referenced at
 * compile time.
 *
 * <p>The constructor unwraps the given connection, calls its public no-argument {@code
 * getCopyAPI} method and looks up {@code copyIn(String, Reader)} on the returned object.
 */
class CopyManagerProxy {
    Object connection;
    Object copyManager;
    Class<?> connectionClazz;
    Class<?> copyManagerClazz;
    Method getCopyAPIMethod;
    Method copyInMethod;

    /**
     * @param connection connection to unwrap and resolve the copy API on
     * @throws NoSuchMethodException if {@code getCopyAPI} or {@code copyIn(String, Reader)} is
     *     not found
     * @throws InvocationTargetException if {@code getCopyAPI} itself throws
     * @throws IllegalAccessException if {@code getCopyAPI} is not accessible
     * @throws SQLException if the connection cannot be unwrapped
     */
    CopyManagerProxy(Connection connection)
            throws NoSuchMethodException, InvocationTargetException, IllegalAccessException,
                    SQLException {
        // Resolve everything into locals first, then publish to the fields in one place.
        Object unwrapped = connection.unwrap(Connection.class);
        Class<?> unwrappedType = unwrapped.getClass();
        Method copyApiGetter = unwrappedType.getMethod("getCopyAPI");
        Object manager = copyApiGetter.invoke(unwrapped);
        Class<?> managerType = manager.getClass();
        Method copyIn = managerType.getMethod("copyIn", String.class, Reader.class);

        this.connection = unwrapped;
        this.connectionClazz = unwrappedType;
        this.getCopyAPIMethod = copyApiGetter;
        this.copyManager = manager;
        this.copyManagerClazz = managerType;
        this.copyInMethod = copyIn;
    }

    /**
     * Invokes {@code copyIn(sql, reader)} on the resolved copy manager.
     *
     * @param sql the COPY statement to run
     * @param reader source of the data to copy
     * @return the {@code long} value returned by {@code copyIn}
     * @throws InvocationTargetException if {@code copyIn} throws
     * @throws IllegalAccessException if {@code copyIn} is not accessible
     */
    long doCopy(String sql, Reader reader)
            throws InvocationTargetException, IllegalAccessException {
        Object result = copyInMethod.invoke(copyManager, sql, reader);
        return (long) result;
    }
}

0 comments on commit 14ccb30

Please sign in to comment.