From aeab1feeb3c5f47858793e00425b26cc9f2d5b0b Mon Sep 17 00:00:00 2001
From: Francois Prunier
Date: Fri, 3 Mar 2017 11:41:42 +0100
Subject: [PATCH 1/2] NIFI-3204 fix handling of deleting a path with a
 wildcard when the processor is invoked via an incoming flowfile

---
 .../nifi/processors/hadoop/DeleteHDFS.java    | 21 +++++++++++++---
 .../processors/hadoop/TestDeleteHDFS.java     | 25 +++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index ed4d10db8209..1cda3b462276 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -47,10 +47,10 @@
 @TriggerWhenEmpty
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
 @Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem", "restricted" })
-@CapabilityDescription("Deletes a file from HDFS. The file can be provided as an attribute from an incoming FlowFile, "
-        + "or a statically set file that is periodically removed. If this processor has an incoming connection, it"
+@CapabilityDescription("Deletes one or more files or directories from HDFS. The path can be provided as an attribute from an incoming FlowFile, "
+        + "or a statically set path that is periodically removed. If this processor has an incoming connection, it "
         + "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. "
-        + "Optionally, you may specify use a wildcard character to match multiple files or directories.")
+        + "Note that you may use a wildcard character to match multiple files or directories.")
 @Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
 public class DeleteHDFS extends AbstractHadoopProcessor {
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -141,20 +141,33 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
             }
 
             Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
+            boolean foundMissingFile = false;
             for (Path path : pathList) {
                 attributes.put("filename", path.getName());
                 attributes.put("path", path.getParent().toString());
                 if (fileSystem.exists(path)) {
                     fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
+
                     if (!context.hasIncomingConnection()) {
                         flowFile = session.create();
+                        session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS);
                     }
-                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS);
+
                 } else {
                     getLogger().warn("File (" + path + ") does not exist");
+
                     if (!context.hasIncomingConnection()) {
                         flowFile = session.create();
+                        session.transfer(session.putAllAttributes(flowFile, attributes), REL_FAILURE);
                     }
+
+                }
+            }
+            if (context.hasIncomingConnection()) {
+                // TODO we only put the last path deleted, change the semantic of the processor?
+                if (!foundMissingFile) {
+                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS);
+                } else {
                     session.transfer(session.putAllAttributes(flowFile, attributes), REL_FAILURE);
                 }
             }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 0cb371c83fe0..9143d8388f5e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -168,6 +168,31 @@ public void testGlobDelete() throws Exception {
         }
     }
 
+    @Test
+    public void testGlobDeleteFromIncomingFlowFile() throws Exception {
+        Path glob = new Path("/data/for/2017/08/05/*");
+        int fileCount = 300;
+        FileStatus[] fileStatuses = new FileStatus[fileCount];
+        for (int i = 0; i < fileCount; i++) {
+            Path file = new Path("/data/for/2017/08/05/file" + i);
+            FileStatus fileStatus = mock(FileStatus.class);
+            when(fileStatus.getPath()).thenReturn(file);
+            fileStatuses[i] = fileStatus;
+        }
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(true);
+        Map<String, String> attributes = Maps.newHashMap();
+        runner.enqueue("foo", attributes);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
+    }
+
     private static class TestableDeleteHDFS extends DeleteHDFS {
         private KerberosProperties testKerberosProperties;
         private FileSystem mockFileSystem;
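Note (not part of the submitted diff): in the hunk above, foundMissingFile is declared and branched on but never assigned, so with an incoming connection the FlowFile always routes to REL_SUCCESS even when a path was missing; the TODO likewise flags that only the last path's attributes survive the loop. A minimal sketch of the loop with the missing assignment, illustrative only, since the second patch below discards this logic entirely:

    boolean foundMissingFile = false;
    for (Path path : pathList) {
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
        } else {
            getLogger().warn("File (" + path + ") does not exist");
            foundMissingFile = true; // the submitted hunk omits this assignment
        }
    }
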
From a82fd2f67ab4c72bc70b0f12730e342d3128c87c Mon Sep 17 00:00:00 2001
From: Francois Prunier
Date: Mon, 10 Apr 2017 11:28:01 +0200
Subject: [PATCH 2/2] applied Joseph Witt's patch, see
 https://issues.apache.org/jira/browse/NIFI-3204?focusedCommentId=15961623&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15961623

---
 .../nifi/processors/hadoop/DeleteHDFS.java    | 66 +++++++------------
 .../processors/hadoop/TestDeleteHDFS.java     | 34 ++--------
 2 files changed, 30 insertions(+), 70 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 1cda3b462276..cdabc80cc5a6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.hadoop;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,33 +38,38 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 
 @TriggerWhenEmpty
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem", "restricted" })
+@Tags({"hadoop", "HDFS", "delete", "remove", "filesystem", "restricted"})
 @CapabilityDescription("Deletes one or more files or directories from HDFS. The path can be provided as an attribute from an incoming FlowFile, "
         + "or a statically set path that is periodically removed. If this processor has an incoming connection, it "
         + "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. "
-        + "Note that you may use a wildcard character to match multiple files or directories.")
+        + "Note that you may use a wildcard character to match multiple files or directories. If there are"
+        + " no incoming connections, no FlowFiles will be transferred to any output relationships. If there is an incoming"
+        + " FlowFile then, provided there are no detected failures, it will be transferred to success; otherwise it will be sent to failure. If"
+        + " knowledge of globbed files deleted is necessary, use ListHDFS first to produce a specific list of files to delete.")
 @Restricted("Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")
+@SeeAlso({ListHDFS.class})
 public class DeleteHDFS extends AbstractHadoopProcessor {
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("FlowFiles will be routed here if the delete command was successful")
+            .description("When an incoming FlowFile is used, and the delete completes without errors, that FlowFile will route here.")
             .build();
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("FlowFiles will be routed here if the delete command was unsuccessful")
+            .description("When an incoming FlowFile is used and there is a failure while deleting, that FlowFile will route here.")
             .build();
 
     public static final PropertyDescriptor FILE_OR_DIRECTORY = new PropertyDescriptor.Builder()
             .name("file_or_directory")
-            .displayName("File or Directory")
+            .displayName("Path")
             .description("The HDFS file or directory to delete. A wildcard expression may be used to only delete certain files")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -109,20 +113,20 @@ public Set<Relationship> getRelationships() {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        String fileOrDirectoryName = null;
-        FlowFile flowFile = session.get();
+        final FlowFile originalFlowFile = session.get();
 
         // If this processor has an incoming connection, then do not run unless a
         // FlowFile is actually sent through
-        if (flowFile == null && context.hasIncomingConnection()) {
+        if (originalFlowFile == null && context.hasIncomingConnection()) {
             context.yield();
             return;
         }
 
-        if (flowFile != null) {
-            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
-        } else {
+        final String fileOrDirectoryName;
+        if (originalFlowFile == null) {
             fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
+        } else {
+            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(originalFlowFile).getValue();
         }
 
         final FileSystem fileSystem = getFileSystem();
@@ -140,43 +144,21 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
                 pathList.add(new Path(fileOrDirectoryName));
             }
 
-            Map<String, String> attributes = Maps.newHashMapWithExpectedSize(2);
-            boolean foundMissingFile = false;
             for (Path path : pathList) {
-                attributes.put("filename", path.getName());
-                attributes.put("path", path.getParent().toString());
                 if (fileSystem.exists(path)) {
                     fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
-
-                    if (!context.hasIncomingConnection()) {
-                        flowFile = session.create();
-                        session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS);
-                    }
-
-                } else {
-                    getLogger().warn("File (" + path + ") does not exist");
-
-                    if (!context.hasIncomingConnection()) {
-                        flowFile = session.create();
-                        session.transfer(session.putAllAttributes(flowFile, attributes), REL_FAILURE);
-                    }
-
+                    getLogger().debug("For flowfile {} deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
                 }
             }
-            if (context.hasIncomingConnection()) {
-                // TODO we only put the last path deleted, change the semantic of the processor?
-                if (!foundMissingFile) {
-                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_SUCCESS);
-                } else {
-                    session.transfer(session.putAllAttributes(flowFile, attributes), REL_FAILURE);
-                }
+            if (originalFlowFile != null) {
+                session.transfer(originalFlowFile, DeleteHDFS.REL_SUCCESS);
             }
         } catch (IOException e) {
-            getLogger().warn("Error processing delete for file or directory", e);
-            if (flowFile != null) {
-                session.rollback(true);
+            if (originalFlowFile != null) {
+                getLogger().error("Error processing delete for flowfile {} due to {}", new Object[]{originalFlowFile, e.getMessage()}, e);
+                session.transfer(originalFlowFile, DeleteHDFS.REL_FAILURE);
             }
         }
-    }
+    }
 
 }
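Note (not part of the submitted diff): the hunks above touch only the delete loop; the pathList it iterates over is built earlier in onTrigger from the evaluated property value. A rough sketch of that wildcard-expansion step, inferred from the surrounding context (the kept Lists/Matcher/Pattern imports, the pathList.add fallback in the hunk, and the globStatus mocking in the tests); the real code gates on a regex Pattern rather than this simplified contains() check:

    final List<Path> pathList = Lists.newArrayListWithCapacity(1);
    if (fileOrDirectoryName.contains("*") || fileOrDirectoryName.contains("?")) {
        // expand the wildcard against HDFS; globStatus returns null when nothing matches
        final FileStatus[] fileStatuses = fileSystem.globStatus(new Path(fileOrDirectoryName));
        if (fileStatuses != null) {
            for (FileStatus fileStatus : fileStatuses) {
                pathList.add(fileStatus.getPath());
            }
        }
    } else {
        pathList.add(new Path(fileOrDirectoryName));
    }
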
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index 9143d8388f5e..be16ac63b7f6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
@@ -25,15 +24,12 @@
 import java.io.File;
 import java.io.IOException;
 
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hadoop.KerberosProperties;
-import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -55,6 +51,7 @@ public void setup() throws Exception {
         mockFileSystem = mock(FileSystem.class);
     }
 
+    // Tests the case where a file is found and deleted but there was no incoming connection
     @Test
     public void testSuccessfulDelete() throws Exception {
         Path filePath = new Path("/some/path/to/file.txt");
@@ -66,11 +63,8 @@ public void testSuccessfulDelete() throws Exception {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
-        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }
 
     @Test
@@ -86,9 +80,6 @@ public void testDeleteFromIncomingFlowFile() throws Exception {
         runner.run();
         runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
         runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
-        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
     }
 
     @Test
@@ -102,9 +93,7 @@ public void testIOException() throws Exception {
         attributes.put("hdfs.file", filePath.toString());
         runner.enqueue("foo", attributes);
         runner.run();
-        runner.assertQueueNotEmpty();
-        runner.assertPenalizeCount(1);
-        assertEquals(1, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
     }
 
     @Test
@@ -131,11 +120,7 @@ public void testUnsuccessfulDelete() throws Exception {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_FAILURE);
-        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
-        FlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0);
-        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
-        assertEquals(filePath.getParent().toString(), flowFile.getAttribute("path"));
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
     }
 
     @Test
@@ -158,14 +143,7 @@ public void testGlobDelete() throws Exception {
         runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
         runner.assertValid();
         runner.run();
-        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
-        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, fileCount);
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS);
-        for (int i = 0; i < fileCount; i++) {
-            FlowFile flowFile = flowFiles.get(i);
-            assertEquals("file" + i, flowFile.getAttribute("filename"));
-            assertEquals("/data/for/2017/08/05", flowFile.getAttribute("path"));
-        }
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
     }
 
     @Test
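Note (not part of the submitted diff): under the second patch's semantics, per-file reporting is delegated upstream, as the updated capability description suggests. An illustrative, hypothetical test of that recommended pattern, reusing the harness from TestDeleteHDFS above; the path/filename attributes and the expression-language property value mirror what a ListHDFS-fed flow would provide:

    @Test
    public void testAttributeDrivenDelete() throws Exception { // hypothetical name
        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
        runner.setIncomingConnection(true);
        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${path}/${filename}");
        Map<String, String> attributes = Maps.newHashMap();
        attributes.put("path", "/data/for/2017/08/05");
        attributes.put("filename", "file0");
        runner.enqueue("foo", attributes);
        runner.run();
        // one path per FlowFile keeps success/failure routing unambiguous
        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
    }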