From f435a12a14d68943741eef5b719bd980ec62fa0e Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 4 Jul 2017 10:48:41 +0200 Subject: [PATCH 1/3] NIFI-3281 - fix for (S)FTP processors when using EL against FFs --- .../standard/FetchFileTransfer.java | 3 +- .../processors/standard/util/FTPTransfer.java | 8 ++++- .../standard/util/FileTransfer.java | 2 ++ .../standard/util/SFTPTransfer.java | 5 +++ .../nifi/processors/standard/TestFTP.java | 31 ++++++++++++++++++- .../standard/TestFetchFileTransfer.java | 4 +++ 6 files changed, 50 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 6182b0aa8427..1216e71605b8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -239,9 +239,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session @Override public void process(final OutputStream out) throws IOException { StreamUtils.copy(in, out); - transfer.flush(); } }); + + transfer.flush(flowFile); transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); } catch (final FileNotFoundException e) { getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index b64a6f88d1a3..43c132bd79cc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -289,7 +289,7 @@ public InputStream getInputStream(String remoteFileName) throws IOException { @Override public InputStream getInputStream(final String remoteFileName, final FlowFile flowFile) throws IOException { - final FTPClient client = getClient(null); + final FTPClient client = getClient(flowFile); InputStream in = client.retrieveFileStream(remoteFileName); if (in == null) { throw new IOException(client.getReplyString()); @@ -303,6 +303,12 @@ public void flush() throws IOException { client.completePendingCommand(); } + @Override + public void flush(final FlowFile flowFile) throws IOException { + final FTPClient client = getClient(flowFile); + client.completePendingCommand(); + } + @Override public FileInfo getRemoteFileInfo(final FlowFile flowFile, String path, String remoteFileName) throws IOException { final FTPClient client = getClient(flowFile); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index 22d9ec52eace..60d802472ee3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -39,6 +39,8 @@ public interface FileTransfer extends Closeable { void flush() throws IOException; + void flush(FlowFile flowFile) throws IOException; + FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index a6a9e4b70b9c..4f408fae4bce 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -308,6 +308,11 @@ public void flush() throws IOException { // nothing needed here } + @Override + public void flush(final FlowFile flowFile) throws IOException { + // nothing needed here + } + @Override public void deleteFile(final String path, final String remoteFileName) throws IOException { final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 102931f28c2d..0ebc1c07fdb5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -36,7 +36,6 @@ import org.mockftpserver.fake.filesystem.WindowsFakeFileSystem; import java.io.FileInputStream; - import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -167,4 +166,34 @@ public void basicFileGet() throws IOException { final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(GetFTP.REL_SUCCESS).get(0); retrievedFile.assertContentEquals("Just some random test test test chocolate"); } + + @Test + public void basicFileFetch() throws IOException { + FileSystem results = fakeFtpServer.getFileSystem(); + + FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2"); + sampleFile.setContents("Just some random test test test chocolate"); + results.add(sampleFile); + + // Check file exists + Assert.assertTrue(results.exists("c:\\data\\randombytes-2")); + + TestRunner runner = TestRunners.newTestRunner(FetchFTP.class); + runner.setProperty(FetchFTP.HOSTNAME, "${host}"); + runner.setProperty(FetchFTP.USERNAME, "${username}"); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, "${port}"); + runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\randombytes-2"); + + Map attrs = new HashMap(); + attrs.put("host", "localhost"); + attrs.put("username", username); + attrs.put("port", Integer.toString(ftpPort)); + runner.enqueue("", attrs); + + runner.run(); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); + retrievedFile.assertContentEquals("Just some random test test test chocolate"); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index 2b78a4b09ffc..c47c0d4cab50 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -281,6 +281,10 @@ public InputStream getInputStream(String remoteFileName, FlowFile flowFile) thro public void flush() throws IOException { } + @Override + public void flush(FlowFile flowFile) throws IOException { + } + @Override public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException { return null; From 0fe7ee86ac16f6e21fdeffb24273a311809006b0 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 6 Jul 2017 18:24:24 +0200 Subject: [PATCH 2/3] NIFI-3281 - Review - handle completePendingCommand return and added a unit test for ListFTP --- .../standard/FetchFileTransfer.java | 5 ++- .../processors/standard/ListFileTransfer.java | 2 +- .../processors/standard/util/FTPTransfer.java | 5 ++- .../standard/util/FileTransfer.java | 2 +- .../standard/util/SFTPTransfer.java | 4 +-- .../nifi/processors/standard/TestFTP.java | 34 +++++++++++++++++++ .../standard/TestFetchFileTransfer.java | 3 +- 7 files changed, 46 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 1216e71605b8..4fc4fa391700 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -242,7 +242,10 @@ public void process(final OutputStream out) throws IOException { } }); - transfer.flush(flowFile); + if(!transfer.flush(flowFile)) { + throw new IOException("completePendingCommand returned false, file transfer failed"); + } + transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); } catch (final FileNotFoundException e) { getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java index 58e443a6f582..3f35c4e399a1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java @@ -94,7 +94,7 @@ protected Map createAttributes(final FileInfo fileInfo, final Pr @Override protected String getPath(final ProcessContext context) { - return context.getProperty(REMOTE_PATH).getValue(); + return context.getProperty(REMOTE_PATH).evaluateAttributeExpressions().getValue(); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index 43c132bd79cc..831e4135b1bc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -304,9 +304,8 @@ public void flush() throws IOException { } @Override - public void flush(final FlowFile flowFile) throws IOException { - final FTPClient client = getClient(flowFile); - client.completePendingCommand(); + public boolean flush(final FlowFile flowFile) throws IOException { + return getClient(flowFile).completePendingCommand(); } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index 60d802472ee3..20baeee62607 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -39,7 +39,7 @@ public interface FileTransfer extends Closeable { void flush() throws IOException; - void flush(FlowFile flowFile) throws IOException; + boolean flush(FlowFile flowFile) throws IOException; FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index 4f408fae4bce..d4f14cad7337 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -309,8 +309,8 @@ public void flush() throws IOException { } @Override - public void flush(final FlowFile flowFile) throws IOException { - // nothing needed here + public boolean flush(final FlowFile flowFile) throws IOException { + return true; } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 0ebc1c07fdb5..b125dc6d0176 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -196,4 +196,38 @@ public void basicFileFetch() throws IOException { final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); retrievedFile.assertContentEquals("Just some random test test test chocolate"); } + + @Test + public void basicFileList() throws IOException { + FileSystem results = fakeFtpServer.getFileSystem(); + + FileEntry sampleFile = new FileEntry("c:\\data\\randombytes-2"); + sampleFile.setContents("Just some random test test test chocolate"); + results.add(sampleFile); + + // Check file exists + Assert.assertTrue(results.exists("c:\\data\\randombytes-2")); + + TestRunner runner = TestRunners.newTestRunner(ListFTP.class); + runner.setProperty(ListFTP.HOSTNAME, "localhost"); + runner.setProperty(ListFTP.USERNAME, username); + runner.setProperty(FTPTransfer.PASSWORD, password); + runner.setProperty(FTPTransfer.PORT, Integer.toString(ftpPort)); + runner.setProperty(ListFTP.REMOTE_PATH, "/"); + runner.assertValid(); + + runner.run(); + + final MockFlowFile retrievedFile = runner.getFlowFilesForRelationship(FetchFTP.REL_SUCCESS).get(0); + runner.assertAllFlowFilesContainAttribute("ftp.remote.host"); + runner.assertAllFlowFilesContainAttribute("ftp.remote.port"); + runner.assertAllFlowFilesContainAttribute("ftp.listing.user"); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_OWNER_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_GROUP_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_SIZE_ATTRIBUTE); + runner.assertAllFlowFilesContainAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE); + retrievedFile.assertAttributeEquals("ftp.listing.user", username); + retrievedFile.assertAttributeEquals("filename", "randombytes-2"); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index c47c0d4cab50..96f5f47bb883 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -282,7 +282,8 @@ public void flush() throws IOException { } @Override - public void flush(FlowFile flowFile) throws IOException { + public boolean flush(FlowFile flowFile) throws IOException { + return true; } @Override From 12d18dda17a27553810bb467e1b57b977bf71eac Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 18 Jul 2017 14:16:36 +0200 Subject: [PATCH 3/3] NIFI-3281 - Review - Added flow file for EL evaluation in other methods and added unit test for NIFI-3590 --- .../nifi/processors/standard/FetchFileTransfer.java | 4 ++-- .../nifi/processors/standard/GetFileTransfer.java | 2 +- .../nifi/processors/standard/PutFileTransfer.java | 2 +- .../nifi/processors/standard/util/FTPTransfer.java | 12 ++++++------ .../nifi/processors/standard/util/FileTransfer.java | 6 +++--- .../nifi/processors/standard/util/SFTPTransfer.java | 8 ++++---- .../org/apache/nifi/processors/standard/TestFTP.java | 3 +++ .../processors/standard/TestFetchFileTransfer.java | 6 +++--- 8 files changed, 23 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 4fc4fa391700..0023c4b2f59f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -301,7 +301,7 @@ public void process(final OutputStream out) throws IOException { final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { try { - transfer.deleteFile(null, filename); + transfer.deleteFile(flowFile, null, filename); } catch (final FileNotFoundException e) { // file doesn't exist -- effectively the same as removing it. Move on. } catch (final IOException ioe) { @@ -317,7 +317,7 @@ public void process(final OutputStream out) throws IOException { final String target = targetDir + simpleFilename; try { - transfer.rename(filename, target); + transfer.rename(flowFile, filename, target); } catch (final IOException ioe) { getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", new Object[] {flowFile, host, port, filename, ioe}, ioe); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java index 00dfccf0486d..4ce31de485c6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java @@ -208,7 +208,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (deleteOriginal) { try { - transfer.deleteFile(null, file.getFullPathFileName()); + transfer.deleteFile(flowFile, null, file.getFullPathFileName()); } catch (final IOException e) { logger.error("Failed to remove remote file {} due to {}; deleting local copy", new Object[]{file.getFullPathFileName(), e}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index 054d1d871b4a..cbaa9ecaab38 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -230,7 +230,7 @@ private ConflictResult identifyAndResolveConflictFile( logger.warn("Resolving conflict by rejecting {} due to conflicting filename with a directory or file already on remote server", new Object[]{flowFile}); break; case FileTransfer.CONFLICT_RESOLUTION_REPLACE: - transfer.deleteFile(path, fileName); + transfer.deleteFile(flowFile, path, fileName); destinationRelationship = REL_SUCCESS; transferFile = true; penalizeFile = false; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index 831e4135b1bc..aeec6c3fef07 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -449,8 +449,8 @@ public String put(final FlowFile flowFile, final String path, final String filen @Override - public void rename(final String source, final String target) throws IOException { - final FTPClient client = getClient(null); + public void rename(final FlowFile flowFile, final String source, final String target) throws IOException { + final FTPClient client = getClient(flowFile); final boolean renameSuccessful = client.rename(source, target); if (!renameSuccessful) { throw new IOException("Failed to rename temporary file " + source + " to " + target + " due to: " + client.getReplyString()); @@ -458,8 +458,8 @@ public void rename(final String source, final String target) throws IOException } @Override - public void deleteFile(final String path, final String remoteFileName) throws IOException { - final FTPClient client = getClient(null); + public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { + final FTPClient client = getClient(flowFile); if (path != null) { setWorkingDirectory(path); } @@ -469,8 +469,8 @@ public void deleteFile(final String path, final String remoteFileName) throws IO } @Override - public void deleteDirectory(final String remoteDirectoryName) throws IOException { - final FTPClient client = getClient(null); + public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException { + final FTPClient client = getClient(flowFile); final boolean success = client.removeDirectory(remoteDirectoryName); if (!success) { throw new IOException("Failed to remove directory " + remoteDirectoryName + " due to " + client.getReplyString()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index 20baeee62607..ac7f728ea06a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -45,11 +45,11 @@ public interface FileTransfer extends Closeable { String put(FlowFile flowFile, String path, String filename, InputStream content) throws IOException; - void rename(String source, String target) throws IOException; + void rename(FlowFile flowFile, String source, String target) throws IOException; - void deleteFile(String path, String remoteFileName) throws IOException; + void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException; - void deleteDirectory(String remoteDirectoryName) throws IOException; + void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException; boolean isClosed(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index d4f14cad7337..bc31ba967cba 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -314,7 +314,7 @@ public boolean flush(final FlowFile flowFile) throws IOException { } @Override - public void deleteFile(final String path, final String remoteFileName) throws IOException { + public void deleteFile(final FlowFile flowFile, final String path, final String remoteFileName) throws IOException { final String fullPath = (path == null) ? remoteFileName : (path.endsWith("/")) ? path + remoteFileName : path + "/" + remoteFileName; try { sftp.rm(fullPath); @@ -331,7 +331,7 @@ public void deleteFile(final String path, final String remoteFileName) throws IO } @Override - public void deleteDirectory(final String remoteDirectoryName) throws IOException { + public void deleteDirectory(final FlowFile flowFile, final String remoteDirectoryName) throws IOException { try { sftp.rm(remoteDirectoryName); } catch (final SftpException e) { @@ -618,8 +618,8 @@ public String put(final FlowFile flowFile, final String path, final String filen } @Override - public void rename(final String source, final String target) throws IOException { - final ChannelSftp sftp = getChannel(null); + public void rename(final FlowFile flowFile, final String source, final String target) throws IOException { + final ChannelSftp sftp = getChannel(flowFile); try { sftp.rename(source, target); } catch (final SftpException e) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index b125dc6d0176..d8797dc8a9df 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -184,6 +184,9 @@ public void basicFileFetch() throws IOException { runner.setProperty(FTPTransfer.PASSWORD, password); runner.setProperty(FTPTransfer.PORT, "${port}"); runner.setProperty(FetchFTP.REMOTE_FILENAME, "c:\\data\\randombytes-2"); + runner.setProperty(FetchFTP.COMPLETION_STRATEGY, FetchFTP.COMPLETION_MOVE); + runner.setProperty(FetchFTP.MOVE_DESTINATION_DIR, "data"); + Map attrs = new HashMap(); attrs.put("host", "localhost"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index 96f5f47bb883..4965893e3664 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -297,7 +297,7 @@ public String put(FlowFile flowFile, String path, String filename, InputStream c } @Override - public void deleteFile(String path, String remoteFileName) throws IOException { + public void deleteFile(FlowFile flowFile, String path, String remoteFileName) throws IOException { if (!allowDelete) { throw new PermissionDeniedException("test permission denied"); } @@ -310,7 +310,7 @@ public void deleteFile(String path, String remoteFileName) throws IOException { } @Override - public void rename(String source, String target) throws IOException { + public void rename(FlowFile flowFile, String source, String target) throws IOException { if (!allowRename) { throw new PermissionDeniedException("test permission denied"); } @@ -324,7 +324,7 @@ public void rename(String source, String target) throws IOException { } @Override - public void deleteDirectory(String remoteDirectoryName) throws IOException { + public void deleteDirectory(FlowFile flowFile, String remoteDirectoryName) throws IOException { }