Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions objectstore-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use secrecy::{CloneableSecret, SecretBox, SerializableSecret, zeroize::Zeroize};
use serde::{Deserialize, Serialize};

pub use objectstore_log::{LevelFilter, LogFormat, LoggingConfig};
pub use objectstore_service::backend::StorageConfig;
pub use objectstore_service::backend::{MultipartUploadStorageConfig, StorageConfig};

use crate::killswitches::Killswitches;
use crate::rate_limits::RateLimits;
Expand Down Expand Up @@ -625,7 +625,7 @@ impl Config {
mod tests {
use std::io::Write;

use objectstore_service::backend::HighVolumeStorageConfig;
use objectstore_service::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig};
use secrecy::ExposeSecret;

use crate::killswitches::Killswitch;
Expand Down Expand Up @@ -771,7 +771,7 @@ mod tests {
};
let HighVolumeStorageConfig::BigTable(hv) = &c.high_volume;
assert_eq!(hv.project_id, "my-project");
let StorageConfig::Gcs(lt) = c.long_term.as_ref() else {
let MultipartUploadStorageConfig::Gcs(lt) = &c.long_term else {
panic!("expected gcs long_term");
};
assert_eq!(lt.bucket, "my-objectstore-bucket");
Expand Down Expand Up @@ -800,7 +800,7 @@ mod tests {
assert_eq!(hv.project_id, "my-project");
assert_eq!(hv.instance_name, "my-instance");
assert_eq!(hv.table_name, "my-table");
let StorageConfig::FileSystem(lt) = c.long_term.as_ref() else {
let MultipartUploadStorageConfig::FileSystem(lt) = &c.long_term else {
panic!("expected filesystem long_term");
};
assert_eq!(lt.path, Path::new("/data/lt"));
Expand Down
1 change: 1 addition & 0 deletions objectstore-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ publish = false
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
base64 = "0.22"
bigtable_rs = { git = "https://github.com/getsentry/bigtable_rs.git", rev = "4cb75bc5e5f87204363973f6302107768e64972e" }
chrono = "0.4"
bytes = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions objectstore-service/src/backend/changelog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::Duration;
use tokio_util::task::TaskTracker;
use tokio_util::task::task_tracker::TaskTrackerToken;

use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata};
use crate::backend::common::{HighVolumeBackend, MultipartUploadBackend, TieredMetadata};
use crate::error::Result;
use crate::id::ObjectId;

Expand Down Expand Up @@ -88,7 +88,7 @@ pub struct ChangeManager {
/// The backend for small objects (≤ 1 MiB).
pub(crate) high_volume: Box<dyn HighVolumeBackend>,
/// The backend for large objects (> 1 MiB).
pub(crate) long_term: Box<dyn Backend>,
pub(crate) long_term: Box<dyn MultipartUploadBackend>,
/// Durable write-ahead log for multi-step changes.
pub(crate) changelog: Box<dyn ChangeLog>,
/// Tracks outstanding background cleanup operations for graceful shutdown.
Expand All @@ -99,7 +99,7 @@ impl ChangeManager {
/// Creates a new `ChangeManager` with the given backends and changelog.
pub fn new(
high_volume: Box<dyn HighVolumeBackend>,
long_term: Box<dyn Backend>,
long_term: Box<dyn MultipartUploadBackend>,
changelog: Box<dyn ChangeLog>,
) -> Arc<Self> {
Arc::new(Self {
Expand Down
27 changes: 26 additions & 1 deletion objectstore-service/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn from_config(config: StorageConfig) -> Result<Box<dyn common::Backen
Ok(match config {
StorageConfig::Tiered(c) => {
let hv = hv_from_config(c.high_volume).await?;
let lt = from_leaf_config(*c.long_term).await?;
let lt = lt_from_config(c.long_term).await?;
let log = Box::new(changelog::NoopChangeLog);
Box::new(tiered::TieredStorage::new(hv, lt, log))
}
Expand Down Expand Up @@ -104,3 +104,28 @@ async fn hv_from_config(
HighVolumeStorageConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?),
})
}

/// Selects the long-term storage backend of a [`tiered::TieredStorageConfig`].
///
/// This is deliberately narrower than [`StorageConfig`]: only backends that
/// implement [`common::MultipartUploadBackend`] may serve as the long-term tier.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum MultipartUploadStorageConfig {
    /// [Google Cloud Storage] backend, selected with `type = "gcs"`.
    ///
    /// [Google Cloud Storage]: https://cloud.google.com/storage
    Gcs(gcs::GcsConfig),

    /// Local filesystem backend, selected with `type = "filesystem"`.
    FileSystem(local_fs::FileSystemConfig),
}

/// Builds the type-erased long-term [`common::MultipartUploadBackend`] described by `config`.
///
/// # Errors
///
/// Propagates any error raised while constructing the GCS backend.
async fn lt_from_config(
    config: MultipartUploadStorageConfig,
) -> anyhow::Result<Box<dyn common::MultipartUploadBackend>> {
    let backend: Box<dyn common::MultipartUploadBackend> = match config {
        MultipartUploadStorageConfig::Gcs(cfg) => Box::new(gcs::GcsBackend::new(cfg).await?),
        MultipartUploadStorageConfig::FileSystem(cfg) => {
            Box::new(local_fs::LocalFsBackend::new(cfg))
        }
    };
    Ok(backend)
}
145 changes: 143 additions & 2 deletions objectstore-service/src/backend/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ use bytes::Bytes;
use objectstore_types::metadata::Metadata;

use crate::backend::common::{
Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
TieredGet, TieredMetadata, TieredWrite, Tombstone,
Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse,
MultipartUploadBackend, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone,
};
use crate::backend::in_memory::InMemoryBackend;
use crate::error::Result;
use crate::id::ObjectId;
use crate::multipart::{
AbortMultipartResponse, CompleteMultipartResponse, CompletedPart, InitiateMultipartResponse,
ListPartsResponse, PartNumber, UploadId, UploadPartResponse,
};
use crate::stream::ClientStream;

/// Hooks for [`TestBackend`].
Expand Down Expand Up @@ -150,6 +154,77 @@ pub trait Hooks: fmt::Debug + Send + Sync + 'static {
) -> Result<bool> {
inner.compare_and_write(id, current, write).await
}

// --- MultipartUploadBackend methods ---

/// Intercepts [`MultipartUploadBackend::initiate_multipart`].
///
/// The default implementation forwards straight to the in-memory `backend`.
async fn initiate_multipart(
    &self,
    backend: &InMemoryBackend,
    id: &ObjectId,
    metadata: &Metadata,
) -> Result<InitiateMultipartResponse> {
    backend.initiate_multipart(id, metadata).await
}

/// Intercepts [`MultipartUploadBackend::upload_part`].
///
/// The default implementation forwards straight to the in-memory `backend`.
#[allow(clippy::too_many_arguments)]
async fn upload_part(
    &self,
    backend: &InMemoryBackend,
    id: &ObjectId,
    upload_id: &UploadId,
    part_number: PartNumber,
    content_length: u64,
    content_md5: Option<&str>,
    body: ClientStream,
) -> Result<UploadPartResponse> {
    backend
        .upload_part(id, upload_id, part_number, content_length, content_md5, body)
        .await
}

/// Intercepts [`MultipartUploadBackend::list_parts`].
///
/// The default implementation forwards straight to the in-memory `backend`.
async fn list_parts(
    &self,
    backend: &InMemoryBackend,
    id: &ObjectId,
    upload_id: &UploadId,
    max_parts: Option<u32>,
    part_number_marker: Option<PartNumber>,
) -> Result<ListPartsResponse> {
    backend
        .list_parts(id, upload_id, max_parts, part_number_marker)
        .await
}

/// Intercepts [`MultipartUploadBackend::abort_multipart`].
///
/// The default implementation forwards straight to the in-memory `backend`.
async fn abort_multipart(
    &self,
    backend: &InMemoryBackend,
    id: &ObjectId,
    upload_id: &UploadId,
) -> Result<AbortMultipartResponse> {
    backend.abort_multipart(id, upload_id).await
}

/// Intercepts [`MultipartUploadBackend::complete_multipart`].
///
/// The default implementation forwards straight to the in-memory `backend`.
async fn complete_multipart(
    &self,
    backend: &InMemoryBackend,
    id: &ObjectId,
    upload_id: &UploadId,
    parts: Vec<CompletedPart>,
) -> Result<CompleteMultipartResponse> {
    backend.complete_multipart(id, upload_id, parts).await
}
}

/// Generic test backend that implements both [`Backend`] and [`HighVolumeBackend`].
Expand Down Expand Up @@ -258,3 +333,69 @@ impl<H: Hooks> HighVolumeBackend for TestBackend<H> {
.await
}
}

/// Routes every multipart operation through the configured [`Hooks`] so tests
/// can observe or override individual calls before they reach the wrapped
/// [`InMemoryBackend`].
#[async_trait::async_trait]
impl<H: Hooks> MultipartUploadBackend for TestBackend<H> {
    async fn initiate_multipart(
        &self,
        object_id: &ObjectId,
        metadata: &Metadata,
    ) -> Result<InitiateMultipartResponse> {
        self.hooks
            .initiate_multipart(&self.inner, object_id, metadata)
            .await
    }

    async fn upload_part(
        &self,
        object_id: &ObjectId,
        upload: &UploadId,
        part_no: PartNumber,
        content_length: u64,
        content_md5: Option<&str>,
        body: ClientStream,
    ) -> Result<UploadPartResponse> {
        self.hooks
            .upload_part(
                &self.inner,
                object_id,
                upload,
                part_no,
                content_length,
                content_md5,
                body,
            )
            .await
    }

    async fn list_parts(
        &self,
        object_id: &ObjectId,
        upload: &UploadId,
        max_parts: Option<u32>,
        marker: Option<PartNumber>,
    ) -> Result<ListPartsResponse> {
        self.hooks
            .list_parts(&self.inner, object_id, upload, max_parts, marker)
            .await
    }

    async fn abort_multipart(
        &self,
        object_id: &ObjectId,
        upload: &UploadId,
    ) -> Result<AbortMultipartResponse> {
        self.hooks
            .abort_multipart(&self.inner, object_id, upload)
            .await
    }

    async fn complete_multipart(
        &self,
        object_id: &ObjectId,
        upload: &UploadId,
        parts: Vec<CompletedPart>,
    ) -> Result<CompleteMultipartResponse> {
        self.hooks
            .complete_multipart(&self.inner, object_id, upload, parts)
            .await
    }
}
Loading
Loading