From 761e5d3fd9f124b03f7db11a6ee60a8886461e76 Mon Sep 17 00:00:00 2001
From: Oscar Beaumont
Date: Tue, 21 Oct 2025 13:47:35 +0800
Subject: [PATCH 01/10] upload multiple chunks at a time

---
 apps/desktop/src-tauri/src/upload.rs | 147 ++++++++++++++-------
 1 file changed, 75 insertions(+), 72 deletions(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 21e0268c53..01c154fd4d 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -13,7 +13,8 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta};
 use cap_utils::spawn_actor;
 use ffmpeg::ffi::AV_TIME_BASE;
 use flume::Receiver;
-use futures::{Stream, StreamExt, TryStreamExt, future::join, stream};
+use futures::stream::FuturesUnordered;
+use futures::{Stream, StreamExt, TryStreamExt, stream};
 use image::{ImageReader, codecs::jpeg::JpegEncoder};
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
@@ -622,92 +623,94 @@ fn multipart_uploader(
     upload_id: String,
     stream: impl Stream>,
 ) -> impl Stream> {
+    const MAX_CONCURRENT_UPLOADS: usize = 3;
+
     debug!("Initializing multipart uploader for video {video_id:?}");
     let start = Instant::now();

     try_stream! {
-        let use_md5_hashes = app.is_server_url_custom().await;
-
-        let mut stream = pin!(stream);
-        let mut prev_part_number = None;
-        let mut expected_part_number = 1u32;
+        let use_md5_hashes = app.is_server_url_custom().await;
+        let mut stream = pin!(stream);
+        let mut futures = FuturesUnordered::new();

         loop {
-            let (item, mut presigned_url, md5_sum) = if use_md5_hashes {
-                let Some(item) = stream.next().await else {
-                    break;
-                };
-                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-                let md5_sum = base64::encode(md5::compute(&item.chunk).0);
-                let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, Some(
-                    &md5_sum
-                )).await?;
-
-                (item, presigned_url, Some(md5_sum))
-            } else {
-                let (Some(item), presigned_url) = join(
-                    stream.next(),
-                    // We generate the presigned URL ahead of time for the part we expect to come next.
-                    // If it's not the chunk that actually comes next we just throw it out.
-                    // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
-                    api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None)
-                ).await else {
-                    break;
-                };
-
-                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-
-                (item, presigned_url?, None)
-            };
-
-            let Chunk { total_size, part_number, chunk } = item;
-            trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
-            prev_part_number = Some(part_number);
-            let size = chunk.len();
-
-            // We prefetched for the wrong chunk. Let's try again.
-            if expected_part_number != part_number {
-                presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref())
-                    .await?
-            }
+            // Fill up to MAX_CONCURRENT_UPLOADS concurrent uploads
+            while futures.len() < MAX_CONCURRENT_UPLOADS {
+                let Some(item_result) = stream.next().await else {
+                    break;
+                };
+
+                let item = item_result.map_err(|err| format!("uploader/fs: {err:?}"))?;
+                let Chunk { total_size, part_number, chunk } = item;
+
+                trace!("Preparing chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
+
+                let app_clone = app.clone();
+                let video_id_clone = video_id.clone();
+                let upload_id_clone = upload_id.clone();
+
+                let upload_future = async move {
+                    let size = chunk.len();
+
+                    let (presigned_url, md5_sum) = if use_md5_hashes {
+                        let md5_sum = base64::encode(md5::compute(&chunk).0);
+                        let presigned_url = api::upload_multipart_presign_part(&app_clone, &video_id_clone, &upload_id_clone, part_number, Some(&md5_sum)).await?;
+                        (presigned_url, Some(md5_sum))
+                    } else {
+                        let presigned_url = api::upload_multipart_presign_part(&app_clone, &video_id_clone, &upload_id_clone, part_number, None).await?;
+                        (presigned_url, None)
+                    };

-            trace!("Uploading part {part_number}");
+                    trace!("Uploading part {part_number}");

-            let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
-            let mut req = retryable_client(url.host().unwrap_or("").to_string())
-                .build()
-                .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
-                .put(&presigned_url)
-                .header("Content-Length", chunk.len())
-                .timeout(Duration::from_secs(5 * 60)).body(chunk);
+                    let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
+                    let mut req = retryable_client(url.host().unwrap_or("").to_string())
+                        .build()
+                        .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
+                        .put(&presigned_url)
+                        .header("Content-Length", chunk.len())
+                        .timeout(Duration::from_secs(5 * 60))
+                        .body(chunk);

-            if let Some(md5_sum) = &md5_sum {
-                req = req.header("Content-MD5", md5_sum);
-            }
+                    if let Some(md5_sum) = &md5_sum {
+                        req = req.header("Content-MD5", md5_sum);
+                    }

-            let resp = req
-                .send()
-                .instrument(info_span!("send", size = size))
-                .await
-                .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;
+                    let resp = req
+                        .send()
+                        .instrument(info_span!("send", size = size))
+                        .await
+                        .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;

-            let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());
+                    let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());

-            match !resp.status().is_success() {
-                true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
-                false => Ok(()),
-            }?;
+                    match !resp.status().is_success() {
+                        true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
+                        false => Ok(()),
+                    }?;

-            trace!("Completed upload of part {part_number}");
+                    trace!("Completed upload of part {part_number}");

-            yield UploadedPart {
-                etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
-                part_number,
-                size,
-                total_size
-            };
+                    Ok(UploadedPart {
+                        etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
+                        part_number,
+                        size,
+                        total_size,
+                    }) as Result
+                };
+
+                futures.push(upload_future);
+            }

