diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c8a73d956d844..455259795048a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -3352,6 +3352,7 @@ public FileStatus getFileStatus(final Path f) throws IOException {
   S3AFileStatus innerGetFileStatus(final Path f,
       final boolean needEmptyDirectoryFlag,
       final Set<StatusProbeEnum> probes) throws IOException {
+    final Path path = qualify(f);
     String key = pathToKey(path);
     LOG.debug("Getting path status for {} ({}); needEmptyDirectory={}",
@@ -3424,6 +3425,22 @@ S3AFileStatus s3GetFileStatus(final Path path,
       return new S3AFileStatus(Tristate.UNKNOWN, path, username);
     }
 
+    // get the file status without any probe
+    if (!key.isEmpty() && !key.endsWith("/")
+        && probes.isEmpty()) {
+      // use a negative value for the length to indicate
+      // that the object length is unknown;
+      // it will be updated on the first read.
+      // there is also no modification time, etag or version id
+      return new S3AFileStatus(-1,
+          0L,
+          path,
+          getDefaultBlockSize(path),
+          username,
+          null,
+          null);
+    }
+
     if (!key.isEmpty() && !key.endsWith("/")
         && probes.contains(StatusProbeEnum.Head)) {
       try {
@@ -4902,11 +4919,10 @@ private S3AFileStatus extractOrFetchSimpleFileStatus(
         throw new FileNotFoundException(path.toString() + " is a directory");
       }
     } else {
-      // Executes a HEAD only.
-      // therefore: if there is is a dir marker, this
-      // will raise a FileNotFoundException
+      // Get a simple file status without any probe:
+      // therefore no content length, modification time, etag or version id
       fileStatus = innerGetFileStatus(path, false,
-          StatusProbeEnum.HEAD_ONLY);
+          StatusProbeEnum.NONE);
     }
     return fileStatus;
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 79a65acb4381b..9e52dbd64dc2a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -21,8 +21,10 @@
 import javax.annotation.Nullable;
 
 import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -49,6 +51,7 @@
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416;
 
 /**
  * The input stream for an S3A object.
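A note on the S3AFileSystem change above: with an empty probe set, s3GetFileStatus now returns a placeholder status (length -1, no etag or version id) without issuing any S3 request. A minimal standalone sketch of just that decision, with illustrative names rather than Hadoop's own:

    import java.util.EnumSet;
    import java.util.Set;

    final class ProbeSkipSketch {
      enum StatusProbe { Head, List }

      // Mirrors the condition in the new s3GetFileStatus branch: a placeholder
      // status may be returned only for plain object keys, never for the root
      // ("") or a directory marker ("dir/").
      static boolean canReturnPlaceholder(String key, Set<StatusProbe> probes) {
        return !key.isEmpty() && !key.endsWith("/") && probes.isEmpty();
      }

      public static void main(String[] args) {
        Set<StatusProbe> none = EnumSet.noneOf(StatusProbe.class);
        System.out.println(canReturnPlaceholder("data/file.bin", none)); // true
        System.out.println(canReturnPlaceholder("data/dir/", none));     // false
      }
    }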
@@ -98,12 +101,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    */
   private S3Object object;
   private S3ObjectInputStream wrappedStream;
+  private long contentLength;
   private final S3AReadOpContext context;
   private final InputStreamCallbacks client;
   private final String bucket;
   private final String key;
   private final String pathStr;
-  private final long contentLength;
   private final String uri;
   private static final Logger LOG =
       LoggerFactory.getLogger(S3AInputStream.class);
@@ -151,7 +154,6 @@ public S3AInputStream(S3AReadOpContext ctx,
         "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
     long l = s3Attributes.getLen();
-    Preconditions.checkArgument(l >= 0, "Negative content length");
     this.context = ctx;
     this.bucket = s3Attributes.getBucket();
     this.key = s3Attributes.getKey();
@@ -180,6 +182,28 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
     streamStatistics.inputPolicySet(inputPolicy.ordinal());
   }
 
+  /**
+   * Extract the content length from the Content-Range header.
+   * @param objectMetadata S3 object metadata
+   * @return the content length, or -1 if it cannot be determined
+   */
+  public static long getContentLength(ObjectMetadata objectMetadata) {
+    String contentRange = (String) objectMetadata.getRawMetadataValue("Content-Range");
+    long length = -1;
+    if (contentRange != null) {
+      String[] tokens = contentRange.split("[ -/]+");
+
+      try {
+        length = Long.parseLong(tokens[tokens.length - 1]);
+      } catch (NumberFormatException e) {
+        LOG.warn("Unable to parse the 'Content-Range' header: "
+            + e.getMessage(), e);
+      }
+    }
+
+    return length;
+  }
+
   /**
    * Opens up the stream at specified target position and for given length.
    *
@@ -215,7 +239,27 @@ private synchronized void reopen(String reason, long targetPos, long length,
 
     try {
       object = Invoker.once(text, uri,
           () -> client.getObject(request));
+
+      // Update the content length on the first read,
+      // using the Content-Range header of the response.
+      if (contentLength < 0) {
+        contentLength = getContentLength(object.getObjectMetadata());
+        LOG.debug("Updating contentLength to {}", contentLength);
+      }
+
     } catch(IOException e) {
+      // on an out-of-range (416) error, update the content length
+      // from the ActualObjectSize entry in the additional details
+      if (e.getCause() instanceof AmazonS3Exception) {
+        AmazonS3Exception s3Exception = (AmazonS3Exception) e.getCause();
+        if (s3Exception.getStatusCode() == SC_416
+            && s3Exception.getAdditionalDetails() != null
+            && s3Exception.getAdditionalDetails().containsKey("ActualObjectSize")) {
+          String objectSize = s3Exception.getAdditionalDetails().get("ActualObjectSize");
+          contentLength = Long.parseLong(objectSize);
+          LOG.debug("Updating contentLength to {}", contentLength);
+        }
+      }
       // input function failed: note it
       tracker.failed();
       // and rethrow
@@ -254,7 +298,7 @@ public synchronized void seek(long targetPos) throws IOException {
           + " " + targetPos);
     }
 
-    if (this.contentLength <= 0) {
+    if (this.contentLength == 0) {
       return;
     }
@@ -396,9 +440,6 @@ private void incrementBytesRead(long bytesRead) {
   @Retries.RetryTranslated
   public synchronized int read() throws IOException {
     checkNotClosed();
-    if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
-      return -1;
-    }
 
     try {
       lazySeek(nextReadPos, 1);
@@ -406,6 +447,11 @@ public synchronized int read() throws IOException {
       return -1;
     }
 
+    // the content length is updated after the first read in lazySeek
+    if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) {
+      return -1;
+    }
+
     Invoker invoker = context.getReadInvoker();
     int byteRead = invoker.retry("read", pathStr, true,
         () -> {
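The reopen() change above has two recovery paths for the unknown length: the Content-Range header on a successful GET and, on a failed one, the 416 error details. A standalone sketch of the second path, assuming only the AWS SDK v1 AmazonS3Exception API (getStatusCode and getAdditionalDetails); the helper name is illustrative:

    import com.amazonaws.services.s3.model.AmazonS3Exception;

    final class RangeErrorSketch {
      private static final int SC_416 = 416;

      // When a ranged GET starts at or beyond the end of the object, S3
      // answers 416 and reports the real size in the "ActualObjectSize"
      // detail; keep the current value if that detail is absent.
      static long contentLengthFrom416(AmazonS3Exception e, long current) {
        if (e.getStatusCode() == SC_416
            && e.getAdditionalDetails() != null
            && e.getAdditionalDetails().containsKey("ActualObjectSize")) {
          return Long.parseLong(e.getAdditionalDetails().get("ActualObjectSize"));
        }
        return current;
      }
    }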
@@ -480,10 +526,6 @@ public synchronized int read(byte[] buf, int off, int len)
       return 0;
     }
 
-    if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
-      return -1;
-    }
-
     try {
       lazySeek(nextReadPos, len);
     } catch (EOFException e) {
@@ -491,6 +533,11 @@ public synchronized int read(byte[] buf, int off, int len)
       return -1;
     }
 
+    // the content length is updated after the first read in lazySeek
+    if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) {
+      return -1;
+    }
+
     Invoker invoker = context.getReadInvoker();
 
     streamStatistics.readOperationStarted(nextReadPos, len);
@@ -844,6 +891,15 @@ static long calculateRequestLimit(
     }
     // cannot read past the end of the object
     rangeLimit = Math.min(contentLength, rangeLimit);
+
+    // a negative range limit means the content length is unknown;
+    // set it to a positive value greater than targetPos.
+    // this is safe because the AWS SDK bounds its response to the
+    // actual content length even if the requested range exceeds it
+    if (rangeLimit < 0) {
+      rangeLimit = targetPos + Math.max(readahead, length);
+    }
+
     return rangeLimit;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 1e4a1262764ed..d26c8c828e9a3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -101,6 +101,9 @@ private InternalConstants() {
   /** 404 error code. */
   public static final int SC_404 = 404;
 
+  /** 416 Range Not Satisfiable error code. */
+  public static final int SC_416 = 416;
+
   /** Name of the log for throttling events. Value: {@value}. */
   public static final String THROTTLE_LOG_NAME =
       "org.apache.hadoop.fs.s3a.throttled";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
index 3b69c7efe3741..b11af4ef51e2c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StatusProbeEnum.java
@@ -48,6 +48,12 @@ public enum StatusProbeEnum {
   public static final Set<StatusProbeEnum> LIST_ONLY =
       EnumSet.of(List);
 
+  /**
+   * No probes at all.
+   */
+  public static final Set<StatusProbeEnum> NONE =
+      EnumSet.noneOf(StatusProbeEnum.class);
+
   /** Look for files and directories. */
   public static final Set<StatusProbeEnum> FILE =
       HEAD_ONLY;
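One worked case for the calculateRequestLimit change, using values from the new TestS3AInputPolicies rows further down; the Random-policy formula (targetPos + max(readahead, length)) is assumed from context, since the hunk only shows the method's tail:

    final class RangeLimitSketch {
      public static void main(String[] args) {
        long _256K = 256 * 1024, _1MB = 1024 * 1024, _10MB = 10 * _1MB;
        long targetPos = _10MB, length = _1MB, readahead = _256K;
        long contentLength = -1;                     // unknown until the first GET

        long rangeLimit = targetPos + Math.max(readahead, length); // Random policy
        rangeLimit = Math.min(contentLength, rangeLimit);          // -1: unknown
        if (rangeLimit < 0) {
          // the SDK truncates the response to the real object size anyway,
          // so any positive limit beyond targetPos is safe
          rangeLimit = targetPos + Math.max(readahead, length);
        }
        System.out.println(rangeLimit == _10MB + _1MB);            // true
      }
    }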
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
index 4765fa8e8d769..b7eaaa50503c8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractOpen.java
@@ -19,13 +19,21 @@
 package org.apache.hadoop.fs.contract.s3a;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.junit.Ignore;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
 
 /**
  * S3A contract tests opening files.
  */
 public class ITestS3AContractOpen extends AbstractContractOpenTest {
+  private FSDataInputStream instream;
 
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
@@ -40,4 +48,91 @@ protected AbstractFSContract createContract(Configuration conf) {
   protected boolean areZeroByteFilesEncrypted() {
     return true;
   }
+
+  /**
+   * From HADOOP-17415, S3A skips the HEAD request for the file status
+   * when opening a file. Therefore the FileNotFoundException is delayed
+   * until the first read occurs.
+   */
+  @Override
+  public void testOpenReadDir() throws Throwable {
+    describe("create & read a directory");
+    Path path = path("zero.dir");
+    mkdirs(path);
+
+    try {
+      instream = getFileSystem().open(path);
+      int c = instream.read();
+      fail("A directory has been opened for reading");
+    } catch (FileNotFoundException e) {
+      handleExpectedException(e);
+    } catch (IOException e) {
+      handleRelaxedException("opening a directory for reading",
+          "FileNotFoundException", e);
+    }
+  }
+
+  @Override
+  public void testOpenReadDirWithChild() throws Throwable {
+    describe("create & read a directory which has a child");
+    Path path = path("zero.dir");
+    mkdirs(path);
+    Path path2 = new Path(path, "child");
+    mkdirs(path2);
+
+    try {
+      instream = getFileSystem().open(path);
+      int c = instream.read();
+      fail("A directory has been opened for reading");
+    } catch (FileNotFoundException e) {
+      handleExpectedException(e);
+    } catch (IOException e) {
+      handleRelaxedException("opening a directory for reading",
+          "FileNotFoundException", e);
+    }
+  }
+
+  @Override
+  public void testOpenFileLazyFail() throws Throwable {
+    describe("openFile fails on a missing file in the read() and not before");
+    FutureDataInputStreamBuilder builder =
+        getFileSystem().openFile(path("testOpenFileLazyFail"))
+            .opt("fs.test.something", true);
+
+    try {
+      instream = builder.build().get();
+      int c = instream.read();
+      fail("A nonexistent file has been opened for reading");
+    } catch (FileNotFoundException e) {
+      handleExpectedException(e);
+    } catch (IOException e) {
+      handleRelaxedException("opening a nonexistent file for reading",
+          "FileNotFoundException", e);
+    }
+  }
+
+  @Override
+  @Ignore
+  public void testOpenFileFailExceptionally() throws Throwable {
+    // does not fail on openFile
+  }
+
+  @Override
+  @Ignore
+  public void testAwaitFutureFailToFNFE() throws Throwable {
+    // does not fail on openFile
+  }
+
+  @Override
+  @Ignore
+  public void testAwaitFutureTimeoutFailToFNFE() throws Throwable {
+    // does not fail on openFile
+  }
+
+  @Override
+  @Ignore
+  public void testOpenFileExceptionallyTranslating() throws Throwable {
+    // does not fail on openFile
+  }
+
+  @Override
+  @Ignore
+  public void testChainedFailureAwaitFuture() throws Throwable {
+    // does not fail on openFile
+  }
 }
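The overridden contract tests above all encode the same user-visible shift. Roughly, a client now observes the following (a sketch with a hypothetical path, not a test from the patch):

    import java.io.FileNotFoundException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class LazyOpenSketch {
      static void readMissing(FileSystem fs) throws Exception {
        // open() no longer issues a HEAD, so it succeeds even for a missing key
        FSDataInputStream in = fs.open(new Path("s3a://bucket/missing"));
        try {
          in.read();                     // first GET: the 404 surfaces here
        } catch (FileNotFoundException e) {
          // expected: the failure moves from open() to the first read
        } finally {
          in.close();
        }
      }
    }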
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
index 27c70b2b2148d..71dd2dd3ea504 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
@@ -336,10 +336,11 @@ public void testDirProbes() throws Throwable {
         FILE_STATUS_DIR_PROBE);
     assertEmptyDirStatus(status, Tristate.TRUE);
 
-    // skip all probes and expect no operations to take place
-    interceptGetFileStatusFNFE(emptydir, false,
+    // skip all probes and return a file status with no content length
+    status = verifyInnerGetFileStatus(emptydir, false,
         EnumSet.noneOf(StatusProbeEnum.class),
         NO_IO);
+    assertEmptyDirStatus(status, Tristate.FALSE);
 
     // now add a trailing slash to the key and use the
     // deep internal s3GetFileStatus method call.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java
index c0c8137aaf676..996d4833ff684 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java
@@ -64,18 +64,27 @@ public static Collection<Object[]> data() {
     return Arrays.asList(new Object[][]{
         {S3AInputPolicy.Normal, 0, -1, 0, _64K, 0},
         {S3AInputPolicy.Normal, 0, -1, _10MB, _64K, _10MB},
+        {S3AInputPolicy.Normal, 0, -1, -1, _64K, _64K},
         {S3AInputPolicy.Normal, _64K, _64K, _10MB, _64K, _10MB},
         {S3AInputPolicy.Sequential, 0, -1, 0, _64K, 0},
         {S3AInputPolicy.Sequential, 0, -1, _10MB, _64K, _10MB},
+        {S3AInputPolicy.Sequential, 0, -1, -1, _64K, _64K},
         {S3AInputPolicy.Random, 0, -1, 0, _64K, 0},
         {S3AInputPolicy.Random, 0, -1, _10MB, _64K, _10MB},
+        {S3AInputPolicy.Random, 0, -1, -1, _64K, _64K},
         {S3AInputPolicy.Random, 0, _128K, _10MB, _64K, _128K},
         {S3AInputPolicy.Random, 0, _128K, _10MB, _256K, _256K},
+        {S3AInputPolicy.Random, 0, _128K, -1, _64K, _128K},
+        {S3AInputPolicy.Random, 0, _128K, -1, _256K, _256K},
         {S3AInputPolicy.Random, 0, 0, _10MB, _256K, _256K},
         {S3AInputPolicy.Random, 0, 1, _10MB, _256K, _256K},
         {S3AInputPolicy.Random, 0, _1MB, _10MB, _256K, _1MB},
         {S3AInputPolicy.Random, 0, _1MB, _10MB, 0, _1MB},
+        {S3AInputPolicy.Random, 0, _1MB, -1, _256K, _1MB},
+        {S3AInputPolicy.Random, 0, _1MB, -1, 0, _1MB},
         {S3AInputPolicy.Random, _10MB + _64K, _1MB, _10MB, _256K, _10MB},
+        {S3AInputPolicy.Random, _10MB, _64K, -1, _256K, _10MB + _256K},
+        {S3AInputPolicy.Random, _10MB, _1MB, -1, _256K, _10MB + _1MB},
     });
   }
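Each parameter row above reads as {policy, targetPos, length, contentLength, readahead, expected request limit}, with -1 as the new unknown-content-length marker; the column order is inferred from the existing rows. Working through the final added row under the Random-policy arithmetic sketched earlier:

    // {S3AInputPolicy.Random, _10MB, _1MB, -1, _256K, _10MB + _1MB}
    //   limit = min(contentLength, targetPos + max(readahead, length))
    //         = min(-1, 10MB + 1MB) = -1          // size unknown
    //   limit < 0  =>  limit = targetPos + max(readahead, length) = 11MB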
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
index c858c9933fc7e..fbd8333f9eff5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
@@ -52,6 +52,7 @@ public void testUnbuffer() throws IOException {
     Path path = new Path("/file");
     ObjectMetadata meta = mock(ObjectMetadata.class);
     when(meta.getContentLength()).thenReturn(1L);
+    when(meta.getRawMetadataValue("Content-Range")).thenReturn("bytes 0-0/1");
     when(meta.getLastModified()).thenReturn(new Date(2L));
     when(meta.getETag()).thenReturn("mock-etag");
     when(s3.getObjectMetadata(any())).thenReturn(meta);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java
index 3b4eaf4a80667..ea1a1e9157ebb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextMainOperations.java
@@ -63,4 +63,9 @@ public void testSetVerifyChecksum() throws IOException {
     //checksums ignored, so test removed
   }
 
+  @Test
+  @Ignore
+  public void testOpenFileLazyFail() throws Throwable {
+    // the FileNotFoundException will be delayed until the first read
+  }
 }
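Finally, the extra stub in TestS3AUnbuffer above follows directly from the new metadata path: the stream now reads the total size from the Content-Range header on the first GET. A Mockito-style check of that parsing, reusing the test's own stubbing (JUnit's assertEquals assumed):

    import static org.junit.Assert.assertEquals;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import com.amazonaws.services.s3.model.ObjectMetadata;
    import org.apache.hadoop.fs.s3a.S3AInputStream;

    final class ContentRangeParseSketch {
      static void demo() {
        ObjectMetadata meta = mock(ObjectMetadata.class);
        // "bytes 0-0/1" is a one-byte object read from offset 0; the token
        // after '/' is the total size, which getContentLength() extracts
        when(meta.getRawMetadataValue("Content-Range")).thenReturn("bytes 0-0/1");
        assertEquals(1L, S3AInputStream.getContentLength(meta));
      }
    }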