Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,18 @@
<property>
<name>fs.s3a.server-side-encryption-algorithm</name>
<description>Specify a server-side encryption algorithm for s3a: file system.
Unset by default, and the only other currently allowable value is AES256.
Unset by default. It supports the following values: 'AES256' (for SSE-S3),
'SSE-KMS' and 'SSE-C'.
</description>
</property>

<property>
<name>fs.s3a.server-side-encryption-key</name>
<description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
you'll be using your default's S3 KMS key, otherwise you should set this property to
the specific KMS key id.
</description>
</property>

Expand Down
2 changes: 2 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
<exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
<exclude>**/ITest*Root*.java</exclude>
<exclude>**/ITestS3AFileContextStatistics.java</exclude>
<exclude>**/ITestS3AEncryptionSSE*.java</exclude>
<include>**/ITestS3AHuge*.java</include>
</excludes>
</configuration>
Expand Down Expand Up @@ -211,6 +212,7 @@
<include>**/ITest*Root*.java</include>
<include>**/ITestS3AFileContextStatistics.java</include>
<include>**/ITestS3AHuge*.java</include>
<include>**/ITestS3AEncryptionSSE*.java</include>
</includes>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,28 @@ private Constants() {
"fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;

// s3 server-side encryption
// s3 server-side encryption, see S3AEncryptionMethods for valid options
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
"fs.s3a.server-side-encryption-algorithm";

/**
* The standard encryption algorithm AWS supports.
* Different implementations may support others (or none).
* Use the S3AEncryptionMethods instead when configuring
* which Server Side Encryption to use.
*/
@Deprecated
public static final String SERVER_SIDE_ENCRYPTION_AES256 =
"AES256";

/**
* Used to specify which AWS KMS key to use if
* SERVER_SIDE_ENCRYPTION_ALGORITHM is AWS_KMS (will default to aws/s3
* master key if left blank) or with SSE_C, the actual AES 256 key.
*/
public static final String SERVER_SIDE_ENCRYPTION_KEY =
"fs.s3a.server-side-encryption-key";

//override signature algorithm used for signing requests
public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";

Expand Down Expand Up @@ -296,4 +307,13 @@ private Constants() {
*/
@InterfaceAudience.Private
public static final int MAX_MULTIPART_COUNT = 10000;

@InterfaceAudience.Private
public static final String SSE_C_NO_KEY_ERROR = S3AEncryptionMethods.SSE_C
.getMethod() +" is enabled and no encryption key is provided.";


@InterfaceAudience.Private
public static final String SSE_S3_WITH_KEY_ERROR = S3AEncryptionMethods.SSE_S3
.getMethod() +" is configured and an " + "encryption key is provided";
}
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ private void putObject() throws IOException {
writeOperationHelper.newPutRequest(
block.startUpload(),
size);
fs.setOptionalPutRequestParameters(putObjectRequest);
long transferQueueTime = now();
BlockUploadProgress callback =
new BlockUploadProgress(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;

/**
* This enum is to centralize the encryption methods and
* the value required in the configuration.
*/
public enum S3AEncryptionMethods {

SSE_S3("AES256"),
SSE_KMS("SSE-KMS"),
SSE_C("SSE-C"),
NONE("");

private String method;

S3AEncryptionMethods(String method) {
this.method = method;
}

public String getMethod() {
return method;
}

public static S3AEncryptionMethods getMethod(String name) throws IOException {
if(StringUtils.isBlank(name)) {
return NONE;
}
switch(name) {
case "AES256":
return SSE_S3;
case "SSE-KMS":
return SSE_KMS;
case "SSE-C":
return SSE_C;
default:
throw new IOException("Unknown Server Side algorithm "+name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
Expand All @@ -51,6 +52,8 @@
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
import com.amazonaws.services.s3.model.SSECustomerKey;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
Expand Down Expand Up @@ -135,7 +138,7 @@ public class S3AFileSystem extends FileSystem {
LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
private LocalDirAllocator directoryAllocator;
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private S3AEncryptionMethods serverSideEncryptionAlgorithm;
private S3AInstrumentation instrumentation;
private S3AStorageStatistics storageStatistics;
private long readAhead;
Expand Down Expand Up @@ -227,8 +230,17 @@ public StorageStatistics provide() {

initMultipartUploads(conf);

serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
serverSideEncryptionAlgorithm = S3AEncryptionMethods.getMethod(
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
StringUtils.isBlank(getServerSideEncryptionKey(getConf()))) {
throw new IOException(Constants.SSE_C_NO_KEY_ERROR);
}
if(S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm) &&
StringUtils.isNotBlank(getServerSideEncryptionKey(
getConf()))) {
throw new IOException(Constants.SSE_S3_WITH_KEY_ERROR);
}
LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
Expand Down Expand Up @@ -514,9 +526,18 @@ public FSDataInputStream open(Path f, int bufferSize)
+ " because it is a directory");
}

return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
inputPolicy));
return new FSDataInputStream(
new S3AInputStream(new S3ObjectAttributes(
bucket,
pathToKey(f),
serverSideEncryptionAlgorithm,
getServerSideEncryptionKey(getConf())),
fileStatus.getLen(),
s3,
statistics,
instrumentation,
readAhead,
inputPolicy));
}

/**
Expand Down Expand Up @@ -892,7 +913,14 @@ public S3AStorageStatistics getStorageStatistics() {
*/
protected ObjectMetadata getObjectMetadata(String key) {
incrementStatistic(OBJECT_METADATA_REQUESTS);
ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
GetObjectMetadataRequest request =
new GetObjectMetadataRequest(bucket, key);
//SSE-C requires to be filled in if enabled for object metadata
if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){
request.setSSECustomerKey(generateSSECustomerKey());
}
ObjectMetadata meta = s3.getObjectMetadata(request);
incrementReadOperations();
return meta;
}
Expand Down Expand Up @@ -986,6 +1014,7 @@ public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
srcfile);
setOptionalPutRequestParameters(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
putObjectRequest.setMetadata(metadata);
return putObjectRequest;
Expand All @@ -1004,6 +1033,7 @@ PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, InputStream inputStream) {
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
setOptionalPutRequestParameters(putObjectRequest);
putObjectRequest.setCannedAcl(cannedACL);
return putObjectRequest;
}
Expand All @@ -1016,9 +1046,7 @@ PutObjectRequest newPutObjectRequest(String key,
*/
public ObjectMetadata newObjectMetadata() {
final ObjectMetadata om = new ObjectMetadata();
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
setOptionalObjectMetadata(om);
return om;
}

Expand Down Expand Up @@ -1752,11 +1780,10 @@ private void copyFile(String srcKey, String dstKey, long size)
try {
ObjectMetadata srcom = getObjectMetadata(srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
}
setOptionalObjectMetadata(dstom);
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
setOptionalCopyObjectRequestParameters(copyObjectRequest);
copyObjectRequest.setCannedAccessControlList(cannedACL);
copyObjectRequest.setNewObjectMetadata(dstom);

Expand Down Expand Up @@ -1788,6 +1815,83 @@ public void progressChanged(ProgressEvent progressEvent) {
}
}

protected void setOptionalMultipartUploadRequestParameters(
InitiateMultipartUploadRequest req) {
switch (serverSideEncryptionAlgorithm) {
case SSE_KMS:
req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
break;
case SSE_C:
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
//at the moment, only supports copy using the same key
req.setSSECustomerKey(generateSSECustomerKey());
}
break;
default:
}
}


protected void setOptionalCopyObjectRequestParameters(
CopyObjectRequest copyObjectRequest) throws IOException {
switch (serverSideEncryptionAlgorithm) {
case SSE_KMS:
copyObjectRequest.setSSEAwsKeyManagementParams(
generateSSEAwsKeyParams()
);
break;
case SSE_C:
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
//at the moment, only supports copy using the same key
SSECustomerKey customerKey = generateSSECustomerKey();
copyObjectRequest.setSourceSSECustomerKey(customerKey);
copyObjectRequest.setDestinationSSECustomerKey(customerKey);
}
break;
default:
}
}

