Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-s3-fe521b0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "s3",
"contributor": "",
"description": "Add CRT shouldStream config as CRT_MEMORY_BUFFER_DISABLED SDK advanced client option"
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ public final class SdkAdvancedAsyncClientOption<T> extends ClientOption<T> {
public static final SdkAdvancedAsyncClientOption<Executor> FUTURE_COMPLETION_EXECUTOR =
new SdkAdvancedAsyncClientOption<>(Executor.class);

/**
* Advanced configuration for the native S3CrtAsyncClient, which only applies for file-based multipart uploads.
* By default, the S3CrtAsyncClient will skip buffering parts in memory for large objects.
* <p>
* When set to false, the client will not buffer parts for large object, but will buffer parts for smaller objects .
* When set to true, the client will skip buffering parts even for small objects.
* Parts for large objects are never buffered.
* <p>
* Default to {@code false}.
*/
public static final SdkAdvancedAsyncClientOption<Boolean> CRT_MEMORY_BUFFER_DISABLED =
new SdkAdvancedAsyncClientOption<>(Boolean.class);

private SdkAdvancedAsyncClientOption(Class<T> valueClass) {
super(valueClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.crt.CrtResource;
Expand Down Expand Up @@ -79,6 +80,25 @@ public static void teardown() throws IOException {

@Test
void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
S3AsyncClient crtClientWithMemoryBufferDisabled = S3CrtAsyncClient.builder()
.credentialsProvider(AwsTestBase.CREDENTIALS_PROVIDER_CHAIN)
.region(S3IntegrationTestBase.DEFAULT_REGION)
.advancedOption(SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED, true)
.build();

AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
crtClientWithMemoryBufferDisabled.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();

ResponseInputStream<GetObjectResponse> objContent = S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
ResponseTransformer.toInputStream());

byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));

Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
}

@Test
void putObject_withMemoryBufferDisabled_fileRequestBody_objectSentCorrectly() throws Exception {
AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();

Expand All @@ -90,6 +110,7 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
}


@Test
void putObject_file_objectSentCorrectly() throws Exception {
s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), testFile.toPath()).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.net.URI;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -25,6 +26,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.regions.Region;
Expand Down Expand Up @@ -361,6 +363,21 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
*/
S3CrtAsyncClientBuilder disableS3ExpressSessionAuth(Boolean disableS3ExpressSessionAuth);

/**
* Configure an advanced async option. These values are used very rarely, and the majority of SDK customers can ignore
* them.
*
* @param option The option to configure.
* @param value The value of the option.
* @param <T> The type of the option.
*/
<T> S3CrtAsyncClientBuilder advancedOption(SdkAdvancedAsyncClientOption<T> option, T value);

/**
* Configure the map of advanced override options. This will override all values currently configured. The values in the
* map must match the key type of the map, or a runtime exception will be raised.
*/
S3CrtAsyncClientBuilder advancedOptions(Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions);

@Override
S3AsyncClient build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import software.amazon.awssdk.annotations.SdkInternalApi;
Expand All @@ -47,6 +48,7 @@
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
Expand Down Expand Up @@ -78,6 +80,7 @@
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.Validate;

Expand Down Expand Up @@ -222,7 +225,8 @@ private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(Defau
.readBufferSizeInBytes(builder.readBufferSizeInBytes)
.httpConfiguration(builder.httpConfiguration)
.thresholdInBytes(builder.thresholdInBytes)
.maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes);
.maxNativeMemoryLimitInBytes(builder.maxNativeMemoryLimitInBytes)
.advancedOptions(builder.advancedOptions.build());

