Skip to content
Merged
5 changes: 2 additions & 3 deletions flink-filesystems/flink-s3-fs-native/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),
|-----|---------|-------------|
| s3.sse.type | none | Encryption type: `none`, `sse-s3` (AES256), `sse-kms` (AWS KMS) |
| s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses default aws/s3 key if not specified) |
| s3.sse.kms.encryption-context | (none) | Encryption context key-value pairs for SSE-KMS. Format: `key1:value1,key2:value2`. Keys/values containing `:` must be quoted. |

### IAM Assume Role

Expand Down Expand Up @@ -128,9 +129,7 @@ This enables IAM policies to restrict access based on context values:
```yaml
s3.sse.type: sse-kms
s3.sse.kms.key-id: alias/my-key
# Configure encryption context as key-value pairs
s3.sse.kms.encryption-context.department: finance
s3.sse.kms.encryption-context.project: budget-reports
s3.sse.kms.encryption-context: {"aws:s3:arn": "arn:aws:s3:::my-bucket/my-file"}
```

With encryption context, you can create IAM policies like:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

/**
* Factory for creating Native S3 FileSystem instances.
Expand Down Expand Up @@ -190,6 +192,16 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
+ "Example: 'arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789abc' "
+ "or 'alias/my-s3-key'");

public static final ConfigOption<Map<String, String>> SSE_KMS_ENCRYPTION_CONTEXT =
ConfigOptions.key("s3.sse.kms.encryption-context")
.mapType()
.noDefaultValue()
.withDescription(
"Encryption context key-value pairs for SSE-KMS. "
+ "Provides additional authenticated data and enables "
+ "fine-grained IAM policy conditions. "
+ "Format: 'key1:value1,key2:value2'");

// IAM Assume Role Configuration
public static final ConfigOption<String> ASSUME_ROLE_ARN =
ConfigOptions.key("s3.assume-role.arn")
Expand Down Expand Up @@ -312,7 +324,11 @@ public FileSystem create(URI fsUri) throws IOException {
boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);

S3EncryptionConfig encryptionConfig =
S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), config.get(SSE_KMS_KEY_ID));
S3EncryptionConfig.fromConfig(
config.get(SSE_TYPE),
config.get(SSE_KMS_KEY_ID),
config.getOptional(SSE_KMS_ENCRYPTION_CONTEXT)
.orElse(Collections.emptyMap()));
String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
int numEntropyChars = -1;
if (entropyInjectionKey != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,14 @@ private void uploadToS3() throws IOException {
if (encryptionConfig.isEnabled()) {
putRequestBuilder.serverSideEncryption(encryptionConfig.getServerSideEncryption());
if (encryptionConfig.getEncryptionType()
== S3EncryptionConfig.EncryptionType.SSE_KMS
&& encryptionConfig.getKmsKeyId() != null) {
putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId());
== S3EncryptionConfig.EncryptionType.SSE_KMS) {
if (encryptionConfig.getKmsKeyId() != null) {
putRequestBuilder.ssekmsKeyId(encryptionConfig.getKmsKeyId());
}
if (encryptionConfig.hasEncryptionContext()) {
putRequestBuilder.ssekmsEncryptionContext(
encryptionConfig.serializeEncryptionContext());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -74,7 +75,7 @@ class S3ClientProvider implements AutoCloseableAsync {
private final S3Client s3Client;
private final S3TransferManager transferManager;
private final S3EncryptionConfig encryptionConfig;
@Nullable private final AwsCredentialsProvider credentialsProvider;
private final AwsCredentialsProvider credentialsProvider;
@Nullable private final StsClient stsClient;
private final Duration clientCloseTimeout;
private final Duration connectionTimeout;
Expand All @@ -90,8 +91,8 @@ class S3ClientProvider implements AutoCloseableAsync {
private S3ClientProvider(
S3Client s3Client,
S3TransferManager transferManager,
S3EncryptionConfig encryptionConfig,
@Nullable AwsCredentialsProvider credentialsProvider,
@Nullable S3EncryptionConfig encryptionConfig,
AwsCredentialsProvider credentialsProvider,
@Nullable StsClient stsClient,
Duration clientCloseTimeout,
Duration connectionTimeout,
Expand All @@ -102,16 +103,23 @@ private S3ClientProvider(
boolean checksumValidation,
int maxConnections,
int maxRetries) {
this.s3Client = s3Client;
this.transferManager = transferManager;
this.s3Client = Objects.requireNonNull(s3Client, "s3Client must not be null");
this.transferManager =
Objects.requireNonNull(transferManager, "transferManager must not be null");
this.encryptionConfig =
encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
this.credentialsProvider = credentialsProvider;
this.credentialsProvider =
Objects.requireNonNull(credentialsProvider, "credentialsProvider must not be null");
this.stsClient = stsClient;
this.clientCloseTimeout = clientCloseTimeout;
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.connectionMaxIdleTime = connectionMaxIdleTime;
this.clientCloseTimeout =
Objects.requireNonNull(clientCloseTimeout, "clientCloseTimeout must not be null");
this.connectionTimeout =
Objects.requireNonNull(connectionTimeout, "connectionTimeout must not be null");
this.socketTimeout =
Objects.requireNonNull(socketTimeout, "socketTimeout must not be null");
this.connectionMaxIdleTime =
Objects.requireNonNull(
connectionMaxIdleTime, "connectionMaxIdleTime must not be null");
this.pathStyleAccess = pathStyleAccess;
this.chunkedEncoding = chunkedEncoding;
this.checksumValidation = checksumValidation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
package org.apache.flink.fs.s3native;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.StringUtils;

import software.amazon.awssdk.services.s3.model.ServerSideEncryption;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

/**
* Configuration for S3 server-side encryption (SSE).
Expand Down Expand Up @@ -76,8 +81,9 @@ private S3EncryptionConfig(EncryptionType encryptionType, @Nullable String kmsKe
private S3EncryptionConfig(
EncryptionType encryptionType,
@Nullable String kmsKeyId,
Map<String, String> encryptionContext) {
this.encryptionType = encryptionType;
@Nullable Map<String, String> encryptionContext) {
this.encryptionType =
Objects.requireNonNull(encryptionType, "encryptionType must not be null");
this.kmsKeyId = kmsKeyId;
this.encryptionContext =
encryptionContext != null
Expand All @@ -96,74 +102,49 @@ public static S3EncryptionConfig sseS3() {
}

/**
* Creates a config for SSE-KMS encryption with the default KMS key.
*
* <p>Uses the AWS-managed KMS key (aws/s3) for the S3 bucket.
*/
public static S3EncryptionConfig sseKms() {
return new S3EncryptionConfig(EncryptionType.SSE_KMS, null);
}

