Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-6468: Adding AWS S3 'requester pays' to Fetch and List processors. #3601

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -32,6 +32,7 @@
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
Expand Down Expand Up @@ -73,10 +74,23 @@ public class FetchS3Object extends AbstractS3Processor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
.name("requester-pays")
.displayName("Requester Pays")
.required(true)
.description("If true, indicates that the requester consents to pay any charges associated with retrieving objects from "
+ "the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated "
+ "with retrieving objects from the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay "
+ "requester charges for retrieving objects from the S3 bucket."))
.defaultValue("false")
.build();

public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD,
REQUESTER_PAYS));

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
Expand All @@ -94,6 +108,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();

final AmazonS3 client = getClient();
final GetObjectRequest request;
Expand All @@ -102,6 +117,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} else {
request = new GetObjectRequest(bucket, key, versionId);
}
request.setRequesterPays(requesterPays);

final Map<String, String> attributes = new HashMap<>();
try (final S3Object s3Object = client.getObject(request)) {
Expand Down
Expand Up @@ -42,6 +42,9 @@
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
Expand Down Expand Up @@ -149,12 +152,25 @@ public class ListS3 extends AbstractS3Processor {
.allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
.defaultValue("false")
.build();
public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
.name("requester-pays")
.displayName("Requester Pays")
.required(true)
.description("If true, indicates that the requester consents to pay any charges associated with listing "
+ "the S3 bucket. This sets the 'x-amz-request-payer' header to 'requester'. Note that this "
+ "setting is not applicable when 'Use Versions' is 'true'.")
.addValidator(createRequesterPaysValidator())
.allowableValues(new AllowableValue("true", "True", "Indicates that the requester consents to pay any charges associated "
+ "with listing the S3 bucket."), new AllowableValue("false", "False", "Does not consent to pay "
+ "requester charges for listing the S3 bucket."))
.defaultValue("false")
.build();

public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, WRITE_OBJECT_TAGS, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME,
PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE));
PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, LIST_TYPE, MIN_AGE, REQUESTER_PAYS));

public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
Expand All @@ -166,6 +182,23 @@ public class ListS3 extends AbstractS3Processor {
private long currentTimestamp = 0L;
private Set<String> currentKeys;

private static Validator createRequesterPaysValidator() {
return new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
boolean requesterPays = Boolean.valueOf(input);
boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
boolean valid = !requesterPays || !useVersions;
return new ValidationResult.Builder()
.input(input)
.subject(subject)
.valid(valid)
.explanation(valid ? null : "'Requester Pays' cannot be used when listing object versions.")
.build();
}
};
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
Expand Down Expand Up @@ -226,6 +259,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long listingTimestamp = System.currentTimeMillis();
final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();

final AmazonS3 client = getClient();
int listCount = 0;
Expand All @@ -243,6 +277,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
: new S3ObjectBucketLister(client);

bucketLister.setBucketName(bucket);
bucketLister.setRequesterPays(requesterPays);

