Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti

### New Features

- Updated catalogs creation to include AWS kms key ,as an extra param in the storage config info, to be used in S3 data encryption
- Added a finer grained authorization model for UpdateTable requests. Existing privileges continue to work for granting UpdateTable, such as `TABLE_WRITE_PROPERTIES`.
However, you can now instead grant privileges just for specific operations, such as `TABLE_ADD_SNAPSHOT`
- Added a Management API endpoint to reset principal credentials, controlled by the `ENABLE_CREDENTIAL_RESET` (default: true) feature flag.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private StorageConfigInfo getStorageInfo(Map<String, String> internalProperties)
.setRoleArn(awsConfig.getRoleARN())
.setExternalId(awsConfig.getExternalId())
.setUserArn(awsConfig.getUserARN())
.setKmsKeyArn(awsConfig.getKmsKeyArn())
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(awsConfig.getAllowedLocations())
.setRegion(awsConfig.getRegion())
Expand Down Expand Up @@ -308,6 +309,7 @@ public Builder setStorageConfigurationInfo(
AwsStorageConfigurationInfo.builder()
.allowedLocations(allowedLocations)
.roleARN(awsConfigModel.getRoleArn())
.kmsKeyArn(awsConfigModel.getKmsKeyArn())
.externalId(awsConfigModel.getExternalId())
.region(awsConfigModel.getRegion())
.endpoint(awsConfigModel.getEndpoint())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1598,7 +1598,8 @@ private void revokeGrantRecord(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint) {
Optional<String> refreshCredentialsEndpoint,
Map props) {

// get meta store session we should be using
BasePersistence ms = callCtx.getMetaStore();
Expand Down Expand Up @@ -1639,7 +1640,8 @@ private void revokeGrantRecord(
allowListOperation,
allowedReadLocations,
allowedWriteLocations,
refreshCredentialsEndpoint);
refreshCredentialsEndpoint,
props);
return new ScopedCredentialsResult(accessConfig);
} catch (Exception ex) {
return new ScopedCredentialsResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ public ScopedCredentialsResult getSubscopedCredsForEntity(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint) {
Optional<String> refreshCredentialsEndpoint,
Map props) {
return delegate.getSubscopedCredsForEntity(
callCtx,
catalogId,
Expand All @@ -355,7 +356,8 @@ public ScopedCredentialsResult getSubscopedCredsForEntity(
allowListOperation,
allowedReadLocations,
allowedWriteLocations,
refreshCredentialsEndpoint);
refreshCredentialsEndpoint,
props);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2095,7 +2095,8 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint) {
Optional<String> refreshCredentialsEndpoint,
Map props) {

// get meta store session we should be using
TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore());
Expand Down Expand Up @@ -2131,7 +2132,8 @@ private PolarisEntityResolver resolveSecurableToRoleGrant(
allowListOperation,
allowedReadLocations,
allowedWriteLocations,
refreshCredentialsEndpoint);
refreshCredentialsEndpoint,
props);
return new ScopedCredentialsResult(accessConfig);
} catch (Exception ex) {
return new ScopedCredentialsResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.polaris.core.storage;

import jakarta.annotation.Nonnull;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.polaris.core.PolarisCallContext;
Expand Down Expand Up @@ -53,5 +54,6 @@ ScopedCredentialsResult getSubscopedCredsForEntity(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint);
Optional<String> refreshCredentialsEndpoint,
Map props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public abstract AccessConfig getSubscopedCreds(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint);
Optional<String> refreshCredentialsEndpoint,
Map props);

