Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
<exclude>**/TestS3AFastOutputStream.java</exclude>
<exclude>**/TestS3AFileSystemContract.java</exclude>
<exclude>**/TestS3AMiniYarnCluster.java</exclude>
<exclude>**/TestS3AEncryptionSSEC.java</exclude>
<exclude>**/Test*Root*.java</exclude>
</excludes>
</configuration>
Expand All @@ -154,6 +155,7 @@
<include>**/TestS3AFastOutputStream.java</include>
<include>**/TestS3AFileSystemContract.java</include>
<include>**/TestS3AMiniYarnCluster.java</include>
<include>**/TestS3AEncryptionSSEC.java</include>
<include>**/Test*Root*.java</include>
</includes>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,8 @@ private Constants() {
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";

/**
* The standard encryption algorithm AWS supports.
* Different implementations may support others (or none).
*/
public static final String SERVER_SIDE_ENCRYPTION_AES256 =
"AES256";
public static final String SERVER_SIDE_ENCRYPTION_KEY =
"fs.s3a.server-side-encryption-key";

//override signature algorithm used for signing requests
public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

/**
* This enum is to centralize the encryption methods and
* the value required in the configuration.
*/
public enum S3AEncryptionMethods {

SSE_S3("AES256"),
SSE_KMS("SSE-KMS"),
SSE_C("SSE-C");

private String method;

S3AEncryptionMethods(String method) {
this.method = method;
}

public String getMethod() {
return method;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ private MultiPartUpload initiateMultiPartUpload() throws IOException {
new InitiateMultipartUploadRequest(bucket,
key,
createDefaultMetadata());
fs.setSSEKMSOrCIfRequired(initiateMPURequest);
initiateMPURequest.setCannedACL(cannedACL);
try {
return new MultiPartUpload(
Expand All @@ -295,6 +296,7 @@ private void putObject() throws IOException {
fs.newPutObjectRequest(key,
om,
new ByteArrayInputStream(buffer.toByteArray()));
fs.setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setGeneralProgressListener(progressListener);
ListenableFuture<PutObjectResult> putObjectResult =
executorService.submit(new Callable<PutObjectResult>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
Expand Down Expand Up @@ -123,6 +127,7 @@ public class S3AFileSystem extends FileSystem {
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private String serverSideEncryptionKey;
private S3AInstrumentation instrumentation;
private S3AStorageStatistics storageStatistics;
private long readAhead;
Expand Down Expand Up @@ -228,6 +233,8 @@ public StorageStatistics provide() {

serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
serverSideEncryptionKey = conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);

inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
} catch (AmazonClientException e) {
Expand Down Expand Up @@ -559,9 +566,19 @@ public FSDataInputStream open(Path f, int bufferSize)
+ " because it is a directory");
}

return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
inputPolicy));
return new FSDataInputStream(
new S3AInputStream(
new S3ObjectAttributes(
bucket,
pathToKey(f),
serverSideEncryptionAlgorithm,
serverSideEncryptionKey),
fileStatus.getLen(),
s3,
statistics,
instrumentation,
readAhead,
inputPolicy));
}

/**
Expand Down Expand Up @@ -832,7 +849,10 @@ protected void incrementStatistic(Statistic statistic, long count) {
*/
protected ObjectMetadata getObjectMetadata(String key) {
incrementStatistic(OBJECT_METADATA_REQUESTS);
ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
GetObjectMetadataRequest request =
new GetObjectMetadataRequest(bucket, key);
setSSECIfRequired(request);
ObjectMetadata meta = s3.getObjectMetadata(request);
incrementReadOperations();
return meta;
}
Expand Down Expand Up @@ -912,6 +932,7 @@ public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
srcfile);
setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(metadata);
return putObjectRequest;
Expand All @@ -930,6 +951,7 @@ PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, InputStream inputStream) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
setSSEKMSOrCIfRequired(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
return putObjectRequest;
}
Expand All @@ -942,9 +964,7 @@ PutObjectRequest newPutObjectRequest(String key,
*/
public ObjectMetadata newObjectMetadata() {
final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
setSSES3IfRequired(om);
return om;
}

Expand Down Expand Up @@ -1576,11 +1596,10 @@ private void copyFile(String srcKey, String dstKey, long size)
try {
ObjectMetadata srcom = getObjectMetadata(srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
setSSES3IfRequired(dstom);
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
setSSEKMSOrCIfRequired(copyObjectRequest);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);

Expand Down Expand Up @@ -1612,6 +1631,111 @@ public void progressChanged(ProgressEvent progressEvent) {
}
}

protected void setSSEKMSOrCIfRequired(InitiateMultipartUploadRequest req) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//Use specified key
req.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams(serverSideEncryptionKey)
);
}else{
//Use default key
req.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams());
}
}else if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//at the moment, only supports copy using the same key
req.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
}
}
}
}


protected void setSSEKMSOrCIfRequired(CopyObjectRequest copyObjectRequest) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//Use specified key
copyObjectRequest.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams(serverSideEncryptionKey)
);
}else{
//Use default key
copyObjectRequest.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams()
);
}
}else if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//at the moment, only supports copy using the same key
copyObjectRequest.setSourceSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
);
copyObjectRequest.setDestinationSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
);
}
}
}
}

protected void setSSECIfRequired(GetObjectMetadataRequest request) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
//at the moment, only supports copy using the same key
request.setSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
);
}
}
}
}

protected void setSSEKMSOrCIfRequired(PutObjectRequest request) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
request.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams(serverSideEncryptionKey)
);
}else{
request.setSSEAwsKeyManagementParams(
new SSEAwsKeyManagementParams()
);
}
}else if(S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm)) {
if (StringUtils.isNotBlank(serverSideEncryptionKey)) {
request.setSSECustomerKey(
new SSECustomerKey(serverSideEncryptionKey)
);
}
}
}
}

private void setSSES3IfRequired(ObjectMetadata metadata) {
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)){
if(S3AEncryptionMethods.SSE_S3.getMethod()
.equals(serverSideEncryptionAlgorithm) ||
(!S3AEncryptionMethods.SSE_KMS.getMethod()
.equals(serverSideEncryptionAlgorithm) &&
!S3AEncryptionMethods.SSE_C.getMethod()
.equals(serverSideEncryptionAlgorithm))) {
metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
}
}

/**
* Perform post-write actions.
* @param key key written to
Expand Down Expand Up @@ -1784,6 +1908,11 @@ public String toString() {
.append(serverSideEncryptionAlgorithm)
.append('\'');
}
if (serverSideEncryptionKey != null) {
sb.append(", serverSideEncryptionKey='")
.append(serverSideEncryptionKey)
.append('\'');
}
sb.append(", statistics {")
.append(statistics)
.append("}");
Expand Down
Loading