if (delimiter != null && !delimiter.isEmpty()) {
bucketLister.setDelimiter(delimiter);
Expand Down Expand Up @@ -358,6 +393,7 @@ private interface S3BucketLister {
public void setBucketName(String bucketName);
public void setPrefix(String prefix);
public void setDelimiter(String delimiter);
public void setRequesterPays(boolean requesterPays);
// Versions have a superset of the fields that Objects have, so we'll use
// them as a common interface
public VersionListing listVersions();
Expand Down Expand Up @@ -389,6 +425,11 @@ public void setDelimiter(String delimiter) {
listObjectsRequest.setDelimiter(delimiter);
}

@Override
public void setRequesterPays(boolean requesterPays) {
listObjectsRequest.setRequesterPays(requesterPays);
}

@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
Expand Down Expand Up @@ -445,6 +486,11 @@ public void setDelimiter(String delimiter) {
listObjectsRequest.setDelimiter(delimiter);
}

@Override
public void setRequesterPays(boolean requesterPays) {
listObjectsRequest.setRequesterPays(requesterPays);
}

@Override
public VersionListing listVersions() {
VersionListing versionListing = new VersionListing();
Expand Down Expand Up @@ -501,6 +547,11 @@ public void setDelimiter(String delimiter) {
listVersionsRequest.setDelimiter(delimiter);
}

@Override
public void setRequesterPays(boolean requesterPays) {
// Not supported in versionListing, so this does nothing.
}

@Override
public VersionListing listVersions() {
versionListing = client.listVersions(listVersionsRequest);
Expand Down
Expand Up @@ -43,6 +43,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;


Expand Down Expand Up @@ -100,6 +101,65 @@ public void testGetObject() throws IOException {
GetObjectRequest request = captureRequest.getValue();
assertEquals("request-bucket", request.getBucketName());
assertEquals("request-key", request.getKey());
assertFalse(request.isRequesterPays());
assertNull(request.getVersionId());

runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
MockFlowFile ff = ffs.get(0);
ff.assertAttributeEquals("s3.bucket", "response-bucket-name");
ff.assertAttributeEquals(CoreAttributes.FILENAME.key(), "file.txt");
ff.assertAttributeEquals(CoreAttributes.PATH.key(), "key/path/to");
ff.assertAttributeEquals(CoreAttributes.ABSOLUTE_PATH.key(), "key/path/to/file.txt");
ff.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
ff.assertAttributeEquals("hash.value", "testMD5hash");
ff.assertAttributeEquals("hash.algorithm", "MD5");
ff.assertAttributeEquals("s3.etag", "test-etag");
ff.assertAttributeEquals("s3.expirationTime", String.valueOf(expiration.getTime()));
ff.assertAttributeEquals("s3.expirationTimeRuleId", "testExpirationRuleId");
ff.assertAttributeEquals("userKey1", "userValue1");
ff.assertAttributeEquals("userKey2", "userValue2");
ff.assertAttributeEquals("s3.sseAlgorithm", "testAlgorithm");
ff.assertContentEquals("Some Content");
}

@Test
public void testGetObjectWithRequesterPays() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
runner.setProperty(FetchS3Object.REQUESTER_PAYS, "true");
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
runner.enqueue(new byte[0], attrs);

S3Object s3ObjectResponse = new S3Object();
s3ObjectResponse.setBucketName("response-bucket-name");
s3ObjectResponse.setKey("response-key");
s3ObjectResponse.setObjectContent(new StringInputStream("Some Content"));
ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class);
metadata.setContentDisposition("key/path/to/file.txt");
metadata.setContentType("text/plain");
metadata.setContentMD5("testMD5hash");
Date expiration = new Date();
metadata.setExpirationTime(expiration);
metadata.setExpirationTimeRuleId("testExpirationRuleId");
Map<String, String> userMetadata = new HashMap<>();
userMetadata.put("userKey1", "userValue1");
userMetadata.put("userKey2", "userValue2");
metadata.setUserMetadata(userMetadata);
metadata.setSSEAlgorithm("testAlgorithm");
Mockito.when(metadata.getETag()).thenReturn("test-etag");
s3ObjectResponse.setObjectMetadata(metadata);
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);

runner.run(1);

ArgumentCaptor<GetObjectRequest> captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class);
Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture());
GetObjectRequest request = captureRequest.getValue();
assertEquals("request-bucket", request.getBucketName());
assertEquals("request-key", request.getKey());
assertTrue(request.isRequesterPays());
assertNull(request.getVersionId());

runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
Expand Down Expand Up @@ -179,7 +239,7 @@ public void testGetObjectExceptionGoesToFailure() throws IOException {
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
assertEquals("size should be eq", 17, pd.size());
assertEquals("size should be eq", 18, pd.size());
assertTrue(pd.contains(FetchS3Object.ACCESS_KEY));
assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(FetchS3Object.BUCKET));
Expand All @@ -197,6 +257,7 @@ public void testGetPropertyDescriptors() throws Exception {
assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT));
assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME));
assertTrue(pd.contains(FetchS3Object.PROXY_PASSWORD));
assertTrue(pd.contains(FetchS3Object.REQUESTER_PAYS));

}
}