-            expected_part_number = part_number + 1;
+            // If all futures are done, the upload is complete
+            if futures.is_empty() {
+                break;
+            }
+
+            // Wait for one more future, then we will have a free future slot for the next chunk
+            if let Some(result) = futures.next().await {
+                yield result?;
+            }
         }

         debug!("Completed multipart upload for {video_id:?} in {:?}", start.elapsed());

From 942b4e44086b0076526959945e997859b2aa1597 Mon Sep 17 00:00:00 2001
From: Oscar Beaumont
Date: Tue, 21 Oct 2025 14:46:33 +0800
Subject: [PATCH 02/10] capture OS and Arch in PostHog events

---
 apps/desktop/src-tauri/src/posthog.rs | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/apps/desktop/src-tauri/src/posthog.rs b/apps/desktop/src-tauri/src/posthog.rs
index 394d5e9c90..d260e0c8b2 100644
--- a/apps/desktop/src-tauri/src/posthog.rs
+++ b/apps/desktop/src-tauri/src/posthog.rs
@@ -54,6 +54,12 @@ impl From for posthog_rs::Event {
         e.insert_prop("cap_version", env!("CARGO_PKG_VERSION"))
             .map_err(|err| error!("Error adding PostHog property: {err:?}"))
             .ok();
+        e.insert_prop("os", std::env::consts::OS)
+            .map_err(|err| error!("Error adding PostHog property: {err:?}"))
+            .ok();
+        e.insert_prop("arch", std::env::consts::ARCH)
+            .map_err(|err| error!("Error adding PostHog property: {err:?}"))
+            .ok();

         e
     }

From e0a8cf441693e9992e573223cc196621b202f11b Mon Sep 17 00:00:00 2001
From: Oscar Beaumont
Date: Tue, 21 Oct 2025 14:46:44 +0800
Subject: [PATCH 03/10] wip: upload multiple chunks at once w/ presigning

---
 apps/desktop/src-tauri/src/upload.rs | 84 ++++++++++++++++++----------
 1 file changed, 53 insertions(+), 31 deletions(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 01c154fd4d..26c2463457 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -13,8 +13,8 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta};
 use cap_utils::spawn_actor;
 use ffmpeg::ffi::AV_TIME_BASE;
 use flume::Receiver;
-use futures::stream::FuturesUnordered;
 use futures::{Stream, StreamExt, TryStreamExt, stream};
