Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3352,6 +3352,7 @@ public FileStatus getFileStatus(final Path f) throws IOException {
S3AFileStatus innerGetFileStatus(final Path f,
final boolean needEmptyDirectoryFlag,
final Set<StatusProbeEnum> probes) throws IOException {

final Path path = qualify(f);
String key = pathToKey(path);
LOG.debug("Getting path status for {} ({}); needEmptyDirectory={}",
Expand Down Expand Up @@ -3424,6 +3425,22 @@ S3AFileStatus s3GetFileStatus(final Path path,
return new S3AFileStatus(Tristate.UNKNOWN, path, username);
}

// get file status without a probe
if (!key.isEmpty() && !key.endsWith("/")
&& probes.isEmpty()) {
// use negative value for length
// to tell that object length is unknown
// it will be updated on first read
// also no information about modification time, etag and version id
return new S3AFileStatus(-1,
0L,
path,
getDefaultBlockSize(path),
username,
null,
null);
}

if (!key.isEmpty() && !key.endsWith("/")
&& probes.contains(StatusProbeEnum.Head)) {
try {
Expand Down Expand Up @@ -4902,11 +4919,10 @@ private S3AFileStatus extractOrFetchSimpleFileStatus(
throw new FileNotFoundException(path.toString() + " is a directory");
}
} else {
// Executes a HEAD only.
// therefore: if there is is a dir marker, this
// will raise a FileNotFoundException
// Get simple file status without a probe
// therefore: no content length, modification time, etag or version id
fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.HEAD_ONLY);
StatusProbeEnum.NONE);
}

return fileStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import javax.annotation.Nullable;

import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
Expand All @@ -49,6 +51,7 @@

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.util.StringUtils.toLowerCase;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416;

/**
* The input stream for an S3A object.
Expand Down Expand Up @@ -98,12 +101,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/
private S3Object object;
private S3ObjectInputStream wrappedStream;
private long contentLength;
private final S3AReadOpContext context;
private final InputStreamCallbacks client;
private final String bucket;
private final String key;
private final String pathStr;
private final long contentLength;
private final String uri;
private static final Logger LOG =
LoggerFactory.getLogger(S3AInputStream.class);
Expand Down Expand Up @@ -151,7 +154,6 @@ public S3AInputStream(S3AReadOpContext ctx,
"No Bucket");
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
long l = s3Attributes.getLen();
Preconditions.checkArgument(l >= 0, "Negative content length");
this.context = ctx;
this.bucket = s3Attributes.getBucket();
this.key = s3Attributes.getKey();
Expand Down Expand Up @@ -180,6 +182,28 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
streamStatistics.inputPolicySet(inputPolicy.ordinal());
}

/***
* Extract content length from Content-Range header.
* @param objectMetadata S3 Object metadata
* @return content length
*/
public static long getContentLength(ObjectMetadata objectMetadata) {
String contentRange = (String)objectMetadata.getRawMetadataValue("Content-Range");
long length = -1;
if (contentRange != null) {
String[] tokens = contentRange.split("[ -/]+");

try {
length = Long.parseLong(tokens[tokens.length - 1]);
} catch (NumberFormatException e) {
LOG.warn("Unable to parse content range. Header 'Content-Range' has corrupted data"
+ e.getMessage(), e);
}
}

return length;
}

/**
* Opens up the stream at specified target position and for given length.
*
Expand Down Expand Up @@ -215,7 +239,27 @@ private synchronized void reopen(String reason, long targetPos, long length,
try {
object = Invoker.once(text, uri,
() -> client.getObject(request));

// Update content length from first read
// by getting information from Content-Range header
if (contentLength < 0) {
contentLength = getContentLength(object.getObjectMetadata());
LOG.debug("Updating contentLength to {}", contentLength);
}

} catch(IOException e) {
// when error out of range
// update content length to ActualObjectSize from additional details
if (e.getCause() instanceof AmazonS3Exception) {
AmazonS3Exception s3Exception = (AmazonS3Exception) e.getCause();
if (s3Exception.getStatusCode() == SC_416
&& s3Exception.getAdditionalDetails() != null
&& s3Exception.getAdditionalDetails().containsKey("ActualObjectSize")) {
String objectSize = s3Exception.getAdditionalDetails().get("ActualObjectSize");
contentLength = Long.parseLong(objectSize);
LOG.debug("Updating contentLength to {}", contentLength);
}
}
// input function failed: note it
tracker.failed();
// and rethrow
Expand Down Expand Up @@ -254,7 +298,7 @@ public synchronized void seek(long targetPos) throws IOException {
+ " " + targetPos);
}

if (this.contentLength <= 0) {
if (this.contentLength == 0) {
return;
}

Expand Down Expand Up @@ -396,16 +440,18 @@ private void incrementBytesRead(long bytesRead) {
@Retries.RetryTranslated
public synchronized int read() throws IOException {
checkNotClosed();
if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
return -1;
}

try {
lazySeek(nextReadPos, 1);
} catch (EOFException e) {
return -1;
}

// content length is updated after the first read from lazySeek
if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) {
return -1;
}

Invoker invoker = context.getReadInvoker();
int byteRead = invoker.retry("read", pathStr, true,
() -> {
Expand Down Expand Up @@ -480,17 +526,18 @@ public synchronized int read(byte[] buf, int off, int len)
return 0;
}

if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
return -1;
}

try {
lazySeek(nextReadPos, len);
} catch (EOFException e) {
// the end of the file has moved
return -1;
}

// content length is updated after the first read from lazySeek
if (this.contentLength == 0 || (contentLength > 0 && nextReadPos >= contentLength)) {
return -1;
}

Invoker invoker = context.getReadInvoker();

streamStatistics.readOperationStarted(nextReadPos, len);
Expand Down Expand Up @@ -844,6 +891,15 @@ static long calculateRequestLimit(
}
// cannot read past the end of the object
rangeLimit = Math.min(contentLength, rangeLimit);

// negative range limit means content length is unknown
// we should set it to a positive value that greater than targetPos
// this is possible because AWS SDK will response within content length limit
// even if request range exceed content length
if (rangeLimit < 0) {
rangeLimit = targetPos + Math.max(readahead, length);
}

return rangeLimit;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ private InternalConstants() {
/** 404 error code. */
public static final int SC_404 = 404;

/** 416 Range Not Satisfiable error code. */
public static final int SC_416 = 416;

/** Name of the log for throttling events. Value: {@value}. */
public static final String THROTTLE_LOG_NAME =
"org.apache.hadoop.fs.s3a.throttled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public enum StatusProbeEnum {
public static final Set<StatusProbeEnum> LIST_ONLY =
EnumSet.of(List);

/**
* No probe.
*/
public static final Set<StatusProbeEnum> NONE =
EnumSet.noneOf(StatusProbeEnum.class);

/** Look for files and directories. */
public static final Set<StatusProbeEnum> FILE =
HEAD_ONLY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
package org.apache.hadoop.fs.contract.s3a;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.Ignore;

import java.io.FileNotFoundException;
import java.io.IOException;

/**
* S3A contract tests opening files.
*/
public class ITestS3AContractOpen extends AbstractContractOpenTest {
private FSDataInputStream instream;

@Override
protected AbstractFSContract createContract(Configuration conf) {
Expand All @@ -40,4 +48,91 @@ protected AbstractFSContract createContract(Configuration conf) {
protected boolean areZeroByteFilesEncrypted() {
return true;
}

/**
* From HADOOP-17415, S3A will skip HEAD request to get file status when open file.
* Therefore, FileNotFoundException will be delayed until the first read occur.
*/
@Override
public void testOpenReadDir() throws Throwable {
describe("create & read a directory");
Path path = path("zero.dir");
mkdirs(path);

try {
instream = getFileSystem().open(path);
int c = instream.read();
fail("A directory has been opened for reading");
} catch (FileNotFoundException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("opening a directory for reading", "FileNotFoundException", e);
}
}

@Override
public void testOpenReadDirWithChild() throws Throwable {
describe("create & read a directory which has a child");
Path path = path("zero.dir");
mkdirs(path);
Path path2 = new Path(path, "child");
mkdirs(path2);

try {
instream = getFileSystem().open(path);
int c = instream.read();
fail("A directory has been opened for reading");
} catch (FileNotFoundException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("opening a directory for reading", "FileNotFoundException", e);
}
}

@Override
public void testOpenFileLazyFail() throws Throwable {
describe("openFile fails on a missing file in the read() and not before");
FutureDataInputStreamBuilder builder =
getFileSystem().openFile(path("testOpenFileLazyFail")).opt("fs.test.something", true);

try {
instream = builder.build().get();
int c = instream.read();
fail("A non existing file has been opened for reading");
} catch (FileNotFoundException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("opening a non existing file for reading", "FileNotFoundException", e);
}
}

@Override
@Ignore
public void testOpenFileFailExceptionally() throws Throwable {
// does not fail on openFile
}

@Override
@Ignore
public void testAwaitFutureFailToFNFE() throws Throwable {
// does not fail on openFile
}

@Override
@Ignore
public void testAwaitFutureTimeoutFailToFNFE() throws Throwable {
// does not fail on openFile
}

@Override
@Ignore
public void testOpenFileExceptionallyTranslating() throws Throwable {
// does not fail on openFile
}

@Override
@Ignore
public void testChainedFailureAwaitFuture() throws Throwable {
// does not fail on openFile
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,11 @@ public void testDirProbes() throws Throwable {
FILE_STATUS_DIR_PROBE);
assertEmptyDirStatus(status, Tristate.TRUE);

// skip all probes and expect no operations to take place
interceptGetFileStatusFNFE(emptydir, false,
// skip all probes and return file status with no content length
status = verifyInnerGetFileStatus(emptydir, false,
EnumSet.noneOf(StatusProbeEnum.class),
NO_IO);
assertEmptyDirStatus(status, Tristate.FALSE);

// now add a trailing slash to the key and use the
// deep internal s3GetFileStatus method call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,27 @@ public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{S3AInputPolicy.Normal, 0, -1, 0, _64K, 0},
{S3AInputPolicy.Normal, 0, -1, _10MB, _64K, _10MB},
{S3AInputPolicy.Normal, 0, -1, -1, _64K, _64K},
{S3AInputPolicy.Normal, _64K, _64K, _10MB, _64K, _10MB},
{S3AInputPolicy.Sequential, 0, -1, 0, _64K, 0},
{S3AInputPolicy.Sequential, 0, -1, _10MB, _64K, _10MB},
{S3AInputPolicy.Sequential, 0, -1, -1, _64K, _64K},
{S3AInputPolicy.Random, 0, -1, 0, _64K, 0},
{S3AInputPolicy.Random, 0, -1, _10MB, _64K, _10MB},
{S3AInputPolicy.Random, 0, -1, -1, _64K, _64K},
{S3AInputPolicy.Random, 0, _128K, _10MB, _64K, _128K},
{S3AInputPolicy.Random, 0, _128K, _10MB, _256K, _256K},
{S3AInputPolicy.Random, 0, _128K, -1, _64K, _128K},
{S3AInputPolicy.Random, 0, _128K, -1, _256K, _256K},
{S3AInputPolicy.Random, 0, 0, _10MB, _256K, _256K},
{S3AInputPolicy.Random, 0, 1, _10MB, _256K, _256K},
{S3AInputPolicy.Random, 0, _1MB, _10MB, _256K, _1MB},
{S3AInputPolicy.Random, 0, _1MB, _10MB, 0, _1MB},
{S3AInputPolicy.Random, 0, _1MB, -1, _256K, _1MB},
{S3AInputPolicy.Random, 0, _1MB, -1, 0, _1MB},
{S3AInputPolicy.Random, _10MB + _64K, _1MB, _10MB, _256K, _10MB},
{S3AInputPolicy.Random, _10MB, _64K, -1, _256K, _10MB + _256K},
{S3AInputPolicy.Random, _10MB, _1MB, -1, _256K, _10MB + _1MB},
});
}

Expand Down
Loading