Skip to content

Commit b5f0b43

Browse files
authored
[Improve][Connector-V2] Ensure that the FTP connector behaves reliably during directory operation (#8959)
1 parent 349f142 commit b5f0b43

File tree

3 files changed

+372
-36
lines changed

3 files changed

+372
-36
lines changed

seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
<artifactId>connector-file-ftp</artifactId>
3030
<name>SeaTunnel : Connectors V2 : File : Ftp</name>
3131

32+
<properties>
33+
<mockftpserver.version>3.1.0</mockftpserver.version>
34+
</properties>
35+
3236
<dependencies>
3337
<dependency>
3438
<groupId>org.apache.seatunnel</groupId>
@@ -48,6 +52,12 @@
4852
</exclusion>
4953
</exclusions>
5054
</dependency>
55+
<dependency>
56+
<groupId>org.mockftpserver</groupId>
57+
<artifactId>MockFtpServer</artifactId>
58+
<version>${mockftpserver.version}</version>
59+
<scope>test</scope>
60+
</dependency>
5161
</dependencies>
5262

5363
</project>

seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java

Lines changed: 94 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,19 @@ public void initialize(URI uri, Configuration conf) throws IOException { // get
127127
* @throws IOException IOException
128128
*/
129129
private FTPClient connect() throws IOException {
130-
FTPClient client = null;
130+
FTPClient client = new FTPClient();
131131
Configuration conf = getConf();
132+
// Get the connection mode from configuration, default to passive_local mode
133+
String connectionMode =
134+
conf.get(FS_FTP_CONNECTION_MODE, FtpConnectionMode.ACTIVE_LOCAL.getMode());
135+
136+
// Retrieve host, port, user, and password from configuration
132137
String host = conf.get(FS_FTP_HOST);
133138
int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
134139
String user = conf.get(FS_FTP_USER_PREFIX + host);
135140
String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
136-
client = new FTPClient();
141+
142+
// Connect to the FTP server
137143
client.connect(host, port);
138144
int reply = client.getReplyCode();
139145
if (!FTPReply.isPositiveCompletion(reply)) {
@@ -143,23 +149,29 @@ private FTPClient connect() throws IOException {
143149
NetUtils.UNKNOWN_HOST,
144150
0,
145151
new ConnectException("Server response " + reply));
146-
} else if (client.login(user, password)) {
147-
client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
148-
client.setFileType(FTP.BINARY_FILE_TYPE);
149-
client.setBufferSize(DEFAULT_BUFFER_SIZE);
150-
} else {
152+
}
153+
154+
// Log in to the FTP server
155+
if (!client.login(user, password)) {
151156
throw new IOException(
152-
"Login failed on server - "
153-
+ host
154-
+ ", port - "
155-
+ port
156-
+ " as user '"
157-
+ user
158-
+ "'");
157+
String.format(
158+
"Login failed on server - %s, port - %d as user '%s', reply code: %d",
159+
host, port, user, client.getReplyCode()));
159160
}
160161

161-
setFsFtpConnectionMode(
162-
client, conf.get(FS_FTP_CONNECTION_MODE, FtpConnectionMode.ACTIVE_LOCAL.getMode()));
162+
// Set the file type to binary and buffer size
163+
client.setFileType(FTP.BINARY_FILE_TYPE);
164+
client.setBufferSize(DEFAULT_BUFFER_SIZE);
165+
client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
166+
167+
// Set the connection mode
168+
setFsFtpConnectionMode(client, connectionMode);
169+
170+
// Log successful connection information
171+
LOG.info(
172+
String.format(
173+
"Successfully connected to FTP server %s:%d in %s",
174+
host, port, connectionMode));
163175

164176
return client;
165177
}
@@ -170,13 +182,39 @@ private FTPClient connect() throws IOException {
170182
* @param client FTPClient
171183
* @param mode mode
172184
*/
173-
private void setFsFtpConnectionMode(FTPClient client, String mode) {
174-
switch (FtpConnectionMode.fromMode(mode)) {
185+
private void setFsFtpConnectionMode(FTPClient client, String mode) throws IOException {
186+
FtpConnectionMode connectionMode = FtpConnectionMode.fromMode(mode);
187+
switch (connectionMode) {
175188
case PASSIVE_LOCAL:
176189
client.enterLocalPassiveMode();
190+
LOG.info("Using passive mode for FTP connection");
177191
break;
178192
case ACTIVE_LOCAL:
179-
client.enterLocalActiveMode();
193+
// Create a test directory to check if active mode is working
194+
String pathName = "/.ftptest" + System.currentTimeMillis();
195+
try {
196+
client.enterLocalActiveMode();
197+
// test active mode is working or not
198+
boolean created = client.makeDirectory(pathName);
199+
if (!created) {
200+
LOG.warn("Active mode failed, switching to passive mode");
201+
throw new IOException("FTP connection active mode test failed");
202+
}
203+
204+
LOG.info("Using active mode for FTP connection");
205+
} catch (IOException e) {
206+
// if active mode failed, switch to passive mode
207+
client.enterLocalPassiveMode();
208+
// update the connection mode to passive mode
209+
getConf()
210+
.set(FS_FTP_CONNECTION_MODE, FtpConnectionMode.PASSIVE_LOCAL.getMode());
211+
} finally {
212+
// delete the test directory if it was created
213+
FTPFile[] files = client.listFiles(pathName);
214+
if (files != null && files.length > 0) {
215+
client.deleteFile(pathName);
216+
}
217+
}
180218
break;
181219
default:
182220
log.warn(
@@ -548,30 +586,50 @@ public boolean mkdirs(Path file, FsPermission permission) throws IOException {
548586
*/
549587
private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
550588
throws IOException {
551-
boolean created = true;
552589
Path workDir = new Path(client.printWorkingDirectory());
553590
Path absolute = makeAbsolute(workDir, file);
554-
String pathName = absolute.getName();
555-
if (!exists(client, absolute)) {
556-
Path parent = absolute.getParent();
557-
created = parent == null || mkdirs(client, parent, FsPermission.getDirDefault());
558-
if (created) {
559-
String parentDir = parent.toUri().getPath();
560-
client.changeWorkingDirectory(parentDir);
561-
LOG.debug("Creating directory " + pathName);
562-
created = client.makeDirectory(pathName);
591+
// If directory already exists, return true
592+
if (exists(client, absolute)) {
593+
if (isFile(client, absolute)) {
594+
throw new ParentNotDirectoryException(
595+
String.format(
596+
"Can't make directory for path %s since it is a file.", absolute));
563597
}
564-
} else if (isFile(client, absolute)) {
565-
throw new ParentNotDirectoryException(
598+
return true;
599+
}
600+
601+
// Create parent directories if they don't exist
602+
Path parent = absolute.getParent();
603+
if (parent != null && !exists(client, parent)) {
604+
mkdirs(client, parent, FsPermission.getDirDefault());
605+
}
606+
607+
// Create the directory
608+
String pathName = absolute.getName();
609+
String parentDir = parent != null ? parent.toUri().getPath() : "/";
610+
611+
// Change to parent directory
612+
if (!client.changeWorkingDirectory(parentDir)) {
613+
throw new IOException(
566614
String.format(
567-
"Can't make directory for path %s since it is a file.", absolute));
568-
} else {
569-
LOG.debug("Skipping creation of existing directory " + file);
615+
"Failed to change working directory to %s, FTP reply code: %d, reply string: %s",
616+
parentDir, client.getReplyCode(), client.getReplyString()));
570617
}
618+
// Create directory
619+
boolean created = client.makeDirectory(pathName);
571620
if (!created) {
572-
LOG.debug("Failed to create " + file);
621+
// Double check if directory was actually created (some FTP servers don't return true)
622+
if (!exists(client, absolute)) {
623+
throw new IOException(
624+
String.format(
625+
"Failed to create directory %s in %s, FTP reply code: %d, reply string: %s",
626+
pathName,
627+
parentDir,
628+
client.getReplyCode(),
629+
client.getReplyString()));
630+
}
573631
}
574-
return created;
632+
return true;
575633
}
576634

577635
/**

0 commit comments

Comments
 (0)