From 1f8df2da50846317513a5b1890f7568368b9eb14 Mon Sep 17 00:00:00 2001 From: Naden Franciscus Date: Tue, 8 Aug 2017 14:55:40 +1000 Subject: [PATCH] NIFI-4256 - Add support for all AWS S3 Encryption Options --- .../aws/s3/AbstractS3Processor.java | 31 +- .../nifi/processors/aws/s3/FetchS3Object.java | 2 +- .../nifi/processors/aws/s3/PutS3Object.java | 23 +- .../S3ClientSideEncryptionService.java | 43 +++ .../S3ServerSideEncryptionService.java | 38 +++ ...StandardS3ClientSideEncryptionService.java | 293 ++++++++++++++++++ ...StandardS3ServerSideEncryptionService.java | 200 ++++++++++++ ...g.apache.nifi.controller.ControllerService | 4 +- .../processors/aws/s3/TestFetchS3Object.java | 3 +- .../processors/aws/s3/TestPutS3Object.java | 107 +++++-- pom.xml | 2 +- 11 files changed, 714 insertions(+), 32 deletions(-) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ClientSideEncryptionService.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ServerSideEncryptionService.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ClientSideEncryptionService.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ServerSideEncryptionService.java diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index d3725324ae26..df0072798209 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -41,6 +41,7 @@ import com.amazonaws.services.s3.model.Grantee; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; +import org.apache.nifi.processors.aws.s3.encryption.service.S3ClientSideEncryptionService; public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor { @@ -125,16 +126,31 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider new AllowableValue("S3SignerType", "Signature v2")) .defaultValue("Default Signature") .build(); + + public static final PropertyDescriptor CLIENT_SIDE_ENCRYPTION_SERVICE = new PropertyDescriptor.Builder() + .name("Client Side Encryption Service") + .description("Specifies an optional Client Side Encryption Service that, if provided, will be used to create connections") + .required(false) + .identifiesControllerService(S3ClientSideEncryptionService.class) + .build(); + /** * Create client using credentials provider. This is the preferred way for creating clients */ @Override protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { - getLogger().info("Creating client with credentials provider"); + S3ClientSideEncryptionService encryptionService = context.getProperty(CLIENT_SIDE_ENCRYPTION_SERVICE).asControllerService(S3ClientSideEncryptionService.class); + AmazonS3Client s3; initializeSignerOverride(context, config); - final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config); + if (encryptionService != null && encryptionService.needsEncryptedClient()) { + getLogger().info("Creating encrypted client with credentials provider"); + s3 = encryptionService.encryptedClient(credentialsProvider, config); + }else{ + getLogger().info("Creating unencrypted client with credentials provider"); + s3 = new AmazonS3Client(credentialsProvider, config); + } initalizeEndpointOverride(context, s3); @@ -165,11 +181,18 @@ private void initializeSignerOverride(final ProcessContext context, final Client */ @Override protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { - getLogger().info("Creating client with AWS credentials"); + S3ClientSideEncryptionService encryptionService = context.getProperty(CLIENT_SIDE_ENCRYPTION_SERVICE).asControllerService(S3ClientSideEncryptionService.class); + AmazonS3Client s3; initializeSignerOverride(context, config); - final AmazonS3Client s3 = new AmazonS3Client(credentials, config); + if (encryptionService != null && encryptionService.needsEncryptedClient()) { + getLogger().info("Creating encrypted client with credentials provider"); + s3 = encryptionService.encryptedClient(credentials, config); + }else{ + getLogger().info("Creating unencrypted client with credentials provider"); + s3 = new AmazonS3Client(credentials, config); + } initalizeEndpointOverride(context, s3); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 4be487ea6c9a..43cb1edbc693 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -75,7 +75,7 @@ public class FetchS3Object extends AbstractS3Processor { public static final List properties = Collections.unmodifiableList( Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, - SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)); + SSL_CONTEXT_SERVICE, CLIENT_SIDE_ENCRYPTION_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)); @Override protected List getSupportedPropertyDescriptors() { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 817b2e5fe887..582c726d6b0c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -58,6 +58,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.s3.encryption.service.S3ServerSideEncryptionService; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3Client; @@ -154,6 +155,14 @@ public class PutS3Object extends AbstractS3Processor { .defaultValue(StorageClass.Standard.name()) .build(); + + public static final PropertyDescriptor SERVER_SIDE_ENCRYPTION_SERVICE = new PropertyDescriptor.Builder() + .name("Server Side Encryption Service") + .description("Specifies an optional Server Side Encryption Service that, if provided, will be used to create connections") + .required(false) + .identifiesControllerService(S3ServerSideEncryptionService.class) + .build(); + public static final PropertyDescriptor MULTIPART_THRESHOLD = new PropertyDescriptor.Builder() .name("Multipart Threshold") .description("Specifies the file size threshold for switch from the PutS3Object API to the " + @@ -206,7 +215,7 @@ public class PutS3Object extends AbstractS3Processor { public static final List properties = Collections.unmodifiableList( Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, CLIENT_SIDE_ENCRYPTION_SERVICE, SERVER_SIDE_ENCRYPTION_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, PROXY_HOST, PROXY_HOST_PORT)); @@ -403,6 +412,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final Long multipartThreshold = context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue(); final Long multipartPartSize = context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue(); + final S3ServerSideEncryptionService ssEncryptionService = context.getProperty(SERVER_SIDE_ENCRYPTION_SERVICE).asControllerService(S3ServerSideEncryptionService.class); + final long now = System.currentTimeMillis(); /* @@ -461,6 +472,11 @@ public void process(final InputStream rawIn) throws IOException { final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); request.setStorageClass( StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); + + if (ssEncryptionService != null) { + ssEncryptionService.encrypt(request); + } + final AccessControlList acl = createACL(context, ff); if (acl != null) { request.setAccessControlList(acl); @@ -553,6 +569,11 @@ public void process(final InputStream rawIn) throws IOException { final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata); initiateRequest.setStorageClass(currentState.getStorageClass()); + + if (ssEncryptionService != null) { + ssEncryptionService.encrypt(initiateRequest); + } + final AccessControlList acl = createACL(context, ff); if (acl != null) { initiateRequest.setAccessControlList(acl); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ClientSideEncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ClientSideEncryptionService.java new file mode 100644 index 000000000000..829e413e7c7b --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ClientSideEncryptionService.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3.encryption.service; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3Encryption; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +/** + * Definition for S3ClientSideEncryptionService. + * + */ +@Tags({"aws", "s3", "encryption", "client", "kms", "key"}) +@CapabilityDescription("Provides the ability to configure S3 Client Side Encryption once and reuse " + + "that configuration throughout the application") +public interface S3ClientSideEncryptionService extends ControllerService { + + boolean needsEncryptedClient(); + + AmazonS3Client encryptedClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration config); + + AmazonS3Client encryptedClient(AWSCredentials credentials, ClientConfiguration config); + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ServerSideEncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ServerSideEncryptionService.java new file mode 100644 index 000000000000..04f81b8536db --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/S3ServerSideEncryptionService.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3.encryption.service; + +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.PutObjectRequest; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; + +/** + * Definition for S3ServerSideEncryptionService. + * + */ +@Tags({"aws", "s3", "encryption", "server", "kms", "key"}) +@CapabilityDescription("Provides the ability to configure S3 Server Side Encryption once and reuse " + + "that configuration throughout the application") +public interface S3ServerSideEncryptionService extends ControllerService { + + void encrypt(PutObjectRequest putObjectRequest); + + void encrypt(InitiateMultipartUploadRequest initiateMultipartUploadRequest); + +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ClientSideEncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ClientSideEncryptionService.java new file mode 100644 index 000000000000..73fedace5a63 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ClientSideEncryptionService.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3.encryption.service; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3EncryptionClient; +import com.amazonaws.services.s3.model.CryptoConfiguration; +import com.amazonaws.services.s3.model.CryptoMode; +import com.amazonaws.services.s3.model.CryptoStorageMode; +import com.amazonaws.services.s3.model.EncryptionMaterials; +import com.amazonaws.services.s3.model.KMSEncryptionMaterials; +import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.StringUtils; + +import javax.crypto.spec.SecretKeySpec; +import java.security.KeyFactory; +import java.security.KeyPair; +import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.X509EncodedKeySpec; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Tags({"aws", "s3", "encryption", "client", "kms", "key"}) +@CapabilityDescription("Provides the ability to configure S3 Client Side Encryption once and reuse " + + "that configuration throughout the application") +public class StandardS3ClientSideEncryptionService extends AbstractControllerService implements S3ClientSideEncryptionService { + + public static final String METHOD_CSE_MK = "Client Side Master Key"; + public static final String METHOD_CSE_KMS = "KMS-Managed Customer Master Key"; + + public static final String ALGORITHM_AES128 = "AES_128"; + public static final String ALGORITHM_AES256 = "AES_256"; + public static final String ALGORITHM_DES = "DES"; + public static final String ALGORITHM_RSA = "RSA"; + + public static final PropertyDescriptor ENCRYPTION_METHOD = new PropertyDescriptor.Builder() + .name("encryption-method") + .displayName("Encryption Method") + .required(true) + .allowableValues(METHOD_CSE_MK, METHOD_CSE_KMS) + .defaultValue(METHOD_CSE_MK) + .description("Method by which the S3 object will be encrypted client-side.") + .build(); + + public static final PropertyDescriptor KMS_CMK_ID = new PropertyDescriptor.Builder() + .name("kms-cmk-id") + .displayName("KMS Customer Master Key Id") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("Identifier belonging to the custom master key managed by the KMS.") + .build(); + + public static final PropertyDescriptor KMS_REGION = new PropertyDescriptor.Builder() + .name("kms-region") + .displayName("KMS Region") + .required(false) + .allowableValues(getAvailableRegions()) + .description("AWS region that contains the KMS.") + .build(); + + public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder() + .name("secret-key") + .displayName("Secret Key") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("Secret key used when performing symmetric client side encryption.") + .build(); + + public static final PropertyDescriptor SECRET_KEY_ALGORITHM = new PropertyDescriptor.Builder() + .name("secret-key-algorithm") + .displayName("Secret Key Algorithm") + .required(false) + .description("Secret key algorithm used when performing symmetric client side encryption.") + .build(); + + public static final PropertyDescriptor PRIVATE_KEY = new PropertyDescriptor.Builder() + .name("private-key") + .displayName("Private Key") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("Private key used when performing asymmetric client side encryption.") + .build(); + + public static final PropertyDescriptor PRIVATE_KEY_ALGORITHM = new PropertyDescriptor.Builder() + .name("private-key-algorithm") + .displayName("Private Key Algorithm") + .required(false) + .description("Private key algorithm used when performing asymmetric client side encryption.") + .build(); + + public static final PropertyDescriptor PUBLIC_KEY = new PropertyDescriptor.Builder() + .name("public-key") + .displayName("Public Key") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("Public key used when performing asymmetric client side encryption.") + .build(); + + public static final PropertyDescriptor PUBLIC_KEY_ALGORITHM = new PropertyDescriptor.Builder() + .name("public-key-algorithm") + .displayName("Public Key Algorithm") + .required(false) + .description("Public key algorithm used when performing asymmetric client side encryption.") + .build(); + + public static final PropertyDescriptor CRYPTO_MODE = new PropertyDescriptor.Builder() + .name("crypto-mode") + .displayName("Crypto Mode") + .required(false) + .allowableValues( + new AllowableValue(CryptoMode.StrictAuthenticatedEncryption.toString(), "Strict Authenticated Encryption"), + new AllowableValue(CryptoMode.AuthenticatedEncryption.toString(), "Authenticated Encryption"), + new AllowableValue(CryptoMode.EncryptionOnly.toString(), "Encryption Only")) + .description("Cryptographic mode used to secure an S3 object.") + .build(); + + public static final PropertyDescriptor CRYPTO_STORAGE_MODE = new PropertyDescriptor.Builder() + .name("storage-mode") + .displayName("Crypto Storage Mode") + .required(false) + .allowableValues( + new AllowableValue(CryptoStorageMode.InstructionFile.toString(), "Instruction File"), + new AllowableValue(CryptoStorageMode.ObjectMetadata.toString(), "Object Metadata")) + .description("Storage mode used to store encryption information when encrypting an S3 object.") + .build(); + + public static final PropertyDescriptor IGNORE_MISSING_INSTRUCTION_FILE = new PropertyDescriptor.Builder() + .name("ignore-missing-instruction-file") + .displayName("Ignore Missing Instruction File") + .required(false) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("false") + .description("Whether to ignore missing instruction files during decryption.") + .build(); + + private static final List properties; + private String encryptionMethod; + private String kmsCmkId; + private String kmsRegion; + private String secretKey; + private String secretKeyAlgorithm; + private String privateKey; + private String privateKeyAlgorithm; + private String publicKey; + private String publicKeyAlgorithm; + private String cryptoMode; + private String cryptoStorageMode; + private Boolean ignoreMissingInstructionFile; + + + static { + final List props = new ArrayList<>(); + props.add(ENCRYPTION_METHOD); + props.add(KMS_CMK_ID); + props.add(KMS_REGION); + props.add(SECRET_KEY); + props.add(SECRET_KEY_ALGORITHM); + props.add(PRIVATE_KEY); + props.add(PRIVATE_KEY_ALGORITHM); + props.add(PUBLIC_KEY); + props.add(PUBLIC_KEY_ALGORITHM); + props.add(CRYPTO_MODE); + props.add(CRYPTO_STORAGE_MODE); + props.add(IGNORE_MISSING_INSTRUCTION_FILE); + + properties = Collections.unmodifiableList(props); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + encryptionMethod = context.getProperty(ENCRYPTION_METHOD).getValue(); + kmsCmkId = context.getProperty(KMS_CMK_ID).getValue(); + kmsRegion = context.getProperty(KMS_REGION).getValue(); + secretKey = context.getProperty(SECRET_KEY).getValue(); + secretKeyAlgorithm = context.getProperty(SECRET_KEY_ALGORITHM).getValue(); + privateKey = context.getProperty(PRIVATE_KEY).getValue(); + privateKeyAlgorithm = context.getProperty(PRIVATE_KEY_ALGORITHM).getValue(); + publicKey = context.getProperty(PUBLIC_KEY).getValue(); + publicKeyAlgorithm = context.getProperty(PUBLIC_KEY_ALGORITHM).getValue(); + cryptoMode = context.getProperty(CRYPTO_MODE).getValue(); + cryptoStorageMode = context.getProperty(CRYPTO_STORAGE_MODE).getValue(); + ignoreMissingInstructionFile = context.getProperty(IGNORE_MISSING_INSTRUCTION_FILE).asBoolean(); + } + + public boolean needsEncryptedClient() { + return encryptionMethod != null && (encryptionMethod.equals(METHOD_CSE_KMS) || encryptionMethod.equals(METHOD_CSE_MK)); + } + + public AmazonS3Client encryptedClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration config) { + return new AmazonS3EncryptionClient(credentialsProvider, new StaticEncryptionMaterialsProvider(encryptionMaterials()), config, cryptoConfiguration()); + } + + public AmazonS3Client encryptedClient(AWSCredentials credentials, ClientConfiguration config) { + return new AmazonS3EncryptionClient(credentials, encryptionMaterials(), config, cryptoConfiguration()); + } + + private CryptoConfiguration cryptoConfiguration() { + CryptoConfiguration config = new CryptoConfiguration(); + + if (!StringUtils.isBlank(cryptoMode)) { + config.setCryptoMode(CryptoMode.valueOf(cryptoMode)); + } + + if (!StringUtils.isBlank(cryptoStorageMode)) { + config.setStorageMode(CryptoStorageMode.valueOf(cryptoStorageMode)); + } + + if (!StringUtils.isBlank(kmsRegion)) { + config.setAwsKmsRegion(Region.getRegion(Regions.fromName(kmsRegion))); + } + + config.setIgnoreMissingInstructionFile(ignoreMissingInstructionFile); + return config; + } + + private EncryptionMaterials encryptionMaterials() { + if (!StringUtils.isBlank(kmsCmkId)) { + return new KMSEncryptionMaterials(kmsCmkId); + } + + if (!StringUtils.isBlank(secretKey)) { + return new EncryptionMaterials(new SecretKeySpec(secretKey.getBytes(), secretKeyAlgorithm != null ? publicKeyAlgorithm : "RSA")); + } + + if (!StringUtils.isBlank(publicKey) && !StringUtils.isBlank(privateKey)) { + try { + KeyFactory publicKeyFactory = KeyFactory.getInstance(publicKeyAlgorithm != null ? publicKeyAlgorithm : "RSA"); + KeyFactory privateKeyFactory = KeyFactory.getInstance(privateKeyAlgorithm != null ? privateKeyAlgorithm : "RSA"); + X509EncodedKeySpec publicKeySpec = new X509EncodedKeySpec(publicKey.getBytes()); + PKCS8EncodedKeySpec privateKeySpec = new PKCS8EncodedKeySpec(privateKey.getBytes()); + KeyPair keyPair = new KeyPair(publicKeyFactory.generatePublic(publicKeySpec), privateKeyFactory.generatePrivate(privateKeySpec)); + return new EncryptionMaterials(keyPair); + }catch(Exception e) { + getLogger().info("Failed to create key pair based encryption materials: reason={}", new Object[]{e.getMessage()}); + return null; + } + } + + return null; + } + + private static AllowableValue[] getAvailableRegions() { + final List values = new ArrayList<>(); + for (final Regions regions : Regions.values()) { + values.add(new AllowableValue(regions.getName(), regions.getName(), regions.getName())); + } + return values.toArray(new AllowableValue[values.size()]); + } + + @Override + public String toString() { + return "StandardS3ClientSideEncryptionService[id=" + getIdentifier() + "]"; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ServerSideEncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ServerSideEncryptionService.java new file mode 100644 index 000000000000..cfa0b0a6806e --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/service/StandardS3ServerSideEncryptionService.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3.encryption.service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; +import com.amazonaws.services.s3.model.SSECustomerKey; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +@Tags({"aws", "s3", "encryption", "server", "kms", "key"}) +@CapabilityDescription("Provides the ability to configure S3 Server Side Encryption once and reuse " + + "that configuration throughout the application") +public class StandardS3ServerSideEncryptionService extends AbstractControllerService implements S3ServerSideEncryptionService { + + public static final String METHOD_SSE_S3 = "SSE-S3"; + public static final String METHOD_SSE_KMS = "SSE-KMS"; + public static final String METHOD_SSE_C = "SSE-C"; + + public static final String ALGORITHM_AES256 = "AES256"; + public static final String CUSTOMER_ALGORITHM_AES256 = "AES256"; + + + public static final PropertyDescriptor ENCRYPTION_METHOD = new PropertyDescriptor.Builder() + .name("encryption-method") + .displayName("Encryption Method") + .required(true) + .allowableValues(METHOD_SSE_S3, METHOD_SSE_KMS, METHOD_SSE_C) + .defaultValue(METHOD_SSE_S3) + .description("Method by which the S3 object will be encrypted server-side.") + .build(); + + public static final PropertyDescriptor ALGORITHM = new PropertyDescriptor.Builder() + .name("sse-algorithm") + .displayName("Algorithm") + .allowableValues(ALGORITHM_AES256) + .defaultValue(ALGORITHM_AES256) + .description("Encryption algorithm to use (only AES256 currently supported)") + .build(); + + public static final PropertyDescriptor KMS_KEY_ID = new PropertyDescriptor.Builder() + .name("sse-kme-key-id") + .displayName("KMS Key Id") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("Custom KMS key identifier. Supports key or alias ARN.") + .build(); + + public static final PropertyDescriptor CUSTOMER_KEY = new PropertyDescriptor.Builder() + .name("sse-customer-key") + .displayName("Customer Key") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("Customer provided 256-bit, base64-encoded encryption key.") + .build(); + + public static final PropertyDescriptor CUSTOMER_KEY_MD5 = new PropertyDescriptor.Builder() + .name("sse-customer-key-md5") + .displayName("Customer Key MD5") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("Base64-encoded 128-bit MD5 digest of the encryption key.") + .build(); + + public static final PropertyDescriptor CUSTOMER_ALGORITHM = new PropertyDescriptor.Builder() + .name("sse-customer-algorithm") + .displayName("Customer Algorithm") + .allowableValues(CUSTOMER_ALGORITHM_AES256) + .defaultValue(CUSTOMER_ALGORITHM_AES256) + .description("Customer encryption algorithm to use (only AES256 currently supported)") + .build(); + + private static final List properties; + private String encryptionMethod; + private String algorithm; + private String kmsKeyId; + private String customerKey; + private String customerKeyMD5; + private String customerAlgorithm; + + + static { + final List props = new ArrayList<>(); + props.add(ENCRYPTION_METHOD); + props.add(ALGORITHM); + props.add(KMS_KEY_ID); + props.add(CUSTOMER_KEY); + props.add(CUSTOMER_KEY_MD5); + props.add(CUSTOMER_ALGORITHM); + properties = Collections.unmodifiableList(props); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + encryptionMethod = context.getProperty(ENCRYPTION_METHOD).getValue(); + algorithm = context.getProperty(ALGORITHM).getValue(); + kmsKeyId = context.getProperty(KMS_KEY_ID).getValue(); + customerKey = context.getProperty(CUSTOMER_KEY).getValue(); + customerKeyMD5 = context.getProperty(CUSTOMER_KEY_MD5).getValue(); + customerAlgorithm = context.getProperty(CUSTOMER_ALGORITHM).getValue(); + + if (algorithm == null) algorithm = ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION; + } + + public void encrypt(PutObjectRequest putObjectRequest) { + if (encryptionMethod == null) return; + + if (encryptionMethod.equals(METHOD_SSE_S3)) { + getLogger().info("Encrypting single part object using SSE-S3"); + putObjectRequest.getMetadata().setSSEAlgorithm(algorithm == null ? ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION : algorithm); + } + + if (encryptionMethod.equals(METHOD_SSE_KMS)) { + getLogger().info("Encrypting single part object using SSE-KMS"); + putObjectRequest.setSSEAwsKeyManagementParams(kmsKeyId == null ? new SSEAwsKeyManagementParams() : new SSEAwsKeyManagementParams(kmsKeyId)); + } + + if (encryptionMethod.equals(METHOD_SSE_C)) { + getLogger().info("Encrypting single part object using SSE-C"); + if (StringUtils.isNotBlank(customerKey)) { + putObjectRequest.setSSECustomerKey(new SSECustomerKey(customerKey)); + } + + String sseCustomerAlgorithm = customerAlgorithm == null ? ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION : customerAlgorithm; + putObjectRequest.getMetadata().setSSECustomerAlgorithm(sseCustomerAlgorithm); + + if (StringUtils.isNotBlank(customerKeyMD5)) { + putObjectRequest.getMetadata().setSSECustomerKeyMd5(customerKeyMD5); + } + } + } + + public void encrypt(InitiateMultipartUploadRequest initiateMultipartUploadRequest) { + if (encryptionMethod == null) return; + + if (encryptionMethod.equals(METHOD_SSE_S3)) { + getLogger().info("Encrypting multipart object using SSE-S3"); + initiateMultipartUploadRequest.getObjectMetadata().setSSEAlgorithm(algorithm == null ? ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION : algorithm); + } + + if (encryptionMethod.equals(METHOD_SSE_KMS)) { + getLogger().info("Encrypting multipart object using SSE-KMS"); + initiateMultipartUploadRequest.setSSEAwsKeyManagementParams(kmsKeyId == null ? new SSEAwsKeyManagementParams() : new SSEAwsKeyManagementParams(kmsKeyId)); + } + + if (encryptionMethod.equals(METHOD_SSE_C)) { + getLogger().info("Encrypting multipart object using SSE-C"); + if (StringUtils.isNotBlank(customerKey)) { + initiateMultipartUploadRequest.setSSECustomerKey(new SSECustomerKey(customerKey)); + } + + String sseCustomerAlgorithm = customerAlgorithm == null ? ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION : customerAlgorithm; + initiateMultipartUploadRequest.getObjectMetadata().setSSECustomerAlgorithm(sseCustomerAlgorithm); + + if (StringUtils.isNotBlank(customerKeyMD5)) { + initiateMultipartUploadRequest.getObjectMetadata().setSSECustomerKeyMd5(customerKeyMD5); + } + } + } + + @Override + public String toString() { + return "StandardS3ServerSideEncryptionService[id=" + getIdentifier() + "]"; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 5e2dea45979b..84e19bfb8a73 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,4 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService \ No newline at end of file +org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService +org.apache.nifi.processors.aws.s3.encryption.service.StandardS3ClientSideEncryptionService +org.apache.nifi.processors.aws.s3.encryption.service.StandardS3ServerSideEncryptionService diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index 1ebf79bbd02d..e58fefd3ba54 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -178,7 +178,7 @@ public void testGetObjectExceptionGoesToFailure() throws IOException { public void testGetPropertyDescriptors() throws Exception { FetchS3Object processor = new FetchS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 14, pd.size()); + assertEquals("size should be eq", 15, pd.size()); assertTrue(pd.contains(FetchS3Object.ACCESS_KEY)); assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(FetchS3Object.BUCKET)); @@ -188,6 +188,7 @@ public void testGetPropertyDescriptors() throws Exception { assertTrue(pd.contains(FetchS3Object.REGION)); assertTrue(pd.contains(FetchS3Object.SECRET_KEY)); assertTrue(pd.contains(FetchS3Object.SIGNER_OVERRIDE)); + assertTrue(pd.contains(FetchS3Object.CLIENT_SIDE_ENCRYPTION_SERVICE)); assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE)); assertTrue(pd.contains(FetchS3Object.TIMEOUT)); assertTrue(pd.contains(FetchS3Object.VERSION_ID)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 0ee779240d52..55a7d509f981 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -21,10 +21,20 @@ import java.util.List; import java.util.Map; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.s3.AmazonS3EncryptionClient; +import com.amazonaws.services.s3.model.CryptoMode; +import com.amazonaws.services.s3.model.CryptoStorageMode; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.s3.encryption.service.S3ClientSideEncryptionService; +import org.apache.nifi.processors.aws.s3.encryption.service.S3ServerSideEncryptionService; +import org.apache.nifi.processors.aws.s3.encryption.service.StandardS3ClientSideEncryptionService; +import org.apache.nifi.processors.aws.s3.encryption.service.StandardS3ServerSideEncryptionService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -48,6 +58,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestPutS3Object { @@ -56,6 +68,9 @@ public class TestPutS3Object { private AmazonS3Client actualS3Client = null; private AmazonS3Client mockS3Client = null; + private S3ClientSideEncryptionService clientSideEncryptionService = null; + private S3ServerSideEncryptionService serverSideEncryptionService = null; + @Before public void setUp() { mockS3Client = Mockito.mock(AmazonS3Client.class); @@ -65,28 +80,17 @@ protected AmazonS3Client getClient() { return mockS3Client; } }; + + clientSideEncryptionService = new StandardS3ClientSideEncryptionService(); + serverSideEncryptionService = new StandardS3ServerSideEncryptionService(); + runner = TestRunners.newTestRunner(mockPutS3Object); } @Test public void testPutSinglePart() { - runner.setProperty(PutS3Object.REGION, "ap-northeast-1"); - runner.setProperty(PutS3Object.BUCKET, "test-bucket"); - runner.setProperty("x-custom-prop", "hello"); - final Map ffAttributes = new HashMap<>(); - ffAttributes.put("filename", "testfile.txt"); - runner.enqueue("Test Content", ffAttributes); - - PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class); - Date expiration = new Date(); - putObjectResult.setExpirationTime(expiration); - putObjectResult.setMetadata(new ObjectMetadata()); - putObjectResult.setVersionId("test-version"); - Mockito.when(putObjectResult.getETag()).thenReturn("test-etag"); - Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult); - MultipartUploadListing uploadListing = new MultipartUploadListing(); - Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); - Mockito.when(mockS3Client.getResourceUrl(Mockito.anyString(), Mockito.anyString())).thenReturn("test-s3-url"); + enqueueDefaultTestFile(); + putS3TestFile(); runner.assertValid(); runner.run(1); @@ -106,11 +110,7 @@ public void testPutSinglePart() { @Test public void testPutSinglePartException() { - runner.setProperty(PutS3Object.REGION, "ap-northeast-1"); - runner.setProperty(PutS3Object.BUCKET, "test-bucket"); - final Map ffAttributes = new HashMap<>(); - ffAttributes.put("filename", "testfile.txt"); - runner.enqueue("Test Content", ffAttributes); + enqueueDefaultTestFile(); MultipartUploadListing uploadListing = new MultipartUploadListing(); Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); @@ -122,6 +122,45 @@ public void testPutSinglePartException() { runner.assertAllFlowFilesTransferred(PutS3Object.REL_FAILURE, 1); } + @Test + public void testSinglePartClientSideEncryption() throws InitializationException { + runner.addControllerService("client-side-encryption-service", clientSideEncryptionService); + runner.setProperty(PutS3Object.CLIENT_SIDE_ENCRYPTION_SERVICE, "client-side-encryption-service"); + runner.setProperty(clientSideEncryptionService, StandardS3ClientSideEncryptionService.ENCRYPTION_METHOD, StandardS3ClientSideEncryptionService.METHOD_CSE_MK); + runner.setProperty(clientSideEncryptionService, StandardS3ClientSideEncryptionService.CRYPTO_MODE, CryptoMode.StrictAuthenticatedEncryption.toString()); + runner.setProperty(clientSideEncryptionService, StandardS3ClientSideEncryptionService.CRYPTO_STORAGE_MODE, CryptoStorageMode.InstructionFile.toString()); + runner.setProperty(clientSideEncryptionService, StandardS3ClientSideEncryptionService.KMS_REGION, "ap-northeast-1"); + runner.enableControllerService(clientSideEncryptionService); + + enqueueDefaultTestFile(); + putS3TestFile(); + + runner.assertValid(); + runner.run(1); + + assertTrue(clientSideEncryptionService.needsEncryptedClient()); + } + + @Test + public void testSinglePartServerSideEncryption() throws InitializationException { + runner.addControllerService("server-side-encryption-service", serverSideEncryptionService); + runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION_SERVICE, "server-side-encryption-service"); + runner.setProperty(serverSideEncryptionService, StandardS3ServerSideEncryptionService.ENCRYPTION_METHOD, StandardS3ServerSideEncryptionService.METHOD_SSE_KMS); + runner.setProperty(serverSideEncryptionService, StandardS3ServerSideEncryptionService.KMS_KEY_ID, "kms-key-id"); + runner.enableControllerService(serverSideEncryptionService); + + enqueueDefaultTestFile(); + putS3TestFile(); + + runner.assertValid(); + runner.run(1); + + ArgumentCaptor captureRequest = ArgumentCaptor.forClass(PutObjectRequest.class); + Mockito.verify(mockS3Client, Mockito.times(1)).putObject(captureRequest.capture()); + PutObjectRequest request = captureRequest.getValue(); + assertEquals("kms-key-id", request.getSSEAwsKeyManagementParams().getAwsKmsKeyId()); + } + @Test public void testSignerOverrideOptions() { final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); @@ -150,7 +189,7 @@ public void testSignerOverrideOptions() { public void testGetPropertyDescriptors() throws Exception { PutS3Object processor = new PutS3Object(); List pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 28, pd.size()); + assertEquals("size should be eq", 30, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -166,6 +205,8 @@ public void testGetPropertyDescriptors() throws Exception { assertTrue(pd.contains(PutS3Object.SECRET_KEY)); assertTrue(pd.contains(PutS3Object.SIGNER_OVERRIDE)); assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(PutS3Object.CLIENT_SIDE_ENCRYPTION_SERVICE)); + assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION_SERVICE)); assertTrue(pd.contains(PutS3Object.TIMEOUT)); assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID)); assertTrue(pd.contains(PutS3Object.STORAGE_CLASS)); @@ -174,4 +215,24 @@ public void testGetPropertyDescriptors() throws Exception { assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION)); } + private void putS3TestFile() { + PutObjectResult putObjectResult = Mockito.spy(PutObjectResult.class); + Date expiration = new Date(); + putObjectResult.setExpirationTime(expiration); + putObjectResult.setMetadata(new ObjectMetadata()); + putObjectResult.setVersionId("test-version"); + Mockito.when(putObjectResult.getETag()).thenReturn("test-etag"); + Mockito.when(mockS3Client.putObject(Mockito.any(PutObjectRequest.class))).thenReturn(putObjectResult); + MultipartUploadListing uploadListing = new MultipartUploadListing(); + Mockito.when(mockS3Client.listMultipartUploads(Mockito.any(ListMultipartUploadsRequest.class))).thenReturn(uploadListing); + Mockito.when(mockS3Client.getResourceUrl(Mockito.anyString(), Mockito.anyString())).thenReturn("test-s3-url"); + } + + private void enqueueDefaultTestFile() { + runner.setProperty(PutS3Object.REGION, "ap-northeast-1"); + runner.setProperty(PutS3Object.BUCKET, "test-bucket"); + final Map ffAttributes = new HashMap<>(); + ffAttributes.put("filename", "testfile.txt"); + runner.enqueue("Test Content", ffAttributes); + } } diff --git a/pom.xml b/pom.xml index 7c741255ecc0..7e5a0ba0f5d1 100644 --- a/pom.xml +++ b/pom.xml @@ -1607,7 +1607,7 @@ com.amazonaws aws-java-sdk - 1.11.68 + 1.11.172 com.squareup.okhttp3