diff --git a/Cargo.lock b/Cargo.lock index bf4b85873..c1f23d0d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1171,7 +1171,7 @@ dependencies = [ [[package]] name = "cap-desktop" -version = "0.3.74" +version = "0.3.75" dependencies = [ "anyhow", "async-stream", diff --git a/apps/desktop/src-tauri/src/api.rs b/apps/desktop/src-tauri/src/api.rs index ac1367a85..b79cc5c7e 100644 --- a/apps/desktop/src-tauri/src/api.rs +++ b/apps/desktop/src-tauri/src/api.rs @@ -52,6 +52,7 @@ pub async fn upload_multipart_presign_part( video_id: &str, upload_id: &str, part_number: u32, + md5_sum: Option<&str>, ) -> Result { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -59,15 +60,21 @@ pub async fn upload_multipart_presign_part( presigned_url: String, } + let mut body = serde_json::Map::from_iter([ + ("videoId".to_string(), json!(video_id)), + ("uploadId".to_string(), json!(upload_id)), + ("partNumber".to_string(), json!(part_number)), + ]); + + if let Some(md5_sum) = md5_sum { + body.insert("md5Sum".to_string(), json!(md5_sum)); + } + let resp = app .authed_api_request("/api/upload/multipart/presign-part", |c, url| { c.post(url) .header("Content-Type", "application/json") - .json(&serde_json::json!({ - "videoId": video_id, - "uploadId": upload_id, - "partNumber": part_number, - })) + .json(&serde_json::json!(body)) }) .await .map_err(|err| format!("api/upload_multipart_presign_part/request: {err}"))?; diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 3a3884520..61a4ce425 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -56,7 +56,7 @@ pub struct UploadProgressEvent { // a typical recommended chunk size is 5MB (AWS min part size). const CHUNK_SIZE: u64 = 5 * 1024 * 1024; // 5MB -#[instrument(skip(app, channel))] +#[instrument(skip(app, channel, file_path, screenshot_path))] pub async fn upload_video( app: &AppHandle, video_id: String, @@ -593,44 +593,66 @@ fn multipart_uploader( let start = Instant::now(); try_stream! { - let mut stream = pin!(stream); + let use_md5_hashes = app.is_server_url_custom().await; + + let mut stream = pin!(stream); let mut prev_part_number = None; let mut expected_part_number = 1u32; loop { - let (Some(item), presigned_url) = join( - stream.next(), - // We generate the presigned URL ahead of time for the part we expect to come next. - // If it's not the chunk that actually comes next we just throw it out. - // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing. - api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number) - ).await else { - break; - }; - let mut presigned_url = presigned_url?; - - - let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; + let (item, mut presigned_url, md5_sum) = if use_md5_hashes { + let Some(item) = stream.next().await else { + break; + }; + let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; + let md5_sum = base64::encode(md5::compute(&item.chunk).0); + let presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, Some( + &md5_sum + )).await?; + + (item, presigned_url, Some(md5_sum)) + } else { + let (Some(item), presigned_url) = join( + stream.next(), + // We generate the presigned URL ahead of time for the part we expect to come next. + // If it's not the chunk that actually comes next we just throw it out. + // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing. + api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number, None) + ).await else { + break; + }; + + let item = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; + + (item, presigned_url?, None) + }; + + let Chunk { total_size, part_number, chunk } = item; trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len()); prev_part_number = Some(part_number); let size = chunk.len(); // We prefetched for the wrong chunk. Let's try again. if expected_part_number != part_number { - presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, md5_sum.as_deref()) .await? } trace!("Uploading part {part_number}"); let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?; - let resp = retryable_client(url.host().unwrap_or("").to_string()) + let mut req = retryable_client(url.host().unwrap_or("").to_string()) .build() .map_err(|err| format!("uploader/part/{part_number}/client: {err:?}"))? .put(&presigned_url) .header("Content-Length", chunk.len()) - .timeout(Duration::from_secs(120)) - .body(chunk) + .timeout(Duration::from_secs(120)).body(chunk); + + if let Some(md5_sum) = &md5_sum { + req = req.header("Content-MD5", md5_sum); + } + + let resp = req .send() .await .map_err(|err| format!("uploader/part/{part_number}/error: {err:?}"))?; diff --git a/apps/desktop/src-tauri/src/web_api.rs b/apps/desktop/src-tauri/src/web_api.rs index 73a1aa6b9..b87c61066 100644 --- a/apps/desktop/src-tauri/src/web_api.rs +++ b/apps/desktop/src-tauri/src/web_api.rs @@ -84,6 +84,8 @@ pub trait ManagerExt: Manager { ) -> Result; async fn make_app_url(&self, pathname: impl AsRef) -> String; + + async fn is_server_url_custom(&self) -> bool; } impl + Emitter, R: Runtime> ManagerExt for T { @@ -125,4 +127,15 @@ impl + Emitter, R: Runtime> ManagerExt for T { let server_url = &app_state.read().await.server_url; format!("{}{}", server_url, pathname.as_ref()) } + + async fn is_server_url_custom(&self) -> bool { + let mut state = self.state::>(); + let mut app_state = state.read().await; + + if let Some(env_url) = std::option_env!("VITE_SERVER_URL") { + return app_state.server_url != env_url; + } + + false + } }