Skip to content

[FLINK-39533][s3][backport] Use abort() instead of drain on close/seek when remaining bytes exceed threshold in NativeS3InputStream#28052

Merged
gaborgsomogyi merged 1 commit into
apache:release-2.3from
Samrat002:FLINK-39533-release-2.3
Apr 28, 2026
Merged

Conversation

@Samrat002
Copy link
Copy Markdown
Contributor

What is the purpose of the change

Backport of #28012 (master, merged as 3d6dff7f76ee4da1bc9e095a311d9055780222da) to release-2.3.

NativeS3InputStream calls ResponseInputStream.close() when releasing streams during seek(), skip(), and close() operations. Apache HttpClient's close() implementation drains all remaining bytes from the response body to enable HTTP connection reuse. For large S3 objects where only a small portion was read (e.g. checkpoint metadata from a multi-GB state file), this drains potentially gigabytes of data over the network — causing severe latency during checkpoint restore and seek-heavy read patterns.

The AWS SDK v2 ResponseInputStream JavaDoc explicitly recommends calling abort() when remaining data is not needed. This change replaces close() with abort() in the stream release path, plus the follow-ups from review:

  1. lazyInitialize() annotated @GuardedBy("lock") with the same runtime precondition as the other lock-guarded helpers, so a future lock-move refactor cannot silently break the invariants.
  2. EOF check (position >= contentLength) is now evaluated before lazyInitialize() in both read() and read(byte[], int, int). position and contentLength are authoritative, so we never need a stream just to return -1. This is also a prerequisite for (3).
  3. seek(contentLength) and skip() to EOF used to issue bytes=contentLength- on the open stream — RFC 7233 declares this range unsatisfiable and S3 returns HTTP 416. Both paths now release the stream instead of reopening it; subsequent reads short-circuit via the EOF check above. Factored into a single repositionOpenStream() helper used by both call sites.

Brief change log

  • NativeS3InputStream:
    • releaseStream() aborts before closing so subsequent SDK close() does not drain.
    • lazyInitialize() is now @GuardedBy("lock") with a runtime lock-held assertion.
    • EOF short-circuit moved before lazyInitialize() in read() / read(byte[], int, int).
    • New repositionOpenStream() helper called from seek() / skip(); releases instead of reopening at EOF.
    • seek() throws EOFException (matching the Hadoop S3A contract) for negative positions and seeks past contentLength.
  • NativeS3InputStreamTest (new): 13 tests covering abort-before-close ordering, position tracking, EOF semantics, error paths, and the seek/skip-to-EOF no-reopen invariant.

Verifying this change

This change added tests and was verified as follows:

  • Cherry-pick is content-identical to the merged master commit 3d6dff7f76ee4da1bc9e095a311d9055780222da (git diff is empty).
  • mvn -pl flink-filesystems/flink-s3-fs-native test -Dtest=NativeS3InputStreamTest13/13 passing on this branch.
  • Manually validated end-to-end on a local Flink cluster from this branch with a stateful job (RocksDB state backend, incremental checkpointing) writing ~1 GB of state across many SST files to a real AWS S3 bucket (ap-south-1). Triggered stop-with-savepoint, restored from the savepoint, ran further checkpoints. Confirmed:
    • Restore succeeded; all subtasks reached RUNNING.
    • First post-restore checkpoint (chk-15, 3.2 GB) completed successfully.
    • JM and TM logs contain no HTTP 416 / InvalidRange, no requires lock to be held assertions, no Error aborting S3 / Error closing warnings, no leaked-stream notices, and zero ERROR-level entries.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (S3 read path used by checkpoint/savepoint download)
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes , Claude sonet is used to create the backport

…aining bytes exceed threshold in NativeS3InputStream

* [FLINK-39533][s3] Use abort() instead of drain on close/seek when remaining bytes exceed threshold in NativeS3InputStream

* [FLINK-39533][s3] Address to review comments. Bug at the end of stream and addres to s3 returning 416

(cherry picked from commit 3d6dff7)
@Samrat002
Copy link
Copy Markdown
Contributor Author

@gaborgsomogyi PTAL

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 27, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@Samrat002
Copy link
Copy Markdown
Contributor Author

@gaborgsomogyi / @RocMarshal can you help merging the changes

@gaborgsomogyi gaborgsomogyi merged commit 4eaf745 into apache:release-2.3 Apr 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants