Skip to content

[FLINK-39778][s3] Recoverable writer silently loses the in-flight tail on resume#28268

Open
Samrat002 wants to merge 2 commits into
apache:masterfrom
Samrat002:FLINK-39778
Open

[FLINK-39778][s3] Recoverable writer silently loses the in-flight tail on resume#28268
Samrat002 wants to merge 2 commits into
apache:masterfrom
Samrat002:FLINK-39778

Conversation

@Samrat002
Copy link
Copy Markdown
Contributor

@Samrat002 Samrat002 commented May 27, 2026

What is the purpose of the change

NativeS3RecoverableWriter.recover() silently discarded the sub-part-size tail that persist() had durably uploaded to S3 as a side object. After a crash-and-restore cycle, any bytes written since the last full-part boundary were permanently lost, violating Flink's exactly-once guarantee.

This patch fixes the data loss by downloading the side object during recover() and seeding the resumed output stream with those bytes before accepting further writes.

Brief change log

recover() in NativeS3RecoverableWriter now downloads the incomplete-tail side object and seeds the resumed stream with those bytes before accepting new writes. A downloadIncompleteTail() helper validates the length and cleans up the local file on failure. NativeS3RecoverableFsDataOutputStream gains a resume constructor that opens the seed file in append mode so position accounting is correct from the start.

Verifying this change

UT to showcase the bug and fix working

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no) no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) no
  • The serializers: (yes / no / don't know) no
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
  • The S3 file system connector: (yes / no / don't know) yes

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 27, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@Izeren Izeren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't reviewed all tests in details yet, but I would like first to understand better the logic about partially uploaded subparts. My initial impression from the FLIP was that we would like to store incomplete parts in-line in state and treat part upload as atomic operation. Did we choose not to do that? I wonder because I am not sure in reliability of incomplete parts. Are they subject to the same lifecycle policies as incomplete MPUs or some different policy?

s3AccessHelper.getObject(s3recoverable.incompleteObjectName(), target);
if (downloaded != s3recoverable.incompleteObjectLength()) {
throw new IOException(
"Incomplete-tail object "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception doesn't tell what are the implications. Does it mean that state is corrupted and can't be recovered unless object on S3 is restored? If so, would be useful to explain it. Would help both oncall engineer and to classify such errors correctly (retriable/non-retriable)

* <p><b>Thread safety:</b> not thread-safe. Use a single thread per instance, matching the
* single-thread invariant of the production {@link NativeS3RecoverableFsDataOutputStream}.
*/
public final class InMemoryNativeS3Operations extends NativeS3ObjectOperations {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would it be better to have NativeS3ObjectOperations as interface otherwise you still would need to bring in all sdk based implementation dependencies.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. An interface would make the test seam cleaner and keep SDK types off the test classpath. The reason I didn't do it here is scope:

NativeS3ObjectOperations returns SDK types directly (CompletedPart, UploadPartResponse, PartETag-like records), so introducing an interface means either

(a) extracting Flink-owned DTOs to replace those return types across NativeS3RecoverableFsDataOutputStream / NativeS3Committer / NativeS3RecoverableWriter, or

(b) leaving the SDK types in the interface signatures , which doesn't actually remove the dependency.

Both options are meaningful refactors that I'd rather not bundle into a data-loss bugfix. For now the test subclass passes null for the SDK ctor args and overrides every method it touches, so no SDK client is constructed at test time (the SDK is only on the compile classpath, which it already is for main code). I'll file a follow-up to do the interface extraction properly — happy to take it on right after this lands. WDYT?

Checked https://github.com/localstack/localstack. It is not maintained anymore.

* <p><b>Thread safety:</b> not thread-safe. Use a single thread per instance, matching the
* single-thread invariant of the production {@link NativeS3RecoverableFsDataOutputStream}.
*/
public final class InMemoryNativeS3Operations extends NativeS3ObjectOperations {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is meant to be used as test harness for FileSystem testing (as replacement of localStack). Arguably it is good to have tests for it too

InMemoryNativeS3Operations s3 = new InMemoryNativeS3Operations();
NativeS3RecoverableWriter writer1 =
NativeS3RecoverableWriter.writer(
s3, tmp.toString(), MIN_PART_SIZE, /* maxConcurrent */ 1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the impact of concurrency on these incomplete subparts? How many of incomplete subparts can exist per file path at the same time?

Copy link
Copy Markdown
Contributor Author

@Samrat002 Samrat002 Jun 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Flink's RecoverableWriter contract gives a single writer instance ownership of a single open stream per file path. There is no concurrent writer for the same path.
  • Each persist() call uploads a fresh side object under /.incomplete/ (unique per call, see NativeS3RecoverableFsDataOutputStream). So back-to-back persist()s do not race. they produce distinct objects.
  • At any point in time, the number of live side objects for a given path equals the number of un-retired checkpoints that carried tail bytes for that file. When Flink retires a checkpoint it calls cleanupRecoverableState(), which deletes the corresponding side object NativeS3RecoverableWriter#cleanupRecoverableState.
  • On recovery, only the side object referenced by the restored ResumeRecoverable is consulted. Older side objects from earlier checkpoints are independent. Flink's retention policy governs their lifetime, not the recover path.

So there's no concurrency on the side object itself. It's written once during persist(), read at most once during recover(), and deleted once during checkpoint retirement.

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label May 30, 2026
@Samrat002
Copy link
Copy Markdown
Contributor Author

I haven't reviewed all tests in details yet, but I would like first to understand better the logic about partially uploaded subparts. My initial impression from the FLIP was that we would like to store incomplete parts in-line in state and treat part upload as atomic operation. Did we choose not to do that? I wonder because I am not sure in reliability of incomplete parts. Are they subject to the same lifecycle policies as incomplete MPUs or some different policy?

Yes, you are right. That was initially discussed. what we are observing at the scale of production, users don't really set policies. There are billions of MPU get accumulated and, leading to high cost.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants