diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java index f6dca047f648d..d7490e4c93e8b 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java @@ -37,6 +37,18 @@ * *

Thread Safety: Internal state is guarded by a lock to ensure safe concurrent access and * resource cleanup. + * + *

Partial reads and {@code seek}: Apache HttpClient tries to drain the remainder of the + * response body on {@link java.io.InputStream#close()} to reuse connections. After a {@link #seek} + * or early close, draining can fail against S3-compatible endpoints (for example MinIO) with {@code + * ConnectionClosedException: Premature end of Content-Length delimited message body}. + * + *

The {@code close()} path handles this gracefully: it attempts normal connection cleanup first + * (which preserves HTTP connection reuse for well-behaved servers), and only falls back to {@link + * ResponseInputStream#abort()} when the connection was already closed early by the server — in + * which case the connection is not reusable anyway, so aborting carries no performance penalty. + * Abandoned streams after {@link #seek} always use {@code abort()} immediately since the connection + * is being discarded regardless. */ class NativeS3InputStream extends FSDataInputStream { @@ -100,7 +112,7 @@ private void lazyInitialize() throws IOException { *

This method: * *

@@ -108,24 +120,7 @@ private void lazyInitialize() throws IOException { private void openStreamAtCurrentPosition() throws IOException { lock.lock(); try { - if (bufferedStream != null) { - try { - bufferedStream.close(); - } catch (IOException e) { - LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e); - } finally { - bufferedStream = null; - } - } - if (currentStream != null) { - try { - currentStream.close(); - } catch (IOException e) { - LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e); - } finally { - currentStream = null; - } - } + releaseCurrentObjectStream(true); try { GetObjectRequest.Builder requestBuilder = @@ -143,20 +138,7 @@ private void openStreamAtCurrentPosition() throws IOException { currentStream = s3Client.getObject(requestBuilder.build()); bufferedStream = new BufferedInputStream(currentStream, readBufferSize); } catch (Exception e) { - if (bufferedStream != null) { - try { - bufferedStream.close(); - } catch (IOException ignored) { - } - bufferedStream = null; - } - if (currentStream != null) { - try { - currentStream.close(); - } catch (IOException ignored) { - } - currentStream = null; - } + releaseCurrentObjectStream(true); throw new IOException("Failed to open S3 stream for " + bucketName + "/" + key, e); } } finally { @@ -164,6 +146,110 @@ private void openStreamAtCurrentPosition() throws IOException { } } + /** + * Releases the in-flight {@code GetObject} HTTP response. + * + *

When {@code abandon} is {@code true}, the stream is being discarded (e.g. after a {@link + * #seek}) and {@link ResponseInputStream#abort()} is called immediately, bypassing any attempt + * to drain the response body. This is necessary for correctness because the caller has already + * moved on and will not consume the rest of the body. + * + *

When {@code abandon} is {@code false}, normal {@code close()} is attempted first. This + * preserves HTTP connection reuse for well-behaved S3 servers. If the server closes the + * connection early (a pattern seen on MinIO and other S3-compatible storage), the resulting + * {@code ConnectionClosedException} is caught, treated as non-fatal, and escalated to a WARN + * log. The connection is then aborted since it is no longer usable anyway. + * + * @param abandon {@code true} to use {@code abort()} immediately (stream is being abandoned + * after seek); {@code false} to try {@code close()} first, falling back to {@code abort()} + * on early-connection-close from MinIO + */ + private void releaseCurrentObjectStream(boolean abandon) { + if (currentStream == null && bufferedStream == null) { + return; + } + if (abandon && currentStream != null) { + try { + currentStream.abort(); + } catch (RuntimeException e) { + LOG.warn("Error aborting S3 response stream for {}/{}", bucketName, key, e); + } finally { + bufferedStream = null; + currentStream = null; + } + return; + } + + // Normal close path: attempt graceful cleanup to preserve HTTP connection reuse. + // S3-compatible storage (MinIO) may close the connection before all bytes are drained. + // In that case the ConnectionClosedException is non-fatal — abort the stream and + // log a WARN so the task does not fail. + if (bufferedStream != null) { + try { + bufferedStream.close(); + } catch (IOException e) { + if (isPrematureEndOfMessage(e)) { + LOG.warn( + "S3 server closed connection prematurely for {}/{} (expected {} bytes) " + + "-- aborting and treating as non-fatal", + bucketName, + key, + contentLength, + e); + abortSafely(); + } else { + LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e); + } + } finally { + bufferedStream = null; + currentStream = null; + } + } else if (currentStream != null) { + try { + currentStream.close(); + } catch (IOException e) { + if (isPrematureEndOfMessage(e)) { + LOG.warn( + "S3 server closed connection prematurely for {}/{} (expected {} bytes) " + + "-- aborting and treating as non-fatal", + bucketName, + key, + contentLength, + e); + abortSafely(); + } else { + LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e); + } + } finally { + currentStream = null; + } + } + } + + /** Abort the underlying {@code ResponseInputStream} safely, logging any exception as a WARN. */ + private void abortSafely() { + if (currentStream == null) { + return; + } + try { + currentStream.abort(); + } catch (RuntimeException e) { + LOG.warn("Error aborting S3 response stream for {}/{}", bucketName, key, e); + } + } + + /** + * Returns true if the given I/O exception represents a connection closed before all bytes were + * received — a pattern seen on S3-compatible storage (MinIO) when it closes a connection early + * and Apache HttpClient tries to drain the remainder. + */ + private static boolean isPrematureEndOfMessage(IOException e) { + String msg = e.getMessage() != null ? e.getMessage() : ""; + return msg.contains("Premature end of Content-Length") + || msg.contains("Connection closed") + || msg.contains("ConnectionClosed"); + } + @Override public void seek(long desired) throws IOException { lock(); @@ -268,35 +354,13 @@ public void close() throws IOException { if (closed) { return; } - closed = true; - IOException exception = null; - if (bufferedStream != null) { - try { - bufferedStream.close(); - } catch (IOException e) { - exception = e; - LOG.warn("Error closing buffered stream for {}/{}", bucketName, key, e); - } finally { - bufferedStream = null; - } - } - - if (currentStream != null) { - try { - currentStream.close(); - } catch (IOException e) { - if (exception == null) { - exception = e; - } else { - exception.addSuppressed(e); - } - LOG.warn("Error closing S3 response stream for {}/{}", bucketName, key, e); - } finally { - currentStream = null; - } - } + // Only skip normal close and go straight to abort if the stream was fully read. + // Otherwise we try close() first (to preserve connection reuse), falling back to + // abort() when MinIO closed the connection early (which makes reuse impossible anyway). + boolean discardRemaining = position >= contentLength; + releaseCurrentObjectStream(discardRemaining); LOG.debug( "Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}", @@ -304,9 +368,6 @@ public void close() throws IOException { key, position, contentLength); - if (exception != null) { - throw exception; - } } finally { lock.unlock(); }