Checkpoint Store: S3 Backend Design #6554
|
The current design for A simpler approach: instead of updating each This turns This is essentially the same append-only log pattern that Delta Lake's transaction log uses, and for the same reasons. |
-
|
One scenario not covered in the current design: what happens when the checkpoint store itself becomes unavailable?
My thinking: checkpoint store availability shouldn't be on the critical path of task execution. A checkpoint is a best-effort durability enhancement, not a correctness requirement. If the S3 backend becomes inaccessible due to credential expiry, account suspension, or a service-side outage, the task should still complete normally.
The real concern with naive retry logic isn't that retries happen, but that checkpoint operations are frequent throughout a job's lifetime. If each failure triggers multiple retries, the cumulative latency compounds across the entire run, and a side-channel concern starts meaningfully degrading end-to-end job performance.
A circuit-breaker approach seems more appropriate: after a configurable number of consecutive failures, the store silently degrades. Subsequent stage_keys, stage_files, and checkpoint() calls become no-ops, with a warning logged. The only consequence is that this run isn't checkpointed; on re-run, the full job executes again. |
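To make the circuit-breaker idea concrete, here is a minimal sketch. The wrapper class, its `tripped` flag, and the `max_consecutive_failures` parameter are hypothetical names for illustration, not part of the proposed interface; it also assumes that failures before the breaker trips are swallowed rather than retried.

```python
import logging
from typing import Any

logger = logging.getLogger("checkpoint_store")


class CircuitBreakerStore:
    """Hypothetical wrapper: checkpoint calls become no-ops after repeated failures."""

    def __init__(self, inner: Any, max_consecutive_failures: int = 3) -> None:
        self._inner = inner
        self._max_failures = max_consecutive_failures
        self._failures = 0
        self.tripped = False  # True once the store has degraded to no-ops

    def _guarded(self, method_name: str, *args: Any) -> None:
        if self.tripped:
            return  # degraded: skip the call entirely, no retry, no latency
        try:
            getattr(self._inner, method_name)(*args)
        except Exception as exc:  # assumption: any backend error counts as a failure
            self._failures += 1
            if self._failures >= self._max_failures:
                self.tripped = True
                logger.warning(
                    "checkpoint store disabled after %d consecutive failures: %s",
                    self._failures,
                    exc,
                )
            return
        self._failures = 0  # a success resets the consecutive-failure count

    def stage_keys(self, checkpoint_id: str, series: Any) -> None:
        self._guarded("stage_keys", checkpoint_id, series)

    def stage_files(self, checkpoint_id: str, files: Any) -> None:
        self._guarded("stage_files", checkpoint_id, files)

    def checkpoint(self, checkpoint_id: str) -> None:
        self._guarded("checkpoint", checkpoint_id)
```

Once `tripped` is set, the task keeps running and pays no further checkpoint latency; the cost is that this run leaves no usable checkpoint.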
-
|
My thinking on how a store maps to tasks and runs: the same base path should be reusable across re-runs, since creating a new path per run defeats the purpose of cross-run progress tracking. But simple reuse is tricky, because partition boundaries aren't stable across runs (as discussed in #6446). A snapshot-based approach avoids this problem. The store path stays fixed; each run reads from snapshot
Other properties this gives:
The trade-off worth flagging: the snapshot approach trades write/read amplification for simplicity — no partition matching required, no determinism assumptions needed. |
-
|
A few thoughts after implementing the S3 backend:
On orphaned staged entries
They matter less than they might seem. Checkpoint data only has value until the next successful job run, at which point all checkpoint data (orphaned or not) can be deleted together. Orphaned entries don't need special handling; they become irrelevant at the same time as the rest.
On TTL via S3 lifecycle rules
I'd avoid this as a built-in approach: applying lifecycle rules requires
Proposed design: Two policies cover the main use cases:
An explicit |
-
|
Initial S3 backend implementation is up in #6599 — feedback and questions welcome. |
-
|
Thanks for putting this together, chenghuichen — really solid work, and I appreciate both the implementation and the design discussion on #6554. I've left some comments inline. The core structure is good; most of my feedback is about simplifying scope so we can land the foundation and iterate from there. |
-
|
Just a thought: One nice side-effect of this design: since the checkpoint store tracks both source keys and output file metadata, we effectively have a 2PC layer that works for any sink — including plain Parquet. The checkpoint store knows which files were produced by each task. After all tasks complete, the head node can treat the checkpointed file list as the authoritative output, and clean up any orphaned files from failed/partial tasks. Parquet writes become atomic without needing a catalog. |
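As a rough illustration of that cleanup step, the sketch below computes which output files are orphaned once all tasks have finished. The function name and the flat lists of paths are assumptions made for the example; how the head node actually lists and deletes files is not specified in this thread.

```python
from typing import Iterable, List


def find_orphaned_files(
    listed_output_files: Iterable[str],
    checkpointed_files: Iterable[str],
) -> List[str]:
    """Return output files that no checkpoint claims (hypothetical helper).

    Only safe to call after every task has completed; otherwise files from
    tasks that are still running would be misclassified as orphans.
    """
    authoritative = set(checkpointed_files)
    return sorted(p for p in listed_output_files if p not in authoritative)
```

The checkpointed list is treated as the source of truth, so anything present in the sink but absent from it is a candidate for deletion.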
-
|
@rohitkulshreshtha Thanks for the thorough review! I agree on keeping the S3 backend focused — I've updated the PR accordingly (dropped On idempotency: you're right that with |
-
Interface
See #6511
Store Path
User provides:
s3://checkpoints/my-job/
How a store maps to queries, tasks, and pipeline runs is still an open question. For now, the user provides this path and it identifies the store, but the relationship between store identity and execution concepts (e.g. does each run get its own path? do retries share a path?) needs more design work.
Key Layout
State Model
| State | Condition |
| --- | --- |
| staged | no manifest.json |
| checkpointed | manifest.json exists with "status": "checkpointed" |
| committed | manifest.json exists with "status": "committed" |

Operations → S3 Calls
| Operation | S3 Calls |
| --- | --- |
| stage_keys(id, series) | PutObject to {id}/keys/NNNN.ipc |
| stage_files(id, files) | PutObject to {id}/files/NNNN.bin |
| checkpoint(id) | PutObject to {id}/manifest.json |
| mark_committed(ids) | PutObject to {id}/manifest.json (overwrite) |
| get_checkpointed_keys() | ListObjectsV2 for */manifest.json, then GetObject for */keys/*.ipc |
| get_checkpointed_files() | status: checkpointed only |
| get_checkpoint(id) | GetObject for {id}/manifest.json |
| list_checkpoints() | ListObjectsV2 for */manifest.json |

Why Manifest Instead of Directory Moves
S3 doesn't have atomic directory moves. A "move" is copy + delete per object — not atomic, race-prone.
Instead, the presence of manifest.json is the atomicity boundary. checkpoint() is a single PutObject, which is atomic. Readers ignore directories without a manifest (orphaned staged entries).
manifest.json
{
  "checkpoint_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "checkpointed",
  "created_at": "2026-03-27T14:00:00Z",
  "sealed_at": "2026-03-27T14:00:05Z",
  "committed_at": null,
  "num_key_files": 3,
  "num_file_files": 1
}
Serialization
- Keys (.ipc): RecordBatch::to_ipc_stream() already exists. Language-interoperable.
- Files (.bin): Vec<u8> blob; just write it directly.
Read Path (get_checkpointed_keys)
1. ListObjectsV2 with prefix s3://checkpoints/my-job/ and delimiter / to find checkpoint IDs
2. GetObject on manifest.json: check status is checkpointed or committed
3. ListObjectsV2 for {id}/keys/*.ipc
4. GetObject for each IPC file → deserialize to Series
Optimization: Could cache manifest statuses to avoid re-reading on every call.
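The read path steps above can be sketched against an in-memory stand-in for the bucket. This is only an illustration under stated assumptions: a plain dict of object key to bytes replaces S3, the function signature is hypothetical, and raw payload bytes are returned instead of deserializing Arrow IPC into a Series.

```python
import json
from typing import Dict, List

# Statuses the read path accepts, per the steps above.
READABLE_STATUSES = {"checkpointed", "committed"}


def get_checkpointed_keys(objects: Dict[str, bytes], base_prefix: str) -> List[bytes]:
    """Return key payloads from sealed checkpoints only (illustrative sketch)."""
    # Step 1: discover checkpoint IDs (stand-in for ListObjectsV2 with delimiter "/").
    checkpoint_ids = sorted(
        {
            key[len(base_prefix):].split("/", 1)[0]
            for key in objects
            if key.startswith(base_prefix) and "/" in key[len(base_prefix):]
        }
    )
    payloads: List[bytes] = []
    for checkpoint_id in checkpoint_ids:
        # Step 2: read the manifest; a missing manifest means an orphaned staged entry.
        manifest = objects.get(f"{base_prefix}{checkpoint_id}/manifest.json")
        if manifest is None:
            continue
        if json.loads(manifest)["status"] not in READABLE_STATUSES:
            continue
        # Steps 3 and 4: list and fetch the staged key files in name order.
        keys_prefix = f"{base_prefix}{checkpoint_id}/keys/"
        for key in sorted(objects):
            if key.startswith(keys_prefix) and key.endswith(".ipc"):
                payloads.append(objects[key])
    return payloads
```

Because the manifest check happens before any key file is read, a directory that was staged but never sealed contributes nothing to the result.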
Idempotency
- checkpoint(): overwriting manifest.json with same content is a no-op. Idempotent.
- mark_committed(): overwriting manifest.json with updated status. Idempotent.
- stage_keys() / stage_files(): appending new files with incrementing names. Need to check manifest doesn't exist first (return AlreadySealed).
Open Questions
- cleanup_older_than() method?
- _delta_log handles atomicity, versioning, and listing. txn action gives idempotency. No raw S3 key layout needed. Lower priority now that S3 is confirmed first, but worth revisiting.
- ListObjectsV2 returns at most 1,000 objects per call. Number of checkpoints = number of tasks in the pipeline. We don't know the scale for our customers (could be hundreds or millions of input files). Options if listing becomes expensive: (a) root-level index file (one GetObject instead of listing, but sync issues), (b) prefix-based partitioning (checkpointed/, committed/), (c) just paginate. Defer until we know the scale.
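For option (c), pagination is a small loop over continuation tokens. The sketch below assumes a boto3-style S3 client passed in by the caller; `iter_checkpoint_prefixes` is a hypothetical helper name, and the bucket and prefix in the usage note are derived from the example store path.

```python
from typing import Any, Dict, Iterator


def iter_checkpoint_prefixes(client: Any, bucket: str, prefix: str) -> Iterator[str]:
    """Yield one common prefix per checkpoint directory, following pagination."""
    kwargs: Dict[str, Any] = {"Bucket": bucket, "Prefix": prefix, "Delimiter": "/"}
    while True:
        page = client.list_objects_v2(**kwargs)
        # With a delimiter, "directories" come back under CommonPrefixes.
        for entry in page.get("CommonPrefixes", []):
            yield entry["Prefix"]
        if not page.get("IsTruncated"):
            return
        # Ask for the next page, starting where this one stopped.
        kwargs["ContinuationToken"] = page["NextContinuationToken"]
```

With a real client this would be called as `iter_checkpoint_prefixes(boto3.client("s3"), "checkpoints", "my-job/")`. The number of round trips still grows with the number of checkpoints, so this addresses correctness at scale rather than listing cost.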