diff --git a/apps/desktop/src-tauri/src/posthog.rs b/apps/desktop/src-tauri/src/posthog.rs
index 394d5e9c90..d260e0c8b2 100644
--- a/apps/desktop/src-tauri/src/posthog.rs
+++ b/apps/desktop/src-tauri/src/posthog.rs
@@ -54,6 +54,12 @@ impl From for posthog_rs::Event {
         e.insert_prop("cap_version", env!("CARGO_PKG_VERSION"))
             .map_err(|err| error!("Error adding PostHog property: {err:?}"))
             .ok();
+        e.insert_prop("os", std::env::consts::OS)
+            .map_err(|err| error!("Error adding PostHog property: {err:?}"))
+            .ok();
+        e.insert_prop("arch", std::env::consts::ARCH)
+            .map_err(|err| error!("Error adding PostHog property: {err:?}"))
+            .ok();
 
         e
     }
diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs
index 21e0268c53..d27d1246bc 100644
--- a/apps/desktop/src-tauri/src/upload.rs
+++ b/apps/desktop/src-tauri/src/upload.rs
@@ -13,7 +13,8 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta};
 use cap_utils::spawn_actor;
 use ffmpeg::ffi::AV_TIME_BASE;
 use flume::Receiver;
-use futures::{Stream, StreamExt, TryStreamExt, future::join, stream};
+use futures::future::join;
+use futures::{Stream, StreamExt, TryStreamExt, stream};
 use image::{ImageReader, codecs::jpeg::JpegEncoder};
 use reqwest::StatusCode;
 use serde::{Deserialize, Serialize};
@@ -24,6 +25,7 @@ use std::{
     path::{Path, PathBuf},
     pin::pin,
     str::FromStr,
+    sync::{Arc, Mutex, PoisonError},
     time::Duration,
 };
 use tauri::{AppHandle, ipc::Channel};
@@ -53,8 +55,9 @@ pub struct UploadProgressEvent {
     total: String,
 }
 
-// a typical recommended chunk size is 5MB (AWS min part size).
-const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB
+// Bounds on the size of each S3 multipart upload chunk (AWS requires parts of at least 5 MB)
+const MIN_CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5 MB
+const MAX_CHUNK_SIZE: u64 = 20 * 1024 * 1024; // 20 MB
 
 #[instrument(skip(app, channel, file_path, screenshot_path))]
 pub async fn upload_video(
@@ -457,7 +460,7 @@ pub fn from_file_to_chunks(path: PathBuf) -> impl Stream
     = None;
-    let mut chunk_buffer = vec![0u8; CHUNK_SIZE as usize];
+    let mut chunk_buffer = vec![0u8; MAX_CHUNK_SIZE as usize];
 
     loop {
         // Check if realtime recording is done
@@ -527,13 +530,13 @@ pub fn from_pending_file_to_chunks(
 
             // Determine if we should read a chunk
            let should_read_chunk = if let Some(is_done) = realtime_is_done {
-                (new_data_size >= CHUNK_SIZE) || (is_done && new_data_size > 0)
+                (new_data_size >= MIN_CHUNK_SIZE) || (is_done && new_data_size > 0)
             } else {
                 new_data_size > 0
             };
 
             if should_read_chunk {
-                let chunk_size = std::cmp::min(new_data_size, CHUNK_SIZE) as usize;
+                let chunk_size = std::cmp::min(new_data_size, MAX_CHUNK_SIZE) as usize;
 
                 file.seek(std::io::SeekFrom::Start(last_read_position)).await?;
 
@@ -620,98 +623,181 @@ fn multipart_uploader(
     app: AppHandle,
     video_id: String,
     upload_id: String,
-    stream: impl Stream<Item = Result<Chunk, std::io::Error>>,
-) -> impl Stream<Item = Result<UploadedPart, AuthedApiError>> {
+    stream: impl Stream<Item = Result<Chunk, std::io::Error>> + Send + 'static,
+) -> impl Stream<Item = Result<UploadedPart, AuthedApiError>> + 'static {
+    const MAX_CONCURRENT_UPLOADS: usize = 3;
+
     debug!("Initializing multipart uploader for video {video_id:?}");
     let start = Instant::now();
+    let video_id2 = video_id.clone();
+
+    stream::once(async move {
+        let use_md5_hashes = app.is_server_url_custom().await;
+        let first_chunk_presigned_url = Arc::new(Mutex::new(None::<(String, Instant)>));
+
+        stream::unfold(
+            (Box::pin(stream), 1),
+            move |(mut stream, expected_part_number)| {
+                let app = app.clone();
+                let video_id = video_id.clone();
+                let upload_id = upload_id.clone();
+                let first_chunk_presigned_url = first_chunk_presigned_url.clone();
+
+                async move {
+                    let (Some(item), presigned_url) = join(stream.next(), async {
+                        // Self-hosted still uses the legacy web API, which requires MD5 hashes, so we can't presign the URL ahead of time.
+                        if use_md5_hashes {
+                            return Ok(None);
+                        }
-    try_stream! {
-        let use_md5_hashes = app.is_server_url_custom().await;
-
-        let mut stream = pin!(stream);
-        let mut prev_part_number = None;
-        let mut expected_part_number = 1u32;
-
-        loop {
-            let (item, mut presigned_url, md5_sum) = if use_md5_hashes {
-                let Some(item) = stream.next().await else {
-                    break;
-                };
-                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-                let md5_sum = base64::encode(md5::compute(&item.chunk).0);
-                let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, Some(
-                    &md5_sum
-                )).await?;
-
-                (item, presigned_url, Some(md5_sum))
-            } else {
-                let (Some(item), presigned_url) = join(
-                    stream.next(),
-                    // We generate the presigned URL ahead of time for the part we expect to come next.
-                    // If it's not the chunk that actually comes next we just throw it out.
-                    // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing.
-                    api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None)
-                ).await else {
-                    break;
-                };
-
-                let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
-
-                (item, presigned_url?, None)
-            };
-
-            let Chunk { total_size, part_number, chunk } = item;
-            trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len());
-            prev_part_number = Some(part_number);
-            let size = chunk.len();
-
-            // We prefetched for the wrong chunk. Let's try again.
-            if expected_part_number != part_number {
-                presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref())
-                    .await?
-            }
-
-            trace!("Uploading part {part_number}");
-
-            let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
-            let mut req = retryable_client(url.host().unwrap_or("").to_string())
-                .build()
-                .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))?
-                .put(&presigned_url)
-                .header("Content-Length", chunk.len())
-                .timeout(Duration::from_secs(5 * 60)).body(chunk);
-
-            if let Some(md5_sum) = &md5_sum {
-                req = req.header("Content-MD5", md5_sum);
-            }
-
-            let resp = req
-                .send()
-                .instrument(info_span!("send", size = size))
-                .await
-                .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?;
-
-            let etag = resp.headers().get("ETag").as_ref().and_then(|etag| etag.to_str().ok()).map(|v| v.trim_matches('"').to_string());
-
-            match !resp.status().is_success() {
-                true => Err(format!("uploader/part/{part_number}/error: {}", resp.text().await.unwrap_or_default())),
-                false => Ok(()),
-            }?;
+                        // We generate the presigned URL ahead of time for the part we expect to come next.
+                        // If it's not the chunk that actually comes next we just throw it out.
+                        // This means if the filesystem takes a while for the recording to reach the previous total + MIN_CHUNK_SIZE, which is the common case, we aren't just doing nothing.
+                        api::upload_multipart_presign_part(
+                            &app,
+                            &video_id,
+                            &upload_id,
+                            expected_part_number,
+                            None,
+                        )
+                        .await
+                        .map(Some)
+                    })
+                    .await
+                    else {
+                        return None;
+                    };
-            trace!("Completed upload of part {part_number}");
+                    let part_number = item
+                        .as_ref()
+                        .map(|c| c.part_number.to_string())
+                        .unwrap_or_else(|_| "--".into());
+
+                    Some((
+                        async move {
+                            let Chunk {
+                                total_size,
+                                part_number,
+                                chunk,
+                            } = item.map_err(|err| {
+                                format!("uploader/part/{:?}/fs: {err:?}", expected_part_number)
+                            })?;
+                            trace!(
+                                "Uploading chunk {part_number} ({} bytes) for video {video_id:?}",
+                                chunk.len()
+                            );
+
+                            // We prefetched for the wrong chunk. Let's try again with the correct part number now that we know it.
+                            let md5_sum =
+                                use_md5_hashes.then(|| base64::encode(md5::compute(&chunk).0));
+                            let presigned_url = if let Some(url) = presigned_url?
+                                && part_number == expected_part_number
+                            {
+                                url
+                            } else if part_number == 1
+                                && !use_md5_hashes
+                                // We have a presigned URL left over from the first chunk
+                                && let Some((url, expiry)) = first_chunk_presigned_url
+                                    .lock()
+                                    .unwrap_or_else(PoisonError::into_inner)
+                                    .clone()
+                                // The URL hasn't expired
+                                && expiry.elapsed() < Duration::from_secs(60 * 50)
+                            {
+                                url
+                            } else {
+                                api::upload_multipart_presign_part(
+                                    &app,
+                                    &video_id,
+                                    &upload_id,
+                                    part_number,
+                                    md5_sum.as_deref(),
+                                )
+                                .await?
+                            };
+
+                            // We cache the presigned URL for the first chunk,
+                            // as in instant mode we upload the first chunk again at the end to include the updated video metadata.
+                            if part_number == 1 {
+                                *first_chunk_presigned_url
+                                    .lock()
+                                    .unwrap_or_else(PoisonError::into_inner) =
+                                    Some((presigned_url.clone(), Instant::now()));
+                            }
-            yield UploadedPart {
-                etag: etag.ok_or_else(|| format!("uploader/part/{part_number}/error: ETag header not found"))?,
-                part_number,
-                size,
-                total_size
-            };
+                            let size = chunk.len();
+                            let url = Uri::from_str(&presigned_url).map_err(|err| {
+                                format!("uploader/part/{part_number}/invalid_url: {err:?}")
+                            })?;
+                            let mut req =
+                                retryable_client(url.host().unwrap_or("").to_string())
+                                    .build()
+                                    .map_err(|err| {
+                                        format!("uploader/part/{part_number}/client: {err:?}")
+                                    })?
+                                    .put(&presigned_url)
+                                    .header("Content-Length", chunk.len())
+                                    .timeout(Duration::from_secs(5 * 60))
+                                    .body(chunk);
+
+                            if let Some(md5_sum) = &md5_sum {
+                                req = req.header("Content-MD5", md5_sum);
+                            }
-            expected_part_number = part_number + 1;
-        }
+                            let resp = req
+                                .send()
+                                .instrument(info_span!("s3_put", size = size))
+                                .await
+                                .map_err(|err| {
+                                    format!("uploader/part/{part_number}/error: {err:?}")
+                                })?;
+
+                            let etag = resp
+                                .headers()
+                                .get("ETag")
+                                .as_ref()
+                                .and_then(|etag| etag.to_str().ok())
+                                .map(|v| v.trim_matches('"').to_string());
+
+                            match !resp.status().is_success() {
+                                true => Err(format!(
+                                    "uploader/part/{part_number}/error: {}",
+                                    resp.text().await.unwrap_or_default()
+                                )),
+                                false => Ok(()),
+                            }?;
+
+                            trace!("Completed upload of part {part_number}");
+
+                            Ok::<_, AuthedApiError>(UploadedPart {
+                                etag: etag.ok_or_else(|| {
+                                    format!(
+                                        "uploader/part/{part_number}/error: ETag header not found"
+                                    )
+                                })?,
+                                part_number,
+                                size,
+                                total_size,
+                            })
+                        }
+                        .instrument(info_span!("upload_part", part_number = part_number)),
+                        (stream, expected_part_number + 1),
+                    ))
+                }
+            },
+        )
+        .buffered(MAX_CONCURRENT_UPLOADS)
+        .boxed()
+    })
+    .chain(stream::once(async move {
+        debug!(
+            "Completed multipart upload for {video_id2:?} in {:?}",
+            start.elapsed()
+        );
-        debug!("Completed multipart upload for {video_id:?} in {:?}", start.elapsed());
-    }
+        stream::empty().boxed()
+    }))
+    .flatten()
     .instrument(Span::current())
 }
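
Reviewer note (not part of the patch): the heart of this change is swapping the serial `try_stream!` loop for a `stream::unfold` that emits one upload future per part, then driving up to `MAX_CONCURRENT_UPLOADS` of them at once with `buffered`. Below is a minimal, self-contained sketch of that pattern, assuming the `futures` and `tokio` crates; the constant name mirrors the diff, while the five-part limit and simulated sleep durations are purely illustrative.

```rust
use futures::{StreamExt, stream};
use std::time::Duration;

#[tokio::main]
async fn main() {
    const MAX_CONCURRENT_UPLOADS: usize = 3;

    let parts = stream::unfold(1u32, |part_number| async move {
        // Stop after five parts; the real uploader stops when the chunk stream ends.
        if part_number > 5 {
            return None;
        }
        // Yield the "upload" future itself, plus the state for the next step.
        Some((
            async move {
                // Simulate an upload where later parts finish sooner.
                tokio::time::sleep(Duration::from_millis(50 * u64::from(6 - part_number))).await;
                part_number
            },
            part_number + 1,
        ))
    })
    // Run up to three upload futures concurrently.
    .buffered(MAX_CONCURRENT_UPLOADS)
    .collect::<Vec<_>>()
    .await;

    // `buffered` (unlike `buffer_unordered`) yields in creation order.
    assert_eq!(parts, vec![1, 2, 3, 4, 5]);
}
```

Because `buffered` yields results in the order the futures were produced, the rest of the pipeline still sees parts 1, 2, 3, … even when later parts finish uploading first.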