Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HADOOP-13075] Adding support for SSE-KMS and SSE-C #113

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
<exclude>**/TestS3AFastOutputStream.java</exclude>
<exclude>**/TestS3AFileSystemContract.java</exclude>
<exclude>**/TestS3AMiniYarnCluster.java</exclude>
<exclude>**/TestS3AEncryptionSSEC.java</exclude>
<exclude>**/Test*Root*.java</exclude>
</excludes>
</configuration>
Expand All @@ -154,6 +155,7 @@
<include>**/TestS3AFastOutputStream.java</include>
<include>**/TestS3AFileSystemContract.java</include>
<include>**/TestS3AMiniYarnCluster.java</include>
<include>**/TestS3AEncryptionSSEC.java</include>
<include>**/Test*Root*.java</include>
</includes>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,8 @@ private Constants() {
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";

/**
* The standard encryption algorithm AWS supports.
* Different implementations may support others (or none).
*/
public static final String SERVER_SIDE_ENCRYPTION_AES256 =
"AES256";
public static final String SERVER_SIDE_ENCRYPTION_KEY =
"fs.s3a.server-side-encryption-key";

//override signature algorithm used for signing requests
public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
Copy link
Contributor

@steveloughran steveloughran Jul 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was this cut? This is directly referred to in {{TestS3AEncryption}}, a file which this patch doesn't touch. I don't think a clean build of this patch is going to work. Have you tried it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you said, that constant is only used in that test which I did change. I changed it to abstract and created 3 different implementations: one for SSE-S3, SSE-KMS and SSE-C. Basically I'm running all the tests in TestS3AEncryption, but with different encryption algorithms depending on the concrete class.
Yes, it builds and all test pass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't pull values out of public interfaces like this, even if its not supported. They'd need to be tagged as deprecated

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

/**
* This enum is to centralize the encryption methods and
* the value required in the configuration.
*/
public enum S3AEncryptionMethods {

SSE_S3("AES256"),
SSE_KMS("SSE-KMS"),
SSE_C("SSE-C");

private String method;

S3AEncryptionMethods(String method) {
this.method = method;
}

public String getMethod() {
return method;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ private MultiPartUpload initiateMultiPartUpload() throws IOException {
new InitiateMultipartUploadRequest(bucket,
key,
createDefaultMetadata());
fs.setSSEKMSOrCIfRequired(initiateMPURequest);
initiateMPURequest.setCannedACL(cannedACL);
try {
return new MultiPartUpload(
Expand All @@ -295,6 +296,7 @@ private void putObject() throws IOException {
fs.newPutObjectRequest(key,
om,
new ByteArrayInputStream(buffer.toByteArray()));
fs.setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setGeneralProgressListener(progressListener);
ListenableFuture<PutObjectResult> putObjectResult =
executorService.submit(new Callable<PutObjectResult>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
Expand Down Expand Up @@ -126,6 +130,7 @@ public class S3AFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private String serverSideEncryptionKey;
private S3AInstrumentation instrumentation;
private S3AStorageStatistics storageStatistics;
private long readAhead;
Expand Down Expand Up @@ -290,6 +295,8 @@ public StorageStatistics provide() {

serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
serverSideEncryptionKey = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);

inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
} catch (AmazonClientException e) {
Expand Down Expand Up @@ -621,9 +628,19 @@ public FSDataInputStream open(Path f, int bufferSize)
+ " because it is a directory");
}

return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
inputPolicy));
return new FSDataInputStream(
new S3AInputStream(
new S3ObjectAttributes(
bucket,
pathToKey(f),
serverSideEncryptionAlgorithm,
serverSideEncryptionKey),
fileStatus.getLen(),
s3,
statistics,
instrumentation,
readAhead,
inputPolicy));
}

/**
Expand Down Expand Up @@ -894,7 +911,10 @@ protected void incrementStatistic(Statistic statistic, long count) {
*/
protected ObjectMetadata getObjectMetadata(String key) {
incrementStatistic(OBJECT_METADATA_REQUESTS);
ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
GetObjectMetadataRequest request =
new GetObjectMetadataRequest(bucket, key);
setSSECIfRequired(request);
ObjectMetadata meta = s3.getObjectMetadata(request);
incrementReadOperations();
return meta;
}
Expand Down Expand Up @@ -974,6 +994,7 @@ public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
srcfile);
setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(metadata);
return putObjectRequest;
Expand All @@ -992,6 +1013,7 @@ PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, InputStream inputStream) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
return putObjectRequest;
}
Expand All @@ -1004,9 +1026,7 @@ PutObjectRequest newPutObjectRequest(String key,
*/
public ObjectMetadata newObjectMetadata() {
final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
setSSES3IfRequired(om);
return om;
}

Expand Down Expand Up @@ -1638,11 +1658,10 @@ private void copyFile(String srcKey, String dstKey, long size)
try {
ObjectMetadata srcom = getObjectMetadata(srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
setSSES3IfRequired(dstom);
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
setSSEKMSOrCIfRequired(copyObjectRequest);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);

Expand Down Expand Up @@ -1674,6 +1693,111 @@ public void progressChanged(ProgressEvent progressEvent) {
}
}

protected void setSSEKMSOrCIfRequired(InitiateMultipartUploadRequest req) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//Use specified key
req.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams(serverSideEncryptionKey)
);
}else{
//Use default key
req.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams());
}
}else if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//at the moment, only supports copy using the same key
req.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
}
}
}
}


protected void setSSEKMSOrCIfRequired(CopyObjectRequest copyObjectRequest) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//Use specified key
copyObjectRequest.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams(serverSideEncryptionKey)
);
}else{
//Use default key
copyObjectRequest.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams()
);
}
}else if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//at the moment, only supports copy using the same key
copyObjectRequest.setSourceSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
);
copyObjectRequest.setDestinationSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
);
}
}
}
}

protected void setSSECIfRequired(GetObjectMetadataRequest request) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//at the moment, only supports copy using the same key
request.setSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is three chained conditions which could be merged through &&

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, but not all of them can be merged. I'm relying in else clauses as well depending on some of the conditions being false. I'll try to rewrite it though and will see how it looks.

);
}
}
}
}

protected void setSSEKMSOrCIfRequired(PutObjectRequest request) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
request.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams(serverSideEncryptionKey)
);
}else{
request.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams()
);
}
}else if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
request.setSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
);
}
}
}
}

private void setSSES3IfRequired(ObjectMetadata metadata) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_S3.getMethod()
.equals(serverSideEncryptionAlgorithm) ||
(!S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm) &&
!S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm))) {
metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
}
}

/**
* Perform post-write actions.
* @param key key written to
Expand Down Expand Up @@ -1846,6 +1970,11 @@ public String toString() {
.append(serverSideEncryptionAlgorithm)
.append('\'');
}
if (serverSideEncryptionKey != null) {
sb.append(", serverSideEncryptionKey='")
.append(serverSideEncryptionKey)
.append('\'');
}
sb.append(", statistics {")
.append(statistics)
.append("}");
Expand Down
Loading