Uploader Improvements #1251
Conversation
Walkthrough

Removed the legacy uploader and refactored the desktop multipart upload flow: error-message formatting changes, propagated video metadata through upload/control paths, extended timeouts and tracing, expanded telemetry events (length/size), and tightened the recording lifecycle to abort in-progress multipart uploads.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant UI
    participant RecordingActor as Recording Actor
    participant Uploader as InstantMultipartUpload
    participant API as Backend API
    participant PostHog
    UI->>RecordingActor: start recording
    RecordingActor->>RecordingActor: build actor
    RecordingActor->>Uploader: spawn progressive upload (returns metadata on completion)
    Uploader->>API: create_or_get_video / initiate multipart
    Uploader->>API: presign parts / upload parts (each with timeout + tracing)
    API-->>Uploader: presigned URLs / part-ack
    Uploader->>API: upload_multipart_complete(metadata)
    API-->>Uploader: completion response (status numeric in error messages if error)
    Uploader->>PostHog: emit MultipartUploadComplete { duration, length, size, cap_version }
    RecordingActor->>UI: recording stopped/cleaned (abort upload if deleted)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 2
🧹 Nitpick comments (11)
apps/web/app/api/upload/[...route]/multipart.ts (2)
**81-132**: Consider simplifying nested try-catch blocks.

The inner try-catch (lines 115-121) catches S3 errors only to wrap and re-throw them, and the outer try-catch (lines 123-132) catches everything including the re-thrown error. This nested structure adds complexity without providing distinct error handling at each level.
Consider flattening to a single try-catch:
```diff
   try {
-    try {
-      const uploadId = await Effect.gen(function* () {
-        const [bucket] = yield* S3Buckets.getBucketAccessForUser(user.id);
-
-        const finalContentType = contentType || "video/mp4";
-        console.log(
-          `Creating multipart upload in bucket: ${bucket.bucketName}, content-type: ${finalContentType}, key: ${fileKey}`,
-        );
-
-        const { UploadId } = yield* bucket.multipart.create(fileKey, {
-          ContentType: finalContentType,
-          Metadata: {
-            userId: user.id,
-            source: "cap-multipart-upload",
-          },
-          CacheControl: "max-age=31536000",
-        });
-
-        if (!UploadId) {
-          throw new Error("No UploadId returned from S3");
-        }
-
-        console.log(
-          `Successfully initiated multipart upload with ID: ${UploadId}`,
-        );
-        console.log(
-          `Upload details: Bucket=${bucket.bucketName}, Key=${fileKey}, ContentType=${finalContentType}`,
-        );
-
-        return UploadId;
-      }).pipe(runPromise);
-
-      return c.json({ uploadId: uploadId });
-    } catch (s3Error) {
-      console.error("S3 operation failed:", s3Error);
-      throw new Error(
-        `S3 operation failed: ${
-          s3Error instanceof Error ? s3Error.message : "Unknown error"
-        }`,
-      );
-    }
+    const uploadId = await Effect.gen(function* () {
+      const [bucket] = yield* S3Buckets.getBucketAccessForUser(user.id);
+
+      const finalContentType = contentType || "video/mp4";
+      console.log(
+        `Creating multipart upload in bucket: ${bucket.bucketName}, content-type: ${finalContentType}, key: ${fileKey}`,
+      );
+
+      const { UploadId } = yield* bucket.multipart.create(fileKey, {
+        ContentType: finalContentType,
+        Metadata: {
+          userId: user.id,
+          source: "cap-multipart-upload",
+        },
+        CacheControl: "max-age=31536000",
+      });
+
+      if (!UploadId) {
+        throw new Error("No UploadId returned from S3");
+      }
+
+      console.log(
+        `Successfully initiated multipart upload with ID: ${UploadId}`,
+      );
+      console.log(
+        `Upload details: Bucket=${bucket.bucketName}, Key=${fileKey}, ContentType=${finalContentType}`,
+      );
+
+      return UploadId;
+    }).pipe(runPromise);
+
+    return c.json({ uploadId: uploadId });
   } catch (error) {
     console.error("Error initiating multipart upload", error);
     return c.json(
       {
         error: "Error initiating multipart upload",
         details: error instanceof Error ? error.message : String(error),
       },
       500,
     );
   }
```
**87-109**: Consider using `Effect.log` for consistency.

The file mixes `console.log` statements with `Effect.log` calls. Since the codebase uses Effect throughout and already employs `Effect.log` in some places (lines 331, 347, 352), using `Effect.log` consistently would provide better integration with Effect's structured logging and tracing capabilities.

Also applies to: 169-171, 263-301, 313-321
apps/desktop/src-tauri/src/api.rs (1)
**83-89**: Status code formatting change is fine; consider adding reason text

Switching to `as_u16()` yields cleaner numeric codes. Optionally include the canonical reason for quicker triage, e.g., format as `"{code} {reason}: {body}"` using `let sc = resp.status(); let code = sc.as_u16(); let reason = sc.canonical_reason().unwrap_or("");`
Also applies to: 158-164, 214-220, 248-254
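For illustration, a minimal sketch of that formatting, assuming a `reqwest::Response` in hand (the helper name is hypothetical, not existing api.rs code):

```rust
// Hypothetical helper showing the "{code} {reason}: {body}" shape;
// reads the status before consuming the response body.
async fn format_status_error(resp: reqwest::Response) -> String {
    let status = resp.status();
    let code = status.as_u16();
    let reason = status.canonical_reason().unwrap_or("");
    let body = resp.text().await.unwrap_or_default();
    format!("{code} {reason}: {body}")
}
```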
apps/desktop/src-tauri/src/web_api.rs (1)
**20-26**: New `Timeout` variant and `From<reqwest::Error>` mapping improves error clarity

Good change: timeouts surface distinctly; other reqwest errors retain source. This also enables `map_err(|e| format!(..))?` patterns via your new `From<String>`.

Optional: consider a separate variant for network/connect errors (`is_connect`, `is_request`, `is_body`), if you want finer PostHog/error analytics later.

Also applies to: 29-36
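As a rough sketch of the pattern being praised here (the exact variant set in web_api.rs may differ; only the `Timeout` variant and the two `From` impls come from the comment above):

```rust
// Minimal sketch of the error mapping; variant names beyond Timeout are assumptions.
#[derive(Debug)]
pub enum AuthedApiError {
    Timeout,
    Request(reqwest::Error), // non-timeout errors retain their source
    Other(String),
}

impl From<reqwest::Error> for AuthedApiError {
    fn from(err: reqwest::Error) -> Self {
        if err.is_timeout() {
            AuthedApiError::Timeout // timeouts surface distinctly for analytics
        } else {
            AuthedApiError::Request(err)
        }
    }
}

impl From<String> for AuthedApiError {
    fn from(msg: String) -> Self {
        AuthedApiError::Other(msg) // enables map_err(|e| format!(..))? patterns
    }
}
```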
apps/desktop/src-tauri/src/posthog.rs (1)
**6-19**: Record explicit units and avoid truncation to enable accurate speed metrics

You're emitting:
- upload duration: `duration.as_secs()` (truncated)
- video length: `length.as_secs()` (truncated)
- size: `u64` MB (truncated in upload.rs)

For robust avg-speed analytics and small uploads:
- Use floating seconds (`as_secs_f64`) for durations.
- Emit size with explicit unit (e.g., `size_mb`), ideally as `f64`.
- Maintain backward-compatible props for dashboards by adding new fields rather than renaming.

Suggested tweak (adds explicit-unit fields; keeps existing ones):

```diff
- e.insert_prop("duration", duration.as_secs())...
+ e.insert_prop("duration", duration.as_secs())... // legacy
+ e.insert_prop("upload_duration_seconds", duration.as_secs_f64())...
- e.insert_prop("length", length.as_secs())...
+ e.insert_prop("length", length.as_secs())... // legacy
+ e.insert_prop("video_length_seconds", length.as_secs_f64())...
- e.insert_prop("size", size)...
+ e.insert_prop("size", size)... // legacy (MB, truncated)
+ e.insert_prop("size_mb", size as f64)...
```

Note: In upload.rs, `size` is computed as truncated MB; consider emitting raw bytes there for best precision, then convert here. Based on relevant code snippet.

Also applies to: 24-41, 54-56
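A minimal sketch of that raw-bytes approach, converting only at the event layer (the path and variable names are illustrative):

```rust
fn main() {
    // Emit raw bytes upstream; convert to a float MB only when building the event.
    let size_bytes: u64 = std::fs::metadata("recording.mp4")
        .map(|m| m.len())
        .unwrap_or(0);
    let size_mb = size_bytes as f64 / 1_000_000.0; // no truncation until display
    println!("size_mb = {size_mb:.2}");
}
```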
apps/desktop/src-tauri/src/recording.rs (1)
**745-760**: Abort-then-delete can race on Windows; add a short await/backoff before removing files

After `progressive_upload.handle.abort()`, the OS may still hold file handles briefly. Deleting immediately can fail (EPERM/EBUSY). Prefer async removal with retries.

Proposed change:

```diff
-    let _ = recording.cancel().await;
-
-    std::fs::remove_dir_all(&recording_dir).ok();
+    let _ = recording.cancel().await;
+
+    // Give the uploader a moment to drop file handles, then retry delete a few times.
+    use tokio::time::{sleep, Duration};
+    let mut attempts = 0;
+    while attempts < 5 {
+        match tokio::fs::remove_dir_all(&recording_dir).await {
+            Ok(_) => break,
+            Err(e) => {
+                attempts += 1;
+                if attempts >= 5 {
+                    error!("Failed to remove recording dir {:?}: {e}", recording_dir);
+                    break;
+                }
+                sleep(Duration::from_millis(200)).await;
+            }
+        }
+    }
```

This reduces flakiness on Windows when users delete immediately after starting/aborting an instant recording.
Also applies to: 764-774
apps/desktop/src-tauri/src/upload.rs (5)
**105-109**: Propagate metadata to multipart_complete and return it upstream.

Good change; aligns with emitting richer PostHog fields.

The `meta: S3VideoMeta` parameter in `upload_video(...)` (lines 60-67) is now unused. Consider removing it or using it to avoid clippy warnings.
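If the parameter must remain for signature compatibility, a sketch of the underscore-prefix option (the type is reduced to the one field referenced elsewhere in this review; this is not the real signature):

```rust
// Illustrative only: prefixing with `_` silences the unused-parameter lint
// when `meta` must stay in the signature; otherwise drop the parameter.
struct S3VideoMeta {
    duration_in_secs: f64,
}

fn upload_video(_meta: S3VideoMeta) {
    // body elided
}
```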
**369-371**: `run` now returns an `Option`.

Nice for surfacing metadata. Minor nit: naming inconsistency between `realtime_upload_done` (spawn) and `realtime_video_done` (run). Align for readability.
**477-487**: File-open retry loop: reduce log noise and clarify error.

Current code logs `error!` on every failed open attempt, which can spam logs during normal races. Suggest logging at debug/trace during retries and emitting a single error on timeout. Also consider the `TimedOut` error kind.

Apply this diff:
```diff
-    let mut file = timeout(Duration::from_secs(20), async move {
-        loop {
-            if let Ok(file) = tokio::fs::File::open(&path).await.map_err(|err| error!("from_pending_file_to_chunks/open: {err:?}")) {
-                break file;
-            }
-            tokio::time::sleep(Duration::from_millis(100)).await;
-        }
-    })
-    .await
-    .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to open file. The recording pipeline may have crashed?"))?;
+    let mut file = timeout(Duration::from_secs(20), async {
+        loop {
+            match File::open(&path).await {
+                Ok(f) => break f,
+                Err(_) => tokio::time::sleep(Duration::from_millis(100)).await,
+            }
+        }
+    })
+    .await
+    .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "Opening recording file timed out"))?;
```
**128-137**: Option: include average upload speed in event payload.

If useful to avoid downstream calc, add `avg_speed_mbps = (size_mb as f64) / duration.as_secs_f64()` with safe zero-division handling. Confirm the `PostHogEvent::MultipartUploadComplete` schema has a field for this (or add one).

Also applies to: 343-351
**128-137**: Minor: avoid blocking fs metadata in async context (optional).

Size retrieval is non-critical, but you can move it earlier (from parts/total) or use `tokio::fs::metadata` if you want to stay on the async path.
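For example, a small async helper along those lines (assuming a `file_path` as in the surrounding uploader code):

```rust
// Stays on the async path; returns 0 if metadata is unavailable,
// mirroring the non-critical treatment of size above.
async fn file_size_bytes(file_path: &std::path::Path) -> u64 {
    tokio::fs::metadata(file_path)
        .await
        .map(|m| m.len())
        .unwrap_or(0)
}
```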
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- apps/desktop/src-tauri/src/api.rs (4 hunks)
- apps/desktop/src-tauri/src/lib.rs (1 hunks)
- apps/desktop/src-tauri/src/posthog.rs (2 hunks)
- apps/desktop/src-tauri/src/recording.rs (2 hunks)
- apps/desktop/src-tauri/src/upload.rs (10 hunks)
- apps/desktop/src-tauri/src/upload_legacy.rs (0 hunks)
- apps/desktop/src-tauri/src/web_api.rs (2 hunks)
- apps/web/app/api/upload/[...route]/multipart.ts (1 hunks)
💤 Files with no reviewable changes (1)
- apps/desktop/src-tauri/src/upload_legacy.rs
🧰 Additional context used
📓 Path-based instructions (7)
`**/*.rs`

📄 CodeRabbit inference engine (AGENTS.md)

`**/*.rs`: Format Rust code using `rustfmt` and ensure all Rust code passes workspace-level clippy lints. Rust modules should be named with snake_case, and crate directories should be in kebab-case.

Files:
- apps/desktop/src-tauri/src/lib.rs
- apps/desktop/src-tauri/src/recording.rs
- apps/desktop/src-tauri/src/api.rs
- apps/desktop/src-tauri/src/web_api.rs
- apps/desktop/src-tauri/src/posthog.rs
- apps/desktop/src-tauri/src/upload.rs
`**/*.{ts,tsx}`

📄 CodeRabbit inference engine (AGENTS.md)

`**/*.{ts,tsx}`: Use a 2-space indent for TypeScript code. Use Biome for formatting and linting TypeScript/JavaScript files by running `pnpm format`. Use strict TypeScript and avoid `any`; leverage shared types.

Files:
- apps/web/app/api/upload/[...route]/multipart.ts

`**/*.{ts,tsx,js,jsx}`

📄 CodeRabbit inference engine (AGENTS.md)

`**/*.{ts,tsx,js,jsx}`: Use kebab-case for filenames for TypeScript/JavaScript modules (e.g., `user-menu.tsx`). Use PascalCase for React/Solid components.

Files:
- apps/web/app/api/upload/[...route]/multipart.ts
`apps/web/**/*.{ts,tsx,js,jsx}`

📄 CodeRabbit inference engine (AGENTS.md)

On the client, always use `useEffectQuery` or `useEffectMutation` from `@/lib/EffectRuntime`; never call `EffectRuntime.run*` directly in components.

Files:
- apps/web/app/api/upload/[...route]/multipart.ts
`apps/web/app/api/**/*.{ts,tsx}`

📄 CodeRabbit inference engine (CLAUDE.md)

`apps/web/app/api/**/*.{ts,tsx}`:
- Prefer Server Actions for API surface; when routes are necessary, implement under app/api and export only the handler from apiToHandler(ApiLive)
- Construct API routes with @effect/platform HttpApi/HttpApiBuilder, declare contracts with Schema, and only export the handler
- Use HttpAuthMiddleware for required auth and provideOptionalAuth for guests; avoid duplicating session lookups
- Map domain errors to transport with HttpApiError.* and keep translation exhaustive (catchTags/tapErrorCause)
- Inside HttpApiBuilder.group, acquire services with Effect.gen and provide dependencies via Layer.provide instead of manual provideService

Files:
- apps/web/app/api/upload/[...route]/multipart.ts
`apps/web/**/*.{ts,tsx}`

📄 CodeRabbit inference engine (CLAUDE.md)

`apps/web/**/*.{ts,tsx}`:
- Use TanStack Query v5 for all client-side server state and fetching in the web app
- Mutations should call Server Actions directly and perform targeted cache updates with setQueryData/setQueriesData
- Run server-side effects via the ManagedRuntime from apps/web/lib/server.ts using EffectRuntime.runPromise/runPromiseExit; do not create runtimes ad hoc
- Client code should use helpers from apps/web/lib/EffectRuntime.ts (useEffectQuery, useEffectMutation, useRpcClient); never call ManagedRuntime.make inside components

Files:
- apps/web/app/api/upload/[...route]/multipart.ts
`apps/web/app/**/*.{ts,tsx}`

📄 CodeRabbit inference engine (CLAUDE.md)

Server components needing Effect services must call EffectRuntime.runPromise(effect.pipe(provideOptionalAuth))

Files:
- apps/web/app/api/upload/[...route]/multipart.ts
🧬 Code graph analysis (3)
apps/desktop/src-tauri/src/recording.rs (1)
- apps/desktop/src-tauri/src/upload.rs (1): `spawn` (324-362)

apps/desktop/src-tauri/src/posthog.rs (1)
- apps/desktop/src-tauri/src/upload.rs (4): `size` (748-748), `size` (756-758), `size` (766-768), `size` (776-778)

apps/desktop/src-tauri/src/upload.rs (3)
- apps/desktop/src-tauri/src/api.rs (5): `upload_multipart_complete` (119-170), `resp` (43-43), `resp` (91-91), `resp` (166-166), `resp` (222-222)
- crates/utils/src/lib.rs (1): `spawn_actor` (6-13)
- apps/desktop/src-tauri/src/posthog.rs (1): `async_capture_event` (73-82)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build Desktop (x86_64-pc-windows-msvc, windows-latest)
- GitHub Check: Build Desktop (aarch64-apple-darwin, macos-latest)
- GitHub Check: Analyze (rust)
🔇 Additional comments (10)
apps/desktop/src-tauri/src/lib.rs (1)
**2596-2603**: Resuming multipart: passing `None` for realtime signal looks correct

The new `InstantMultipartUpload::spawn(.., None)` arg matches the updated signature and intent for resumed uploads (no realtime sender). LGTM.

apps/desktop/src-tauri/src/web_api.rs (1)

**140-142**: Minor borrow cleanup LGTM

Using an immutable `app_state` read simplifies `is_server_url_custom`. Looks good.

apps/desktop/src-tauri/src/recording.rs (1)
**562-569**: Spawning progressive upload after actor build reduces the file-race risk

Passing `Some(finish_upload_rx)` to gate completion is a solid improvement. LGTM.

Please confirm upload.rs now tolerates the file-not-yet-present case during startup (retry/backoff) as per the PR description.
apps/desktop/src-tauri/src/upload.rs (7)
**36-39**: Tracing/timeouts import looks good.

No issues; enables Instant/timeout and info_span as needed.

**128-137**: PostHog: added length and size.

Looks right. Please confirm units downstream:
- length: seconds (Duration)
- size: MB (10^6 divisor, truncated)

If PostHog expects bytes or MiB, adjust accordingly.

**330-340**: Realtime completion signal plumbed through spawn/run.

Good—non-fatal handling will improve resiliency when senders drop.

**343-351**: PostHog in InstantMultipartUpload: length/size added.

Mirrors top-level event; looks consistent.

**415-416**: Send metadata to multipart_complete.

Good; aligns with API flattening for S3VideoMeta.

**429-429**: Return captured metadata.

LGTM.

**501-505**: Treat dropped senders as "realtime done".

Good call; prevents spurious fatal errors when producer ends first. Ensure UI/UX reflects non-realtime state correctly when this path triggers.
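The pattern amounts to treating both outcomes of the completion channel as "done"; a minimal sketch with a oneshot receiver (the concrete channel type and names in upload.rs may differ):

```rust
// Ok(()) = explicit completion signal; Err(RecvError) = sender dropped,
// which this path also treats as "realtime done" rather than a fatal error.
async fn wait_realtime_done(rx: tokio::sync::oneshot::Receiver<()>) {
    let _ = rx.await;
}
```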
Actionable comments posted: 1
♻️ Duplicate comments (3)
apps/desktop/src-tauri/src/upload.rs (3)
**351-359**: Emit average upload speed in InstantMultipartUpload::spawn event too.

Mirror the speed metric here for consistency with upload_video.

Example:
```diff
-            Ok(meta) => PostHogEvent::MultipartUploadComplete {
+            Ok(meta) => {
+                let size_bytes = std::fs::metadata(&file_path).map(|m| m.len()).unwrap_or(0);
+                let secs = start.elapsed().as_secs_f64().max(0.001);
+                let avg_speed_mbps = (size_bytes as f64 * 8.0) / secs / 1_000_000.0;
+                PostHogEvent::MultipartUploadComplete {
                     duration: start.elapsed(),
                     length: meta
                         .as_ref()
                         .map(|v| Duration::from_secs(v.duration_in_secs as u64))
                         .unwrap_or_default(),
                     size: std::fs::metadata(file_path)
                         .map(|m| ((m.len() as f64) / 1_000_000.0) as u64)
                         .unwrap_or_default(),
+                    avg_speed_mbps,
+                }
-            },
+            },
```
**728-739**: Add explicit timeout and same error mapping/instrumentation for single-part uploads.

Screenshot upload can hang without a request timeout; also String-wrapping loses timeout classification.

Apply:
```diff
-    let resp = retryable_client(url.host().unwrap_or("<unknown>").to_string())
+    let resp = retryable_client(url.host().unwrap_or("<unknown>").to_string())
         .build()
         .map_err(|err| format!("singlepart_uploader/client: {err:?}"))?
         .put(&presigned_url)
         .header("Content-Length", total_size)
-        .body(reqwest::Body::wrap_stream(stream))
-        .send()
-        .await
-        .map_err(|err| format!("singlepart_uploader/error: {err:?}"))?;
+        .timeout(Duration::from_secs(5 * 60))
+        .body(reqwest::Body::wrap_stream(stream))
+        .send()
+        .instrument(info_span!("send", video_id = %request.video_id, size = total_size))
+        .await
+        .map_err(|err| {
+            error!("singlepart_uploader/send: {err:?}");
+            AuthedApiError::from(err)
+        })?;
```
**682-682**: Preserve timeout classification and enrich spans for S3 part uploads.

Don't String-wrap `reqwest::Error`; map to `AuthedApiError` to classify timeouts. Add part (and video_id) to the span. This was raised earlier and still applies.

Apply:
```diff
-    let resp = req
-        .send()
-        .instrument(info_span!("send", size = size))
-        .await
-        .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;
+    let resp = req
+        .send()
+        .instrument(info_span!("send", video_id = %video_id, part = part_number, size = size))
+        .await
+        .map_err(|err| {
+            error!("uploader/part/{part_number}/send: {err:?}");
+            AuthedApiError::from(err)
+        })?;
```

Also applies to: 690-692
🧹 Nitpick comments (1)
apps/desktop/src-tauri/src/upload.rs (1)
**485-495**: Open-with-timeout retry is great; reduce log noise and use TimedOut error kind.

- Logging `error!` on every 100ms retry will spam logs; prefer `trace!` for attempts and `warn!` once near timeout.
- Use `io::ErrorKind::TimedOut` for clarity.

Apply:
```diff
-    .await
-    .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to open file. The recording pipeline may have crashed?"))?;
+    .await
+    .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "Failed to open file within 20s. The recording pipeline may have crashed?"))?;
```

Optional: throttle logs
```rust
let started = Instant::now();
loop {
    match tokio::fs::File::open(&path).await {
        Ok(f) => break f,
        Err(err) => {
            if started.elapsed() > Duration::from_secs(5) {
                debug!("from_pending_file_to_chunks/open retrying: {err:?}");
            } else {
                trace!("from_pending_file_to_chunks/open retrying...");
            }
        }
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- apps/desktop/src-tauri/src/upload.rs (11 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.rs`

📄 CodeRabbit inference engine (AGENTS.md)

`**/*.rs`: Format Rust code using `rustfmt` and ensure all Rust code passes workspace-level clippy lints. Rust modules should be named with snake_case, and crate directories should be in kebab-case.

Files:
- apps/desktop/src-tauri/src/upload.rs
🧬 Code graph analysis (1)
apps/desktop/src-tauri/src/upload.rs (3)
- apps/desktop/src-tauri/src/api.rs (5): `upload_multipart_complete` (119-170), `resp` (43-43), `resp` (91-91), `resp` (166-166), `resp` (222-222)
- crates/utils/src/lib.rs (1): `spawn_actor` (6-13)
- apps/desktop/src-tauri/src/posthog.rs (1): `async_capture_event` (73-82)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Clippy
- GitHub Check: Build Desktop (x86_64-pc-windows-msvc, windows-latest)
- GitHub Check: Build Desktop (aarch64-apple-darwin, macos-latest)
- GitHub Check: Analyze (rust)
🔇 Additional comments (5)
apps/desktop/src-tauri/src/upload.rs (5)
**36-37**: Imports for timeout and tracing look good.

Brings in `tokio::time::timeout` and richer tracing symbols used below. No issues.

Also applies to: 39-39
**372-379**: Returning metadata from run() and passing to complete: LGTM.

Signature and propagation align with the new flow; completion receives meta and run returns it.

Also applies to: 423-424, 437-437
**509-513**: Treating dropped senders as "done" matches PR goal.

Good change to persist uploader when all senders drop.
**239-247**: The review comment can be dismissed; if-let chains are fully supported in this project.

The codebase targets edition 2024 (requiring Rust 1.85.0+), and if-let chains are stable on edition-2024 toolchains (stabilized in Rust 1.88.0). There is no MSRV constraint that would prevent this pattern, and the workspace clippy lints do not restrict it. The suggested refactor would reduce readability by introducing unnecessary nesting without benefit.

Likely an incorrect or invalid review comment.
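For reference, a generic if-let chain of the kind at issue (names are hypothetical, not taken from recording.rs):

```rust
// Edition-2024 if-let chain: both bindings and the guard sit in one condition.
fn demo(a: Option<u32>, b: Option<u32>) {
    if let Some(x) = a
        && let Some(y) = b
        && x < y
    {
        println!("{x} < {y}");
    }
}
```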
**105-109**: Apply error mapping fixes to preserve timeout classification in api.rs.

The current `upload_multipart_complete` function at lines 155 and 168 wraps reqwest errors in format strings, losing timeout information. The `AuthedApiError` `From<reqwest::Error>` impl in web_api.rs correctly distinguishes timeouts via `err.is_timeout()`. Replace the format-based error mapping with `AuthedApiError::from()` to preserve this classification.

Fix at line 155:
```diff
-        .await
-        .map_err(|err| format!("api/upload_multipart_complete/request: {err}"))?;
+        .await
+        .map_err(AuthedApiError::from)?;
```

Fix at line 168:
```diff
-    resp.json::<Response>()
-        .await
-        .map_err(|err| format!("api/upload_multipart_complete/response: {err}").into())
+    resp.json::<Response>()
+        .await
+        .map_err(AuthedApiError::from)
```

Likely an incorrect or invalid review comment.
Makes `reqwest` timeouts a dedicated error so they will show up better in PostHog.

Summary by CodeRabbit
- Bug Fixes
- Improvements
- Chores