diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md b/docs/en/connector-v2/sink/ClickhouseFile.md index 86f762a9cdc..1eb2458d081 100644 --- a/docs/en/connector-v2/sink/ClickhouseFile.md +++ b/docs/en/connector-v2/sink/ClickhouseFile.md @@ -21,22 +21,25 @@ Write data to Clickhouse can also be done using JDBC ## Options -| name | type | required | default value | -| ---------------------- | ------- | -------- | ------------- | -| host | string | yes | - | -| database | string | yes | - | -| table | string | yes | - | -| username | string | yes | - | -| password | string | yes | - | -| clickhouse_local_path | string | yes | - | -| sharding_key | string | no | - | -| copy_method | string | no | scp | -| node_free_password | boolean | no | false | -| node_pass | list | no | - | -| node_pass.node_address | string | no | - | -| node_pass.username | string | no | "root" | -| node_pass.password | string | no | - | -| common-options | | no | - | +| name | type | required | default value | +|------------------------|---------|----------|----------------------------------------| +| host | string | yes | - | +| database | string | yes | - | +| table | string | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| clickhouse_local_path | string | yes | - | +| sharding_key | string | no | - | +| copy_method | string | no | scp | +| node_free_password | boolean | no | false | +| node_pass | list | no | - | +| node_pass.node_address | string | no | - | +| node_pass.username | string | no | "root" | +| node_pass.password | string | no | - | +| compatible_mode | boolean | no | false | +| file_fields_delimiter | string | no | "\t" | +| file_temp_path | string | no | "/tmp/seatunnel/clickhouse-local/file" | +| common-options | | no | - | ### host [string] @@ -94,6 +97,21 @@ The username corresponding to the clickhouse server, default root user. The password corresponding to the clickhouse server. +### compatible_mode [boolean] + +In the lower version of Clickhouse, the ClickhouseLocal program does not support the `--path` parameter, +you need to use this mode to take other ways to realize the `--path` parameter function + +### file_fields_delimiter [string] + +ClickhouseFile uses csv format to temporarily save data. If the data in the row contains the delimiter value +of csv, it may cause program exceptions. +Avoid this with this configuration. Value string has to be an exactly one character long + +### file_temp_path [string] + +The directory where ClickhouseFile stores temporary files locally. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -122,3 +140,8 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti ### 2.2.0-beta 2022-09-26 - Support write data to ClickHouse File and move to ClickHouse data dir + +### Next version + +- [BugFix] Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416) +- [Feature] Support compatible_mode compatible with lower version Clickhouse [3416](https://github.com/apache/incubator-seatunnel/pull/3416) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java index 62f7bd398bd..3f48df99085 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java @@ -96,7 +96,16 @@ public class ClickhouseConfig { public static final Option COPY_METHOD = Options.key("copy_method").enumType(ClickhouseFileCopyMethod.class) .defaultValue(ClickhouseFileCopyMethod.SCP).withDescription("The method of copy Clickhouse file"); + public static final Option COMPATIBLE_MODE = Options.key("compatible_mode").booleanType() + .defaultValue(false).withDescription("In the lower version of Clickhouse, the ClickhouseLocal program does not support the `--path` parameter, " + + "you need to use this mode to take other ways to realize the --path parameter function"); + public static final String NODE_ADDRESS = "node_address"; + + public static final Option NODE_FREE_PASSWORD = Options.key("node_free_password").booleanType() + .defaultValue(false).withDescription("Because seatunnel need to use scp or rsync for file transfer, " + + "seatunnel need clickhouse server-side access. If each spark node and clickhouse server are configured with password-free login, " + + "you can configure this option to true, otherwise you need to configure the corresponding node password in the node_pass configuration"); /** * The password of Clickhouse server node */ @@ -106,4 +115,11 @@ public class ClickhouseConfig { public static final Option> CLICKHOUSE_PREFIX = Options.key("clickhouse").mapType() .defaultValue(Collections.emptyMap()).withDescription("Clickhouse custom config"); + public static final Option FILE_FIELDS_DELIMITER = Options.key("file_fields_delimiter").stringType() + .defaultValue("\t").withDescription("ClickhouseFile uses csv format to temporarily save data. If the data in the row contains the delimiter value of csv," + + " it may cause program exceptions. Avoid this with this configuration. Value string has to be an exactly one character long"); + + public static final Option FILE_TEMP_PATH = Options.key("file_temp_path").stringType() + .defaultValue("/tmp/seatunnel/clickhouse-local/file").withDescription("The directory where ClickhouseFile stores temporary files locally."); + } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java index 283c1c4f975..fb00ee995ed 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java @@ -20,10 +20,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; +import lombok.Data; + import java.io.Serializable; import java.util.List; import java.util.Map; +@Data public class FileReaderOption implements Serializable { private ShardMetadata shardMetadata; @@ -35,90 +38,29 @@ public class FileReaderOption implements Serializable { private Map nodeUser; private Map nodePassword; private SeaTunnelRowType seaTunnelRowType; + private boolean compatibleMode; + private String fileTempPath; + private String fileFieldsDelimiter; public FileReaderOption(ShardMetadata shardMetadata, Map tableSchema, List fields, String clickhouseLocalPath, ClickhouseFileCopyMethod copyMethod, Map nodeUser, - Map nodePassword) { + boolean nodeFreePass, + Map nodePassword, + boolean compatibleMode, + String fileTempPath, + String fileFieldsDelimiter) { this.shardMetadata = shardMetadata; this.tableSchema = tableSchema; this.fields = fields; this.clickhouseLocalPath = clickhouseLocalPath; this.copyMethod = copyMethod; this.nodeUser = nodeUser; - this.nodePassword = nodePassword; - } - - public SeaTunnelRowType getSeaTunnelRowType() { - return seaTunnelRowType; - } - - public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; - } - - public boolean isNodeFreePass() { - return nodeFreePass; - } - - public void setNodeFreePass(boolean nodeFreePass) { this.nodeFreePass = nodeFreePass; - } - - public String getClickhouseLocalPath() { - return clickhouseLocalPath; - } - - public void setClickhouseLocalPath(String clickhouseLocalPath) { - this.clickhouseLocalPath = clickhouseLocalPath; - } - - public ClickhouseFileCopyMethod getCopyMethod() { - return copyMethod; - } - - public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) { - this.copyMethod = copyMethod; - } - - public Map getNodeUser() { - return nodeUser; - } - - public void setNodeUser(Map nodeUser) { - this.nodeUser = nodeUser; - } - - public Map getNodePassword() { - return nodePassword; - } - - public void setNodePassword(Map nodePassword) { this.nodePassword = nodePassword; - } - - public ShardMetadata getShardMetadata() { - return shardMetadata; - } - - public void setShardMetadata(ShardMetadata shardMetadata) { - this.shardMetadata = shardMetadata; - } - - public Map getTableSchema() { - return tableSchema; - } - - public void setTableSchema(Map tableSchema) { - this.tableSchema = tableSchema; - } - - public List getFields() { - return fields; - } - - public void setFields(List fields) { - this.fields = fields; + this.compatibleMode = compatibleMode; + this.fileFieldsDelimiter = fileFieldsDelimiter; + this.fileTempPath = fileTempPath; } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index 973478a8b4b..7aa5010f23d 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -18,11 +18,15 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY; @@ -31,7 +35,10 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -46,8 +53,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; @@ -62,10 +69,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; @AutoService(SeaTunnelSink.class) -public class ClickhouseFileSink implements SeaTunnelSink { +public class ClickhouseFileSink implements SeaTunnelSink { private FileReaderOption readerOption; @@ -85,6 +93,10 @@ public void prepare(Config config) throws PrepareFailException { } Map defaultConfigs = ImmutableMap.builder() .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName()) + .put(NODE_FREE_PASSWORD.key(), NODE_FREE_PASSWORD.defaultValue()) + .put(COMPATIBLE_MODE.key(), COMPATIBLE_MODE.defaultValue()) + .put(FILE_TEMP_PATH.key(), FILE_TEMP_PATH.defaultValue()) + .put(FILE_FIELDS_DELIMITER.key(), FILE_FIELDS_DELIMITER.defaultValue()) .build(); config = config.withFallback(ConfigFactory.parseMap(defaultConfigs)); @@ -126,8 +138,13 @@ public void prepare(Config config) throws PrepareFailException { configObject -> configObject.toConfig().getString(PASSWORD.key()))); proxy.close(); + + if (config.getString(FILE_FIELDS_DELIMITER.key()).length() != 1) { + throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, FILE_FIELDS_DELIMITER.key() + " must be a single character"); + } this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()), - ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, nodePassword); + ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, config.getBoolean(NODE_FREE_PASSWORD.key()), nodePassword, + config.getBoolean(COMPATIBLE_MODE.key()), config.getString(FILE_TEMP_PATH.key()), config.getString(FILE_FIELDS_DELIMITER.key())); } @Override @@ -141,7 +158,22 @@ public SeaTunnelDataType getConsumedType() { } @Override - public SinkWriter createWriter(SinkWriter.Context context) throws IOException { + public SinkWriter createWriter(SinkWriter.Context context) throws IOException { return new ClickhouseFileSinkWriter(readerOption, context); } + + @Override + public Optional> getCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional> createAggregatedCommitter() throws IOException { + return Optional.of(new ClickhouseFileSinkAggCommitter(this.readerOption)); + } + + @Override + public Optional> getAggregatedCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java new file mode 100644 index 00000000000..80ce83aa554 --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; + +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; + +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClickhouseFileSinkAggCommitter implements SinkAggregatedCommitter { + + private final ClickhouseProxy proxy; + private final ClickhouseTable clickhouseTable; + + public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) { + proxy = new ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode()); + clickhouseTable = proxy.getClickhouseTable(readerOption.getShardMetadata().getDatabase(), + readerOption.getShardMetadata().getTable()); + } + + @Override + public List commit(List aggregatedCommitInfo) throws IOException { + aggregatedCommitInfo.forEach(commitInfo -> commitInfo.getDetachedFiles().forEach((shard, files) -> { + try { + this.attachFileToClickhouse(shard, files); + } catch (ClickHouseException e) { + throw new SeaTunnelException("failed commit file to clickhouse", e); + } + })); + return new ArrayList<>(); + } + + @Override + public CKFileAggCommitInfo combine(List commitInfos) { + Map> files = new HashMap<>(); + commitInfos.forEach(infos -> infos.getDetachedFiles().forEach((shard, file) -> { + if (files.containsKey(shard)) { + files.get(shard).addAll(file); + } else { + files.put(shard, file); + } + })); + return new CKFileAggCommitInfo(files); + } + + @Override + public void abort(List aggregatedCommitInfo) throws Exception { + + } + + @Override + public void close() throws IOException { + proxy.close(); + } + + private void attachFileToClickhouse(Shard shard, List clickhouseLocalFiles) throws ClickHouseException { + ClickHouseRequest request = proxy.getClickhouseConnection(shard); + for (String clickhouseLocalFile : clickhouseLocalFiles) { + ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'", + clickhouseTable.getLocalTableName(), + clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1))).executeAndWait(); + response.close(); + } + } + +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java index 2d9ccf220e3..829d8e005cf 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java @@ -18,10 +18,14 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY; @@ -44,6 +48,6 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH) - .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_PASS).build(); + .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java index 6f24e2c79bd..14b5c181cb3 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -27,17 +27,15 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; -import com.clickhouse.client.ClickHouseException; -import com.clickhouse.client.ClickHouseRequest; -import com.clickhouse.client.ClickHouseResponse; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import java.io.BufferedReader; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -48,7 +46,6 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -58,24 +55,32 @@ import java.util.stream.Collectors; @Slf4j -public class ClickhouseFileSinkWriter implements SinkWriter { - private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file"; +public class ClickhouseFileSinkWriter implements SinkWriter { + + private static final String CK_LOCAL_CONFIG_TEMPLATE = " %s default default" + + "1"; + private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = "/local_data.log"; private static final int UUID_LENGTH = 10; private final FileReaderOption readerOption; private final ShardRouter shardRouter; private final ClickhouseProxy proxy; private final ClickhouseTable clickhouseTable; private final Map> shardLocalDataPaths; - private final Map> rowCache; + private final Map rowCache; + + private final Map shardTempFile; + + private final SinkWriter.Context context; public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) { this.readerOption = readerOption; + this.context = context; proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode()); shardRouter = new ShardRouter(proxy, this.readerOption.getShardMetadata()); clickhouseTable = proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(), this.readerOption.getShardMetadata().getTable()); rowCache = new HashMap<>(Common.COLLECTION_SIZE); - + shardTempFile = new HashMap<>(); nodePasswordCheck(); // find file local save path of each node @@ -90,7 +95,20 @@ public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Contex @Override public void write(SeaTunnelRow element) throws IOException { Shard shard = shardRouter.getShard(element); - rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(element); + FileChannel channel = rowCache.computeIfAbsent(shard, k -> { + try { + String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_"); + String clickhouseLocalFile = String.format("%s/%s", readerOption.getFileTempPath(), uuid); + FileUtils.forceMkdir(new File(clickhouseLocalFile)); + String clickhouseLocalFileTmpFile = clickhouseLocalFile + CLICKHOUSE_LOCAL_FILE_SUFFIX; + shardTempFile.put(shard, clickhouseLocalFileTmpFile); + return FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE, + StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + } catch (IOException e) { + throw new ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "can't create new file to save tmp data", e); + } + }); + saveDataToFile(channel, element); } private void nodePasswordCheck() { @@ -105,62 +123,67 @@ private void nodePasswordCheck() { } @Override - public Optional prepareCommit() throws IOException { - return Optional.empty(); + public Optional prepareCommit() throws IOException { + for (FileChannel channel : rowCache.values()) { + channel.close(); + } + Map> detachedFiles = new HashMap<>(); + shardTempFile.forEach((shard, path) -> { + List clickhouseLocalFiles = null; + try { + clickhouseLocalFiles = generateClickhouseLocalFiles(path); + // move file to server + moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles); + detachedFiles.put(shard, clickhouseLocalFiles); + } catch (Exception e) { + throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e); + } finally { + if (clickhouseLocalFiles != null && !clickhouseLocalFiles.isEmpty()) { + // clear local file + clearLocalFileDirectory(clickhouseLocalFiles); + } + } + }); + rowCache.clear(); + shardTempFile.clear(); + return Optional.of(new CKFileCommitInfo(detachedFiles)); } @Override public void abortPrepare() { - } @Override public void close() throws IOException { - rowCache.forEach(this::flush); + for (FileChannel channel : rowCache.values()) { + channel.close(); + } } - private void flush(Shard shard, List rows) { - try { - // generate clickhouse local file - // TODO generate file by sub rows to save memory - List clickhouseLocalFiles = generateClickhouseLocalFiles(rows); - // move file to server - attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles); - // clear local file - clearLocalFileDirectory(clickhouseLocalFiles); - } catch (Exception e) { - throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e); - } + private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException { + String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString()) + .collect(Collectors.joining(readerOption.getFileFieldsDelimiter())) + "\n"; + MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), + data.getBytes(StandardCharsets.UTF_8).length); + buffer.put(data.getBytes(StandardCharsets.UTF_8)); } - private List generateClickhouseLocalFiles(List rows) throws IOException, + private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpFile) throws IOException, InterruptedException { - if (rows.isEmpty()) { - return Collections.emptyList(); - } - String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_"); - String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid); - FileUtils.forceMkdir(new File(clickhouseLocalFile)); - String clickhouseLocalFileTmpFile = clickhouseLocalFile + "/local_data.log"; - try (FileChannel fileChannel = FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE, - StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) { - String data = rows.stream() - .map(row -> this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString()) - .collect(Collectors.joining("\t"))) - .collect(Collectors.joining("\n")); - MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), - data.getBytes(StandardCharsets.UTF_8).length); - buffer.put(data.getBytes(StandardCharsets.UTF_8)); - } - + // temp file path format prefix//suffix + String[] tmpStrArr = clickhouseLocalFileTmpFile.split("/"); + String uuid = tmpStrArr[tmpStrArr.length - 2]; List localPaths = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" ")) .collect(Collectors.toList()); + String clickhouseLocalFile = clickhouseLocalFileTmpFile.substring(0, clickhouseLocalFileTmpFile.length() - CLICKHOUSE_LOCAL_FILE_SUFFIX.length()); List command = new ArrayList<>(localPaths); if (localPaths.size() == 1) { command.add("local"); } command.add("--file"); command.add(clickhouseLocalFileTmpFile); + command.add("--format_csv_delimiter"); + command.add("\"" + readerOption.getFileFieldsDelimiter() + "\""); command.add("-S"); command.add("\"" + this.readerOption.getFields().stream().map(field -> field + " " + readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) + "\""); command.add("-N"); @@ -178,8 +201,19 @@ private List generateClickhouseLocalFiles(List rows) throw } }).collect(Collectors.joining(",")), uuid)); - command.add("--path"); - command.add("\"" + clickhouseLocalFile + "\""); + if (readerOption.isCompatibleMode()) { + String ckLocalConfigPath = String.format("%s/%s/config.xml", readerOption.getFileTempPath(), uuid); + try (FileWriter writer = new FileWriter(ckLocalConfigPath)) { + writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE, clickhouseLocalFile)); + } catch (IOException e) { + throw new ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Error occurs when create ck local config"); + } + command.add("--config-file"); + command.add("\"" + ckLocalConfigPath + "\""); + } else { + command.add("--path"); + command.add("\"" + clickhouseLocalFile + "\""); + } log.info("Generate clickhouse local file command: {}", String.join(" ", command)); ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command)); Process start = processBuilder.start(); @@ -192,6 +226,14 @@ private List generateClickhouseLocalFiles(List rows) throw log.info(line); } } + try (InputStream inputStream = start.getErrorStream(); + InputStreamReader inputStreamReader = new InputStreamReader(inputStream); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { + String line; + while ((line = bufferedReader.readLine()) != null) { + log.error(line); + } + } start.waitFor(); File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName()); if (!file.exists()) { @@ -204,10 +246,18 @@ private List generateClickhouseLocalFiles(List rows) throw return Arrays.stream(files) .filter(File::isDirectory) .filter(f -> !"detached".equals(f.getName())) - .map(File::getAbsolutePath).collect(Collectors.toList()); + .map(f -> { + File newFile = new File(f.getParent() + "/" + f.getName() + "_" + context.getIndexOfSubtask()); + if (f.renameTo(newFile)) { + return newFile; + } else { + log.warn("rename file failed, will continue move file, but maybe cause file conflict"); + return f; + } + }).map(File::getAbsolutePath).collect(Collectors.toList()); } - private void attachClickhouseLocalFileToServer(Shard shard, List clickhouseLocalFiles) throws ClickHouseException { + private void moveClickhouseLocalFileToServer(Shard shard, List clickhouseLocalFiles) { String hostAddress = shard.getNode().getAddress().getHostName(); String user = readerOption.getNodeUser().getOrDefault(hostAddress, "root"); String password = readerOption.getNodePassword().getOrDefault(hostAddress, null); @@ -215,22 +265,15 @@ private void attachClickhouseLocalFileToServer(Shard shard, List clickho fileTransfer.init(); fileTransfer.transferAndChown(clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/"); fileTransfer.close(); - ClickHouseRequest request = proxy.getClickhouseConnection(shard); - for (String clickhouseLocalFile : clickhouseLocalFiles) { - ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'", - clickhouseTable.getLocalTableName(), - clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1))).executeAndWait(); - response.close(); - } } private void clearLocalFileDirectory(List clickhouseLocalFiles) { String clickhouseLocalFile = clickhouseLocalFiles.get(0); - String localFileDir = clickhouseLocalFile.substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1); + String localFileDir = clickhouseLocalFile.substring(0, readerOption.getFileTempPath().length() + UUID_LENGTH + 1); try { File file = new File(localFileDir); if (file.exists()) { - FileUtils.deleteDirectory(new File(localFileDir)); + FileUtils.deleteDirectory(file); } } catch (IOException e) { throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.DELETE_DIRECTORY_FIELD, "Unable to delete directory " + localFileDir, e); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java new file mode 100644 index 00000000000..9962b9dcb0d --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Data +@AllArgsConstructor +public class CKFileAggCommitInfo implements Serializable { + + private Map> detachedFiles; + +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java new file mode 100644 index 00000000000..1b5399b7c8b --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Data +@AllArgsConstructor +public class CKFileCommitInfo implements Serializable { + + private Map> detachedFiles; + +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java index 2a4205016ed..e6c50b0611a 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkFactory; import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory; import org.junit.jupiter.api.Assertions; @@ -29,5 +30,6 @@ public class ClickhouseFactoryTest { public void testOptionRule() { Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule()); Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule()); + Assertions.assertNotNull((new ClickhouseFileSinkFactory()).optionRule()); } }