if (builder.retryConfiguration != null) {
nativeClientBuilder.standardRetryOptions(
Expand Down Expand Up @@ -257,6 +261,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
private Executor futureCompletionExecutor;
private Boolean disableS3ExpressSessionAuth;

private AttributeMap.Builder advancedOptions = AttributeMap.builder();

@Override
public DefaultS3CrtClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
Expand Down Expand Up @@ -388,6 +393,18 @@ public DefaultS3CrtClientBuilder disableS3ExpressSessionAuth(Boolean disableS3Ex
return this;
}

@Override
public <T> DefaultS3CrtClientBuilder advancedOption(SdkAdvancedAsyncClientOption<T> option, T value) {
this.advancedOptions.put(option, value);
return this;
}

@Override
public DefaultS3CrtClientBuilder advancedOptions(Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions) {
this.advancedOptions.putAll(advancedOptions);
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import software.amazon.awssdk.crt.http.HttpProxyEnvironmentVariableSetting;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.s3.ChecksumConfig;
import software.amazon.awssdk.crt.s3.FileIoOptions;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.crt.s3.S3Client;
import software.amazon.awssdk.crt.s3.S3ClientOptions;
Expand Down Expand Up @@ -127,6 +128,8 @@ private S3ClientOptions createS3ClientOption() {
.ifPresent(options::withConnectTimeoutMs);
Optional.ofNullable(s3NativeClientConfiguration.httpMonitoringOptions())
.ifPresent(options::withHttpMonitoringOptions);
Optional.ofNullable(s3NativeClientConfiguration.memoryBufferDisabled())
.ifPresent(memoryBufferDisabled -> options.withFileIoOptions(new FileIoOptions(memoryBufferDisabled, 0.0, false)));
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.services.s3.internal.crt;

import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED;
import static software.amazon.awssdk.crtcore.CrtConfigurationUtils.resolveHttpMonitoringOptions;
import static software.amazon.awssdk.crtcore.CrtConfigurationUtils.resolveProxy;

Expand All @@ -33,6 +34,7 @@
import software.amazon.awssdk.identity.spi.IdentityProvider;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.SdkAutoCloseable;
import software.amazon.awssdk.utils.Validate;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class S3NativeClientConfiguration implements SdkAutoCloseable {
private final HttpMonitoringOptions httpMonitoringOptions;
private final Boolean useEnvironmentVariableProxyOptionsValues;
private final long maxNativeMemoryLimitInBytes;
private final Boolean memoryBufferDisabled;

public S3NativeClientConfiguration(Builder builder) {
this.signingRegion = builder.signingRegion == null ? DefaultAwsRegionProviderChain.builder().build().getRegion().id() :
Expand Down Expand Up @@ -113,6 +116,8 @@ public S3NativeClientConfiguration(Builder builder) {
}
this.standardRetryOptions = builder.standardRetryOptions;
this.useEnvironmentVariableProxyOptionsValues = resolveUseEnvironmentVariableValues(builder);
this.memoryBufferDisabled =
builder.advancedOptions == null ? null : builder.advancedOptions.get(CRT_MEMORY_BUFFER_DISABLED);
}

private static Boolean resolveUseEnvironmentVariableValues(Builder builder) {
Expand Down Expand Up @@ -191,6 +196,10 @@ public Long readBufferSizeInBytes() {
return readBufferSizeInBytes;
}

public Boolean memoryBufferDisabled() {
return memoryBufferDisabled;
}

@Override
public void close() {
clientBootstrap.close();
Expand All @@ -213,6 +222,8 @@ public static final class Builder {
private Long thresholdInBytes;
private Long maxNativeMemoryLimitInBytes;

private AttributeMap advancedOptions;

private Builder() {
}

Expand Down Expand Up @@ -274,5 +285,10 @@ public Builder thresholdInBytes(Long thresholdInBytes) {
this.thresholdInBytes = thresholdInBytes;
return this;
}

public Builder advancedOptions(AttributeMap advancedOptions) {
this.advancedOptions = advancedOptions;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
Expand All @@ -37,6 +38,7 @@
import software.amazon.awssdk.services.s3.endpoints.S3ClientContextParams;
import software.amazon.awssdk.services.s3.internal.crossregion.S3CrossRegionAsyncClient;
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.MapUtils;

class DefaultS3CrtAsyncClientTest {

Expand Down Expand Up @@ -102,7 +104,7 @@ void invalidConfig_shouldThrowException(long value) {
void crtClient_with_crossRegionAccessEnabled_asTrue() {
try (S3AsyncClient crossRegionCrtClient = S3AsyncClient.crtBuilder().crossRegionAccessEnabled(true).build()) {
assertThat(crossRegionCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
assertThat(((DelegatingS3AsyncClient)crossRegionCrtClient).delegate()).isInstanceOf(S3CrossRegionAsyncClient.class);
assertThat(((DelegatingS3AsyncClient) crossRegionCrtClient).delegate()).isInstanceOf(S3CrossRegionAsyncClient.class);
}
}

Expand All @@ -115,7 +117,7 @@ void crtClient_with_crossRegionAccessEnabled_asFalse() {

try (S3AsyncClient defaultCrtClient = S3AsyncClient.crtBuilder().build()) {
assertThat(defaultCrtClient).isInstanceOf(DefaultS3CrtAsyncClient.class);
assertThat(((DelegatingS3AsyncClient)defaultCrtClient).delegate()).isNotInstanceOf(S3CrossRegionAsyncClient.class);
assertThat(((DelegatingS3AsyncClient) defaultCrtClient).delegate()).isNotInstanceOf(S3CrossRegionAsyncClient.class);
}
}

Expand All @@ -140,4 +142,23 @@ void defaultClient_credentialsProvidersNotSingleton() {
.isNotEqualTo(DefaultCredentialsProvider.create());
}
}

@Test
void build_withAdvancedOptions() {
try (DefaultS3CrtAsyncClient client = (DefaultS3CrtAsyncClient) S3AsyncClient
.crtBuilder()
.advancedOption(SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED, true)
.build()) {
assertThat(client).isNotNull();
assertThat(client).isInstanceOf(DefaultS3CrtAsyncClient.class);
}

try (DefaultS3CrtAsyncClient client = (DefaultS3CrtAsyncClient) S3AsyncClient
.crtBuilder()
.advancedOptions(MapUtils.of(SdkAdvancedAsyncClientOption.CRT_MEMORY_BUFFER_DISABLED, true))
.build()) {
assertThat(client).isNotNull();
assertThat(client).isInstanceOf(DefaultS3CrtAsyncClient.class);
}
}
}
Loading
Loading