/**
* Creates a config for SSE-KMS encryption with a specific KMS key.
*
* @param kmsKeyId The KMS key ID, ARN, or alias (e.g., "arn:aws:kms:region:account:key/key-id"
* or "alias/my-key")
*/
public static S3EncryptionConfig sseKms(String kmsKeyId) {
return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId);
}

/**
* Creates a config for SSE-KMS encryption with a specific KMS key and encryption context.
* Creates a config for SSE-KMS encryption.
*
* <p>The encryption context is a set of key-value pairs that:
*
* <ul>
* <li>Provides additional authenticated data (AAD) for encryption
* <li>Can be used in IAM policy conditions for fine-grained access control
* <li>Is logged in AWS CloudTrail for auditing
* </ul>
*
* <p>Example: You might include context like {"department": "finance", "project": "budget"} to
* restrict which principals can encrypt/decrypt based on these values.
*
* @param kmsKeyId The KMS key ID, ARN, or alias
* @param encryptionContext The encryption context key-value pairs
* @param kmsKeyId The KMS key ID, ARN, or alias; null uses the AWS-managed default key
* @param encryptionContext Optional key-value pairs for IAM policy conditions and CloudTrail
* auditing; null or empty means no context
* @see <a href="https://docs.aws.amazon.com/kms/latest/developerguide/encrypt_context.html">AWS
* KMS Encryption Context</a>
*/
public static S3EncryptionConfig sseKms(
String kmsKeyId, Map<String, String> encryptionContext) {
return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId, encryptionContext);
@Nullable String kmsKeyId, @Nullable Map<String, String> encryptionContext) {
return new S3EncryptionConfig(
EncryptionType.SSE_KMS,
StringUtils.isNullOrWhitespaceOnly(kmsKeyId) ? null : kmsKeyId,
encryptionContext);
}

