-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HADOOP-13075] Adding support for SSE-KMS and SSE-C #113
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* <p> | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* <p> | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hadoop.fs.s3a; | ||
|
||
/** | ||
* This enum is to centralize the encryption methods and | ||
* the value required in the configuration. | ||
*/ | ||
public enum S3AEncryptionMethods { | ||
|
||
SSE_S3("AES256"), | ||
SSE_KMS("SSE-KMS"), | ||
SSE_C("SSE-C"); | ||
|
||
private String method; | ||
|
||
S3AEncryptionMethods(String method) { | ||
this.method = method; | ||
} | ||
|
||
public String getMethod() { | ||
return method; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,12 +48,16 @@ | |
import com.amazonaws.services.s3.model.AmazonS3Exception; | ||
import com.amazonaws.services.s3.model.CannedAccessControlList; | ||
import com.amazonaws.services.s3.model.DeleteObjectsRequest; | ||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest; | ||
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; | ||
import com.amazonaws.services.s3.model.ListObjectsRequest; | ||
import com.amazonaws.services.s3.model.ObjectListing; | ||
import com.amazonaws.services.s3.model.ObjectMetadata; | ||
import com.amazonaws.services.s3.model.PutObjectRequest; | ||
import com.amazonaws.services.s3.model.CopyObjectRequest; | ||
import com.amazonaws.services.s3.model.S3ObjectSummary; | ||
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; | ||
import com.amazonaws.services.s3.model.SSECustomerKey; | ||
import com.amazonaws.services.s3.model.UploadPartRequest; | ||
import com.amazonaws.services.s3.model.UploadPartResult; | ||
import com.amazonaws.services.s3.transfer.Copy; | ||
|
@@ -126,6 +130,7 @@ public class S3AFileSystem extends FileSystem { | |
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); | ||
private CannedAccessControlList cannedACL; | ||
private String serverSideEncryptionAlgorithm; | ||
private String serverSideEncryptionKey; | ||
private S3AInstrumentation instrumentation; | ||
private S3AStorageStatistics storageStatistics; | ||
private long readAhead; | ||
|
@@ -290,6 +295,8 @@ public StorageStatistics provide() { | |
|
||
serverSideEncryptionAlgorithm = | ||
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM); | ||
serverSideEncryptionKey = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY); | ||
|
||
inputPolicy = S3AInputPolicy.getPolicy( | ||
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL)); | ||
} catch (AmazonClientException e) { | ||
|
@@ -621,9 +628,19 @@ public FSDataInputStream open(Path f, int bufferSize) | |
+ " because it is a directory"); | ||
} | ||
|
||
return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f), | ||
fileStatus.getLen(), s3, statistics, instrumentation, readAhead, | ||
inputPolicy)); | ||
return new FSDataInputStream( | ||
new S3AInputStream( | ||
new S3ObjectAttributes( | ||
bucket, | ||
pathToKey(f), | ||
serverSideEncryptionAlgorithm, | ||
serverSideEncryptionKey), | ||
fileStatus.getLen(), | ||
s3, | ||
statistics, | ||
instrumentation, | ||
readAhead, | ||
inputPolicy)); | ||
} | ||
|
||
/** | ||
|
@@ -894,7 +911,10 @@ protected void incrementStatistic(Statistic statistic, long count) { | |
*/ | ||
protected ObjectMetadata getObjectMetadata(String key) { | ||
incrementStatistic(OBJECT_METADATA_REQUESTS); | ||
ObjectMetadata meta = s3.getObjectMetadata(bucket, key); | ||
GetObjectMetadataRequest request = | ||
new GetObjectMetadataRequest(bucket, key); | ||
setSSECIfRequired(request); | ||
ObjectMetadata meta = s3.getObjectMetadata(request); | ||
incrementReadOperations(); | ||
return meta; | ||
} | ||
|
@@ -974,6 +994,7 @@ public PutObjectRequest newPutObjectRequest(String key, | |
ObjectMetadata metadata, File srcfile) { | ||
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, | ||
srcfile); | ||
setSSEKMSOrCIfRequired(putObjectRequest); | ||
putObjectRequest.setCannedAcl(cannedACL); | ||
putObjectRequest.setMetadata(metadata); | ||
return putObjectRequest; | ||
|
@@ -992,6 +1013,7 @@ PutObjectRequest newPutObjectRequest(String key, | |
ObjectMetadata metadata, InputStream inputStream) { | ||
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, | ||
inputStream, metadata); | ||
setSSEKMSOrCIfRequired(putObjectRequest); | ||
putObjectRequest.setCannedAcl(cannedACL); | ||
return putObjectRequest; | ||
} | ||
|
@@ -1004,9 +1026,7 @@ PutObjectRequest newPutObjectRequest(String key, | |
*/ | ||
public ObjectMetadata newObjectMetadata() { | ||
final ObjectMetadata om = new ObjectMetadata(); | ||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { | ||
om.setSSEAlgorithm(serverSideEncryptionAlgorithm); | ||
} | ||
setSSES3IfRequired(om); | ||
return om; | ||
} | ||
|
||
|
@@ -1638,11 +1658,10 @@ private void copyFile(String srcKey, String dstKey, long size) | |
try { | ||
ObjectMetadata srcom = getObjectMetadata(srcKey); | ||
ObjectMetadata dstom = cloneObjectMetadata(srcom); | ||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) { | ||
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm); | ||
} | ||
setSSES3IfRequired(dstom); | ||
CopyObjectRequest copyObjectRequest = | ||
new CopyObjectRequest(bucket, srcKey, bucket, dstKey); | ||
setSSEKMSOrCIfRequired(copyObjectRequest); | ||
copyObjectRequest.setCannedAccessControlList(cannedACL); | ||
copyObjectRequest.setNewObjectMetadata(dstom); | ||
|
||
|
@@ -1674,6 +1693,111 @@ public void progressChanged(ProgressEvent progressEvent) { | |
} | ||
} | ||
|
||
protected void setSSEKMSOrCIfRequired(InitiateMultipartUploadRequest req) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){ | ||
if(S3AEncryptionMethods.SSE_KMS.getMethod() | ||
.equals(serverSideEncryptionAlgorithm)) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionKey)) { | ||
//Use specified key | ||
req.setSSEAwsKeyManagementParams( | ||
new SSEAwsKeyManagementParams(serverSideEncryptionKey) | ||
); | ||
}else{ | ||
//Use default key | ||
req.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams()); | ||
} | ||
}else if(S3AEncryptionMethods.SSE_C.getMethod() | ||
.equals(serverSideEncryptionAlgorithm)) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionKey)) { | ||
//at the moment, only supports copy using the same key | ||
req.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey)); | ||
} | ||
} | ||
} | ||
} | ||
|
||
|
||
protected void setSSEKMSOrCIfRequired(CopyObjectRequest copyObjectRequest) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){ | ||
if(S3AEncryptionMethods.SSE_KMS.getMethod() | ||
.equals(serverSideEncryptionAlgorithm)) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionKey)) { | ||
//Use specified key | ||
copyObjectRequest.setSSEAwsKeyManagementParams( | ||
new SSEAwsKeyManagementParams(serverSideEncryptionKey) | ||
); | ||
}else{ | ||
//Use default key | ||
copyObjectRequest.setSSEAwsKeyManagementParams( | ||
new SSEAwsKeyManagementParams() | ||
); | ||
} | ||
}else if(S3AEncryptionMethods.SSE_C.getMethod() | ||
.equals(serverSideEncryptionAlgorithm)) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionKey)) { | ||
//at the moment, only supports copy using the same key | ||
copyObjectRequest.setSourceSSECustomerKey( | ||
new SSECustomerKey(serverSideEncryptionKey) | ||
); | ||
copyObjectRequest.setDestinationSSECustomerKey( | ||
new SSECustomerKey(serverSideEncryptionKey) | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
protected void setSSECIfRequired(GetObjectMetadataRequest request) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){ | ||
if(S3AEncryptionMethods.SSE_C.getMethod() | ||
.equals(serverSideEncryptionAlgorithm)) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionKey)) { | ||
//at the moment, only supports copy using the same key | ||
request.setSSECustomerKey( | ||
new SSECustomerKey(serverSideEncryptionKey) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is three chained conditions which could be merged through There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true, but not all of them can be merged. I'm relying in else clauses as well depending on some of the conditions being false. I'll try to rewrite it though and will see how it looks. |
||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
protected void setSSEKMSOrCIfRequired(PutObjectRequest request) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){ | ||
if(S3AEncryptionMethods.SSE_KMS.getMethod() | ||
.equals(serverSideEncryptionAlgorithm)) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionKey)) { | ||
request.setSSEAwsKeyManagementParams( | ||
new SSEAwsKeyManagementParams(serverSideEncryptionKey) | ||
); | ||
}else{ | ||
request.setSSEAwsKeyManagementParams( | ||
new SSEAwsKeyManagementParams() | ||
); | ||
} | ||
}else if(S3AEncryptionMethods.SSE_C.getMethod() | ||
.equals(serverSideEncryptionAlgorithm)) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionKey)) { | ||
request.setSSECustomerKey( | ||
new SSECustomerKey(serverSideEncryptionKey) | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private void setSSES3IfRequired(ObjectMetadata metadata) { | ||
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){ | ||
if(S3AEncryptionMethods.SSE_S3.getMethod() | ||
.equals(serverSideEncryptionAlgorithm) || | ||
(!S3AEncryptionMethods.SSE_KMS.getMethod() | ||
.equals(serverSideEncryptionAlgorithm) && | ||
!S3AEncryptionMethods.SSE_C.getMethod() | ||
.equals(serverSideEncryptionAlgorithm))) { | ||
metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Perform post-write actions. | ||
* @param key key written to | ||
|
@@ -1846,6 +1970,11 @@ public String toString() { | |
.append(serverSideEncryptionAlgorithm) | ||
.append('\''); | ||
} | ||
if (serverSideEncryptionKey != null) { | ||
sb.append(", serverSideEncryptionKey='") | ||
.append(serverSideEncryptionKey) | ||
.append('\''); | ||
} | ||
sb.append(", statistics {") | ||
.append(statistics) | ||
.append("}"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was this cut? This is directly referred to in {{TestS3AEncryption}}, a file which this patch doesn't touch. I don't think a clean build of this patch is going to work. Have you tried it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you said, that constant is only used in that test which I did change. I changed it to abstract and created 3 different implementations: one for SSE-S3, SSE-KMS and SSE-C. Basically I'm running all the tests in TestS3AEncryption, but with different encryption algorithms depending on the concrete class.
Yes, it builds and all test pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can't pull values out of public interfaces like this, even if its not supported. They'd need to be tagged as deprecated