Use TUS to do replication and make it non-blocking when uploading #99

rickyrombo merged 19 commits into main from
Conversation
raymondjacobson left a comment
seems good, one high level comment.
not 100% sure what the benefit is of having tus do server-server replication, but i don't see it hurting either
```go
		return fmt.Errorf("failed to create TUS request: %w", err)
	}

	createReq.Header.Set("Tus-Resumable", "1.0.0")
```
what's this version from? maybe we can lift it from version.go?
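For what it's worth, "1.0.0" is the version of the tus protocol itself, carried in the Tus-Resumable header, rather than the service version, so it likely doesn't belong in version.go. Below is a minimal sketch of the tus "creation" request with that value pulled into a named constant; the constant and function names are illustrative, not identifiers from this PR:

```go
package replication

import (
	"context"
	"fmt"
	"net/http"
	"strconv"
)

// tusProtocolVersion is the version of the tus protocol this client speaks
// (the Tus-Resumable header value), not the version of the service.
const tusProtocolVersion = "1.0.0"

// newTusCreateRequest sketches the tus "creation" step: a POST that declares
// the total upload size and the protocol version. Names are hypothetical.
func newTusCreateRequest(ctx context.Context, peerEndpoint string, size int64) (*http.Request, error) {
	createReq, err := http.NewRequestWithContext(ctx, http.MethodPost, peerEndpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create TUS request: %w", err)
	}
	createReq.Header.Set("Tus-Resumable", tusProtocolVersion)
	createReq.Header.Set("Upload-Length", strconv.FormatInt(size, 10))
	return createReq, nil
}
```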
```go
	}
	// Handle image uploads immediately with synchronous replication
	if upload.Template == JobTemplateImgSquare || upload.Template == JobTemplateImgBackdrop {
		upload.Mirrors, err = ss.replicateFileParallel(ctx, formFileCID, filePath, upload.PlacementHosts)
```
why not handle all replication async?
i agree. unclear why the AI did this
…ons from normal uploads
Pull request overview
This pull request refactors the file replication system to use TUS (Tus Resumable Upload Protocol) and makes replication non-blocking. The main goal is to improve upload responsiveness by decoupling replication from the upload request path.
Changes:
- Adds asynchronous replication using worker pools that process uploads from a buffered channel (see the sketch after this list)
- Implements TUS-based replication with authentication for peer-to-peer file transfers
- Modifies upload processing to queue replication asynchronously instead of blocking on it
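A minimal, self-contained sketch of the buffered-channel-plus-worker-pool pattern described in the changes above. `Upload` and `replicationWork` mirror names from the diff; `server`, `startReplicationWorkers`, `replicateUpload`, and `enqueueReplication` are illustrative stand-ins, not the PR's actual implementation:

```go
package replication

import "log"

// Upload is a stand-in for the PR's upload record; only the fields this
// sketch needs are shown.
type Upload struct {
	ID      string
	Mirrors []string
}

// server mirrors the relevant slice of the media server: a buffered work
// channel that the upload path can enqueue to without blocking.
type server struct {
	replicationWork chan *Upload
}

func newServer(queueSize int) *server {
	return &server{replicationWork: make(chan *Upload, queueSize)}
}

// startReplicationWorkers launches a fixed pool of goroutines that drain the
// queue. replicateUpload stands in for the TUS-based peer transfer.
func (s *server) startReplicationWorkers(numWorkers int) {
	for i := 0; i < numWorkers; i++ {
		go func() {
			for upload := range s.replicationWork {
				if err := s.replicateUpload(upload); err != nil {
					log.Printf("replication failed for upload %s: %v", upload.ID, err)
				}
			}
		}()
	}
}

func (s *server) replicateUpload(upload *Upload) error {
	// The authenticated TUS upload to each peer would go here.
	return nil
}

// enqueueReplication is the non-blocking hand-off used by the upload path;
// if the buffer is full, the upload is skipped and left for the periodic
// retry job to requeue.
func (s *server) enqueueReplication(upload *Upload) {
	select {
	case s.replicationWork <- upload:
	default:
	}
}
```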
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 16 comments.
| File | Description |
|---|---|
| pkg/mediorum/server/tusd.go | Adds authentication for TUS replication uploads and handles incoming replication requests with CID validation |
| pkg/mediorum/server/server.go | Adds replicationWork channel and starts replication workers as a managed routine |
| pkg/mediorum/server/serve_upload_grpc.go | Changes upload processing to queue replication asynchronously instead of blocking, marks images as done immediately |
| pkg/mediorum/server/replicate_worker.go | New file implementing replication worker pool with TUS client, periodic retry logic, and authenticated uploads |
```go
	// For images, mark as done immediately (no transcoding needed)
	if upload.Template == JobTemplateImgSquare || upload.Template == JobTemplateImgBackdrop {
		upload.TranscodeResults["original.jpg"] = formFileCID
		upload.TranscodeProgress = 1
		upload.TranscodedAt = time.Now().UTC()
		upload.Status = JobStatusDone
		if shouldCreate {
			return ss.crud.Create(upload)
		}
		return ss.crud.Update(upload)
	}
```
Images are marked as JobStatusDone but are still queued for replication. The findMissedReplications function only looks for uploads with status=JobStatusNew and template='audio', so images with status=JobStatusDone will never be picked up by the periodic replication job if they're missed. Either include JobStatusDone in the query, or ensure images are also replicated synchronously, or change the logic so images stay in JobStatusNew until replication completes.
```go
func (ss *MediorumServer) findMissedReplications() {
	// Find uploads that don't have enough replicas
	uploads := []*Upload{}
	ss.crud.DB.Where("status = ? AND template = 'audio'", JobStatusNew).Find(&uploads)

	for _, upload := range uploads {
		if len(upload.Mirrors) < ss.Config.ReplicationFactor {
			select {
			case ss.replicationWork <- upload:
				ss.logger.Debug("queued upload for replication", zap.String("uploadID", upload.ID))
			default:
				// Channel full, skip for now
			}
		}
	}
}
```
Multiple workers may process the same upload concurrently when findMissedReplications queues uploads. If an upload with insufficient mirrors is found by findMissedReplications, it could be processed by multiple workers simultaneously, leading to redundant replication attempts and potential race conditions when updating the Mirrors field. Consider adding a locking mechanism or status tracking to prevent concurrent processing of the same upload.
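One possible mitigation, sketched under the assumption that an in-process guard is sufficient: track in-flight upload IDs in a mutex-protected set, acquire an ID before sending to the work channel, and release it when a worker finishes. All names here are hypothetical, not code from this PR:

```go
package replication

import "sync"

// inFlightSet is a hypothetical guard that prevents the same upload ID from
// being queued to multiple workers at once.
type inFlightSet struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newInFlightSet() *inFlightSet {
	return &inFlightSet{ids: make(map[string]struct{})}
}

// tryAcquire returns true and marks the ID if it is not already in flight.
func (s *inFlightSet) tryAcquire(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, busy := s.ids[id]; busy {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}

// release clears the ID so a later retry can queue it again.
func (s *inFlightSet) release(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.ids, id)
}
```

In this shape, findMissedReplications would call tryAcquire before sending to replicationWork, and each worker would call release once replication for that upload finishes or fails.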
Reapply "Use TUS to do replication and make it non-blocking when uploading (#99)" (#120)

* Reapply "Use TUS to do replication and make it non-blocking when uploading (#99)" (#117)

  This reverts commit c4f1cca.

* fix panic
* fix panic
* refresh upload struct before writes
* Update pkg/mediorum/server/replicate_worker.go
* don't replicate to every node
* better error handling
* buffer managed workers
* better error management
* better error management
* validate computed cid matches replication filename
* prevent mirror overwrites
* prevent crash on malformed header
* support placementHosts when replicating
* persist transcode results
* only add self to mirrors if self is supposed to be a mirror
* test wrapper so that vscode "run tests" run in container
* log panics in handlers, add logs for failed validations, and add retries for querying upload (fixes race condition in test)
* fix replications to placement hosts
* queue transcode replication _after_ saving upload record
* generate private keys for testing
* add tests for tus uploads
* Update pkg/mediorum/server/replicate_worker.go
* Apply suggestion from @Copilot
* Apply suggestion from @Copilot
* replicate using old method for now
* exit if error setting upload status
* fix transcode progress getting overwritten

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>