✨ Completed Cloudflare's multipart upload logic.
langyo committed Aug 17, 2024
1 parent 968d5a3 commit 93a6cab
Showing 3 changed files with 135 additions and 109 deletions.
packages/database/src/init/bucket.rs (11 changes: 7 additions & 4 deletions)
@@ -6,7 +6,7 @@ use tairitsu_database_types::providers::bucket::BucketStore;

 #[derive(Clone)]
 pub enum InitBucketParams {
-    Cloudflare((Arc<worker::Env>, String)),
+    Cloudflare((Arc<worker::Env>, String, String)),
     Native(String),
     WASI(String),
 }
@@ -18,10 +18,13 @@ impl Init<Box<crate::prelude::ProxyBucket>> for InitBucketParams {
         cfg_if::cfg_if! {
             if #[cfg(feature = "cloudflare")] {
                 match self {
-                    InitBucketParams::Cloudflare((env, bucket_name)) => {
+                    InitBucketParams::Cloudflare((env, bucket_name, multipart_kv_name)) => {
                         Ok(Box::new(
-                            tairitsu_database_driver_cloudflare::bucket::init_bucket(env, bucket_name)
-                                .await?,
+                            tairitsu_database_driver_cloudflare::bucket::init_bucket(
+                                env,
+                                bucket_name,
+                                multipart_kv_name
+                            ).await?,
                         ))
                     }

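For orientation, a Cloudflare call site now supplies two binding names: the R2 bucket and the KV namespace that backs the multipart bookkeeping. A minimal sketch; the binding names, the surrounding function, and the `init` method name are illustrative assumptions, not taken from this commit:

    use std::sync::Arc;

    // Hypothetical call site. "BUCKET" and "MULTIPART_KV" stand in for
    // whatever binding names the deployment's wrangler.toml declares.
    async fn open_bucket(env: Arc<worker::Env>) -> anyhow::Result<Box<ProxyBucket>> {
        let params = InitBucketParams::Cloudflare((
            env,
            "BUCKET".to_string(),       // R2 bucket binding
            "MULTIPART_KV".to_string(), // KV namespace for multipart metadata (the new field)
        ));
        // Assumes the Init trait exposes an async `init` constructor.
        params.init().await
    }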
packages/database_driver_cloudflare/Cargo.toml (1 change: 0 additions & 1 deletion)
@@ -19,7 +19,6 @@ log = "^0.4"
 serde = { version = "^1", features = ["derive"] }
 serde_json = { version = "^1" }
 strum = { version = "^0.26", features = ["derive"] }
-postcard = { version = "^1", features = ["alloc"] }
 uuid = { version = "^1", features = [
     'v4',
     'fast-rng',
packages/database_driver_cloudflare/src/bucket.rs (232 changes: 128 additions & 104 deletions)
@@ -1,6 +1,7 @@
 use anyhow::{anyhow, Result};
 use bytes::Bytes;
 use chrono::DateTime;
+use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use uuid::Uuid;

@@ -12,6 +13,14 @@ use tairitsu_database_types::providers::bucket::*;
 pub struct ProxyBucket {
     env: Arc<Env>,
     bucket_name: String,
+    multipart_kv_name: String,
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+struct BucketMultipartUploadInfo {
+    key: String,
+    upload_id: String,
+    etags: Vec<String>,
 }

 #[async_trait::async_trait]
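The new `BucketMultipartUploadInfo` record replaces the bare `Vec<String>` of etags that was previously postcard-encoded and stashed in the bucket itself; it is now stored as JSON in the dedicated KV namespace under a `__multi_{key}` entry. A self-contained round-trip sketch with purely illustrative values:

    // Illustrative: the shape create_multipart_upload persists to KV.
    // JSON form: {"key":"<uuid>","upload_id":"<id>","etags":["<etag>"]}
    fn metadata_roundtrip() -> anyhow::Result<()> {
        let info = BucketMultipartUploadInfo {
            key: "9b1deb4d-3b7d-4bad-9bdd-2b0d7b3dcb6d".to_string(), // a Uuid::new_v4()
            upload_id: "r2-issued-upload-id".to_string(),
            etags: vec!["etag-of-part-1".to_string()],
        };
        let json = serde_json::to_string(&info)?;
        let back: BucketMultipartUploadInfo = serde_json::from_str(&json)?;
        assert_eq!(info, back); // PartialEq is derived above
        Ok(())
    }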
@@ -68,23 +77,31 @@ impl BucketStore for ProxyBucket {

     async fn create_multipart_upload(&self) -> Result<String> {
         let env = self.env.bucket(self.bucket_name.as_str())?;
+        let multipart_kv_env = self.env.kv(self.multipart_kv_name.as_str())?;

         let ret = SendFuture::new(async move {
             let key = Uuid::new_v4().to_string();
             match env.create_multipart_upload(key.clone()).execute().await {
                 Ok(info) => {
                     let upload_id = info.upload_id().await;
-                    let upload_id = format!("{}_{}", key, upload_id);
-
-                    let parts_metadata: Vec<String> = vec![];
-                    let parts_metadata = postcard::to_allocvec(&parts_metadata)?;
-
-                    self.set(
-                        format!("__multi_{}", upload_id),
-                        Bytes::from(parts_metadata),
-                    )
-                    .await?;
-                    Ok(upload_id)
+                    let parts_metadata = BucketMultipartUploadInfo {
+                        key: key.clone(),
+                        upload_id: upload_id.clone(),
+                        etags: Vec::new(),
+                    };
+                    let parts_metadata = serde_json::to_string(&parts_metadata)?;
+
+                    multipart_kv_env
+                        .put(&format!("__multi_{}", key), parts_metadata)
+                        .map_err(|err| {
+                            anyhow!("Failed to write multipart upload metadata: {:?}", err)
+                        })?
+                        .execute()
+                        .await
+                        .map_err(|err| {
+                            anyhow!("Failed to write multipart upload metadata: {:?}", err)
+                        })?;
+                    Ok(key)
                 }
                 Err(err) => Err(anyhow!("Failed to create multipart upload: {:?}", err)),
             }
@@ -94,45 +111,38 @@
         Ok(ret)
     }

-    async fn append_multipart_upload(&self, upload_id: String, data: Bytes) -> Result<()> {
+    async fn append_multipart_upload(&self, key: String, data: Bytes) -> Result<()> {
         let env = self.env.bucket(self.bucket_name.as_str())?;
-
-        let (key, upload_id) = upload_id
-            .split_once('_')
-            .map(|(key, upload_id)| (key.to_string(), upload_id.to_string()))
-            .ok_or(anyhow!(
-                "Failed to split into key and upload_id: {:?}",
-                upload_id
-            ))?;
+        let multipart_kv_env = self.env.kv(self.multipart_kv_name.as_str())?;

         let ret = SendFuture::new(async move {
-            let parts_metadata =
-                self.get(format!("__multi_{}", upload_id))
-                    .await?
-                    .ok_or(anyhow!(
-                        "Failed to get part number for multipart upload: {:?}",
-                        upload_id
-                    ))?;
-            let parts_metadata: Vec<String> = postcard::from_bytes(&parts_metadata)?;
-
-            match env.resume_multipart_upload(key, upload_id.clone()) {
+            let parts_metadata = multipart_kv_env
+                .get(&format!("__multi_{}", key))
+                .text()
+                .await
+                .map_err(|err| anyhow!("Failed to read multipart upload metadata: {:?}", err))?
+                .ok_or(anyhow!("Failed to read multipart upload metadata."))?;
+            let parts_metadata: BucketMultipartUploadInfo = serde_json::from_str(&parts_metadata)?;
+
+            match env.resume_multipart_upload(key.clone(), parts_metadata.upload_id.clone()) {
                 Ok(uploader) => match uploader
                     .upload_part(
-                        parts_metadata.len() as u16,
+                        (parts_metadata.etags.len() + 1) as u16,
                        worker::Data::Bytes(data.to_vec()),
                    )
                    .await
                {
                     Ok(info) => {
                         let mut parts_metadata = parts_metadata.clone();
-                        parts_metadata.push(info.etag());
-                        let parts_metadata = postcard::to_allocvec(&parts_metadata)?;
-
-                        match self
-                            .set(
-                                format!("__multi_{}", upload_id),
-                                Bytes::from(parts_metadata),
-                            )
+                        parts_metadata.etags.push(info.etag());
+                        let parts_metadata = serde_json::to_string(&parts_metadata)?;
+
+                        match multipart_kv_env
+                            .put(&format!("__multi_{}", key), parts_metadata)
+                            .map_err(|err| {
+                                anyhow!("Failed to write multipart upload metadata: {:?}", err)
+                            })?
+                            .execute()
                            .await
                        {
                             Ok(_) => Ok(()),
Expand All @@ -154,77 +164,80 @@ impl BucketStore for ProxyBucket {

async fn complete_multipart_upload(
&self,
upload_id: String,
key: String,
final_data_key: Option<String>,
) -> Result<BucketMultipartUploadResult> {
if final_data_key.is_some() {
unimplemented!("final_data_key is not supported yet");
}

let env = self.env.bucket(self.bucket_name.as_str())?;

let (key, upload_id) = upload_id
.split_once('_')
.map(|(key, upload_id)| (key.to_string(), upload_id.to_string()))
.ok_or(anyhow!(
"Failed to split into key and upload_id: {:?}",
upload_id
))?;
let multipart_kv_env = self.env.kv(self.multipart_kv_name.as_str())?;

let ret = SendFuture::new(async move {
let parts_metadata =
self.get(format!("__multi_{}", upload_id))
.await?
.ok_or(anyhow!(
"Failed to get part number for multipart upload: {:?}",
upload_id
))?;
let parts_metadata: Vec<String> = postcard::from_bytes(&parts_metadata)?;

match env.resume_multipart_upload(key, upload_id.clone()) {
let parts_metadata = multipart_kv_env
.get(&format!("__multi_{}", key))
.text()
.await
.map_err(|err| anyhow!("Failed to read multipart upload metadata: {:?}", err))?
.ok_or(anyhow!("Failed to read multipart upload metadata."))?;
let parts_metadata: BucketMultipartUploadInfo = serde_json::from_str(&parts_metadata)?;

match env.resume_multipart_upload(key.clone(), parts_metadata.upload_id.clone()) {
Ok(uploader) => match uploader
.complete(
parts_metadata
.etags
.iter()
.enumerate()
.map(|(index, item)| {
worker::UploadedPart::new(index as u16, item.clone())
})
.map(|(index, item)| ((index + 1) as u16, item))
.map(|(index, item)| worker::UploadedPart::new(index, item.clone()))
.collect::<Vec<_>>(),
)
.await
{
Ok(data) => Ok(BucketMultipartUploadResult {
key: data.key().to_string(),
version: data.version().to_string(),
size: data.size() as usize,

etag: data.etag().to_string(),
http_etag: data.http_etag().to_string(),
uploaded: DateTime::from_timestamp_millis(
data.uploaded().as_millis() as i64
)
.unwrap_or_default()
.to_utc(),

http_metadata: {
let obj = data.http_metadata();

BucketMultipartUploadResultHttpMetadata {
content_type: obj.content_type.map(|s| s.to_string()),
content_language: obj.content_language.map(|s| s.to_string()),
content_disposition: obj.content_disposition.map(|s| s.to_string()),
content_encoding: obj.content_encoding.map(|s| s.to_string()),
cache_control: obj.cache_control.map(|s| s.to_string()),
cache_expiry: obj.cache_expiry.map(|ts| {
DateTime::from_timestamp_millis(ts.as_millis() as i64)
.unwrap_or_default()
.to_utc()
}),
}
},
custom_metadata: data.custom_metadata().unwrap_or_default(),
}),
Ok(data) => {
multipart_kv_env
.delete(&format!("__multi_{}", key))
.await
.map_err(|err| {
anyhow!("Failed to delete multipart upload metadata: {:?}", err)
})?;

Ok(BucketMultipartUploadResult {
key: data.key().to_string(),
version: data.version().to_string(),
size: data.size() as usize,

etag: data.etag().to_string(),
http_etag: data.http_etag().to_string(),
uploaded: DateTime::from_timestamp_millis(
data.uploaded().as_millis() as i64
)
.unwrap_or_default()
.to_utc(),

http_metadata: {
let obj = data.http_metadata();

BucketMultipartUploadResultHttpMetadata {
content_type: obj.content_type.map(|s| s.to_string()),
content_language: obj.content_language.map(|s| s.to_string()),
content_disposition: obj
.content_disposition
.map(|s| s.to_string()),
content_encoding: obj.content_encoding.map(|s| s.to_string()),
cache_control: obj.cache_control.map(|s| s.to_string()),
cache_expiry: obj.cache_expiry.map(|ts| {
DateTime::from_timestamp_millis(ts.as_millis() as i64)
.unwrap_or_default()
.to_utc()
}),
}
},
custom_metadata: data.custom_metadata().unwrap_or_default(),
})
}
Err(err) => Err(anyhow!("Failed to append multipart upload: {:?}", err)),
},
Err(err) => Err(anyhow!("Failed to resume multipart upload: {:?}", err)),
@@ -235,22 +248,28 @@
         Ok(ret)
     }

-    async fn abort_multipart_upload(&self, upload_id: String) -> Result<()> {
+    async fn abort_multipart_upload(&self, key: String) -> Result<()> {
         let env = self.env.bucket(self.bucket_name.as_str())?;
-
-        let (key, upload_id) = upload_id
-            .split_once('_')
-            .map(|(key, upload_id)| (key.to_string(), upload_id.to_string()))
-            .ok_or(anyhow!(
-                "Failed to split into key and upload_id: {:?}",
-                upload_id
-            ))?;
+        let multipart_kv_env = self.env.kv(self.multipart_kv_name.as_str())?;

         let ret = SendFuture::new(async move {
-            match env.resume_multipart_upload(key, upload_id.clone()) {
+            let parts_metadata = multipart_kv_env
+                .get(&format!("__multi_{}", key))
+                .text()
+                .await
+                .map_err(|err| anyhow!("Failed to read multipart upload metadata: {:?}", err))?
+                .ok_or(anyhow!("Failed to read multipart upload metadata."))?;
+            let parts_metadata: BucketMultipartUploadInfo = serde_json::from_str(&parts_metadata)?;
+
+            match env.resume_multipart_upload(key.clone(), parts_metadata.upload_id.clone()) {
                 Ok(uploader) => match uploader.abort().await {
                     Ok(_) => {
-                        self.delete(format!("__multi_{}", upload_id)).await?;
+                        multipart_kv_env
+                            .delete(&format!("__multi_{}", key))
+                            .await
+                            .map_err(|err| {
+                                anyhow!("Failed to delete multipart upload metadata: {:?}", err)
+                            })?;
                         Ok(())
                     }
                     Err(err) => Err(anyhow!("Failed to abort multipart upload: {:?}", err)),
@@ -264,9 +283,14 @@
     }
 }

-pub async fn init_bucket(env: Arc<Env>, bucket_name: impl ToString) -> Result<ProxyBucket> {
+pub async fn init_bucket(
+    env: Arc<Env>,
+    bucket_name: impl ToString,
+    multipart_kv_name: impl ToString,
+) -> Result<ProxyBucket> {
     Ok(ProxyBucket {
         env,
         bucket_name: bucket_name.to_string(),
+        multipart_kv_name: multipart_kv_name.to_string(),
     })
 }
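Taken together, the public multipart API is now keyed by the object key that create_multipart_upload returns; the R2 upload_id no longer rides along in a "key_uploadId" composite string and is instead recovered from the KV record on every call. A hedged end-to-end sketch of the new flow (the driver function and its part data are illustrative, not part of this commit):

    use bytes::Bytes;
    use tairitsu_database_types::providers::bucket::BucketStore;

    // Illustrative driver: upload `parts` one by one, then stitch them together.
    async fn upload_in_parts(bucket: &ProxyBucket, parts: Vec<Bytes>) -> anyhow::Result<()> {
        // Returns the object key; the R2 upload_id stays inside the KV record.
        let key = bucket.create_multipart_upload().await?;
        for part in parts {
            // Part numbers are derived from etags.len() + 1, so order matters.
            bucket.append_multipart_upload(key.clone(), part).await?;
        }
        // Completing stitches the parts, deletes the KV record, and
        // returns the finished object's metadata.
        let result = bucket.complete_multipart_upload(key, None).await?;
        println!("uploaded {} bytes as {}", result.size, result.key);
        Ok(())
    }

On failure one would call abort_multipart_upload(key) instead, which aborts the resumed R2 upload and removes the same KV entry.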
