Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,27 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.booleanType()
.defaultValue(false)
.withFallbackKeys("s3.path.style.access")
.withDescription("Use path-style access for S3 (for S3-compatible storage)");
.withDescription(
"Use path-style access for S3. Required for most S3-compatible servers "
+ "(e.g. MinIO, Ceph) which do not support virtual-hosted style.");

public static final ConfigOption<Boolean> CHUNKED_ENCODING_ENABLED =
ConfigOptions.key("s3.chunked-encoding.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Enable AWS chunked transfer encoding for S3 requests. "
+ "Most S3-compatible servers do not support this AWS-specific "
+ "extension and require it to be disabled.");

public static final ConfigOption<Boolean> CHECKSUM_VALIDATION_ENABLED =
ConfigOptions.key("s3.checksum-validation.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Enable checksum validation for S3 requests. "
+ "Most S3-compatible servers do not support AWS checksum "
+ "headers and require it to be disabled.");

public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
ConfigOptions.key("s3.upload.min.part.size")
Expand Down Expand Up @@ -295,10 +315,6 @@ public FileSystem create(URI fsUri) throws IOException {
String endpoint = config.get(ENDPOINT);
boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);

if (endpoint != null && !pathStyleAccess) {
pathStyleAccess = true;
}

S3EncryptionConfig encryptionConfig =
S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), config.get(SSE_KMS_KEY_ID));
String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
Expand Down Expand Up @@ -371,6 +387,8 @@ public FileSystem create(URI fsUri) throws IOException {
.region(region)
.endpoint(endpoint)
.pathStyleAccess(pathStyleAccess)
.chunkedEncoding(config.get(CHUNKED_ENCODING_ENABLED))
.checksumValidation(config.get(CHECKSUM_VALIDATION_ENABLED))
.maxConnections(maxConnections)
.connectionTimeout(config.get(CONNECTION_TIMEOUT))
.socketTimeout(config.get(SOCKET_TIMEOUT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class S3ClientProvider implements AutoCloseableAsync {
private final Duration connectionTimeout;
private final Duration socketTimeout;
private final Duration connectionMaxIdleTime;
private final boolean pathStyleAccess;
private final boolean chunkedEncoding;
private final boolean checksumValidation;
private final int maxConnections;
private final int maxRetries;
private final AtomicBoolean closed = new AtomicBoolean(false);

private S3ClientProvider(
Expand All @@ -91,7 +96,12 @@ private S3ClientProvider(
Duration clientCloseTimeout,
Duration connectionTimeout,
Duration socketTimeout,
Duration connectionMaxIdleTime) {
Duration connectionMaxIdleTime,
boolean pathStyleAccess,
boolean chunkedEncoding,
boolean checksumValidation,
int maxConnections,
int maxRetries) {
this.s3Client = s3Client;
this.transferManager = transferManager;
this.encryptionConfig =
Expand All @@ -102,6 +112,11 @@ private S3ClientProvider(
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.connectionMaxIdleTime = connectionMaxIdleTime;
this.pathStyleAccess = pathStyleAccess;
this.chunkedEncoding = chunkedEncoding;
this.checksumValidation = checksumValidation;
this.maxConnections = maxConnections;
this.maxRetries = maxRetries;
}

public S3Client getS3Client() {
Expand Down Expand Up @@ -144,6 +159,31 @@ Duration getConnectionMaxIdleTime() {
return connectionMaxIdleTime;
}

@VisibleForTesting
boolean isPathStyleAccess() {
return pathStyleAccess;
}

@VisibleForTesting
boolean isChunkedEncoding() {
return chunkedEncoding;
}

@VisibleForTesting
boolean isChecksumValidation() {
return checksumValidation;
}

@VisibleForTesting
int getMaxConnections() {
return maxConnections;
}

@VisibleForTesting
int getMaxRetries() {
return maxRetries;
}

@Override
public CompletableFuture<Void> closeAsync() {
if (!closed.compareAndSet(false, true)) {
Expand Down Expand Up @@ -204,11 +244,12 @@ public static class Builder {
private String region;
private String endpoint;
private boolean pathStyleAccess = false;
private boolean chunkedEncoding = true;
private boolean checksumValidation = true;
private int maxConnections = 50;
private Duration connectionTimeout = Duration.ofSeconds(60);
private Duration socketTimeout = Duration.ofSeconds(60);
private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
private boolean disableCertCheck = false;
private int maxRetries = 3;
private Duration clientCloseTimeout = Duration.ofSeconds(30);

Expand Down Expand Up @@ -249,6 +290,16 @@ public Builder pathStyleAccess(boolean pathStyleAccess) {
return this;
}

public Builder chunkedEncoding(boolean chunkedEncoding) {
this.chunkedEncoding = chunkedEncoding;
return this;
}

public Builder checksumValidation(boolean checksumValidation) {
this.checksumValidation = checksumValidation;
return this;
}

public Builder maxConnections(int maxConnections) {
this.maxConnections = maxConnections;
return this;
Expand All @@ -274,11 +325,6 @@ public Builder clientCloseTimeout(Duration clientCloseTimeout) {
return this;
}

public Builder disableCertCheck(boolean disableCertCheck) {
this.disableCertCheck = disableCertCheck;
return this;
}

public Builder maxRetries(int maxRetries) {
this.maxRetries = maxRetries;
return this;
Expand Down Expand Up @@ -326,14 +372,6 @@ public S3ClientProvider build() {
}

URI endpointUri = (endpoint != null) ? URI.create(endpoint) : null;
boolean isS3Compatible = endpointUri != null;

if (isS3Compatible && !pathStyleAccess) {
pathStyleAccess = true;
}
if (isS3Compatible && "http".equalsIgnoreCase(endpointUri.getScheme())) {
disableCertCheck = true;
}

Region awsRegion = resolveRegion(region);
StsClient stsClient = null;
Expand All @@ -347,12 +385,12 @@ public S3ClientProvider build() {
credentialsProvider = baseProvider;
}

S3Configuration.Builder s3ConfigBuilder =
S3Configuration.builder().pathStyleAccessEnabled(pathStyleAccess);
if (isS3Compatible) {
s3ConfigBuilder.chunkedEncodingEnabled(false).checksumValidationEnabled(false);
}
S3Configuration s3Config = s3ConfigBuilder.build();
S3Configuration s3Config =
S3Configuration.builder()
.pathStyleAccessEnabled(pathStyleAccess)
.chunkedEncodingEnabled(chunkedEncoding)
.checksumValidationEnabled(checksumValidation)
.build();

ClientOverrideConfiguration overrideConfig =
ClientOverrideConfiguration.builder()
Expand Down Expand Up @@ -406,7 +444,12 @@ public S3ClientProvider build() {
clientCloseTimeout,
connectionTimeout,
socketTimeout,
connectionMaxIdleTime);
connectionMaxIdleTime,
pathStyleAccess,
chunkedEncoding,
checksumValidation,
maxConnections,
maxRetries);
}

private AwsCredentialsProvider buildBaseCredentialsProvider() {
Expand Down
Loading