+use futures::{future::join, stream::FuturesUnordered};
 use image::{ImageReader, codecs::jpeg::JpegEncoder};
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
@@ -629,37 +629,60 @@ fn multipart_uploader(
     let start = Instant::now();

     try_stream! {
-        let use_md5_hashes = app.is_server_url_custom().await;
-        let mut stream = pin!(stream);
+        let use_md5_hashes = app.is_server_url_custom().await;
+
+        let mut stream = pin!(stream);
         let mut futures = FuturesUnordered::new();
+        let mut prev_part_number = None;

         loop {
             // Fill up to MAX_CONCURRENT_UPLOADS concurrent uploads
             while futures.len() < MAX_CONCURRENT_UPLOADS {
-                let Some(item_result) = stream.next().await else {
-                    break;
-                };
+                let expected_part_number = prev_part_number.map(|p| p + 1).unwrap_or(1);
+                let (expected_part_number, item, mut presigned_url, md5_sum) = if use_md5_hashes {
+                    let Some(item) = stream.next().await else {
+                        break;
+                    };
+                    let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
+                    let md5_sum = base64::encode(md5::compute(&item.chunk).0);
+                    let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, item.part_number, Some(
+                        &md5_sum
+                    )).await?;
+
+                    // `expected_part_number` being `item.part_number` means it will never be wrong (and hence cause a second presigning later on).
+                    (item.part_number, item, presigned_url, Some(md5_sum))
+                } else {
+                    let (Some(item), presigned_url) = join(
+                        stream.next(),
+                        // We generate the presigned URL ahead of time for the part we expect to come next.
+                        // If it's not the chunk that actually comes next we just throw it out.
+                        // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
+                        api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None)
+                    ).await else {
+                        break;
+                    };
+
+                    let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
+
+                    (expected_part_number, item, presigned_url?, None)
+                };
+
+                let app = app.clone();
+                let video_id = video_id.clone();
+                let upload_id = upload_id.clone();

-                let item = item_result.map_err(|err| format!("uploader/fs: {err:?}"))?;
                 let Chunk { total_size, part_number, chunk } = item;
+                trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
+                prev_part_number = Some(part_number);

-                trace!("Preparing chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
-
-                let app_clone = app.clone();
-                let video_id_clone = video_id.clone();
-                let upload_id_clone = upload_id.clone();
-
-                let upload_future = async move {
+                let upload_future = async move {
                     let size = chunk.len();

-                    let (presigned_url, md5_sum) = if use_md5_hashes {
-                        let md5_sum = base64::encode(md5::compute(&chunk).0);
-                        let presigned_url = api::upload_multipart_presign_part(&app_clone, &video_id_clone, &upload_id_clone, part_number, Some(&md5_sum)).await?;
-                        (presigned_url, Some(md5_sum))
-                    } else {
-                        let presigned_url = api::upload_multipart_presign_part(&app_clone, &video_id_clone, &upload_id_clone, part_number, None).await?;
-                        (presigned_url, None)
-                    };
+                    // We prefetched for the wrong chunk. Let's try again with the correct part number now that we know it.
+                    if expected_part_number != part_number {
+                        presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref())
+                            .await?
+                    }

                     trace!("Uploading part {part_number}");

                     let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
                     let mut req = retryable_client(url.host().unwrap_or("").to_string())
                         .build()
                         .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
                         .put(&presigned_url)
                         .header("Content-Length", chunk.len())
-                        .timeout(Duration::from_secs(5 * 60))
-                        .body(chunk);
+                        .timeout(Duration::from_secs(5 * 60)).body(chunk);

                     if let Some(md5_sum) = &md5_sum {
-                        req = req.header("Content-MD5", md5_sum);
+                        req = req.header("Content-MD5", md5_sum);
                     }

                     let resp = req
                         .send()
-                        .instrument(info_span!("send", size = size))
+                        .instrument(info_span!("s3_put", size = size))
                         .await
                         .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;

                     let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());

                     match !resp.status().is_success() {
                         true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
                         false => Ok(()),
                     }?;

                     trace!("Completed upload of part {part_number}");

-                    Ok(UploadedPart {
+                    Ok::<_, AuthedApiError>(UploadedPart {
                         etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
                         part_number,
                         size,
-                        total_size,
-                    }) as Result
-                };
+                        total_size
+                    })
+                }.instrument(info_span!("upload_part", part_number = part_number));

-                futures.push(upload_future);
+                futures.push(upload_future);
             }

             // If all futures are done, the upload is complete

From 683e25b12ad4de115739994288265f243ef74213 Mon Sep 17 00:00:00 2001
From: Oscar Beaumont
Date: Tue, 21 Oct 2025 14:54:09 +0800
Subject: [PATCH 04/10] wip

---
 apps/desktop/src-tauri/src/upload.rs | 178 +++++++++++++--------------
 1 file changed, 89 insertions(+), 89 deletions(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 26c2463457..8eca0c2100 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -13,7 +13,10 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta};
 use cap_utils::spawn_actor;
 use ffmpeg::ffi::AV_TIME_BASE;
 use flume::Receiver;
