feat(cluster): resumable peer-replication transfers (#398) #404
Conversation
Failed file pulls now resume from the last committed byte instead of restarting from zero. Large compacted Parquet outputs on slow or flaky links no longer re-transfer already-received data.

Protocol:
- FetchFileRequest.ByteOffset: byte position to resume from (omitempty)
- FetchFileAckHeader.ByteOffset: server echo for confirmation
- FetchFileAckHeader.SizeBytes now carries tail bytes, not total size
- AckCodeBadOffset: server rejects invalid/stale offsets

Storage interface:
- ReadToAt(ctx, path, writer, offset): range-read; used server-side
- StatFile(ctx, path): size or -1 if not found; drives pre-check and resume-offset detection in the puller
- AppendingBackend: new optional interface for backends that support append writes. LocalBackend implements it (O_WRONLY|O_APPEND). S3 and Azure do not; the puller falls back to full re-fetch via type-assertion rather than a mandatory method that always errors (a sketch of this optional-interface pattern follows below).

Puller behavior on retry (attempt > 1):
1. tryResumeFromPartial: StatFile → ReadTo into sha256.Hash (prefix)
2. Send ByteOffset to peer; peer sends only the tail via ReadToAt
3. writeFileTail: AppendingBackend.AppendReader or WriteReader
4. ErrBadOffset (server) → totalBadOffsetServer++, delete partial
5. ErrResumeNotSupported (backend) → totalBadOffsetBackend++, delete

Other improvements from post-implementation review:
- Coordinator uses ReadToAt for both full and partial fetches (no branch)
- pullOnce goroutine uses sync.WaitGroup; write logic in writeFileTail
- Resume detection extracted to tryResumeFromPartial helper
- bad_offset counter split into bad_offset_server / bad_offset_backend

Full SHA-256 verification preserved for all code paths.
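The interface additions above can be summarized in a small Go sketch. The names follow the PR description (ReadToAt, StatFile, AppendingBackend, ErrResumeNotSupported), but the exact signatures, parameter order, and the writeFileTail wiring are assumptions for illustration, not the actual internal/storage code.

```go
package storage

import (
	"context"
	"errors"
	"io"
)

// ErrResumeNotSupported signals that a backend cannot append to an existing
// partial file; the puller deletes the partial and falls back to a full fetch.
var ErrResumeNotSupported = errors.New("storage: resume not supported by backend")

// Backend sketches the mandatory surface every storage implementation exposes.
type Backend interface {
	// ReadToAt streams the object at path into w, starting at offset
	// (offset 0 behaves like a full read).
	ReadToAt(ctx context.Context, path string, w io.Writer, offset int64) error
	// StatFile returns the object size in bytes, or -1 if it does not exist.
	StatFile(ctx context.Context, path string) (int64, error)
	// WriteReader stores the full contents of r at path.
	WriteReader(ctx context.Context, path string, r io.Reader) error
}

// AppendingBackend is optional: only backends that support append writes
// (e.g. a local-disk backend opening files with O_WRONLY|O_APPEND) implement it.
type AppendingBackend interface {
	AppendReader(ctx context.Context, path string, r io.Reader) error
}

// writeFileTail mirrors the fallback described above: append when the backend
// supports it, otherwise report ErrResumeNotSupported so the caller can delete
// the partial file and retry from byte zero.
func writeFileTail(ctx context.Context, b Backend, path string, tail io.Reader, resuming bool) error {
	if !resuming {
		return b.WriteReader(ctx, path, tail)
	}
	ab, ok := b.(AppendingBackend)
	if !ok {
		return ErrResumeNotSupported
	}
	return ab.AppendReader(ctx, path, tail)
}
```

The type assertion keeps S3 and Azure free of a method they could never honor, while local disk opts in simply by implementing AppendReader.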
Code Review
This pull request implements resumable file transfers for cluster replication by introducing ReadToAt, StatFile, and AppendReader methods to the storage backends. The Puller is updated to detect partial files, verify their integrity via prefix hashing, and request only the remaining data from peers. Review feedback indicates a significant issue where the LocalBackend's atomic write strategy prevents partial files from being preserved, effectively disabling the resume feature for local storage. Suggestions were also made to optimize the resume check by verifying AppendingBackend support before hashing and to refine the Azure storage implementation to avoid unnecessary Range headers for full-file fetches.
```go
var byteOffset int64
var prefixHasher hash.Hash
if attempt > 1 {
	byteOffset, prefixHasher = p.tryResumeFromPartial(log, entry)
```
The resume feature appears to be non-functional for the LocalBackend because WriteReader (called for the initial transfer at offset 0) is atomic. It writes to a temporary file and deletes it on failure (see internal/storage/local.go:181). Consequently, no partial file is left at entry.Path for tryResumeFromPartial to discover on subsequent attempts. To support resumption, the initial write strategy needs to preserve partial data (e.g., by using a persistent .part file that is not cleaned up on transport errors).
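To make the reviewer's point concrete, here is a minimal sketch of an atomic write in the style described (stage to a temporary file, rename on success, remove on failure); the function name and details are hypothetical, not the actual internal/storage/local.go implementation. The deferred removal is why no partial bytes survive for tryResumeFromPartial to find.

```go
package storage

import (
	"io"
	"os"
	"path/filepath"
)

// writeReaderAtomic stages data in a temp file and promotes it with a rename
// only on success; any error removes the temp file, so a failed transfer
// leaves nothing behind to resume from.
func writeReaderAtomic(path string, r io.Reader) error {
	tmp, err := os.CreateTemp(filepath.Dir(path), ".tmp-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // becomes a no-op after a successful rename

	if _, err := io.Copy(tmp, r); err != nil {
		tmp.Close()
		return err // the partial bytes are discarded here
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), path) // atomic promotion to the final path
}
```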
```go
	return 0, nil
}

h := sha256.New()
```
tryResumeFromPartial should check if the backend implements storage.AppendingBackend before proceeding to hash the prefix. For backends like S3 or Azure that do not support appending, hashing the local partial file (which might involve a full download if the backend is remote) is a wasted operation, as writeFileTail will eventually reject the resume attempt with ErrResumeNotSupported.
```go
if _, ok := p.cfg.Backend.(storage.AppendingBackend); !ok {
	return 0, nil
}
h := sha256.New()
```

```go
resp, err := blobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{
	Range: blob.HTTPRange{Offset: offset},
})
```
For consistency with the S3 implementation (line 299 in internal/storage/s3.go), it is recommended to avoid sending the Range header when offset is 0. This ensures the standard full-blob download path is used.
```diff
-resp, err := blobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{
-	Range: blob.HTTPRange{Offset: offset},
-})
+var downloadOpts *blob.DownloadStreamOptions
+if offset > 0 {
+	downloadOpts = &blob.DownloadStreamOptions{
+		Range: blob.HTTPRange{Offset: offset},
+	}
+}
+resp, err := blobClient.DownloadStream(ctx, downloadOpts)
```
```go
// Read whatever the reader provides. If the reader closes with an error
// (e.g. a broken pipe from a cancelled fetch), store the partial bytes and
// return the error — matching real backend behaviour where partial bytes
// are flushed to disk before the write fails.
```
The fakeBackend implementation of WriteReader in tests is non-atomic and stores partial bytes on failure. This is inconsistent with the real LocalBackend implementation, which is atomic and cleans up partial writes. This discrepancy masks the issue where the resume feature cannot actually function with the current LocalBackend.
…der, and non-appending backend fallback

- TestLocalBackend_StatFile: not-found (-1), existing file size, correct
- TestLocalBackend_ReadToAt: full fetch, partial read from offset, negative offset error, missing file
- TestLocalBackend_AppendReader: verifies LocalBackend implements AppendingBackend; prefix+tail concatenation
- TestPuller_NonAppendingBackendFallback: backend without AppendingBackend triggers bad_offset_backend counter, deletes partial, retries full fetch from zero

Also fixes a subtle bug in pullOnce: when the write goroutine returns ErrResumeNotSupported (closing the pipe), the resulting broken-pipe fetchErr was masking the write-side error. ErrResumeNotSupported is now checked before fetchErr so the counter increments and the partial is deleted correctly.
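The pullOnce fix can be pictured as an explicit error-priority check: the write goroutine's error is inspected for the resume sentinel before the fetch error, so the broken pipe caused by the writer bailing out cannot hide the real cause. The helper below is a self-contained sketch; the function shape, parameter names, and sentinel variable are assumptions, not the actual puller code.

```go
package filereplication

import (
	"context"
	"errors"
	"sync"
)

// errResumeNotSupported stands in for the real storage.ErrResumeNotSupported.
var errResumeNotSupported = errors.New("resume not supported")

// resolvePullError waits for the write goroutine and then checks its error
// for the resume sentinel before looking at fetchErr, so a broken-pipe fetch
// error cannot mask the backend's refusal to append.
func resolvePullError(ctx context.Context, wg *sync.WaitGroup, fetchErr error, writeErr *error,
	deletePartial func(context.Context) error, onBackendBadOffset func()) error {
	wg.Wait() // the write goroutine has published *writeErr by now

	if errors.Is(*writeErr, errResumeNotSupported) {
		onBackendBadOffset()   // bump the bad_offset_backend counter
		_ = deletePartial(ctx) // drop the partial file; caller retries from zero
		return errResumeNotSupported
	}
	if fetchErr != nil {
		return fetchErr
	}
	return *writeErr
}
```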
…ansfers

- fix(storage/local): WriteReader now uses a deterministic `.part` staging file instead of a random `.tmp`. On transport error the staging file is left in place so the puller can resume; on success it is atomically renamed to the final path. This was the root cause of non-functional resume on local backends (HIGH severity finding). A sketch of the staging pattern follows after this list.
- fix(storage/azure): ReadToAt no longer sends a Range header when offset=0, avoiding a spurious 400 from Azure Blob Storage (MEDIUM).
- fix(filereplication/puller): removed premature AppendingBackend early-return from tryResumeFromPartial so non-appendable backends correctly increment bad_offset_backend and delete the partial file before retrying from zero.
- fix(filereplication/puller_test): merged duplicate fakeBackend.Delete methods; fakeBackend now mirrors LocalBackend .part staging semantics so resume tests catch the atomicity bug at the unit level.
- fix(storage/backend_test): TestLocalBackend_AppendReader now exercises the real contract — failed WriteReader leaves .part, AppendReader appends tail and promotes to final.
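For contrast with the atomic sketch earlier, the staging behaviour described in the fix(storage/local) item might look like the following; the helper name and file permissions are assumptions, and the real LocalBackend code may differ.

```go
package storage

import (
	"io"
	"os"
)

// writeReaderWithStaging copies r into a deterministic path+".part" file.
// On error the staging file is deliberately left on disk so a later attempt
// can resume from its size; on success it is renamed over the final path.
func writeReaderWithStaging(path string, r io.Reader) error {
	part := path + ".part"

	f, err := os.OpenFile(part, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		return err
	}
	if _, err := io.Copy(f, r); err != nil {
		f.Close()
		return err // .part stays in place for the next resume attempt
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(part, path) // promote the completed file
}
```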
@gemini-code-assist All findings from your previous review have been addressed in the latest push. Here's a summary of the fixes:

- HIGH — LocalBackend.WriteReader non-resumable (staging file deleted on error)
- MEDIUM — Azure ReadToAt sends Range header at offset=0
- MEDIUM — tryResumeFromPartial skips resume for non-AppendingBackend backends
- fakeBackend staging semantics

Please re-review when you get a chance. Thanks!
Code Review
This pull request introduces resumable file transfers to the replication system, enabling interrupted downloads to continue from the last committed byte. It extends the storage Backend interface with ReadToAt and StatFile methods and adds an AppendingBackend interface specifically for local-SSD storage. The Puller has been updated to identify partial files, verify their existing content via prefix hashing, and request only the missing data from peers. Review feedback highlights a logic error in prefix hashing for local storage, suggests scaling the hashing timeout for large files, and identifies a race condition in the file-opening sequence of the local backend.
```go
}

h := sha256.New()
hashCtx, hashCancel := context.WithTimeout(p.ctx, 30*time.Second)
```
A fixed 30-second timeout for hashing the partial file prefix may be insufficient for very large files (e.g., several gigabytes), especially on systems with slow or heavily contended disk I/O. Since this operation is local and essential for resuming large transfers, consider using a more generous timeout or one scaled by the expected file size.
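One way to act on this suggestion is to derive the deadline from the partial file's size with a fixed floor; the throughput constant and helper name below are assumptions for illustration, not code from the PR.

```go
package filereplication

import "time"

// hashTimeoutFor scales the prefix-hashing deadline with the number of bytes
// to read, assuming a conservative minimum disk throughput, with a 30-second
// floor so small files keep the current behaviour.
func hashTimeoutFor(sizeBytes int64) time.Duration {
	const (
		floor         = 30 * time.Second
		minThroughput = 50 << 20 // assume at least 50 MiB/s of sequential read
	)
	scaled := time.Duration(sizeBytes/minThroughput+1) * time.Second
	if scaled < floor {
		return floor
	}
	return scaled
}
```

Inside tryResumeFromPartial the fixed 30 seconds would then be replaced by something like context.WithTimeout(p.ctx, hashTimeoutFor(byteOffset)) (hypothetical wiring).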
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Summary
- ByteOffset field on FetchFileRequest/FetchFileAckHeader (omitempty); HMAC does not bind the offset (path is already bound)
- AppendingBackend optional interface replaces a mandatory AppendReader method that S3/Azure would always reject — puller type-asserts and falls back gracefully
- bad_offset counter split into bad_offset_server (AckCodeBadOffset) and bad_offset_backend (ErrResumeNotSupported) for cleaner observability
- tryResumeFromPartial helper, writeFileTail extracted, pipe goroutine uses sync.WaitGroup, coordinator uses ReadToAt unconditionally

Test plan
- go test ./internal/cluster/filereplication/... — all existing + new tests pass (resume happy path, AckCodeBadOffset, offset echo mismatch, partial file retry, bad-offset deletes partial)
- go test ./internal/storage/... — ReadToAt, StatFile, AppendReader on LocalBackend
- go test ./internal/cluster/... — integration test: full fetch round-trip still works
- go build ./... — clean build