Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ io.reactivex:rxnetty:0.4.20
io.swagger:swagger-annotations:1.5.4
javax.inject:javax.inject:1
net.java.dev.jna:jna:5.2.0
net.minidev:accessors-smart:1.2
net.minidev:accessors-smart:1.21
org.apache.avro:avro:1.11.4
org.apache.commons:commons-compress:1.26.1
org.apache.commons:commons-configuration2:2.10.1
Expand Down Expand Up @@ -360,7 +360,7 @@ org.objenesis:objenesis:2.6
org.xerial.snappy:snappy-java:1.1.10.4
org.yaml:snakeyaml:2.0
org.wildfly.openssl:wildfly-openssl:2.2.5.Final
software.amazon.awssdk:bundle:2.29.52
software.amazon.awssdk:bundle:2.35.4
software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3.0

--------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.contract;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.io.IOException;
Expand All @@ -28,6 +29,7 @@

import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes;

/**
* Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
Expand Down Expand Up @@ -136,10 +138,12 @@ protected void validateFileContents(FSDataInputStream stream, int length,
int startIndex)
throws IOException {
byte[] streamData = new byte[length];
assertEquals("failed to read expected number of bytes from "
+ "stream. This may be transient",
length, stream.read(streamData));
final int read = readNBytes(stream, streamData, 0, length);
Assertions.assertThat(read)
.describedAs("failed to read expected number of bytes from stream. %s", stream)
.isEqualTo(length);
byte[] validateFileBytes;

if (startIndex == 0 && length == fileBytes.length) {
validateFileBytes = fileBytes;
} else {
Expand Down
2 changes: 1 addition & 1 deletion hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@
<make-maven-plugin.version>1.0-beta-1</make-maven-plugin.version>
<surefire.fork.timeout>900</surefire.fork.timeout>
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
<aws-java-sdk-v2.version>2.35.4</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ public String getMessage() {
public boolean retryable() {
return getCause().retryable();
}

/**
 * Get the operation string held by this instance.
 * @return the value of the {@code operation} field.
 */