-use futures::{Stream, StreamExt, TryStreamExt, stream};
+use futures::{
+    Stream, StreamExt, TryStreamExt,
+    stream::{self, FuturesOrdered},
+};
 use futures::{future::join, stream::FuturesUnordered};
 use image::{ImageReader, codecs::jpeg::JpegEncoder};
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
@@ -632,107 +635,104 @@ fn multipart_uploader(
     let start = Instant::now();

     try_stream! {
         let use_md5_hashes = app.is_server_url_custom().await;

         let mut stream = pin!(stream);
-        let mut futures = FuturesUnordered::new();
+        let mut futures = FuturesOrdered::new(); // Being ordered is important for progress tracking!
         let mut prev_part_number = None;

         loop {
-            // Fill up to MAX_CONCURRENT_UPLOADS concurrent uploads
-            while futures.len() < MAX_CONCURRENT_UPLOADS {
-                let expected_part_number = prev_part_number.map(|p| p + 1).unwrap_or(1);
-                let (expected_part_number, item, mut presigned_url, md5_sum) = if use_md5_hashes {
-                    let Some(item) = stream.next().await else {
-                        break;
-                    };
-                    let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-                    let md5_sum = base64::encode(md5::compute(&item.chunk).0);
-                    let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, item.part_number, Some(
-                        &md5_sum
-                    )).await?;
-
-                    // `expected_part_number` being `item.part_number` means it will never be wrong (and hence cause a second presigning later on).
-                    (item.part_number, item, presigned_url, Some(md5_sum))
-                } else {
-                    let (Some(item), presigned_url) = join(
-                        stream.next(),
-                        // We generate the presigned URL ahead of time for the part we expect to come next.
-                        // If it's not the chunk that actually comes next we just throw it out.
-                        // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
-                        api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None)
-                    ).await else {
-                        break;
-                    };
-
-                    let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-
-                    (expected_part_number, item, presigned_url?, None)
-                };
-
-                let app = app.clone();
-                let video_id = video_id.clone();
-                let upload_id = upload_id.clone();
-
-                let Chunk { total_size, part_number, chunk } = item;
-                trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
-                prev_part_number = Some(part_number);
-
-                let upload_future = async move {
-                    let size = chunk.len();
-
-                    // We prefetched for the wrong chunk. Let's try again with the correct part number now that we know it.
-                    if expected_part_number != part_number {
-                        presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref())
-                            .await?
-                    }
+            let expected_part_number = prev_part_number.map(|p| p + 1).unwrap_or(1);
+            let (expected_part_number, item, mut presigned_url, md5_sum) = if use_md5_hashes {
+                let Some(item) = stream.next().await else {
+                    break;
+                };
+                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
+                let md5_sum = base64::encode(md5::compute(&item.chunk).0);
+                let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, item.part_number, Some(
+                    &md5_sum
+                )).await?;
+
+                // `expected_part_number` being `item.part_number` means it will never be wrong (and hence cause a second presigning later on).
+                (item.part_number, item, presigned_url, Some(md5_sum))
+            } else {
+                let (Some(item), presigned_url) = join(
+                    stream.next(),
+                    // We generate the presigned URL ahead of time for the part we expect to come next.
+                    // If it's not the chunk that actually comes next we just throw it out.
+                    // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
+                    api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None)
+                ).await else {
+                    break;
+                };
+
+                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
+
+                (expected_part_number, item, presigned_url?, None)
+            };
+
+            // Stop processing new chunks until we have space for more concurrent uploads
+            while futures.len() >= MAX_CONCURRENT_UPLOADS {
+                futures.next().await?;
+            }
+
+            let app = app.clone();
+            let video_id = video_id.clone();
+            let upload_id = upload_id.clone();
+
+            let Chunk { total_size, part_number, chunk } = item;
+            trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
+            prev_part_number = Some(part_number);
+
+            let upload_future = async move {
+                let size = chunk.len();
+
+                // We prefetched for the wrong chunk. Let's try again with the correct part number now that we know it.
+                if expected_part_number != part_number {
+                    presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref())
+                        .await?
+                }

-                    trace!("Uploading part {part_number}");
+                trace!("Uploading part {part_number}");

-                    let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
-                    let mut req = retryable_client(url.host().unwrap_or("").to_string())
-                        .build()
-                        .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
-                        .put(&presigned_url)
-                        .header("Content-Length", chunk.len())
-                        .timeout(Duration::from_secs(5 * 60)).body(chunk);
+                let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
+                let mut req = retryable_client(url.host().unwrap_or("").to_string())
+                    .build()
+                    .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
+                    .put(&presigned_url)
+                    .header("Content-Length", chunk.len())
+                    .timeout(Duration::from_secs(5 * 60)).body(chunk);

-                    if let Some(md5_sum) = &md5_sum {
-                        req = req.header("Content-MD5", md5_sum);
-                    }
+                if let Some(md5_sum) = &md5_sum {
+                    req = req.header("Content-MD5", md5_sum);
+                }

-                    let resp = req
-                        .send()
-                        .instrument(info_span!("s3_put", size = size))
-                        .await
-                        .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;
+                let resp = req
+                    .send()
+                    .instrument(info_span!("s3_put", size = size))
+                    .await
+                    .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;

-                    let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());
+                let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());

