✨ Add get_metadata method for bucket environment.
langyo committed Aug 20, 2024
1 parent 93a6cab commit 4f13295
Showing 5 changed files with 110 additions and 49 deletions.
14 changes: 12 additions & 2 deletions packages/database/src/mock/bucket.rs
@@ -1,3 +1,5 @@
+use std::ops::RangeInclusive;
+
 use anyhow::Result;
 use bytes::Bytes;
 
@@ -12,7 +14,15 @@ impl BucketStore for ProxyBucket {
         unimplemented!()
     }
 
-    async fn get(&self, _key: String) -> Result<Option<Bytes>> {
+    async fn get(
+        &self,
+        _key: String,
+        _range: Option<RangeInclusive<usize>>,
+    ) -> Result<Option<Bytes>> {
+        unimplemented!()
+    }
+
+    async fn get_metadata(&self, _key: String) -> Result<BucketItemMetadata> {
         unimplemented!()
     }
 
@@ -32,7 +42,7 @@ impl BucketStore for ProxyBucket {
         &self,
         _upload_id: String,
         _final_data_key: Option<String>,
-    ) -> Result<BucketMultipartUploadResult> {
+    ) -> Result<BucketItemMetadata> {
         todo!()
     }
105 changes: 68 additions & 37 deletions packages/database_driver_cloudflare/src/bucket.rs
@@ -2,7 +2,7 @@ use anyhow::{anyhow, Result};
 use bytes::Bytes;
 use chrono::DateTime;
 use serde::{Deserialize, Serialize};
-use std::sync::Arc;
+use std::{ops::RangeInclusive, sync::Arc};
 use uuid::Uuid;
 
 use worker::{send::SendFuture, Env};
@@ -39,11 +39,25 @@ impl BucketStore for ProxyBucket {
         Ok(())
     }
 
-    async fn get(&self, key: String) -> Result<Option<Bytes>> {
+    async fn get(
+        &self,
+        key: String,
+        range: Option<RangeInclusive<usize>>,
+    ) -> Result<Option<Bytes>> {
         let env = self.env.bucket(self.bucket_name.as_str())?;
 
         let ret = SendFuture::new(async move {
-            match env.get(key.to_string().as_str()).execute().await {
+            let handle = env.get(key.to_string().as_str());
+            let handle = if let Some(range) = range {
+                handle.range(worker::Range::OffsetWithLength {
+                    offset: *range.start() as u64,
+                    length: (*range.end() - *range.start() + 1) as u64,
+                })
+            } else {
+                handle
+            };
+
+            match handle.execute().await {
                 Ok(data) => match data {
                     Some(data) => match data.body() {
                         Some(body) => match body.bytes().await {
@@ -62,6 +76,23 @@
         Ok(ret)
     }
 
+    async fn get_metadata(&self, key: String) -> Result<BucketItemMetadata> {
+        let env = self.env.bucket(self.bucket_name.as_str())?;
+
+        let ret = SendFuture::new(async move {
+            match env.head(key.to_string().as_str()).await {
+                Ok(data) => match data {
+                    Some(data) => Ok(into_metadata(data)),
+                    None => Err(anyhow!("Failed to get key-value pair: key not found.")),
+                },
+                Err(err) => Err(anyhow!("Failed to get key-value pair: {:?}", err)),
+            }
+        })
+        .await?;
+
+        Ok(ret)
+    }
+
     async fn delete(&self, key: String) -> Result<()> {
         let env = self.env.bucket(self.bucket_name.as_str())?;
 
@@ -166,7 +197,7 @@ impl BucketStore for ProxyBucket {
         &self,
         key: String,
         final_data_key: Option<String>,
-    ) -> Result<BucketMultipartUploadResult> {
+    ) -> Result<BucketItemMetadata> {
         if final_data_key.is_some() {
             unimplemented!("final_data_key is not supported yet");
         }
@@ -204,39 +235,7 @@ impl BucketStore for ProxyBucket {
                         anyhow!("Failed to delete multipart upload metadata: {:?}", err)
                     })?;
 
-                    Ok(BucketMultipartUploadResult {
-                        key: data.key().to_string(),
-                        version: data.version().to_string(),
-                        size: data.size() as usize,
-
-                        etag: data.etag().to_string(),
-                        http_etag: data.http_etag().to_string(),
-                        uploaded: DateTime::from_timestamp_millis(
-                            data.uploaded().as_millis() as i64
-                        )
-                        .unwrap_or_default()
-                        .to_utc(),
-
-                        http_metadata: {
-                            let obj = data.http_metadata();
-
-                            BucketMultipartUploadResultHttpMetadata {
-                                content_type: obj.content_type.map(|s| s.to_string()),
-                                content_language: obj.content_language.map(|s| s.to_string()),
-                                content_disposition: obj
-                                    .content_disposition
-                                    .map(|s| s.to_string()),
-                                content_encoding: obj.content_encoding.map(|s| s.to_string()),
-                                cache_control: obj.cache_control.map(|s| s.to_string()),
-                                cache_expiry: obj.cache_expiry.map(|ts| {
-                                    DateTime::from_timestamp_millis(ts.as_millis() as i64)
-                                        .unwrap_or_default()
-                                        .to_utc()
-                                }),
-                            }
-                        },
-                        custom_metadata: data.custom_metadata().unwrap_or_default(),
-                    })
+                    Ok(into_metadata(data))
                 }
                 Err(err) => Err(anyhow!("Failed to append multipart upload: {:?}", err)),
             },
@@ -283,6 +282,38 @@
     }
 }
 
+pub fn into_metadata(data: worker::Object) -> BucketItemMetadata {
+    BucketItemMetadata {
+        key: data.key().to_string(),
+        version: data.version().to_string(),
+        size: data.size() as usize,
+
+        etag: data.etag().to_string(),
+        http_etag: data.http_etag().to_string(),
+        uploaded: DateTime::from_timestamp_millis(data.uploaded().as_millis() as i64)
+            .unwrap_or_default()
+            .to_utc(),
+
+        http_metadata: {
+            let obj = data.http_metadata();
+
+            BucketItemHTTPMetadata {
+                content_type: obj.content_type.map(|s| s.to_string()),
+                content_language: obj.content_language.map(|s| s.to_string()),
+                content_disposition: obj.content_disposition.map(|s| s.to_string()),
+                content_encoding: obj.content_encoding.map(|s| s.to_string()),
+                cache_control: obj.cache_control.map(|s| s.to_string()),
+                cache_expiry: obj.cache_expiry.map(|ts| {
+                    DateTime::from_timestamp_millis(ts.as_millis() as i64)
+                        .unwrap_or_default()
+                        .to_utc()
+                }),
+            }
+        },
+        custom_metadata: data.custom_metadata().unwrap_or_default(),
+    }
+}
+
 pub async fn init_bucket(
     env: Arc<Env>,
     bucket_name: impl ToString,
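
Editorial note, not part of this commit: get maps an inclusive byte range onto R2's Range::OffsetWithLength, and since a RangeInclusive contains its end index, the length must be end - start + 1. A minimal standalone sketch of that conversion (to_offset_length is a hypothetical helper name, not in the codebase):

use std::ops::RangeInclusive;

// Convert an inclusive byte range into the (offset, length) pair that
// worker::Range::OffsetWithLength expects.
fn to_offset_length(range: &RangeInclusive<usize>) -> (u64, u64) {
    let offset = *range.start() as u64;
    // An inclusive range contains its end index, hence the + 1.
    let length = (*range.end() - *range.start() + 1) as u64;
    (offset, length)
}

fn main() {
    // Bytes 5..=9 are five bytes: 5, 6, 7, 8, 9.
    assert_eq!(to_offset_length(&(5..=9)), (5, 5));
}
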
13 changes: 11 additions & 2 deletions packages/database_driver_native/src/bucket.rs
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use bytes::Bytes;
+use std::ops::RangeInclusive;
 
 use tairitsu_database_types::providers::bucket::*;
 
@@ -15,7 +16,15 @@ impl BucketStore for ProxyBucket {
         todo!()
     }
 
-    async fn get(&self, _key: String) -> Result<Option<Bytes>> {
+    async fn get(
+        &self,
+        _key: String,
+        _range: Option<RangeInclusive<usize>>,
+    ) -> Result<Option<Bytes>> {
+        todo!()
+    }
+
+    async fn get_metadata(&self, _key: String) -> Result<BucketItemMetadata> {
         todo!()
     }
 
@@ -35,7 +44,7 @@ impl BucketStore for ProxyBucket {
         &self,
         _upload_id: String,
         _final_data_key: Option<String>,
-    ) -> Result<BucketMultipartUploadResult> {
+    ) -> Result<BucketItemMetadata> {
         todo!()
     }
13 changes: 11 additions & 2 deletions packages/database_driver_wasi/src/bucket.rs
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use bytes::Bytes;
+use std::ops::RangeInclusive;
 
 use tairitsu_database_types::providers::bucket::*;
 
@@ -12,7 +13,15 @@ impl BucketStore for ProxyBucket {
         todo!()
     }
 
-    async fn get(&self, _key: String) -> Result<Option<Bytes>> {
+    async fn get(
+        &self,
+        _key: String,
+        _range: Option<RangeInclusive<usize>>,
+    ) -> Result<Option<Bytes>> {
+        todo!()
+    }
+
+    async fn get_metadata(&self, _key: String) -> Result<BucketItemMetadata> {
         todo!()
     }
 
@@ -32,7 +41,7 @@ impl BucketStore for ProxyBucket {
         &self,
         _upload_id: String,
         _final_data_key: Option<String>,
-    ) -> Result<BucketMultipartUploadResult> {
+    ) -> Result<BucketItemMetadata> {
         todo!()
     }
14 changes: 8 additions & 6 deletions packages/database_types/src/providers/bucket.rs
@@ -2,12 +2,14 @@ use anyhow::Result;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use std::{collections::HashMap, ops::RangeInclusive};
 
 #[async_trait::async_trait]
 pub trait BucketStore {
-    async fn get(&self, key: String) -> Result<Option<Bytes>>;
     async fn set(&self, key: String, value: Bytes) -> Result<()>;
+    async fn get(&self, key: String, range: Option<RangeInclusive<usize>>)
+        -> Result<Option<Bytes>>;
+    async fn get_metadata(&self, key: String) -> Result<BucketItemMetadata>;
     async fn delete(&self, key: String) -> Result<()>;
 
     async fn create_multipart_upload(&self) -> Result<String>;
@@ -16,12 +18,12 @@ pub trait BucketStore {
         &self,
         upload_id: String,
         final_data_key: Option<String>,
-    ) -> Result<BucketMultipartUploadResult>;
+    ) -> Result<BucketItemMetadata>;
     async fn abort_multipart_upload(&self, upload_id: String) -> Result<()>;
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct BucketMultipartUploadResult {
+pub struct BucketItemMetadata {
     pub key: String,
     pub version: String,
     pub size: usize,
@@ -30,12 +32,12 @@ pub struct BucketMultipartUploadResult {
     pub http_etag: String,
     pub uploaded: DateTime<Utc>,
 
-    pub http_metadata: BucketMultipartUploadResultHttpMetadata,
+    pub http_metadata: BucketItemHTTPMetadata,
     pub custom_metadata: HashMap<String, String>,
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
-pub struct BucketMultipartUploadResultHttpMetadata {
+pub struct BucketItemHTTPMetadata {
     pub content_type: Option<String>,
     pub content_language: Option<String>,
    pub content_disposition: Option<String>,
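
A hedged usage sketch of the revised trait, not part of this commit: the key "my-object", the demo function, and how a concrete BucketStore implementor is obtained are all illustrative assumptions; each driver constructs its bucket differently.

use anyhow::Result;
use tairitsu_database_types::providers::bucket::BucketStore;

async fn demo(bucket: &impl BucketStore) -> Result<()> {
    // Fetch only the first 16 bytes (0..=15) of the stored object.
    let head = bucket.get("my-object".to_string(), Some(0..=15)).await?;
    println!("got {:?} bytes", head.map(|b| b.len()));

    // Inspect size and HTTP metadata without downloading the body.
    let meta = bucket.get_metadata("my-object".to_string()).await?;
    println!("{} is {} bytes (etag {})", meta.key, meta.size, meta.etag);

    // Passing None preserves the old whole-object read.
    let _whole = bucket.get("my-object".to_string(), None).await?;
    Ok(())
}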
