From 67886a2a1dd914583e0b33d1f5f9f0eb6ae389ed Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 30 Jun 2017 18:55:37 +0200 Subject: [PATCH 1/2] NIFI-4144 - added min/max age to ListHDFS processor --- .../nifi/processors/hadoop/ListHDFS.java | 57 +++++++++++++++- .../nifi/processors/hadoop/TestListHDFS.java | 68 +++++++++++++++++++ 2 files changed, 122 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index a705ee8f2611..96d7c4d6837f 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -33,6 +33,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -48,6 +50,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -58,7 +61,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; - @TriggerSerially @TriggerWhenEmpty @InputRequirement(Requirement.INPUT_FORBIDDEN) @@ -114,6 +116,25 @@ public class ListHDFS extends AbstractHadoopProcessor { .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); + public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder() + .name("minimum-file-age") + .displayName("Minimum File Age") + .description("The minimum age that a file must be in order to be pulled; any file younger than this " + + "amount of time (based on last modification date) will be ignored") + .required(true) + .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .defaultValue("0 sec") + .build(); + + public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() + .name("maximum-file-age") + .displayName("Maximum File Age") + .description("The maximum age that a file must be in order to be pulled; any file older than this " + + "amount of time (based on last modification date) will be ignored") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles are transferred to this relationship") @@ -144,6 +165,8 @@ protected List getSupportedPropertyDescriptors() { props.add(DIRECTORY); props.add(RECURSE_SUBDIRS); props.add(FILE_FILTER); + props.add(MIN_AGE); + props.add(MAX_AGE); return props; } @@ -154,6 +177,23 @@ public Set getRelationships() { return relationships; } + @Override + protected Collection customValidate(ValidationContext context) { + final List problems = new ArrayList<>(super.customValidate(context)); + + final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp; + final long maximumAge = (maxAgeProp == null) ? 
Long.MAX_VALUE : maxAgeProp; + + if (minimumAge > maximumAge) { + problems.add(new ValidationResult.Builder().valid(false).subject("ListHDFS Configuration") + .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build()); + } + + return problems; + } + protected String getKey(final String directory) { return getIdentifier() + ".lastListingTime." + directory; } @@ -171,18 +211,29 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String * Determines which of the given FileStatus's describes a File that should be listed. * * @param statuses the eligible FileStatus objects that we could potentially list + * @param context processor context used to read the configured property values * @return a Set containing only those FileStatus objects that we want to list */ - Set determineListable(final Set statuses) { + Set determineListable(final Set statuses, ProcessContext context) { final long minTimestamp = this.latestTimestampListed; final TreeMap> orderedEntries = new TreeMap<>(); + final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp; + final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final long maximumAge = (maxAgeProp == null) ? 
Long.MAX_VALUE : maxAgeProp; + // Build a sorted map to determine the latest possible entries for (final FileStatus status : statuses) { if (status.getPath().getName().endsWith("_COPYING_")) { continue; } + final long fileAge = System.currentTimeMillis() - status.getModificationTime(); + if (minimumAge > fileAge || fileAge > maximumAge) { + continue; + } + final long entityTimestamp = status.getModificationTime(); if (entityTimestamp > latestTimestampListed) { @@ -293,7 +344,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session return; } - final Set listable = determineListable(statuses); + final Set listable = determineListable(statuses, context); getLogger().debug("Of the {} files found in HDFS, {} are listable", new Object[] {statuses.size(), listable.size()}); for (final FileStatus status : listable) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java index f0fce5af2487..f176a5fc5e2a 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -313,6 +314,73 @@ public void testOnlyNewestEntriesHeldBack() throws InterruptedException { runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 5); } + @Test + public void testMinAgeMaxAge() throws IOException, InterruptedException { + long now = new Date().getTime(); + long oneHourAgo = now - 3600000; + long twoHoursAgo = now - 2*3600000; + proc.fileSystem.addFileStatus(new 
Path("/test"), new FileStatus(1L, false, 1, 1L, now, now, create777(), "owner", "group", new Path("/test/willBeIgnored.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, now-5, now-5, create777(), "owner", "group", new Path("/test/testFile.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, oneHourAgo, oneHourAgo, create777(), "owner", "group", new Path("/test/testFile1.txt"))); + proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, twoHoursAgo, twoHoursAgo, create777(), "owner", "group", new Path("/test/testFile2.txt"))); + + // all files + runner.run(); + runner.assertValid(); + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3); + runner.clearTransferState(); + runner.getStateManager().clear(Scope.CLUSTER); + + // invalid min_age > max_age + runner.setProperty(ListHDFS.MIN_AGE, "30 sec"); + runner.setProperty(ListHDFS.MAX_AGE, "1 sec"); + runner.assertNotValid(); + + // only one file (one hour ago) + runner.setProperty(ListHDFS.MIN_AGE, "30 sec"); + runner.setProperty(ListHDFS.MAX_AGE, "90 min"); + runner.assertValid(); + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); // will ignore the file for this cycle + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + // Next iteration should pick up the file, since nothing else was added. 
+ runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0).assertAttributeEquals("filename", "testFile1.txt"); + runner.clearTransferState(); + runner.getStateManager().clear(Scope.CLUSTER); + + // two files (one hour ago and two hours ago) + runner.setProperty(ListHDFS.MIN_AGE, "30 sec"); + runner.removeProperty(ListHDFS.MAX_AGE); + runner.assertValid(); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + runner.clearTransferState(); + runner.getStateManager().clear(Scope.CLUSTER); + + // two files (now and one hour ago) + runner.setProperty(ListHDFS.MIN_AGE, "0 sec"); + runner.setProperty(ListHDFS.MAX_AGE, "90 min"); + runner.assertValid(); + + Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS)); + runner.run(); + + runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); + } + private FsPermission create777() { return new FsPermission((short) 0777); From 96e11b51bb65319f8ac1783ed80fbb5daffa3b7c Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 4 Jul 2017 18:13:53 +0200 Subject: [PATCH 2/2] NIFI-4144 - changed minimum file age property to ensure behavior consistency --- .../java/org/apache/nifi/processors/hadoop/ListHDFS.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java index 96d7c4d6837f..14d057d2e2f6 100644 --- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java @@ -121,16 +121,15 @@ public class ListHDFS extends AbstractHadoopProcessor { .displayName("Minimum File Age") .description("The minimum age that a file must be in order to be pulled; any file younger than this " + "amount of time (based on last modification date) will be ignored") - .required(true) + .required(false) .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) - .defaultValue("0 sec") .build(); public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() .name("maximum-file-age") .displayName("Maximum File Age") .description("The maximum age that a file must be in order to be pulled; any file older than this " - + "amount of time (based on last modification date) will be ignored") + + "amount of time (based on last modification date) will be ignored. Minimum value is 100ms.") .required(false) .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) .build(); @@ -219,7 +218,9 @@ Set determineListable(final Set statuses, ProcessContext final TreeMap> orderedEntries = new TreeMap<>(); final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); - final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp; + // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in + // the future relative to the nifi instance, files are not skipped. + final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp; final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;