diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 1941bec6fd8d4..6848107e94852 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1138,7 +1138,18 @@
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
new file mode 100644
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This enum centralizes the encryption methods and
+ * the values required in the configuration.
+ */
+public enum S3AEncryptionMethods {
+
+  SSE_S3("AES256"),
+  SSE_KMS("SSE-KMS"),
+  SSE_C("SSE-C"),
+  NONE("");
+
+  private String method;
+
+  S3AEncryptionMethods(String method) {
+    this.method = method;
+  }
+
+  public String getMethod() {
+    return method;
+  }
+
+  public static S3AEncryptionMethods getMethod(String name) throws IOException {
+    if (StringUtils.isBlank(name)) {
+      return NONE;
+    }
+    switch (name) {
+    case "AES256":
+      return SSE_S3;
+    case "SSE-KMS":
+      return SSE_KMS;
+    case "SSE-C":
+      return SSE_C;
+    default:
+      throw new IOException("Unknown Server Side algorithm " + name);
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 8152bf80d3479..bffc210b03792 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -43,6 +43,7 @@
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
@@ -51,6 +52,8 @@
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.transfer.Copy;
@@ -135,7 +138,7 @@ public class S3AFileSystem extends FileSystem {
       LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
   private LocalDirAllocator directoryAllocator;
   private CannedAccessControlList cannedACL;
-  private String serverSideEncryptionAlgorithm;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
   private S3AStorageStatistics storageStatistics;
   private long readAhead;
@@ -227,8 +230,17 @@ public StorageStatistics provide() {
 
       initMultipartUploads(conf);
 
-      serverSideEncryptionAlgorithm =
-          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+      serverSideEncryptionAlgorithm = S3AEncryptionMethods.getMethod(
+          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
+      if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isBlank(getServerSideEncryptionKey(getConf()))) {
+        throw new IOException(Constants.SSE_C_NO_KEY_ERROR);
+      }
+      if (S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        throw new IOException(Constants.SSE_S3_WITH_KEY_ERROR);
+      }
       LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
@@ -514,9 +526,18 @@ public FSDataInputStream open(Path f, int bufferSize)
           + " because it is a directory");
     }
 
-    return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
-        fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
-        inputPolicy));
+    return new FSDataInputStream(
+        new S3AInputStream(new S3ObjectAttributes(
+          bucket,
+          pathToKey(f),
+          serverSideEncryptionAlgorithm,
+          getServerSideEncryptionKey(getConf())),
+          fileStatus.getLen(),
+          s3,
+          statistics,
+          instrumentation,
+          readAhead,
+          inputPolicy));
   }
 
   /**
@@ -892,7 +913,14 @@ public S3AStorageStatistics getStorageStatistics() {
    */
   protected ObjectMetadata getObjectMetadata(String key) {
     incrementStatistic(OBJECT_METADATA_REQUESTS);
-    ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+    GetObjectMetadataRequest request =
+        new GetObjectMetadataRequest(bucket, key);
+    //SSE-C requires the encryption key to be sent on metadata requests too
+    if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+        StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+      request.setSSECustomerKey(generateSSECustomerKey());
+    }
+    ObjectMetadata meta = s3.getObjectMetadata(request);
     incrementReadOperations();
     return meta;
   }
@@ -986,6 +1014,7 @@ public PutObjectRequest newPutObjectRequest(String key,
       ObjectMetadata metadata, File srcfile) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         srcfile);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     putObjectRequest.setMetadata(metadata);
     return putObjectRequest;
@@ -1004,6 +1033,7 @@ PutObjectRequest newPutObjectRequest(String key,
       ObjectMetadata metadata, InputStream inputStream) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     return putObjectRequest;
   }
@@ -1016,9 +1046,7 @@ PutObjectRequest newPutObjectRequest(String key,
    */
   public ObjectMetadata newObjectMetadata() {
     final ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
+    setOptionalObjectMetadata(om);
     return om;
   }
@@ -1752,11 +1780,10 @@ private void copyFile(String srcKey, String dstKey, long size)
     try {
       ObjectMetadata srcom = getObjectMetadata(srcKey);
       ObjectMetadata dstom = cloneObjectMetadata(srcom);
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
+      setOptionalObjectMetadata(dstom);
       CopyObjectRequest copyObjectRequest =
           new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+      setOptionalCopyObjectRequestParameters(copyObjectRequest);
       copyObjectRequest.setCannedAccessControlList(cannedACL);
       copyObjectRequest.setNewObjectMetadata(dstom);
@@ -1788,6 +1815,83 @@ public void progressChanged(ProgressEvent progressEvent) {
     }
   }
 
+  protected void setOptionalMultipartUploadRequestParameters(
+      InitiateMultipartUploadRequest req) {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        //at the moment, only supports uploads using the same key
+        req.setSSECustomerKey(generateSSECustomerKey());
+      }
+      break;
+    default:
+    }
+  }
+
+
+  protected void setOptionalCopyObjectRequestParameters(
+      CopyObjectRequest copyObjectRequest) throws IOException {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      copyObjectRequest.setSSEAwsKeyManagementParams(
+          generateSSEAwsKeyParams()
+      );
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        //at the moment, only supports copy using the same key
+        SSECustomerKey customerKey = generateSSECustomerKey();
+        copyObjectRequest.setSourceSSECustomerKey(customerKey);
+        copyObjectRequest.setDestinationSSECustomerKey(customerKey);
+      }
+      break;
+    default:
+    }
+  }
+
+  protected void setOptionalPutRequestParameters(PutObjectRequest request) {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        request.setSSECustomerKey(generateSSECustomerKey());
+      }
+      break;
+    default:
+    }
+  }
+
+  private void setOptionalObjectMetadata(ObjectMetadata metadata) {
+    if (S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm)) {
+      metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm.getMethod());
+    }
+  }
+
+  private SSEAwsKeyManagementParams generateSSEAwsKeyParams() {
+    //Use the specified key if present; otherwise AWS falls back to the
+    //default aws/s3 master key
+    SSEAwsKeyManagementParams sseAwsKeyManagementParams =
+        new SSEAwsKeyManagementParams();
+    if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+      sseAwsKeyManagementParams =
+          new SSEAwsKeyManagementParams(
+              getServerSideEncryptionKey(getConf())
+          );
+    }
+    return sseAwsKeyManagementParams;
+  }
+
+  private SSECustomerKey generateSSECustomerKey() {
+    SSECustomerKey customerKey = new SSECustomerKey(
+        getServerSideEncryptionKey(getConf())
+    );
+    return customerKey;
+  }
+
   /**
    * Perform post-write actions.
    * @param key key written to
@@ -2240,6 +2344,7 @@ String initiateMultiPartUpload() throws IOException {
             key, newObjectMetadata(-1));
       initiateMPURequest.setCannedACL(cannedACL);
+      setOptionalMultipartUploadRequestParameters(initiateMPURequest);
       try {
         return s3.initiateMultipartUpload(initiateMPURequest)
             .getUploadId();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 3c4093d8cd135..7d322a50f7d55 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -22,6 +22,7 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -36,6 +37,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
@@ -78,6 +80,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final String uri;
   public static final Logger LOG = S3AFileSystem.LOG;
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
+  private String serverSideEncryptionKey;
   private final S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
@@ -98,24 +102,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private long contentRangeStart;
 
-  public S3AInputStream(String bucket,
-      String key,
+  public S3AInputStream(S3ObjectAttributes s3Attributes,
       long contentLength,
       AmazonS3 client,
       FileSystem.Statistics stats,
       S3AInstrumentation instrumentation,
       long readahead,
       S3AInputPolicy inputPolicy) {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
-    Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
-    Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
-    this.bucket = bucket;
-    this.key = key;
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket");
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
+    Preconditions.checkArgument(contentLength >= 0, "Negative content length");
+    this.bucket = s3Attributes.getBucket();
+    this.key = s3Attributes.getKey();
     this.contentLength = contentLength;
     this.client = client;
     this.stats = stats;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
     this.streamStatistics = instrumentation.newInputStreamStatistics();
+    this.serverSideEncryptionAlgorithm =
+        s3Attributes.getServerSideEncryptionAlgorithm();
+    this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
     this.inputPolicy = inputPolicy;
     setReadahead(readahead);
   }
@@ -145,6 +151,10 @@ private synchronized void reopen(String reason, long targetPos, long length)
     try {
       GetObjectRequest request = new GetObjectRequest(bucket, key)
          .withRange(targetPos, contentRangeFinish);
+      if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isNotBlank(serverSideEncryptionKey)) {
+        request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
+      }
       wrappedStream = client.getObject(request).getObjectContent();
       contentRangeStart = targetPos;
       if (wrappedStream == null) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index c4ff638f9237a..53112118bf11c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -723,4 +723,14 @@ static void patchSecurityCredentialProviders(Configuration conf) {
           "patch of " + S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
     }
   }
+
+  static String getServerSideEncryptionKey(Configuration conf) {
+    try {
+      return getPassword(conf, Constants.SERVER_SIDE_ENCRYPTION_KEY,
+          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY));
+    } catch (IOException e) {
+      LOG.error("Cannot retrieve SERVER_SIDE_ENCRYPTION_KEY", e);
+    }
+    return null;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
new file mode 100644
index 0000000000000..7c73a23f7925d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * This class is a simple holder for the bucket, key, SSE algorithm and
+ * SSE key attributes of an object. It is only used in {@link S3AInputStream}
+ * as a way to reduce the number of parameters passed
+ * to that class's constructor.
+ */
+class S3ObjectAttributes {
+ private String bucket;
+ private String key;
+ private S3AEncryptionMethods serverSideEncryptionAlgorithm;
+ private String serverSideEncryptionKey;
+
+ public S3ObjectAttributes(
+ String bucket,
+ String key,
+ S3AEncryptionMethods serverSideEncryptionAlgorithm,
+ String serverSideEncryptionKey) {
+ this.bucket = bucket;
+ this.key = key;
+ this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+ this.serverSideEncryptionKey = serverSideEncryptionKey;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+ return serverSideEncryptionAlgorithm;
+ }
+
+ public String getServerSideEncryptionKey() {
+ return serverSideEncryptionKey;
+ }
+}
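
The configuration-to-enum mapping and fail-fast checks reconstructed above can be exercised standalone. A minimal sketch, assuming only the patched `hadoop-aws` module and its dependencies on the classpath; the class name is illustrative:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;

public class EncryptionMethodLookupSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, "AES256");
    // "AES256" maps to SSE_S3, "SSE-KMS" to SSE_KMS, "SSE-C" to SSE_C;
    // a blank value maps to NONE, anything else raises an IOException.
    S3AEncryptionMethods method = S3AEncryptionMethods.getMethod(
        conf.getTrimmed(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM));
    System.out.println(method + " -> '" + method.getMethod() + "'");
  }
}
```

S3AFileSystem.initialize() performs the same lookup, then rejects SSE-C without a key and SSE-S3 with one before any request is issued.
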
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index d804a596207a9..2471a521316b0 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -840,10 +840,20 @@ from placing its declaration on the command line.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
new file mode 100644
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Test;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-C encryption.
+ */
+public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ getSSEAlgorithm().getMethod());
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+ "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
+ return conf;
+ }
+
+ /**
+ * This creates and writes a file using encryption key A, then attempts
+ * to read it back with encryption key B. The read must fail, since the
+ * file cannot be decrypted with a different key.
+ * @throws Exception on any failure
+ */
+ @Test
+ public void testCreateFileAndReadWithDifferentEncryptionKey() throws
+ Exception {
+ final Path[] path = new Path[1];
+ intercept(java.nio.file.AccessDeniedException.class,
+ "Forbidden (Service: Amazon S3; Status Code: 403;", () -> {
+
+ int len = 2048;
+ skipIfEncryptionTestsDisabled(getConfiguration());
+ describe("Create an encrypted file of size " + len);
+ String src = createFilename(len);
+ path[0] = writeThenReadFile(src, len);
+
+ Configuration conf = this.createConfiguration();
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+ "kX7SdwVc/1VXJr76kfKnkQ3ONYhxianyL2+C3rPVT9s=");
+
+ S3AContract contract = (S3AContract) createContract(conf);
+ contract.init();
+ //skip tests if they aren't enabled
+ assumeEnabled();
+ //extract the test FS
+ FileSystem fileSystem = contract.getTestFileSystem();
+ byte[] data = dataset(len, 'a', 'z');
+ ContractTestUtils.verifyFileContents(fileSystem, path[0], data);
+ throw new Exception("Fail");
+ });
+ rm(getFileSystem(), path[0], false, false);
+ }
+
+ @Override
+ protected S3AEncryptionMethods getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_C;
+ }
+}
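
Outside the test suite, enabling SSE-C for a client follows the same pattern as createConfiguration() above. A minimal sketch, reusing the test key (a base64-encoded 256-bit AES key); the bucket URI and class name are placeholders:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;

public class SseCClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
        S3AEncryptionMethods.SSE_C.getMethod());
    // base64-encoded 256-bit AES key; initialize() fails fast if blank
    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
        "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
    // s3a://example-bucket is an assumed placeholder
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    fs.create(new Path("/encrypted.txt")).close();
    fs.close();
  }
}
```

Every subsequent GET, HEAD, PUT and multipart request from this filesystem instance carries the same customer key, which is why reading with a different key fails with the 403 intercepted in the test above.
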
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java
new file mode 100644
index 0000000000000..afa044129350a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the encryption tests against the Fast output stream.
+ * This verifies that both file writing paths can encrypt their data.
+ */
+
+public class ITestS3AEncryptionSSECBlockOutputStream
+ extends AbstractTestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ conf.setBoolean(Constants.FAST_UPLOAD, true);
+ conf.set(Constants.FAST_UPLOAD_BUFFER,
+ Constants.FAST_UPLOAD_BYTEBUFFER);
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
+ "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
+ return conf;
+ }
+
+ @Override
+ protected S3AEncryptionMethods getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_C;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java
new file mode 100644
index 0000000000000..8b68fcfb880bd
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+import java.io.IOException;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-KMS encryption when no KMS encryption key is provided and AWS
+ * uses the default. Since this resource changes for every account and region,
+ * there is no good way to explicitly set this value to do an equality check
+ * in the response.
+ */
+public class ITestS3AEncryptionSSEKMSDefaultKey
+ extends AbstractTestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+ return conf;
+ }
+
+ @Override
+ protected S3AEncryptionMethods getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_KMS;
+ }
+
+ @Override
+ protected void assertEncrypted(Path path) throws IOException {
+ ObjectMetadata md = getFileSystem().getObjectMetadata(path);
+ assertEquals("aws:kms", md.getSSEAlgorithm());
+ assertThat(md.getSSEAwsKmsKeyId(), containsString("arn:aws:kms:"));
+ }
+}
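
The behaviour this test relies on comes from generateSSEAwsKeyParams() in the filesystem: with no configured key, the request carries an empty SSEAwsKeyManagementParams and AWS applies the account's default aws/s3 master key. A sketch of that request shape; the helper and class names are illustrative only:

```java
import java.io.File;

import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;

public class KmsDefaultKeySketch {
  // Builds a PUT that asks for SSE-KMS without naming a key, so AWS
  // falls back to the default aws/s3 master key of the account/region.
  // That key's ARN varies per account, hence the containsString check
  // on "arn:aws:kms:" in the test above.
  static PutObjectRequest putWithDefaultKmsKey(
      String bucket, String key, File source) {
    PutObjectRequest request = new PutObjectRequest(bucket, key, source);
    request.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams());
    return request;
  }
}
```
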
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
new file mode 100644
index 0000000000000..50c9fb554e261
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-KMS encryption. This requires the SERVER_SIDE_ENCRYPTION_KEY
+ * to be set in auth-keys.xml for it to run.
+ */
+public class ITestS3AEncryptionSSEKMSUserDefinedKey
+ extends AbstractTestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ if (StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))) {
+ skip(Constants.SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
+ S3AEncryptionMethods.SSE_KMS.getMethod());
+ }
+ return conf;
+ }
+
+ @Override
+ protected S3AEncryptionMethods getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_KMS;
+ }
+}
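
Because S3AUtils.getServerSideEncryptionKey() resolves the key via getPassword(), the key this test needs can also live in a Hadoop credential provider rather than in clear text in auth-keys.xml. A sketch, assuming a JCEKS keystore at an example path:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;

public class CredentialProviderKeySketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // the keystore path below is an assumed example location
    conf.set("hadoop.security.credential.provider.path",
        "jceks://file/home/user/s3a.jceks");
    // getPassword() consults the provider first, then falls back to
    // any clear-text value in the configuration itself
    char[] key = conf.getPassword(Constants.SERVER_SIDE_ENCRYPTION_KEY);
    System.out.println(key == null ? "no key stored" : "key resolved");
  }
}
```
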
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java
new file mode 100644
index 0000000000000..8ce3a13791407
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the encryption tests against the Fast output stream.
+ * This verifies that both file writing paths can encrypt their data. This
+ * requires the SERVER_SIDE_ENCRYPTION_KEY to be set in auth-keys.xml for it
+ * to run.
+ */
+public class ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream
+ extends AbstractTestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ if (StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))) {
+ skip(Constants.SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
+ S3AEncryptionMethods.SSE_KMS.getMethod());
+ }
+ conf.setBoolean(Constants.FAST_UPLOAD, true);
+ conf.set(Constants.FAST_UPLOAD_BUFFER,
+ Constants.FAST_UPLOAD_BYTEBUFFER);
+ return conf;
+ }
+
+ @Override
+ protected S3AEncryptionMethods getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_KMS;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java
new file mode 100644
index 0000000000000..33a252a68b37a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-S3 encryption.
+ */
+public class ITestS3AEncryptionSSES3 extends AbstractTestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ //the key must be set to the empty string: SSE-S3 rejects a configured
+ //key, and a configuration value cannot be null
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+ return conf;
+ }
+
+ @Override
+ protected S3AEncryptionMethods getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_S3;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java
similarity index 77%
rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java
rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java
index 5239f30b4a10c..407601f1a0380 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java
@@ -23,7 +23,8 @@
/**
* Run the encryption tests against the block output stream.
*/
-public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption {
+public class ITestS3AEncryptionSSES3BlockOutputStream
+ extends AbstractTestS3AEncryption {
@Override
protected Configuration createConfiguration() {
@@ -31,6 +32,14 @@ protected Configuration createConfiguration() {
conf.setBoolean(Constants.FAST_UPLOAD, true);
conf.set(Constants.FAST_UPLOAD_BUFFER,
Constants.FAST_UPLOAD_BYTEBUFFER);
+ //the key must be set to the empty string: SSE-S3 rejects a configured
+ //key, and a configuration value cannot be null
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
return conf;
}
+
+ @Override
+ protected S3AEncryptionMethods getSSEAlgorithm() {
+ return S3AEncryptionMethods.SSE_S3;
+ }
}
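
The TestS3AGetFileStatus changes below stub getObjectMetadata() by matching whole GetObjectMetadataRequest arguments, since the filesystem now sends a request object rather than a (bucket, key) pair. The correctGetMetadataRequest helper they reference is defined beyond this excerpt; a plausible sketch of such a Hamcrest matcher, consistent with the BaseMatcher/Description imports added above (the actual implementation may differ):

```java
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;

import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;

public class GetMetadataRequestMatcherSketch {
  // Matches any GetObjectMetadataRequest aimed at the given bucket and key.
  static Matcher<GetObjectMetadataRequest> correctGetMetadataRequest(
      String bucket, String key) {
    return new BaseMatcher<GetObjectMetadataRequest>() {
      @Override
      public boolean matches(Object o) {
        if (!(o instanceof GetObjectMetadataRequest)) {
          return false;
        }
        GetObjectMetadataRequest request = (GetObjectMetadataRequest) o;
        return bucket.equals(request.getBucketName())
            && key.equals(request.getKey());
      }

      @Override
      public void describeTo(Description description) {
        description.appendText("request for s3a://" + bucket + "/" + key);
      }
    };
  }
}
```
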
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
index f9e9c6bc74a0e..a5dc01ab87a59 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
@@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.Date;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -34,6 +35,9 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
import org.junit.Test;
/**
@@ -48,7 +52,8 @@ public void testFile() throws Exception {
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(1L);
meta.setLastModified(new Date(2L));
- when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta);
+ when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+ .thenReturn(meta);
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
@@ -61,10 +66,13 @@ public void testFile() throws Exception {
public void testFakeDirectory() throws Exception {
Path path = new Path("/dir");
String key = path.toUri().getPath().substring(1);
- when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+ .thenThrow(NOT_FOUND);
ObjectMetadata meta = new ObjectMetadata();
meta.setContentLength(0L);
- when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta);
+ when(s3.getObjectMetadata(argThat(
+ correctGetMetadataRequest(BUCKET, key + "/"))
+ )).thenReturn(meta);
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
@@ -75,8 +83,11 @@ public void testFakeDirectory() throws Exception {
public void testImplicitDirectory() throws Exception {
Path path = new Path("/dir");
String key = path.toUri().getPath().substring(1);
- when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
- when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+ .thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(argThat(
+ correctGetMetadataRequest(BUCKET, key + "/"))
+ )).thenThrow(NOT_FOUND);
ObjectListing objects = mock(ObjectListing.class);
when(objects.getCommonPrefixes()).thenReturn(
Collections.singletonList("dir/"));
@@ -93,8 +104,11 @@ public void testImplicitDirectory() throws Exception {
public void testRoot() throws Exception {
Path path = new Path("/");
String key = path.toUri().getPath().substring(1);
- when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
- when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+ .thenThrow(NOT_FOUND);
+ when(s3.getObjectMetadata(argThat(
+ correctGetMetadataRequest(BUCKET, key + "/")
+ ))).thenThrow(NOT_FOUND);
ObjectListing objects = mock(ObjectListing.class);
when(objects.getCommonPrefixes()).thenReturn(
Collections.