protected void setOptionalPutRequestParameters(PutObjectRequest request) {
switch (serverSideEncryptionAlgorithm) {
case SSE_KMS:
request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
break;
case SSE_C:
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
request.setSSECustomerKey(generateSSECustomerKey());
}
break;
default:
}
}

private void setOptionalObjectMetadata(ObjectMetadata metadata) {
if (S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm)) {
metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm.getMethod());
}
}

private SSEAwsKeyManagementParams generateSSEAwsKeyParams() {
//Use specified key, otherwise default to default master aws/s3 key by AWS
SSEAwsKeyManagementParams sseAwsKeyManagementParams =
new SSEAwsKeyManagementParams();
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
sseAwsKeyManagementParams =
new SSEAwsKeyManagementParams(
getServerSideEncryptionKey(getConf())
);
}
return sseAwsKeyManagementParams;
}

private SSECustomerKey generateSSECustomerKey() {
SSECustomerKey customerKey = new SSECustomerKey(
getServerSideEncryptionKey(getConf())
);
return customerKey;
}

/**
* Perform post-write actions.
* @param key key written to
Expand Down Expand Up @@ -2240,6 +2344,7 @@ String initiateMultiPartUpload() throws IOException {
key,
newObjectMetadata(-1));
initiateMPURequest.setCannedACL(cannedACL);
setOptionalMultipartUploadRequestParameters(initiateMPURequest);
try {
return s3.initiateMultipartUpload(initiateMPURequest)
.getUploadId();
Expand Down
Loading