Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.aws.s3;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
Expand All @@ -24,6 +25,7 @@
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.EmailAddressGrantee;
Expand All @@ -36,6 +38,7 @@
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;

Expand Down Expand Up @@ -315,6 +318,24 @@ protected final AccessControlList createACL(final ProcessContext context, final
return acl;
}

/**
 * Copies diagnostic details from a thrown exception onto the FlowFile as
 * {@code s3.*} attributes so downstream processors can route on the failure cause.
 * <p>
 * Attributes written (each only when the corresponding value is non-null):
 * <ul>
 *   <li>{@code s3.exception} - fully qualified class name of the exception (always set)</li>
 *   <li>{@code s3.additionalDetails} - extra detail map supplied by S3, for {@link AmazonS3Exception} only</li>
 *   <li>{@code s3.statusCode} - HTTP status code, for {@link AmazonServiceException}</li>
 *   <li>{@code s3.errorCode} - service error code, for {@link AmazonServiceException}</li>
 *   <li>{@code s3.errorMessage} - service error message, for {@link AmazonServiceException}</li>
 * </ul>
 *
 * @param e the exception raised by the S3 operation
 * @param session the session used to mutate the FlowFile
 * @param flowFile the FlowFile to enrich
 * @return the latest FlowFile reference after attribute updates; callers must use it
 */
protected FlowFile extractExceptionDetails(final Exception e, final ProcessSession session, FlowFile flowFile) {
    // Use the null-safe helper uniformly (the class name is never null, but this
    // keeps every attribute write on the same code path).
    flowFile = putAttribute(session, flowFile, "s3.exception", e.getClass().getName());
    if (e instanceof AmazonS3Exception) {
        flowFile = putAttribute(session, flowFile, "s3.additionalDetails", ((AmazonS3Exception) e).getAdditionalDetails());
    }
    // AmazonS3Exception extends AmazonServiceException, so both branches may apply.
    if (e instanceof AmazonServiceException) {
        final AmazonServiceException ase = (AmazonServiceException) e;
        flowFile = putAttribute(session, flowFile, "s3.statusCode", ase.getStatusCode());
        flowFile = putAttribute(session, flowFile, "s3.errorCode", ase.getErrorCode());
        flowFile = putAttribute(session, flowFile, "s3.errorMessage", ase.getErrorMessage());
    }
    return flowFile;
}

/**
 * Writes {@code value.toString()} to the FlowFile under {@code key}, or leaves the
 * FlowFile untouched when the value is null.
 *
 * @param session the session used to mutate the FlowFile
 * @param flowFile the FlowFile to update
 * @param key the attribute name
 * @param value the attribute value; skipped entirely when null
 * @return the resulting FlowFile reference
 */
private FlowFile putAttribute(final ProcessSession session, final FlowFile flowFile, final String key, final Object value) {
    if (value == null) {
        return flowFile;
    }
    return session.putAttribute(flowFile, key, value.toString());
}

