diff --git a/Cargo.lock b/Cargo.lock index c7da1d47..03e7f7a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -292,12 +292,44 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attohttpc" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e2cdb6d5ed835199484bb92bb8b3edd526effe995c61732580439c1a67e2e9" +dependencies = [ + "base64", + "http 1.4.0", + "log", + "native-tls", + "serde", + "serde_json", + "url", +] + [[package]] name = "autocfg" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-creds" +version = "0.39.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3b85155d265df828f84e53886ed9e427aed979dd8a39f5b8b2162c77e142d7" +dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml", + "rust-ini", + "serde", + "thiserror 2.0.18", + "time", + "url", +] + [[package]] name = "aws-lc-rs" version = "1.16.2" @@ -320,6 +352,15 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-region" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "838b36c8dc927b6db1b6c6b8f5d05865f2213550b9e83bf92fa99ed6525472c0" +dependencies = [ + "thiserror 2.0.18", +] + [[package]] name = "axum" version = "0.8.8" @@ -385,7 +426,7 @@ dependencies = [ "miniz_oxide", "object", "rustc-demangle", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -564,7 +605,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -634,6 +675,26 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.17", + "once_cell", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.10.0" @@ -708,6 +769,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-bigint" version = "0.5.5" @@ -885,6 +952,15 @@ dependencies = [ "syn", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "dunce" version = "1.0.5" @@ -1031,7 +1107,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1364,6 +1440,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -1464,6 +1546,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "hostname" version = "0.4.2" @@ -1472,7 +1563,7 @@ checksum = "617aaa3557aef3810a6369d0a99fac8a080891b68bd9f9812a1eeda0c0730cbd" dependencies = [ "cfg-if", "libc", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -1633,7 +1724,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.2", "system-configuration", "tokio", "tower-service", @@ -1653,7 +1744,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core", + "windows-core 0.62.2", ] [[package]] @@ -2049,6 +2140,23 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "mediatype" version = "0.21.0" @@ -2221,13 +2329,22 @@ dependencies = [ "libc", ] +[[package]] +name = "ntapi" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3b335231dfd352ffb0f8017f3b6027a4917f7df785ea2143d8af2adc66980ae" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2407,6 +2524,16 @@ dependencies = [ "objc2-core-foundation", ] +[[package]] +name = "objc2-io-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "objc2-io-surface" version = "0.3.2" @@ -2579,6 +2706,7 @@ dependencies = [ "objectstore-types", "regex", "reqwest 0.12.28", + "rust-s3", "sentry", "serde", "serde_json", @@ -2679,6 +2807,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "os_info" version = "3.14.0" @@ -2749,7 +2887,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -3075,6 +3213,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "quick-xml" +version = "0.38.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66c2058c55a409d601666cffe35f04333cf1013010882cec174a7467cd4e21c" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.9" @@ -3088,7 +3236,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.2", "thiserror 2.0.18", "tokio", "tracing", @@ -3126,9 +3274,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.2", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -3459,6 +3607,50 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rust-ini" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796e8d2b6696392a43bea58116b667fb4c29727dc5abd27d6acf338bb4f688c7" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rust-s3" +version = "0.37.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4af74047374528b627109d579ce86b23ccf6ffba7ff363c807126c1aff69e1bb" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64", + "bytes", + "cfg-if", + "futures-util", + "hex", + "hmac", + "http 1.4.0", + "log", + "maybe-async", + "md5", + "percent-encoding", + "quick-xml", + "reqwest 0.12.28", + "serde", + "serde_derive", + "serde_json", + "sha2", + "sysinfo", + "thiserror 2.0.18", + "time", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rustc-demangle" version = "0.1.27" @@ -3490,7 +3682,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3549,7 +3741,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3684,7 +3876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4141,6 +4333,20 @@ dependencies = [ "syn", ] +[[package]] +name = "sysinfo" +version = "0.37.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16607d5caffd1c07ce073528f9ed972d88db15dd44023fa57142963be3feb11f" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows", +] + [[package]] name = "system-configuration" version = "0.7.0" @@ -4178,7 +4384,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4281,6 +4487,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -4996,7 +5211,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -5005,6 +5220,41 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core 0.61.2", + "windows-future", + "windows-link 0.1.3", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core 0.61.2", +] + +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link 0.1.3", + "windows-result 0.3.4", + "windows-strings 0.4.2", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -5013,9 +5263,20 @@ checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ "windows-implement", "windows-interface", - "windows-link", - "windows-result", - "windows-strings", + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", + "windows-threading", ] [[package]] @@ -5040,21 +5301,46 @@ dependencies = [ "syn", ] +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", +] + [[package]] name = "windows-registry" version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" dependencies = [ - "windows-link", - "windows-result", - "windows-strings", + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link 0.1.3", ] [[package]] @@ -5063,7 +5349,16 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" dependencies = [ - "windows-link", + "windows-link 0.2.1", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link 0.1.3", ] [[package]] @@ -5072,7 +5367,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" dependencies = [ - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -5126,7 +5421,7 @@ version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -5181,7 +5476,7 @@ version = "0.53.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" dependencies = [ - "windows-link", + "windows-link 0.2.1", "windows_aarch64_gnullvm 0.53.1", "windows_aarch64_msvc 0.53.1", "windows_i686_gnu 0.53.1", @@ -5192,6 +5487,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link 0.1.3", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index eb9e0478..2a80e9e3 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -29,6 +29,7 @@ reqwest = { workspace = true, features = [ "multipart", "json", ] } +rust-s3 = { version = "0.37.1", default-features = false, features = ["tokio-native-tls", "fail-on-err"] } sentry = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/objectstore-service/docs/architecture.md b/objectstore-service/docs/architecture.md index 568420da..8256daa9 100644 --- a/objectstore-service/docs/architecture.md +++ b/objectstore-service/docs/architecture.md @@ -149,6 +149,11 @@ capabilities. For example, BigTable has built-in TTL via garbage collection policies, and GCS supports object lifecycle management. The service does not perform active garbage collection. +Backends without native lifecycle support — currently the +[filesystem](backend::local_fs) and [S3-compatible](backend::s3_compatible) +backends — accept and persist expiration policies on objects but require an +external cleanup job to actually remove expired data. + # Backpressure The service applies backpressure to protect backends from overload. Rather than diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 401aad1b..d8f39413 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -75,7 +75,7 @@ async fn from_leaf_config(config: StorageConfig) -> Result Box::new(local_fs::LocalFsBackend::new(c)), StorageConfig::S3Compatible(c) => { - Box::new(s3_compatible::S3CompatibleBackend::without_token(c)) + Box::new(s3_compatible::S3CompatibleBackend::without_token(c)?) } StorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?), StorageConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?), diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 51069406..2d305038 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -1,26 +1,35 @@ -//! S3-compatible backend with generic protocol support. +//! Backend that can be used with any S3-compatible object store (Amazon S3, MinIO, R2, etc.). use std::time::{Duration, SystemTime}; use std::{fmt, io}; use futures_util::{StreamExt, TryStreamExt}; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; -use reqwest::header::HeaderMap; -use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode}; - -use crate::backend::common::{ - self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse, -}; +use reqwest::header::{self, HeaderMap, HeaderName}; +use s3::Bucket; +use s3::command::Command; +use s3::creds::Credentials; +use s3::error::S3Error; +use s3::region::Region; +use s3::request::Request as _; +use s3::request::tokio_backend::ReqwestRequest; +use serde::{Deserialize, Serialize}; +use tokio_util::io::StreamReader; + +use crate::backend::common::{Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse}; use crate::error::{Error, Result}; use crate::id::ObjectId; use crate::stream::{self, ClientStream}; /// Configuration for [`S3CompatibleBackend`]. /// -/// Supports [Amazon S3] and other S3-compatible services. Authentication is handled via -/// environment variables (`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`) or IAM roles. +/// Supports [Amazon S3] and other S3-compatible services such as MinIO. AWS +/// credentials are resolved via the standard [SDK chain] (env vars, profile, +/// STS web identity, ECS container, EC2 IMDS); if no credentials can be +/// resolved, the backend falls back to anonymous (unauthenticated) requests. /// /// [Amazon S3]: https://aws.amazon.com/s3/ +/// [SDK chain]: https://docs.rs/aws-creds/0.39.1/aws_creds/credentials/struct.Credentials.html /// /// # Example /// @@ -29,8 +38,12 @@ use crate::stream::{self, ClientStream}; /// type: s3compatible /// endpoint: https://s3.amazonaws.com /// bucket: my-bucket +/// region: us-east-1 +/// use_path_style: false +/// metadata_prefix: x-amz-meta- +/// protocol_prefix: x-amz- /// ``` -#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct S3CompatibleConfig { /// S3 endpoint URL. /// @@ -50,17 +63,94 @@ pub struct S3CompatibleConfig { /// /// - `OS__STORAGE__BUCKET=my-bucket` pub bucket: String, + + /// Region label sent in SigV4 signatures. + /// + /// On AWS this must match the bucket's actual region. MinIO and most other + /// S3-compatible services accept any value. + /// + /// # Default + /// + /// `"us-east-1"` + /// + /// # Environment Variables + /// + /// - `OS__STORAGE__REGION=us-west-2` + #[serde(default = "default_region")] + pub region: String, + + /// Whether to use path-style addressing (`https://host/bucket/key`) + /// instead of virtual-hosted-style (`https://bucket.host/key`). + /// + /// # Default + /// + /// `true` + /// + /// # Environment Variables + /// + /// - `OS__STORAGE__USE_PATH_STYLE=false` + #[serde(default = "default_use_path_style")] + pub use_path_style: bool, + + /// Prefix used for custom object metadata on the wire. + /// + /// # Default + /// + /// `"x-amz-meta-"` + /// + /// # Environment Variables + /// + /// - `OS__STORAGE__METADATA_PREFIX=x-goog-meta-` + #[serde(default = "default_metadata_prefix")] + pub metadata_prefix: String, + + /// Protocol-level header prefix used for non-metadata request headers + /// like `{prefix}copy-source` and `{prefix}metadata-directive`. + /// + /// # Default + /// + /// `"x-amz-"` + /// + /// # Environment Variables + /// + /// - `OS__STORAGE__PROTOCOL_PREFIX=x-goog-` + #[serde(default = "default_protocol_prefix")] + pub protocol_prefix: String, + + /// Optional extra header that mirrors the object's resolved expiration + /// timestamp, on top of the canonical `x-sn-time-expires`. + /// + /// Used to interoperate with S3-compatible backends that recognize a + /// dedicated header for object lifecycle (such as GCS, which uses + /// `x-goog-custom-time`). Leave unset for plain S3. + /// + /// # Default + /// + /// `None` + /// + /// # Environment Variables + /// + /// - `OS__STORAGE__CUSTOM_TIME_HEADER=x-goog-custom-time` + #[serde(default)] + pub custom_time_header: Option, +} + +fn default_region() -> String { + "us-east-1".to_owned() +} + +fn default_use_path_style() -> bool { + true +} + +fn default_metadata_prefix() -> String { + "x-amz-meta-".to_owned() +} + +fn default_protocol_prefix() -> String { + "x-amz-".to_owned() } -/// Prefix used for custom metadata in headers for the GCS backend. -/// -/// See: -const GCS_CUSTOM_PREFIX: &str = "x-goog-meta-"; -/// Header used to store the expiration time for GCS using the `daysSinceCustomTime` lifecycle -/// condition. -/// -/// See: -const GCS_CUSTOM_TIME: &str = "x-goog-custom-time"; /// Time to debounce bumping an object with configured TTI. const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day @@ -94,177 +184,240 @@ impl Token for NoToken { /// S3-compatible storage backend with pluggable authentication. pub struct S3CompatibleBackend { - client: reqwest::Client, - + bucket: Box, endpoint: String, - bucket: String, - token_provider: Option, + metadata_prefix: String, + protocol_prefix: String, + custom_time_header: Option, } -impl S3CompatibleBackend { - /// Creates a new S3-compatible backend bound to the given bucket. - pub fn new(endpoint: &str, bucket: &str, token_provider: T) -> Self { - Self { - client: common::reqwest_client(), - endpoint: endpoint.into(), - bucket: bucket.into(), - token_provider: Some(token_provider), - } - } - - /// Formats the S3 object URL for the given key. - fn object_url(&self, id: &ObjectId) -> String { - format!("{}/{}/{}", self.endpoint, self.bucket, id.as_storage_path()) - } -} - -/// Wraps [`Metadata::to_headers`] with GCS-specific concerns (tombstone + custom-time). -fn metadata_to_gcs_headers( +/// Wraps [`Metadata::to_headers`], additionally echoing `time_expires` under +/// `custom_time_header` when both are set. +fn metadata_to_s3_headers( metadata: &Metadata, prefix: &str, + custom_time_header: Option<&str>, ) -> Result { let mut headers = metadata.to_headers(prefix)?; - // GCS custom-time for lifecycle expiration - if let Some(expires_in) = metadata.expiration_policy.expires_in() { - let expires_at = - humantime::format_rfc3339_seconds(std::time::SystemTime::now() + expires_in); - headers.append(GCS_CUSTOM_TIME, expires_at.to_string().parse()?); + if let (Some(name), Some(time)) = (custom_time_header, metadata.time_expires) { + let formatted = humantime::format_rfc3339_seconds(time); + let name = HeaderName::try_from(name)?; + headers.append(name, formatted.to_string().parse()?); } Ok(headers) } -impl S3CompatibleBackend -where - T: TokenProvider, -{ - /// Creates a request builder with the appropriate authentication. - async fn request(&self, method: Method, url: impl IntoUrl) -> Result { - let mut builder = self.client.request(method, url); - if let Some(provider) = &self.token_provider { - builder = builder.bearer_auth( - provider - .get_token() - .await - .map_err(|err| Error::Generic { - context: "S3: failed to get authentication token".to_owned(), - cause: Some(err.into()), - })? - .as_str(), - ); - } - Ok(builder) +fn metadata_from_headers(headers: &HeaderMap, prefix: &str) -> Result { + let mut metadata = Metadata::from_headers(headers, prefix)?; + metadata.size = headers + .get(header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()?.parse::().ok()); + Ok(metadata) +} + +impl S3CompatibleBackend { + /// Creates a new S3-compatible backend bound to the given config and token provider. + pub fn new(config: S3CompatibleConfig, token_provider: T) -> anyhow::Result { + let mut this = Self::from_config(config)?; + this.token_provider = Some(token_provider); + Ok(this) } - /// Fetches object metadata using the given HTTP method (GET or HEAD), - /// bumps TTI if needed, and returns the parsed metadata along with the - /// response (so `get_object` can read the body from a GET). - async fn request_object( - &self, - method: Method, - id: &ObjectId, - ) -> Result> { - let object_url = self.object_url(id); + fn from_config(config: S3CompatibleConfig) -> anyhow::Result { + let credentials = Credentials::default().or_else(|_| Credentials::anonymous())?; - let response = self - .request(method, &object_url) - .await? - .send() - .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send request".to_string(), - cause, - })?; + let region = Region::Custom { + region: config.region, + endpoint: config.endpoint.clone(), + }; - if response.status() == StatusCode::NOT_FOUND { - objectstore_log::debug!("Object not found"); - return Ok(None); + let mut bucket = Bucket::new(&config.bucket, region, credentials)?; + if config.use_path_style { + bucket.set_path_style(); } - let response = response - .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to get object".to_string(), - cause, - })?; + Ok(Self { + bucket, + endpoint: config.endpoint, + metadata_prefix: config.metadata_prefix, + protocol_prefix: config.protocol_prefix, + custom_time_header: config.custom_time_header, + token_provider: None, + }) + } + + fn object_path(&self, id: &ObjectId) -> String { + format!("/{}", id.as_storage_path()) + } - let headers = response.headers(); - let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?; - metadata.size = response.content_length().map(|len| len as usize); + /// If `metadata` has a [`TimeToIdle`] policy, bumps the recorded expiry + /// via a metadata-only copy-in-place when it would otherwise fall outside + /// the 1-day debounce window. + /// + /// [`TimeToIdle`]: objectstore_types::metadata::ExpirationPolicy::TimeToIdle + async fn bump_tti_if_needed(&self, path: &str, metadata: &Metadata) -> Result<()> { + let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy else { + return Ok(()); + }; + + // TODO: Inject the access time from the request. + let access_time = SystemTime::now(); + + let current_expiry = metadata.time_expires.unwrap_or(access_time); + let new_expiry = access_time + tti; + + let bump_amount = new_expiry + .duration_since(current_expiry) + .unwrap_or_default(); - // TODO: Schedule into background persistently so this doesn't get lost on restarts - if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy { - // TODO: Inject the access time from the request. - let access_time = SystemTime::now(); + if bump_amount > TTI_DEBOUNCE { + // TODO: Schedule into background persistently so this doesn't get lost on restarts. + self.update_custom_time(path, metadata, new_expiry).await?; + } + + Ok(()) + } - let expire_at = headers - .get(GCS_CUSTOM_TIME) - .and_then(|s| s.to_str().ok()) - .and_then(|s| humantime::parse_rfc3339(s).ok()) - .unwrap_or(access_time); + /// Rewrites the object's metadata in place via a copy-with-REPLACE + /// request, setting `time_expires` (and the configured + /// [`custom_time_header`], if any) to `custom_time`. Used to bump the + /// expiration time for TTI objects. + /// + /// [`custom_time_header`]: S3CompatibleConfig::custom_time_header + async fn update_custom_time( + &self, + path: &str, + metadata: &Metadata, + custom_time: SystemTime, + ) -> Result<()> { + let copy_source_header = format!("{}copy-source", self.protocol_prefix); + let metadata_directive_header = format!("{}metadata-directive", self.protocol_prefix); + let copy_source = format!("/{}{}", self.bucket.name(), path); + + let mut request = self + .bucket + .put_object_builder(path, &[]) + .with_content_type(metadata.content_type.as_ref()) + .with_header(©_source_header, copy_source) + .map_err(|e| map_s3_error(e, "S3: failed to set copy-source header"))? + .with_header(&metadata_directive_header, "REPLACE") + .map_err(|e| map_s3_error(e, "S3: failed to set metadata-directive header"))?; + + if let Some(compression) = metadata.compression { + request = request + .with_content_encoding(compression.as_str()) + .map_err(|e| map_s3_error(e, "S3: failed to set content-encoding"))?; + } - if expire_at < access_time + tti - TTI_DEBOUNCE { - self.update_metadata(id, &metadata).await?; + let mut metadata = metadata.clone(); + metadata.time_expires = Some(custom_time); + let headers = metadata_to_s3_headers( + &metadata, + &self.metadata_prefix, + self.custom_time_header.as_deref(), + ) + .map_err(Error::Metadata)?; + for (name, value) in &headers { + if name == header::CONTENT_TYPE || name == header::CONTENT_ENCODING { + continue; } + let value_str = value.to_str().map_err(|e| Error::Generic { + context: format!("S3: non-ascii metadata header value for {name}"), + cause: Some(Box::new(e)), + })?; + request = request + .with_header(name.as_str(), value_str) + .map_err(|e| map_s3_error(e, "S3: failed to set object metadata"))?; } - Ok(Some((metadata, response))) - } - - /// Issues a request to update the metadata for the given object. - async fn update_metadata(&self, id: &ObjectId, metadata: &Metadata) -> Result<()> { - // NB: Meta updates require copy + REPLACE along with *all* metadata. See - // https://cloud.google.com/storage/docs/xml-api/put-object-copy - self.request(Method::PUT, self.object_url(id)) - .await? - .header( - "x-goog-copy-source", - format!("/{}/{}", self.bucket, id.as_storage_path()), - ) - .header("x-goog-metadata-directive", "REPLACE") - .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) - .send() + request + .execute() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send TTI update request".to_string(), - cause, - })? - .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to update expiration time for object with TTI".to_string(), - cause, - })?; + .map_err(|e| map_s3_error(e, "S3: failed to update custom_time"))?; Ok(()) } + + /// HEADs an object, filters expired entries, and bumps TTI. + /// + /// Returns `None` if the object is absent or past its expiry. + async fn fetch_live_metadata(&self, path: &str) -> Result> { + let Some(headers) = self.head_object(path).await? else { + return Ok(None); + }; + let metadata = metadata_from_headers(&headers, &self.metadata_prefix)?; + + // Filter already expired objects but leave them to garbage collection. + if metadata.expiration_policy.is_timeout() + && metadata + .time_expires + .is_some_and(|ts| ts < SystemTime::now()) + { + objectstore_log::debug!("Object found but past expiry"); + return Ok(None); + } + + self.bump_tti_if_needed(path, &metadata).await?; + Ok(Some(metadata)) + } + + /// Issues a HEAD request and returns the raw response headers. + /// + /// `Bucket::head_object` is not sufficient because it only surfaces + /// `x-amz-meta-*` keys, which could be different from `self.metadata_prefix`. + async fn head_object(&self, path: &str) -> Result> { + let request = ReqwestRequest::new(&self.bucket, path, Command::HeadObject) + .await + .map_err(|e| map_s3_error(e, "S3: failed to build head request"))?; + + match request.response_header().await { + Ok((headers, _)) => Ok(Some(headers)), + Err(ref e) if is_not_found(e) => { + objectstore_log::debug!("Object not found"); + Ok(None) + } + Err(e) => Err(map_s3_error(e, "S3: failed to head object")), + } + } +} + +impl S3CompatibleBackend { + /// Creates a new S3-compatible backend that sends unauthenticated requests. + pub fn without_token(config: S3CompatibleConfig) -> anyhow::Result { + Self::from_config(config) + } } impl fmt::Debug for S3CompatibleBackend { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("S3Compatible") - .field("client", &self.client) - .field("endpoint", &self.endpoint) + f.debug_struct("S3CompatibleBackend") .field("bucket", &self.bucket) + .field("endpoint", &self.endpoint) + .field("metadata_prefix", &self.metadata_prefix) + .field("protocol_prefix", &self.protocol_prefix) + .field("custom_time_header", &self.custom_time_header) .finish_non_exhaustive() } } -impl S3CompatibleBackend { - /// Creates a new S3-compatible backend that sends unauthenticated requests. - pub fn without_token(config: S3CompatibleConfig) -> Self { - Self { - client: common::reqwest_client(), - endpoint: config.endpoint, - bucket: config.bucket, - token_provider: None, - } +/// Maps an [`S3Error`] to our [`Error`] type with the given context. +fn map_s3_error(error: S3Error, context: &str) -> Error { + Error::Generic { + context: context.to_owned(), + cause: Some(Box::new(error)), } } +/// Returns `true` if `error` is an HTTP 404 from rust-s3. +fn is_not_found(error: &S3Error) -> bool { + matches!(error, S3Error::HttpFailWithBody(404, _)) +} + #[async_trait::async_trait] impl Backend for S3CompatibleBackend { fn name(&self) -> &'static str { - "s3-compatible" + "s3_compatible" } #[tracing::instrument(level = "trace", fields(?id), skip_all)] @@ -275,19 +428,52 @@ impl Backend for S3CompatibleBackend { stream: ClientStream, ) -> Result { objectstore_log::debug!("Writing to s3_compatible backend"); - self.request(Method::PUT, self.object_url(id)) - .await? - .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) - .body(Body::wrap_stream(stream)) - .send() + let path = self.object_path(id); + + let mut request = self + .bucket + .put_object_stream_builder(&path) + .with_content_type(metadata.content_type.as_ref()); + + if let Some(compression) = metadata.compression { + request = request + .with_content_encoding(compression.as_str()) + .map_err(|e| map_s3_error(e, "S3: failed to set content-encoding"))?; + } + + let mut metadata = metadata.clone(); + if let Some(d) = metadata.expiration_policy.expires_in() { + metadata.time_expires = Some(SystemTime::now() + d); + } + let headers = metadata_to_s3_headers( + &metadata, + &self.metadata_prefix, + self.custom_time_header.as_deref(), + ) + .map_err(Error::Metadata)?; + for (name, value) in &headers { + if name == header::CONTENT_TYPE || name == header::CONTENT_ENCODING { + continue; + } + let value_str = value.to_str().map_err(|e| Error::Generic { + context: format!("S3: non-ascii metadata header value for {name}"), + cause: Some(Box::new(e)), + })?; + request = request + .with_header(name, value_str) + .map_err(|e| map_s3_error(e, "S3: failed to set object metadata"))?; + } + + let mut reader = StreamReader::new(stream.map_err(io::Error::other)); + request + .execute_stream(&mut reader) .await - .and_then(|response| response.error_for_status()) - .map_err(|cause| match stream::unpack_client_error(&cause) { - Some(ce) => Error::Client(ce), - _ => Error::Reqwest { - context: "S3: failed to put object".to_string(), - cause, + .map_err(|cause| match &cause { + S3Error::Io(io_err) => match stream::unpack_client_error(io_err) { + Some(ce) => Error::Client(ce), + None => map_s3_error(cause, "S3: failed to put object"), }, + _ => map_s3_error(cause, "S3: failed to put object"), })?; Ok(()) @@ -296,58 +482,62 @@ impl Backend for S3CompatibleBackend { #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn get_object(&self, id: &ObjectId) -> Result { objectstore_log::debug!("Reading from s3_compatible backend"); + let path = self.object_path(id); - let Some((metadata, response)) = self.request_object(Method::GET, id).await? else { + let Some(metadata) = self.fetch_live_metadata(&path).await? else { return Ok(None); }; - let stream = response.bytes_stream().map_err(io::Error::other); - Ok(Some((metadata, stream.boxed()))) + let response = match self.bucket.get_object_stream(&path).await { + Ok(response) => response, + Err(ref e) if is_not_found(e) => { + // Object was deleted between HEAD and GET; treat as missing. + objectstore_log::debug!("Object disappeared between head and get"); + return Ok(None); + } + Err(e) => return Err(map_s3_error(e, "S3: failed to get object")), + }; + + let stream = response.bytes.map_err(io::Error::other).boxed(); + Ok(Some((metadata, stream))) } #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn get_metadata(&self, id: &ObjectId) -> Result { objectstore_log::debug!("Reading metadata from s3_compatible backend"); - let response = self.request_object(Method::HEAD, id).await?; - Ok(response.map(|(metadata, _)| metadata)) + let path = self.object_path(id); + self.fetch_live_metadata(&path).await } #[tracing::instrument(level = "trace", fields(?id), skip_all)] async fn delete_object(&self, id: &ObjectId) -> Result { objectstore_log::debug!("Deleting from s3_compatible backend"); - let response = self - .request(Method::DELETE, self.object_url(id)) - .await? - .send() - .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send delete request".to_string(), - cause, - })?; + let path = self.object_path(id); - // Do not error for objects that do not exist. - if response.status() != StatusCode::NOT_FOUND { - objectstore_log::debug!("Object not found"); - response - .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to delete object".to_string(), - cause, - })?; + match self.bucket.delete_object(&path).await { + Ok(_) => Ok(()), + Err(ref e) if is_not_found(e) => { + objectstore_log::debug!("Object not found"); + Ok(()) + } + Err(e) => Err(map_s3_error(e, "S3: failed to delete object")), } - - Ok(()) } } #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use std::time::SystemTime; + use anyhow::Result; + use objectstore_types::metadata::ExpirationPolicy; use objectstore_types::scope::{Scope, Scopes}; use super::*; use crate::backend::common::Backend; use crate::id::ObjectContext; + use crate::stream; // NB: To run these tests, you need to have a MinIO server running. This is done // automatically in CI. @@ -358,7 +548,13 @@ mod tests { S3CompatibleBackend::without_token(S3CompatibleConfig { endpoint: "http://localhost:8089".into(), bucket: "test-bucket".into(), + region: default_region(), + use_path_style: default_use_path_style(), + metadata_prefix: default_metadata_prefix(), + protocol_prefix: default_protocol_prefix(), + custom_time_header: None, }) + .unwrap() } fn make_id() -> ObjectId { @@ -368,12 +564,326 @@ mod tests { }) } + #[tokio::test] + async fn test_roundtrip() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::Manual, + compression: None, + origin: Some("203.0.113.42".into()), + custom: BTreeMap::from_iter([("hello".into(), "world".into())]), + time_created: Some(SystemTime::now()), + time_expires: None, + size: None, + }; + + backend + .put_object(&id, &metadata, stream::single("hello, world")) + .await?; + + let (meta, stream) = backend.get_object(&id).await?.unwrap(); + + let payload = stream::read_to_vec(stream).await?; + let str_payload = str::from_utf8(&payload).unwrap(); + assert_eq!(str_payload, "hello, world"); + assert_eq!(meta.content_type, metadata.content_type); + assert_eq!(meta.origin, metadata.origin); + assert_eq!(meta.custom, metadata.custom); + assert!(metadata.time_created.is_some()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_nonexistent() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + let result = backend.get_object(&id).await?; + assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_delete_nonexistent() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + backend.delete_object(&id).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_overwrite() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + let metadata = Metadata { + custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single("hello")) + .await?; + + let metadata = Metadata { + custom: BTreeMap::from_iter([("hello".into(), "world".into())]), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single("world")) + .await?; + + let (meta, stream) = backend.get_object(&id).await?.unwrap(); + + let payload = stream::read_to_vec(stream).await?; + let str_payload = str::from_utf8(&payload).unwrap(); + assert_eq!(str_payload, "world"); + assert_eq!(meta.custom, metadata.custom); + + Ok(()) + } + + #[tokio::test] + async fn test_streaming_upload() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + let metadata = Metadata::default(); + + // rust-s3 dispatches to multipart uploads when the stream exceeds the + // 8 MiB chunk size. Use 20 MiB to ensure multiple parts are uploaded. + let chunk = bytes::Bytes::from(vec![0xab; 1024 * 1024]); // 1 MiB + let stream = + futures_util::stream::iter(std::iter::repeat_with(move || Ok(chunk.clone())).take(20)) + .boxed(); + + backend.put_object(&id, &metadata, stream).await?; + + let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let payload = stream::read_to_vec(stream).await?; + + assert_eq!(payload.len(), 20 * 1024 * 1024); + assert!(payload.iter().all(|&b| b == 0xab)); + assert_eq!(meta.size, Some(20 * 1024 * 1024)); + + Ok(()) + } + + #[tokio::test] + async fn test_read_after_delete() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + let metadata = Metadata::default(); + + backend + .put_object(&id, &metadata, stream::single("hello, world")) + .await?; + + backend.delete_object(&id).await?; + + let result = backend.get_object(&id).await?; + assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_ttl_immediate() -> Result<()> { + // NB: We create a TTL that immediately expires in this tests. This might be optimized away + // in a future implementation, so we will have to update this test accordingly. + + let backend = create_test_backend(); + + let id = make_id(); + let metadata = Metadata { + expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single("hello, world")) + .await?; + + let result = backend.get_object(&id).await?; + assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_tti_immediate() -> Result<()> { + // NB: We create a TTI that immediately expires in this tests. This might be optimized away + // in a future implementation, so we will have to update this test accordingly. + + let backend = create_test_backend(); + + let id = make_id(); + let metadata = Metadata { + expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single("hello, world")) + .await?; + + let result = backend.get_object(&id).await?; + assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_metadata_returns_metadata() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + origin: Some("203.0.113.42".into()), + custom: BTreeMap::from_iter([("hello".into(), "world".into())]), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single("hello, world")) + .await?; + + let meta = backend.get_metadata(&id).await?.unwrap(); + assert_eq!(meta.content_type, metadata.content_type); + assert_eq!(meta.origin, metadata.origin); + assert_eq!(meta.custom, metadata.custom); + + Ok(()) + } + #[tokio::test] async fn test_get_metadata_nonexistent() -> Result<()> { let backend = create_test_backend(); + let id = make_id(); let result = backend.get_metadata(&id).await?; assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_get_metadata_bumps_tti() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable. + let tti = Duration::from_secs(2 * 24 * 3600); // 2 days + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::TimeToIdle(tti), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single("hello, world")) + .await?; + + // Manually set custom_time to just inside the bump window. + // The bump condition is: expire_at < now + tti - TTI_DEBOUNCE. + let path = backend.object_path(&id); + let old_deadline = SystemTime::now() + tti - TTI_DEBOUNCE - Duration::from_secs(60); + backend + .update_custom_time(&path, &metadata, old_deadline) + .await?; + + // First get_metadata sees the old timestamp and triggers a TTI bump. + let pre_meta = backend.get_metadata(&id).await?.unwrap(); + let pre_expiry = pre_meta.time_expires.unwrap(); + + // Second get_metadata sees the bumped timestamp. + let post_meta = backend.get_metadata(&id).await?.unwrap(); + let post_expiry = post_meta.time_expires.unwrap(); + assert!( + post_expiry > pre_expiry, + "TTI bump should have extended the expiry: {pre_expiry:?} -> {post_expiry:?}" + ); + + // Verify the payload is still intact after the bump. + let (_, stream) = backend.get_object(&id).await?.unwrap(); + let payload = stream::read_to_vec(stream).await?; + assert_eq!(&payload, b"hello, world"); + + Ok(()) + } + + #[tokio::test] + async fn test_get_metadata_does_not_bump_fresh_tti() -> Result<()> { + let backend = create_test_backend(); + + let id = make_id(); + // TTI must exceed TTI_DEBOUNCE (1 day) for the bump condition to be reachable. + let tti = Duration::from_secs(2 * 24 * 3600); // 2 days + let metadata = Metadata { + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::TimeToIdle(tti), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single("hello, world")) + .await?; + + // A freshly written object has time_expires ≈ now + 2d, which is well outside + // the bump window (now + 2d - 1d = now + 1d). No bump should occur. + let first = backend.get_metadata(&id).await?.unwrap(); + let first_expiry = first.time_expires.unwrap(); + + let second = backend.get_metadata(&id).await?.unwrap(); + let second_expiry = second.time_expires.unwrap(); + + assert_eq!( + first_expiry, second_expiry, + "Fresh TTI object should not have its expiry bumped" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_compressed_payload_roundtrip() -> Result<()> { + use objectstore_types::metadata::Compression; + + let backend = create_test_backend(); + + let plaintext = b"hello, world (but compressed with zstd)"; + let compressed = zstd::encode_all(&plaintext[..], 3)?; + + let id = make_id(); + let metadata = Metadata { + content_type: "text/plain".into(), + compression: Some(Compression::Zstd), + ..Default::default() + }; + + backend + .put_object(&id, &metadata, stream::single(compressed.clone())) + .await?; + + let (meta, stream) = backend.get_object(&id).await?.unwrap(); + let payload = stream::read_to_vec(stream).await?; + + assert_eq!(meta.compression, Some(Compression::Zstd)); + assert_eq!( + payload, compressed, + "Payload should be returned still compressed, not auto-decompressed" + ); + Ok(()) } }