Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions apps/desktop/src-tauri/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ pub async fn upload_multipart_presign_part(
video_id: &str,
upload_id: &str,
part_number: u32,
md5_sum: &str,
) -> Result<String, String> {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -64,7 +63,6 @@ pub async fn upload_multipart_presign_part(
"videoId": video_id,
"uploadId": upload_id,
"partNumber": part_number,
"md5Sum": md5_sum
}))
})
.await
Expand Down
58 changes: 55 additions & 3 deletions apps/desktop/src-tauri/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,16 +607,49 @@ fn multipart_uploader(
try_stream! {
let mut stream = pin!(stream);
let mut prev_part_number = None;

let mut optimistic_presigned_url_task: Option<tokio::task::JoinHandle<Result<String, String>>> = Some(
tokio::spawn({
let app = app.clone();
let video_id = video_id.clone();
let upload_id = upload_id.clone();

async move {
api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1)
.await
}
})
);
let mut expected_part_number = 1u32;

while let Some(item) = stream.next().await {
let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?;
debug!("Uploading chunk {part_number} for video {video_id:?}");
prev_part_number = Some(part_number);
let md5_sum = base64::encode(md5::compute(&chunk).0);
let size = chunk.len();

let presigned_url =
api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, &md5_sum)
.await?;
let presigned_url = if expected_part_number == part_number {
// The optimistic presigned URL matches, wait for it
if let Some(task) = optimistic_presigned_url_task.take() {
task.await
.map_err(|e| format!("uploader/part/{part_number}/task_join: {e:?}"))?
.map_err(|e| format!("uploader/part/{part_number}/presign: {e}"))?
} else {
// Fallback if no task available
api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number)
.await?
}
} else {
// The optimistic presigned URL doesn't match, abort it and generate a new correct one
if let Some(task) = optimistic_presigned_url_task.take() {
task.abort();
}
debug!("Throwing out optimistic presigned URL for part {expected_part_number} as part {part_number} was requested!");
expected_part_number = part_number;
api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number)
.await?
};

let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?;
let resp = retryable_client(url.host().unwrap_or("<unknown>").to_string())
Expand Down Expand Up @@ -644,6 +677,25 @@ fn multipart_uploader(
size,
total_size
};

// We generate the presigned URL ahead of time for the next expected part.
// This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing.
expected_part_number = part_number + 1;
optimistic_presigned_url_task = Some(tokio::spawn({
let app = app.clone();
let video_id = video_id.clone();
let upload_id = upload_id.clone();

async move {
api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number)
.await
}
}));
}

// Clean up any remaining optimistic task
if let Some(task) = optimistic_presigned_url_task.take() {
task.abort();
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions apps/web/app/api/upload/[...route]/multipart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,19 @@ app.post(
.object({
uploadId: z.string(),
partNumber: z.number(),
md5Sum: z.string(),
})
.and(
z.union([
z.object({ videoId: z.string() }),
// deprecated
z.object({ fileKey: z.string() }),
// deprecated
// z.object({ md5Sum: z.string() }),
]),
),
),
async (c) => {
const { uploadId, partNumber, md5Sum, ...body } = c.req.valid("json");
const { uploadId, partNumber, ...body } = c.req.valid("json");
const user = c.get("user");

const fileKey = parseVideoIdOrFileKey(user.id, {
Expand All @@ -132,7 +133,6 @@ app.post(
fileKey,
uploadId,
partNumber,
{ ContentMD5: md5Sum },
);

return presignedUrl;
Expand Down
Loading