Faster uploader #1256
Conversation
Walkthrough
Adds OS and architecture properties to PostHog events and refactors the multipart uploader into a concurrent, streaming pipeline with presigned-URL prefetch/cache, optional MD5 support, and explicit MIN_CHUNK_SIZE (5 MB) / MAX_CHUNK_SIZE (20 MB); the uploader's input stream now requires `Send + 'static` bounds.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client Code
    participant Pipeline as Multipart Pipeline (unfold)
    participant Presigner as Presign Cache/Service
    participant Uploader as Part Uploader (retry)
    participant S3 as Storage Service
    Client->>Pipeline: supply chunk stream (Send + 'static)
    loop parts (streamed, up to concurrency)
        Pipeline->>Presigner: request presigned URL (check cache / prefetch)
        Presigner-->>Pipeline: presigned URL (+ optional MD5 info)
        Pipeline->>Uploader: start upload task for part
        par concurrent uploads
            Uploader->>S3: PUT part (include Content-MD5 if present)
            S3-->>Uploader: response (ETag / status)
        end
        Uploader-->>Pipeline: UploadedPart (ETag) or error
    end
    Pipeline-->>Client: stream of UploadedPart results
    note over Pipeline: final completion logged when stream ends
```
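As a rough illustration of the pipeline shape in the diagram, here is a minimal, self-contained sketch (placeholder types and helper names, assuming the `futures` and `tokio` crates; the PR's actual implementation uses `stream::unfold` and lives in `apps/desktop/src-tauri/src/upload.rs`):

```rust
// Minimal sketch of the concurrent upload pipeline shape; not the PR's actual code.
use futures::stream::{self, StreamExt};

const MAX_CONCURRENT_UPLOADS: usize = 3;

#[derive(Debug)]
struct UploadedPart {
    part_number: u64,
    etag: String,
}

async fn presign_and_upload(part_number: u64, chunk: Vec<u8>) -> UploadedPart {
    // Placeholder for: fetch (or reuse a cached) presigned URL, PUT the chunk, read the ETag.
    UploadedPart {
        part_number,
        etag: format!("etag-{}-{}", part_number, chunk.len()),
    }
}

fn upload_pipeline(
    chunks: impl futures::Stream<Item = Vec<u8>> + Send + 'static,
) -> impl futures::Stream<Item = UploadedPart> + 'static {
    chunks
        .enumerate()
        // Each chunk becomes a future that uploads one part...
        .map(|(i, chunk)| presign_and_upload(i as u64 + 1, chunk))
        // ...and `buffered` drives up to MAX_CONCURRENT_UPLOADS of them at once,
        // yielding results in part order.
        .buffered(MAX_CONCURRENT_UPLOADS)
}

#[tokio::main]
async fn main() {
    let chunks = stream::iter(vec![vec![0u8; 10], vec![1u8; 20], vec![2u8; 30]]);
    let mut parts = Box::pin(upload_pipeline(chunks));
    while let Some(part) = parts.next().await {
        println!("{part:?}");
    }
}
```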
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
`apps/desktop/src-tauri/src/posthog.rs` (1 hunks)
`apps/desktop/src-tauri/src/upload.rs` (4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Format Rust code using `rustfmt` and ensure all Rust code passes workspace-level clippy lints.
Rust modules should be named with snake_case, and crate directories should be in kebab-case.
Files:
`apps/desktop/src-tauri/src/posthog.rs`
`apps/desktop/src-tauri/src/upload.rs`
🧬 Code graph analysis (1)
apps/desktop/src-tauri/src/upload.rs (1)
apps/desktop/src-tauri/src/api.rs (5)
`upload_multipart_presign_part` (50-95), `resp` (43-43), `resp` (91-91), `resp` (166-166), `resp` (222-222)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build Desktop (aarch64-apple-darwin, macos-latest)
- GitHub Check: Build Desktop (x86_64-pc-windows-msvc, windows-latest)
- GitHub Check: Analyze (rust)
🔇 Additional comments (8)
apps/desktop/src-tauri/src/posthog.rs (1)
57-62: LGTM! OS and architecture telemetry added correctly.
The new properties follow the same error-handling pattern as existing properties and use standard library constants. This aligns well with the PR objective to enhance observability.
apps/desktop/src-tauri/src/upload.rs (7)
16-17: LGTM! Imports support the new concurrent architecture.
The addition of `join` for concurrent operations, reorganized stream imports, and concurrency primitives (`Arc`, `Mutex`) properly support the streaming pipeline refactor.
Also applies to: 28-28
59-59: LGTM! Chunk size increase improves upload efficiency.
Increasing the chunk size from 5 MB to 20 MB reduces the number of parts and API calls while staying well within S3's multipart upload limits (5 MB minimum, 5 GB maximum per part).
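For a sense of the reduction in part count, here is illustrative arithmetic only, using a hypothetical 1 GiB upload:

```rust
// Illustrative arithmetic: multipart part counts for a hypothetical 1 GiB recording.
fn parts_needed(total_bytes: u64, chunk_size: u64) -> u64 {
    total_bytes.div_ceil(chunk_size)
}

fn main() {
    let total = 1024 * 1024 * 1024; // 1 GiB (hypothetical size)
    println!("5 MB chunks:  {} parts", parts_needed(total, 5 * 1024 * 1024)); // 205
    println!("20 MB chunks: {} parts", parts_needed(total, 20 * 1024 * 1024)); // 52
}
```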
625-627: LGTM! Function signature properly supports concurrent uploads.
The `Send + 'static` bounds on the input stream and `'static` return type enable safe concurrent processing, while `MAX_CONCURRENT_UPLOADS = 3` provides a reasonable balance between throughput and resource consumption.
634-668: LGTM! Presigned URL prefetching reduces upload latency.
The concurrent prefetching of presigned URLs while waiting for the next chunk (via `join()`) is a smart optimization that reduces idle time. The conditional logic correctly skips prefetching when MD5 hashes are required, since the MD5 must be computed from the chunk data before the URL can be generated.
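A minimal sketch of that overlap, assuming hypothetical `next_chunk` and `presign_part` helpers and the `futures`/`tokio` crates (not the PR's actual functions):

```rust
// Sketch of overlapping "read next chunk" with "presign next part's URL".
use futures::join;

async fn next_chunk() -> Option<Vec<u8>> {
    Some(vec![0u8; 5 * 1024 * 1024]) // placeholder 5 MB chunk
}

async fn presign_part(part_number: u64) -> Result<String, String> {
    Ok(format!("https://example.invalid/presigned/part/{part_number}")) // placeholder URL
}

async fn fetch_next(part_number: u64, prefetch_url: bool) {
    // Run both concurrently; when MD5 hashes are required the prefetch is skipped,
    // because the presign request needs the chunk's digest first.
    let (chunk, url) = join!(next_chunk(), async {
        if prefetch_url {
            Some(presign_part(part_number).await)
        } else {
            None
        }
    });
    println!("chunk ready: {}, url prefetched: {}", chunk.is_some(), url.is_some());
}

#[tokio::main]
async fn main() {
    fetch_next(2, true).await;
}
```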
696-705: LGTM! First chunk URL caching implements the PR objective.
The caching logic for the first chunk's presigned URL enables reuse when re-uploading part 1 (in instant mode with updated MP4 header), reducing upload latency as stated in the PR objectives. The 50-minute expiry provides a safe buffer before the typical 1-hour S3 presigned URL expiration.
The `unwrap_or_else(PoisonError::into_inner)` handling recovers from poisoned mutexes, which is reasonable here since the cached URL is an optimization rather than critical state.
Also applies to: 719-724
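A rough sketch of such a cache with a 50-minute expiry and poisoned-mutex recovery (illustrative names and layout, not the file's actual fields):

```rust
// Sketch of a shared "first part" presigned-URL cache; names are illustrative.
use std::sync::{Arc, Mutex, PoisonError};
use std::time::{Duration, Instant};

const CACHE_TTL: Duration = Duration::from_secs(50 * 60); // below the ~1 h presign expiry

type UrlCache = Arc<Mutex<Option<(String, Instant)>>>;

fn cached_url(cache: &UrlCache) -> Option<String> {
    // Recover the guard even if another thread panicked while holding the lock.
    let guard = cache.lock().unwrap_or_else(PoisonError::into_inner);
    guard
        .as_ref()
        .filter(|(_, stored_at)| stored_at.elapsed() < CACHE_TTL)
        .map(|(url, _)| url.clone())
}

fn store_url(cache: &UrlCache, url: String) {
    *cache.lock().unwrap_or_else(PoisonError::into_inner) = Some((url, Instant::now()));
}

fn main() {
    let cache: UrlCache = Arc::new(Mutex::new(None));
    store_url(&cache, "https://example.invalid/presigned/part/1".into());
    assert!(cached_url(&cache).is_some());
}
```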
727-779: LGTM! Upload execution has proper error handling and retry logic.
The upload implementation correctly:
- Parses and validates presigned URLs
- Uses the retryable client for resilience against transient failures
- Applies a reasonable 5-minute timeout for 20 MB chunks
- Conditionally includes Content-MD5 header when required
- Extracts and validates ETag from the response
- Provides detailed error messages with part numbers for debugging
787-799: LGTM! Concurrency control and completion logging are well-implemented.
The `.buffered(MAX_CONCURRENT_UPLOADS)` correctly limits concurrent uploads to 3, balancing throughput with resource consumption. The completion logging wrapped in `stream::once().chain()` is an established pattern for side effects in stream processing, providing valuable observability for upload completion times.
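One way to express that completion-logging pattern, as a small sketch with a placeholder item type (not necessarily the PR's exact composition):

```rust
// Chain an empty-yielding, side-effecting stream after the real one so the log line
// fires once the upload stream has been fully drained.
use futures::stream::{self, StreamExt};
use std::time::Instant;

fn with_completion_log(
    parts: impl futures::Stream<Item = u64>,
    started_at: Instant,
) -> impl futures::Stream<Item = u64> {
    parts.chain(
        stream::once(async move {
            println!("multipart upload finished in {:?}", started_at.elapsed());
        })
        // The logging stream yields no items of its own, so filter everything out.
        .filter_map(|()| async { None::<u64> }),
    )
}

#[tokio::main]
async fn main() {
    let parts = stream::iter(1u64..=3);
    let mut s = Box::pin(with_completion_log(parts, Instant::now()));
    while let Some(part) = s.next().await {
        println!("uploaded part {part}");
    }
}
```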
Actionable comments posted: 0
♻️ Duplicate comments (2)
apps/desktop/src-tauri/src/upload.rs (2)
692-692: Fix base64 API incompatibility.
The `base64::encode()` function was removed in base64 v0.21.0. With base64 0.22.1, you must use the engine API.
Apply this diff:

```diff
-                let md5_sum =
-                    use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
+                let md5_sum = use_md5_hashes.then(|| {
+                    use base64::engine::{general_purpose::STANDARD, Engine};
+                    STANDARD.encode(md5::compute(&chunk).0)
+                });
```

Alternatively, add `use base64::prelude::*;` at the top of the file and use:

```diff
-                let md5_sum =
-                    use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
+                let md5_sum =
+                    use_md5_hashes.then(|| BASE64_STANDARD.encode(md5::compute(&chunk).0));
```
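For reference, a self-contained example of the base64 0.22 engine API producing a Content-MD5 value (assumes the `md5` and `base64` crates are in the dependency tree; illustrative only):

```rust
// S3 expects the Content-MD5 header to be the base64-encoded raw (binary) digest.
use base64::{engine::general_purpose::STANDARD, Engine as _};

fn content_md5(chunk: &[u8]) -> String {
    STANDARD.encode(md5::compute(chunk).0)
}

fn main() {
    println!("{}", content_md5(b"example chunk"));
}
```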
693-696: Remove redundant dead code condition.
The condition `|| md5_sum.is_some()` is unreachable. When `use_md5_hashes` is true (making `md5_sum.is_some()` true), `presigned_url?` yields `None` (line 650), so the pattern `let Some(url) = presigned_url?` never matches.
Apply this diff:

```diff
                 let presigned_url = if let Some(url) = presigned_url?
-                    && (part_number == expected_part_number || md5_sum.is_some())
+                    && part_number == expected_part_number
                 {
                     url
```
🧹 Nitpick comments (1)
apps/desktop/src-tauri/src/upload.rs (1)
628-628: Consider moving constant to module level.
`MAX_CONCURRENT_UPLOADS` is defined inside the function. While functional, module-level constants are more discoverable and easier to tune.
Apply this diff to move it to module level:

```diff
 const MIN_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5 MB
 const MAX_CHUNK_SIZE: u64 = 20 * 1024 * 1024; // 20 MB
+const MAX_CONCURRENT_UPLOADS: usize = 3;
```

Then remove it from line 628.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
`apps/desktop/src-tauri/src/upload.rs` (7 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Format Rust code using `rustfmt` and ensure all Rust code passes workspace-level clippy lints.
Rust modules should be named with snake_case, and crate directories should be in kebab-case.
Files:
apps/desktop/src-tauri/src/upload.rs
🧬 Code graph analysis (1)
apps/desktop/src-tauri/src/upload.rs (1)
apps/desktop/src-tauri/src/api.rs (5)
`upload_multipart_presign_part` (50-95), `resp` (43-43), `resp` (91-91), `resp` (166-166), `resp` (222-222)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build Desktop (x86_64-pc-windows-msvc, windows-latest)
- GitHub Check: Build Desktop (aarch64-apple-darwin, macos-latest)
- GitHub Check: Analyze (rust)
🔇 Additional comments (8)
apps/desktop/src-tauri/src/upload.rs (8)
16-17: LGTM: Imports correctly support the new concurrent architecture.
The new imports for `join`, `Arc`, `Mutex`, and `PoisonError` are necessary for the concurrent upload pipeline and presigned URL caching introduced in this PR.
Also applies to: 28-28
58-60: LGTM: Chunk size constants properly aligned with S3 limits.
The separation into `MIN_CHUNK_SIZE` (5 MB) and `MAX_CHUNK_SIZE` (20 MB) provides better control over chunking behavior. The 20 MB increase should reduce the number of parts and API calls, improving upload efficiency.
463-463: LGTM: Buffer size correctly updated.
The buffer size properly uses `MAX_CHUNK_SIZE` for chunk reads.
503-503: LGTM: Chunking logic correctly balances S3 requirements and efficiency.
The use of `MIN_CHUNK_SIZE` for the read threshold (line 533) ensures chunks meet S3's 5 MB minimum, while `MAX_CHUNK_SIZE` (lines 503, 539) prevents oversized chunks. This properly handles both real-time and completed recording scenarios.
Also applies to: 533-533, 539-539
647-669: LGTM: Presigned URL prefetch reduces upload latency.
The concurrent prefetch using `join()` is a good optimization that reduces wait time between chunks. The conditional logic correctly handles the self-hosted case where MD5 hashes are required.
700-703: Verify PoisonError handling is appropriate.
Using `PoisonError::into_inner` ignores mutex poisoning, which occurs when a thread panics while holding the lock. For this URL caching use case, the worst-case scenario is refetching a presigned URL, so this is likely acceptable. However, confirm this aligns with your error handling strategy.
728-781: LGTM: HTTP request logic is well-structured.
The per-part upload implementation correctly:
- Constructs retryable PUT requests with proper timeouts (5 min)
- Adds Content-MD5 header when required for self-hosted deployments
- Extracts ETags for multipart completion
- Handles errors with detailed context including part numbers
634-801: Well-designed concurrent streaming architecture.
The refactored uploader uses a sophisticated streaming pipeline:
- `stream::unfold` generates upload futures with part sequencing
- `buffered(MAX_CONCURRENT_UPLOADS)` enables parallel uploads (up to 3 concurrent)
- Presigned URL prefetch reduces latency between chunks
- First chunk URL caching optimizes instant mode re-uploads

This design effectively reduces upload time from recording stop to playback availability.
Actionable comments posted: 1
♻️ Duplicate comments (1)
apps/desktop/src-tauri/src/upload.rs (1)
691-692: Fix deprecated `base64::encode` API.
This code will not compile with base64 crate v0.21+. The `base64::encode()` function was removed in favor of the engine API.
Apply this diff to fix:

```diff
-                let md5_sum =
-                    use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
+                let md5_sum = use_md5_hashes.then(|| {
+                    use base64::prelude::*;
+                    BASE64_STANDARD.encode(md5::compute(&chunk).as_ref())
+                });
```

Or alternatively, if you import `use base64::engine::general_purpose::STANDARD;` at the top:

```diff
-                let md5_sum =
-                    use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
+                let md5_sum = use_md5_hashes.then(|| {
+                    STANDARD.encode(md5::compute(&chunk).as_ref())
+                });
```

Based on past review comments.
🧹 Nitpick comments (1)
apps/desktop/src-tauri/src/upload.rs (1)
28-28: Consider using `tokio::sync::Mutex` instead of `std::sync::Mutex`.
Using `std::sync::Mutex` in async code can block the runtime when the lock is held across await points. While your critical sections appear small (just cloning/setting a cached URL), `tokio::sync::Mutex` is the recommended choice for async contexts to prevent blocking the executor.
Apply this diff:

```diff
-    sync::{Arc, Mutex, PoisonError},
+    sync::Arc,
```

And add this import:

```rust
use tokio::sync::Mutex;
```

Then update the PoisonError handling (lines 702, 724) since `tokio::sync::Mutex` doesn't poison - just remove the `.unwrap_or_else(PoisonError::into_inner)` and use `.lock().await`:

```diff
-                    && let Some((url, expiry)) = first_chunk_presigned_url
-                        .lock()
-                        .unwrap_or_else(PoisonError::into_inner)
-                        .clone()
+                    && let Some((url, expiry)) = first_chunk_presigned_url.lock().await.clone()
```
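A self-contained sketch of the `tokio::sync::Mutex` variant of that cache (illustrative names; `tokio::sync::Mutex` does not poison, so no `PoisonError` handling is needed):

```rust
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;

type UrlCache = Arc<Mutex<Option<(String, Instant)>>>;

async fn cached(cache: &UrlCache) -> Option<(String, Instant)> {
    // Locking is an await point; the executor thread is never blocked.
    cache.lock().await.clone()
}

async fn store(cache: &UrlCache, url: String) {
    *cache.lock().await = Some((url, Instant::now()));
}

#[tokio::main]
async fn main() {
    let cache: UrlCache = Arc::new(Mutex::new(None));
    store(&cache, "https://example.invalid/presigned/part/1".into()).await;
    println!("cached: {}", cached(&cache).await.is_some());
}
```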
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
`apps/desktop/src-tauri/src/upload.rs` (7 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs
📄 CodeRabbit inference engine (AGENTS.md)
**/*.rs: Format Rust code using `rustfmt` and ensure all Rust code passes workspace-level clippy lints.
Rust modules should be named with snake_case, and crate directories should be in kebab-case.
Files:
apps/desktop/src-tauri/src/upload.rs
🧬 Code graph analysis (1)
apps/desktop/src-tauri/src/upload.rs (1)
apps/desktop/src-tauri/src/api.rs (5)
`upload_multipart_presign_part` (50-95), `resp` (43-43), `resp` (91-91), `resp` (166-166), `resp` (222-222)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build Desktop (x86_64-pc-windows-msvc, windows-latest)
- GitHub Check: Build Desktop (aarch64-apple-darwin, macos-latest)
- GitHub Check: Analyze (rust)
🔇 Additional comments (11)
apps/desktop/src-tauri/src/upload.rs (11)
58-60: LGTM!
The chunk size constants are well-chosen and comply with S3 multipart upload requirements (5 MB minimum except for the last part, 5 GB maximum per part). The 20 MB maximum provides a good balance between network efficiency and memory usage.
463-463: LGTM!
The buffer allocation using `MAX_CHUNK_SIZE` (20 MB) is appropriate for the chunk reading logic and provides good memory efficiency by reusing the buffer across multiple chunk reads.
Also applies to: 503-503
533-536: LGTM!
The use of `MIN_CHUNK_SIZE` to gate chunk reads ensures compliance with S3's 5 MB minimum part size, while the fallback condition `is_done && new_data_size > 0` correctly handles the last part being smaller than 5 MB (which S3 allows).
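A simplified sketch of that gating logic (hypothetical variable and function names, not the file's actual code):

```rust
// Illustrative only: decide whether enough data is buffered to emit the next part.
const MIN_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // S3 minimum for every part except the last
const MAX_CHUNK_SIZE: u64 = 20 * 1024 * 1024;

fn should_emit_chunk(buffered_bytes: u64, is_done: bool) -> bool {
    // Emit once at least 5 MB is buffered, or flush whatever remains when input is finished.
    buffered_bytes >= MIN_CHUNK_SIZE || (is_done && buffered_bytes > 0)
}

fn next_chunk_len(buffered_bytes: u64) -> u64 {
    // Never emit more than the 20 MB cap in a single part.
    buffered_bytes.min(MAX_CHUNK_SIZE)
}

fn main() {
    assert!(!should_emit_chunk(1024, false));
    assert!(should_emit_chunk(1024, true));
    assert_eq!(next_chunk_len(64 * 1024 * 1024), MAX_CHUNK_SIZE);
}
```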
539-539: LGTM!
Capping the chunk size to `MAX_CHUNK_SIZE` prevents reading more data than the pre-allocated buffer can hold and ensures consistent upload chunk sizes.
628-636: LGTM!
The initialization is well-structured:
- `MAX_CONCURRENT_UPLOADS = 3` provides good parallelism without overwhelming the connection
- MD5 requirement check for self-hosted instances is appropriate
- First chunk URL caching enables the latency optimization mentioned in the PR description
638-669: LGTM!
The `stream::unfold` pattern with concurrent prefetching is an excellent optimization. Using `join` to overlap fetching the next chunk with presigning the URL reduces upload latency. The early termination when the stream ends (line 668) is correctly handled.
671-688: LGTM!
The chunk processing logic is clean with good error handling and descriptive logging. The defensive part number extraction for logging (using "--" for errors) is a nice touch.

719-726: LGTM!
The presigned URL caching for part 1 enables the latency optimization mentioned in the PR objectives: "Reuse the presigned URL for the last chunk to reduce upload latency from stop button to working video." The implementation correctly caches the URL with a timestamp for expiry checking.
728-753: LGTM!
The HTTP request construction is robust:
- Proper error handling with descriptive messages including `part_number` context
- Retry logic via `retryable_client()` improves reliability
- 5-minute timeout is appropriate for large chunks
- Content-MD5 header is conditionally added for self-hosted deployments
- Span instrumentation aids debugging
755-781: LGTM!
The response handling is thorough:
- ETag extraction correctly strips surrounding quotes (see the sketch after this list)
- Error responses include the body text for debugging
- The ETag is properly required (S3 multipart requires it for completion)
- All necessary fields are captured in `UploadedPart`
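A tiny illustration of the quote-stripping step (hypothetical helper, not the PR's code; S3 returns ETags wrapped in double quotes):

```rust
// Normalize an S3 ETag header value such as "\"9bb58f26192e4ba00f01e2e7b136bbd8\"".
fn normalize_etag(raw_header: &str) -> String {
    raw_header.trim().trim_matches('"').to_string()
}

fn main() {
    let from_s3 = "\"9bb58f26192e4ba00f01e2e7b136bbd8\"";
    assert_eq!(normalize_etag(from_s3), "9bb58f26192e4ba00f01e2e7b136bbd8");
}
```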
789-801: LGTM!
The stream composition is well-designed and achieves the PR's main objective of concurrent uploads:
- `buffered(MAX_CONCURRENT_UPLOADS)` enables processing up to 3 parts concurrently
- The completion logging via `chain` and `stream::once` is a clean pattern
- The overall architecture (unfold → buffered → flatten) creates an efficient concurrent upload pipeline
```rust
    stream: impl Stream<Item = io::Result<Chunk>> + Send + 'static,
) -> impl Stream<Item = Result<UploadedPart, AuthedApiError>> + 'static {
```
🧩 Analysis chain
LGTM!
The Send + 'static bounds on the input stream and 'static return type are necessary for the concurrent, buffered upload architecture. The stream must be Send to be processed across threads, and 'static to live beyond the function scope.
Verify that all call sites of multipart_uploader satisfy these new bounds:
🏁 Script executed:

```shell
#!/bin/bash
# Check all call sites of multipart_uploader
rg -n -C5 "multipart_uploader\(" --type rust
```

Length of output: 2519

---

🏁 Script executed:

```shell
#!/bin/bash
# Find the from_pending_file_to_chunks function definition
rg -n -A10 "fn from_pending_file_to_chunks" --type rust
```

Length of output: 962

🏁 Script executed:

```shell
#!/bin/bash
# Find the progress function definition
rg -n -A10 "fn progress" --type rust
```

Length of output: 836

🏁 Script executed:

```shell
#!/bin/bash
# Check if there are any imports or uses of futures or stream utilities
rg -n "use.*Stream|use.*stream" --type rust apps/desktop/src-tauri/src/upload.rs | head -20
```

Length of output: 137

🏁 Script executed:

```shell
#!/bin/bash
# Get more context around the function signatures at lines 626-627
sed -n '620,635p' apps/desktop/src-tauri/src/upload.rs
```

Length of output: 641

🏁 Script executed:

```shell
#!/bin/bash
# Check the full context of both call sites
sed -n '75,90p' apps/desktop/src-tauri/src/upload.rs
sed -n '395,415p' apps/desktop/src-tauri/src/upload.rs
```

Length of output: 1185

---

🏁 Script executed:

```shell
#!/bin/bash
# Get full from_pending_file_to_chunks implementation (at least 30 lines)
sed -n '483,530p' apps/desktop/src-tauri/src/upload.rs
```

Length of output: 2135

🏁 Script executed:

```shell
#!/bin/bash
# Get full progress function implementation to understand the wrapper
sed -n '876,920p' apps/desktop/src-tauri/src/upload.rs
```

Length of output: 1844

🏁 Script executed:

```shell
#!/bin/bash
# Check Cargo.toml for async_stream version
grep -A2 "async_stream\|async-stream" Cargo.toml 2>/dev/null || grep -r "async_stream\|async-stream" Cargo.* 2>/dev/null | head -5
```

Length of output: 131

🏁 Script executed:

```shell
#!/bin/bash
# Check if there are any recent changes or TODOs related to Send bounds
rg -n "Send|'static" apps/desktop/src-tauri/src/upload.rs | grep -E "626|627|483|486"
```

Length of output: 149
Address bounds incompatibility between multipart_uploader and its call sites
The new Send + 'static bounds on the input stream parameter are incompatible with the actual streams being passed. The function from_pending_file_to_chunks (line 483) captures a Receiver<()> which is not Send, making the returned stream non-Send. Similarly, the progress() wrapper (line 876) does not guarantee Send + 'static bounds on its output.
Both call sites (lines 80 and 404) will fail to compile because they pass non-Send streams to multipart_uploader, violating the new bounds.
Fix one of the following (a sketch of the first option follows this list):
- Update `from_pending_file_to_chunks` to return `impl Stream<Item = io::Result<Chunk>> + Send + 'static` (e.g., by removing the captured `Receiver<()>` or refactoring to avoid capturing it in the stream)
- Update `progress()` to preserve/add `Send + 'static` bounds
- Reconsider the bounds on `multipart_uploader` if the concurrent architecture doesn't actually require `Send + 'static`
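A sketch of the first option, replacing a non-Send `Receiver<()>` with `tokio::sync::watch` so the returned stream is `Send + 'static` (hypothetical names and placeholder chunk reading, assuming the `futures` and `tokio` crates; not the project's actual code):

```rust
use futures::stream::{self, Stream};
use std::io;

type Chunk = Vec<u8>;

async fn read_next_chunk(part: u64) -> Option<io::Result<Chunk>> {
    // Placeholder: the real code would pull data from the pending recording file.
    if part < 3 { Some(Ok(vec![0u8; 1024])) } else { None }
}

fn chunk_stream(
    cancel: tokio::sync::watch::Receiver<bool>,
) -> impl Stream<Item = io::Result<Chunk>> + Send + 'static {
    // `watch::Receiver` is Send + 'static, so the returned stream satisfies the
    // bounds that `multipart_uploader` now requires.
    stream::unfold(0u64, move |part| {
        let cancel = cancel.clone();
        async move {
            if *cancel.borrow() {
                return None; // stop producing chunks once cancellation is signalled
            }
            read_next_chunk(part).await.map(|chunk| (chunk, part + 1))
        }
    })
}

fn main() {
    let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
    let _chunks = chunk_stream(cancel_rx); // would be fed into the uploader in real code
}
```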
🤖 Prompt for AI Agents
In apps/desktop/src-tauri/src/upload.rs around lines 626-627, the
multipart_uploader signature requires the input stream to be Send + 'static but
two call sites pass streams that capture a non-Send Receiver
(from_pending_file_to_chunks) or return non-Send streams from progress(); fix by
making the produced streams Send + 'static or by relaxing multipart_uploader
bounds. Preferred fix: change from_pending_file_to_chunks (and/or progress) so
they do not capture a non-Send Receiver—e.g., replace the non-Send channel with
a Send-compatible primitive (tokio::sync::watch or Arc<AtomicBool>) or
restructure the stream generation so the Receiver is not captured inside the
returned async stream—thereby ensuring the returned impl Stream is Send +
'static and the current multipart_uploader bounds remain valid.
Summary by CodeRabbit
Chores
Refactor