Skip to content

Commit

Permalink
[Fix] [Connectors-v2-file-ftp] add ftp connection mode (apache#6077)
Browse files Browse the repository at this point in the history
  • Loading branch information
xumingbei committed Dec 26, 2023
1 parent db4531b commit e9c3356
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/FtpFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| password | string | yes | - | |
| path | string | yes | - | |
| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. |
| active_model_status | boolean | no | true |
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
Expand Down Expand Up @@ -76,6 +77,10 @@ The target ftp password is required

The target dir path is required.

### active_model_status [boolean]

The target ftp model status , default is active model

### custom_filename [boolean]

Whether custom the filename
Expand Down
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/FtpFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| password | string | yes | - |
| path | string | yes | - |
| file_format_type | string | yes | - |
| active_model_status | boolean | no | true |
| delimiter/field_delimiter | string | no | \001 |
| read_columns | list | no | - |
| parse_partition_from_path | boolean | no | true |
Expand Down Expand Up @@ -77,6 +78,10 @@ The target ftp password is required

The source file path.

### active_model_status [boolean]

The target ftp model status , default is active model

### file_format_type [string]

File type, supported as the following file types:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public static HadoopConf buildWithConfig(Config config) {
"fs.ftp.user." + host, config.getString(FtpConfigOptions.FTP_USERNAME.key()));
ftpOptions.put(
"fs.ftp.password." + host, config.getString(FtpConfigOptions.FTP_PASSWORD.key()));
if (config.hasPath(FtpConfigOptions.FTP_ACTIVE_MODE_STATUS.key())) {
ftpOptions.put(
"fs.ftp.data.connection.mode." + host,
config.getString(FtpConfigOptions.FTP_ACTIVE_MODE_STATUS.key()));
}
hadoopConf.setExtraOptions(ftpOptions);
return hadoopConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ public class FtpConfigOptions extends BaseSourceConfigOptions {
Options.key("host").stringType().noDefaultValue().withDescription("FTP server host");
public static final Option<Integer> FTP_PORT =
Options.key("port").intType().noDefaultValue().withDescription("FTP server port");

public static final Option<Boolean> FTP_ACTIVE_MODE_STATUS =
Options.key("active_model_status")
.booleanType()
.defaultValue(true)
.withDescription("FTP server model default active");
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.DATE_FORMAT)
.optional(BaseSinkConfig.DATETIME_FORMAT)
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(FtpConfigOptions.FTP_ACTIVE_MODE_STATUS)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(FtpConfigOptions.FTP_ACTIVE_MODE_STATUS)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
public static final String FS_FTP_HOST = "fs.ftp.host";
public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
public static final String FS_FTP_ACTIVE_MODEL_STATUS = "fs.ftp.data.connection.mode.";
public static final String E_SAME_DIRECTORY_ONLY = "only same directory renames are supported";

private URI uri;
Expand Down Expand Up @@ -153,6 +154,11 @@ private FTPClient connect() throws IOException {
+ "'");
}

// set passive mode
if (!conf.getBoolean(FS_FTP_ACTIVE_MODEL_STATUS + host, true)) {
client.enterLocalPassiveMode();
}

return client;
}

Expand Down Expand Up @@ -446,7 +452,6 @@ private FileStatus getFileStatus(FTPClient client, Path file) throws IOException
return new FileStatus(
length, isDir, blockReplication, blockSize, modTime, root.makeQualified(this));
}
client.enterLocalPassiveMode();
String pathName = parentPath.toUri().getPath();
FTPFile[] ftpFiles = client.listFiles(pathName);
if (ftpFiles != null) {
Expand Down

0 comments on commit e9c3356

Please sign in to comment.