From 738be7ef137bc14990a69f87e75f6e5237c27d04 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Mon, 17 Jan 2022 13:57:59 +0000 Subject: [PATCH 1/4] HADOOP-17415. Add option to skip HEAD when getting file status --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 54 +++++++++++++++---- .../apache/hadoop/fs/s3a/S3AInputStream.java | 40 +++++++++++++- .../hadoop/fs/s3a/TestS3AInputPolicies.java | 8 +++ 3 files changed, 90 insertions(+), 12 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index c8a73d956d844..c4d4b79389726 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -3347,11 +3347,20 @@ public FileStatus getFileStatus(final Path f) throws IOException { * @throws FileNotFoundException when the path does not exist * @throws IOException on other problems. */ + @VisibleForTesting + @Retries.RetryTranslated + S3AFileStatus innerGetFileStatus(final Path f, + final boolean needEmptyDirectoryFlag, + final Set probes) throws IOException { + return innerGetFileStatus(f, needEmptyDirectoryFlag, probes, false); + } + @VisibleForTesting @Retries.RetryTranslated S3AFileStatus innerGetFileStatus(final Path f, final boolean needEmptyDirectoryFlag, - final Set probes) throws IOException { + final Set probes, + final boolean skipHead) throws IOException { final Path path = qualify(f); String key = pathToKey(path); LOG.debug("Getting path status for {} ({}); needEmptyDirectory={}", @@ -3359,7 +3368,8 @@ S3AFileStatus innerGetFileStatus(final Path f, return s3GetFileStatus(path, key, probes, - needEmptyDirectoryFlag); + needEmptyDirectoryFlag, + skipHead); } @@ -3407,12 +3417,22 @@ S3AFileStatus innerGetFileStatus(final Path f, * @throws FileNotFoundException the supplied probes failed. 
* @throws IOException on other problems. */ + @VisibleForTesting + @Retries.RetryTranslated + S3AFileStatus s3GetFileStatus(final Path path, + final String key, + final Set probes, + final boolean needEmptyDirectoryFlag) throws IOException { + return s3GetFileStatus(path, key, probes, needEmptyDirectoryFlag, false); + } + @VisibleForTesting @Retries.RetryTranslated S3AFileStatus s3GetFileStatus(final Path path, final String key, final Set probes, - final boolean needEmptyDirectoryFlag) throws IOException { + final boolean needEmptyDirectoryFlag, + final boolean skipHead) throws IOException { LOG.debug("S3GetFileStatus {}", path); // either you aren't looking for the directory flag, or you are, // and if you are, the probe list must contain list. @@ -3430,12 +3450,17 @@ S3AFileStatus s3GetFileStatus(final Path path, // look for the simple file ObjectMetadata meta = getObjectMetadata(key); LOG.debug("Found exact file: normal file {}", key); - long contentLength = meta.getContentLength(); - // check if CSE is enabled, then strip padded length. - if (isCSEEnabled - && meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null - && contentLength >= CSE_PADDING_LENGTH) { - contentLength -= CSE_PADDING_LENGTH; + long contentLength; + if (skipHead) { + contentLength = -1; + } else { + contentLength = meta.getContentLength(); + // check if CSE is enabled, then strip padded length. 
+ if (isCSEEnabled + && meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null + && contentLength >= CSE_PADDING_LENGTH) { + contentLength -= CSE_PADDING_LENGTH; + } } return new S3AFileStatus(contentLength, dateToLong(meta.getLastModified()), @@ -4891,7 +4916,13 @@ private void requireSelectSupport(final Path source) throws * @throws IOException IO failure */ private S3AFileStatus extractOrFetchSimpleFileStatus( - final Path path, final Optional optStatus) + final Path path, final Optional optStatus) + throws IOException { + return extractOrFetchSimpleFileStatus(path, optStatus, false); + } + + private S3AFileStatus extractOrFetchSimpleFileStatus( + final Path path, final Optional optStatus, final boolean skipHead) throws IOException { S3AFileStatus fileStatus; if (optStatus.isPresent()) { @@ -4906,7 +4937,8 @@ private S3AFileStatus extractOrFetchSimpleFileStatus( // therefore: if there is is a dir marker, this // will raise a FileNotFoundException fileStatus = innerGetFileStatus(path, false, - StatusProbeEnum.HEAD_ONLY); + StatusProbeEnum.HEAD_ONLY, + skipHead); } return fileStatus; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 79a65acb4381b..5df16a86b30f3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -20,7 +20,9 @@ import javax.annotation.Nullable; +import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.apache.hadoop.classification.VisibleForTesting; @@ -103,7 +105,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, private final 
String bucket; private final String key; private final String pathStr; - private final long contentLength; + private long contentLength; private final String uri; private static final Logger LOG = LoggerFactory.getLogger(S3AInputStream.class); @@ -215,6 +217,14 @@ private synchronized void reopen(String reason, long targetPos, long length, try { object = Invoker.once(text, uri, () -> client.getObject(request)); + + // Update content length from first read + // by getting information con Content-Range header + if (contentLength < 0) { + contentLength = getContentLength(object.getObjectMetadata()); + LOG.debug("Updating contentLength to {}", contentLength); + } + } catch(IOException e) { // input function failed: note it tracker.failed(); @@ -821,6 +831,15 @@ static long calculateRequestLimit( long length, long contentLength, long readahead) { + // if content length is unknown + // range limit should be set to more than 0 + // since readahead and length can be 0 + // we should select from the one with higher value + // SDK will handle it and response within content length limit + if (contentLength < 0) { + contentLength = Math.max(readahead, length); + } + long rangeLimit; switch (inputPolicy) { case Random: @@ -832,6 +851,8 @@ static long calculateRequestLimit( case Sequential: // sequential: plan for reading the entire object. + // if range limit exceed content length + // SDK will handle it and response within content length limit rangeLimit = contentLength; break; @@ -920,4 +941,21 @@ public interface InputStreamCallbacks extends Closeable { } + private long getContentLength(ObjectMetadata objectMetadata) { + String contentRange = (String)objectMetadata.getRawMetadataValue("Content-Range"); + long length = 0; + if (contentRange != null) { + String[] tokens = contentRange.split("[ -/]+"); + + try { + length = Long.parseLong(tokens[tokens.length - 1]); + } catch (NumberFormatException var5) { + throw new SdkClientException("Unable to parse content range. 
Header 'Content-Range' has corrupted data" + var5.getMessage(), var5); + } + } + + return length; + } + + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java index c0c8137aaf676..25c3f2e14b6ac 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java @@ -64,18 +64,26 @@ public static Collection data() { return Arrays.asList(new Object[][]{ {S3AInputPolicy.Normal, 0, -1, 0, _64K, 0}, {S3AInputPolicy.Normal, 0, -1, _10MB, _64K, _10MB}, + {S3AInputPolicy.Normal, 0, -1, -1, _64K, _64K}, {S3AInputPolicy.Normal, _64K, _64K, _10MB, _64K, _10MB}, {S3AInputPolicy.Sequential, 0, -1, 0, _64K, 0}, {S3AInputPolicy.Sequential, 0, -1, _10MB, _64K, _10MB}, + {S3AInputPolicy.Sequential, 0, -1, -1, _64K, _64K}, {S3AInputPolicy.Random, 0, -1, 0, _64K, 0}, {S3AInputPolicy.Random, 0, -1, _10MB, _64K, _10MB}, + {S3AInputPolicy.Random, 0, -1, -1, _64K, _64K}, {S3AInputPolicy.Random, 0, _128K, _10MB, _64K, _128K}, {S3AInputPolicy.Random, 0, _128K, _10MB, _256K, _256K}, + {S3AInputPolicy.Random, 0, _128K, -1, _64K, _128K}, + {S3AInputPolicy.Random, 0, _128K, -1, _256K, _256K}, {S3AInputPolicy.Random, 0, 0, _10MB, _256K, _256K}, {S3AInputPolicy.Random, 0, 1, _10MB, _256K, _256K}, {S3AInputPolicy.Random, 0, _1MB, _10MB, _256K, _1MB}, {S3AInputPolicy.Random, 0, _1MB, _10MB, 0, _1MB}, + {S3AInputPolicy.Random, 0, _1MB, -1, _256K, _1MB}, + {S3AInputPolicy.Random, 0, _1MB, -1, 0, _1MB}, {S3AInputPolicy.Random, _10MB + _64K, _1MB, _10MB, _256K, _10MB}, + }); } From 18629110e94ef6f4e90d2f9ea64ea1a710f0944d Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Mon, 17 Jan 2022 15:37:24 +0000 Subject: [PATCH 2/4] HADOOP-17415. 
Skip HEAD when call open file --- .../main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +- .../main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index c4d4b79389726..5017f56d1a0fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1412,7 +1412,7 @@ private FSDataInputStream open( // this span is passed into the stream. final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path); S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, - providedStatus); + providedStatus, true); S3AReadOpContext readContext; if (options.isPresent()) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 5df16a86b30f3..e9ee60a60bd08 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -153,7 +153,6 @@ public S3AInputStream(S3AReadOpContext ctx, "No Bucket"); Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); long l = s3Attributes.getLen(); - Preconditions.checkArgument(l >= 0, "Negative content length"); this.context = ctx; this.bucket = s3Attributes.getBucket(); this.key = s3Attributes.getKey(); @@ -406,7 +405,7 @@ private void incrementBytesRead(long bytesRead) { @Retries.RetryTranslated public synchronized int read() throws IOException { checkNotClosed(); - if (this.contentLength == 0 || (nextReadPos >= contentLength)) { + if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) 
{ return -1; } @@ -490,7 +489,7 @@ public synchronized int read(byte[] buf, int off, int len) return 0; } - if (this.contentLength == 0 || (nextReadPos >= contentLength)) { + if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) { return -1; } From a115e1b8b1d5b7a52a077143fe097c3fb53da5af Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Mon, 17 Jan 2022 17:41:52 +0000 Subject: [PATCH 3/4] HADOOP-17415. Update default content length to support random read --- .../java/org/apache/hadoop/fs/s3a/S3AInputStream.java | 8 +++----- .../org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java | 3 ++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index e9ee60a60bd08..77a714deb0e5b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -263,7 +263,7 @@ public synchronized void seek(long targetPos) throws IOException { + " " + targetPos); } - if (this.contentLength <= 0) { + if (this.contentLength == 0) { return; } @@ -831,12 +831,10 @@ static long calculateRequestLimit( long contentLength, long readahead) { // if content length is unknown - // range limit should be set to more than 0 - // since readahead and length can be 0 - // we should select from the one with higher value + // set content length to be as far as possible // SDK will handle it and response within content length limit if (contentLength < 0) { - contentLength = Math.max(readahead, length); + contentLength = targetPos + Math.max(readahead, length); } long rangeLimit; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java index 
25c3f2e14b6ac..996d4833ff684 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java @@ -83,7 +83,8 @@ public static Collection data() { {S3AInputPolicy.Random, 0, _1MB, -1, _256K, _1MB}, {S3AInputPolicy.Random, 0, _1MB, -1, 0, _1MB}, {S3AInputPolicy.Random, _10MB + _64K, _1MB, _10MB, _256K, _10MB}, - + {S3AInputPolicy.Random, _10MB, _64K, -1, _256K, _10MB + _256K}, + {S3AInputPolicy.Random, _10MB, _1MB, -1, _256K, _10MB + _1MB}, }); } From 89778a2848732a68767a4d26caf53fee50072286 Mon Sep 17 00:00:00 2001 From: Monthon Klongklaew Date: Mon, 24 Jan 2022 17:14:02 +0000 Subject: [PATCH 4/4] HADOOP-17415. Change default openFile operation to skip head request --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 78 ++++++--------- .../apache/hadoop/fs/s3a/S3AInputStream.java | 93 +++++++++++------- .../hadoop/fs/s3a/impl/InternalConstants.java | 3 + .../hadoop/fs/s3a/impl/StatusProbeEnum.java | 6 ++ .../fs/contract/s3a/ITestS3AContractOpen.java | 95 +++++++++++++++++++ .../fs/s3a/ITestS3AFileOperationCost.java | 5 +- .../apache/hadoop/fs/s3a/TestS3AUnbuffer.java | 1 + .../ITestS3AFileContextMainOperations.java | 5 + 8 files changed, 201 insertions(+), 85 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 5017f56d1a0fe..455259795048a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1412,7 +1412,7 @@ private FSDataInputStream open( // this span is passed into the stream. 
final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path); S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, - providedStatus, true); + providedStatus); S3AReadOpContext readContext; if (options.isPresent()) { @@ -3347,20 +3347,12 @@ public FileStatus getFileStatus(final Path f) throws IOException { * @throws FileNotFoundException when the path does not exist * @throws IOException on other problems. */ - @VisibleForTesting - @Retries.RetryTranslated - S3AFileStatus innerGetFileStatus(final Path f, - final boolean needEmptyDirectoryFlag, - final Set probes) throws IOException { - return innerGetFileStatus(f, needEmptyDirectoryFlag, probes, false); - } - @VisibleForTesting @Retries.RetryTranslated S3AFileStatus innerGetFileStatus(final Path f, final boolean needEmptyDirectoryFlag, - final Set probes, - final boolean skipHead) throws IOException { + final Set probes) throws IOException { + final Path path = qualify(f); String key = pathToKey(path); LOG.debug("Getting path status for {} ({}); needEmptyDirectory={}", @@ -3368,8 +3360,7 @@ S3AFileStatus innerGetFileStatus(final Path f, return s3GetFileStatus(path, key, probes, - needEmptyDirectoryFlag, - skipHead); + needEmptyDirectoryFlag); } @@ -3417,22 +3408,12 @@ S3AFileStatus innerGetFileStatus(final Path f, * @throws FileNotFoundException the supplied probes failed. * @throws IOException on other problems. 
*/ - @VisibleForTesting - @Retries.RetryTranslated - S3AFileStatus s3GetFileStatus(final Path path, - final String key, - final Set probes, - final boolean needEmptyDirectoryFlag) throws IOException { - return s3GetFileStatus(path, key, probes, needEmptyDirectoryFlag, false); - } - @VisibleForTesting @Retries.RetryTranslated S3AFileStatus s3GetFileStatus(final Path path, final String key, final Set probes, - final boolean needEmptyDirectoryFlag, - final boolean skipHead) throws IOException { + final boolean needEmptyDirectoryFlag) throws IOException { LOG.debug("S3GetFileStatus {}", path); // either you aren't looking for the directory flag, or you are, // and if you are, the probe list must contain list. @@ -3444,23 +3425,34 @@ S3AFileStatus s3GetFileStatus(final Path path, return new S3AFileStatus(Tristate.UNKNOWN, path, username); } + // get file status without a probe + if (!key.isEmpty() && !key.endsWith("/") + && probes.isEmpty()) { + // use negative value for length + // to tell that object length is unknown + // it will be updated on first read + // also no information about modification time, etag and version id + return new S3AFileStatus(-1, + 0L, + path, + getDefaultBlockSize(path), + username, + null, + null); + } + if (!key.isEmpty() && !key.endsWith("/") && probes.contains(StatusProbeEnum.Head)) { try { // look for the simple file ObjectMetadata meta = getObjectMetadata(key); LOG.debug("Found exact file: normal file {}", key); - long contentLength; - if (skipHead) { - contentLength = -1; - } else { - contentLength = meta.getContentLength(); - // check if CSE is enabled, then strip padded length. - if (isCSEEnabled - && meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null - && contentLength >= CSE_PADDING_LENGTH) { - contentLength -= CSE_PADDING_LENGTH; - } + long contentLength = meta.getContentLength(); + // check if CSE is enabled, then strip padded length. 
+ if (isCSEEnabled + && meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null + && contentLength >= CSE_PADDING_LENGTH) { + contentLength -= CSE_PADDING_LENGTH; } return new S3AFileStatus(contentLength, dateToLong(meta.getLastModified()), @@ -4916,13 +4908,7 @@ private void requireSelectSupport(final Path source) throws * @throws IOException IO failure */ private S3AFileStatus extractOrFetchSimpleFileStatus( - final Path path, final Optional optStatus) - throws IOException { - return extractOrFetchSimpleFileStatus(path, optStatus, false); - } - - private S3AFileStatus extractOrFetchSimpleFileStatus( - final Path path, final Optional optStatus, final boolean skipHead) + final Path path, final Optional optStatus) throws IOException { S3AFileStatus fileStatus; if (optStatus.isPresent()) { @@ -4933,12 +4919,10 @@ private S3AFileStatus extractOrFetchSimpleFileStatus( throw new FileNotFoundException(path.toString() + " is a directory"); } } else { - // Executes a HEAD only. - // therefore: if there is is a dir marker, this - // will raise a FileNotFoundException + // Get simple file status without a probe + // therefore: no content length, modification time, etag or version id fileStatus = innerGetFileStatus(path, false, - StatusProbeEnum.HEAD_ONLY, - skipHead); + StatusProbeEnum.NONE); } return fileStatus; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 77a714deb0e5b..9e52dbd64dc2a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -20,11 +20,11 @@ import javax.annotation.Nullable; -import com.amazonaws.SdkClientException; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import 
com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.AmazonS3Exception; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; @@ -51,6 +51,7 @@ import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.util.StringUtils.toLowerCase; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416; /** * The input stream for an S3A object. @@ -100,12 +101,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ private S3Object object; private S3ObjectInputStream wrappedStream; + private long contentLength; private final S3AReadOpContext context; private final InputStreamCallbacks client; private final String bucket; private final String key; private final String pathStr; - private long contentLength; private final String uri; private static final Logger LOG = LoggerFactory.getLogger(S3AInputStream.class); @@ -181,6 +182,28 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) { streamStatistics.inputPolicySet(inputPolicy.ordinal()); } + /*** + * Extract content length from Content-Range header. + * @param objectMetadata S3 Object metadata + * @return content length + */ + public static long getContentLength(ObjectMetadata objectMetadata) { + String contentRange = (String)objectMetadata.getRawMetadataValue("Content-Range"); + long length = -1; + if (contentRange != null) { + String[] tokens = contentRange.split("[ -/]+"); + + try { + length = Long.parseLong(tokens[tokens.length - 1]); + } catch (NumberFormatException e) { + LOG.warn("Unable to parse content range. Header 'Content-Range' has corrupted data" + + e.getMessage(), e); + } + } + + return length; + } + /** * Opens up the stream at specified target position and for given length. 
* @@ -218,13 +241,25 @@ private synchronized void reopen(String reason, long targetPos, long length, () -> client.getObject(request)); // Update content length from first read - // by getting information con Content-Range header + // by getting information from Content-Range header if (contentLength < 0) { contentLength = getContentLength(object.getObjectMetadata()); LOG.debug("Updating contentLength to {}", contentLength); } } catch(IOException e) { + // when error out of range + // update content length to ActualObjectSize from additional details + if (e.getCause() instanceof AmazonS3Exception) { + AmazonS3Exception s3Exception = (AmazonS3Exception) e.getCause(); + if (s3Exception.getStatusCode() == SC_416 + && s3Exception.getAdditionalDetails() != null + && s3Exception.getAdditionalDetails().containsKey("ActualObjectSize")) { + String objectSize = s3Exception.getAdditionalDetails().get("ActualObjectSize"); + contentLength = Long.parseLong(objectSize); + LOG.debug("Updating contentLength to {}", contentLength); + } + } // input function failed: note it tracker.failed(); // and rethrow @@ -405,9 +440,6 @@ private void incrementBytesRead(long bytesRead) { @Retries.RetryTranslated public synchronized int read() throws IOException { checkNotClosed(); - if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) { - return -1; - } try { lazySeek(nextReadPos, 1); @@ -415,6 +447,11 @@ public synchronized int read() throws IOException { return -1; } + // content length is updated after the first read from lazySeek + if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) { + return -1; + } + Invoker invoker = context.getReadInvoker(); int byteRead = invoker.retry("read", pathStr, true, () -> { @@ -489,10 +526,6 @@ public synchronized int read(byte[] buf, int off, int len) return 0; } - if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) { - return -1; - } - try { lazySeek(nextReadPos, 
len); } catch (EOFException e) { @@ -500,6 +533,11 @@ public synchronized int read(byte[] buf, int off, int len) return -1; } + // content length is updated after the first read from lazySeek + if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) { + return -1; + } + Invoker invoker = context.getReadInvoker(); streamStatistics.readOperationStarted(nextReadPos, len); @@ -830,13 +868,6 @@ static long calculateRequestLimit( long length, long contentLength, long readahead) { - // if content length is unknown - // set content length to be as far as possible - // SDK will handle it and response within content length limit - if (contentLength < 0) { - contentLength = targetPos + Math.max(readahead, length); - } - long rangeLimit; switch (inputPolicy) { case Random: @@ -848,8 +879,6 @@ static long calculateRequestLimit( case Sequential: // sequential: plan for reading the entire object. - // if range limit exceed content length - // SDK will handle it and response within content length limit rangeLimit = contentLength; break; @@ -862,6 +891,15 @@ static long calculateRequestLimit( } // cannot read past the end of the object rangeLimit = Math.min(contentLength, rangeLimit); + + // negative range limit means content length is unknown + // we should set it to a positive value that is greater than targetPos + // this is possible because AWS SDK will respond within content length limit + // even if the requested range exceeds the content length + if (rangeLimit < 0) { + rangeLimit = targetPos + Math.max(readahead, length); + } + return rangeLimit; } @@ -938,21 +976,4 @@ public interface InputStreamCallbacks extends Closeable { } - private long getContentLength(ObjectMetadata objectMetadata) { - String contentRange = (String)objectMetadata.getRawMetadataValue("Content-Range"); - long length = 0; - if (contentRange != null) { - String[] tokens = contentRange.split("[ -/]+"); - - try { - length = Long.parseLong(tokens[tokens.length - 1]); - } catch 
(NumberFormatException var5) { - throw new SdkClientException("Unable to parse content range. Header 'Content-Range' has corrupted data" + var5.getMessage(), var5); - } - } - - return length; - } - - } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 1e4a1262764ed..d26c8c828e9a3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -101,6 +101,9 @@ private InternalConstants() { /** 404 error code. */ public static final int SC_404 = 404; + /** 416 Range Not Satisfiable error code. */ + public static final int SC_416 = 416; + /** Name of the log for throttling events. Value: {@value}. */ public static final String THROTTLE_LOG_NAME = "org.apache.hadoop.fs.s3a.throttled"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java index 3b69c7efe3741..b11af4ef51e2c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java @@ -48,6 +48,12 @@ public enum StatusProbeEnum { public static final Set LIST_ONLY = EnumSet.of(List); + /** + * No probe. + */ + public static final Set NONE = + EnumSet.noneOf(StatusProbeEnum.class); + /** Look for files and directories. 
*/ public static final Set FILE = HEAD_ONLY; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java index 4765fa8e8d769..b7eaaa50503c8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java @@ -19,13 +19,21 @@ package org.apache.hadoop.fs.contract.s3a; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractOpenTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.Ignore; + +import java.io.FileNotFoundException; +import java.io.IOException; /** * S3A contract tests opening files. */ public class ITestS3AContractOpen extends AbstractContractOpenTest { + private FSDataInputStream instream; @Override protected AbstractFSContract createContract(Configuration conf) { @@ -40,4 +48,91 @@ protected AbstractFSContract createContract(Configuration conf) { protected boolean areZeroByteFilesEncrypted() { return true; } + + /** + * From HADOOP-17415, S3A will skip the HEAD request to get file status when opening a file. + * Therefore, FileNotFoundException will be delayed until the first read occurs. 
+ */ + @Override + public void testOpenReadDir() throws Throwable { + describe("create & read a directory"); + Path path = path("zero.dir"); + mkdirs(path); + + try { + instream = getFileSystem().open(path); + int c = instream.read(); + fail("A directory has been opened for reading"); + } catch (FileNotFoundException e) { + handleExpectedException(e); + } catch (IOException e) { + handleRelaxedException("opening a directory for reading", "FileNotFoundException", e); + } + } + + @Override + public void testOpenReadDirWithChild() throws Throwable { + describe("create & read a directory which has a child"); + Path path = path("zero.dir"); + mkdirs(path); + Path path2 = new Path(path, "child"); + mkdirs(path2); + + try { + instream = getFileSystem().open(path); + int c = instream.read(); + fail("A directory has been opened for reading"); + } catch (FileNotFoundException e) { + handleExpectedException(e); + } catch (IOException e) { + handleRelaxedException("opening a directory for reading", "FileNotFoundException", e); + } + } + + @Override + public void testOpenFileLazyFail() throws Throwable { + describe("openFile fails on a missing file in the read() and not before"); + FutureDataInputStreamBuilder builder = + getFileSystem().openFile(path("testOpenFileLazyFail")).opt("fs.test.something", true); + + try { + instream = builder.build().get(); + int c = instream.read(); + fail("A non existing file has been opened for reading"); + } catch (FileNotFoundException e) { + handleExpectedException(e); + } catch (IOException e) { + handleRelaxedException("opening a non existing file for reading", "FileNotFoundException", e); + } + } + + @Override + @Ignore + public void testOpenFileFailExceptionally() throws Throwable { + // does not fail on openFile + } + + @Override + @Ignore + public void testAwaitFutureFailToFNFE() throws Throwable { + // does not fail on openFile + } + + @Override + @Ignore + public void testAwaitFutureTimeoutFailToFNFE() throws Throwable { + // does not 
fail on openFile + } + + @Override + @Ignore + public void testOpenFileExceptionallyTranslating() throws Throwable { + // does not fail on openFile + } + + @Override + @Ignore + public void testChainedFailureAwaitFuture() throws Throwable { + // does not fail on openFile + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java index 27c70b2b2148d..71dd2dd3ea504 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java @@ -336,10 +336,11 @@ public void testDirProbes() throws Throwable { FILE_STATUS_DIR_PROBE); assertEmptyDirStatus(status, Tristate.TRUE); - // skip all probes and expect no operations to take place - interceptGetFileStatusFNFE(emptydir, false, + // skip all probes and return file status with no content length + status = verifyInnerGetFileStatus(emptydir, false, EnumSet.noneOf(StatusProbeEnum.class), NO_IO); + assertEmptyDirStatus(status, Tristate.FALSE); // now add a trailing slash to the key and use the // deep internal s3GetFileStatus method call. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java index c858c9933fc7e..fbd8333f9eff5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java @@ -52,6 +52,7 @@ public void testUnbuffer() throws IOException { Path path = new Path("/file"); ObjectMetadata meta = mock(ObjectMetadata.class); when(meta.getContentLength()).thenReturn(1L); + when(meta.getRawMetadataValue("Content-Range")).thenReturn("bytes 0-0/1"); when(meta.getLastModified()).thenReturn(new Date(2L)); when(meta.getETag()).thenReturn("mock-etag"); when(s3.getObjectMetadata(any())).thenReturn(meta); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java index 3b4eaf4a80667..ea1a1e9157ebb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java @@ -63,4 +63,9 @@ public void testSetVerifyChecksum() throws IOException { //checksums ignored, so test removed } + @Test + @Ignore + public void testOpenFileLazyFail() throws Throwable { + // FileNotFoundException will be delayed until first read + } }