-                    match !resp.status().is_success() {
-                        true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
-                        false => Ok(()),
-                    }?;
+                match !resp.status().is_success() {
+                    true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
+                    false => Ok(()),
+                }?;

-                    trace!("Completed upload of part {part_number}");
+                trace!("Completed upload of part {part_number}");

-                    Ok::<_, AuthedApiError>(UploadedPart {
-                        etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
-                        part_number,
-                        size,
-                        total_size
-                    })
-                }.instrument(info_span!("upload_part", part_number = part_number));
+                Ok::<_, AuthedApiError>(UploadedPart {
+                    etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
+                    part_number,
+                    size,
+                    total_size
+                })
+            }.instrument(info_span!("upload_part", part_number = part_number));

-                futures.push(upload_future);
-            }
+            futures.push(upload_future);
+        }

-            // If all futures are done, the upload is complete
-            if futures.is_empty() {
-                break;
-            }
-
-            // Wait for one more future, then we will have a free future slot for the next chunk
-            if let Some(result) = futures.next().await {
-                yield result?;
-            }
+        // Wait for everything to complete
+        while !futures.is_empty(){
+            futures.next().await?;
         }

         debug!("Completed multipart upload for {video_id:?} in {:?}", start.elapsed());

From d6db418a8255d30b14caeb590bbf52a08e200f94 Mon Sep 17 00:00:00 2001
From: Oscar Beaumont
Date: Tue, 21 Oct 2025 16:28:04 +0800
Subject: [PATCH 05/10] new buffered stream uploader

