6 changes: 6 additions & 0 deletions apps/desktop/src-tauri/src/posthog.rs
@@ -54,6 +54,12 @@ impl From<PostHogEvent> for posthog_rs::Event {
e.insert_prop("cap_version", env!("CARGO_PKG_VERSION"))
.map_err(|err| error!("Error adding PostHog property: {err:?}"))
.ok();
e.insert_prop("os", std::env::consts::OS)
.map_err(|err| error!("Error adding PostHog property: {err:?}"))
.ok();
e.insert_prop("arch", std::env::consts::ARCH)
.map_err(|err| error!("Error adding PostHog property: {err:?}"))
.ok();

e
}
270 changes: 178 additions & 92 deletions apps/desktop/src-tauri/src/upload.rs
@@ -13,7 +13,8 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta};
use cap_utils::spawn_actor;
use ffmpeg::ffi::AV_TIME_BASE;
use flume::Receiver;
use futures::{Stream, StreamExt, TryStreamExt, future::join, stream};
use futures::future::join;
use futures::{Stream, StreamExt, TryStreamExt, stream};
use image::{ImageReader, codecs::jpeg::JpegEncoder};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
@@ -24,6 +25,7 @@ use std::{
path::{Path, PathBuf},
pin::pin,
str::FromStr,
sync::{Arc, Mutex, PoisonError},
time::Duration,
};
use tauri::{AppHandle, ipc::Channel};
@@ -53,8 +55,9 @@ pub struct UploadProgressEvent {
total: String,
}

// a typical recommended chunk size is 5MB (AWS min part size).
const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB
// The size of each S3 multipart upload chunk
const MIN_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5 MB
const MAX_CHUNK_SIZE: u64 = 20 * 1024 * 1024; // 20 MB

#[instrument(skip(app, channel, file_path, screenshot_path))]
pub async fn upload_video(
@@ -457,7 +460,7 @@ pub fn from_file_to_chunks(path: PathBuf) -> impl Stream<Item = io::Result<Chunk
let total_size = file.metadata().await?.len();
let mut file = BufReader::new(file);

let mut buf = vec![0u8; CHUNK_SIZE as usize];
let mut buf = vec![0u8; MAX_CHUNK_SIZE as usize];
let mut part_number = 0;
loop {
part_number += 1;
@@ -497,7 +500,7 @@ pub fn from_pending_file_to_chunks(
let mut last_read_position: u64 = 0;
let mut realtime_is_done = realtime_upload_done.as_ref().map(|_| false);
let mut first_chunk_size: Option<u64> = None;
let mut chunk_buffer = vec![0u8; CHUNK_SIZE as usize];
let mut chunk_buffer = vec![0u8; MAX_CHUNK_SIZE as usize];

loop {
// Check if realtime recording is done
@@ -527,13 +530,13 @@

// Determine if we should read a chunk
let should_read_chunk = if let Some(is_done) = realtime_is_done {
(new_data_size >= CHUNK_SIZE) || (is_done && new_data_size > 0)
(new_data_size >= MIN_CHUNK_SIZE) || (is_done && new_data_size > 0)
} else {
new_data_size > 0
};

if should_read_chunk {
let chunk_size = std::cmp::min(new_data_size, CHUNK_SIZE) as usize;
let chunk_size = std::cmp::min(new_data_size, MAX_CHUNK_SIZE) as usize;

file.seek(std::io::SeekFrom::Start(last_read_position)).await?;

@@ -620,98 +623,181 @@ fn multipart_uploader(
app: AppHandle,
video_id: String,
upload_id: String,
stream: impl Stream<Item = io::Result<Chunk>>,
) -> impl Stream<Item = Result<UploadedPart, AuthedApiError>> {
stream: impl Stream<Item = io::Result<Chunk>> + Send + 'static,
) -> impl Stream<Item = Result<UploadedPart, AuthedApiError>> + 'static {
Comment on lines +626 to +627
⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

LGTM!

The Send + 'static bounds on the input stream and 'static return type are necessary for the concurrent, buffered upload architecture. The stream must be Send to be processed across threads, and 'static to live beyond the function scope.

Verify that all call sites of multipart_uploader satisfy these new bounds:


🏁 Script executed:

#!/bin/bash
# Check all call sites of multipart_uploader
rg -n -C5 "multipart_uploader\(" --type rust

Length of output: 2519



---

🏁 Script executed:

#!/bin/bash
# Find the from_pending_file_to_chunks function definition
rg -n -A10 "fn from_pending_file_to_chunks" --type rust

Length of output: 962


🏁 Script executed:

#!/bin/bash
# Find the progress function definition
rg -n -A10 "fn progress" --type rust

Length of output: 836


🏁 Script executed:

#!/bin/bash
# Check if there are any imports or uses of futures or stream utilities
rg -n "use.*Stream|use.*stream" --type rust apps/desktop/src-tauri/src/upload.rs | head -20

Length of output: 137


🏁 Script executed:

#!/bin/bash
# Get more context around the function signatures at lines 626-627
sed -n '620,635p' apps/desktop/src-tauri/src/upload.rs

Length of output: 641


🏁 Script executed:

#!/bin/bash
# Check the full context of both call sites
sed -n '75,90p' apps/desktop/src-tauri/src/upload.rs
sed -n '395,415p' apps/desktop/src-tauri/src/upload.rs

Length of output: 1185



---

🏁 Script executed:

#!/bin/bash
# Get full from_pending_file_to_chunks implementation (at least 30 lines)
sed -n '483,530p' apps/desktop/src-tauri/src/upload.rs

Length of output: 2135


🏁 Script executed:

#!/bin/bash
# Get full progress function implementation to understand the wrapper
sed -n '876,920p' apps/desktop/src-tauri/src/upload.rs

Length of output: 1844


🏁 Script executed:

#!/bin/bash
# Check Cargo.toml for async_stream version
grep -A2 "async_stream\|async-stream" Cargo.toml 2>/dev/null || grep -r "async_stream\|async-stream" Cargo.* 2>/dev/null | head -5

Length of output: 131


🏁 Script executed:

#!/bin/bash
# Check if there are any recent changes or TODOs related to Send bounds
rg -n "Send|'static" apps/desktop/src-tauri/src/upload.rs | grep -E "626|627|483|486" 

Length of output: 149


Address bounds incompatibility between multipart_uploader and its call sites

The new Send + 'static bounds on the input stream parameter are incompatible with the actual streams being passed. The function from_pending_file_to_chunks (line 483) captures a Receiver<()> which is not Send, making the returned stream non-Send. Similarly, the progress() wrapper (line 876) does not guarantee Send + 'static bounds on its output.

Both call sites (lines 80 and 404) will fail to compile because they pass non-Send streams to multipart_uploader, violating the new bounds.

Fix one of the following:

  • Update from_pending_file_to_chunks to return impl Stream<Item = io::Result<Chunk>> + Send + 'static (e.g., by removing the captured Receiver<()> or refactoring to avoid capturing it in the stream)
  • Update progress() to preserve/add Send + 'static bounds
  • Reconsider the bounds on multipart_uploader if the concurrent architecture doesn't actually require Send + 'static
🤖 Prompt for AI Agents
In apps/desktop/src-tauri/src/upload.rs around lines 626-627, the
multipart_uploader signature requires the input stream to be Send + 'static but
two call sites pass streams that capture a non-Send Receiver
(from_pending_file_to_chunks) or return non-Send streams from progress(); fix by
making the produced streams Send + 'static or by relaxing multipart_uploader
bounds. Preferred fix: change from_pending_file_to_chunks (and/or progress) so
they do not capture a non-Send Receiver—e.g., replace the non-Send channel with
a Send-compatible primitive (tokio::sync::watch or Arc<AtomicBool>) or
restructure the stream generation so the Receiver is not captured inside the
returned async stream—thereby ensuring the returned impl Stream is Send +
'static and the current multipart_uploader bounds remain valid.
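
For reference, here is a minimal sketch of the Send-friendly signal described in the suggestion above, swapping the non-Send receiver for tokio::sync::watch. The function and parameter names mirror the diff, but the body is illustrative only: the real from_pending_file_to_chunks tails the recording file and tracks chunk sizes, none of which is reproduced here. It assumes the futures, async-stream, and tokio (sync + time features) crates that upload.rs already pulls in.

```rust
// Hypothetical sketch, not the code from this PR: shows how owning a
// `watch::Receiver<bool>` (which is Send + Sync + 'static) lets the returned
// stream satisfy the `Send + 'static` bounds required by `multipart_uploader`.
use std::{io, time::Duration};

use futures::Stream;
use tokio::sync::watch;

struct Chunk {
    part_number: u32,
    chunk: Vec<u8>,
}

fn from_pending_file_to_chunks(
    realtime_upload_done: watch::Receiver<bool>,
) -> impl Stream<Item = io::Result<Chunk>> + Send + 'static {
    async_stream::stream! {
        let mut part_number = 0u32;
        loop {
            // `borrow` never blocks, so the "recording finished" flag can be
            // polled between filesystem reads without holding a guard across awaits.
            let is_done = *realtime_upload_done.borrow();

            // ... read newly appended bytes from the file and build the chunk here ...
            part_number += 1;
            yield Ok::<_, io::Error>(Chunk { part_number, chunk: Vec::new() });

            if is_done {
                break;
            }
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
    }
}
```

Because every value captured by the stream is owned and Send, the generated stream is Send + 'static without boxing, so the existing bounds on multipart_uploader could stay as written.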

const MAX_CONCURRENT_UPLOADS: usize = 3;

debug!("Initializing multipart uploader for video {video_id:?}");
let start = Instant::now();
let video_id2 = video_id.clone();

stream::once(async move {
let use_md5_hashes = app.is_server_url_custom().await;
let first_chunk_presigned_url = Arc::new(Mutex::new(None::<(String, Instant)>));

stream::unfold(
(Box::pin(stream), 1),
move |(mut stream, expected_part_number)| {
let app = app.clone();
let video_id = video_id.clone();
let upload_id = upload_id.clone();
let first_chunk_presigned_url = first_chunk_presigned_url.clone();

async move {
let (Some(item), presigned_url) = join(stream.next(), async {
// Self-hosted still uses the legacy web API which requires these so we can't presign the URL.
if use_md5_hashes {
return Ok(None);
}

try_stream! {
let use_md5_hashes = app.is_server_url_custom().await;

let mut stream = pin!(stream);
let mut prev_part_number = None;
let mut expected_part_number = 1u32;

loop {
let (item, mut presigned_url, md5_sum) = if use_md5_hashes {
let Some(item) = stream.next().await else {
break;
};
let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
let md5_sum = base64::encode(md5::compute(&item.chunk).0);
let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, Some(
&md5_sum
)).await?;

(item, presigned_url, Some(md5_sum))
} else {
let (Some(item), presigned_url) = join(
stream.next(),
// We generate the presigned URL ahead of time for the part we expect to come next.
// If it's not the chunk that actually comes next we just throw it out.
// This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None)
).await else {
break;
};

let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;

(item, presigned_url?, None)
};

let Chunk { total_size, part_number, chunk } = item;
trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
prev_part_number = Some(part_number);
let size = chunk.len();

// We prefetched for the wrong chunk. Let's try again.
if expected_part_number != part_number {
presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref())
.await?
}

trace!("Uploading part {part_number}");

let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
let mut req = retryable_client(url.host().unwrap_or("<unknown>").to_string())
.build()
.map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
.put(&presigned_url)
.header("Content-Length", chunk.len())
.timeout(Duration::from_secs(5 * 60)).body(chunk);

if let Some(md5_sum) = &md5_sum {
req = req.header("Content-MD5", md5_sum);
}

let resp = req
.send()
.instrument(info_span!("send", size = size))
.await
.map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;

let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());

match !resp.status().is_success() {
true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
false => Ok(()),
}?;
// We generate the presigned URL ahead of time for the part we expect to come next.
// If it's not the chunk that actually comes next we just throw it out.
// This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
api::upload_multipart_presign_part(
&app,
&video_id,
&upload_id,
expected_part_number,
None,
)
.await
.map(Some)
})
.await
else {
return None;
};

trace!("Completed upload of part {part_number}");
let part_number = item
.as_ref()
.map(|c| c.part_number.to_string())
.unwrap_or_else(|_| "--".into());

Some((
async move {
let Chunk {
total_size,
part_number,
chunk,
} = item.map_err(|err| {
format!("uploader/part/{:?}/fs: {err:?}", expected_part_number)
})?;
trace!(
"Uploading chunk {part_number} ({} bytes) for video {video_id:?}",
chunk.len()
);

// We prefetched for the wrong chunk. Let's try again with the correct part number now that we know it.
let md5_sum =
use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
let presigned_url = if let Some(url) = presigned_url?
&& part_number == expected_part_number
{
url
} else if part_number == 1
&& !use_md5_hashes
// We have a presigned URL left around from the first chunk
&& let Some((url, expiry)) = first_chunk_presigned_url
.lock()
.unwrap_or_else(PoisonError::into_inner)
.clone()
// The URL hasn't expired
&& expiry.elapsed() < Duration::from_secs(60 * 50)
{
url
} else {
api::upload_multipart_presign_part(
&app,
&video_id,
&upload_id,
part_number,
md5_sum.as_deref(),
)
.await?
};

// We cache the presigned URL for the first chunk,
// as for instant mode we upload the first chunk at the end again to include the updated video metadata.
if part_number == 1 {
*first_chunk_presigned_url
.lock()
.unwrap_or_else(PoisonError::into_inner) =
Some((presigned_url.clone(), Instant::now()));
}

yield UploadedPart {
etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
part_number,
size,
total_size
};
let size = chunk.len();
let url = Uri::from_str(&presigned_url).map_err(|err| {
format!("uploader/part/{part_number}/invalid_url: {err:?}")
})?;
let mut req =
retryable_client(url.host().unwrap_or("<unknown>").to_string())
.build()
.map_err(|err| {
format!("uploader/part/{part_number}/client: {err:?}")
})?
.put(&presigned_url)
.header("Content-Length", chunk.len())
.timeout(Duration::from_secs(5 * 60))
.body(chunk);

if let Some(md5_sum) = &md5_sum {
req = req.header("Content-MD5", md5_sum);
}

expected_part_number = part_number + 1;
}
let resp = req
.send()
.instrument(info_span!("s3_put", size = size))
.await
.map_err(|err| {
format!("uploader/part/{part_number}/error: {err:?}")
})?;

let etag = resp
.headers()
.get("ETag")
.as_ref()
.and_then(|etag| etag.to_str().ok())
.map(|v| v.trim_matches('"').to_string());

match !resp.status().is_success() {
true => Err(format!(
"uploader/part/{part_number}/error: {}",
resp.text().await.unwrap_or_default()
)),
false => Ok(()),
}?;

trace!("Completed upload of part {part_number}");

Ok::<_, AuthedApiError>(UploadedPart {
etag: etag.ok_or_else(|| {
format!(
"uploader/part/{part_number}/error: ETag header not found"
)
})?,
part_number,
size,
total_size,
})
}
.instrument(info_span!("upload_part", part_number = part_number)),
(stream, expected_part_number + 1),
))
}
},
)
.buffered(MAX_CONCURRENT_UPLOADS)
.boxed()
})
.chain(stream::once(async move {
debug!(
"Completed multipart upload for {video_id2:?} in {:?}",
start.elapsed()
);

debug!("Completed multipart upload for {video_id:?} in {:?}", start.elapsed());
}
stream::empty().boxed()
}))
.flatten()
.instrument(Span::current())
}
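
As an aside, a stripped-down sketch of the unfold-plus-buffered pattern the rewritten multipart_uploader is built around: each unfold step pulls the next chunk and hands back an upload future, and .buffered(MAX_CONCURRENT_UPLOADS) drives up to three of those futures at once while yielding results in part order. Presigning, retries, MD5 handling, and the first-chunk URL cache are all omitted, and every name other than MAX_CONCURRENT_UPLOADS is made up for illustration; it assumes the futures crate and a tokio runtime.

```rust
// Illustrative sketch of bounded-concurrency part uploads via unfold + buffered.
use futures::{Stream, StreamExt, stream};

#[derive(Debug)]
struct Chunk {
    part_number: u32,
    bytes: Vec<u8>,
}

fn upload_parts(
    chunks: impl Stream<Item = Chunk> + Send + 'static,
) -> impl Stream<Item = Result<u32, String>> {
    const MAX_CONCURRENT_UPLOADS: usize = 3;

    stream::unfold(Box::pin(chunks), |mut chunks| async move {
        // Pull the next chunk; the unfold ends when the input stream ends.
        let chunk = chunks.next().await?;
        // Hand back a future that performs the upload; `buffered` polls several
        // of these concurrently but yields their results in order.
        Some((
            async move {
                // ... PUT `chunk.bytes` to a presigned URL here ...
                Ok::<_, String>(chunk.part_number)
            },
            chunks,
        ))
    })
    .buffered(MAX_CONCURRENT_UPLOADS)
}

#[tokio::main]
async fn main() {
    // Example: three in-memory chunks uploaded with bounded concurrency.
    let chunks = stream::iter((1..=3u32).map(|part_number| Chunk {
        part_number,
        bytes: vec![0u8; 1024],
    }));

    let mut parts = Box::pin(upload_parts(chunks));
    while let Some(result) = parts.next().await {
        println!("finished part: {result:?}");
    }
}
```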
