Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
+ "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@WritesAttributes({
@WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
@WritesAttribute(attribute = "s3.region", description = "The region of the S3 bucket"),
@WritesAttribute(attribute = "filename", description = "The name of the file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
@WritesAttribute(attribute = "s3.isLatest", description = "A boolean indicating if this is the latest version of the object"),
Expand Down Expand Up @@ -499,7 +500,7 @@ private void listByTrackingTimestamps(ProcessContext context, ProcessSession ses
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
}

try {
Expand All @@ -523,7 +524,7 @@ private void listByTrackingTimestamps(ProcessContext context, ProcessSession ses
ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);

// Write the entity to the listing
writer.addToListing(versionSummary, taggingResult, objectMetadata);
writer.addToListing(versionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());

// Track the latest lastModified timestamp and keys having that timestamp.
// NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
Expand Down Expand Up @@ -633,7 +634,7 @@ private void publishListing(ProcessContext context, ProcessSession session, List
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
writer = new RecordObjectWriter(session, writerFactory, getLogger());
writer = new RecordObjectWriter(session, writerFactory, getLogger(), context.getProperty(S3_REGION).getValue());
}

try {
Expand All @@ -648,7 +649,7 @@ private void publishListing(ProcessContext context, ProcessSession session, List
GetObjectTaggingResult taggingResult = getTaggingResult(context, s3Client, s3VersionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context, s3Client, s3VersionSummary);

writer.addToListing(s3VersionSummary, taggingResult, objectMetadata);
writer.addToListing(s3VersionSummary, taggingResult, objectMetadata, context.getProperty(S3_REGION).getValue());

listCount++;

Expand Down Expand Up @@ -910,7 +911,7 @@ public boolean isTruncated() {
interface S3ObjectWriter {
void beginListing() throws IOException, SchemaNotFoundException;

void addToListing(S3VersionSummary summary, GetObjectTaggingResult taggingResult, ObjectMetadata objectMetadata) throws IOException;
void addToListing(S3VersionSummary summary, GetObjectTaggingResult taggingResult, ObjectMetadata objectMetadata, String region) throws IOException;

void finishListing() throws IOException;

Expand Down Expand Up @@ -957,11 +958,13 @@ static class RecordObjectWriter implements S3ObjectWriter {
private final ComponentLog logger;
private RecordSetWriter recordWriter;
private FlowFile flowFile;
private String region;

public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger) {
public RecordObjectWriter(final ProcessSession session, final RecordSetWriterFactory writerFactory, final ComponentLog logger, final String region) {
this.session = session;
this.writerFactory = writerFactory;
this.logger = logger;
this.region = region;
}

@Override
Expand All @@ -974,7 +977,7 @@ public void beginListing() throws IOException, SchemaNotFoundException {
}

@Override
public void addToListing(final S3VersionSummary summary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) throws IOException {
public void addToListing(final S3VersionSummary summary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata, String region) throws IOException {
recordWriter.write(createRecordForListing(summary, taggingResult, objectMetadata));
}

Expand All @@ -988,6 +991,7 @@ public void finishListing() throws IOException {
} else {
final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put("s3.region", region);
flowFile = session.putAllAttributes(flowFile, attributes);

session.transfer(flowFile, REL_SUCCESS);
Expand Down Expand Up @@ -1060,11 +1064,12 @@ public void beginListing() {
}

@Override
public void addToListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
public void addToListing(final S3VersionSummary versionSummary, final GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata, final String region) {
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
attributes.put("s3.region", region);
if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void testList() {
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "a");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
ff0.assertAttributeEquals("s3.region", "eu-west-1");
String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
Expand Down Expand Up @@ -174,6 +175,7 @@ public void testListWithRecords() throws InitializationException {

final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals("record.count", "3");
flowFile.assertAttributeEquals("s3.region", "eu-west-1");
flowFile.assertContentEquals("a,test-bucket,,," + lastModifiedString + ",0,,true,,,\n"
+ "b/c,test-bucket,,," + lastModifiedString + ",0,,true,,,\n"
+ "d/e,test-bucket,,," + lastModifiedString + ",0,,true,,,\n");
Expand Down