From 02e94ed885f86d67e02eb9b4e783ac4ef56397e4 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Tue, 23 Aug 2016 12:46:12 +0000 Subject: [PATCH 1/3] NIFI-2631: Adding 'Commit Mode' and 'Use Versions' to ListS3 --- .../apache/nifi/processors/aws/s3/ListS3.java | 255 +++++++++++++++--- 1 file changed, 218 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index bc149252bbdd..757a77e4ad95 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -16,10 +16,17 @@ */ package org.apache.nifi.processors.aws.s3; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.text.WordUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -40,15 +47,13 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListVersionsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.S3VersionSummary; +import com.amazonaws.services.s3.model.VersionListing; @TriggerSerially @TriggerWhenEmpty @@ -66,11 +71,30 @@ @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), @WritesAttribute(attribute = "filename", description = "The name of the file"), @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), + @WritesAttribute(attribute = "s3.isLatest", description = "A boolean indicating if this is the latest version of the object"), @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"), @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"), - @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),}) + @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"), + @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")}) @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class}) public class ListS3 extends AbstractS3Processor { + private static enum CommitMode { + PER_PAGE, + ONCE; + + public String getValue() { + return WordUtils.capitalize(name().toLowerCase().replace('_', ' ')); + } + + public static CommitMode fromValue(String value) { + for (CommitMode m : CommitMode.values()) { + if (m.getValue().equals(value)) { + return m; + } + } + throw new IllegalArgumentException("Unrecognized CommitMode " + value); + } + } public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder() .name("delimiter") @@ -91,10 +115,32 @@ public class ListS3 extends AbstractS3Processor { .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').") .build(); + public static final PropertyDescriptor COMMIT_MODE = new PropertyDescriptor.Builder() + .name("commit-mode") + .displayName("Commit mode") + .expressionLanguageSupported(false) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(CommitMode.ONCE.getValue(), CommitMode.PER_PAGE.getValue()) + .defaultValue(CommitMode.ONCE.getValue()) + .description("The commit mode: 'Once' will commit all flow files at the end, and 'Per page' will commit at the end of each page of S3 objects.") + .build(); + + public static final PropertyDescriptor USE_VERSIONS = new PropertyDescriptor.Builder() + .name("use-versions") + .displayName("Use Versions") + .expressionLanguageSupported(false) + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .description("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.") + .build(); + public static final List properties = Collections.unmodifiableList( Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, - PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX)); + PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, COMMIT_MODE, USE_VERSIONS)); public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); @@ -171,35 +217,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session String delimiter = context.getProperty(DELIMITER).getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket); + CommitMode commitMode = CommitMode.fromValue(context.getProperty(COMMIT_MODE).getValue()); + boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean(); + + S3BucketLister bucketLister = useVersions + ? new S3VersionBucketLister(client) + : new S3ObjectBucketLister(client); + + bucketLister.setBucketName(bucket); + if (delimiter != null && !delimiter.isEmpty()) { - listObjectsRequest.setDelimiter(delimiter); + bucketLister.setDelimiter(delimiter); } if (prefix != null && !prefix.isEmpty()) { - listObjectsRequest.setPrefix(prefix); + bucketLister.setPrefix(prefix); } - ObjectListing objectListing; + VersionListing versionListing; do { - objectListing = client.listObjects(listObjectsRequest); - for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { - long lastModified = objectSummary.getLastModified().getTime(); + versionListing = bucketLister.listVersions(); + for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) { + long lastModified = versionSummary.getLastModified().getTime(); if (lastModified < currentTimestamp - || lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) { + || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) { continue; } // Create the attributes final Map attributes = new HashMap<>(); - attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey()); - attributes.put("s3.bucket", objectSummary.getBucketName()); - if (objectSummary.getOwner() != null) { // We may not have permission to read the owner - attributes.put("s3.owner", objectSummary.getOwner().getId()); + attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey()); + attributes.put("s3.bucket", versionSummary.getBucketName()); + if (versionSummary.getOwner() != null) { // We may not have permission to read the owner + attributes.put("s3.owner", versionSummary.getOwner().getId()); } - attributes.put("s3.etag", objectSummary.getETag()); + attributes.put("s3.etag", versionSummary.getETag()); attributes.put("s3.lastModified", String.valueOf(lastModified)); - attributes.put("s3.length", String.valueOf(objectSummary.getSize())); - attributes.put("s3.storeClass", objectSummary.getStorageClass()); + attributes.put("s3.length", String.valueOf(versionSummary.getSize())); + attributes.put("s3.storeClass", versionSummary.getStorageClass()); + attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest())); + if (versionSummary.getVersionId() != null) { + attributes.put("s3.version", versionSummary.getVersionId()); + } // Create the flowfile FlowFile flowFile = session.create(); @@ -212,24 +270,147 @@ public void onTrigger(final ProcessContext context, final ProcessSession session currentKeys.clear(); } if (lastModified == maxTimestamp) { - currentKeys.add(objectSummary.getKey()); + currentKeys.add(versionSummary.getKey()); } listCount++; } - listObjectsRequest.setMarker(objectListing.getNextMarker()); - } while (objectListing.isTruncated()); + bucketLister.setNextMarker(); + + if (commitMode == CommitMode.PER_PAGE) { + commit(context, session, listCount); + listCount = 0; + } + } while (bucketLister.isTruncated()); currentTimestamp = maxTimestamp; final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis}); - if (listCount > 0) { + if (!commit(context, session, listCount)) { + if (currentTimestamp > 0) { + persistState(context); + } + getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); + context.yield(); + } + } + + private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) { + boolean willCommit = listCount > 0; + if (willCommit) { getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount}); session.commit(); persistState(context); - } else { - getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket}); - context.yield(); + } + return willCommit; + } + + private interface S3BucketLister { + public void setBucketName(String bucketName); + public void setPrefix(String prefix); + public void setDelimiter(String delimiter); + // Versions have a superset of the fields that Objects have, so we'll use + // them as a common interface + public VersionListing listVersions(); + public void setNextMarker(); + public boolean isTruncated(); + } + + public class S3ObjectBucketLister implements S3BucketLister { + private AmazonS3 client; + private ListObjectsRequest listObjectsRequest; + private ObjectListing objectListing; + + public S3ObjectBucketLister(AmazonS3 client) { + this.client = client; + } + + @Override + public void setBucketName(String bucketName) { + listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName); + } + + @Override + public void setPrefix(String prefix) { + listObjectsRequest.setPrefix(prefix); + } + + @Override + public void setDelimiter(String delimiter) { + listObjectsRequest.setDelimiter(delimiter); + } + + @Override + public VersionListing listVersions() { + VersionListing versionListing = new VersionListing(); + this.objectListing = client.listObjects(listObjectsRequest); + for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { + S3VersionSummary versionSummary = new S3VersionSummary(); + versionSummary.setBucketName(objectSummary.getBucketName()); + versionSummary.setETag(objectSummary.getETag()); + versionSummary.setKey(objectSummary.getKey()); + versionSummary.setLastModified(objectSummary.getLastModified()); + versionSummary.setOwner(objectSummary.getOwner()); + versionSummary.setSize(objectSummary.getSize()); + versionSummary.setStorageClass(objectSummary.getStorageClass()); + versionSummary.setIsLatest(true); + + versionListing.getVersionSummaries().add(versionSummary); + } + + return versionListing; + } + + @Override + public void setNextMarker() { + listObjectsRequest.setMarker(objectListing.getNextMarker()); + } + + @Override + public boolean isTruncated() { + return (objectListing == null) ? false : objectListing.isTruncated(); + } + } + + public class S3VersionBucketLister implements S3BucketLister { + private AmazonS3 client; + private ListVersionsRequest listVersionsRequest; + private VersionListing versionListing; + + public S3VersionBucketLister(AmazonS3 client) { + this.client = client; + } + + @Override + public void setBucketName(String bucketName) { + listVersionsRequest = new ListVersionsRequest().withBucketName(bucketName); + } + + @Override + public void setPrefix(String prefix) { + listVersionsRequest.setPrefix(prefix); + } + + @Override + public void setDelimiter(String delimiter) { + listVersionsRequest.setDelimiter(delimiter); + } + + @Override + public VersionListing listVersions() { + versionListing = client.listVersions(listVersionsRequest); + return versionListing; + } + + @Override + public void setNextMarker() { + listVersionsRequest.setKeyMarker(versionListing.getNextKeyMarker()); + listVersionsRequest.setVersionIdMarker(versionListing.getNextVersionIdMarker()); + } + + @Override + public boolean isTruncated() { + return (versionListing == null) ? false : versionListing.isTruncated(); } } } From c6b2ca7657223ed72672fd8f5fc2e2defcc42a28 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Wed, 31 Aug 2016 11:03:12 +0000 Subject: [PATCH 2/3] NIFI-2631: Removing Commit Mode from ListS3 --- .../apache/nifi/processors/aws/s3/ListS3.java | 38 ++----------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 757a77e4ad95..29f771bd8abf 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -26,7 +26,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.text.WordUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -78,23 +77,6 @@ @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")}) @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class}) public class ListS3 extends AbstractS3Processor { - private static enum CommitMode { - PER_PAGE, - ONCE; - - public String getValue() { - return WordUtils.capitalize(name().toLowerCase().replace('_', ' ')); - } - - public static CommitMode fromValue(String value) { - for (CommitMode m : CommitMode.values()) { - if (m.getValue().equals(value)) { - return m; - } - } - throw new IllegalArgumentException("Unrecognized CommitMode " + value); - } - } public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder() .name("delimiter") @@ -115,17 +97,6 @@ public static CommitMode fromValue(String value) { .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').") .build(); - public static final PropertyDescriptor COMMIT_MODE = new PropertyDescriptor.Builder() - .name("commit-mode") - .displayName("Commit mode") - .expressionLanguageSupported(false) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues(CommitMode.ONCE.getValue(), CommitMode.PER_PAGE.getValue()) - .defaultValue(CommitMode.ONCE.getValue()) - .description("The commit mode: 'Once' will commit all flow files at the end, and 'Per page' will commit at the end of each page of S3 objects.") - .build(); - public static final PropertyDescriptor USE_VERSIONS = new PropertyDescriptor.Builder() .name("use-versions") .displayName("Use Versions") @@ -140,7 +111,7 @@ public static CommitMode fromValue(String value) { public static final List properties = Collections.unmodifiableList( Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, - PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, COMMIT_MODE, USE_VERSIONS)); + PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS)); public static final Set relationships = Collections.unmodifiableSet( new HashSet<>(Collections.singletonList(REL_SUCCESS))); @@ -217,7 +188,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session String delimiter = context.getProperty(DELIMITER).getValue(); String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue(); - CommitMode commitMode = CommitMode.fromValue(context.getProperty(COMMIT_MODE).getValue()); boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean(); S3BucketLister bucketLister = useVersions @@ -276,10 +246,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } bucketLister.setNextMarker(); - if (commitMode == CommitMode.PER_PAGE) { - commit(context, session, listCount); - listCount = 0; - } + commit(context, session, listCount); + listCount = 0; } while (bucketLister.isTruncated()); currentTimestamp = maxTimestamp; From 1d1014ed8d6ff1fce1ef09b6c52c907e507438c1 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Sat, 3 Sep 2016 11:12:06 +0000 Subject: [PATCH 3/3] NIFI-2631: Updating integration tests for 'Use Versions' property in ListS3 --- .../nifi/processors/aws/s3/AbstractS3IT.java | 6 +++-- .../nifi/processors/aws/s3/ITListS3.java | 24 ++++++++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index 95b4ba8c55e6..0ecc33e94695 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -52,7 +52,7 @@ public abstract class AbstractS3IT { protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt"; - protected final static String REGION = "us-west-1"; + protected final static String REGION = System.getProperty("it.aws.region", "us-west-1"); // Adding REGION to bucket prevents errors of // "A conflicting conditional operation is currently in progress against this resource." // when bucket is rapidly added/deleted and consistency propogation causes this error. @@ -82,7 +82,9 @@ public static void oneTimeSetup() { fail("Bucket " + BUCKET_NAME + " exists. Choose a different bucket name to continue test"); } - CreateBucketRequest request = new CreateBucketRequest(BUCKET_NAME, REGION); + CreateBucketRequest request = REGION.contains("east") + ? new CreateBucketRequest(BUCKET_NAME) // See https://github.com/boto/boto3/issues/125 + : new CreateBucketRequest(BUCKET_NAME, REGION); client.createBucket(request); } catch (final AmazonS3Exception e) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java index 6d77eb679044..9370022bf011 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java @@ -124,11 +124,32 @@ public void testSimpleListWithPrefix() throws Throwable { flowFiles.get(0).assertAttributeEquals("filename", "b/c"); } + @Test + public void testSimpleListWithPrefixAndVersions() throws Throwable { + putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + + final TestRunner runner = TestRunners.newTestRunner(new ListS3()); + + runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(ListS3.REGION, REGION); + runner.setProperty(ListS3.BUCKET, BUCKET_NAME); + runner.setProperty(ListS3.PREFIX, "b/"); + runner.setProperty(ListS3.USE_VERSIONS, "true"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS); + flowFiles.get(0).assertAttributeEquals("filename", "b/c"); + } + @Test public void testGetPropertyDescriptors() throws Exception { ListS3 processor = new ListS3(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 13, pd.size()); + assertEquals("size should be eq", 14, pd.size()); assertTrue(pd.contains(ListS3.ACCESS_KEY)); assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(ListS3.BUCKET)); @@ -142,5 +163,6 @@ public void testGetPropertyDescriptors() throws Exception { assertTrue(pd.contains(ListS3.PROXY_HOST_PORT)); assertTrue(pd.contains(ListS3.DELIMITER)); assertTrue(pd.contains(ListS3.PREFIX)); + assertTrue(pd.contains(ListS3.USE_VERSIONS)); } }