Skip to content

Commit dc4b989

Browse files
authored
[Feature][Connector-V2] sftp file sink suport multiple table and save mode (#7668)
1 parent fb89033 commit dc4b989

File tree

10 files changed

+460
-49
lines changed

10 files changed

+460
-49
lines changed

docs/en/connector-v2/sink/SftpFile.md

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ By default, we use 2PC commit to ensure `exactly-once`
6363
| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. |
6464
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. |
6565
| encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. |
66+
| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Existing dir processing method |
67+
| data_save_mode | string | no | APPEND_DATA | Existing data processing method |
6668

6769
### host [string]
6870

@@ -220,6 +222,19 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files
220222
Only used when file_format_type is json,text,csv,xml.
221223
The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`.
222224

225+
### schema_save_mode [string]
226+
Existing dir processing method.
227+
- RECREATE_SCHEMA: will create when the dir does not exist, delete and recreate when the dir is exist
228+
- CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, skipped when the dir is exist
229+
- ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not exist
230+
- IGNORE :Ignore the treatment of the table
231+
232+
### data_save_mode [string]
233+
Existing data processing method.
234+
- DROP_DATA: preserve dir and delete data files
235+
- APPEND_DATA: preserve dir, preserve data files
236+
- ERROR_WHEN_DATA_EXISTS: when there is data files, an error is reported
237+
223238
## Example
224239

225240
For text file format with `have_partition` and `custom_filename` and `sink_columns`
@@ -247,6 +262,35 @@ SftpFile {
247262
is_enable_transaction = true
248263
}
249264

265+
```
266+
267+
When our source end is multiple tables, and wants different expressions to different directory, we can configure this way
268+
269+
```hocon
270+
SftpFile {
271+
host = "xxx.xxx.xxx.xxx"
272+
port = 22
273+
user = "username"
274+
password = "password"
275+
path = "/data/sftp/seatunnel/job1/${table_name}"
276+
tmp_path = "/data/sftp/seatunnel/tmp"
277+
file_format_type = "text"
278+
field_delimiter = "\t"
279+
row_delimiter = "\n"
280+
have_partition = true
281+
partition_by = ["age"]
282+
partition_dir_expression = "${k0}=${v0}"
283+
is_partition_field_write_in_file = true
284+
custom_filename = true
285+
file_name_expression = "${transactionId}_${now}"
286+
filename_time_format = "yyyy.MM.dd"
287+
sink_columns = ["name","age"]
288+
is_enable_transaction = true
289+
schema_save_mode=RECREATE_SCHEMA
290+
data_save_mode=DROP_DATA
291+
}
292+
293+
250294
```
251295

252296
## Changelog
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog;
19+
20+
import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
21+
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
22+
23+
public class SftpFileCatalog extends AbstractFileCatalog {
24+
25+
public SftpFileCatalog(
26+
HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
27+
super(hadoopFileSystemProxy, filePath, catalogName);
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sftp.catalog;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.table.catalog.Catalog;
23+
import org.apache.seatunnel.api.table.factory.CatalogFactory;
24+
import org.apache.seatunnel.api.table.factory.Factory;
25+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
26+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
27+
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
28+
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
29+
30+
import com.google.auto.service.AutoService;
31+
32+
@AutoService(Factory.class)
33+
public class SftpFileCatalogFactory implements CatalogFactory {
34+
@Override
35+
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
36+
HadoopFileSystemProxy fileSystemUtils =
37+
new HadoopFileSystemProxy(SftpConf.buildWithConfig(options));
38+
return new SftpFileCatalog(
39+
fileSystemUtils,
40+
options.get(BaseSourceConfigOptions.FILE_PATH),
41+
factoryIdentifier());
42+
}
43+
44+
@Override
45+
public String factoryIdentifier() {
46+
return FileSystemType.SFTP.getFileSystemPluginName();
47+
}
48+
49+
@Override
50+
public OptionRule optionRule() {
51+
return OptionRule.builder().build();
52+
}
53+
}

seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2221
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
2322

2423
import java.util.HashMap;
@@ -42,20 +41,16 @@ public String getSchema() {
4241
return SCHEMA;
4342
}
4443

45-
public static HadoopConf buildWithConfig(Config config) {
46-
String host = config.getString(SftpConfigOptions.SFTP_HOST.key());
47-
int port = config.getInt(SftpConfigOptions.SFTP_PORT.key());
44+
public static HadoopConf buildWithConfig(ReadonlyConfig config) {
45+
String host = config.get(SftpConfigOptions.SFTP_HOST);
46+
int port = config.get(SftpConfigOptions.SFTP_PORT);
4847
String defaultFS = String.format("sftp://%s:%s", host, port);
4948
HadoopConf hadoopConf = new SftpConf(defaultFS);
5049
HashMap<String, String> sftpOptions = new HashMap<>();
50+
sftpOptions.put("fs.sftp.user." + host, config.get(SftpConfigOptions.SFTP_USER));
5151
sftpOptions.put(
52-
"fs.sftp.user." + host, config.getString(SftpConfigOptions.SFTP_USER.key()));
53-
sftpOptions.put(
54-
"fs.sftp.password."
55-
+ host
56-
+ "."
57-
+ config.getString(SftpConfigOptions.SFTP_USER.key()),
58-
config.getString(SftpConfigOptions.SFTP_PASSWORD.key()));
52+
"fs.sftp.password." + host + "." + config.get(SftpConfigOptions.SFTP_USER),
53+
config.get(SftpConfigOptions.SFTP_PASSWORD));
5954
hadoopConf.setExtraOptions(sftpOptions);
6055
return hadoopConf;
6156
}

seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,46 +17,19 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
24-
import org.apache.seatunnel.api.sink.SeaTunnelSink;
25-
import org.apache.seatunnel.common.config.CheckConfigUtil;
26-
import org.apache.seatunnel.common.config.CheckResult;
27-
import org.apache.seatunnel.common.constants.PluginType;
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2822
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
29-
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
3023
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
31-
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
32-
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
24+
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
3325

34-
import com.google.auto.service.AutoService;
26+
public class SftpFileSink extends BaseMultipleTableFileSink {
27+
public SftpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
28+
super(SftpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable);
29+
}
3530

36-
@AutoService(SeaTunnelSink.class)
37-
public class SftpFileSink extends BaseFileSink {
3831
@Override
3932
public String getPluginName() {
4033
return FileSystemType.SFTP.getFileSystemPluginName();
4134
}
42-
43-
@Override
44-
public void prepare(Config pluginConfig) throws PrepareFailException {
45-
CheckResult result =
46-
CheckConfigUtil.checkAllExists(
47-
pluginConfig,
48-
SftpConfigOptions.SFTP_HOST.key(),
49-
SftpConfigOptions.SFTP_PORT.key(),
50-
SftpConfigOptions.SFTP_USER.key(),
51-
SftpConfigOptions.SFTP_PASSWORD.key());
52-
if (!result.isSuccess()) {
53-
throw new FileConnectorException(
54-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
55-
String.format(
56-
"PluginName: %s, PluginType: %s, Message: %s",
57-
getPluginName(), PluginType.SINK, result.getMsg()));
58-
}
59-
super.prepare(pluginConfig);
60-
hadoopConf = SftpConf.buildWithConfig(pluginConfig);
61-
}
6235
}

seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,27 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;
1919

20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2021
import org.apache.seatunnel.api.configuration.util.OptionRule;
22+
import org.apache.seatunnel.api.sink.SinkCommonOptions;
23+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
24+
import org.apache.seatunnel.api.table.connector.TableSink;
2125
import org.apache.seatunnel.api.table.factory.Factory;
22-
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
26+
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
27+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2328
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
2429
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
2530
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
31+
import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
2632
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
33+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
34+
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
35+
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
2736

2837
import com.google.auto.service.AutoService;
2938

3039
@AutoService(Factory.class)
31-
public class SftpFileSinkFactory implements TableSinkFactory {
40+
public class SftpFileSinkFactory extends BaseMultipleTableFileSinkFactory {
3241
@Override
3342
public String factoryIdentifier() {
3443
return FileSystemType.SFTP.getFileSystemPluginName();
@@ -43,6 +52,9 @@ public OptionRule optionRule() {
4352
.required(SftpConfigOptions.SFTP_USER)
4453
.required(SftpConfigOptions.SFTP_PASSWORD)
4554
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
55+
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
56+
.optional(BaseSinkConfig.DATA_SAVE_MODE)
57+
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
4658
.conditional(
4759
BaseSinkConfig.FILE_FORMAT_TYPE,
4860
FileFormat.TEXT,
@@ -93,4 +105,12 @@ public OptionRule optionRule() {
93105
.optional(BaseSinkConfig.TIME_FORMAT)
94106
.build();
95107
}
108+
109+
@Override
110+
public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
111+
createSink(TableSinkFactoryContext context) {
112+
ReadonlyConfig readonlyConfig = context.getOptions();
113+
CatalogTable catalogTable = context.getCatalogTable();
114+
return () -> new SftpFileSink(readonlyConfig, catalogTable);
115+
}
96116
}

seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.seatunnel.api.common.PrepareFailException;
2323
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
24+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2425
import org.apache.seatunnel.api.source.SeaTunnelSource;
2526
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
2627
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -78,7 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
7879
"Sftp file source connector only support read [text, csv, json, xml] files");
7980
}
8081
String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key());
81-
hadoopConf = SftpConf.buildWithConfig(pluginConfig);
82+
hadoopConf = SftpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
8283
readStrategy =
8384
ReadStrategyFactory.of(
8485
pluginConfig.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key()));

0 commit comments

Comments
 (0)