diff --git a/LICENSE-binary b/LICENSE-binary index 661c654f02fee..a072260cee5ca 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -295,7 +295,7 @@ io.reactivex:rxnetty:0.4.20 io.swagger:swagger-annotations:1.5.4 javax.inject:javax.inject:1 net.java.dev.jna:jna:5.2.0 -net.minidev:accessors-smart:1.2 +net.minidev:accessors-smart:1.21 org.apache.avro:avro:1.11.4 org.apache.commons:commons-compress:1.26.1 org.apache.commons:commons-configuration2:2.10.1 @@ -360,7 +360,7 @@ org.objenesis:objenesis:2.6 org.xerial.snappy:snappy-java:1.1.10.4 org.yaml:snakeyaml:2.0 org.wildfly.openssl:wildfly-openssl:2.2.5.Final -software.amazon.awssdk:bundle:2.29.52 +software.amazon.awssdk:bundle:2.35.4 software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3.0 -------------------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java index adaf0a910c620..4cb66827d1d28 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract; +import org.assertj.core.api.Assertions; import org.junit.Test; import java.io.IOException; @@ -28,6 +29,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes; /** * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}. @@ -136,10 +138,12 @@ protected void validateFileContents(FSDataInputStream stream, int length, int startIndex) throws IOException { byte[] streamData = new byte[length]; - assertEquals("failed to read expected number of bytes from " - + "stream. This may be transient", - length, stream.read(streamData)); + final int read = readNBytes(stream, streamData, 0, length); + Assertions.assertThat(read) + .describedAs("failed to read expected number of bytes from stream. 
%s", stream) + .isEqualTo(length); byte[] validateFileBytes; + if (startIndex == 0 && length == fileBytes.length) { validateFileBytes = fileBytes; } else { diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index ca58ce5feccd7..39e59f59e918d 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -202,7 +202,7 @@ 1.0-beta-1 900 1.12.720 - 2.29.52 + 2.35.4 3.1.1 1.3.0 1.0.1 diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java index b61667d1c502b..af187e3580db1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java @@ -54,4 +54,8 @@ public String getMessage() { public boolean retryable() { return getCause().retryable(); } + + public String getOperation() { + return operation; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java index b8562714b1aae..49ebd3a42fdf4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java @@ -24,6 +24,12 @@ * Status code 443, no response from server. This is considered idempotent. */ public class AWSNoResponseException extends AWSServiceIOException { + + /** + * Constructor. + * @param operation operation in progress. + * @param cause inner cause + */ public AWSNoResponseException(String operation, AwsServiceException cause) { super(operation, cause); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 5d4f5eb40df93..0d326cc3411e6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.s3a.impl.ChecksumSupport; import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; @@ -1777,15 +1778,53 @@ private Constants() { */ public static final boolean CHECKSUM_VALIDATION_DEFAULT = false; + /** + * Should checksums always be generated? + * Not all third-party stores like this being enabled for every request. + * Value: {@value}. + */ + public static final String CHECKSUM_GENERATION = + "fs.s3a.checksum.generation"; + + /** + * Default value of {@link #CHECKSUM_GENERATION}. + * Value: {@value}. + */ + public static final boolean DEFAULT_CHECKSUM_GENERATION = false; + /** * Indicates the algorithm used to create the checksum for the object * to be uploaded to S3. Unset by default. It supports the following values: - * 'CRC32', 'CRC32C', 'SHA1', and 'SHA256' + * 'CRC32', 'CRC32C', 'SHA1', 'SHA256', 'CRC64_NVME', 'NONE', ''. + * When checksum calculation is enabled this MUST be set to a valid algorithm. * value:{@value} */ public static final String CHECKSUM_ALGORITHM = "fs.s3a.create.checksum.algorithm"; + /** + * Default checksum algorithm: {@code "NONE"}.
+ */ + public static final String DEFAULT_CHECKSUM_ALGORITHM = + ChecksumSupport.NONE; + + /** + * Send a {@code Content-MD5} header with every request. + * This is required when performing some operations with third party stores + * (for example: bulk delete). + * It is supported by AWS S3, though has unexpected behavior with AWS S3 Express storage. + * See https://github.com/aws/aws-sdk-java-v2/issues/6459 for details. + */ + public static final String REQUEST_MD5_HEADER = + "fs.s3a.request.md5.header"; + + /** + * Default value of {@link #REQUEST_MD5_HEADER}. + * Value: {@value}. + */ + public static final boolean DEFAULT_REQUEST_MD5_HEADER = true; + + /** * Are extensions classes, such as {@code fs.s3a.aws.credentials.provider}, * going to be loaded from the same classloader that loaded diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index f52093abbe78a..e2464ea7d294d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.util.AwsHostNameUtils; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; +import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; @@ -40,6 +42,7 @@ import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity; import software.amazon.awssdk.metrics.LoggingMetricPublisher; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.LegacyMd5Plugin; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3BaseClientBuilder; @@ -194,9 +197,32 @@ private , ClientT> Build configureEndpointAndRegion(builder, parameters, conf); + // add a plugin to add a Content-MD5 header. + // this is required when performing some operations with third party stores + // (for example: bulk delete), and is somewhat harmless when working with AWS S3. + if (parameters.isMd5HeaderEnabled()) { + LOG.debug("MD5 header enabled"); + builder.addPlugin(LegacyMd5Plugin.create()); + } + + //when to calculate request checksums. + final RequestChecksumCalculation checksumCalculation = + parameters.isChecksumCalculationEnabled() + ? RequestChecksumCalculation.WHEN_SUPPORTED + : RequestChecksumCalculation.WHEN_REQUIRED; + LOG.debug("Using checksum calculation policy: {}", checksumCalculation); + builder.requestChecksumCalculation(checksumCalculation); + + // response checksum validation. Slow, even with CRC32 checksums. + final ResponseChecksumValidation checksumValidation; + checksumValidation = parameters.isChecksumValidationEnabled() + ?
ResponseChecksumValidation.WHEN_SUPPORTED + : ResponseChecksumValidation.WHEN_REQUIRED; + LOG.debug("Using checksum validation policy: {}", checksumValidation); + builder.responseChecksumValidation(checksumValidation); + S3Configuration serviceConfiguration = S3Configuration.builder() .pathStyleAccessEnabled(parameters.isPathStyleAccess()) - .checksumValidationEnabled(parameters.isChecksumValidationEnabled()) .build(); final ClientOverrideConfiguration.Builder override = diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index b277419001848..3d0b92e462567 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1182,10 +1182,15 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I .withTransferManagerExecutor(unboundedThreadPool) .withRegion(configuredRegion) .withFipsEnabled(fipsEnabled) + .withS3ExpressStore(s3ExpressStore) .withExpressCreateSession( conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT)) .withChecksumValidationEnabled( conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT)) + .withChecksumCalculationEnabled( + conf.getBoolean(CHECKSUM_GENERATION, DEFAULT_CHECKSUM_GENERATION)) + .withMd5HeaderEnabled(conf.getBoolean(REQUEST_MD5_HEADER, + DEFAULT_REQUEST_MD5_HEADER)) .withClientSideEncryptionEnabled(isCSEEnabled) .withClientSideEncryptionMaterials(cseMaterials) .withAnalyticsAcceleratorEnabled(isAnalyticsAcceleratorEnabled) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 057ed3d7a0129..aca134e832109 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.AbortedException; import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException; @@ -239,8 +240,13 @@ public static IOException translateException(@Nullable String operation, ? (S3Exception) ase : null; int status = ase.statusCode(); - if (ase.awsErrorDetails() != null) { - message = message + ":" + ase.awsErrorDetails().errorCode(); + // error details, may be null + final AwsErrorDetails errorDetails = ase.awsErrorDetails(); + // error code, will be null if errorDetails is null + String errorCode = ""; + if (errorDetails != null) { + errorCode = errorDetails.errorCode(); + message = message + ":" + errorCode; } // big switch on the HTTP status code. @@ -307,6 +313,8 @@ public static IOException translateException(@Nullable String operation, // precondition failure: the object is there, but the precondition // (e.g. etag) didn't match. Assume remote file change during // rename or status passed in to openfile had an etag which didn't match. + // See the SC_200 handler for the treatment of the S3 Express failure + // variant. 
case SC_412_PRECONDITION_FAILED: ioe = new RemoteFileChangedException(path, message, "", ase); break; @@ -351,6 +359,16 @@ public static IOException translateException(@Nullable String operation, return ((MultiObjectDeleteException) exception) .translateException(message); } + if (PRECONDITION_FAILED.equals(errorCode)) { + // S3 Express stores report conflict in conditional writes + // as a 200 + an error code of "PreconditionFailed". + // This is mapped to RemoteFileChangedException for consistency + // with SC_412_PRECONDITION_FAILED handling. + return new RemoteFileChangedException(path, + operation, + exception.getMessage(), + exception); + } // other 200: FALL THROUGH default: diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index 559cd49c34582..58d3813075695 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -187,6 +187,11 @@ final class S3ClientCreationParameters { */ private String region; + /** + * Is this an S3 Express store? + */ + private boolean s3ExpressStore; + /** * Enable S3Express create session. */ @@ -207,6 +212,17 @@ final class S3ClientCreationParameters { */ private boolean isAnalyticsAcceleratorEnabled; + /** + * Is the MD5 Header Enabled? + */ + private boolean md5HeaderEnabled; + + /** + * Is Checksum calculation Enabled? + */ + private boolean checksumCalculationEnabled; + + /** * List of execution interceptors to include in the chain * of interceptors in the SDK. @@ -255,10 +271,18 @@ public S3ClientCreationParameters withRequesterPays( return this; } + /** + * Is this a requester pays bucket? + * @return true if the bucket is requester pays. + */ public boolean isRequesterPays() { return requesterPays; } + /** + * Get the credentials. + * @return the credential provider. + */ public AwsCredentialsProvider getCredentialSet() { return credentialSet; } @@ -275,6 +299,10 @@ public S3ClientCreationParameters withCredentialSet( return this; } + /** + * Get UA suffix. + * @return suffix. + */ public String getUserAgentSuffix() { return userAgentSuffix; } @@ -536,6 +564,20 @@ public String getKmsRegion() { return kmsRegion; } + public boolean isS3ExpressStore() { + return s3ExpressStore; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withS3ExpressStore(final boolean value) { + s3ExpressStore = value; + return this; + } + /** * Should s3express createSession be called? * @return true if the client should enable createSession. @@ -564,10 +606,46 @@ public S3ClientCreationParameters withChecksumValidationEnabled(final boolean va return this; } + /** + * Is checksum validation on every request enabled? + * @return true if validation is on every request. + */ public boolean isChecksumValidationEnabled() { return checksumValidationEnabled; } + /** + * Should MD5 headers be added? + * @return true to always add an MD5 header. + */ + public boolean isMd5HeaderEnabled() { + return md5HeaderEnabled; + } + + /** + * Set builder value. 
+ * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withMd5HeaderEnabled(final boolean value) { + md5HeaderEnabled = value; + return this; + } + + public boolean isChecksumCalculationEnabled() { + return checksumCalculationEnabled; + } + + /** + * Set builder value. + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withChecksumCalculationEnabled(final boolean value) { + checksumCalculationEnabled = value; + return this; + } + @Override public String toString() { return "S3ClientCreationParameters{" + @@ -580,8 +658,10 @@ public String toString() { ", multiPartThreshold=" + multiPartThreshold + ", multipartCopy=" + multipartCopy + ", region='" + region + '\'' + + ", s3ExpressStore=" + s3ExpressStore + ", expressCreateSession=" + expressCreateSession + ", checksumValidationEnabled=" + checksumValidationEnabled + + ", md5HeaderEnabled=" + md5HeaderEnabled + '}'; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index e96dce6b6b4d2..dda16e0b1f68c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -315,7 +315,8 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload( } try (AuditSpan span = activateAuditSpan()) { CompleteMultipartUploadResponse uploadResult; - uploadResult = invoker.retry("Completing multipart upload", destKey, + uploadResult = invoker.retry("Completing multipart upload id " + uploadId, + destKey, true, retrying, () -> { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java index b378602165074..e374a1ad9fe5b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java @@ -104,6 +104,7 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request, /** * Parse the bucket name from the host. + * This does not work for path-style access; the hostname of the endpoint is returned. * @param host hostname * @return the parsed bucket name; if "kms" is KMS signing. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java index bd76d83ee096f..c2f9ef4718db2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.s3a.auth; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -199,10 +198,30 @@ private RolePolicies() { public static final String S3_RESTORE_OBJECT = "s3:RestoreObject"; /** - * S3Express session permission; required unless sessions are disabled. + * Everything: {@value}. + */ + public static final String EVERYTHING_ARN = "*"; + + /** + * All S3Express buckets: {@value}. + * S3Express adds another "domain" for permissions: S3 express ARNs and S3 Express operations, + * of which createSession is one key operation. 
+ * See https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-express-security.html + * Note: this wildcard patten came from AWS Q; if it is wrong blame GenerativeAI. + */ + public static final String S3EXPRESS_ALL_BUCKETS = "arn:aws:s3express:*:*:bucket/*--*--x-s3"; + + /** + * S3Express session permission; required unless sessions are disabled: {@value}. + * See https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateSession.html */ public static final String S3EXPRESS_CREATE_SESSION_POLICY = "s3express:CreateSession"; + /** + * S3 Express All operations: {@value}. + */ + public static final String S3EXPRESS_ALL_OPERATIONS = "s3express:*"; + /** * Actions needed to read a file in S3 through S3A, excluding * SSE-KMS. @@ -224,7 +243,7 @@ private RolePolicies() { */ private static final String[] S3_ROOT_READ_OPERATIONS = new String[]{ - S3_ALL_GET + S3_ALL_GET, }; public static final List S3_ROOT_READ_OPERATIONS_LIST = @@ -239,7 +258,7 @@ private RolePolicies() { public static final String[] S3_BUCKET_READ_OPERATIONS = new String[]{ S3_ALL_GET, - S3_BUCKET_ALL_LIST + S3_BUCKET_ALL_LIST, }; /** @@ -281,7 +300,7 @@ private RolePolicies() { S3_PUT_OBJECT, S3_PUT_OBJECT_ACL, S3_DELETE_OBJECT, - S3_ABORT_MULTIPART_UPLOAD + S3_ABORT_MULTIPART_UPLOAD, })); /** @@ -292,6 +311,13 @@ private RolePolicies() { S3_ALL_BUCKETS, S3_ALL_OPERATIONS); + /** + * S3 Express operations required for operation. + */ + public static final Statement STATEMENT_S3EXPRESS = statement(true, + S3EXPRESS_ALL_BUCKETS, + S3EXPRESS_ALL_OPERATIONS); + /** * The s3:GetBucketLocation permission is for all buckets, not for * any named bucket, which complicates permissions. @@ -310,8 +336,9 @@ private RolePolicies() { public static List allowS3Operations(String bucket, boolean write) { // add the bucket operations for the specific bucket ARN - ArrayList statements = + List statements = Lists.newArrayList( + STATEMENT_S3EXPRESS, statement(true, bucketToArn(bucket), S3_GET_BUCKET_LOCATION, S3_BUCKET_ALL_LIST)); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java index ba1dd400f6d7b..9ada0d565a342 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java @@ -64,7 +64,7 @@ public MagicCommitIntegration(S3AFileSystem owner, boolean magicCommitEnabled) { super(owner.createStoreContext()); this.owner = owner; - this.magicCommitEnabled = magicCommitEnabled; + this.magicCommitEnabled = magicCommitEnabled && owner.isMultipartUploadEnabled(); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java index b14f5f7bd2370..474d68c4a00dd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java @@ -18,12 +18,12 @@ package org.apache.hadoop.fs.s3a.impl; +import java.util.Locale; import java.util.Set; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.util.ConfigurationHelper; @@ -34,6 +34,22 @@ */ public final class ChecksumSupport { + /** + * Special checksum algorithm to declare that no checksum + * is required: {@value}. + */ + public static final String NONE = "NONE"; + + /** + * CRC32C, mapped to CRC32_C algorithm class. + */ + public static final String CRC32C = "CRC32C"; + + /** + * CRC64NVME, mapped to CRC64_NVME algorithm class. + */ + public static final String CRC64NVME = "CRC64NVME"; + private ChecksumSupport() { } @@ -43,6 +59,7 @@ private ChecksumSupport() { private static final Set SUPPORTED_CHECKSUM_ALGORITHMS = ImmutableSet.of( ChecksumAlgorithm.CRC32, ChecksumAlgorithm.CRC32_C, + ChecksumAlgorithm.CRC64_NVME, ChecksumAlgorithm.SHA1, ChecksumAlgorithm.SHA256); @@ -58,14 +75,22 @@ public static ChecksumAlgorithm getChecksumAlgorithm(Configuration conf) { CHECKSUM_ALGORITHM, ChecksumAlgorithm.class, configValue -> { - if (StringUtils.isBlank(configValue)) { + // default values and handling algorithms names without underscores. + String val = configValue == null + ? NONE + : configValue.toUpperCase(Locale.ROOT); + switch (val) { + case "": + case NONE: return null; - } - if (ChecksumAlgorithm.CRC32_C.toString().equalsIgnoreCase(configValue)) { - // In case the configuration value is CRC32C, without underscore. + case CRC32C: return ChecksumAlgorithm.CRC32_C; + case CRC64NVME: + return ChecksumAlgorithm.CRC64_NVME; + default: + throw new IllegalArgumentException("Checksum algorithm is not supported: " + + configValue); } - throw new IllegalArgumentException("Checksum algorithm is not supported: " + configValue); }); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 12ec3fd019923..82f162481e020 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -311,4 +311,9 @@ private InternalConstants() { public static final String UPLOAD_PROGRESS_LOG_NAME = "org.apache.hadoop.fs.s3a.S3AFileSystem.Progress"; + /** + * AWS Error code for conditional put failure on s3 express buckets: {@value}. + */ + public static final String PRECONDITION_FAILED = "PreconditionFailed"; + } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md index 783a8a9e609e6..3645a652b64cb 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md @@ -144,6 +144,9 @@ the cost of a HEAD request, and caches it. Please note that some endpoint and region settings that require cross region access are complex and improving over time. Hence, they may be considered unstable. +*Important:* do not use `auto`, `ec2`, or `sdk` as these may be used +in the future for specific region-binding algorithms. + If you are working with third party stores, please check [third party stores in detail](third_party_stores.html). 
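For illustration, a minimal per-bucket endpoint/region configuration could look like the following sketch; the bucket name `example-bucket`, the endpoint URL and the region value `us-west-2` are placeholders rather than values taken from this patch:

```xml
<property>
  <!-- hypothetical bucket name: substitute your own -->
  <name>fs.s3a.bucket.example-bucket.endpoint</name>
  <value>https://s3.us-west-2.amazonaws.com</value>
</property>
<property>
  <!-- any valid region name; avoid the reserved values auto, ec2 and sdk -->
  <name>fs.s3a.bucket.example-bucket.endpoint.region</name>
  <value>us-west-2</value>
</property>
```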
### Network timeouts diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index 01ce813cbecf9..1d3415575cfd0 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1303,7 +1303,9 @@ Here are some the S3A properties for use in production. Indicates the algorithm used to create the checksum for the object to be uploaded to S3. Unset by default. It supports the following values: - 'CRC32', 'CRC32C', 'SHA1', and 'SHA256' + 'CRC32', 'CRC32C', 'SHA1', 'SHA256', "CRC64_NVME", "none" + The CRC64_NVME option requires aws-crt on the classpath, and is still + tangibly slower than CRC32C, which has its own instruction on x86 and ARM. @@ -1775,6 +1777,9 @@ The "fast" output stream 1. Uploads blocks in parallel in background threads. 1. Begins uploading blocks as soon as the buffered data exceeds this partition size. +1. Uses any checksum set in `fs.s3a.create.checksum.algorithm` to calculate an upload + checksum on data written; this is included in the file/part upload and verified + on the store. This can be a source of third-party store compatibility issues. 1. When buffering data to disk, uses the directory/directories listed in `fs.s3a.buffer.dir`. The size of data which can be buffered is limited to the available disk space. @@ -2049,16 +2054,7 @@ rate. The best practise for using this option is to disable multipart purges in normal use of S3A, enabling only in manual/scheduled housekeeping operations. -### S3A "fadvise" input policy support - -The S3A Filesystem client supports the notion of input policies, similar -to that of the Posix `fadvise()` API call. This tunes the behavior of the S3A -client to optimise HTTP GET requests for the different use cases. - -See [Improving data input performance through fadvise](./performance.html#fadvise) -for the details. - -##Metrics +## Metrics S3A metrics can be monitored through Hadoop's metrics2 framework. S3A creates its own metrics system called s3a-file-system, and each instance of the client @@ -2096,7 +2092,126 @@ also get recorded, for example the following: Note that low-level metrics from the AWS SDK itself are not currently included in these metrics. -## Other Topics + +## Checksums + +The S3 Client can use checksums in its requests to an S3 store in a number of ways: + +1. To provide a checksum of the request headers. +2. To provide a `Content-MD5` hash of the request headers +3. To provide a checksum of data being PUT/POSTed to the store. +4. To validate data downloaded from the store. + +The various options available can impact performance and compatibility. +To understand the risks and issues here know that: +* Request checksum generation (item 1) and validation (4) can be done "when required" or "always". + The "always" option is stricter, but can result in third-party compatibility issues. +* Some third-party stores require the `Content-MD5` header and will fail without it (item 2) +* Data upload checksums (item 3) can be computationally expensive and incompatible with third-party stores +* The most efficient data upload checksum is CRC32C; there are explicit opcodes for this in x86 and ARM CPUs, with the appropriate implementation circuitry. +* Data download validation checksums are also computationally expensive. 
+ +| Option | Purpose | Values | Default | +|------------------------------------|------------------------------------------------|---------|----------| +| `fs.s3a.request.md5.header` | Enable MD5 header | boolean | `true` | +| `fs.s3a.checksum.generation` | Generate checksums on all requests | boolean | `false` | +| `fs.s3a.checksum.validation` | Validate checksums on download | boolean | `false` | +| `fs.s3a.create.checksum.algorithm` | Checksum Algorithm when creating/copying files | `NONE`, `CRC32`, `CRC32C`, `CRC32_C`, `CRC64NVME`, `CRC64_NVME`, `SHA256`, `SHA1` | `""` | + + +Turning on checksum generation and validation may seem like obvious actions, but consider +this: you are communicating with an S3 store over an HTTPS channel, which includes +cryptographically strong HMAC checksums of every block transmitted. +These are far more robust than the CRC* algorithms, and the computational cost is already +being paid for: so why add more? + +With TLS ensuring the network traffic isn't altered from the moment it is encrypted to when +it is decrypted, all extra checksum generation/validation does is ensure that there's no +accidental corruption between the data being generated and uploaded, or between being downloaded and read. + +This could potentially deal with memory/buffering/bus issues on the servers. +However, this is what ECC RAM is for. +If you do suspect requests being corrupted during writing or reading, the options may +be worth considering. +As it is, they are off by default to avoid compatibility problems. + +Note: if you have a real example of where these checksum options have identified memory corruption, +please let us know. + +### Content-MD5 Header on requests: `fs.s3a.request.md5.header` + +Send a `Content-MD5` header with every request? + +This header is required when interacting with some third-party stores. +It is supported by AWS S3, though it has some unexpected behavior with AWS S3 Express storage +[issue 6459](https://github.com/aws/aws-sdk-java-v2/issues/6459). +As that appears to have been fixed in the 2.35.4 SDK release, this option is enabled by default. + +### Request checksum generation: `fs.s3a.checksum.generation` + +Should checksums be generated for all requests made to the store? + +* Incompatible with some third-party stores. +* If `true` then multipart upload (i.e. large file upload) may fail if `fs.s3a.create.checksum.algorithm` + is not set to a valid algorithm (i.e. something other than `NONE`). + +Set `fs.s3a.checksum.generation` to `false` (the default value) to avoid these problems. + +### Checksum validation: `fs.s3a.checksum.validation` + +Should the checksums of downloaded data be validated? + +This hurts performance and should only be used if considered important. + +### Creation checksum: `fs.s3a.create.checksum.algorithm` + +This is the algorithm to use when checksumming data during file creation and copy. + +Options: `NONE`, `CRC32`, `CRC32C`, `CRC32_C`, `CRC64NVME`, `CRC64_NVME`, `SHA256`, `SHA1` + +The option `NONE` is new to Hadoop 3.4.3; previously an empty string was required for the same behavior. + +The `CRC64NVME`/`CRC64_NVME` option is also new to Hadoop 3.4.3 and requires the `aws-crt` module to be on the classpath, otherwise an error is printed: + +``` +java.lang.RuntimeException: Could not load software.amazon.awssdk.crt.checksums.CRC64NVME. +Add dependency on 'software.amazon.awssdk.crt:aws-crt' module to enable CRC64NVME feature.
+``` + +Checksum/algorithm incompatibilities may surface as a failure in "Completing multipart upload". + +First, as a failure reported as a "missing part". +``` +org.apache.hadoop.fs.s3a.AWSBadRequestException: Completing multipart upload id l8itQB. +5u7TcWqznqbGfTjHv06mxb4IlBNcZiDWrBAS0t1EMJGkr9J1QD2UAwDM5rLUZqypJfWCoPJtySxA3QK9QqKTBdKr3LXYjYJ_r9lRcGdzBRbnIJeI8tBr8yqtS on +test/testCommitEmptyFile/empty-commit.txt: +software.amazon.awssdk.services.s3.model.S3Exception: One or more of the specified parts could not be found. +The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. +(Service: S3, Status Code: 400, Request ID: AQ0J4B66H626Y3FH, +Extended Request ID: g1zo25aQCZfqFh3vOzrzOBp9RjJEWmKImRcfWhvaeFHQ2hZo1xTm3GVMD03zN+d+cFB6oNeelNc=) +(SDK Attempt Count: 1):InvalidPart: One or more of the specified parts could not be found. +The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. +(Service: S3, Status Code: 400, Request ID: AQ0J4B66H626Y3FH, Extended Request ID: +g1zo25aQCZfqFh3vOzrzOBp9RjJEWmKImRcfWhvaeFHQ2hZo1xTm3GVMD03zN+d+cFB6oNeelNc=) (SDK Attempt Count: 1) +``` + +Alternatively, as the failure of a multipart upload when a checksum algorithm is set and the part ordering is not in sequence. + +``` +org.apache.hadoop.fs.s3a.AWSStatus500Exception: + Completing multipart upload id A8rf256dBVbDtIVLr40KaMGKw9DY.rhgNP5zmn1ap97YjPaIO2Ac3XXL_T.2HCtIrGUpx5bdOTgvVeZzVHuoWI4pKv_MeMMVqBHJGP7u_q4PR8AxWvSq0Lsv724HT1fQ + on test/testMultipartUploadReverseOrderNonContiguousPartNumbers: +software.amazon.awssdk.services.s3.model.S3Exception: We encountered an internal error. +Please try again. +(Service: S3, Status Code: 500, Request ID: WTBY2FX76Q5F5YWB, +Extended Request ID: eWQWk8V8rmVmKImWVCI2rHyFS3XQSPgIkjfAyzzZCgVgyeRqox8mO8qO4ODMB6IUY0+rYqqsnOX2zXiQcRzFlb9p3nSkEEc+T0CYurLaH28=) +(SDK Attempt Count: 3) +``` + +This is only possible through the FileSystem multipart API; normal data writes including +those through the magic committer will not encounter it. + +## Other Topics ### Copying Data with distcp diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md index fa4572bb165b1..6614ad439c7fb 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/reading.md @@ -163,6 +163,15 @@ configured to use the vector IO API, it is likely to be significantly faster to use the classic stream and its parallel reads. +## S3A "fadvise" input policy support: `fs.s3a.experimental.input.fadvise` + +The S3A Filesystem client supports the notion of input policies, similar +to that of the Posix `fadvise()` API call. This tunes the behavior of the S3A +client to optimise HTTP GET requests for the different use cases. + +See [Improving data input performance through fadvise](./performance.html#fadvise) +for the details. + ## Developer Topics ### Stream IOStatistics diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md index 7222eee98baeb..21d6fc10fb4b2 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md @@ -628,6 +628,21 @@ on third party stores.
test.fs.s3a.create.create.acl.enabled false + + test.fs.s3a.performance.enabled false + + + + fs.s3a.ext.test.multipart.commit.consumes.upload.id true + ``` See [Third Party Stores](third_party_stores.html) for more on this topic. @@ -767,9 +782,35 @@ Tests in `ITestS3AContentEncoding` may need disabling false ``` + +### Disabling tests running in performance mode + +Some tests running in performance mode turn off the safety checks. They expect operations which break POSIX semantics to succeed. +For stores with stricter semantics, these test cases must be disabled. +```xml + + test.fs.s3a.performance.enabled + false + +``` + +### Changing expectations on multipart upload retries: `ITestS3AContractMultipartUploader` and `ITestUploadRecovery` + +If the store reports errors when trying to list/abort completed multipart uploads, +expect failures in `ITestUploadRecovery` and `ITestS3AContractMultipartUploader`. +The tests can be reconfigured to expect failure by setting the option +`fs.s3a.ext.test.multipart.commit.consumes.upload.id` to true. + +Note how this can be set as a per-bucket option. + +```xml + + fs.s3a.ext.test.multipart.commit.consumes.upload.id + true + +``` ### Tests which may fail (and which you can ignore) -* `ITestS3AContractMultipartUploader` tests `testMultipartUploadAbort` and `testSingleUpload` raising `FileNotFoundException` * `ITestS3AMiscOperations.testEmptyFileChecksums`: if the FS encrypts data always. ## Debugging Test failures @@ -868,10 +909,15 @@ Key features of `AbstractS3ATestBase` * `getFileSystem()` returns the S3A Filesystem bonded to the contract test Filesystem defined in `fs.s3a.contract.test` * will automatically skip all tests if that URL is unset. -* Extends `AbstractFSContractTestBase` and `Assert` for all their methods. +* Extends `AbstractFSContractTestBase` +* Uses AssertJ for all assertions, _not_ those of JUnit5. Having shared base classes may help reduce future maintenance too. Please -use them/ +use them. + +We adopted AssertJ assertions long before the move to JUnit5. +While there are still many tests with legacy JUnit 1.x assertions, all new test cases +should use AssertJ assertions and MUST NOT use JUnit5 assertions. ### Secure @@ -904,7 +950,7 @@ against other regions, or with third party S3 implementations. Thus the URL can be overridden for testing elsewhere. -### Works With Other S3 Stored +### Works With Other S3 Stores Don't assume AWS S3 US-East only, do allow for working with external S3 implementations. Those may be behind the latest S3 API features, not support encryption, session diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md index f6fea9338a424..0336efa677c0b 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md @@ -42,19 +42,9 @@ The features which may be unavailable include: * Bucket lifecycle rules to clean up pending uploads. * Support for multipart uploads. * Conditional file creation. (`fs.s3a.create.conditional.enabled = false`) +* Variations in checksum calculation on uploads. +* Requirement for Content-MD5 headers. -### Disabling Change Detection - -The (default) etag-based change detection logic expects stores to provide an Etag header in HEAD/GET requests, -and to support it as a precondition in subsequent GET and COPY calls.
-If a store does not do this, disable the checks. - -```xml - - fs.s3a.change.detection.mode - none - -``` ## Connecting to a third party object store over HTTPS The core setting for a third party store is to change the endpoint in `fs.s3a.endpoint`. @@ -65,10 +55,11 @@ path style access must also be enabled in `fs.s3a.path.style.access`. The v4 signing algorithm requires a region to be set in `fs.s3a.endpoint.region`. A non-empty value is generally sufficient, though some deployments may require -a specific value. +a specific value. -*Important:* do not use `auto` or `sdk` as these may be used -in the future for specific region binding algorithms. +*Important:* do not use `auto`, `ec2`, or `sdk` as these may be used +in the future for specific region binding algorithms; while `null` +can be mis-interpreted. Finally, assuming the credential source is the normal access/secret key then these must be set, either in XML or (preferred) in a JCEKS file. @@ -87,7 +78,7 @@ then these must be set, either in XML or (preferred) in a JCEKS file. fs.s3a.endpoint.region - anything + anything except: sdk, auto, ec2 @@ -104,7 +95,14 @@ then these must be set, either in XML or (preferred) in a JCEKS file. If per-bucket settings are used here, then third-party stores and credentials may be used alongside an AWS store. +### region naming + +AWS SDK requires the name of a region is supplied for signing, and that region match the endpoint used. +Third-party stores don't normally care about the name of a region, *only that a region is supplied*. + +You should set `fs.s3a.endpoint.region` to anything except the following reserved names: `sdk`, `ec2` and `auto`. +We have plans for those. ## Other issues @@ -120,7 +118,7 @@ This can be addressed in two ways #### S3Guard uploads command -This can be executed on a schedule, or manually +This can be executed on a schedule, or manually: ``` hadoop s3guard uploads -abort -force s3a://bucket/ @@ -174,10 +172,78 @@ false to disable use of these features. ``` +## Controlling Upload Checksums and MD5 Headers + +It may be necessary to change checksums of uploads by +1. Enabling the attachment of a `Content-MD5 header` in requests +2. Restricting checksum generation to only when required. + +```xml + + fs.s3a.request.md5.header + true + Enable calculation and inclusion of an MD5 HEADER on data upload operations + + + + fs.s3a.checksum.generation + false + Calculate and attach a message checksum on every operation. + + + + fs.s3a.checksum.validation + false + Validate data checksums on download + +``` + +These options are set for best compatibility and performance by default; they may need tuning for specific stores. + +See [checksums](index.html#checksums) for more details. + +### Disabling Change Detection + +The (default) etag-based change detection logic expects stores to provide an Etag header in HEAD/GET requests, +and to support it as a precondition in subsequent GET and COPY calls. +If a store does not do this, disable the checks. + +```xml + + fs.s3a.change.detection.mode + none + +``` + +## Handling Null Etags + +Some object stores do not support etags, that is: they return `null` or an empty string as the etag of an object on both HEAD and GET requests. + +This breaks version management in the classic input stream *and* metadata caching in the analytics stream. 
+ +To work with such a store: +* Set `fs.s3a.input.stream.type` to `classic` +* Set `fs.s3a.change.detection.mode` to `none` + +```xml + + fs.s3a.input.stream.type + classic + + + + fs.s3a.change.detection.mode + none + +``` + +Note: the [cloudstore](https://github.com/steveloughran/cloudstore) `etag` command will retrieve and print an object's etag, +and can be used to help debug this situation. +The etag value of a newly created object SHOULD be a non-empty string. # Troubleshooting -The most common problem when talking to third-party stores are +The most common problems when talking to third-party stores are: 1. The S3A client is still configured to talk to the AWS S3 endpoint. This leads to authentication failures and/or reports that the bucket is unknown. 2. Path access has not been enabled, the client is generating a host name for the target bucket and it does not exist. @@ -185,11 +251,12 @@ The most common problem when talking to third-party stores are 4. JVM HTTPS settings include the certificates needed to negotiate a TLS connection with the store. -## How to improve troubleshooting +## How to Troubleshoot Problems + +### Log More Network Info -### log more network info +There are some very low level logs which can be printed. -There are some very low level logs. ```properties # Log all HTTP requests made; includes S3 interaction. This may # include sensitive information such as account IDs in HTTP headers. @@ -203,7 +270,7 @@ log4j.logger.io.netty.handler.logging=DEBUG log4j.logger.io.netty.handler.codec.http2.Http2FrameLogger=DEBUG ``` -### Cut back on retries, shorten timeouts +### Reduce Retries; Shorten Timeouts By default, there's a lot of retries going on in the AWS connector (which even retries on DNS failures) and in the S3A code which invokes it. @@ -263,7 +330,7 @@ the AWS SDK itself still makes a limited attempt to retry. There's an external utility, [cloudstore](https://github.com/steveloughran/cloudstore) whose [storediag](https://github.com/steveloughran/cloudstore#command-storediag) exists to debug the connection settings to hadoop cloud storage. ```bash -hadoop jar cloudstore-1.0.jar storediag s3a://nonexistent-bucket-example/ +hadoop jar cloudstore-1.1.jar storediag s3a://nonexistent-bucket-example/ ``` The main reason it's not an ASF release is that it allows for a rapid release cycle, sometimes hours; if anyone doesn't trust @@ -414,7 +481,43 @@ Fix: path style access ``` -# Connecting to Google Cloud Storage through the S3A connector +# Settings for Specific Stores + +## Dell ECS through the S3A Connector + +As of November 2025 and the 2.35.4 AWS SDK, the settings needed to interact with Dell ECS +at [ECS Test Drive](https://portal.ecstestdrive.com/) were: + +```xml + + fs.s3a.endpoint.region + dell + arbitrary name other than sdk, ec2, auto or null + + + + fs.s3a.path.style.access + true + + + + fs.s3a.create.conditional.enabled + false + + + + fs.s3a.bucket.request.md5.header + true + Enable calculation and inclusion of an MD5 HEADER on data upload operations + + + + fs.s3a.checksum.generation + false + +``` + +## Google Cloud Storage through the S3A connector It *is* possible to connect to google cloud storage through the S3A connector. However, Google provide their own [Cloud Storage connector](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). @@ -443,63 +546,68 @@ this makes renaming and deleting significantly slower. - fs.s3a.bucket.gcs-container.access.key + fs.s3a.access.key GOOG1EZ....
- fs.s3a.bucket.gcs-container.secret.key + fs.s3a.secret.key SECRETS - fs.s3a.bucket.gcs-container.endpoint + fs.s3a.endpoint https://storage.googleapis.com + + - fs.s3a.bucket.gcs-container.bucket.probe - 0 + fs.s3a.endpoint.region + gcs - fs.s3a.bucket.gcs-container.list.version - 1 + fs.s3a.path.style.access + true - fs.s3a.bucket.gcs-container.multiobjectdelete.enable + fs.s3a.checksum.generation false + Calculate and attach a message checksum on every operation. (default: true) - fs.s3a.bucket.gcs-container.path.style.access - true + fs.s3a.bucket.probe + 0 - - fs.s3a.bucket.gcs-container.endpoint.region - gcs + fs.s3a.list.version + 1 - - fs.s3a.multipart.uploads.enabled + fs.s3a.multiobjectdelete.enable false - + + + fs.s3a.committer.magic.enabled + false + + fs.s3a.optimized.copy.from.local.enabled false - + fs.s3a.create.conditional.enabled false - ``` @@ -531,3 +639,4 @@ It is also a way to regression test foundational S3A third-party store compatibi _Note_ If anyone is set up to test this regularly, please let the hadoop developer team know if regressions do surface, as it is not a common test configuration. +We do use it to help test compatibility during SDK updates. diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index 151ee5bd8a465..d20e2d98c3f9b 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -243,7 +243,7 @@ A credential provider listed in `fs.s3a.aws.credentials.provider` does not imple the interface `software.amazon.awssdk.auth.credentials.AwsCredentialsProvider`. ``` -InstantiationIOException: `s3a://stevel-gcs/': Class org.apache.hadoop.fs.s3a.S3ARetryPolicy does not implement +InstantiationIOException: `s3a://gcs/': Class org.apache.hadoop.fs.s3a.S3ARetryPolicy does not implement software.amazon.awssdk.auth.credentials.AwsCredentialsProvider (configuration key fs.s3a.aws.credentials.provider) at org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isNotInstanceOf(InstantiationIOException.java:128) at org.apache.hadoop.fs.s3a.S3AUtils.getInstanceFromReflection(S3AUtils.java:604) @@ -354,7 +354,7 @@ org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part #1 upload ID 112233 This is an obscure failure which was encountered as part of [HADOOP-19221](https://issues.apache.org/jira/browse/HADOOP-19221) : an upload of part of a file could not -be succesfully retried after a failure was reported on the first attempt. +be successfully retried after a failure was reported on the first attempt. 1. It was only encountered during uploading files via the Staging Committers 2. And is a regression in the V2 SDK. @@ -364,7 +364,7 @@ be succesfully retried after a failure was reported on the first attempt. * If it is encountered on a release without the fix, please upgrade. It may be that the problem arises in the AWS SDK's "TransferManager", which is used for a -higher performance upload of data from the local fileystem. If this is the case. disable this feature: +higher performance upload of data from the local filesystem. If this is the case. disable this feature: ``` fs.s3a.optimized.copy.from.local.enabled @@ -409,6 +409,48 @@ affect the performance. 
``` +### Status Code 400 "XAmzContentSHA256Mismatch: The Content-SHA256 you specified did not match what we receive" + +Seen when working with a third-party store. + +``` +org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on test: +software.amazon.awssdk.services.s3.model.S3Exception: +The Content-SHA256 you specified did not match what we received +(Service: S3, Status Code: 400, Request ID: 0c07c87d:196d43d824a:d7bca:eeb, Extended Request ID: 2af53adb49ffb141a32b534ad7ffbdf33a247f6b95b422011e0b109649d1fab7) (SDK Attempt Count: 1): +XAmzContentSHA256Mismatch: The Content-SHA256 you specified did not match what we received +``` + +This happens when a file create checksum has been enabled but the store does not support it, or does not implement it consistently with AWS S3. + +```xml + + fs.s3a.create.checksum.algorithm + none + +``` + +### Status Code 400 "x-amz-sdk-checksum-algorithm specified, but no corresponding x-amz-checksum-* or x-amz-trailer headers were found" + +``` +org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on test +software.amazon.awssdk.services.s3.model.InvalidRequestException +x-amz-sdk-checksum-algorithm specified, but no corresponding x-amz-checksum-* or x-amz-trailer headers were found. + (Service: S3, Status Code: 400, Request ID: 012929bd17000198c8bc82d20509eecd6df79b1a, Extended Request ID: P9bq0Iv) (SDK Attempt Count: 1): +``` + +The checksum algorithm to be used is not one supported by the store. +In particular, the value `unknown_to_sdk_version` appears to cause it. + +```xml + + fs.s3a.create.checksum.algorithm + unknown_to_sdk_version + +``` + +Fix: use a checksum the store knows about. + ## Access Denied HTTP error codes 401 and 403 are mapped to `AccessDeniedException` in the S3A connector. @@ -436,6 +478,9 @@ java.nio.file.AccessDeniedException: bucket: doesBucketExist on bucket: ``` +If working with a third-party bucket, verify the `fs.s3a.endpoint` setting +points to the third-party store. + ### `AccessDeniedException` All access to this object has been disabled Caller has no permission to access the bucket at all. @@ -560,13 +605,80 @@ Glacier. If you want to access the file with S3A after writes, do not set `fs.s3a.create.storage.class` to `glacier` or `deep_archive`. +### `AccessDeniedException` with `SignatureDoesNotMatch` on a third party bucket + +This can surface when trying to interact with, and especially write data to, a third-party bucket. + +``` + Writing Object on example-file: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument. (Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1):SignatureDoesNotMatch +``` + +The store does not recognize checksum calculation on every operation. +Fix: disable it by setting `fs.s3a.checksum.generation` to `false`. + +```xml + + fs.s3a.checksum.generation + false + Calculate and attach a message checksum on every operation. (default: false) + +``` + +Full stack: + +``` +> bin/hadoop fs -touchz s3a://gcs/example-file +2025-10-21 16:23:27,642 [main] WARN s3a.S3ABlockOutputStream (S3ABlockOutputStream.java:progressChanged(1335)) - Transfer failure of block FileBlock{index=1, destFile=/tmp/hadoop-stevel/s3a/s3ablock-0001-1358390699869033998.tmp, state=Upload, dataSize=0, limit=-1} +2025-10-21 16:23:27,645 [main] DEBUG shell.Command (Command.java:displayError(481)) - touchz failure +java.nio.file.AccessDeniedException: example-file: Writing Object on example-file: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument.
(Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1):SignatureDoesNotMatch + at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:271) + at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:124) + at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:376) + at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468) + at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:372) + at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:347) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:210) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.putObject(WriteOperationHelper.java:534) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.putObject(S3ABlockOutputStream.java:726) + at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:518) + at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:77) + at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) + at org.apache.hadoop.fs.shell.TouchCommands$Touchz.touchz(TouchCommands.java:89) + at org.apache.hadoop.fs.shell.TouchCommands$Touchz.processNonexistentPath(TouchCommands.java:85) + at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:303) + at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:285) + at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:121) + at org.apache.hadoop.fs.shell.Command.run(Command.java:192) + at org.apache.hadoop.fs.FsShell.run(FsShell.java:327) + at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:82) + at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:97) + at org.apache.hadoop.fs.FsShell.main(FsShell.java:390) +Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument. (Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1) + at software.amazon.awssdk.services.s3.model.S3Exception$BuilderImpl.build(S3Exception.java:113) + at software.amazon.awssdk.services.s3.model.S3Exception$BuilderImpl.build(S3Exception.java:61) +... + at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53) + at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:11883) + at software.amazon.awssdk.services.s3.DelegatingS3Client.lambda$putObject$89(DelegatingS3Client.java:9716) + at software.amazon.awssdk.services.s3.internal.crossregion.S3CrossRegionSyncClient.invokeOperation(S3CrossRegionSyncClient.java:67) + at software.amazon.awssdk.services.s3.DelegatingS3Client.putObject(DelegatingS3Client.java:9716) + at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$14(S3AFileSystem.java:3332) + at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:650) + at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:3330) + at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$putObject$7(WriteOperationHelper.java:535) + at org.apache.hadoop.fs.store.audit.AuditingFunctions.lambda$withinAuditSpan$0(AuditingFunctions.java:62) + at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122) + ... 20 more + touchz: example-file: Writing Object on example-file: software.amazon.awssdk.services.s3.model.S3Exception: Invalid argument. (Service: S3, Status Code: 403, Request ID: null) (SDK Attempt Count: 1):SignatureDoesNotMatch +``` + ### "Unable to find a region via the region provider chain." when using session credentials. 
Region must be provided when requesting session credentials, or an exception will be thrown with the message:
```
- Unable to find a region via the region provider
+Unable to find a region via the region provider
 chain. Must provide an explicit region in the builder or setup environment to supply a region.
```
@@ -1241,6 +1353,21 @@ When working with S3 Express store buckets (unlike standard S3 buckets), follow
2. This setting ensures that all pending MPUs are aborted before the directory object is deleted, which is a requirement specific to S3 Express store buckets.
+## Status Code: 200 + "PreconditionFailed: At least one of the pre-conditions you specified did not hold"
+
+```
+software.amazon.awssdk.services.s3.model.S3Exception: At least one of the pre-conditions you specified did not hold
+(Service: S3, Status Code: 200, Request ID: 01a396cff3000198cc0439e40509a95e33467bdc, Extended Request ID: TZrsG8pBzlmXoV) (SDK Attempt Count: 1):
+PreconditionFailed: At least one of the pre-conditions you specified did not hold
+```
+
+An attempt to write to an S3 Express bucket using conditional overwrite failed because another process was writing at the same time.
+
+Conditional overwrite during file creation is used when conditional creation has been enabled (`fs.s3a.create.conditional.enabled`).
+This is true by default.
+
+* A file is created using the `createFile()` API with the option `fs.option.create.conditional.overwrite` set to true.
+* File create performance has been enabled (`fs.s3a.performance.flags` including `create` or being `*`).
### Application hangs after reading a number of files
@@ -1333,6 +1460,39 @@ connections more frequently.
Something has been trying to write data to "/".
+### "Unable to create OutputStream with the given multipart upload and buffer configuration."
+
+This error is raised when an attempt is made to write to a store with
+`fs.s3a.multipart.uploads.enabled` set to `false` and `fs.s3a.fast.upload.buffer` set to `array`.
+
+The write is rejected pre-emptively, before so much data is buffered in memory that the process runs out of heap space.
+
+If the store doesn't support multipart uploads, _use disk for buffering_.
+No other buffer option is safe to use, as it leads to a state where small jobs work but those which generate large amounts of data fail.
+
+```xml
+<property>
+  <name>fs.s3a.fast.upload.buffer</name>
+  <value>disk</value>
+</property>
+```
+
+```
+org.apache.hadoop.fs.PathIOException: `s3a://gcs/a2a8c3e4-5788-40c0-ad66-fe3fe63f4507': Unable to create OutputStream with the given multipart upload and buffer configuration.
+ at org.apache.hadoop.fs.s3a.S3AUtils.validateOutputStreamConfiguration(S3AUtils.java:985) + at org.apache.hadoop.fs.s3a.S3AFileSystem.innerCreateFile(S3AFileSystem.java:2201) + at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$create$5(S3AFileSystem.java:2068) + at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:546) + at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:527) + at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:448) + at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2881) + at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2900) + at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:2067) + at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1233) + at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1210) + at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1091) +``` + ## Best Practises ### Enabling low-level logging diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java index 4b2468de97bb8..b7c3ebae829e5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMkdirWithCreatePerf.java @@ -29,6 +29,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; /** @@ -38,9 +39,9 @@ public class ITestS3AContractMkdirWithCreatePerf extends AbstractContractMkdirTe @Override protected Configuration createConfiguration() { - return setPerformanceFlags( - super.createConfiguration(), - "create,mkdir"); + final Configuration conf = super.createConfiguration(); + disableFilesystemCaching(conf); + return setPerformanceFlags(conf, "create,mkdir"); } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java index 9a945ad0ee710..27c8732b0c7e2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java @@ -18,21 +18,32 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.FileNotFoundException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.impl.ChecksumSupport; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_GENERATION; +import static 
org.apache.hadoop.fs.s3a.S3ATestConstants.DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID; import static org.apache.hadoop.fs.s3a.S3ATestConstants.DEFAULT_SCALE_TESTS_ENABLED; import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_HUGE_PARTITION_SIZE; import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_SCALE_TESTS_ENABLED; +import static org.apache.hadoop.fs.s3a.S3ATestConstants.MULTIPART_COMMIT_CONSUMES_UPLOAD_ID; import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; +import static org.apache.hadoop.fs.s3a.impl.ChecksumSupport.getChecksumAlgorithm; import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE; /** @@ -47,6 +58,8 @@ public class ITestS3AContractMultipartUploader extends private int partitionSize; + private boolean mpuCommitConsumesUploadId; + /** * S3 requires a minimum part size of 5MB (except the last part). * @return 5MB+ value @@ -88,7 +101,18 @@ protected boolean supportsConcurrentUploadsToSamePath() { @Override protected boolean finalizeConsumesUploadIdImmediately() { - return false; + return mpuCommitConsumesUploadId; + } + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + // use whatever the default checksum generation option is. 
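+ // clear any per-bucket overrides, then explicitly disable checksum generation and set the algorithm to NONE for this suite.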
+ removeBaseAndBucketOverrides(conf, CHECKSUM_GENERATION, CHECKSUM_ALGORITHM); + conf.setBoolean(CHECKSUM_GENERATION, false); + conf.set(CHECKSUM_ALGORITHM, ChecksumSupport.NONE); + disableFilesystemCaching(conf); + return conf; } @Override @@ -102,9 +126,16 @@ public void setup() throws Exception { assume("Scale test disabled: to enable set property " + KEY_SCALE_TESTS_ENABLED, enabled); + final Configuration fsConf = getFileSystem().getConf(); + assumeMultipartUploads(fsConf); partitionSize = (int) getTestPropertyBytes(conf, KEY_HUGE_PARTITION_SIZE, DEFAULT_HUGE_PARTITION_SIZE); + mpuCommitConsumesUploadId = fsConf.getBoolean( + MULTIPART_COMMIT_CONSUMES_UPLOAD_ID, + DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID); + LOG.info("{} = {}", MULTIPART_COMMIT_CONSUMES_UPLOAD_ID, mpuCommitConsumesUploadId); + LOG.info("{} = {}", CHECKSUM_ALGORITHM, getChecksumAlgorithm(fsConf)); } /** @@ -122,6 +153,7 @@ public void testMultipartUploadReverseOrder() throws Exception { @Override public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exception { assumeNotS3ExpressFileSystem(getFileSystem()); + final Configuration fsConf = getFileSystem().getConf(); super.testMultipartUploadReverseOrderNonContiguousPartNumbers(); } @@ -136,4 +168,16 @@ public void testConcurrentUploads() throws Throwable { "Analytics Accelerator currently does not support reading of over written files"); super.testConcurrentUploads(); } + + @Override + public void testMultipartUploadAbort() throws Exception { + try { + super.testMultipartUploadAbort(); + } catch (FileNotFoundException e) { + LOG.info("Multipart upload not found in abort()." + + " This is common on third-party stores: {}", + e.toString()); + LOG.debug("Exception: ", e); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index 76afa6faaeb13..209e78b822b6f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -164,26 +164,33 @@ public void testMultiRowGroupParquet() throws Throwable { FileStatus fileStatus = getFileSystem().getFileStatus(dest); - byte[] buffer = new byte[3000]; + final int size = 3000; + byte[] buffer = new byte[size]; + int readLimit = Math.min(size, (int) fileStatus.getLen()); IOStatistics ioStats; + final IOStatistics fsIostats = getFileSystem().getIOStatistics(); + final long initialAuditCount = fsIostats.counters() + .getOrDefault(AUDIT_REQUEST_EXECUTION, 0L); + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { ioStats = inputStream.getIOStatistics(); - inputStream.readFully(buffer, 0, (int) fileStatus.getLen()); + inputStream.readFully(buffer, 0, readLimit); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); try (FSDataInputStream inputStream = getFileSystem().openFile(dest) + .withFileStatus(fileStatus) .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET) .build().get()) { ioStats = inputStream.getIOStatistics(); - inputStream.readFully(buffer, 0, (int) fileStatus.getLen()); + inputStream.readFully(buffer, 0, readLimit); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); - verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4); + 
verifyStatisticCounterValue(fsIostats, AUDIT_REQUEST_EXECUTION, initialAuditCount + 2); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java index b0e15adacd886..e0bd21b373ad3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java @@ -36,6 +36,7 @@ import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort; import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort; @@ -66,6 +67,15 @@ protected Configuration createConfiguration() { return conf; } + @Override + public void setup() throws Exception { + super.setup(); + + skipIfNotEnabled(getFileSystem().getConf(), + MULTIPART_UPLOADS_ENABLED, + "Store has disabled multipart uploads; skipping tests"); + } + protected String getBlockOutputBufferName() { return FAST_UPLOAD_BUFFER_ARRAY; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java index ce6d8a7e1ef6f..9997c42e144ed 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABucketExistence.java @@ -43,7 +43,11 @@ import static org.apache.hadoop.fs.s3a.Constants.FS_S3A; import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS; import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions; +import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.isAwsEndpoint; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -58,6 +62,15 @@ public class ITestS3ABucketExistence extends AbstractS3ATestBase { private final URI uri = URI.create(FS_S3A + "://" + randomBucket + "/"); + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + String endpoint = propagateBucketOptions(conf, getTestBucketName(conf)).get(ENDPOINT, ""); + assume("Skipping existence probes", + isAwsEndpoint(endpoint)); + return conf; + } + @SuppressWarnings("deprecation") @Test public void testNoBucketProbing() throws Exception { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java index f477f46ceb6c8..40a2ee8d6cc9b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java @@ -19,9 +19,13 @@ package org.apache.hadoop.fs.s3a; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.assertj.core.api.Assertions; import org.junit.Test; +import 
org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.ChecksumMode; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; @@ -31,8 +35,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.impl.ChecksumSupport; -import static org.apache.hadoop.fs.contract.ContractTestUtils.rm; +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_GENERATION; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CHECKSUM_GENERATION; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS; /** @@ -40,31 +50,58 @@ * If CHECKSUM_ALGORITHM config is not set in auth-keys.xml, * SHA256 algorithm will be picked. */ +@RunWith(Parameterized.class) public class ITestS3AChecksum extends AbstractS3ATestBase { - private static final ChecksumAlgorithm DEFAULT_CHECKSUM_ALGORITHM = ChecksumAlgorithm.SHA256; + public static final String UNKNOWN = "UNKNOWN_TO_SDK_VERSION"; private ChecksumAlgorithm checksumAlgorithm; + /** + * Parameterization. + */ + @Parameterized.Parameters(name = "checksum={0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {"SHA256"}, + {"CRC32C"}, + {"SHA1"}, + {UNKNOWN}, + }); + } + private static final int[] SIZES = { - 1, 2, 3, 4, 5, 254, 255, 256, 257, 2 ^ 12 - 1 + 5, 255, 256, 257, 2 ^ 12 - 1 }; + private final String algorithmName; + + public ITestS3AChecksum(final String algorithmName) { + this.algorithmName = algorithmName; + } + @Override protected Configuration createConfiguration() { final Configuration conf = super.createConfiguration(); + // get the base checksum algorithm, if set it will be left alone. 
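+ // if the bucket configuration sets the algorithm to NONE or UNKNOWN_TO_SDK_VERSION, only the UNKNOWN parameterization runs; the others are skipped.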
+ final String al = conf.getTrimmed(CHECKSUM_ALGORITHM, ""); + if (!UNKNOWN.equals(algorithmName) && + (ChecksumSupport.NONE.equalsIgnoreCase(al) || UNKNOWN.equalsIgnoreCase(al))) { + skip("Skipping checksum algorithm tests"); + } S3ATestUtils.removeBaseAndBucketOverrides(conf, CHECKSUM_ALGORITHM, + CHECKSUM_VALIDATION, REJECT_OUT_OF_SPAN_OPERATIONS); S3ATestUtils.disableFilesystemCaching(conf); - checksumAlgorithm = ChecksumSupport.getChecksumAlgorithm(conf); - if (checksumAlgorithm == null) { - checksumAlgorithm = DEFAULT_CHECKSUM_ALGORITHM; - LOG.info("No checksum algorithm found in configuration, will use default {}", - checksumAlgorithm); - conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString()); - } + conf.set(CHECKSUM_ALGORITHM, algorithmName); + conf.setBoolean(CHECKSUM_VALIDATION, true); conf.setBoolean(REJECT_OUT_OF_SPAN_OPERATIONS, false); + checksumAlgorithm = ChecksumSupport.getChecksumAlgorithm(conf); + LOG.info("Using checksum algorithm {}/{}", algorithmName, checksumAlgorithm); + assume("Skipping checksum tests as " + CHECKSUM_GENERATION + " is set", + propagateBucketOptions(conf, getTestBucketName(conf)) + .getBoolean(CHECKSUM_GENERATION, DEFAULT_CHECKSUM_GENERATION)); return conf; } @@ -77,14 +114,15 @@ public void testChecksum() throws IOException { private void validateChecksumForFilesize(int len) throws IOException { describe("Create a file of size " + len); - String src = String.format("%s-%04x", methodName.getMethodName(), len); - Path path = writeThenReadFile(src, len); + final Path path = methodPath(); + writeThenReadFile(path, len); assertChecksum(path); - rm(getFileSystem(), path, false, false); } private void assertChecksum(Path path) throws IOException { final String key = getFileSystem().pathToKey(path); + // issue a head request and include asking for the checksum details. + // such a query may require extra IAM permissions. HeadObjectRequest.Builder requestBuilder = getFileSystem().getRequestFactory() .newHeadObjectRequestBuilder(key) .checksumMode(ChecksumMode.ENABLED); @@ -101,6 +139,9 @@ private void assertChecksum(Path path) throws IOException { Assertions.assertThat(headObject.checksumCRC32C()) .describedAs("headObject.checksumCRC32C()") .isNotNull(); + Assertions.assertThat(headObject.checksumSHA256()) + .describedAs("headObject.checksumSHA256()") + .isNull(); break; case SHA1: Assertions.assertThat(headObject.checksumSHA1()) @@ -112,6 +153,14 @@ private void assertChecksum(Path path) throws IOException { .describedAs("headObject.checksumSHA256()") .isNotNull(); break; + case UNKNOWN_TO_SDK_VERSION: + // expect values to be null + // this is brittle with different stores; crc32 assertions have been cut + // because S3 express always set them. 
+ Assertions.assertThat(headObject.checksumSHA256()) + .describedAs("headObject.checksumSHA256()") + .isNull(); + break; default: fail("Checksum algorithm not supported: " + checksumAlgorithm); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java index 62f2ffbc0df5d..ab6181741ed68 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java @@ -55,6 +55,7 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME; @@ -360,7 +361,7 @@ public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable { public void testWithOutCrossRegionAccess() throws Exception { describe("Verify cross region access fails when disabled"); // skip the test if the region is sa-east-1 - skipCrossRegionTest(); + assumeCrossRegionTestSupported(); final Configuration newConf = new Configuration(getConfiguration()); removeBaseAndBucketOverrides(newConf, ENDPOINT, @@ -381,7 +382,7 @@ public void testWithOutCrossRegionAccess() throws Exception { public void testWithCrossRegionAccess() throws Exception { describe("Verify cross region access succeed when enabled"); // skip the test if the region is sa-east-1 - skipCrossRegionTest(); + assumeCrossRegionTestSupported(); final Configuration newConf = new Configuration(getConfiguration()); removeBaseAndBucketOverrides(newConf, ENDPOINT, @@ -482,7 +483,7 @@ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable { describe("Access the test bucket using central endpoint and" + " null region, perform file system CRUD operations"); final Configuration conf = getConfiguration(); - assumeStoreAwsHosted(getFileSystem()); + assumeCrossRegionTestSupported(); final Configuration newConf = new Configuration(conf); @@ -505,7 +506,7 @@ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable { public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable { describe("Access the test bucket using central endpoint and" + " null region and fips enabled, perform file system CRUD operations"); - assumeStoreAwsHosted(getFileSystem()); + assumeCrossRegionTestSupported(); final String bucketLocation = getFileSystem().getBucketLocation(); assume("FIPS can be enabled to access buckets from US or Canada endpoints only", @@ -513,17 +514,19 @@ public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable { || bucketLocation.startsWith(CA_REGION_PREFIX) || bucketLocation.startsWith(US_DUAL_STACK_PREFIX)); - final Configuration conf = getConfiguration(); + final Configuration conf = getFileSystem().getConf(); final Configuration newConf = new Configuration(conf); removeBaseAndBucketOverrides( newConf, ENDPOINT, AWS_REGION, - FIPS_ENDPOINT); + FIPS_ENDPOINT, + PATH_STYLE_ACCESS); newConf.set(ENDPOINT, CENTRAL_ENDPOINT); newConf.setBoolean(FIPS_ENDPOINT, 
true); + newConf.setBoolean(PATH_STYLE_ACCESS, false); newFS = new S3AFileSystem(); newFS.initialize(getFileSystem().getUri(), newConf); @@ -532,10 +535,18 @@ public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable { } /** - * Skip the test if the region is null or sa-east-1. + * Skip the test if the region is null, sa-east-1, or otherwise + * not compatible with the test. */ - private void skipCrossRegionTest() throws IOException { - String region = getFileSystem().getS3AInternals().getBucketMetadata().bucketRegion(); + private void assumeCrossRegionTestSupported() throws IOException { + final S3AFileSystem fs = getFileSystem(); + + // not S3 as the store URLs may not resolve. + assumeNotS3ExpressFileSystem(fs); + // aws hosted. + assumeStoreAwsHosted(fs); + + String region = fs.getS3AInternals().getBucketMetadata().bucketRegion(); if (region == null || SA_EAST_1.equals(region)) { skip("Skipping test since region is null or it is set to sa-east-1"); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java index e0559b7c49edc..210d548df7b94 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.Set; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.util.functional.RemoteIterators.foreach; /** @@ -54,6 +55,12 @@ protected Configuration createConfiguration() { return conf; } + @Override + public void setup() throws Exception { + super.setup(); + assumeMultipartUploads(getFileSystem().getConf()); + } + /** * Main test case for upload part listing and iterator paging. * @throws Exception on failure. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java index f64c59573484e..c6af3a4cabae0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java @@ -130,8 +130,6 @@ public void testReadLargeFileFully() throws Throwable { } // Verify that once stream is closed, all memory is freed verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0); - assertThatStatisticMaximum(ioStats, - ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0); } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java index 9ab1768b2aba1..46610429f2643 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java @@ -285,4 +285,22 @@ public interface S3ATestConstants { * Default policy on root tests: {@value}. */ boolean DEFAULT_ROOT_TESTS_ENABLED = true; + + /** + * Flag to set when testing third party stores: {@value}. + *
<p>
+ * Set to true when a completed MPU commit consumes the ID so it is no + * longer visible in list operations; and abort reports {@code NoSuchUploadException}. + *
<p>
+ * This will change assertions in relevant tests. + *
<p>
+ * Can be set as a per-bucket setting; test runner will pick this up. + */ + String MULTIPART_COMMIT_CONSUMES_UPLOAD_ID = + "fs.s3a.ext.test.multipart.commit.consumes.upload.id"; + + /** + * Default value of {@link #MULTIPART_COMMIT_CONSUMES_UPLOAD_ID}: {@value}. + */ + boolean DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID = false; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index d74c84463252e..02a7419fa5a1b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -528,6 +528,32 @@ public static void skipIfNotEnabled(final Configuration configuration, } } + /** + * Skip a test suite/case if a configuration option is true. + * @param configuration configuration to probe + * @param key key to resolve + * @param defVal default value. + * @param message assertion text + */ + public static void skipIfEnabled(final Configuration configuration, + final String key, + final boolean defVal, + final String message) { + if (!configuration.getBoolean(key, defVal)) { + skip(message); + } + } + + /** + * Require multipart uploads; skip tests if not enabled in the configuration. + * @param conf filesystem configuration. + */ + public static void assumeMultipartUploads(Configuration conf) { + skipIfNotEnabled(conf, + MULTIPART_UPLOADS_ENABLED, + "Store has disabled multipart uploads; skipping tests"); + } + /** * Skip a test if storage class tests are disabled, * or the bucket is an S3Express bucket. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java index 6b894a6813704..edb6f277cda35 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java @@ -447,6 +447,19 @@ public void testOpenSSLErrorRetry() throws Throwable { sdkClientException(WFOPENSSL_0035_STREAM_IS_CLOSED, null)))); } + @Test + public void testS3ExpressPreconditionFailure() throws Throwable { + AwsServiceException ase = AwsServiceException.builder() + .message("unwind") + .statusCode(SC_200_OK) + .awsErrorDetails(AwsErrorDetails.builder() + .errorCode(PRECONDITION_FAILED) + .build()) + .build(); + verifyExceptionClass(RemoteFileChangedException.class, + translateException("commit", "/path", ase)); + } + /** * Create a shaded NoHttpResponseException. * @return an exception. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java index 592529b553d24..2680e1a712a8e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java @@ -84,6 +84,10 @@ /** * Tests use of assumed roles. * Only run if an assumed role is provided. + *
<p>
+ * S3Express buckets only support access restrictions at the bucket level, + * rather than at paths underneath. + * All partial permission tests are disabled. */ @SuppressWarnings("ThrowableNotThrown") public class ITestAssumeRole extends AbstractS3ATestBase { @@ -197,9 +201,6 @@ protected Configuration createValidRoleConf() throws JsonProcessingException { conf.set(ASSUMED_ROLE_ARN, roleARN); conf.set(ASSUMED_ROLE_SESSION_NAME, "valid"); conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m"); - // disable create session so there's no need to - // add a role policy for it. - disableCreateSession(conf); bindRolePolicy(conf, RESTRICTED_POLICY); return conf; @@ -458,12 +459,15 @@ public void testAssumeRolePoliciesOverrideRolePerms() throws Throwable { public void testReadOnlyOperations() throws Throwable { describe("Restrict role to read only"); + skipIfS3ExpressBucket(getConfiguration()); Configuration conf = createAssumedRoleConfig(); bindRolePolicy(conf, policy( statement(false, S3_ALL_BUCKETS, S3_PATH_WRITE_OPERATIONS), - STATEMENT_ALL_S3, STATEMENT_ALLOW_KMS_RW)); + STATEMENT_ALL_S3, + STATEMENT_S3EXPRESS, + STATEMENT_ALLOW_KMS_RW)); Path path = methodPath(); roleFS = (S3AFileSystem) path.getFileSystem(conf); // list the root path, expect happy @@ -501,6 +505,7 @@ public void testRestrictedWriteSubdir() throws Throwable { describe("Attempt writing to paths where a role only has" + " write access to a subdir of the bucket"); + skipIfS3ExpressBucket(getConfiguration()); Path restrictedDir = methodPath(); Path child = new Path(restrictedDir, "child"); // the full FS @@ -563,6 +568,7 @@ public void testAssumedRoleRetryHandler() throws Throwable { @Test public void testRestrictedCommitActions() throws Throwable { describe("Attempt commit operations against a path with restricted rights"); + skipIfS3ExpressBucket(getConfiguration()); Configuration conf = createAssumedRoleConfig(); final int uploadPartSize = 5 * 1024 * 1024; @@ -700,12 +706,14 @@ public void writeCSVData(final File localSrc) throws IOException { @Test public void testPartialDelete() throws Throwable { describe("delete with part of the child tree read only; multidelete"); + skipIfS3ExpressBucket(getConfiguration()); executePartialDelete(createAssumedRoleConfig(), false); } @Test public void testPartialDeleteSingleDelete() throws Throwable { describe("delete with part of the child tree read only"); + skipIfS3ExpressBucket(getConfiguration()); executePartialDelete(createAssumedRoleConfig(), true); } @@ -718,6 +726,7 @@ public void testBulkDeleteOnReadOnlyAccess() throws Throwable { @Test public void testBulkDeleteWithReadWriteAccess() throws Throwable { describe("Bulk delete with read write access"); + skipIfS3ExpressBucket(getConfiguration()); executeBulkDeleteOnSomeReadOnlyFiles(createAssumedRoleConfig()); } @@ -807,6 +816,7 @@ private static void bindReadOnlyRolePolicy(Configuration assumedRoleConfig, throws JsonProcessingException { bindRolePolicyStatements(assumedRoleConfig, STATEMENT_ALLOW_KMS_RW, statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS), + STATEMENT_S3EXPRESS, new Statement(Effects.Deny) .addActions(S3_PATH_WRITE_OPERATIONS) .addResources(directory(readOnlyDir)) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java index 0cb42b0f31096..67df9a77df1a0 100644 --- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumedRoleCommitOperations.java @@ -67,7 +67,9 @@ public void setup() throws Exception { restrictedDir = super.path("restricted"); Configuration conf = newAssumedRoleConfig(getConfiguration(), getAssumedRoleARN()); - bindRolePolicyStatements(conf, STATEMENT_ALLOW_KMS_RW, + bindRolePolicyStatements(conf, + STATEMENT_ALLOW_KMS_RW, + STATEMENT_S3EXPRESS, statement(true, S3_ALL_BUCKETS, S3_BUCKET_READ_OPERATIONS), new RoleModel.Statement(RoleModel.Effects.Allow) .addActions(S3_PATH_RW_OPERATIONS) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java index e6eb60ad06134..c357c8aac0f50 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.runner.RunWith; @@ -51,14 +52,19 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.auth.ITestCustomSigner.CustomSignerInitializer.StoreValue; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; +import org.apache.hadoop.fs.s3a.impl.ChecksumSupport; import org.apache.hadoop.security.UserGroupInformation; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS; import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; +import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS; import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; /** * Tests for custom Signers and SignerInitializers. @@ -74,6 +80,13 @@ public class ITestCustomSigner extends AbstractS3ATestBase { private static final String TEST_ID_KEY = "TEST_ID_KEY"; private static final String TEST_REGION_KEY = "TEST_REGION_KEY"; + /** + * Is the store using path style access? + */ + private static final AtomicBoolean PATH_STYLE_ACCESS_IN_USE = new AtomicBoolean(false); + + public static final String BUCKET = "bucket"; + /** * Parameterization. 
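* The bulk-delete run is skipped when the store has multi-object delete disabled.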
*/ @@ -106,7 +119,11 @@ public void setup() throws Exception { super.setup(); final S3AFileSystem fs = getFileSystem(); final Configuration conf = fs.getConf(); + if (bulkDelete) { + skipIfNotEnabled(conf, ENABLE_MULTI_DELETE, "no bulk delete"); + } endpoint = conf.getTrimmed(Constants.ENDPOINT, Constants.CENTRAL_ENDPOINT); + PATH_STYLE_ACCESS_IN_USE.set(conf.getBoolean(PATH_STYLE_ACCESS, false)); LOG.info("Test endpoint is {}", endpoint); regionName = conf.getTrimmed(Constants.AWS_REGION, ""); if (regionName.isEmpty()) { @@ -153,6 +170,7 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi, throws IOException, InterruptedException { Configuration conf = createTestConfig(identifier); return ugi.doAs((PrivilegedExceptionAction) () -> { + LOG.info("Performing store operations for {}", ugi.getShortUserName()); int instantiationCount = CustomSigner.getInstantiationCount(); int invocationCount = CustomSigner.getInvocationCount(); S3AFileSystem fs = (S3AFileSystem)finalPath.getFileSystem(conf); @@ -186,11 +204,13 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi, ContractTestUtils.touch(fs, new Path(subdir, "file1")); // create a magic file. - createMagicFile(fs, subdir); - ContentSummary summary = fs.getContentSummary(finalPath); - fs.getS3AInternals().abortMultipartUploads(subdir); - fs.rename(subdir, new Path(finalPath, "renamed")); - fs.delete(finalPath, true); + if (fs.isMagicCommitEnabled()) { + createMagicFile(fs, subdir); + ContentSummary summary = fs.getContentSummary(finalPath); + fs.getS3AInternals().abortMultipartUploads(subdir); + fs.rename(subdir, new Path(finalPath, "renamed")); + fs.delete(finalPath, true); + } return fs; }); } @@ -204,10 +224,13 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi, private Configuration createTestConfig(String identifier) { Configuration conf = createConfiguration(); + // bulk delete is not disabled; if it has been set to false by the bucket + // then one of the test runs will be skipped. removeBaseAndBucketOverrides(conf, + CHECKSUM_ALGORITHM, + CHECKSUM_VALIDATION, CUSTOM_SIGNERS, - SIGNING_ALGORITHM_S3, - ENABLE_MULTI_DELETE); + SIGNING_ALGORITHM_S3); conf.set(CUSTOM_SIGNERS, "CustomS3Signer:" + CustomSigner.class.getName() + ":" + CustomSignerInitializer.class.getName()); @@ -220,7 +243,8 @@ private Configuration createTestConfig(String identifier) { // Having the checksum algorithm in this test causes // x-amz-sdk-checksum-algorithm specified, but no corresponding // x-amz-checksum-* or x-amz-trailer headers were found - conf.unset(Constants.CHECKSUM_ALGORITHM); + conf.set(CHECKSUM_ALGORITHM, ChecksumSupport.NONE); + conf.setBoolean(CHECKSUM_VALIDATION, false); // make absolutely sure there is no caching. disableFilesystemCaching(conf); @@ -270,6 +294,9 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request, String host = request.host(); String bucketName = parseBucketFromHost(host); + if (PATH_STYLE_ACCESS_IN_USE.get()) { + bucketName = BUCKET; + } try { lastStoreValue = CustomSignerInitializer .getStoreValue(bucketName, UserGroupInformation.getCurrentUser()); @@ -312,11 +339,20 @@ public static String description() { public static final class CustomSignerInitializer implements AwsSignerInitializer { + /** + * Map of (bucket-name, ugi) -> store value. + *
<p>
+ * When working with buckets using path-style resolution, the store bucket name + * is just {@link #BUCKET}. + */ private static final Map knownStores = new HashMap<>(); @Override public void registerStore(String bucketName, Configuration storeConf, DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) { + if (PATH_STYLE_ACCESS_IN_USE.get()) { + bucketName = BUCKET; + } StoreKey storeKey = new StoreKey(bucketName, storeUgi); StoreValue storeValue = new StoreValue(storeConf, dtProvider); LOG.info("Registering store {} with value {}", storeKey, storeValue); @@ -326,6 +362,9 @@ public void registerStore(String bucketName, Configuration storeConf, @Override public void unregisterStore(String bucketName, Configuration storeConf, DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) { + if (PATH_STYLE_ACCESS_IN_USE.get()) { + bucketName = BUCKET; + } StoreKey storeKey = new StoreKey(bucketName, storeUgi); LOG.info("Unregistering store {}", storeKey); knownStores.remove(storeKey); @@ -341,9 +380,17 @@ public static void reset() { public static StoreValue getStoreValue(String bucketName, UserGroupInformation ugi) { StoreKey storeKey = new StoreKey(bucketName, ugi); - return knownStores.get(storeKey); + final StoreValue storeValue = knownStores.get(storeKey); + LOG.info("Getting store value for key {}: {}", storeKey, storeValue); + return storeValue; } + /** + * The key for the signer map: bucket-name and UGI. + *
<p>
+ * In path-style-access the bucket name is mapped to {@link #BUCKET} so only + * one bucket per UGI instance is supported. + */ private static class StoreKey { private final String bucketName; private final UserGroupInformation ugi; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java index db0aaa6be0eca..e886fca5d71c6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestHttpSigner.java @@ -140,11 +140,13 @@ private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi, ContractTestUtils.touch(fs, new Path(subdir, "file1")); // create a magic file. - createMagicFile(fs, subdir); - ContentSummary summary = fs.getContentSummary(finalPath); - fs.getS3AInternals().abortMultipartUploads(subdir); - fs.rename(subdir, new Path(finalPath, "renamed")); - fs.delete(finalPath, true); + if (fs.isMagicCommitEnabled()) { + createMagicFile(fs, subdir); + ContentSummary summary = fs.getContentSummary(finalPath); + fs.getS3AInternals().abortMultipartUploads(subdir); + fs.rename(subdir, new Path(finalPath, "renamed")); + fs.delete(finalPath, true); + } return fs; }); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java index 7151c38ad3e27..85e9f3f142c4c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java @@ -47,6 +47,7 @@ import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfS3ExpressBucket; import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects; import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement; import static org.apache.hadoop.fs.s3a.auth.RoleModel.directory; @@ -86,7 +87,9 @@ * To simplify maintenance, the operations tested are broken up into * their own methods, with fields used to share the restricted role and * created paths. - * + *
* <p>
+ * Test are skipped if no assumed role was provided, or if the test bucket + * is an S3Express bucket, whose permissions model is different. */ public class ITestRestrictedReadAccess extends AbstractS3ATestBase { @@ -158,6 +161,7 @@ public class ITestRestrictedReadAccess extends AbstractS3ATestBase { public void setup() throws Exception { super.setup(); assumeRoleTests(); + skipIfS3ExpressBucket(getConfiguration()); } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java index b0f685b076094..ac244be668b9d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/RoleTestUtils.java @@ -164,8 +164,8 @@ public static Configuration newAssumedRoleConfig( conf.set(ASSUMED_ROLE_ARN, roleARN); conf.set(ASSUMED_ROLE_SESSION_NAME, "test"); conf.set(ASSUMED_ROLE_SESSION_DURATION, "15m"); - // force in bucket resolution during startup - conf.setInt(S3A_BUCKET_PROBE, 1); + // disable bucket resolution during startup as s3 express doesn't like it + conf.setInt(S3A_BUCKET_PROBE, 0); disableCreateSession(conf); disableFilesystemCaching(conf); return conf; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 3a7cceb2369ee..1aaf58152473e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -170,7 +170,7 @@ public void setup() throws Exception { taskAttempt0 = TaskAttemptID.forName(attempt0); attempt1 = "attempt_" + jobId + "_m_000001_0"; taskAttempt1 = TaskAttemptID.forName(attempt1); - + assumeMultipartUploads(getFileSystem().getConf()); outDir = path(getMethodName()); abortMultipartUploadsUnderPath(outDir); cleanupDestDir(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index aea0f1883243a..2b288bfec0ad4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; @@ -91,6 +92,7 @@ public ITestCommitOperationCost() { @Override public void setup() throws Exception { super.setup(); + assumeMultipartUploads(getFileSystem().getConf()); testHelper = new CommitterTestHelper(getFileSystem()); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java index 34b856c21b7e8..dd07500684785 100644 --- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -92,6 +92,7 @@ protected Configuration createConfiguration() { public void setup() throws Exception { FileSystem.closeAll(); super.setup(); + assumeMultipartUploads(getFileSystem().getConf()); verifyIsMagicCommitFS(getFileSystem()); progress = new ProgressCounter(); progress.assertCount("progress", 0); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java index 2561a69f60b59..9abbfd97cbe42 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java @@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.security.UserGroupInformation; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING; @@ -182,6 +183,7 @@ public void setup() throws Exception { // destroy all filesystems from previous runs. FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); super.setup(); + assumeMultipartUploads(getFileSystem().getConf()); jobId = randomJobId(); attempt0 = "attempt_" + jobId + "_m_000000_0"; taskAttempt0 = TaskAttemptID.forName(attempt0); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java index 2ede6d82798d0..ec5e807b23d08 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.commit; import java.io.File; +import java.io.FileNotFoundException; import java.util.Arrays; import java.util.Collection; import java.util.UUID; @@ -57,11 +58,13 @@ import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES; import static org.apache.hadoop.fs.s3a.Constants.RETRY_HTTP_5XX_ERRORS; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; import static org.apache.hadoop.fs.s3a.test.SdkFaultInjector.setRequestFailureConditions; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test upload recovery by injecting failures into the response chain. 
@@ -156,6 +159,10 @@ public Configuration createConfiguration() { public void setup() throws Exception { SdkFaultInjector.resetFaultInjector(); super.setup(); + if (!FAST_UPLOAD_BUFFER_DISK.equals(buffer)) { + assumeMultipartUploads(getFileSystem().getConf()); + } + } @Override @@ -163,7 +170,6 @@ public void teardown() throws Exception { // safety check in case the evaluation is failing any // request needed in cleanup. SdkFaultInjector.resetFaultInjector(); - super.teardown(); } @@ -260,9 +266,18 @@ public void testCommitOperations() throws Throwable { setRequestFailureConditions(2, SdkFaultInjector::isCompleteMultipartUploadRequest); + boolean mpuCommitConsumesUploadId = getFileSystem().getConf().getBoolean( + MULTIPART_COMMIT_CONSUMES_UPLOAD_ID, + DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID); try (CommitContext commitContext = actions.createCommitContextForTesting(dest, JOB_ID, 0)) { - commitContext.commitOrFail(commit); + + if (mpuCommitConsumesUploadId) { + intercept(FileNotFoundException.class, () -> + commitContext.commitOrFail(commit)); + } else { + commitContext.commitOrFail(commit); + } } // make sure the saved data is as expected verifyFileContents(fs, dest, dataset); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java index 7488de41ce638..fda87d2d67d60 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java @@ -72,6 +72,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.DurationInfo; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; @@ -173,6 +174,7 @@ public ITestS3ACommitterMRJob( @Override public void setup() throws Exception { super.setup(); + assumeMultipartUploads(getFileSystem().getConf()); // configure the test binding for this specific test case. committerTestBinding.setup(getClusterBinding(), getFileSystem()); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java index 116d48e9de5fc..89b471bbd29fc 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java @@ -112,6 +112,15 @@ public void setup() throws Exception { pendingDataFile = new Path(jobDir, filename + PENDING_SUFFIX); } + /** + * Skip this test suite when MPUS are not avaialable. + * @return false + */ + @Override + protected boolean requireMultipartUploads() { + return true; + } + /** * Returns the path to the commit metadata file, not that of the huge file. 
* @return a file in the job dir diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java index da1580076dbb8..e2b70a9b83b80 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java @@ -56,6 +56,7 @@ import org.apache.hadoop.util.ToolRunner; import static java.util.Optional.empty; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; /** @@ -142,6 +143,7 @@ protected String committerName() { public void setup() throws Exception { super.setup(); requireScaleTestsEnabled(); + assumeMultipartUploads(getFileSystem().getConf()); prepareToTerasort(); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java index aeb9629b3a6d1..bdb61a4e46786 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java @@ -52,6 +52,8 @@ import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE; +import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE_CLASSIC; import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS; import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES; import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT; @@ -110,6 +112,7 @@ private Configuration timingOutConfiguration() { CONNECTION_ACQUISITION_TIMEOUT, CONNECTION_IDLE_TIME, ESTABLISH_TIMEOUT, + INPUT_STREAM_TYPE, MAX_ERROR_RETRIES, MAXIMUM_CONNECTIONS, PART_UPLOAD_TIMEOUT, @@ -125,6 +128,7 @@ private Configuration timingOutConfiguration() { // needed to ensure that streams are kept open. // without this the tests is unreliable in batch runs. 
disablePrefetching(conf); + conf.set(INPUT_STREAM_TYPE, INPUT_STREAM_TYPE_CLASSIC); conf.setInt(RETRY_LIMIT, 0); conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, true); final Duration ms10 = Duration.ofMillis(10); @@ -190,7 +194,7 @@ public void testObjectUploadTimeouts() throws Throwable { AWSClientConfig.setMinimumOperationDuration(Duration.ZERO); final Path dir = methodPath(); Path file = new Path(dir, "file"); - Configuration conf = new Configuration(getConfiguration()); + Configuration conf = new Configuration(getFileSystem().getConf()); removeBaseAndBucketOverrides(conf, PART_UPLOAD_TIMEOUT, REQUEST_TIMEOUT, @@ -257,20 +261,21 @@ public void testObjectUploadTimeouts() throws Throwable { // and try a multipart upload to verify that its requests also outlast // the short requests - SdkFaultInjector.setRequestFailureConditions(999, - SdkFaultInjector::isPartUpload); - Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2"); - totalSleepTime.set(0); - OperationDuration dur2 = new DurationInfo(LOG, "Creating File"); - ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET); - dur2.finished(); - Assertions.assertThat(totalSleepTime.get()) - .describedAs("total sleep time of magic write") - .isGreaterThan(0); - Assertions.assertThat(dur2.asDuration()) - .describedAs("Duration of magic write") - .isGreaterThan(shortTimeout); - brittleFS.delete(dir, true); + if (fs.isMagicCommitEnabled()) { + SdkFaultInjector.setRequestFailureConditions(999, + SdkFaultInjector::isPartUpload); + Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2"); + totalSleepTime.set(0); + OperationDuration dur2 = new DurationInfo(LOG, "Creating File"); + ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET); + dur2.finished(); + Assertions.assertThat(totalSleepTime.get()) + .describedAs("total sleep time of magic write") + .isGreaterThan(0); + Assertions.assertThat(dur2.asDuration()) + .describedAs("Duration of magic write") + .isGreaterThan(shortTimeout); + } } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index 8a285a4db40a8..3bf338814f6dc 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -72,7 +72,9 @@ * Test partial failures of delete and rename operations,. * * All these test have a unique path for each run, with a roleFS having - * full RW access to part of it, and R/O access to a restricted subdirectory + * full RW access to part of it, and R/O access to a restricted subdirectory. + *

+ * Tests are skipped on S3Express buckets or if no assumed role is provided. * *

    *
  1. @@ -218,6 +220,7 @@ public ITestPartialRenamesDeletes(final boolean multiDelete) { public void setup() throws Exception { super.setup(); assumeRoleTests(); + skipIfS3ExpressBucket(getConfiguration()); basePath = uniquePath(); readOnlyDir = new Path(basePath, "readonlyDir"); writableDir = new Path(basePath, "writableDir"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java index fd7df286219c9..b306f573c64f3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatchAndIfNoneMatch.java @@ -42,10 +42,12 @@ import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; +import org.apache.hadoop.test.LambdaTestUtils; import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE; import static org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_200_OK; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART; @@ -75,6 +77,8 @@ public class ITestS3APutIfMatchAndIfNoneMatch extends AbstractS3ATestBase { private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 255); private static final byte[] MULTIPART_FILE_BYTES = dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a'); + public static final String PRECONDITION_FAILED = "PreconditionFailed"; + private BlockOutputStreamStatistics statistics; @Override @@ -99,22 +103,30 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - Configuration conf = getConfiguration(); + Configuration conf = getFileSystem().getConf(); assumeConditionalCreateEnabled(conf); } /** * Asserts that an S3Exception has the expected HTTP status code. - * * @param code Expected HTTP status code. - * @param ex Exception to validate. + * @param ex Exception to validate. + * @return the inner exception + * @throws AssertionError if the status code doesn't match. 
*/ - private static void assertS3ExceptionStatusCode(int code, Exception ex) { - S3Exception s3Exception = (S3Exception) ex.getCause(); - + private static S3Exception verifyS3ExceptionStatusCode(int code, Exception ex) { + final Throwable cause = ex.getCause(); + if (cause == null) { + throw new AssertionError("No inner exception in " + ex, ex); + } + if (!(cause instanceof S3Exception)) { + throw new AssertionError("Inner exception is not S3Exception under " + ex, ex); + } + S3Exception s3Exception = (S3Exception) cause; if (s3Exception.statusCode() != code) { throw new AssertionError("Expected status code " + code + " from " + ex, ex); } + return s3Exception; } /** @@ -294,12 +306,12 @@ public void testIfNoneMatchConflictOnOverwrite() throws Throwable { // attempted overwrite fails RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null)); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); + verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); // second attempt also fails RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null)); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); + verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); // Delete file and verify an overwrite works again fs.delete(testFile, false); @@ -318,13 +330,11 @@ public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable { createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true); - RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class, - () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true)); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException); + expectPreconditionFailure(() -> + createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true)); - RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class, - () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true)); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException); + expectPreconditionFailure(() -> + createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true)); } @Test @@ -346,8 +356,7 @@ public void testIfNoneMatchMultipartUploadWithRaceCondition() throws Throwable { createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null); // Closing the first stream should throw RemoteFileChangedException - RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + expectPreconditionFailure(stream::close); } @Test @@ -369,8 +378,24 @@ public void testIfNoneMatchTwoConcurrentMultipartUploads() throws Throwable { createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true); // Closing the first stream should throw RemoteFileChangedException - RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + // or the S3 Express equivalent. + expectPreconditionFailure(stream::close); + } + + /** + * Expect an operation to fail with an S3 classic or S3 Express precondition failure. + * @param eval closure to evaluate + * @throws Exception any other failure.
+ */ + private static void expectPreconditionFailure(final LambdaTestUtils.VoidCallable eval) + throws Exception { + RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, eval); + S3Exception s3Exception = (S3Exception) exception.getCause(); + if (!(s3Exception.statusCode() == SC_412_PRECONDITION_FAILED + || (s3Exception.statusCode() == SC_200_OK) + && PRECONDITION_FAILED.equals(s3Exception.awsErrorDetails().errorCode()))) { + throw exception; + } } @Test @@ -388,7 +413,7 @@ public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable { // close the stream, should throw RemoteFileChangedException RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); } @Test @@ -405,7 +430,7 @@ public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable { // overwrite with non-empty file, should throw RemoteFileChangedException RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null)); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); } @Test @@ -423,7 +448,7 @@ public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable { FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, null); assertHasCapabilityConditionalCreate(stream2); RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); } @Test @@ -478,7 +503,7 @@ public void testIfMatchOverwriteWithOutdatedEtag() throws Throwable { // overwrite file with outdated etag. Should throw RemoteFileChangedException RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag)); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + verifyS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); } @Test @@ -502,7 +527,7 @@ public void testIfMatchOverwriteDeletedFileWithEtag() throws Throwable { // overwrite file with etag. 
Should throw FileNotFoundException FileNotFoundException exception = intercept(FileNotFoundException.class, () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, etag)); - assertS3ExceptionStatusCode(SC_404_NOT_FOUND, exception); + verifyS3ExceptionStatusCode(SC_404_NOT_FOUND, exception); } @Test @@ -551,8 +576,7 @@ public void testIfMatchTwoMultipartUploadsRaceConditionOneClosesFirst() throws T stream1.close(); // Close second stream, should fail due to etag mismatch - RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close); - assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception); + expectPreconditionFailure(stream2::close); } @Ignore("conditional_write statistics not yet fully implemented") diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java index 6c84b374b93e9..2b17c0ff968b9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java @@ -56,6 +56,7 @@ import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.magicPath; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.S3ATestUtils.toPathList; import static org.apache.hadoop.fs.s3a.S3AUtils.HIDDEN_FILE_FILTER; @@ -103,6 +104,7 @@ public void setup() throws Exception { final S3AFileSystem fs = getFileSystem(); final Path path = methodPath(); assertHasPathCapabilities(fs, path, DIRECTORY_OPERATIONS_PURGE_UPLOADS); + assumeMultipartUploads(fs.getConf()); listingInconsistent = fs.hasPathCapability(path, DIRECTORY_LISTING_INCONSISTENT); clearAnyUploads(fs, path); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java index 80a44e22b8de7..4253b3cdf562c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java @@ -35,6 +35,7 @@ import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED; @@ -63,6 +64,7 @@ public Configuration createConfiguration() { public void setup() throws Exception { super.setup(); final S3AFileSystem fs = getFileSystem(); + assumeMultipartUploads(fs.getConf()); assertHasPathCapabilities(fs, new Path("/"), DIRECTORY_OPERATIONS_PURGE_UPLOADS); clearAnyUploads(fs, methodPath()); diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java index 5e8ee5d8c7641..d7464bc184220 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java @@ -48,14 +48,39 @@ public void testGetSupportedChecksumAlgorithmSHA256() { testGetSupportedChecksumAlgorithm(ChecksumAlgorithm.SHA256); } - private void testGetSupportedChecksumAlgorithm(ChecksumAlgorithm checksumAlgorithm) { - final Configuration conf = new Configuration(); - conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString()); + /** + * Assert that a checksum algorithm string resolves to a value. + * @param checksumAlgorithm expected value + */ + private static void testGetSupportedChecksumAlgorithm(final ChecksumAlgorithm checksumAlgorithm) { + assertChecksumAlgorithm(checksumAlgorithm, checksumAlgorithm.toString()); + } + /** + * Assert that a checksum algorithm string resolves to a value. + * @param checksumAlgorithm expected value + * @param algorithm algorithm name + */ + private static void assertChecksumAlgorithm(final ChecksumAlgorithm checksumAlgorithm, + final String algorithm) { + final Configuration conf = new Configuration(false); + conf.set(CHECKSUM_ALGORITHM, algorithm); Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf)) .describedAs("Checksum algorithm must match value set in the configuration") .isEqualTo(checksumAlgorithm); } + @Test + public void testCRC32C() throws Throwable { + assertChecksumAlgorithm(ChecksumAlgorithm.CRC32_C, "CRC32C"); + assertChecksumAlgorithm(ChecksumAlgorithm.CRC32_C, "CRC32_C"); + } + + @Test + public void testCRC64NVME() throws Throwable { + assertChecksumAlgorithm(ChecksumAlgorithm.CRC64_NVME, "CRC64_NVME"); + assertChecksumAlgorithm(ChecksumAlgorithm.CRC64_NVME, "CRC64NVME"); + } + @Test public void testGetChecksumAlgorithmWhenNull() { final Configuration conf = new Configuration(); @@ -73,4 +98,5 @@ public void testGetNotSupportedChecksumAlgorithm() { .describedAs("Invalid checksum algorithm should throw an exception") .isInstanceOf(IllegalArgumentException.class); } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java index 74920b2365002..08194a9c42d85 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestCreateFileCost.java @@ -41,6 +41,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.contract.ContractTestUtils.toChar; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CONDITIONAL_CREATE_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_HEADER; import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; import static org.apache.hadoop.fs.s3a.Constants.XA_HEADER_PREFIX; @@ -203,9 +204,14 @@ public void testCreateBuilderSequence() throws Throwable { () -> buildFile(testFile, false, true, GET_FILE_STATUS_ON_FILE)); } else { - // will trigger conditional create and throw RemoteFileChangedException - intercept(RemoteFileChangedException.class, - () -> buildFile(testFile, false, true, NO_HEAD_OR_LIST)); + if 
(getFileSystem().getConf().getBoolean(FS_S3A_CONDITIONAL_CREATE_ENABLED, true)) { + // will trigger conditional create and throw RemoteFileChangedException + intercept(RemoteFileChangedException.class, + () -> buildFile(testFile, false, true, NO_HEAD_OR_LIST)); + } else { + // third party store w/out conditional overwrite support + buildFile(testFile, false, true, NO_HEAD_OR_LIST); + } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java index 6bd4114f07cc3..d6b843bb85e83 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java @@ -124,23 +124,13 @@ public void testDeleteSingleFileInDir() throws Throwable { FILESTATUS_FILE_PROBE_L + FILESTATUS_DIR_PROBE_L), with(DIRECTORIES_DELETED, 0), with(FILES_DELETED, 1), - // a single DELETE call is made to delete the object - probe(bulkDelete, OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST), - probe(!bulkDelete, OBJECT_DELETE_REQUEST, - DELETE_OBJECT_REQUEST + DELETE_MARKER_REQUEST), - - // keeping: create no parent dirs or delete parents - withWhenKeeping(DIRECTORIES_CREATED, 0), - withWhenKeeping(OBJECT_BULK_DELETE_REQUEST, 0), + with(OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST), - // deleting: create a parent and delete any of its parents - withWhenDeleting(DIRECTORIES_CREATED, 1), - // a bulk delete for all parents is issued. - // the number of objects in it depends on the depth of the tree; - // don't worry about that - probe(deleting && bulkDelete, OBJECT_BULK_DELETE_REQUEST, - DELETE_MARKER_REQUEST) + // create no parent dirs or delete parents + with(DIRECTORIES_CREATED, 0), + // even when bulk delete is enabled, there is no use of this. 
+ with(OBJECT_BULK_DELETE_REQUEST, 0) ); // there is an empty dir for a parent diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java index 59787617b884f..10a3df3914167 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java @@ -44,6 +44,7 @@ import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.countUploadsAt; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createPartUpload; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.BucketInfo; import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE; @@ -129,6 +130,7 @@ public void testStoreInfoFips() throws Throwable { @Test public void testUploads() throws Throwable { + assumeMultipartUploads(getFileSystem().getConf()); S3AFileSystem fs = getFileSystem(); Path path = methodPath(); Path file = new Path(path, UPLOAD_NAME); @@ -173,14 +175,17 @@ public void testUploads() throws Throwable { } } + @Test public void testUploadListByAge() throws Throwable { S3AFileSystem fs = getFileSystem(); Path path = methodPath(); Path file = new Path(path, UPLOAD_NAME); + assumeMultipartUploads(getFileSystem().getConf()); describe("Cleaning up any leftover uploads from previous runs."); + // 1. Make sure key doesn't already exist clearAnyUploads(fs, path); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 70cab0d75544e..90ca90a2eadff 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -106,6 +106,9 @@ public void setup() throws Exception { uploadBlockSize = uploadBlockSize(); filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE, DEFAULT_HUGE_FILESIZE); + if (requireMultipartUploads()) { + assumeMultipartUploads(getFileSystem().getConf()); + } } /** @@ -127,6 +130,14 @@ public String getTestSuiteName() { return getBlockOutputBufferName(); } + /** + * Override point: does this test suite require MPUs? + * @return true if the test suite must be skipped if MPUS are off. + */ + protected boolean requireMultipartUploads() { + return false; + } + /** * Note that this can get called before test setup. * @return the configuration to use. 
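Many of the test changes above and below gate their setup on S3ATestUtils.assumeMultipartUploads(), whose body is not part of this diff. The sketch below is a hedged illustration of what such a JUnit assumption could look like, assuming it keys off a store-level multipart-upload enable switch; the option name, default and method body are assumptions, not the actual helper.

  // Sketch only (assumed, not taken from this patch): one possible shape for
  // S3ATestUtils.assumeMultipartUploads().
  // Assumed imports: org.junit.Assume, org.apache.hadoop.conf.Configuration.
  public static void assumeMultipartUploads(Configuration conf) {
    // skip the enclosing test when multipart uploads are disabled for this store
    Assume.assumeTrue("Multipart uploads are disabled",
        conf.getBoolean("fs.s3a.multipart.uploads.enabled", true));
  }

Skipping rather than failing keeps these suites usable against third-party stores which run with multipart uploads turned off.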
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java index 24ba519adf0cc..13069902a217e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java @@ -54,6 +54,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; import static org.apache.hadoop.fs.s3a.Constants.RETRY_HTTP_5XX_ERRORS; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyInt; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED; @@ -82,6 +83,9 @@ *

    * Marked as a scale test even though it tries to aggressively abort streams being written * and should, if working, complete fast. + *

+ * Assumes multipart uploads are enabled; single part upload interruptions aren't the complicated + * ones. */ @RunWith(Parameterized.class) public class ITestS3ABlockOutputStreamInterruption extends S3AScaleTestBase { @@ -168,6 +172,7 @@ protected Configuration createScaleConfiguration() { public void setup() throws Exception { SdkFaultInjector.resetFaultInjector(); super.setup(); + assumeMultipartUploads(getFileSystem().getConf()); } @Override @@ -283,6 +288,7 @@ public void testAbortDuringUpload() throws Throwable { @Test public void testPartUploadFailure() throws Throwable { describe("Trigger a failure during a multipart upload"); + assumeMultipartUploads(getFileSystem().getConf()); int len = 6 * _1MB; final byte[] dataset = dataset(len, 'a', 'z' - 'a'); final String text = "Simulated failure"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java index d6f15c8c84476..c85f5d54b537e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java @@ -28,4 +28,13 @@ public class ITestS3AHugeFilesArrayBlocks extends AbstractSTestS3AHugeFiles { protected String getBlockOutputBufferName() { return Constants.FAST_UPLOAD_BUFFER_ARRAY; } + + /** + * Skip this test suite when MPUs are not available. + * @return true + */ + @Override + protected boolean requireMultipartUploads() { + return true; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java index 1e74d715b88fa..6cc62f8512746 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java @@ -40,6 +40,15 @@ protected String getBlockOutputBufferName() { return FAST_UPLOAD_BYTEBUFFER; } + /** + * Skip this test suite when MPUs are not available. + * @return true + */ + @Override + protected boolean requireMultipartUploads() { + return true; + } + /** * Rename the parent directory, rather than the file itself.
* @param src source file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java index b83d12b4c1a66..121a8c07b3dd7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AMultipartUploadSizeLimits.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_ABORT; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED; @@ -74,6 +75,12 @@ protected Configuration createScaleConfiguration() { return configuration; } + @Override + public void setup() throws Exception { + super.setup(); + assumeMultipartUploads(getFileSystem().getConf()); + } + /** * Uploads under the limit are valid. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java index 594cb0cdafb87..e6d7afd824fb7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestAWSStatisticCollection.java @@ -19,12 +19,17 @@ package org.apache.hadoop.fs.s3a.statistics; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; -import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; +import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_REQUEST; /** @@ -32,19 +37,27 @@ */ public class ITestAWSStatisticCollection extends AbstractS3ACostTest { + private static final Logger LOG = + LoggerFactory.getLogger(ITestAWSStatisticCollection.class); + @Override public Configuration createConfiguration() { final Configuration conf = super.createConfiguration(); - conf.setBoolean(FS_S3A_CREATE_PERFORMANCE, true); + S3ATestUtils.removeBaseAndBucketOverrides(conf, + S3EXPRESS_CREATE_SESSION); + setPerformanceFlags(conf, "create"); + conf.setBoolean(S3EXPRESS_CREATE_SESSION, false); return conf; } @Test public void testSDKMetricsCostOfGetFileStatusOnFile() throws Throwable { - describe("performing getFileStatus on a file"); + describe("Performing getFileStatus() on a file"); Path simpleFile = file(methodPath()); // and repeat on the file looking at AWS wired up stats - verifyMetrics(() -> getFileSystem().getFileStatus(simpleFile), + final S3AFileSystem fs = getFileSystem(); + LOG.info("Initiating GET request for {}", simpleFile); + verifyMetrics(() -> 
fs.getFileStatus(simpleFile), with(STORE_IO_REQUEST, 1)); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java index 87b37b7f8ffbc..5ab153df31626 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java @@ -44,6 +44,7 @@ import org.junit.Test; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES; import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; @@ -71,6 +72,7 @@ public void setup() throws Exception { super.setup(); S3AFileSystem fs = getFileSystem(); Configuration conf = getConfiguration(); + assumeMultipartUploads(fs.getConf()); rootPath = path("MiniClusterWordCount"); Path workingDir = path("working"); fs.setWorkingDirectory(workingDir); diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index f61668643a1b5..fe651742e02be 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -68,8 +68,13 @@ log4j.logger.org.apache.hadoop.fs.s3a.SDKV2Upgrade=WARN # include sensitive information such as account IDs in HTTP headers. # log4j.logger.software.amazon.awssdk.request=DEBUG +# Log TLS info +#log4j.logger.software.amazon.awssdk.thirdparty.org.apache.http.conn.ssl.SSLConnectionSocketFactory=DEBUG + + # Turn on low level HTTP protocol debugging -#log4j.logger.org.apache.http.wire=DEBUG +#log4j.logger.software.amazon.awssdk.thirdparty.org.apache.http.wire=DEBUG +#log4j.logger.software.amazon.awssdk.thirdparty.org.apache.http=DEBUG # async client #log4j.logger.io.netty.handler.logging=DEBUG
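Related to the TestChecksumSupport changes earlier in this patch: those tests expect "CRC32C"/"CRC32_C" and "CRC64NVME"/"CRC64_NVME" to resolve to the same SDK enum constants, and an unsupported name to raise IllegalArgumentException. The resolver itself is outside this diff; the sketch below shows one normalization strategy that would satisfy those assertions. It is an assumption, not the actual ChecksumSupport.getChecksumAlgorithm() code, and the handling of an unset or "NONE" value is likewise assumed.

  // Sketch only (assumed, not the actual ChecksumSupport implementation).
  // Assumed imports: java.util.Locale, org.apache.hadoop.conf.Configuration,
  // software.amazon.awssdk.services.s3.model.ChecksumAlgorithm.
  public static ChecksumAlgorithm getChecksumAlgorithm(Configuration conf) {
    final String name = conf.getTrimmed(CHECKSUM_ALGORITHM, "");
    if (name.isEmpty() || "NONE".equalsIgnoreCase(name)) {
      // how unset/NONE is handled is an assumption here
      return null;
    }
    // strip underscores so "CRC32C" and "CRC32_C" resolve to the same constant
    final String normalized = name.toUpperCase(Locale.ROOT).replace("_", "");
    for (ChecksumAlgorithm algorithm : ChecksumAlgorithm.values()) {
      if (algorithm != ChecksumAlgorithm.UNKNOWN_TO_SDK_VERSION
          && algorithm.name().replace("_", "").equals(normalized)) {
        return algorithm;
      }
    }
    throw new IllegalArgumentException("Unsupported checksum algorithm: " + name);
  }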