From 3853b13806121c1479edac2038634992ffc6fdfe Mon Sep 17 00:00:00 2001 From: Adam Lamar Date: Sat, 23 Dec 2017 20:29:02 -0700 Subject: [PATCH 1/2] NIFI-4715: ListS3 produces duplicates in frequently updated buckets Keep totalListCount, reduce unnecessary persistState This closes #2361. Signed-off-by: Koji Kawamura --- .../org/apache/nifi/processors/aws/s3/ListS3.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index f5a69acb59bb..fc3260c8ef04 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -229,7 +229,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final AmazonS3 client = getClient(); int listCount = 0; - long maxTimestamp = 0L; + int totalListCount = 0; + long maxTimestamp = currentTimestamp; String delimiter = context.getProperty(DELIMITER).getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); @@ -298,18 +299,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } bucketLister.setNextMarker(); + totalListCount += listCount; commit(context, session, listCount); listCount = 0; } while (bucketLister.isTruncated()); + + // Update stateManger with the most recent timestamp currentTimestamp = maxTimestamp; + persistState(context); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); - if (!commit(context, session, listCount)) { - if (currentTimestamp > 0) { - persistState(context); - } + if (totalListCount == 0) { getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); context.yield(); } @@ -320,7 +322,6 @@ private boolean commit(final ProcessContext context, final ProcessSession sessio if (willCommit) { getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount}); session.commit(); - persistState(context); } return willCommit; } From 4d445055cf605811f85bfed12b33155adbd570a2 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 31 Oct 2018 16:01:36 +0900 Subject: [PATCH 2/2] NIFI-4715: Update currentKeys after listing loop ListS3 used to update currentKeys within listing loop, that causes duplicates. Because S3 returns object list in lexicographic order, if we clear currentKeys during the loop, we cannot tell if the object has been listed or not, in a case where newer object has a lexicographically former name. --- .../apache/nifi/processors/aws/s3/ListS3.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index fc3260c8ef04..d3bade9faa3a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -230,7 +230,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final AmazonS3 client = getClient(); int listCount = 0; int totalListCount = 0; - long maxTimestamp = currentTimestamp; + long latestListedTimestampInThisCycle = currentTimestamp; String delimiter = context.getProperty(DELIMITER).getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); @@ -252,6 +252,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } VersionListing versionListing; + final Set listedKeys = new HashSet<>(); + getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys}); + do { versionListing = bucketLister.listVersions(); for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { @@ -262,6 +265,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session continue; } + getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys}); + // Create the attributes final Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey()); @@ -287,14 +292,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); - // Update state - if (lastModified > maxTimestamp) { - maxTimestamp = lastModified; - currentKeys.clear(); - } - if (lastModified == maxTimestamp) { - currentKeys.add(versionSummary.getKey()); + // Track the latest lastModified timestamp and keys having that timestamp. + // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps. + if (lastModified > latestListedTimestampInThisCycle) { + latestListedTimestampInThisCycle = lastModified; + listedKeys.clear(); + listedKeys.add(versionSummary.getKey()); + + } else if (lastModified == latestListedTimestampInThisCycle) { + listedKeys.add(versionSummary.getKey()); } + listCount++; } bucketLister.setNextMarker(); @@ -304,8 +312,14 @@ public void onTrigger(final ProcessContext context, final ProcessSession session listCount = 0; } while (bucketLister.isTruncated()); + // Update currentKeys. + if (latestListedTimestampInThisCycle > currentTimestamp) { + currentKeys.clear(); + } + currentKeys.addAll(listedKeys); + // Update stateManger with the most recent timestamp - currentTimestamp = maxTimestamp; + currentTimestamp = latestListedTimestampInThisCycle; persistState(context); final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);