From dff36898dfa50b248da972b33f7ac89784d8ea44 Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Thu, 29 Oct 2015 01:41:58 -0400 Subject: [PATCH] New ListFile processor. --- .../nifi/processors/standard/ListFile.java | 140 ++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestListFile.java | 154 ++++++++++++++++++ 3 files changed, 295 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java new file mode 100644 index 000000000000..df3c7a748601 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.FileInfo; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.attribute.PosixFileAttributes; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ListFile extends AbstractListProcessor { + public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder() + .name("Path") + .description("The path on the system from which to pull or push files") + .required(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) + .expressionLanguageSupported(true) + .defaultValue(".") + .build(); + + private List properties; + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(PATH); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + + @Override + protected Map createAttributes(final FileInfo fileInfo, final ProcessContext context) { + final Map attributes = new HashMap<>(); + attributes.put("file.owner", fileInfo.getOwner()); + attributes.put("file.group", fileInfo.getGroup()); + attributes.put("file.permissions", fileInfo.getPermissions()); + attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName()); + + final String fullPath = fileInfo.getFullPathFileName(); + if (fullPath != null) { + final int index = fullPath.lastIndexOf("/"); + if (index > -1) { + final String path = fullPath.substring(0, index); + attributes.put(CoreAttributes.PATH.key(), path); + } + } + return attributes; + } + + @Override + protected String getPath(final ProcessContext context) { + return context.getProperty(PATH).evaluateAttributeExpressions().getValue(); + } + + @Override + protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { + final File path = new File(getPath(context)); + final List listing = new ArrayList<>(); + File[] files = path.listFiles(); + if (files != null) { + for (File file : files) { + final PosixFileAttributes attrib = Files.readAttributes(file.toPath(), PosixFileAttributes.class, LinkOption.NOFOLLOW_LINKS); + listing.add(new FileInfo.Builder() + .directory(file.isDirectory()) + .filename(file.getName()) + .fullPathFileName(file.getAbsolutePath()) + .group(attrib.group().getName()) + .lastModifiedTime(file.lastModified()) + .owner(attrib.owner().getName()) + .permissions(attrib.permissions().toString()) + .size(file.getTotalSpace()) + .build()); + } + } + if (minTimestamp == null) { + return listing; + } + + final Iterator itr = listing.iterator(); + while (itr.hasNext()) { + final FileInfo next = itr.next(); + if (next.getLastModifiedTime() < minTimestamp) { + itr.remove(); + } + } + + return listing; + } + + @Override + protected boolean isListingResetNecessary(final PropertyDescriptor property) { + return PATH.equals(property); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b12fb6ff9690..c38c85aa3480 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -45,6 +45,7 @@ org.apache.nifi.processors.standard.GetJMSTopic org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListSFTP +org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.LogAttribute org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.ModifyBytes diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java new file mode 100644 index 000000000000..93c45ba6e29e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestListFile { + + final String TESTDIR = "target/test/data/in"; + final File testDir = new File(TESTDIR); + ListFile processor; + TestRunner runner; + ProcessContext context; + ProcessorInitializationContext initContext; + + @Before + public void setUp() throws Exception { + processor = new ListFile(); + runner = TestRunners.newTestRunner(processor); + context = runner.getProcessContext(); + deleteDirectory(testDir); + assertTrue("Unable to create test data directory " + testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs()); + } + + private void deleteDirectory(final File directory) throws IOException { + if (directory.exists()) { + File[] files = directory.listFiles(); + if (files != null) { + for (final File file : files) { + if (file.isDirectory()) { + deleteDirectory(file); + } + assertTrue("Could not delete " + file.getAbsolutePath(), file.delete()); + } + } + } + } + + @After + public void tearDown() throws Exception { + deleteDirectory(testDir); + File tempFile = processor.getPersistenceFile(); + if (tempFile.exists()) { + assertTrue(tempFile.delete()); + } + } + + @Test + public void testGetSupportedPropertyDescriptors() throws Exception { + List properties = processor.getSupportedPropertyDescriptors(); + assertEquals(1, properties.size()); + assertEquals(ListFile.PATH, properties.get(0)); + } + + @Test + public void testGetRelationships() throws Exception { + Set relationships = processor.getRelationships(); + assertEquals(1, relationships.size()); + assertEquals(AbstractListProcessor.REL_SUCCESS, relationships.toArray()[0]); + } + + @Test + public void testGetPath() { + runner.setProperty(ListFile.PATH, "/dir/test1"); + assertEquals("/dir/test1", processor.getPath(context)); + runner.setProperty(ListFile.PATH, "${literal(\"/DIR/TEST2\"):toLower()}"); + assertEquals("/dir/test2", processor.getPath(context)); + } + + @Test + public void testPerformListing() throws Exception { + + Long now = System.currentTimeMillis(); + + // process first file and set new timestamp + final File file1 = new File(TESTDIR + "/file1.txt"); + assertTrue(file1.createNewFile()); + assertTrue(file1.setLastModified(now - 10000)); + runner.setProperty(ListFile.PATH, testDir.getAbsolutePath()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles.size()); + + // process second file after timestamp + final File file2 = new File(TESTDIR + "/file2.txt"); + assertTrue(file2.createNewFile()); + assertTrue(file2.setLastModified(now - 5000)); + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles2.size()); + + // process third file before timestamp + final File file3 = new File(TESTDIR + "/file3.txt"); + assertTrue(file3.createNewFile()); + assertTrue(file3.setLastModified(now - 10000)); + System.out.printf("%d %d %d\n", file1.lastModified(), file2.lastModified(), file3.lastModified()); + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(0, successFiles3.size()); + + // force state to reset and process all files + runner.clearTransferState(); + runner.removeProperty(ListFile.PATH); + runner.setProperty(ListFile.PATH, testDir.getAbsolutePath()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(3, successFiles4.size()); + } + + @Test + public void testIsListingResetNecessary() throws Exception { + assertEquals(true, processor.isListingResetNecessary(ListFile.PATH)); + assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build())); + } +}