public String getOperation() {
  return this.operation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
* Status code 443, no response from server. This is considered idempotent.
*/
public class AWSNoResponseException extends AWSServiceIOException {

/**
* Constructor.
* @param operation operation in progress.
* @param cause inner cause
*/
public AWSNoResponseException(String operation,
AwsServiceException cause) {
super(operation, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

Expand Down Expand Up @@ -1777,15 +1778,53 @@ private Constants() {
*/
public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;

/**
* Should checksums always be generated?
* Not all third-party stores like this being enabled for every request.
* Value: {@value}.
*/
public static final String CHECKSUM_GENERATION =
"fs.s3a.checksum.generation";

/**
* Default value of {@link #CHECKSUM_GENERATION}.
* Value: {@value}.
*/
public static final boolean DEFAULT_CHECKSUM_GENERATION = false;

/**
* Indicates the algorithm used to create the checksum for the object
* to be uploaded to S3. Unset by default. It supports the following values:
* 'CRC32', 'CRC32C', 'SHA1', and 'SHA256'
* 'CRC32', 'CRC32C', 'SHA1', 'SHA256', 'CRC64_NVME', 'NONE', ''.
* When checksum calculation is enabled this MUST be set to a valid algorithm.
* value:{@value}
*/
public static final String CHECKSUM_ALGORITHM =
"fs.s3a.create.checksum.algorithm";

/**
* Default checksum algorithm: {@code "NONE"}.
*/
public static final String DEFAULT_CHECKSUM_ALGORITHM =
ChecksumSupport.NONE;

/**
* Send a {@code Content-MD5} header with every request.
* This is required when performing some operations with third party stores
* (for example: bulk delete).
* It is supported by AWS S3, though has unexpected behavior with AWS S3 Express storage.
* See https://github.com/aws/aws-sdk-java-v2/issues/6459 for details.
*/
public static final String REQUEST_MD5_HEADER =
"fs.s3a.request.md5.header";

/**
* Default value of {@link #REQUEST_MD5_HEADER}.
* Value: {@value}.
*/
public static final boolean DEFAULT_REQUEST_MD5_HEADER = true;


/**
* Are extensions classes, such as {@code fs.s3a.aws.credentials.provider},
* going to be loaded from the same classloader that loaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.core.checksums.ResponseChecksumValidation;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
Expand All @@ -40,6 +42,7 @@
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.metrics.LoggingMetricPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
Expand Down Expand Up @@ -194,9 +197,32 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build

configureEndpointAndRegion(builder, parameters, conf);

// add a plugin to add a Content-MD5 header.
// this is required when performing some operations with third party stores
// (for example: bulk delete), and is somewhat harmless when working with AWS S3.
if (parameters.isMd5HeaderEnabled()) {
LOG.debug("MD5 header enabled");
builder.addPlugin(LegacyMd5Plugin.create());
}

//when to calculate request checksums.
final RequestChecksumCalculation checksumCalculation =
parameters.isChecksumCalculationEnabled()
? RequestChecksumCalculation.WHEN_SUPPORTED
: RequestChecksumCalculation.WHEN_REQUIRED;
LOG.debug("Using checksum calculation policy: {}", checksumCalculation);
builder.requestChecksumCalculation(checksumCalculation);

// response checksum validation. Slow, even with CRC32 checksums.
final ResponseChecksumValidation checksumValidation;
checksumValidation = parameters.isChecksumValidationEnabled()
? ResponseChecksumValidation.WHEN_SUPPORTED
: ResponseChecksumValidation.WHEN_REQUIRED;
LOG.debug("Using checksum validation policy: {}", checksumValidation);
builder.responseChecksumValidation(checksumValidation);

S3Configuration serviceConfiguration = S3Configuration.builder()
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
.checksumValidationEnabled(parameters.isChecksumValidationEnabled())
.build();

final ClientOverrideConfiguration.Builder override =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,10 +1182,15 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I
.withTransferManagerExecutor(unboundedThreadPool)
.withRegion(configuredRegion)
.withFipsEnabled(fipsEnabled)
.withS3ExpressStore(s3ExpressStore)
.withExpressCreateSession(
conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
.withChecksumValidationEnabled(
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
.withChecksumCalculationEnabled(
conf.getBoolean(CHECKSUM_GENERATION, DEFAULT_CHECKSUM_GENERATION))
.withMd5HeaderEnabled(conf.getBoolean(REQUEST_MD5_HEADER,
DEFAULT_REQUEST_MD5_HEADER))
.withClientSideEncryptionEnabled(isCSEEnabled)
.withClientSideEncryptionMaterials(cseMaterials)
.withAnalyticsAcceleratorEnabled(isAnalyticsAcceleratorEnabled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a;

import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
Expand Down Expand Up @@ -239,8 +240,13 @@ public static IOException translateException(@Nullable String operation,
? (S3Exception) ase
: null;
int status = ase.statusCode();
if (ase.awsErrorDetails() != null) {
message = message + ":" + ase.awsErrorDetails().errorCode();
// error details, may be null
final AwsErrorDetails errorDetails = ase.awsErrorDetails();
// error code; stays as the empty string if errorDetails is null
String errorCode = "";
if (errorDetails != null) {
errorCode = errorDetails.errorCode();
message = message + ":" + errorCode;
}

// big switch on the HTTP status code.
Expand Down Expand Up @@ -307,6 +313,8 @@ public static IOException translateException(@Nullable String operation,
// precondition failure: the object is there, but the precondition
// (e.g. etag) didn't match. Assume remote file change during
// rename or status passed in to openfile had an etag which didn't match.
// See the SC_200 handler for the treatment of the S3 Express failure
// variant.
case SC_412_PRECONDITION_FAILED:
ioe = new RemoteFileChangedException(path, message, "", ase);
break;
Expand Down Expand Up @@ -351,6 +359,16 @@ public static IOException translateException(@Nullable String operation,
return ((MultiObjectDeleteException) exception)
.translateException(message);
}
if (PRECONDITION_FAILED.equals(errorCode)) {
// S3 Express stores report conflict in conditional writes
// as a 200 + an error code of "PreconditionFailed".
// This is mapped to RemoteFileChangedException for consistency
// with SC_412_PRECONDITION_FAILED handling.
return new RemoteFileChangedException(path,
operation,
exception.getMessage(),
exception);
}
// other 200: FALL THROUGH

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ final class S3ClientCreationParameters {
*/
private String region;

/**
* Is this an S3 Express store?
*/
private boolean s3ExpressStore;

/**
* Enable S3Express create session.
*/
Expand All @@ -207,6 +212,17 @@ final class S3ClientCreationParameters {
*/
private boolean isAnalyticsAcceleratorEnabled;

/**
* Is the MD5 Header Enabled?
*/
private boolean md5HeaderEnabled;

/**
* Is Checksum calculation Enabled?
*/
private boolean checksumCalculationEnabled;


/**
* List of execution interceptors to include in the chain
* of interceptors in the SDK.
Expand Down Expand Up @@ -255,10 +271,18 @@ public S3ClientCreationParameters withRequesterPays(
return this;
}

/**
 * Query the requester pays flag.
 * @return true if the bucket is requester pays.
 */
public boolean isRequesterPays() {
  return this.requesterPays;
}

/**
 * Query the credential provider held in these parameters.
 * @return the credential provider.
 */
public AwsCredentialsProvider getCredentialSet() {
  return this.credentialSet;
}
Expand All @@ -275,6 +299,10 @@ public S3ClientCreationParameters withCredentialSet(
return this;
}

/**
 * Query the user agent suffix.
 * @return the suffix.
 */
public String getUserAgentSuffix() {
  return this.userAgentSuffix;
}
Expand Down Expand Up @@ -536,6 +564,20 @@ public String getKmsRegion() {
return kmsRegion;
}

/**
 * Is this an S3 Express store?
 * @return the value of the {@code s3ExpressStore} flag.
 */
public boolean isS3ExpressStore() {
  return this.s3ExpressStore;
}

/**
 * Set the S3 Express store flag.
 * @param value new value of the flag
 * @return this instance, so that calls can be chained
 */
public S3ClientCreationParameters withS3ExpressStore(final boolean value) {
  this.s3ExpressStore = value;
  return this;
}

/**
* Should s3express createSession be called?
* @return true if the client should enable createSession.
Expand Down Expand Up @@ -564,10 +606,46 @@ public S3ClientCreationParameters withChecksumValidationEnabled(final boolean va
return this;
}

/**
 * Query whether checksum validation on every request is enabled.
 * @return true if validation is on every request.
 */
public boolean isChecksumValidationEnabled() {
  return this.checksumValidationEnabled;
}

/**
 * Query whether MD5 headers should be added.
 * @return true to always add an MD5 header.
 */
public boolean isMd5HeaderEnabled() {
  return this.md5HeaderEnabled;
}

/**
 * Set the MD5 header flag.
 * @param value new value of the flag
 * @return this instance, so that calls can be chained
 */
public S3ClientCreationParameters withMd5HeaderEnabled(final boolean value) {
  this.md5HeaderEnabled = value;
  return this;
}

/**
 * Is checksum calculation enabled?
 * @return the value of the {@code checksumCalculationEnabled} flag.
 */
public boolean isChecksumCalculationEnabled() {
  return this.checksumCalculationEnabled;
}

/**
 * Set the checksum calculation flag.
 * @param value new value of the flag
 * @return this instance, so that calls can be chained
 */
public S3ClientCreationParameters withChecksumCalculationEnabled(final boolean value) {
  this.checksumCalculationEnabled = value;
  return this;
}

@Override
public String toString() {
return "S3ClientCreationParameters{" +
Expand All @@ -580,8 +658,10 @@ public String toString() {
", multiPartThreshold=" + multiPartThreshold +
", multipartCopy=" + multipartCopy +
", region='" + region + '\'' +
", s3ExpressStore=" + s3ExpressStore +
", expressCreateSession=" + expressCreateSession +
", checksumValidationEnabled=" + checksumValidationEnabled +
", md5HeaderEnabled=" + md5HeaderEnabled +
'}';
}

Expand Down
Loading