Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@
*
* <p><b>Thread Safety:</b> Internal state is guarded by a lock to ensure safe concurrent access and
* resource cleanup.
*
* <p><b>Partial reads and {@code seek}:</b> Apache HttpClient tries to drain the remainder of the
* response body on {@link java.io.InputStream#close()} to reuse connections. After a {@link #seek}
* or early close, draining can fail against S3-compatible endpoints (for example MinIO) with {@code
* ConnectionClosedException: Premature end of Content-Length delimited message body}.
*
* <p>The {@code close()} path handles this gracefully: it attempts normal connection cleanup first
* (which preserves HTTP connection reuse for well-behaved servers), and only falls back to {@link
* ResponseInputStream#abort()} when the connection was already closed early by the server — in
* which case the connection is not reusable anyway, so aborting carries no performance penalty.
* Abandoned streams after {@link #seek} always use {@code abort()} immediately since the connection
* is being discarded regardless.
*/
class NativeS3InputStream extends FSDataInputStream {

Expand Down Expand Up @@ -100,32 +112,15 @@ private void lazyInitialize() throws IOException {
* <p>This method:
*
* <ul>
* <li>Closes any existing stream
* <li>Closes any existing stream (using {@code abort()} since it is being discarded)
* <li>Opens a new stream starting at {@link #position}
* <li>Uses HTTP range requests for non-zero positions
* </ul>
*/
private void openStreamAtCurrentPosition() throws IOException {
lock.lock();
try {
if (bufferedStream != null) {
try {
bufferedStream.close();
} catch (IOException e) {
LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e);
} finally {
bufferedStream = null;
}
}
if (currentStream != null) {
try {
currentStream.close();
} catch (IOException e) {
LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e);
} finally {
currentStream = null;
}
}
releaseCurrentObjectStream(true);

try {
GetObjectRequest.Builder requestBuilder =
Expand All @@ -143,27 +138,118 @@ private void openStreamAtCurrentPosition() throws IOException {
currentStream = s3Client.getObject(requestBuilder.build());
bufferedStream = new BufferedInputStream(currentStream, readBufferSize);
} catch (Exception e) {
if (bufferedStream != null) {
try {
bufferedStream.close();
} catch (IOException ignored) {
}
bufferedStream = null;
}
if (currentStream != null) {
try {
currentStream.close();
} catch (IOException ignored) {
}
currentStream = null;
}
releaseCurrentObjectStream(true);
throw new IOException("Failed to open S3 stream for " + bucketName + "/" + key, e);
}
} finally {
lock.unlock();
}
}

/**
 * Releases the in-flight {@code GetObject} HTTP response and clears both stream references.
 *
 * <p>When {@code abandon} is {@code true} and a response stream is present, the stream is being
 * discarded (e.g. after a {@link #seek}) and {@link ResponseInputStream#abort()} is called
 * immediately, bypassing any attempt to drain the response body.
 *
 * <p>When {@code abandon} is {@code false}, a normal {@code close()} is attempted first so that
 * the HTTP connection can be reused by well-behaved servers. If {@code close()} fails with an
 * {@link IOException}, the failure is logged at WARN level, treated as non-fatal, and the
 * response is aborted so that the underlying connection is not left in an undefined state. A
 * premature connection close by the server (a pattern seen on MinIO and other S3-compatible
 * storage) is logged with a dedicated message.
 *
 * <p>This method never throws {@link IOException}. Both {@code bufferedStream} and {@code
 * currentStream} are {@code null} when it returns, regardless of the outcome.
 *
 * @param abandon {@code true} to use {@code abort()} immediately (stream is being abandoned);
 *     {@code false} to try {@code close()} first, falling back to {@code abort()} if closing
 *     fails
 */
private void releaseCurrentObjectStream(boolean abandon) {
    if (currentStream == null && bufferedStream == null) {
        return;
    }
    try {
        if (abandon && currentStream != null) {
            // The caller will not consume the rest of the body; do not try to drain it.
            abortSafely();
            return;
        }

        // Closing the buffered wrapper also closes the response stream it wraps, so only one
        // of the two is closed explicitly.
        final boolean viaBuffer = bufferedStream != null;
        try {
            if (viaBuffer) {
                bufferedStream.close();
            } else {
                currentStream.close();
            }
        } catch (IOException e) {
            if (isPrematureEndOfMessage(e)) {
                LOG.warn(
                        "S3 server closed connection prematurely for {}/{} (expected {} bytes) "
                                + "-- aborting and treating as non-fatal",
                        bucketName,
                        key,
                        contentLength,
                        e);
            } else if (viaBuffer) {
                LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e);
            } else {
                LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e);
            }
            // A failed close() gives no guarantee that the connection was released; abort so
            // it is discarded instead of lingering. abortSafely() is a no-op without a stream.
            abortSafely();
        }
    } finally {
        // Drop the references only after any abort above has had a chance to use them.
        bufferedStream = null;
        currentStream = null;
    }
}

/**
 * Best-effort abort of the current {@code ResponseInputStream}.
 *
 * <p>Does nothing when no response stream is open. A {@link RuntimeException} raised by {@code
 * abort()} is logged at WARN level and not propagated. The {@code currentStream} reference is
 * left untouched; clearing it is the caller's responsibility.
 */
private void abortSafely() {
    if (currentStream != null) {
        try {
            currentStream.abort();
        } catch (RuntimeException abortFailure) {
            LOG.warn(
                    "Error aborting S3 response stream for {}/{}",
                    bucketName,
                    key,
                    abortFailure);
        }
    }
}

/**
 * Returns true if the given I/O exception, or any exception in its cause chain, represents a
 * connection that was closed before all bytes were received — a pattern seen on S3-compatible
 * storage (MinIO) when it closes a connection early and Apache HttpClient tries to drain the
 * remainder.
 *
 * <p>An exception matches when its message contains one of the known markers ({@code "Premature
 * end of Content-Length"}, {@code "Connection closed"}, {@code "ConnectionClosed"}) or when its
 * class simple name contains {@code "ConnectionClosed"} (e.g. {@code
 * ConnectionClosedException}, including relocated/shaded copies).
 *
 * @param e the exception to inspect; {@code null} yields {@code false}
 * @return {@code true} if a premature-connection-close marker is found
 */
private static boolean isPrematureEndOfMessage(IOException e) {
    // The walk is depth-bounded so that a cyclic cause chain cannot loop forever.
    final int maxDepth = 16;
    Throwable current = e;
    for (int depth = 0; current != null && depth < maxDepth; depth++) {
        String msg = current.getMessage();
        if (msg != null
                && (msg.contains("Premature end of Content-Length")
                        || msg.contains("Connection closed")
                        || msg.contains("ConnectionClosed"))) {
            return true;
        }
        // The exception type itself is the most reliable signal; match by simple name so that
        // no compile-time dependency on the HTTP client's exception class is needed.
        if (current.getClass().getSimpleName().contains("ConnectionClosed")) {
            return true;
        }
        current = current.getCause();
    }
    return false;
}

@Override
public void seek(long desired) throws IOException {
lock();
Expand Down Expand Up @@ -268,45 +354,20 @@ public void close() throws IOException {
if (closed) {
return;
}

closed = true;
IOException exception = null;

if (bufferedStream != null) {
try {
bufferedStream.close();
} catch (IOException e) {
exception = e;
LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e);
} finally {
bufferedStream = null;
}
}

if (currentStream != null) {
try {
currentStream.close();
} catch (IOException e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e);
} finally {
currentStream = null;
}
}
// Only skip normal close and go straight to abort if the stream was fully read.
// Otherwise we try close() first (to preserve connection reuse), falling back to
// abort() when MinIO closed the connection early (which makes reuse impossible anyway).
boolean discardRemaining = position >= contentLength;
releaseCurrentObjectStream(discardRemaining);

LOG.debug(
"Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}",
bucketName,
key,
position,
contentLength);
if (exception != null) {
throw exception;
}
} finally {
lock.unlock();
}
Expand Down