/**
* Create CannedAccessControlList if {@link #CANNED_ACL} property specified.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
Expand All @@ -41,6 +43,12 @@


@SupportsBatching
@WritesAttributes({
@WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
@WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@InputRequirement(Requirement.INPUT_REQUIRED)
Expand Down Expand Up @@ -92,14 +100,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
s3.deleteVersion(r);
}
} catch (final AmazonServiceException ase) {
getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
flowFile = extractExceptionDetails(ase, session, flowFile);
getLogger().error("Failed to delete S3 Object for {}; routing to failure", flowFile, ase);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}

session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", flowFile, transferMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@
@WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
@WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
@WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
@WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation"),
@WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"),
Expand Down Expand Up @@ -251,13 +256,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
attributes.put("s3.version", metadata.getVersionId());
}
} catch (final IOException | AmazonClientException ioe) {
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe});
flowFile = extractExceptionDetails(ioe, session, flowFile);
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", flowFile, ioe);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
} catch (final FlowFileAccessException ffae) {
if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) != -1) {
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ffae});
getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", flowFile, ffae);
flowFile = extractExceptionDetails(ffae, session, flowFile);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@
@WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
@WritesAttribute(attribute = "s3.contenttype", description = "The S3 content type of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
@WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
@WritesAttribute(attribute = "s3.contentdisposition", description = "The content disposition of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.cachecontrol", description = "The cache-control header of the S3 Object"),
Expand Down Expand Up @@ -833,6 +838,7 @@ public void process(final InputStream rawIn) throws IOException {
new Object[]{cacheKey, e.getMessage()});
}
} catch (final ProcessException | AmazonClientException pe) {
extractExceptionDetails(pe, session, flowFile);
if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
getLogger().info(pe.getMessage());
session.rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@
@SupportsBatching
@WritesAttributes({
@WritesAttribute(attribute = "s3.tag.___", description = "The tags associated with the S3 object will be " +
"written as part of the FlowFile attributes")})
"written as part of the FlowFile attributes"),
@WritesAttribute(attribute = "s3.exception", description = "The class name of the exception thrown during processor execution"),
@WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 supplied detail from the failed operation"),
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3 exception message from the failed operation")})
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
@InputRequirement(Requirement.INPUT_REQUIRED)
Expand Down Expand Up @@ -171,7 +176,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
s3.setObjectTagging(r);
} catch (final AmazonServiceException ase) {
getLogger().error("Failed to tag S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
flowFile = extractExceptionDetails(ase, session, flowFile);
getLogger().error("Failed to tag S3 Object for {}; routing to failure", flowFile, ase);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.apache.nifi.processors.aws.s3;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.SdkClientException;
import com.google.common.collect.ImmutableMap;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
Expand Down Expand Up @@ -255,6 +258,72 @@ public void testGetObjectExceptionGoesToFailure() throws IOException {
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
}

@Test
public void testFetchObject_FailAdditionalAttributesBucketName() {
    // Route a fetch at a bucket that the mocked client will reject.
    runner.setProperty(FetchS3Object.REGION, "us-east-1");
    runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("filename", "request-key");
    runner.enqueue(new byte[0], attributes);

    // Simulate S3 reporting a missing bucket (HTTP 404 / NoSuchBucket).
    final AmazonS3Exception noSuchBucket = new AmazonS3Exception("The specified bucket does not exist");
    noSuchBucket.setErrorCode("NoSuchBucket");
    noSuchBucket.setStatusCode(HttpURLConnection.HTTP_NOT_FOUND);
    noSuchBucket.setAdditionalDetails(ImmutableMap.of("BucketName", "us-east-1", "Error", "ABC123"));
    Mockito.doThrow(noSuchBucket).when(mockS3Client).getObject(Mockito.any());

    runner.run(1);

    // The failed FlowFile must carry the s3.* diagnostic attributes.
    final List<MockFlowFile> failures = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
    assertEquals(1, failures.size());
    final MockFlowFile failure = failures.get(0);
    assertEquals(noSuchBucket.getClass().getName(), failure.getAttribute("s3.exception"));
    assertEquals("NoSuchBucket", failure.getAttribute("s3.errorCode"));
    assertEquals("404", failure.getAttribute("s3.statusCode"));
    assertTrue(noSuchBucket.getMessage().startsWith(failure.getAttribute("s3.errorMessage")));
}

@Test
public void testFetchObject_FailAdditionalAttributesAuthentication() {
    // Route a fetch that the mocked client will reject for a bad signature.
    runner.setProperty(FetchS3Object.REGION, "us-east-1");
    runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("filename", "request-key");
    runner.enqueue(new byte[0], attributes);

    // Simulate an authentication failure (HTTP 403 / SignatureDoesNotMatch).
    final AmazonS3Exception badSignature = new AmazonS3Exception("signature");
    badSignature.setErrorCode("SignatureDoesNotMatch");
    badSignature.setStatusCode(HttpURLConnection.HTTP_FORBIDDEN);
    badSignature.setAdditionalDetails(ImmutableMap.of("CanonicalRequestBytes", "AA BB CC DD EE FF"));
    Mockito.doThrow(badSignature).when(mockS3Client).getObject(Mockito.any());

    runner.run(1);

    // The failed FlowFile must carry the s3.* diagnostic attributes.
    final List<MockFlowFile> failures = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
    assertEquals(1, failures.size());
    final MockFlowFile failure = failures.get(0);
    assertEquals(badSignature.getClass().getName(), failure.getAttribute("s3.exception"));
    assertEquals("SignatureDoesNotMatch", failure.getAttribute("s3.errorCode"));
    assertEquals("403", failure.getAttribute("s3.statusCode"));
    assertTrue(badSignature.getMessage().startsWith(failure.getAttribute("s3.errorMessage")));
}

@Test
public void testFetchObject_FailAdditionalAttributesNetworkFailure() {
    // Route a fetch that fails client-side (no service response at all).
    runner.setProperty(FetchS3Object.REGION, "us-east-1");
    runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("filename", "request-key");
    runner.enqueue(new byte[0], attributes);

    // SdkClientException carries no service error code/status, so only
    // s3.exception is expected on the failed FlowFile.
    final SdkClientException clientFailure = new SdkClientException("message");
    Mockito.doThrow(clientFailure).when(mockS3Client).getObject(Mockito.any());

    runner.run(1);

    final List<MockFlowFile> failures = runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
    assertEquals(1, failures.size());
    assertEquals(clientFailure.getClass().getName(), failures.get(0).getAttribute("s3.exception"));
}

@Test
public void testGetObjectReturnsNull() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
Expand Down Expand Up @@ -284,6 +353,7 @@ public void testFlowFileAccessExceptionGoesToFailure() throws IOException {

runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
}

@Test
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();
Expand Down