---
 apps/desktop/src-tauri/src/upload.rs | 253 ++++++++++++++++-----------
 1 file changed, 148 insertions(+), 105 deletions(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 8eca0c2100..d9caf40d37 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -14,7 +14,7 @@ use cap_utils::spawn_actor;
 use ffmpeg::ffi::AV_TIME_BASE;
 use flume::Receiver;
 use futures::{
-    Stream, StreamExt, TryStreamExt,
+    FutureExt, Stream, StreamExt, TryStreamExt,
     stream::{self, FuturesOrdered},
 };
 use futures::{future::join, stream::FuturesUnordered};
@@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
 use specta::Type;
 use std::{
     collections::HashMap,
+    future::Ready,
     io,
     path::{Path, PathBuf},
     pin::pin,
@@ -624,119 +625,161 @@ fn multipart_uploader(
     app: AppHandle,
     video_id: String,
     upload_id: String,
-    stream: impl Stream>,
-) -> impl Stream> {
+    stream: impl Stream> + Send + 'static,
+) -> impl Stream> + 'static {
     const MAX_CONCURRENT_UPLOADS: usize = 3;

     debug!("Initializing multipart uploader for video {video_id:?}");
     let start = Instant::now();
+    let video_id2 = video_id.clone();
+
+    stream::once(async move {
+        let use_md5_hashes = app.is_server_url_custom().await;
+
+        stream::unfold(
+            (Box::pin(stream), 1),
+            move |(mut stream, expected_part_number)| {
+                let app = app.clone();
+                let video_id = video_id.clone();
+                let upload_id = upload_id.clone();
+
+                async move {
+                    let (Some(item), presigned_url) = join(stream.next(), async {
+                        // Self-hosted still uses the legacy web API which requires these so we can't presign the URL.
+                        if use_md5_hashes {
+                            return Ok(None);
+                        }

-    try_stream! {
-        let use_md5_hashes = app.is_server_url_custom().await;
-
-        let mut stream = pin!(stream);
-        let mut futures = FuturesOrdered::new(); // Being ordered is important for progress tracking!
-        let mut prev_part_number = None;
-
-        loop {
-            let expected_part_number = prev_part_number.map(|p| p + 1).unwrap_or(1);
-            let (expected_part_number, item, mut presigned_url, md5_sum) = if use_md5_hashes {
-                let Some(item) = stream.next().await else {
-                    break;
-                };
-                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-                let md5_sum = base64::encode(md5::compute(&item.chunk).0);
-                let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, item.part_number, Some(
-                    &md5_sum
-                )).await?;
-
-                // `expected_part_number` being `item.part_number` means it will never be wrong (and hence cause a second presigning later on).
-                (item.part_number, item, presigned_url, Some(md5_sum))
-            } else {
-                let (Some(item), presigned_url) = join(
-                    stream.next(),
-                    // We generate the presigned URL ahead of time for the part we expect to come next.
-                    // If it's not the chunk that actually comes next we just throw it out.
-                    // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
-                    api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None)
-                ).await else {
-                    break;
-                };
-
-                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-
-                (expected_part_number, item, presigned_url?, None)
-            };
-
-            // Stop processing new chunks until we have space for more concurrent uploads
-            while futures.len() >= MAX_CONCURRENT_UPLOADS {
-                futures.next().await?;
-            }
-
-            let app = app.clone();
-            let video_id = video_id.clone();
-            let upload_id = upload_id.clone();
-
-            let Chunk { total_size, part_number, chunk } = item;
-            trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
-            prev_part_number = Some(part_number);
-
-            let upload_future = async move {
-                let size = chunk.len();
-
-                // We prefetched for the wrong chunk. Let's try again with the correct part number now that we know it.
-                if expected_part_number != part_number {
-                    presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref())
-                        .await?
-                }
-
-                trace!("Uploading part {part_number}");
-
-                let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
-                let mut req = retryable_client(url.host().unwrap_or("").to_string())
-                    .build()
-                    .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
-                    .put(&presigned_url)
-                    .header("Content-Length", chunk.len())
-                    .timeout(Duration::from_secs(5 * 60)).body(chunk);
-
-                if let Some(md5_sum) = &md5_sum {
-                    req = req.header("Content-MD5", md5_sum);
-                }
-
-                let resp = req
-                    .send()
-                    .instrument(info_span!("s3_put", size = size))
-                    .await
-                    .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;
-
-                let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());
-
-                match !resp.status().is_success() {
-                    true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
-                    false => Ok(()),
-                }?;
-
-                trace!("Completed upload of part {part_number}");
-
-                Ok::<_, AuthedApiError>(UploadedPart {
-                    etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
-                    part_number,
-                    size,
-                    total_size
-                })
-            }.instrument(info_span!("upload_part", part_number = part_number));
-
-            futures.push(upload_future);
-        }
-
-        // Wait for everything to complete
-        while !futures.is_empty(){
-            futures.next().await?;
-        }
+                        // We generate the presigned URL ahead of time for the part we expect to come next.
+                        // If it's not the chunk that actually comes next we just throw it out.
+                        // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
+                        api::upload_multipart_presign_part(
+                            &app,
+                            &video_id,
+                            &upload_id,
+                            expected_part_number,
+                            None,
+                        )
+                        .await
+                        .map(Some)
+                    })
+                    .await
+                    else {
+                        return None;
+                    };
+
+                    let part_number = item
+                        .as_ref()
+                        .map(|c| c.part_number.to_string())
+                        .unwrap_or_else(|_| "--".into());
+
+                    Some((
+                        async move {
+                            let Chunk {
+                                total_size,
+                                part_number,
+                                chunk,
+                            } = item.map_err(|err| {
+                                format!("uploader/part/{:?}/fs: {err:?}", expected_part_number)
+                            })?;
+                            trace!(
+                                "Uploading chunk {part_number} ({} bytes) for video {video_id:?}",
+                                chunk.len()
+                            );
+
+                            // We prefetched for the wrong chunk. Let's try again with the correct part number now that we know it.
+                            let md5_sum =
+                                use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
+                            let presigned_url = if let Some(url) = presigned_url?
+                                && (part_number == expected_part_number || md5_sum.is_some())
+                            {
+                                url
+                            } else {
+                                api::upload_multipart_presign_part(
+                                    &app,
+                                    &video_id,
+                                    &upload_id,
+                                    part_number,
+                                    md5_sum.as_deref(),
+                                )
+                                .await?
+                            };
+                            let size = chunk.len();
+
+                            let url = Uri::from_str(&presigned_url).map_err(|err| {
+                                format!("uploader/part/{part_number}/invalid_url: {err:?}")
+                            })?;
+                            let mut req =
+                                retryable_client(url.host().unwrap_or("").to_string())
+                                    .build()
+                                    .map_err(|err| {
+                                        format!("uploader/part/{part_number}/client: {err:?}")
+                                    })?
+                                    .put(&presigned_url)
+                                    .header("Content-Length", chunk.len())
+                                    .timeout(Duration::from_secs(5 * 60))
+                                    .body(chunk);
+
+                            if let Some(md5_sum) = &md5_sum {
+                                req = req.header("Content-MD5", md5_sum);
+                            }

+                            let resp = req
+                                .send()
+                                .instrument(info_span!("s3_put", size = size))
+                                .await
+                                .map_err(|err| {
+                                    format!("uploader/part/{part_number}/error: {err:?}")
+                                })?;
+
+                            let etag = resp
+                                .headers()
+                                .get("ETag")
+                                .as_ref()
+                                .and_then(|etag| etag.to_str().ok())
+                                .map(|v| v.trim_matches('"').to_string());
+
+                            match !resp.status().is_success() {
+                                true => Err(format!(
+                                    "uploader/part/{part_number}/error: {}",
+                                    resp.text().await.unwrap_or_default()
+                                )),
+                                false => Ok(()),
+                            }?;
+
+                            trace!("Completed upload of part {part_number}");
+
+                            Ok::<_, AuthedApiError>(UploadedPart {
+                                etag: etag.ok_or_else(|| {
+                                    format!(
+                                        "uploader/part/{part_number}/error: ETag header not found"
+                                    )
+                                })?,
+                                part_number,
+                                size,
+                                total_size,
+                            })
+                        }
+                        .instrument(info_span!("upload_part", part_number = part_number)),
+                        (stream, expected_part_number + 1),
+                    ))
+                }
+                // .instrument(Span::current())
+            },
+        )
+        .buffered(MAX_CONCURRENT_UPLOADS)
+        .boxed()
+        // .instrument(Span::current())
+    })
+    .chain(stream::once(async move {
+        debug!(
+            "Completed multipart upload for {video_id2:?} in {:?}",
+            start.elapsed()
+        );

-        debug!("Completed multipart upload for {video_id:?} in {:?}", start.elapsed());
-    }
+        stream::empty().boxed()
+    }))
+    .flatten()
     .instrument(Span::current())
 }

