Skip to content

Commit cfb8760

Browse files
authored
[Feature][Connector-V2]Sftp file source support multiple table (#7824)
1 parent 511c8af commit cfb8760

File tree

7 files changed

+293
-115
lines changed

7 files changed

+293
-115
lines changed

docs/en/connector-v2/source/SftpFile.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,4 +264,44 @@ sink {
264264
}
265265
}
266266
```
267+
### Multiple Table
268+
269+
```hocon
270+
271+
SftpFile {
272+
tables_configs = [
273+
{
274+
schema {
275+
table = "student"
276+
fields {
277+
name = string
278+
age = int
279+
}
280+
}
281+
path = "/tmp/seatunnel/sink/text"
282+
host = "192.168.31.48"
283+
port = 21
284+
user = tyrantlucifer
285+
password = tianchao
286+
file_format_type = "parquet"
287+
},
288+
{
289+
schema {
290+
table = "teacher"
291+
fields {
292+
name = string
293+
age = int
294+
}
295+
}
296+
path = "/tmp/seatunnel/sink/text"
297+
host = "192.168.31.48"
298+
port = 21
299+
user = tyrantlucifer
300+
password = tianchao
301+
file_format_type = "parquet"
302+
}
303+
]
304+
}
305+
306+
```
267307

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
23+
24+
public class MultipleTableSFTPFileSourceConfig extends BaseMultipleTableFileSourceConfig {
25+
26+
public MultipleTableSFTPFileSourceConfig(ReadonlyConfig ossFileSourceRootConfig) {
27+
super(ossFileSourceRootConfig);
28+
}
29+
30+
@Override
31+
public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
32+
return new SFTPFileSourceConfig(readonlyConfig);
33+
}
34+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
23+
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
24+
25+
import lombok.Getter;
26+
27+
@Getter
28+
public class SFTPFileSourceConfig extends BaseFileSourceConfig {
29+
30+
private static final long serialVersionUID = 1L;
31+
32+
@Override
33+
public HadoopConf getHadoopConfig() {
34+
return SftpConf.buildWithConfig(getBaseFileSourceConfig());
35+
}
36+
37+
@Override
38+
public String getPluginName() {
39+
return FileSystemType.SFTP.getFileSystemPluginName();
40+
}
41+
42+
public SFTPFileSourceConfig(ReadonlyConfig readonlyConfig) {
43+
super(readonlyConfig);
44+
}
45+
}

seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java

Lines changed: 6 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -17,121 +17,18 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.file.sftp.source;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
22-
import org.apache.seatunnel.api.common.PrepareFailException;
23-
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
2420
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
25-
import org.apache.seatunnel.api.source.SeaTunnelSource;
26-
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
27-
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
28-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
29-
import org.apache.seatunnel.common.config.CheckConfigUtil;
30-
import org.apache.seatunnel.common.config.CheckResult;
31-
import org.apache.seatunnel.common.constants.PluginType;
32-
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
33-
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
3421
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
35-
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
36-
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
37-
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
38-
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
39-
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
40-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
41-
42-
import com.google.auto.service.AutoService;
22+
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.MultipleTableSFTPFileSourceConfig;
23+
import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
4324

44-
import java.io.IOException;
25+
public class SftpFileSource extends BaseMultipleTableFileSource {
26+
public SftpFileSource(ReadonlyConfig config) {
27+
super(new MultipleTableSFTPFileSourceConfig(config));
28+
}
4529

46-
@AutoService(SeaTunnelSource.class)
47-
public class SftpFileSource extends BaseFileSource {
4830
@Override
4931
public String getPluginName() {
5032
return FileSystemType.SFTP.getFileSystemPluginName();
5133
}
52-
53-
@Override
54-
public void prepare(Config pluginConfig) throws PrepareFailException {
55-
CheckResult result =
56-
CheckConfigUtil.checkAllExists(
57-
pluginConfig,
58-
SftpConfigOptions.FILE_PATH.key(),
59-
SftpConfigOptions.FILE_FORMAT_TYPE.key(),
60-
SftpConfigOptions.SFTP_HOST.key(),
61-
SftpConfigOptions.SFTP_PORT.key(),
62-
SftpConfigOptions.SFTP_USER.key(),
63-
SftpConfigOptions.SFTP_PASSWORD.key());
64-
if (!result.isSuccess()) {
65-
throw new FileConnectorException(
66-
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
67-
String.format(
68-
"PluginName: %s, PluginType: %s, Message: %s",
69-
getPluginName(), PluginType.SOURCE, result.getMsg()));
70-
}
71-
FileFormat fileFormat =
72-
FileFormat.valueOf(
73-
pluginConfig
74-
.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key())
75-
.toUpperCase());
76-
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
77-
throw new FileConnectorException(
78-
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
79-
"Sftp file source connector only support read [text, csv, json, xml] files");
80-
}
81-
String path = pluginConfig.getString(SftpConfigOptions.FILE_PATH.key());
82-
hadoopConf = SftpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
83-
readStrategy =
84-
ReadStrategyFactory.of(
85-
pluginConfig.getString(SftpConfigOptions.FILE_FORMAT_TYPE.key()));
86-
readStrategy.setPluginConfig(pluginConfig);
87-
readStrategy.init(hadoopConf);
88-
try {
89-
filePaths = readStrategy.getFileNamesByPath(path);
90-
} catch (IOException e) {
91-
String errorMsg = String.format("Get file list from this path [%s] failed", path);
92-
throw new FileConnectorException(
93-
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
94-
}
95-
// support user-defined schema
96-
// only json csv text type support user-defined schema now
97-
if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
98-
switch (fileFormat) {
99-
case CSV:
100-
case TEXT:
101-
case JSON:
102-
case EXCEL:
103-
case XML:
104-
SeaTunnelRowType userDefinedSchema =
105-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
106-
readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
107-
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
108-
break;
109-
case ORC:
110-
case PARQUET:
111-
case BINARY:
112-
throw new FileConnectorException(
113-
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
114-
"SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
115-
default:
116-
// never got in there
117-
throw new FileConnectorException(
118-
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
119-
"SeaTunnel does not supported this file format");
120-
}
121-
} else {
122-
if (filePaths.isEmpty()) {
123-
// When the directory is empty, distribute default behavior schema
124-
rowType = CatalogTableUtil.buildSimpleTextSchema();
125-
return;
126-
}
127-
try {
128-
rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
129-
} catch (FileConnectorException e) {
130-
String errorMsg =
131-
String.format("Get table schema from file [%s] failed", filePaths.get(0));
132-
throw new FileConnectorException(
133-
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
134-
}
135-
}
136-
}
13734
}

seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
2121
import org.apache.seatunnel.api.source.SeaTunnelSource;
22+
import org.apache.seatunnel.api.source.SourceSplit;
2223
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
24+
import org.apache.seatunnel.api.table.connector.TableSource;
2325
import org.apache.seatunnel.api.table.factory.Factory;
2426
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
27+
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
2528
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
2629
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
2730
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
2831
import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfigOptions;
2932

3033
import com.google.auto.service.AutoService;
3134

35+
import java.io.Serializable;
3236
import java.util.Arrays;
3337

3438
@AutoService(Factory.class)
@@ -41,12 +45,12 @@ public String factoryIdentifier() {
4145
@Override
4246
public OptionRule optionRule() {
4347
return OptionRule.builder()
44-
.required(SftpConfigOptions.FILE_PATH)
45-
.required(SftpConfigOptions.SFTP_HOST)
46-
.required(SftpConfigOptions.SFTP_PORT)
47-
.required(SftpConfigOptions.SFTP_USER)
48-
.required(SftpConfigOptions.SFTP_PASSWORD)
49-
.required(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
48+
.optional(SftpConfigOptions.FILE_PATH)
49+
.optional(SftpConfigOptions.SFTP_HOST)
50+
.optional(SftpConfigOptions.SFTP_PORT)
51+
.optional(SftpConfigOptions.SFTP_USER)
52+
.optional(SftpConfigOptions.SFTP_PASSWORD)
53+
.optional(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
5054
.conditional(
5155
BaseSourceConfigOptions.FILE_FORMAT_TYPE,
5256
FileFormat.TEXT,
@@ -75,6 +79,12 @@ public OptionRule optionRule() {
7579
.build();
7680
}
7781

82+
@Override
83+
public <T, SplitT extends SourceSplit, StateT extends Serializable>
84+
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
85+
return () -> (SeaTunnelSource<T, SplitT, StateT>) new SftpFileSource(context.getOptions());
86+
}
87+
7888
@Override
7989
public Class<? extends SeaTunnelSource> getSourceClass() {
8090
return SftpFileSource.class;

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,15 @@ public void testSftpFileReadAndWrite(TestContainer container)
146146
helper.execute("/xml/fake_to_sftp_file_xml.conf");
147147
// test read sftp xml file
148148
helper.execute("/xml/sftp_file_xml_to_assert.conf");
149+
// test sftp source support multipleTable
150+
String homePath = "/home/seatunnel";
151+
String sink01 = "/tmp/multipleSource/seatunnel/json/fake01";
152+
String sink02 = "/tmp/multipleSource/seatunnel/json/fake02";
153+
deleteFileFromContainer(homePath + sink01);
154+
deleteFileFromContainer(homePath + sink02);
155+
helper.execute("/json/sftp_file_json_to_assert_with_multipletable.conf");
156+
Assertions.assertEquals(getFileListFromContainer(homePath + sink01).size(), 1);
157+
Assertions.assertEquals(getFileListFromContainer(homePath + sink02).size(), 1);
149158
}
150159

151160
@TestTemplate

0 commit comments

Comments
 (0)