Commit 81c5107

[Feature][Connector]add s3file save mode function (#6131)
1 parent 919a910 commit 81c5107

8 files changed, +404 −8 lines changed

docs/en/connector-v2/sink/S3File.md

Lines changed: 79 additions & 1 deletion

@@ -117,7 +117,12 @@ If write to `csv`, `text` file type, All column will be string.
 | max_rows_in_memory | int | no | - | Only used when file_format is excel. |
 | sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. |
 | hadoop_s3_properties | map | no | | If you need to add a other option, you could add it here and refer to this [link](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html) |
-| |
+| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronization task starts, how to handle the target path |
+| data_save_mode | Enum | no | APPEND_DATA | Before the synchronization task starts, how to handle existing data files in the target path |
+
+### path [string]
+
+The target path of the data files. Variable replacement is supported, for example: path=/test/${database_name}/${schema_name}/${table_name}

 ### hadoop_s3_properties [map]

@@ -241,6 +246,22 @@ When File Format is Excel,The maximum number of data items that can be cached in
 Writer the sheet of the workbook

+### schema_save_mode [Enum]
+
+Before the synchronization task starts, choose how the target path is handled.
+Option introduction:
+`RECREATE_SCHEMA`: the path is created when it does not exist; if it already exists, it is deleted and recreated.
+`CREATE_SCHEMA_WHEN_NOT_EXIST`: the path is created when it does not exist; if it already exists, it is used as-is.
+`ERROR_WHEN_SCHEMA_NOT_EXIST`: an error is reported when the path does not exist.
+
+### data_save_mode [Enum]
+
+Before the synchronization task starts, choose how existing data files in the target path are handled.
+Option introduction:
+`DROP_DATA`: use the path but delete the data files already in it.
+`APPEND_DATA`: use the path and add new files to it as data is written.
+`ERROR_WHEN_DATA_EXISTS`: an error is reported when data files already exist in the path.
+
 ## Example

 ### Simple:

@@ -383,10 +404,67 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider`
     access_key = "xxxxxxxxxxxxxxxxx"
     secret_key = "xxxxxxxxxxxxxxxxx"
     file_format_type = "orc"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
   }

 ```

+Multi-table writing and saveMode:
+
+```
+env {
+  "job.name" = "SeaTunnel_job"
+  "job.mode" = STREAMING
+}
+source {
+  MySQL-CDC {
+    "connect.max-retries" = 3
+    "connection.pool.size" = 6
+    "startup.mode" = INITIAL
+    "exactly_once" = "true"
+    "stop.mode" = NEVER
+    parallelism = 1
+    "result_table_name" = Table11519548644512
+    "dag-parsing.mode" = MULTIPLEX
+    catalog {
+      factory = Mysql
+    }
+    database-names = [
+      "wls_t1"
+    ]
+    table-names = [
+      "wls_t1.mysqlcdc_to_s3_t3",
+      "wls_t1.mysqlcdc_to_s3_t4",
+      "wls_t1.mysqlcdc_to_s3_t5",
+      "wls_t1.mysqlcdc_to_s3_t1",
+      "wls_t1.mysqlcdc_to_s3_t2"
+    ]
+    password = "xxxxxx"
+    username = "xxxxxxxxxxxxx"
+    base-url = "jdbc:mysql://localhost:3306/qa_source"
+    server-time-zone = UTC
+  }
+}
+transform {
+}
+sink {
+  S3File {
+    bucket = "s3a://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path = "/test/${table_name}"
+    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
+    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    file_format_type = "orc"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
+```
+
 ## Changelog

 ### 2.3.0-beta 2022-10-20
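
Taken together, the two options define a small decision matrix that runs before any rows are written. Below is a minimal sketch of the schema_save_mode branch, written against the Catalog API added later in this commit; the handleSchemaSaveMode method and its wiring are hypothetical illustration, not code from the commit (SeaTunnel's own save-mode handler infrastructure does the real work):

```java
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.TablePath;

public class SchemaSaveModeSketch {
    // Hypothetical driver: what each schema_save_mode value means for the target path.
    static void handleSchemaSaveMode(SchemaSaveMode mode, Catalog catalog, TablePath path) {
        switch (mode) {
            case RECREATE_SCHEMA:
                // Delete the path if present, then recreate it empty.
                if (catalog.tableExists(path)) {
                    catalog.dropTable(path, true);
                }
                // S3FileCatalog ignores the CatalogTable argument, so null suffices here.
                catalog.createTable(path, null, true);
                break;
            case CREATE_SCHEMA_WHEN_NOT_EXIST:
                // Create the path only when it is missing; otherwise reuse it.
                if (!catalog.tableExists(path)) {
                    catalog.createTable(path, null, true);
                }
                break;
            case ERROR_WHEN_SCHEMA_NOT_EXIST:
                // Fail fast when the path is missing.
                if (!catalog.tableExists(path)) {
                    throw new IllegalStateException("Target path does not exist: " + path);
                }
                break;
            default:
                break; // other modes are not relevant to this connector
        }
    }
}
```
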

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java

Lines changed: 19 additions & 0 deletions

@@ -28,7 +28,9 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;

 import lombok.NonNull;
@@ -140,6 +142,23 @@ public void createDir(@NonNull String filePath) throws IOException {
         }
     }

+    public List<LocatedFileStatus> listFile(String path) throws IOException {
+        if (fileSystem == null) {
+            initialize();
+        }
+        List<LocatedFileStatus> fileList = new ArrayList<>();
+        if (!fileExist(path)) {
+            return fileList;
+        }
+        Path fileName = new Path(path);
+        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
+                fileSystem.listFiles(fileName, false);
+        while (locatedFileStatusRemoteIterator.hasNext()) {
+            fileList.add(locatedFileStatusRemoteIterator.next());
+        }
+        return fileList;
+    }
+
     public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
         if (fileSystem == null) {
             initialize();
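
The new listFile helper is non-recursive and returns an empty list for a missing path, so callers need no separate existence check. A minimal usage sketch, assuming a HadoopConf already configured for the target bucket:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;

public class ListFileSketch {
    static boolean pathHasData(HadoopConf hadoopConf, String path) throws IOException {
        HadoopFileSystemProxy proxy = new HadoopFileSystemProxy(hadoopConf);
        // Non-recursive listing; a missing path yields an empty list rather
        // than an exception, which is exactly what the save-mode checks need.
        List<LocatedFileStatus> files = proxy.listFile(path);
        return !files.isEmpty();
    }
}
```
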
seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalog.java

Lines changed: 127 additions & 0 deletions

@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import lombok.AllArgsConstructor;
+import lombok.SneakyThrows;
+
+import java.util.List;
+
+@AllArgsConstructor
+public class S3FileCatalog implements Catalog {
+
+    private final HadoopFileSystemProxy hadoopFileSystemProxy;
+    private final ReadonlyConfig readonlyConfig;
+
+    @Override
+    public void open() throws CatalogException {}
+
+    @Override
+    public void close() throws CatalogException {}
+
+    @Override
+    public String name() {
+        return "S3File";
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return null;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return null;
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        return null;
+    }
+
+    @SneakyThrows
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        return hadoopFileSystemProxy.fileExist(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        return null;
+    }
+
+    @SneakyThrows
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        hadoopFileSystemProxy.createDir(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
+    }
+
+    @SneakyThrows
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        hadoopFileSystemProxy.deleteFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {}
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {}
+
+    @SneakyThrows
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        hadoopFileSystemProxy.deleteFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
+        hadoopFileSystemProxy.createDir(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
+    }
+
+    @SneakyThrows
+    @Override
+    public boolean isExistsData(TablePath tablePath) {
+        final List<LocatedFileStatus> locatedFileStatuses =
+                hadoopFileSystemProxy.listFile(readonlyConfig.get(S3ConfigOptions.FILE_PATH));
+        return CollectionUtils.isNotEmpty(locatedFileStatuses);
+    }
+}
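
Read against the options, each data_save_mode value resolves to one of the catalog methods above: DROP_DATA to truncateTable, APPEND_DATA to a no-op, and ERROR_WHEN_DATA_EXISTS to an isExistsData check. A hypothetical driver sketch, not part of the commit:

```java
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.file.s3.catalog.S3FileCatalog;

public class DataSaveModeSketch {
    static void handleDataSaveMode(DataSaveMode mode, S3FileCatalog catalog, TablePath path) {
        switch (mode) {
            case DROP_DATA:
                // truncateTable deletes the path and recreates it empty.
                catalog.truncateTable(path, true);
                break;
            case APPEND_DATA:
                // Keep existing files; new files are simply written next to them.
                break;
            case ERROR_WHEN_DATA_EXISTS:
                // isExistsData lists the path and fails if any file is present.
                if (catalog.isExistsData(path)) {
                    throw new IllegalStateException("Data files already exist under the target path");
                }
                break;
            default:
                throw new UnsupportedOperationException("Unsupported data_save_mode: " + mode);
        }
    }
}
```
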
seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3FileCatalogFactory.java

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class S3FileCatalogFactory implements CatalogFactory {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        HadoopConf hadoopConf = S3Conf.buildWithReadOnlyConfig(options);
+        HadoopFileSystemProxy fileSystemUtils = new HadoopFileSystemProxy(hadoopConf);
+        return new S3FileCatalog(fileSystemUtils, options);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "S3";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+}
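
Because the factory is registered via @AutoService(Factory.class), it can be discovered at runtime by its "S3" identifier. The sketch below uses plain java.util.ServiceLoader rather than assuming the exact signature of SeaTunnel's factory-discovery helpers:

```java
import java.util.ServiceLoader;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;

public class CatalogLookupSketch {
    static Catalog createS3Catalog(ReadonlyConfig options) {
        // @AutoService registers the factory under the Factory SPI,
        // so a plain ServiceLoader scan can find it by identifier.
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (factory instanceof CatalogFactory
                    && "S3".equals(factory.factoryIdentifier())) {
                return ((CatalogFactory) factory).createCatalog("S3", options);
            }
        }
        throw new IllegalStateException("S3 catalog factory not found on the classpath");
    }
}
```
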

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java

Lines changed: 25 additions & 0 deletions

@@ -19,6 +19,7 @@

 import org.apache.seatunnel.shade.com.typesafe.config.Config;

+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;

@@ -70,6 +71,30 @@ public static HadoopConf buildWithConfig(Config config) {
         return hadoopConf;
     }

+    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        Config config = readonlyConfig.toConfig();
+        HadoopConf hadoopConf = new S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));
+        String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
+        if (bucketName.startsWith(S3A_SCHEMA)) {
+            SCHEMA = S3A_SCHEMA;
+        }
+        HashMap<String, String> s3Options = new HashMap<>();
+        putS3SK(s3Options, config);
+        if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
+            config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
+                    .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
+        }
+
+        s3Options.put(
+                S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
+                readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
+        s3Options.put(
+                S3ConfigOptions.FS_S3A_ENDPOINT.key(),
+                readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));
+        hadoopConf.setExtraOptions(s3Options);
+        return hadoopConf;
+    }
+
     private String switchHdfsImpl() {
         switch (SCHEMA) {
             case S3A_SCHEMA:
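
A usage sketch for the new overload, assuming ReadonlyConfig.fromMap from SeaTunnel's configuration API; the keys mirror the sink examples in the docs, and the credentials provider falls back to its declared default when unset:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;

public class S3ConfSketch {
    static HadoopConf buildExample() {
        Map<String, Object> raw = new HashMap<>();
        raw.put("bucket", "s3a://seatunnel-test");                    // S3_BUCKET
        raw.put("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn"); // FS_S3A_ENDPOINT
        // No credentials provider set: the option's default
        // (InstanceProfileCredentialsProvider) applies.
        return S3Conf.buildWithReadOnlyConfig(ReadonlyConfig.fromMap(raw));
    }
}
```
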

seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3ConfigOptions.java

Lines changed: 23 additions & 0 deletions

@@ -19,10 +19,17 @@

 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;

+import java.util.Arrays;
 import java.util.Map;

+import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
+import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
+
 public class S3ConfigOptions extends BaseSourceConfigOptions {
     public static final Option<String> S3_ACCESS_KEY =
             Options.key("access_key")
@@ -48,6 +55,22 @@ public class S3ConfigOptions extends BaseSourceConfigOptions {
                     .defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider)
                     .withDescription("s3a aws credentials provider");

+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription(
+                            "Before the synchronization task begins, process the existing path");
+
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .singleChoice(
+                            DataSaveMode.class,
+                            Arrays.asList(DROP_DATA, APPEND_DATA, ERROR_WHEN_DATA_EXISTS))
+                    .defaultValue(APPEND_DATA)
+                    .withDescription(
+                            "Before the synchronization task begins, different processing of data files that already exist in the directory");
+
     /**
      * The current key for that config option. if you need to add a new option, you can add it here
      * and refer to this:
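
Note the deliberate asymmetry in the declarations: enumType accepts every SchemaSaveMode constant, while singleChoice restricts data_save_mode to the three listed values. A minimal read-side sketch of how a sink would resolve the options (illustrative, not from the commit):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;

public class SaveModeOptionsSketch {
    static void resolve(ReadonlyConfig config) {
        // Both lookups fall back to the declared defaults when the user
        // omits the keys from the sink block.
        SchemaSaveMode schemaSaveMode = config.get(S3ConfigOptions.SCHEMA_SAVE_MODE);
        DataSaveMode dataSaveMode = config.get(S3ConfigOptions.DATA_SAVE_MODE);
        System.out.println(schemaSaveMode + " / " + dataSaveMode);
    }
}
```
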