From 2653376297114ffb4793f64ab5f8884520528a75 Mon Sep 17 00:00:00 2001
From: Oscar Beaumont
Date: Tue, 21 Oct 2025 16:33:02 +0800
Subject: [PATCH 06/10] bump chunk size

---
 apps/desktop/src-tauri/src/upload.rs | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index d9caf40d37..7051ecfb56 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -13,18 +13,14 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta};
 use cap_utils::spawn_actor;
 use ffmpeg::ffi::AV_TIME_BASE;
 use flume::Receiver;
-use futures::{
-    FutureExt, Stream, StreamExt, TryStreamExt,
-    stream::{self, FuturesOrdered},
-};
-use futures::{future::join, stream::FuturesUnordered};
+use futures::future::join;
+use futures::{Stream, StreamExt, TryStreamExt, stream};
 use image::{ImageReader, codecs::jpeg::JpegEncoder};
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
 use specta::Type;
 use std::{
     collections::HashMap,
-    future::Ready,
     io,
     path::{Path, PathBuf},
     pin::pin,
@@ -58,8 +54,8 @@ pub struct UploadProgressEvent {
     total: String,
 }

-// a typical recommended chunk size is 5MB (AWS min part size).
-const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB
+// The size of each S3 multipart upload chunk
+const CHUNK_SIZE: u64 = 20 * 1024 * 1024; // 20 MB

 #[instrument(skip(app, channel, file_path, screenshot_path))]
 pub async fn upload_video(
@@ -764,12 +760,10 @@ fn multipart_uploader(
                         (stream, expected_part_number + 1),
                     ))
                 }
-                // .instrument(Span::current())
             },
         )
         .buffered(MAX_CONCURRENT_UPLOADS)
         .boxed()
