Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion clients/python/src/objectstore_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,14 @@ def put(
for k, v in metadata.items():
headers[f"{HEADER_META_PREFIX}{k}"] = v

if key == "":
key = None

with measure_storage_operation(
self._metrics_backend, "put", self._usecase.name
) as metric_emitter:
response = self._pool.request(
"PUT",
"POST" if not key else "PUT",
self._make_url(key),
body=body,
headers=headers,
Expand Down
13 changes: 8 additions & 5 deletions clients/python/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,18 @@ def test_full_cycle(server_url: str) -> None:

session = client.session(test_usecase, org=42, project=1337)

data = b"test data"

object_key = session.put(data)
object_key = session.put(b"test data")
assert object_key is not None

retrieved = session.get(object_key)
assert retrieved.payload.read() == data
assert retrieved.payload.read() == b"test data"
assert retrieved.metadata.time_created is not None

new_key = session.put(b"new data", key=object_key)
assert new_key == object_key
retrieved = session.get(object_key)
assert retrieved.payload.read() == b"new data"

session.delete(object_key)

with pytest.raises(RequestError, check=lambda e: e.status == 404):
Expand Down Expand Up @@ -168,4 +171,4 @@ def test_connect_timeout() -> None:
)

with pytest.raises(urllib3.exceptions.MaxRetryError):
session.put(b"foo")
session.get("foo")
14 changes: 9 additions & 5 deletions clients/rust/src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl PutBuilder {
/// If a key is specified, the object will be stored under that key. Otherwise, the Objectstore
/// server will automatically assign a random key, which is then returned from this request.
pub fn key(mut self, key: impl Into<String>) -> Self {
self.key = Some(key.into());
self.key = Some(key.into()).filter(|k| !k.is_empty());
self
}

Expand Down Expand Up @@ -138,10 +138,14 @@ impl PutBuilder {
impl PutBuilder {
/// Sends the built PUT request to the upstream service.
pub async fn send(self) -> crate::Result<PutResponse> {
let mut builder = self.session.request(
reqwest::Method::PUT,
self.key.as_deref().unwrap_or_default(),
);
let method = match self.key {
Some(_) => reqwest::Method::PUT,
None => reqwest::Method::POST,
};

let mut builder = self
.session
.request(method, self.key.as_deref().unwrap_or_default());

let body = match (self.metadata.compression, self.body) {
(Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
Expand Down
24 changes: 24 additions & 0 deletions clients/rust/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,30 @@ async fn stores_under_given_key() {
assert_eq!(stored_id, "test-key123!!");
}

#[tokio::test]
async fn overwrites_existing_key() {
let server = TestServer::new().await;

let client = ClientBuilder::new(server.url("/")).build().unwrap();
let usecase = Usecase::new("usecase");
let session = client.session(usecase.for_project(12345, 1337)).unwrap();

let stored_id = session.put("initial body").send().await.unwrap().key;
let overwritten_id = session
.put("new body")
.key(&stored_id)
.send()
.await
.unwrap()
.key;

assert_eq!(stored_id, overwritten_id);

let response = session.get(&stored_id).send().await.unwrap().unwrap();
let payload = response.payload().await.unwrap();
assert_eq!(payload, "new body");
}

#[derive(Debug)]
pub struct TestServer {
handle: tokio::task::JoinHandle<()>,
Expand Down
40 changes: 28 additions & 12 deletions objectstore-server/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::time::SystemTime;
use anyhow::Context;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::http::{HeaderMap, StatusCode};
use axum::http::{HeaderMap, Method, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, put};
use axum::routing;
use axum::{Json, Router};
use futures_util::{StreamExt, TryStreamExt};
use objectstore_service::ObjectPath;
use objectstore_service::{ObjectPath, OptionalObjectPath};
use objectstore_types::Metadata;
use serde::Serialize;

Expand All @@ -21,11 +21,14 @@ use crate::state::ServiceState;
pub fn routes() -> Router<ServiceState> {
let service_routes = Router::new().route(
"/{*path}",
put(put_object).get(get_object).delete(delete_object),
routing::post(insert_object)
.put(insert_object)
.get(get_object)
.delete(delete_object),
);

Router::new()
.route("/health", get(health))
.route("/health", routing::get(health))
.nest("/v1/", service_routes)
}

Expand All @@ -38,24 +41,37 @@ struct PutBlobResponse {
key: String,
}

async fn put_object(
async fn insert_object(
State(state): State<ServiceState>,
Path(path): Path<ObjectPath>,
Path(path): Path<OptionalObjectPath>,
method: Method,
headers: HeaderMap,
body: Body,
) -> ApiResult<impl IntoResponse> {
) -> ApiResult<Response> {
let (expected_method, response_status) = match path.key {
Some(_) => (Method::PUT, StatusCode::OK),
None => (Method::POST, StatusCode::CREATED),
};

// TODO: For now allow PUT everywhere. Remove the second condition when all clients are updated.
if method != expected_method && method == Method::POST {
return Ok(StatusCode::METHOD_NOT_ALLOWED.into_response());
}

let path = path.create_key();
populate_sentry_scope(&path);

let mut metadata =
Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
metadata.time_created = Some(SystemTime::now());

let stream = body.into_data_stream().map_err(io::Error::other).boxed();
let key = state.service.put_object(path, &metadata, stream).await?;
let response_path = state.service.put_object(path, &metadata, stream).await?;
let response = Json(PutBlobResponse {
key: response_path.key.to_string(),
});

Ok(Json(PutBlobResponse {
key: key.key.to_string(),
}))
Ok((response_status, response).into_response())
}

async fn get_object(
Expand Down
113 changes: 90 additions & 23 deletions objectstore-service/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,66 @@ use serde::de;
/// Magic URL segment that separates objectstore context from an object's user-provided key.
const PATH_CONTEXT_SEPARATOR: &str = "objects";

/// The fully scoped path of an object.
///
/// This consists of a usecase, the scope, and the user-defined object key.
/// An [`ObjectPath`] that may or may not have a user-provided key.
// DO NOT derive Eq, see the implementation of PartialEq below.
#[derive(Debug, Clone)]
pub struct ObjectPath {
pub struct OptionalObjectPath {
/// The usecase, or "product" this object belongs to.
///
/// This can be defined on-the-fly by the client, but special server logic
/// (such as the concrete backend/bucket) can be tied to this as well.
pub usecase: String,

/// The scope of the object, used for compartmentalization.
///
/// This is treated as a prefix, and includes such things as the organization and project.
pub scope: Vec<String>,
/// The optional, user-provided key.
pub key: Option<String>,
}

/// This is a user-defined key, which uniquely identifies the object within its usecase/scope.
pub key: String,
impl OptionalObjectPath {
/// Converts to an [`ObjectPath`], generating a unique `key` if none was provided.
pub fn create_key(self) -> ObjectPath {
ObjectPath {
usecase: self.usecase,
scope: self.scope,
key: self.key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
}
}

/// Converts to an [`ObjectPath`], returning an error if no `key` was provided.
pub fn require_key(self) -> Result<ObjectPath, &'static str> {
Ok(ObjectPath {
usecase: self.usecase,
scope: self.scope,
key: self
.key
.ok_or("object key is required but was not provided")?,
})
}
}

impl Display for ObjectPath {
impl Display for OptionalObjectPath {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/", self.usecase)?;
for scope in &self.scope {
write!(f, "{}/", scope)?;
}
write!(f, "{PATH_CONTEXT_SEPARATOR}/{}", self.key)
write!(f, "{PATH_CONTEXT_SEPARATOR}/")?;
if let Some(ref key) = self.key {
f.write_str(key)?;
}
Ok(())
}
}

impl PartialEq for OptionalObjectPath {
fn eq(&self, other: &Self) -> bool {
self.usecase == other.usecase
&& self.scope == other.scope
&& self.key == other.key
&& self.key.is_some() // Two paths without keys are considered unequal!
}
}

struct ObjectPathVisitor;
impl<'de> serde::de::Visitor<'de> for ObjectPathVisitor {
type Value = ObjectPath;
struct OptionalObjectPathVisitor;
impl<'de> serde::de::Visitor<'de> for OptionalObjectPathVisitor {
type Value = OptionalObjectPath;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(
Expand Down Expand Up @@ -86,25 +113,65 @@ impl<'de> serde::de::Visitor<'de> for ObjectPathVisitor {
}

// The rest of the path is a user-provided key.
// If no key is provided, generate a UUIDv4 by default.
let key = match iter.peek() {
None => uuid::Uuid::new_v4().to_string(),
Some(_) => iter.collect(),
let key = if iter.peek().is_some() {
Some(iter.collect())
} else {
None
};

Ok(ObjectPath {
Ok(OptionalObjectPath {
usecase,
scope,
key,
})
}
}

impl<'de> de::Deserialize<'de> for OptionalObjectPath {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
deserializer.deserialize_str(OptionalObjectPathVisitor)
}
}

/// The fully scoped path of an object.
///
/// This consists of a usecase, the scope, and the object key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ObjectPath {
/// The usecase, or "product" this object belongs to.
///
/// This can be defined on-the-fly by the client, but special server logic
/// (such as the concrete backend/bucket) can be tied to this as well.
pub usecase: String,

/// The scope of the object, used for compartmentalization.
///
/// This is treated as a prefix, and includes such things as the organization and project.
pub scope: Vec<String>,

/// This key uniquely identifies the object within its usecase/scope.
pub key: String,
}

impl Display for ObjectPath {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/", self.usecase)?;
for scope in &self.scope {
write!(f, "{}/", scope)?;
}
write!(f, "{PATH_CONTEXT_SEPARATOR}/{}", self.key)
}
}

impl<'de> de::Deserialize<'de> for ObjectPath {
fn deserialize<D>(deserializer: D) -> Result<ObjectPath, D::Error>
where
D: de::Deserializer<'de>,
{
deserializer.deserialize_str(ObjectPathVisitor)
let optional_path = OptionalObjectPath::deserialize(deserializer)?;
optional_path.require_key().map_err(de::Error::custom)
}
}
Loading