Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();

public static final Set<Relationship> relationships = Collections.unmodifiableSet(
public static Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteVersionRequest;

import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;


@SupportsBatching
@SeeAlso({PutS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@CapabilityDescription("Deletes an object from an Amazon S3 bucket. If a Version is specified, only that version "
        + "of the object is deleted; otherwise the current version is deleted. No existence check is performed "
        + "before deletion (S3 treats deleting a missing key as success).")
public class DeleteS3Object extends AbstractS3Processor {

    /**
     * Optional version id of the object to delete. When unset, a plain
     * DeleteObject request is issued for the current version of the key.
     */
    public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
            .name("Version")
            .description("The Version of the Object to delete")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .required(false)
            .build();

    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Deletes the S3 object identified by the Bucket/Key (and optional Version)
     * properties, evaluated against the incoming FlowFile's attributes.
     * Routes to REL_SUCCESS on a successful delete, REL_FAILURE if the
     * service call raises an AmazonServiceException.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final long startNanos = System.nanoTime();

        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
        // Null when the optional Version property is unset for this FlowFile.
        final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();

        final AmazonS3 s3 = getClient();

        try {
            if (versionId == null) {
                s3.deleteObject(new DeleteObjectRequest(bucket, key));
            } else {
                s3.deleteVersion(new DeleteVersionRequest(bucket, key, versionId));
            }
        } catch (final AmazonServiceException ase) {
            getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        session.transfer(flowFile, REL_SUCCESS);
        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        getLogger().info("Successfully deleted S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
org.apache.nifi.processors.aws.s3.FetchS3Object
org.apache.nifi.processors.aws.s3.PutS3Object
org.apache.nifi.processors.aws.s3.DeleteS3Object
org.apache.nifi.processors.aws.sns.PutSNS
org.apache.nifi.processors.aws.sqs.GetSQS
org.apache.nifi.processors.aws.sqs.PutSQS
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.aws.s3;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.amazonaws.services.s3.model.DeleteBucketRequest;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;


//@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
public class TestDeleteS3Object {

private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";

// When you want to test this, you should create a bucket on Amazon S3 as follows.
private static final String TEST_REGION = "ap-northeast-1";
private static final String TEST_BUCKET = "test-bucket-00000000-0000-0000-0000-1234567890123";

@BeforeClass
public static void oneTimeSetUp() {
// Creates a new bucket for this test
try {
PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
AmazonS3Client client = new AmazonS3Client(credentials);
CreateBucketRequest request = new CreateBucketRequest(TEST_BUCKET, TEST_REGION);
client.createBucket(request);
} catch (final AmazonS3Exception e) {
System.out.println("Can't create the key " + TEST_BUCKET + ":" + e.toString());
} catch (final IOException e) {
System.out.println(CREDENTIALS_FILE + " doesn't exist.");
}
}

@AfterClass
public static void oneTimeTearDown() throws IOException {
// Delete a bucket for this test
PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
AmazonS3Client client = new AmazonS3Client(credentials);
DeleteBucketRequest dbr = new DeleteBucketRequest(TEST_BUCKET);
client.deleteBucket(dbr);
}

@Test
public void testSimpleDelete() throws IOException {
// Prepares for this test
uploadTestFile("hello.txt");

DeleteS3Object deleter = new DeleteS3Object();
final TestRunner runner = TestRunners.newTestRunner(deleter);
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
runner.setProperty(DeleteS3Object.KEY, "hello.txt");

final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1);

runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
}

@Test
public void testDeleteFolder() throws IOException {
// Prepares for this test
uploadTestFile("folder/1.txt");

DeleteS3Object deleter = new DeleteS3Object();
final TestRunner runner = TestRunners.newTestRunner(deleter);
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
runner.setProperty(DeleteS3Object.KEY, "folder/1.txt");

final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "hello.txt");
runner.enqueue(new byte[0], attrs);
runner.run(1);

runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
}

@Test
public void testTryToDeleteNotExistingFile() throws IOException {
DeleteS3Object deleter = new DeleteS3Object();
final TestRunner runner = TestRunners.newTestRunner(deleter);
runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
runner.setProperty(DeleteS3Object.BUCKET, "no-such-a-key");

final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "no-such-a-file");
runner.enqueue(new byte[0], attrs);
runner.run(1);

runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
}

// Uploads a test file
private void uploadTestFile(String key) throws IOException {
PropertiesCredentials credentials = new PropertiesCredentials(new FileInputStream(CREDENTIALS_FILE));
AmazonS3Client client = new AmazonS3Client(credentials);
URL fileURL = this.getClass().getClassLoader().getResource("hello.txt");
File file = new File(fileURL.getPath());
PutObjectRequest putRequest = new PutObjectRequest(TEST_BUCKET, key, file);
PutObjectResult result = client.putObject(putRequest);
}
}