Skip to content

Commit

Permalink
CAMEL-11969 - Camel-AWS: add a deleteObject operation to the S3 Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
oscerd committed Oct 31, 2017
1 parent 1386c43 commit 4710347
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 38 deletions.
Expand Up @@ -19,6 +19,7 @@
public enum S3Operations {

copyObject,
deleteObject,
deleteBucket,
listBuckets
}
Expand Up @@ -38,6 +38,7 @@
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.DeleteBucketRequest;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand All @@ -62,20 +63,19 @@
import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageForResponse;

/**
* A Producer which sends messages to the Amazon Web Service Simple Storage Service <a
* href="http://aws.amazon.com/s3/">AWS S3</a>
* A Producer which sends messages to the Amazon Web Service Simple Storage
* Service <a href="http://aws.amazon.com/s3/">AWS S3</a>
*/
public class S3Producer extends DefaultProducer {

private static final Logger LOG = LoggerFactory.getLogger(S3Producer.class);

private transient String s3ProducerToString;

public S3Producer(final Endpoint endpoint) {
super(endpoint);
}


@Override
public void process(final Exchange exchange) throws Exception {
S3Operations operation = determineOperation(exchange);
Expand All @@ -90,6 +90,9 @@ public void process(final Exchange exchange) throws Exception {
case copyObject:
copyObject(getEndpoint().getS3Client(), exchange);
break;
case deleteObject:
deleteObject(getEndpoint().getS3Client(), exchange);
break;
case listBuckets:
listBuckets(getEndpoint().getS3Client(), exchange);
break;
Expand All @@ -110,7 +113,7 @@ public void processMultiPart(final Exchange exchange) throws Exception {
obj = ((WrappedFile<?>)obj).getFile();
}
if (obj instanceof File) {
filePayload = (File) obj;
filePayload = (File)obj;
} else {
throw new InvalidArgumentException("aws-s3: MultiPart upload requires a File input.");
}
Expand All @@ -121,8 +124,7 @@ public void processMultiPart(final Exchange exchange) throws Exception {
}

final String keyName = determineKey(exchange);
final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(getConfiguration().getBucketName(),
keyName, objectMetadata);
final InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(getConfiguration().getBucketName(), keyName, objectMetadata);

String storageClass = determineStorageClass(exchange);
if (storageClass != null) {
Expand All @@ -137,7 +139,8 @@ public void processMultiPart(final Exchange exchange) throws Exception {

AccessControlList acl = exchange.getIn().getHeader(S3Constants.ACL, AccessControlList.class);
if (acl != null) {
// note: if cannedacl and acl are both specified the last one will be used. refer to
// note: if cannedacl and acl are both specified the last one will
// be used. refer to
// PutObjectRequest#setAccessControlList for more details
initRequest.setAccessControlList(acl);
}
Expand All @@ -152,34 +155,24 @@ public void processMultiPart(final Exchange exchange) throws Exception {

long filePosition = 0;


try {
for (int part = 1; filePosition < contentLength; part++) {
partSize = Math.min(partSize, contentLength - filePosition);

UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(getConfiguration().getBucketName()).withKey(keyName)
.withUploadId(initResponse.getUploadId()).withPartNumber(part)
.withFileOffset(filePosition)
.withFile(filePayload)
.withPartSize(partSize);
UploadPartRequest uploadRequest = new UploadPartRequest().withBucketName(getConfiguration().getBucketName()).withKey(keyName)
.withUploadId(initResponse.getUploadId()).withPartNumber(part).withFileOffset(filePosition).withFile(filePayload).withPartSize(partSize);

LOG.trace("Uploading part [{}] for {}", part, keyName);
partETags.add(getEndpoint().getS3Client().uploadPart(uploadRequest).getPartETag());

filePosition += partSize;
}
CompleteMultipartUploadRequest compRequest = new
CompleteMultipartUploadRequest(getConfiguration().getBucketName(),
keyName,
initResponse.getUploadId(),
partETags);
CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(getConfiguration().getBucketName(), keyName, initResponse.getUploadId(), partETags);

uploadResult = getEndpoint().getS3Client().completeMultipartUpload(compRequest);

} catch (Exception e) {
getEndpoint().getS3Client().abortMultipartUpload(new AbortMultipartUploadRequest(
getConfiguration().getBucketName(), keyName, initResponse.getUploadId()));
getEndpoint().getS3Client().abortMultipartUpload(new AbortMultipartUploadRequest(getConfiguration().getBucketName(), keyName, initResponse.getUploadId()));
throw e;
}

Expand Down Expand Up @@ -208,13 +201,13 @@ public void processSingleOp(final Exchange exchange) throws Exception {
obj = ((WrappedFile<?>)obj).getFile();
}
if (obj instanceof File) {
filePayload = (File) obj;
filePayload = (File)obj;
is = new FileInputStream(filePayload);
} else {
is = exchange.getIn().getMandatoryBody(InputStream.class);
baos = determineLengthInputStream(is);
objectMetadata.setContentLength(baos.size());
is = new ByteArrayInputStream(baos.toByteArray());
is = new ByteArrayInputStream(baos.toByteArray());
}

putObjectRequest = new PutObjectRequest(getConfiguration().getBucketName(), determineKey(exchange), is, objectMetadata);
Expand All @@ -232,7 +225,8 @@ public void processSingleOp(final Exchange exchange) throws Exception {

AccessControlList acl = exchange.getIn().getHeader(S3Constants.ACL, AccessControlList.class);
if (acl != null) {
// note: if cannedacl and acl are both specified the last one will be used. refer to
// note: if cannedacl and acl are both specified the last one will
// be used. refer to
// PutObjectRequest#setAccessControlList for more details
putObjectRequest.setAccessControlList(acl);
}
Expand All @@ -255,14 +249,14 @@ public void processSingleOp(final Exchange exchange) throws Exception {
FileUtil.deleteFile(filePayload);
}
}

private void copyObject(AmazonS3 s3Client, Exchange exchange) {
String bucketNameDestination;
String destinationKey;
String sourceKey;
String bucketName;
String versionId;

bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
if (ObjectHelper.isEmpty(bucketName)) {
bucketName = getConfiguration().getBucketName();
Expand All @@ -271,7 +265,7 @@ private void copyObject(AmazonS3 s3Client, Exchange exchange) {
destinationKey = exchange.getIn().getHeader(S3Constants.DESTINATION_KEY, String.class);
bucketNameDestination = exchange.getIn().getHeader(S3Constants.BUCKET_DESTINATION_NAME, String.class);
versionId = exchange.getIn().getHeader(S3Constants.VERSION_ID, String.class);

if (ObjectHelper.isEmpty(bucketName)) {
throw new IllegalArgumentException("Bucket Name must be specified for copyObject Operation");
}
Expand All @@ -291,24 +285,48 @@ private void copyObject(AmazonS3 s3Client, Exchange exchange) {
copyObjectRequest = new CopyObjectRequest(bucketName, sourceKey, versionId, bucketNameDestination, destinationKey);
}
CopyObjectResult copyObjectResult = s3Client.copyObject(copyObjectRequest);

Message message = getMessageForResponse(exchange);
message.setHeader(S3Constants.E_TAG, copyObjectResult.getETag());
if (copyObjectResult.getVersionId() != null) {
message.setHeader(S3Constants.VERSION_ID, copyObjectResult.getVersionId());
}
}


private void deleteObject(AmazonS3 s3Client, Exchange exchange) {
String sourceKey;
String bucketName;

bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
if (ObjectHelper.isEmpty(bucketName)) {
bucketName = getConfiguration().getBucketName();
}
sourceKey = exchange.getIn().getHeader(S3Constants.KEY, String.class);

if (ObjectHelper.isEmpty(bucketName)) {
throw new IllegalArgumentException("Bucket Name must be specified for deleteObject Operation");
}
if (ObjectHelper.isEmpty(sourceKey)) {
throw new IllegalArgumentException("Source Key must be specified for deleteObject Operation");
}
DeleteObjectRequest deleteObjectRequest;
deleteObjectRequest = new DeleteObjectRequest(bucketName, sourceKey);
s3Client.deleteObject(deleteObjectRequest);

Message message = getMessageForResponse(exchange);
message.setBody(true);
}

private void listBuckets(AmazonS3 s3Client, Exchange exchange) {
List<Bucket> bucketsList = s3Client.listBuckets();

Message message = getMessageForResponse(exchange);
message.setBody(bucketsList);
}

private void deleteBucket(AmazonS3 s3Client, Exchange exchange) {
String bucketName;

bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
if (ObjectHelper.isEmpty(bucketName)) {
bucketName = getConfiguration().getBucketName();
Expand All @@ -317,7 +335,7 @@ private void deleteBucket(AmazonS3 s3Client, Exchange exchange) {
DeleteBucketRequest deleteBucketRequest = new DeleteBucketRequest(bucketName);
s3Client.deleteBucket(deleteBucketRequest);
}

private S3Operations determineOperation(Exchange exchange) {
S3Operations operation = exchange.getIn().getHeader(S3Constants.S3_OPERATION, S3Operations.class);
if (operation == null) {
Expand Down Expand Up @@ -400,7 +418,7 @@ private String determineStorageClass(final Exchange exchange) {

return storageClass;
}

private ByteArrayOutputStream determineLengthInputStream(InputStream is) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] bytes = new byte[1024];
Expand All @@ -425,7 +443,7 @@ public String toString() {

@Override
public S3Endpoint getEndpoint() {
return (S3Endpoint) super.getEndpoint();
return (S3Endpoint)super.getEndpoint();
}

}
Expand Up @@ -358,7 +358,7 @@ public void deleteObject(String bucketName, String key) throws AmazonClientExcep

@Override
public void deleteObject(DeleteObjectRequest deleteObjectRequest) throws AmazonClientException, AmazonServiceException {
throw new UnsupportedOperationException();
// noop
}

@Override
Expand Down
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.aws.s3;

import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class S3ComponentDeleteObjectSpringTest extends CamelSpringTestSupport {

@EndpointInject(uri = "direct:start")
private ProducerTemplate template;

@EndpointInject(uri = "mock:result")
private MockEndpoint result;

private AmazonS3ClientMock client;

@Test
public void sendIn() throws Exception {
result.expectedMessageCount(1);

template.send("direct:deleteObject", ExchangePattern.InOnly, new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader(S3Constants.KEY, "camelKey");
}
});

assertMockEndpointsSatisfied();

assertResultExchange(result.getExchanges().get(0));

}

private void assertResultExchange(Exchange resultExchange) {
assertEquals(resultExchange.getIn().getBody(), true);
}

@Override
protected JndiRegistry createRegistry() throws Exception {
JndiRegistry registry = super.createRegistry();

client = new AmazonS3ClientMock();
registry.bind("amazonS3Client", client);

return registry;
}

@Override
protected ClassPathXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext("org/apache/camel/component/aws/s3/S3ComponentSpringTest-context.xml");
}
}

0 comments on commit 4710347

Please sign in to comment.