Skip to content

Commit

Permalink
[Feature][Connectors-v2-file-ftp] FTP source/sink add ftp connection …
Browse files Browse the repository at this point in the history
…mode (#6077)  (#6099)
  • Loading branch information
WilliamTan778 committed Jan 2, 2024
1 parent 6a2ce2b commit f6bcc4d
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 0 deletions.
7 changes: 7 additions & 0 deletions docs/en/connector-v2/sink/FtpFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| password | string | yes | - | |
| path | string | yes | - | |
| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. |
| connection_mode | string | no | active_local | The target ftp connection mode |
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
Expand Down Expand Up @@ -76,6 +77,12 @@ The target ftp password is required

The target dir path is required.

### connection_mode [string]

The target ftp connection mode , default is active mode, supported as the following modes:

`active_local` `passive_local`

### custom_filename [boolean]

Whether custom the filename
Expand Down
7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/FtpFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| password | string | yes | - |
| path | string | yes | - |
| file_format_type | string | yes | - |
| connection_mode | string | no | active_local |
| delimiter/field_delimiter | string | no | \001 |
| read_columns | list | no | - |
| parse_partition_from_path | boolean | no | true |
Expand Down Expand Up @@ -154,6 +155,12 @@ connector will generate data as the following:
|---------------|-----|--------|
| tyrantlucifer | 26 | male |

### connection_mode [string]

The target ftp connection mode , default is active mode, supported as the following modes:

`active_local` `passive_local`

### delimiter/field_delimiter [string]

**delimiter** parameter will deprecate after version 2.3.5, please use **field_delimiter** instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import lombok.Data;
import lombok.NonNull;

import java.io.File;
import java.io.Serializable;
import java.util.Locale;

Expand Down Expand Up @@ -71,6 +72,10 @@ public BaseFileSinkConfig(@NonNull Config config) {
}
checkNotNull(path);

if (path.equals(File.separator)) {
this.path = "";
}

if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key())
&& !StringUtils.isBlank(
config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public static HadoopConf buildWithConfig(Config config) {
"fs.ftp.user." + host, config.getString(FtpConfigOptions.FTP_USERNAME.key()));
ftpOptions.put(
"fs.ftp.password." + host, config.getString(FtpConfigOptions.FTP_PASSWORD.key()));
if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) {
ftpOptions.put(
"fs.ftp.connection.mode",
config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key()));
}
hadoopConf.setExtraOptions(ftpOptions);
return hadoopConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;

import static org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE;

public class FtpConfigOptions extends BaseSourceConfigOptions {
public static final Option<String> FTP_PASSWORD =
Expand All @@ -36,4 +39,9 @@ public class FtpConfigOptions extends BaseSourceConfigOptions {
Options.key("host").stringType().noDefaultValue().withDescription("FTP server host");
public static final Option<Integer> FTP_PORT =
Options.key("port").intType().noDefaultValue().withDescription("FTP server port");
public static final Option<FtpConnectionMode> FTP_CONNECTION_MODE =
Options.key("connection_mode")
.enumType(FtpConnectionMode.class)
.defaultValue(ACTIVE_LOCAL_DATA_CONNECTION_MODE)
.withDescription("FTP server connection mode ");
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.DATE_FORMAT)
.optional(BaseSinkConfig.DATETIME_FORMAT)
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.ftp.system;

/** Ftp connection mode enum. href="http://commons.apache.org/net/">Apache Commons Net</a>. */
public enum FtpConnectionMode {

/** ACTIVE_LOCAL_DATA_CONNECTION_MODE */
ACTIVE_LOCAL_DATA_CONNECTION_MODE("active_local"),

/** PASSIVE_LOCAL_DATA_CONNECTION_MODE */
PASSIVE_LOCAL_DATA_CONNECTION_MODE("passive_local");

private final String mode;

FtpConnectionMode(String mode) {
this.mode = mode;
}

public String getMode() {
return mode;
}

public static FtpConnectionMode fromMode(String mode) {
for (FtpConnectionMode ftpConnectionModeEnum : FtpConnectionMode.values()) {
if (ftpConnectionModeEnum.getMode().equals(mode)) {
return ftpConnectionModeEnum;
}
}
throw new IllegalArgumentException("Unknown ftp connection mode: " + mode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
public static final String FS_FTP_HOST = "fs.ftp.host";
public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
public static final String FS_FTP_CONNECTION_MODE = "fs.ftp.connection.mode";

public static final String E_SAME_DIRECTORY_ONLY = "only same directory renames are supported";

private URI uri;
Expand Down Expand Up @@ -153,9 +155,34 @@ private FTPClient connect() throws IOException {
+ "'");
}

setFsFtpConnectionMode(
client,
conf.get(
FS_FTP_CONNECTION_MODE,
FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE.getMode()));

return client;
}

/**
* Set FTP connection mode. *
*
* @param client FTPClient
* @param mode mode
*/
private void setFsFtpConnectionMode(FTPClient client, String mode) {
switch (FtpConnectionMode.fromMode(mode)) {
case ACTIVE_LOCAL_DATA_CONNECTION_MODE:
client.enterLocalActiveMode();
break;
case PASSIVE_LOCAL_DATA_CONNECTION_MODE:
client.enterLocalPassiveMode();
break;
default:
break;
}
}

/**
* Logout and disconnect the given FTPClient. *
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public void startUp() throws Exception {
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
ftpContainer);

ContainerUtil.copyFileIntoContainers(
"/excel/e2e.xlsx", "/home/vsftpd/seatunnel/e2e.xlsx", ftpContainer);

ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/");
ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/");
}
Expand Down Expand Up @@ -136,6 +139,8 @@ public void testFtpFileReadAndWrite(TestContainer container)
helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
// test write ftp orc file
helper.execute("/orc/fake_to_ftp_file_orc.conf");
// test write ftp root path excel file
helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
result_table_name = "ftp"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
}
}
}
}
}

sink {
FtpFile {
host = "ftp"
port = 21
user = seatunnel
password = pass
path = "/"
source_table_name = "ftp"
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "excel"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
}
}

0 comments on commit f6bcc4d

Please sign in to comment.