-        // .instrument(Span::current())
     })
     .chain(stream::once(async move {
         debug!(

From 659e253b7e0d59d6252b9c893a9c00b51bfec27d Mon Sep 17 00:00:00 2001
From: Oscar Beaumont
Date: Tue, 21 Oct 2025 16:56:28 +0800
Subject: [PATCH 07/10] reuse part 1 presigned URL

---
 apps/desktop/src-tauri/src/upload.rs | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 7051ecfb56..0ec0aab07d 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -25,6 +25,7 @@ use std::{
     io,
     path::{Path, PathBuf},
     pin::pin,
     str::FromStr,
+    sync::{Arc, Mutex, PoisonError},
     time::Duration,
 };
 use tauri::{AppHandle, ipc::Channel};
@@ -631,6 +632,7 @@ fn multipart_uploader(
     stream::once(async move {
         let use_md5_hashes = app.is_server_url_custom().await;
+        let first_chunk_presigned_url = Arc::new(Mutex::new(None::<(String, Instant)>));

         stream::unfold(
             (Box::pin(stream), 1),
@@ -638,6 +640,7 @@ fn multipart_uploader(
                 let app = app.clone();
                 let video_id = video_id.clone();
                 let upload_id = upload_id.clone();
+                let first_chunk_presigned_url = first_chunk_presigned_url.clone();

                 async move {
                     let (Some(item), presigned_url) = join(stream.next(), async {
@@ -690,6 +693,16 @@ fn multipart_uploader(
                                 && (part_number == expected_part_number || md5_sum.is_some())
                             {
                                 url
+                            } else if part_number == 1
+                                // We have a presigned URL left around from the first chunk
+                                && let Some((url, expiry)) = first_chunk_presigned_url
+                                    .lock()
+                                    .unwrap_or_else(PoisonError::into_inner)
+                                    .clone()
+                                // The URL hasn't expired
+                                && expiry.elapsed() < Duration::from_secs(60 * 50)
+                            {
+                                url
                             } else {
                                 api::upload_multipart_presign_part(
                                     &app,
@@ -700,8 +713,17 @@ fn multipart_uploader(
                                 )
                                 .await?
                             };
-                            let size = chunk.len();

+                            // We cache the presigned URL for the first chunk,
+                            // as for instant mode we upload the first chunk at the end again to include the updated video metadata.
+                            if part_number == 1 {
+                                *first_chunk_presigned_url
+                                    .lock()
+                                    .unwrap_or_else(PoisonError::into_inner) =
+                                    Some((presigned_url.clone(), Instant::now()));
+                            }
+
+                            let size = chunk.len();
                             let url = Uri::from_str(&presigned_url).map_err(|err| {
                                 format!("uploader/part/{part_number}/invalid_url: {err:?}")
                             })?;

From 451545a2efe28fc1125741e52eed2dc9fb1de675 Mon Sep 17 00:00:00 2001
From: Brendan Allan
Date: Tue, 21 Oct 2025 19:20:36 +0800
Subject: [PATCH 08/10] don't reuse first presigned url if using md5 hashes

---
 apps/desktop/src-tauri/src/upload.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 0ec0aab07d..fbf1ca74cc 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -694,6 +694,7 @@ fn multipart_uploader(
                             {
                                 url
                             } else if part_number == 1
+                                && !use_md5_hashes
                                 // We have a presigned URL left around from the first chunk
                                 && let Some((url, expiry)) = first_chunk_presigned_url
                                     .lock()
                                     .unwrap_or_else(PoisonError::into_inner)

From 9c900bbcddcb15ab7bd3481f3aa8f59eeaa9606d Mon Sep 17 00:00:00 2001
From: Brendan Allan
Date: Tue, 21 Oct 2025 20:08:49 +0800
Subject: [PATCH 09/10] use min and max chunk size

---
 apps/desktop/src-tauri/src/upload.rs | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index fbf1ca74cc..77ef2407cc 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -56,7 +56,8 @@ pub struct UploadProgressEvent {
 }

 // The size of each S3 multipart upload chunk
-const CHUNK_SIZE: u64 = 20 * 1024 * 1024; // 20 MB
+const MIN_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5 MB
+const MAX_CHUNK_SIZE: u64 = 20 * 1024 * 1024; // 20 MB

 #[instrument(skip(app, channel, file_path, screenshot_path))]
 pub async fn upload_video(
@@ -459,7 +460,7 @@ pub fn from_file_to_chunks(path: PathBuf) -> impl Stream = None;
-    let mut chunk_buffer = vec![0u8; CHUNK_SIZE as usize];
+    let mut chunk_buffer = vec![0u8; MAX_CHUNK_SIZE as usize];

     loop {
         // Check if realtime recording is done
@@ -529,13 +530,13 @@ pub fn from_pending_file_to_chunks(
         // Determine if we should read a chunk
         let should_read_chunk = if let Some(is_done) = realtime_is_done {
-            (new_data_size >= CHUNK_SIZE) || (is_done && new_data_size > 0)
+            (new_data_size >= MIN_CHUNK_SIZE) || (is_done && new_data_size > 0)
         } else {
             new_data_size > 0
         };

         if should_read_chunk {
-            let chunk_size = std::cmp::min(new_data_size, CHUNK_SIZE) as usize;
+            let chunk_size = std::cmp::min(new_data_size, MAX_CHUNK_SIZE) as usize;

             file.seek(std::io::SeekFrom::Start(last_read_position)).await?;

From 83f31951e2383d7a9f521787e69cb4563af2ea1f Mon Sep 17 00:00:00 2001
From: Brendan Allan
Date: Tue, 21 Oct 2025 20:20:10 +0800
Subject: [PATCH 10/10] remove unnecessary check

---
 apps/desktop/src-tauri/src/upload.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 77ef2407cc..d27d1246bc 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -691,7 +691,7 @@ fn multipart_uploader(
                             let md5_sum =
                                 use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
                             let presigned_url = if let Some(url) = presigned_url?
-                                && (part_number == expected_part_number || md5_sum.is_some())
+                                && part_number == expected_part_number
                             {
                                 url
                             } else if part_number == 1