/**
* Validate access for the provided operation actions and locations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.polaris.core.storage.StorageAccessProperty;
import org.apache.polaris.core.storage.StorageUtil;
import org.apache.polaris.core.storage.aws.StsClientProvider.StsDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
import software.amazon.awssdk.policybuilder.iam.IamEffect;
Expand All @@ -49,6 +51,9 @@ public class AwsCredentialsStorageIntegration
private final StsClientProvider stsClientProvider;
private final Optional<AwsCredentialsProvider> credentialsProvider;

private static final Logger LOGGER =
LoggerFactory.getLogger(AwsCredentialsStorageIntegration.class);

public AwsCredentialsStorageIntegration(
AwsStorageConfigurationInfo config, StsClient fixedClient) {
this(config, (destination) -> fixedClient);
Expand All @@ -75,11 +80,15 @@ public AccessConfig getSubscopedCreds(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint) {
Optional<String> refreshCredentialsEndpoint,
Map props) {
LOGGER.info("Getting subscoped creds props: {}", props);
String kmsKey = props.get("s3.sse.key") != null ? props.get("s3.sse.key").toString() : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally think that "storage integration" code should work independently of table properties. In other words Polaris should own storage integration and AccessConfig. Table properties are settable by the end users outside the context of a catalog (e.g. via registerTable). Mixing both sources of integration config is confusing IMHO.

Is the intention here to support different KMS config per table?

Polaris may need to be able to get AccessConfig in order to load table metadata (which has table properties)... Does this not create a chicken-and-egg problem?

That said, I'm open to a wider discussion about this on the dev ML.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is to support the usage of the s3 properties defined in iceberg, we don't really need to pass down props or have that code in there as long as we can agree on the code https://github.com/apache/polaris/pull/2802/files#diff-d305f7a426a7690c576722c114257792b3fcee726624655d15893b71499827f8R274.
If no kms key was defined in the polaris AWS storage then we allow the usage of any keys owned by that account.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we discuss arnKeyAll (and I do not have any objections there), I think we need to resolve the conceptual issue of using metadata properties for generating AccessConfig. Polaris may need AccessConfig in order to read metadata JSON files (which have table properties)... How can this work in principle?

int storageCredentialDurationSeconds =
realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS);
AwsStorageConfigurationInfo storageConfig = config();
String region = storageConfig.getRegion();
String accountId = storageConfig.getAwsAccountId();
AccessConfig.Builder accessConfig = AccessConfig.builder();

if (shouldUseSts(storageConfig)) {
Expand All @@ -90,10 +99,13 @@ public AccessConfig getSubscopedCreds(
.roleSessionName("PolarisAwsCredentialsStorageIntegration")
.policy(
policyString(
storageConfig.getAwsPartition(),
storageConfig,
allowListOperation,
allowedReadLocations,
allowedWriteLocations)
allowedWriteLocations,
kmsKey,
region,
accountId)
.toJson())
.durationSeconds(storageCredentialDurationSeconds);
credentialsProvider.ifPresent(
Expand Down Expand Up @@ -163,12 +175,14 @@ private boolean shouldUseSts(AwsStorageConfigurationInfo storageConfig) {
* ListBucket privileges with no resources. This prevents us from sending an empty policy to AWS
* and just assuming the role with full privileges.
*/
// TODO - add KMS key access
private IamPolicy policyString(
String awsPartition,
AwsStorageConfigurationInfo storageConfigurationInfo,
boolean allowList,
Set<String> readLocations,
Set<String> writeLocations) {
Set<String> writeLocations,
String kmsKey,
String region,
String accountId) {
IamPolicy.Builder policyBuilder = IamPolicy.builder();
IamStatement.Builder allowGetObjectStatementBuilder =
IamStatement.builder()
Expand All @@ -178,7 +192,9 @@ private IamPolicy policyString(
Map<String, IamStatement.Builder> bucketListStatementBuilder = new HashMap<>();
Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new HashMap<>();

String arnPrefix = arnPrefixForPartition(awsPartition);
String arnPrefix = arnPrefixForPartition(storageConfigurationInfo.getAwsPartition());
String kmsKeyArn = storageConfigurationInfo.getKmsKeyArn();
Copy link
Contributor

@dimas-b dimas-b Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can / will this work with MinIO KMS? Cf. https://github.com/minio/minio/blob/master/docs/kms/README.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will need to test that.

Copy link
Contributor

@dimas-b dimas-b Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine either way, but if it did not work with MinIO it would be good to mention AWS (only) in the PR description.

Also, I'm pretty sure we'll have to support KMS in MinIO at some point, so if current changes were reusable it would be great (but not required).


Stream.concat(readLocations.stream(), writeLocations.stream())
.distinct()
.forEach(
Expand Down Expand Up @@ -225,6 +241,9 @@ private IamPolicy policyString(
arnPrefix + StorageUtil.concatFilePrefixes(parseS3Path(uri), "*", "/")));
});
policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
addKmsKeyPolicy(kmsKey, kmsKeyArn, policyBuilder, true, region, accountId);
} else {
addKmsKeyPolicy(kmsKey, kmsKeyArn, policyBuilder, false, region, accountId);
}
if (!bucketListStatementBuilder.isEmpty()) {
bucketListStatementBuilder
Expand All @@ -239,7 +258,45 @@ private IamPolicy policyString(
bucketGetLocationStatementBuilder
.values()
.forEach(statementBuilder -> policyBuilder.addStatement(statementBuilder.build()));
return policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build();
var r = policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build();
LOGGER.info("Policies {}", r);
return r;
}

private static void addKmsKeyPolicy(
String kmsKeyArnOverride,
String kmsKeyArn,
IamPolicy.Builder policyBuilder,
boolean canEncrypt,
String region,
String accountId) {
if (kmsKeyArn == null && kmsKeyArnOverride == null) {
kmsKeyArn = arnKeyAll(region, accountId);
}
IamStatement.Builder allowKms =
IamStatement.builder()
.effect(IamEffect.ALLOW)
.addAction("kms:GenerateDataKeyWithoutPlaintext")
.addAction("kms:DescribeKey")
.addAction("kms:Decrypt")
.addAction("kms:GenerateDataKey");

if (canEncrypt) {
allowKms.addAction("kms:Encrypt");
}
if (kmsKeyArnOverride != null) {
LOGGER.info("Adding KMS key policy for key {}", kmsKeyArnOverride);
allowKms.addResource(IamResource.create(kmsKeyArnOverride));
}
if (kmsKeyArn != null) {
LOGGER.info("Adding KMS key policy for key {}", kmsKeyArn);
allowKms.addResource(IamResource.create(kmsKeyArn));
}
policyBuilder.addStatement(allowKms.build());
}

private static String arnKeyAll(String region, String accountId) {
return String.format("arn:aws:kms:%s:%s:key/*", region, accountId);
}

private static String arnPrefixForPartition(String awsPartition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public String getFileIoImplClassName() {
@Nullable
public abstract String getRoleARN();

/** KMS Key ARN for server-side encryption, optional */
@Nullable
public abstract String getKmsKeyArn();

/** AWS external ID, optional */
@Nullable
public abstract String getExternalId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -78,7 +79,8 @@ public AccessConfig getSubscopedCreds(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint) {
Optional<String> refreshCredentialsEndpoint,
Map props) {
String loc =
!allowedWriteLocations.isEmpty()
? allowedWriteLocations.stream().findAny().orElse(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public AccessConfig getOrGenerateSubScopeCreds(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint) {
Optional<String> refreshCredentialsEndpoint,
Map props) {
if (!isTypeSupported(polarisEntity.getType())) {
diagnostics.fail(
"entity_type_not_suppported_to_scope_creds", "type={}", polarisEntity.getType());
Expand All @@ -136,7 +137,8 @@ public AccessConfig getOrGenerateSubScopeCreds(
k.allowedListAction(),
k.allowedReadLocations(),
k.allowedWriteLocations(),
k.refreshCredentialsEndpoint());
k.refreshCredentialsEndpoint(),
props);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If prop is an argument to generating AccessConfig is should be part of the cache key.

if (scopedCredentialsResult.isSuccess()) {
long maxCacheDurationMs = maxCacheDurationMs(callCtx.getRealmConfig());
return new StorageCredentialCacheEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public AccessConfig getSubscopedCreds(
boolean allowListOperation,
@Nonnull Set<String> allowedReadLocations,
@Nonnull Set<String> allowedWriteLocations,
Optional<String> refreshCredentialsEndpoint) {
Optional<String> refreshCredentialsEndpoint,
Map props) {
try {
sourceCredentials.refresh();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void testStsEndpoint() {

private static ImmutableAwsStorageConfigurationInfo.Builder newBuilder() {
return AwsStorageConfigurationInfo.builder()
.roleARN("arn:aws:iam::123456789012:role/polaris-test");
.roleARN("arn:aws:iam::123456789012:role/polaris-test")
.kmsKeyArn("arn:aws:kms:us-east-1:012345678901:key/444343245");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,74 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() {
String.valueOf(EXPIRE_TIME.toEpochMilli()));
}

@Test
public void testGetSubscopedCredsInlinePolicyWithKmsKey() {
StsClient stsClient = Mockito.mock(StsClient.class);
String roleARN = "arn:aws:iam::012345678901:role/jdoe";
String externalId = "externalId";
String bucket = "bucket";
String warehouseKeyPrefix = "path/to/warehouse";
String firstPath = warehouseKeyPrefix + "/namespace/table";
String kmsKeyArn = "arn:aws:kms:us-east-1:012345678901:key/444343245";
Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
.thenAnswer(
invocation -> {
assertThat(invocation.getArguments()[0])
.isInstanceOf(AssumeRoleRequest.class)
.asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
.extracting(AssumeRoleRequest::policy)
.extracting(IamPolicy::fromJson)
.satisfies(
policy -> {
assertThat(policy)
.extracting(IamPolicy::statements)
.asInstanceOf(InstanceOfAssertFactories.list(IamStatement.class))
.hasSize(5)
.anySatisfy(
statement ->
assertThat(statement)
.returns(IamEffect.ALLOW, IamStatement::effect)
.returns(
List.of(
IamAction.create(
"kms:GenerateDataKeyWithoutPlaintext"),
IamAction.create("kms:DescribeKey"),
IamAction.create("kms:Decrypt"),
IamAction.create("kms:GenerateDataKey"),
IamAction.create("kms:Encrypt")),
IamStatement::actions)
.returns(
List.of(IamResource.create(kmsKeyArn)),
IamStatement::resources));
});
return ASSUME_ROLE_RESPONSE;
});
AccessConfig accessConfig =
new AwsCredentialsStorageIntegration(
AwsStorageConfigurationInfo.builder()
.addAllowedLocation(s3Path(bucket, warehouseKeyPrefix))
.roleARN(roleARN)
.externalId(externalId)
.region("us-east-1")
.kmsKeyArn(kmsKeyArn)
.build(),
stsClient)
.getSubscopedCreds(
EMPTY_REALM_CONFIG,
true,
Set.of(s3Path(bucket, firstPath)),
Set.of(s3Path(bucket, firstPath)),
Optional.empty());
assertThat(accessConfig.credentials())
.isNotEmpty()
.containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess")
.containsEntry(StorageAccessProperty.AWS_KEY_ID.getPropertyName(), "accessKey")
.containsEntry(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName(), "secretKey")
.containsEntry(
StorageAccessProperty.AWS_SESSION_TOKEN_EXPIRES_AT_MS.getPropertyName(),
String.valueOf(EXPIRE_TIME.toEpochMilli()));
}

@ParameterizedTest
@ValueSource(strings = {AWS_PARTITION, "aws-cn", "aws-us-gov"})
public void testClientRegion(String awsPartition) {
Expand Down
Loading
Loading