diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 0f51b7331756..5a1649814ece 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -116,6 +116,7 @@ + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.") @WritesAttributes({ @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), + @WritesAttribute(attribute = "s3.region", description = "The region of the S3 bucket"), @WritesAttribute(attribute = "filename", description = "The name of the file"), @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), @WritesAttribute(attribute = "s3.isLatest", description = "A boolean indicating if this is the latest version of the object"), @@ -499,7 +500,7 @@ private void listByTrackingTimestamps(ProcessContext context, ProcessSession ses if (writerFactory == null) { writer = new AttributeObjectWriter(session); } else { - writer = new RecordObjectWriter(session, writerFactory, getLogger()); + writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue()); } try { @@ -523,7 +524,7 @@ private void listByTrackingTimestamps(ProcessContext context, ProcessSession ses ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary); // Write the entity to the listing - writer.addToListing(versionSummary, taggingResult, objectMetadata); + writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue()); // Track the latest lastModified timestamp and keys having that timestamp. // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps. @@ -633,7 +634,7 @@ private void publishListing(ProcessContext context, ProcessSession session, List if (writerFactory == null) { writer = new AttributeObjectWriter(session); } else { - writer = new RecordObjectWriter(session, writerFactory, getLogger()); + writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue()); } try { @@ -648,7 +649,7 @@ private void publishListing(ProcessContext context, ProcessSession session, List GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary); ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary); - writer.addToListing(s3VersionSummary, taggingResult, objectMetadata); + writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue()); listCount++; @@ -910,7 +911,7 @@ public boolean isTruncated() { interface S3ObjectWriter { void beginListing() throws IOException, SchemaNotFoundException; - void addToListing(S3VersionSummary summary, GetObjectTaggingResult taggingResult, ObjectMetadata objectMetadata) throws IOException; + void addToListing(S3VersionSummary summary, GetObjectTaggingResult taggingResult, ObjectMetadata objectMetadata, String region) throws IOException; void finishListing() throws IOException; @@ -957,11 +958,13 @@ static class RecordObjectWriter implements S3ObjectWriter { private final ComponentLog logger; private RecordSetWriter recordWriter; private FlowFile flowFile; + private String region; - public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger) { + public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger, final String region) { this.session = session; this.writerFactory = writerFactory; this.logger = logger; + this.region = region; } @Override @@ -974,7 +977,7 @@ public void beginListing() throws IOException, SchemaNotFoundException { } @Override - public void addToListing(final S3VersionSummary summary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) throws IOException { + public void addToListing(final S3VersionSummary summary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata, String region) throws IOException { recordWriter.write(createRecordForListing(summary, taggingResult, objectMetadata)); } @@ -988,6 +991,7 @@ public void finishListing() throws IOException { } else { final Map attributes = new HashMap<>(writeResult.getAttributes()); attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put("s3.region", region); flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); @@ -1060,11 +1064,12 @@ public void beginListing() { } @Override - public void addToListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) { + public void addToListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata, final String region) { // Create the attributes final Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey()); attributes.put("s3.bucket", versionSummary.getBucketName()); + attributes.put("s3.region", region); if (versionSummary.getOwner() != null) { // We may not have permission to read the owner attributes.put("s3.owner", versionSummary.getOwner().getId()); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 031d833530e4..4b2afc9cf239 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -116,6 +116,7 @@ public void testList() { MockFlowFile ff0 = flowFiles.get(0); ff0.assertAttributeEquals("filename", "a"); ff0.assertAttributeEquals("s3.bucket", "test-bucket"); + ff0.assertAttributeEquals("s3.region", "eu-west-1"); String lastModifiedTimestamp = String.valueOf(lastModified.getTime()); ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp); flowFiles.get(1).assertAttributeEquals("filename", "b/c"); @@ -174,6 +175,7 @@ public void testListWithRecords() throws InitializationException { final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0); flowFile.assertAttributeEquals("record.count", "3"); + flowFile.assertAttributeEquals("s3.region", "eu-west-1"); flowFile.assertContentEquals("a,test-bucket,,," + lastModifiedString + ",0,,true,,,\n" + "b/c,test-bucket,,," + lastModifiedString + ",0,,true,,,\n" + "d/e,test-bucket,,," + lastModifiedString + ",0,,true,,,\n");