Skip to content

Commit

Permalink
Expose S3TransferManager config property S3ClientConfiguration.endpoi…
Browse files Browse the repository at this point in the history
…ntOverride (#3084)
  • Loading branch information
ron1 committed Mar 17, 2022
1 parent de37830 commit 7121f4f
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.transfer.s3;

import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import software.amazon.awssdk.annotations.SdkPreviewApi;
Expand All @@ -39,6 +40,7 @@ public final class S3ClientConfiguration implements ToCopyableBuilder<S3ClientCo
private final Long minimumPartSizeInBytes;
private final Double targetThroughputInGbps;
private final Integer maxConcurrency;
private final URI endpointOverride;

private S3ClientConfiguration(DefaultBuilder builder) {
this.credentialsProvider = builder.credentialsProvider;
Expand All @@ -47,6 +49,7 @@ private S3ClientConfiguration(DefaultBuilder builder) {
this.targetThroughputInGbps = Validate.isPositiveOrNull(builder.targetThroughputInGbps, "targetThroughputInGbps");
this.maxConcurrency = Validate.isPositiveOrNull(builder.maxConcurrency,
"maxConcurrency");
this.endpointOverride = builder.endpointOverride;
}

/**
Expand Down Expand Up @@ -84,6 +87,13 @@ public Optional<Integer> maxConcurrency() {
return Optional.ofNullable(maxConcurrency);
}

/**
* @return the optional endpoint override with which the SDK should communicate.
*/
public Optional<URI> endpointOverride() {
return Optional.ofNullable(endpointOverride);
}

@Override
public Builder toBuilder() {
return new DefaultBuilder(this);
Expand Down Expand Up @@ -112,7 +122,10 @@ public boolean equals(Object o) {
if (!Objects.equals(targetThroughputInGbps, that.targetThroughputInGbps)) {
return false;
}
return Objects.equals(maxConcurrency, that.maxConcurrency);
if (!Objects.equals(maxConcurrency, that.maxConcurrency)) {
return false;
}
return Objects.equals(endpointOverride, that.endpointOverride);
}

@Override
Expand All @@ -122,6 +135,7 @@ public int hashCode() {
result = 31 * result + (minimumPartSizeInBytes != null ? minimumPartSizeInBytes.hashCode() : 0);
result = 31 * result + (targetThroughputInGbps != null ? targetThroughputInGbps.hashCode() : 0);
result = 31 * result + (maxConcurrency != null ? maxConcurrency.hashCode() : 0);
result = 31 * result + (endpointOverride != null ? endpointOverride.hashCode() : 0);
return result;
}

Expand Down Expand Up @@ -216,6 +230,14 @@ public interface Builder extends CopyableBuilder<Builder, S3ClientConfiguration>
* @see #targetThroughputInGbps(Double)
*/
Builder maxConcurrency(Integer maxConcurrency);

/**
* Configure the endpoint override with which the SDK should communicate.
*
* @param endpointOverride the endpoint override to be used
* @return this builder for method chaining.
*/
Builder endpointOverride(URI endpointOverride);
}

private static final class DefaultBuilder implements Builder {
Expand All @@ -224,6 +246,7 @@ private static final class DefaultBuilder implements Builder {
private Long minimumPartSizeInBytes;
private Double targetThroughputInGbps;
private Integer maxConcurrency;
private URI endpointOverride;

private DefaultBuilder() {
}
Expand All @@ -234,6 +257,7 @@ private DefaultBuilder(S3ClientConfiguration configuration) {
this.minimumPartSizeInBytes = configuration.minimumPartSizeInBytes;
this.targetThroughputInGbps = configuration.targetThroughputInGbps;
this.maxConcurrency = configuration.maxConcurrency;
this.endpointOverride = configuration.endpointOverride;
}

@Override
Expand Down Expand Up @@ -266,6 +290,12 @@ public Builder maxConcurrency(Integer maxConcurrency) {
return this;
}

@Override
public Builder endpointOverride(URI endpointOverride) {
this.endpointOverride = endpointOverride;
return this;
}

@Override
public S3ClientConfiguration build() {
return new S3ClientConfiguration(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.transfer.s3.internal.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;

import java.net.URI;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
Expand Down Expand Up @@ -58,6 +59,7 @@ private DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
.minimumPartSizeInBytes(builder.minimumPartSizeInBytes())
.maxConcurrency(builder.maxConcurrency)
.region(builder.region)
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.build();

Expand All @@ -79,6 +81,7 @@ private S3AsyncClient initializeS3AsyncClient(DefaultS3CrtClientBuilder builder)
.checksumValidationEnabled(false)
.build())
.region(builder.region)
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.overrideConfiguration(o -> o.putAdvancedOption(SdkAdvancedClientOption.SIGNER,
new NoOpSigner())
Expand Down Expand Up @@ -144,6 +147,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
private Long minimalPartSizeInBytes;
private Double targetThroughputInGbps;
private Integer maxConcurrency;
private URI endpointOverride;

public AwsCredentialsProvider credentialsProvider() {
return credentialsProvider;
Expand All @@ -165,6 +169,11 @@ public Integer maxConcurrency() {
return maxConcurrency;
}

public URI endpointOverride() {
return endpointOverride;
}


@Override
public S3CrtAsyncClientBuilder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
this.credentialsProvider = credentialsProvider;
Expand Down Expand Up @@ -195,6 +204,12 @@ public S3CrtAsyncClientBuilder maxConcurrency(Integer maxConcurrency) {
return this;
}

@Override
public S3CrtAsyncClientBuilder endpointOverride(URI endpointOverride) {
this.endpointOverride = endpointOverride;
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private static S3CrtAsyncClient initializeS3CrtClient(DefaultBuilder tmBuilder)
tmBuilder.s3ClientConfiguration.minimumPartSizeInBytes().ifPresent(clientBuilder::minimumPartSizeInBytes);
tmBuilder.s3ClientConfiguration.region().ifPresent(clientBuilder::region);
tmBuilder.s3ClientConfiguration.targetThroughputInGbps().ifPresent(clientBuilder::targetThroughputInGbps);
tmBuilder.s3ClientConfiguration.endpointOverride().ifPresent(clientBuilder::endpointOverride);

return clientBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.transfer.s3.internal;

import java.net.URI;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
Expand All @@ -39,6 +40,8 @@ interface S3CrtAsyncClientBuilder extends SdkBuilder<S3CrtAsyncClientBuilder, S3

S3CrtAsyncClientBuilder maxConcurrency(Integer maxConcurrency);

S3CrtAsyncClientBuilder endpointOverride(URI endpointOverride);

@Override
S3CrtAsyncClient build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,14 @@ private S3CrtAsyncHttpClient(Builder builder) {
.partSizeInBytes(builder.minimalPartSizeInBytes)
.maxConcurrency(builder.maxConcurrency)
.signingRegion(builder.region == null ? null : builder.region.id())
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.build();

S3ClientOptions s3ClientOptions =
new S3ClientOptions().withRegion(s3NativeClientConfiguration.signingRegion())
.withEndpoint(s3NativeClientConfiguration.endpointOverride() == null ? null :
s3NativeClientConfiguration.endpointOverride().toString())
.withCredentialsProvider(s3NativeClientConfiguration.credentialsProvider())
.withClientBootstrap(s3NativeClientConfiguration.clientBootstrap())
.withPartSize(s3NativeClientConfiguration.partSizeBytes())
Expand All @@ -76,17 +79,20 @@ private S3CrtAsyncHttpClient(Builder builder) {
}

@Override
public CompletableFuture<Void> execute(AsyncExecuteRequest request) {
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
CompletableFuture<Void> executeFuture = new CompletableFuture<>();
HttpRequest httpRequest = toCrtRequest(request);
S3CrtResponseHandlerAdapter responseHandler = new S3CrtResponseHandlerAdapter(executeFuture, request.responseHandler());
URI uri = asyncRequest.request().getUri();
HttpRequest httpRequest = toCrtRequest(uri, asyncRequest);
S3CrtResponseHandlerAdapter responseHandler =
new S3CrtResponseHandlerAdapter(executeFuture, asyncRequest.responseHandler());

S3MetaRequestOptions.MetaRequestType requestType = requestType(request);
S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest);

S3MetaRequestOptions requestOptions = new S3MetaRequestOptions()
.withHttpRequest(httpRequest)
.withMetaRequestType(requestType)
.withResponseHandler(responseHandler);
.withResponseHandler(responseHandler)
.withEndpoint(s3NativeClientConfiguration.endpointOverride());

try (S3MetaRequest s3MetaRequest = crtS3Client.makeMetaRequest(requestOptions)) {
closeResourcesWhenComplete(executeFuture, s3MetaRequest);
Expand All @@ -100,8 +106,8 @@ public String clientName() {
return "s3crt";
}

private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequest request) {
String operationName = request.httpExecutionAttributes().getAttribute(OPERATION_NAME);
private static S3MetaRequestOptions.MetaRequestType requestType(AsyncExecuteRequest asyncRequest) {
String operationName = asyncRequest.httpExecutionAttributes().getAttribute(OPERATION_NAME);
if (operationName != null) {
switch (operationName) {
case "GetObject":
Expand All @@ -128,8 +134,7 @@ private static void closeResourcesWhenComplete(CompletableFuture<Void> executeFu
});
}

private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {
URI uri = asyncRequest.request().getUri();
private static HttpRequest toCrtRequest(URI uri, AsyncExecuteRequest asyncRequest) {
SdkHttpRequest sdkRequest = asyncRequest.request();

String method = sdkRequest.method().name();
Expand Down Expand Up @@ -166,6 +171,7 @@ public static final class Builder implements SdkAsyncHttpClient.Builder<S3CrtAsy
private Long minimalPartSizeInBytes;
private Double targetThroughputInGbps;
private Integer maxConcurrency;
private URI endpointOverride;

/**
* Configure the credentials that should be used to authenticate with S3.
Expand Down Expand Up @@ -221,6 +227,14 @@ public Builder maxConcurrency(Integer maxConcurrency) {
return this;
}

/**
* Configure the endpoint override with which the SDK should communicate.
*/
public Builder endpointOverride(URI endpointOverride) {
this.endpointOverride = endpointOverride;
return this;
}

@Override
public SdkAsyncHttpClient build() {
return new S3CrtAsyncHttpClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR;

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -50,6 +51,7 @@ public class S3NativeClientConfiguration implements SdkAutoCloseable {
private final long partSizeInBytes;
private final double targetThroughputInGbps;
private final int maxConcurrency;
private final URI endpointOverride;
private final Executor futureCompletionExecutor;

public S3NativeClientConfiguration(Builder builder) {
Expand All @@ -72,6 +74,8 @@ public S3NativeClientConfiguration(Builder builder) {
// Using 0 so that CRT will calculate it based on targetThroughputGbps
this.maxConcurrency = builder.maxConcurrency == null ? 0 : builder.maxConcurrency;

this.endpointOverride = builder.endpointOverride;

this.futureCompletionExecutor = resolveAsyncFutureCompletionExecutor(builder.asynConfiguration);
}

Expand Down Expand Up @@ -103,6 +107,10 @@ public int maxConcurrency() {
return maxConcurrency;
}

public URI endpointOverride() {
return endpointOverride;
}

public Executor futureCompletionExecutor() {
return futureCompletionExecutor;
}
Expand Down Expand Up @@ -155,6 +163,7 @@ public static final class Builder {
private Long partSizeInBytes;
private Double targetThroughputInGbps;
private Integer maxConcurrency;
private URI endpointOverride;
private ClientAsyncConfiguration asynConfiguration;

private Builder() {
Expand Down Expand Up @@ -185,6 +194,11 @@ public Builder maxConcurrency(Integer maxConcurrency) {
return this;
}

public Builder endpointOverride(URI endpointOverride) {
this.endpointOverride = endpointOverride;
return this;
}

public Builder asyncConfiguration(ClientAsyncConfiguration asyncConfiguration) {
this.asynConfiguration = asyncConfiguration;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

import java.net.URI;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
Expand Down Expand Up @@ -73,12 +74,15 @@ public void build_allProperties() {
.maxConcurrency(100)
.targetThroughputInGbps(10.0)
.region(Region.US_WEST_2)
.endpointOverride(URI.create(
"http://s3.us-west-1.amazonaws.com:80"))
.minimumPartSizeInBytes(5 * MB)
.build();

assertThat(configuration.credentialsProvider()).contains(credentials);
assertThat(configuration.maxConcurrency()).contains(100);
assertThat(configuration.region()).contains(Region.US_WEST_2);
assertThat(configuration.endpointOverride().toString()).contains("http://s3.us-west-1.amazonaws.com:80");
assertThat(configuration.targetThroughputInGbps()).contains(10.0);
assertThat(configuration.minimumPartSizeInBytes()).contains(5 * MB);
}
Expand All @@ -91,6 +95,7 @@ public void build_emptyBuilder() {
assertThat(configuration.credentialsProvider()).isEmpty();
assertThat(configuration.maxConcurrency()).isEmpty();
assertThat(configuration.region()).isEmpty();
assertThat(configuration.endpointOverride()).isEmpty();
assertThat(configuration.targetThroughputInGbps()).isEmpty();
assertThat(configuration.minimumPartSizeInBytes()).isEmpty();
}
Expand Down
Loading

0 comments on commit 7121f4f

Please sign in to comment.