Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions aws/src/integration/java/org/apache/iceberg/aws/s3/MinioUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

public class MinioUtil {
public static final String LATEST_TAG = "latest";
// This version doesn't support strong integrity checks
static final String LEGACY_TAG = "RELEASE.2024-12-18T13-15-44Z";

private MinioUtil() {}

public static MinIOContainer createContainer() {
return createContainer(null);
return createContainer(LATEST_TAG, null);
}

public static MinIOContainer createContainer(AwsCredentials credentials) {
var container = new MinIOContainer(DockerImageName.parse("minio/minio:latest"));
public static MinIOContainer createContainer(String tag, AwsCredentials credentials) {
var container = new MinIOContainer(DockerImageName.parse("minio/minio").withTag(tag));

// this enables virtual-host-style requests. see
// https://github.com/minio/minio/tree/master/docs/config#domain
Expand All @@ -52,8 +56,15 @@ public static MinIOContainer createContainer(AwsCredentials credentials) {
}

/**
 * Creates an S3 client for the given MinIO container with the legacy MD5 plugin disabled.
 *
 * @param container MinIO container the client should talk to
 * @return the client built by {@link #createS3Client(MinIOContainer, boolean)}
 */
public static S3Client createS3Client(MinIOContainer container) {
  boolean legacyMd5PluginEnabled = false;
  return createS3Client(container, legacyMd5PluginEnabled);
}

public static S3Client createS3Client(MinIOContainer container, boolean legacyMd5PluginEnabled) {
URI uri = URI.create(container.getS3URL());
S3ClientBuilder builder = S3Client.builder();
if (legacyMd5PluginEnabled) {
builder.addPlugin(LegacyMd5Plugin.create());
}
builder.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(container.getUserName(), container.getPassword())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,10 @@

@Testcontainers
public class TestS3FileIO {
@Container private static final MinIOContainer MINIO = MinioUtil.createContainer();
@Container private final MinIOContainer minio = createMinIOContainer();

private final SerializableSupplier<S3Client> s3 = () -> MinioUtil.createS3Client(MINIO);
private final SerializableSupplier<S3Client> s3 =
() -> MinioUtil.createS3Client(minio, legacyMd5PluginEnabled());
private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3.get()));
private final Random random = new Random(1);
private final int numBucketsForBatchDeletion = 3;
Expand All @@ -135,6 +136,16 @@ public class TestS3FileIO {
"s3.delete.batch-size",
Integer.toString(batchDeletionSize));

/**
 * Creates and starts the MinIO container backing this test class.
 *
 * <p>Subclasses can override this hook to run the same tests against a differently configured
 * container.
 *
 * <p>NOTE(review): the container is started eagerly here, presumably because field initializers
 * of this class already build an S3 client from it before the test lifecycle begins - confirm.
 *
 * @return a started MinIO container
 */
protected MinIOContainer createMinIOContainer() {
  MinIOContainer minioContainer = MinioUtil.createContainer();
  minioContainer.start();
  return minioContainer;
}

/**
 * Tells whether the S3 client used by these tests should be created with the legacy MD5 plugin.
 *
 * <p>The value is handed to {@code MinioUtil.createS3Client}; subclasses can override it.
 *
 * @return {@code false} in this base class
 */
protected boolean legacyMd5PluginEnabled() {
return false;
}

@BeforeEach
public void before() {
s3FileIO = new S3FileIO(() -> s3mock);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws.s3;

import org.testcontainers.containers.MinIOContainer;
import org.testcontainers.junit.jupiter.Testcontainers;

/**
 * Runs the {@link TestS3FileIO} suite against the MinIO release pinned by {@link
 * MinioUtil#LEGACY_TAG}, with the legacy MD5 plugin enabled on the S3 client.
 */
@Testcontainers
public class TestS3FileIOWithLegacyMinIO extends TestS3FileIO {

  /** Creates and starts a MinIO container using the legacy image tag. */
  @Override
  protected MinIOContainer createMinIOContainer() {
    MinIOContainer legacyMinio = MinioUtil.createContainer(MinioUtil.LEGACY_TAG, null);
    legacyMinio.start();
    return legacyMinio;
  }

  /** Enables the legacy MD5 plugin for the S3 client created by the base class. */
  @Override
  protected boolean legacyMd5PluginEnabled() {
    return true;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class TestS3RestSigner {

@Container
private static final MinIOContainer MINIO_CONTAINER =
MinioUtil.createContainer(CREDENTIALS_PROVIDER.resolveCredentials());
MinioUtil.createContainer(MinioUtil.LATEST_TAG, CREDENTIALS_PROVIDER.resolveCredentials());

private static Server httpServer;
private static ValidatingSigner validatingSigner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class AssumeRoleAwsClientFactory implements AwsClientFactory {
public S3Client s3() {
return S3Client.builder()
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(awsClientProperties::applyLegacyMd5Plugin)
.applyMutation(httpClientProperties::applyHttpClientConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyServiceConfigurations)
Expand All @@ -70,6 +71,7 @@ public S3AsyncClient s3Async() {
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(awsClientProperties::applyLegacyMd5Plugin)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ static class DefaultAwsClientFactory implements AwsClientFactory {
public S3Client s3() {
return S3Client.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyLegacyMd5Plugin)
.applyMutation(httpClientProperties::applyHttpClientConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyServiceConfigurations)
Expand All @@ -132,6 +133,7 @@ public S3AsyncClient s3Async() {
}
return S3AsyncClient.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyLegacyMd5Plugin)
.applyMutation(
b -> s3FileIOProperties.applyCredentialConfigurations(awsClientProperties, b))
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
Expand Down
38 changes: 35 additions & 3 deletions aws/src/main/java/org/apache/iceberg/aws/AwsClientProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;

public class AwsClientProperties implements Serializable {
Expand Down Expand Up @@ -82,11 +83,20 @@ public class AwsClientProperties implements Serializable {
/** Controls whether vended credentials should be refreshed or not. Defaults to true. */
public static final String REFRESH_CREDENTIALS_ENABLED = "client.refresh-credentials-enabled";

/**
* Controls whether the legacy MD5 plugin should be added or not. Defaults to false. AWS SDK
* version 2.30.0 introduced integrity protections that are not backward compatible. Enable this
* property only when you need to access older S3-compatible object stores that depend on the
* legacy MD5 checksum.
*/
public static final String LEGACY_MD5_PLUGIN_ENABLED = "client.legacy-md5-plugin-enabled";

private String clientRegion;
private final String clientCredentialsProvider;
private final Map<String, String> clientCredentialsProviderProperties;
private final String refreshCredentialsEndpoint;
private final boolean refreshCredentialsEnabled;
private final boolean legacyMd5pluginEnabled;
private final Map<String, String> allProperties;

public AwsClientProperties() {
Expand All @@ -95,6 +105,7 @@ public AwsClientProperties() {
this.clientCredentialsProviderProperties = null;
this.refreshCredentialsEndpoint = null;
this.refreshCredentialsEnabled = true;
this.legacyMd5pluginEnabled = false;
this.allProperties = null;
}

Expand All @@ -109,6 +120,8 @@ public AwsClientProperties(Map<String, String> properties) {
properties.get(CatalogProperties.URI), properties.get(REFRESH_CREDENTIALS_ENDPOINT));
this.refreshCredentialsEnabled =
PropertyUtil.propertyAsBoolean(properties, REFRESH_CREDENTIALS_ENABLED, true);
this.legacyMd5pluginEnabled =
PropertyUtil.propertyAsBoolean(properties, LEGACY_MD5_PLUGIN_ENABLED, false);
}

public String clientRegion() {
Expand All @@ -128,7 +141,8 @@ public void setClientRegion(String clientRegion) {
* S3Client.builder().applyMutation(awsClientProperties::applyClientRegionConfiguration)
* </pre>
*/
public <T extends AwsClientBuilder> void applyClientRegionConfiguration(T builder) {
public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT>
void applyClientRegionConfiguration(BuilderT builder) {
if (clientRegion != null) {
builder.region(Region.of(clientRegion));
}
Expand Down Expand Up @@ -158,7 +172,8 @@ public <T extends S3CrtAsyncClientBuilder> void applyClientRegionConfiguration(T
* DynamoDbClient.builder().applyMutation(awsClientProperties::applyClientCredentialConfigurations)
* </pre>
*/
public <T extends AwsClientBuilder> void applyClientCredentialConfigurations(T builder) {
public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT>
void applyClientCredentialConfigurations(BuilderT builder) {
if (!Strings.isNullOrEmpty(this.clientCredentialsProvider)) {
builder.credentialsProvider(credentialsProvider(this.clientCredentialsProvider));
}
Expand Down Expand Up @@ -231,7 +246,8 @@ public AwsCredentialsProvider credentialsProvider(
* KmsClient.builder().applyMutation(awsClientProperties::applyRetryConfigurations)
* </pre>
*/
public <T extends AwsClientBuilder> void applyRetryConfigurations(T builder) {
public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT>
void applyRetryConfigurations(BuilderT builder) {
ClientOverrideConfiguration.Builder configBuilder =
null != builder.overrideConfiguration()
? builder.overrideConfiguration().toBuilder()
Expand All @@ -240,6 +256,22 @@ public <T extends AwsClientBuilder> void applyRetryConfigurations(T builder) {
builder.overrideConfiguration(configBuilder.retryStrategy(RetryMode.ADAPTIVE_V2).build());
}

/**
 * Registers the {@link LegacyMd5Plugin} on the given client builder when {@link
 * #LEGACY_MD5_PLUGIN_ENABLED} was set to true; otherwise the builder is left untouched.
 *
 * <p>Sample usage:
 *
 * <pre>
 *     S3Client.builder().applyMutation(awsClientProperties::applyLegacyMd5Plugin)
 * </pre>
 */
public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> void applyLegacyMd5Plugin(
    BuilderT builder) {
  if (!legacyMd5pluginEnabled) {
    return;
  }

  builder.addPlugin(LegacyMd5Plugin.create());
}

private AwsCredentialsProvider credentialsProvider(String credentialsProviderClass) {
Class<?> providerClass;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void initialize(Map<String, String> catalogProperties) {
public S3Client s3() {
if (isTableRegisteredWithLakeFormation()) {
return S3Client.builder()
.applyMutation(awsClientProperties()::applyLegacyMd5Plugin)
.applyMutation(httpClientProperties()::applyHttpClientConfigurations)
.applyMutation(s3FileIOProperties()::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties()::applyServiceConfigurations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public void initialize(Map<String, String> properties) {
public S3Client s3() {
return S3Client.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyLegacyMd5Plugin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should most likely be applied to the s3Async() case below as well

.applyMutation(httpClientProperties::applyHttpClientConfigurations)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.applyMutation(s3FileIOProperties::applyServiceConfigurations)
Expand Down Expand Up @@ -73,6 +74,7 @@ public S3AsyncClient s3Async() {
return S3AsyncClient.builder()
.applyMutation(awsClientProperties::applyClientRegionConfiguration)
.applyMutation(awsClientProperties::applyClientCredentialConfigurations)
.applyMutation(awsClientProperties::applyLegacyMd5Plugin)
.applyMutation(s3FileIOProperties::applyEndpointConfigurations)
.build();
}
Expand Down