/**
* Creates an encryption config from configuration strings.
*
* @param encryptionTypeStr The encryption type: "none", "sse-s3", "sse-kms", or "SSE_S3",
* "SSE_KMS"
* @param encryptionTypeStr The encryption type: "none", "sse-s3", "sse-kms", "aws:kms",
* "aes256" (case-insensitive)
* @param kmsKeyId The KMS key ID (required for SSE-KMS, ignored for other types)
* @return The encryption configuration
* @throws IllegalArgumentException if the encryption type is invalid
*/
public static S3EncryptionConfig fromConfig(
@Nullable String encryptionTypeStr, @Nullable String kmsKeyId) {
if (encryptionTypeStr == null
|| encryptionTypeStr.isEmpty()
@Nullable String encryptionTypeStr,
@Nullable String kmsKeyId,
Comment thread
gaborgsomogyi marked this conversation as resolved.
Map<String, String> encryptionContext) {
if (StringUtils.isNullOrWhitespaceOnly(encryptionTypeStr)
|| "none".equalsIgnoreCase(encryptionTypeStr)) {
return none();
}

String normalizedType = encryptionTypeStr.toUpperCase().replace("-", "_").replace(":", "_");
String normalizedType = encryptionTypeStr.toLowerCase(Locale.ROOT);

switch (normalizedType) {
case "SSE_S3":
case "AES256":
case "sse-s3":
case "aes256":
return sseS3();
case "SSE_KMS":
case "AWS_KMS":
return kmsKeyId != null && !kmsKeyId.isEmpty() ? sseKms(kmsKeyId) : sseKms();
Comment thread
gaborgsomogyi marked this conversation as resolved.
case "sse-kms":
case "aws:kms":
return sseKms(kmsKeyId, encryptionContext);
default:
throw new IllegalArgumentException(
"Unknown encryption type: "
Expand Down Expand Up @@ -220,6 +201,28 @@ public ServerSideEncryption getServerSideEncryption() {
}
}

public String serializeEncryptionContext() {
StringBuilder json = new StringBuilder("{");
boolean first = true;
for (Map.Entry<String, String> entry : encryptionContext.entrySet()) {
if (!first) {
json.append(",");
}
json.append("\"")
.append(escapeJson(entry.getKey()))
.append("\":\"")
.append(escapeJson(entry.getValue()))
.append("\"");
first = false;
}
json.append("}");
return Base64.getEncoder().encodeToString(json.toString().getBytes(StandardCharsets.UTF_8));
}

private String escapeJson(String value) {
return value.replace("\\", "\\\\").replace("\"", "\\\"");
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("S3EncryptionConfig{type=").append(encryptionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
* <ul>
* <li>SSE-C (customer-provided keys) via a KeyProvider interface
* <li>Client-side encryption via an EncryptionHandler interface
* <li>Encryption context for SSE-KMS (see HADOOP-19197)
* </ul>
*
* <p><b>S3 URI Handling:</b> The {@link #extractKey(Path)} and {@link #extractBucketName(Path)}
Expand Down Expand Up @@ -155,7 +154,7 @@ private void applyEncryption(CreateMultipartUploadRequest.Builder requestBuilder
}
if (encryptionConfig.hasEncryptionContext()) {
requestBuilder.ssekmsEncryptionContext(
serializeEncryptionContext(encryptionConfig.getEncryptionContext()));
encryptionConfig.serializeEncryptionContext());
}
}
}
Expand All @@ -171,36 +170,11 @@ private void applyEncryption(PutObjectRequest.Builder requestBuilder) {
}
if (encryptionConfig.hasEncryptionContext()) {
requestBuilder.ssekmsEncryptionContext(
serializeEncryptionContext(encryptionConfig.getEncryptionContext()));
encryptionConfig.serializeEncryptionContext());
}
}
}

/**
* Serializes the encryption context map to a Base64-encoded JSON string as required by S3 API.
*/
private String serializeEncryptionContext(java.util.Map<String, String> context) {
StringBuilder json = new StringBuilder("{");
boolean first = true;
for (java.util.Map.Entry<String, String> entry : context.entrySet()) {
if (!first) {
json.append(",");
}
json.append("\"")
.append(escapeJson(entry.getKey()))
.append("\":\"")
.append(escapeJson(entry.getValue()))
.append("\"");
first = false;
}
json.append("}");
return java.util.Base64.getEncoder().encodeToString(json.toString().getBytes());
}

private String escapeJson(String value) {
return value.replace("\\", "\\\\").replace("\"", "\\\"");
}

public UploadPartResult uploadPart(
String key, String uploadId, int partNumber, File inputFile, long length)
throws IOException {
Expand Down Expand Up @@ -275,9 +249,8 @@ private PutObjectResult putObjectViaTransferManager(String key, File inputFile)
}
if (encryptionConfig.hasEncryptionContext()) {
req.ssekmsEncryptionContext(
serializeEncryptionContext(
encryptionConfig
.getEncryptionContext()));
encryptionConfig
.serializeEncryptionContext());
}
}
}
Expand Down
Loading