From 0ba8b0eb41481c39bdd3b5c84d86b4ee33a29cf0 Mon Sep 17 00:00:00 2001 From: Ben Young Date: Mon, 6 Nov 2023 09:21:01 -0600 Subject: [PATCH] Add async inserts for kv store (#329) --- lib/compute-at-edge-abi/compute-at-edge.witx | 13 ++++ lib/compute-at-edge-abi/typenames.witx | 4 +- lib/src/error.rs | 4 ++ lib/src/session.rs | 74 ++++++++++++++++++-- lib/src/session/async_item.rs | 35 +++++++-- lib/src/wiggle_abi.rs | 2 +- lib/src/wiggle_abi/obj_store_impl.rs | 28 +++++++- 7 files changed, 145 insertions(+), 15 deletions(-) diff --git a/lib/compute-at-edge-abi/compute-at-edge.witx b/lib/compute-at-edge-abi/compute-at-edge.witx index ff58cb15..05409d55 100644 --- a/lib/compute-at-edge-abi/compute-at-edge.witx +++ b/lib/compute-at-edge-abi/compute-at-edge.witx @@ -661,6 +661,19 @@ (param $body_handle $body_handle) (result $err (expected (error $fastly_status))) ) + + (@interface func (export "insert_async") + (param $store $object_store_handle) + (param $key string) + (param $body_handle $body_handle) + (param $pending_handle_out (@witx pointer $pending_kv_insert_handle)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "pending_insert_wait") + (param $pending_objstr_handle $pending_kv_insert_handle) + (result $err (expected (error $fastly_status))) + ) ) (module $fastly_secret_store diff --git a/lib/compute-at-edge-abi/typenames.witx b/lib/compute-at-edge-abi/typenames.witx index 1f69fcae..efecd946 100644 --- a/lib/compute-at-edge-abi/typenames.witx +++ b/lib/compute-at-edge-abi/typenames.witx @@ -93,8 +93,10 @@ (typename $dictionary_handle (handle)) ;;; A handle to an Object Store. (typename $object_store_handle (handle)) -;;; A handle to a pending KV request. +;;; A handle to a pending KV lookup request. (typename $pending_kv_lookup_handle (handle)) +;;; A handle to a pending KV insert request. +(typename $pending_kv_insert_handle (handle)) ;;; A handle to a Secret Store. (typename $secret_store_handle (handle)) ;;; A handle to an individual secret. diff --git a/lib/src/error.rs b/lib/src/error.rs index 0a2a1ec1..9d726149 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -265,6 +265,10 @@ pub enum HandleError { #[error("Invalid pending KV lookup handle: {0}")] InvalidPendingKvLookupHandle(crate::wiggle_abi::types::PendingKvLookupHandle), + /// A insert handle was not valid. + #[error("Invalid pending KV insert handle: {0}")] + InvalidPendingKvInsertHandle(crate::wiggle_abi::types::PendingKvInsertHandle), + /// A dictionary handle was not valid. #[error("Invalid dictionary handle: {0}")] InvalidDictionaryHandle(crate::wiggle_abi::types::DictionaryHandle), diff --git a/lib/src/session.rs b/lib/src/session.rs index 15156a31..554beacc 100644 --- a/lib/src/session.rs +++ b/lib/src/session.rs @@ -3,7 +3,7 @@ mod async_item; mod downstream; -pub use async_item::{AsyncItem, PeekableTask, PendingKvTask}; +pub use async_item::{AsyncItem, PeekableTask, PendingKvInsertTask, PendingKvLookupTask}; use { self::downstream::DownstreamResponse, @@ -18,8 +18,8 @@ use { upstream::{SelectTarget, TlsConfig}, wiggle_abi::types::{ self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle, - ObjectStoreHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle, - ResponseHandle, SecretHandle, SecretStoreHandle, + ObjectStoreHandle, PendingKvInsertHandle, PendingKvLookupHandle, PendingRequestHandle, + RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle, }, }, cranelift_entity::{entity_impl, PrimaryMap}, @@ -631,6 +631,53 @@ impl Session { ) -> Result<(), ObjectStoreError> { self.object_store.insert(obj_store_key, obj_key, obj) } + + /// Insert a [`PendingKvInsert`] into the session. + /// + /// This method returns a new [`PendingKvInsertHandle`], which can then be used to access + /// and mutate the pending insert. + pub fn insert_pending_kv_insert( + &mut self, + pending: PendingKvInsertTask, + ) -> PendingKvInsertHandle { + self.async_items + .push(Some(AsyncItem::PendingKvInsert(pending))) + .into() + } + + /// Take ownership of a [`PendingKvInsert`], given its [`PendingKvInsertHandle`]. + /// + /// Returns a [`HandleError`] if the handle is not associated with a pending insert in the + /// session. + pub fn take_pending_kv_insert( + &mut self, + handle: PendingKvInsertHandle, + ) -> Result { + // check that this is a pending request before removing it + let _ = self.pending_kv_insert(handle)?; + + self.async_items + .get_mut(handle.into()) + .and_then(Option::take) + .and_then(AsyncItem::into_pending_kv_insert) + .ok_or(HandleError::InvalidPendingKvInsertHandle(handle)) + } + + /// Get a reference to a [`PendingInsert`], given its [`PendingKvInsertHandle`]. + /// + /// Returns a [`HandleError`] if the handle is not associated with a insert in the + /// session. + pub fn pending_kv_insert( + &self, + handle: PendingKvInsertHandle, + ) -> Result<&PendingKvInsertTask, HandleError> { + self.async_items + .get(handle.into()) + .and_then(Option::as_ref) + .and_then(AsyncItem::as_pending_kv_insert) + .ok_or(HandleError::InvalidPendingKvInsertHandle(handle)) + } + pub fn obj_lookup( &self, obj_store_key: &ObjectStoreKey, @@ -643,7 +690,10 @@ impl Session { /// /// This method returns a new [`PendingKvLookupHandle`], which can then be used to access /// and mutate the pending lookup. - pub fn insert_pending_kv_lookup(&mut self, pending: PendingKvTask) -> PendingKvLookupHandle { + pub fn insert_pending_kv_lookup( + &mut self, + pending: PendingKvLookupTask, + ) -> PendingKvLookupHandle { self.async_items .push(Some(AsyncItem::PendingKvLookup(pending))) .into() @@ -656,7 +706,7 @@ impl Session { pub fn take_pending_kv_lookup( &mut self, handle: PendingKvLookupHandle, - ) -> Result { + ) -> Result { // check that this is a pending request before removing it let _ = self.pending_kv_lookup(handle)?; @@ -674,7 +724,7 @@ impl Session { pub fn pending_kv_lookup( &self, handle: PendingKvLookupHandle, - ) -> Result<&PendingKvTask, HandleError> { + ) -> Result<&PendingKvLookupTask, HandleError> { self.async_items .get(handle.into()) .and_then(Option::as_ref) @@ -972,3 +1022,15 @@ impl From for PendingKvLookupHandle { PendingKvLookupHandle::from(h.as_u32()) } } + +impl From for AsyncItemHandle { + fn from(h: PendingKvInsertHandle) -> AsyncItemHandle { + AsyncItemHandle::from_u32(h.into()) + } +} + +impl From for PendingKvInsertHandle { + fn from(h: AsyncItemHandle) -> PendingKvInsertHandle { + PendingKvInsertHandle::from(h.as_u32()) + } +} diff --git a/lib/src/session/async_item.rs b/lib/src/session/async_item.rs index 46a443cd..7804960d 100644 --- a/lib/src/session/async_item.rs +++ b/lib/src/session/async_item.rs @@ -6,7 +6,8 @@ use futures::FutureExt; use http::Response; use tokio::sync::oneshot; -pub type PendingKvTask = PeekableTask, ObjectStoreError>>; +pub type PendingKvLookupTask = PeekableTask, ObjectStoreError>>; +pub type PendingKvInsertTask = PeekableTask>; /// Represents either a full body, or the write end of a streaming body. /// @@ -17,7 +18,8 @@ pub enum AsyncItem { Body(Body), StreamingBody(StreamingBody), PendingReq(PeekableTask>), - PendingKvLookup(PendingKvTask), + PendingKvLookup(PendingKvLookupTask), + PendingKvInsert(PendingKvInsertTask), } impl AsyncItem { @@ -74,20 +76,34 @@ impl AsyncItem { } } - pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvTask> { + pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvLookupTask> { match self { Self::PendingKvLookup(req) => Some(req), _ => None, } } - pub fn into_pending_kv_lookup(self) -> Option { + pub fn into_pending_kv_lookup(self) -> Option { match self { Self::PendingKvLookup(req) => Some(req), _ => None, } } + pub fn as_pending_kv_insert(&self) -> Option<&PendingKvInsertTask> { + match self { + Self::PendingKvInsert(req) => Some(req), + _ => None, + } + } + + pub fn into_pending_kv_insert(self) -> Option { + match self { + Self::PendingKvInsert(req) => Some(req), + _ => None, + } + } + pub fn as_pending_req(&self) -> Option<&PeekableTask>> { match self { Self::PendingReq(req) => Some(req), @@ -115,6 +131,7 @@ impl AsyncItem { Self::Body(body) => body.await_ready().await, Self::PendingReq(req) => req.await_ready().await, Self::PendingKvLookup(obj) => obj.await_ready().await, + Self::PendingKvInsert(obj) => obj.await_ready().await, } } @@ -129,12 +146,18 @@ impl From>> for AsyncItem { } } -impl From for AsyncItem { - fn from(task: PendingKvTask) -> Self { +impl From for AsyncItem { + fn from(task: PendingKvLookupTask) -> Self { Self::PendingKvLookup(task) } } +impl From for AsyncItem { + fn from(task: PendingKvInsertTask) -> Self { + Self::PendingKvInsert(task) + } +} + #[derive(Debug)] pub enum PeekableTask { Waiting(oneshot::Receiver>), diff --git a/lib/src/wiggle_abi.rs b/lib/src/wiggle_abi.rs index 9ee80fe8..b22ad93f 100644 --- a/lib/src/wiggle_abi.rs +++ b/lib/src/wiggle_abi.rs @@ -72,7 +72,7 @@ wiggle::from_witx!({ errors: { fastly_status => Error }, async: { fastly_async_io::{select}, - fastly_object_store::{insert, lookup_async, pending_lookup_wait}, + fastly_object_store::{insert, insert_async, pending_insert_wait, lookup_async, pending_lookup_wait}, fastly_http_body::{append, read, write}, fastly_http_req::{ pending_req_select, pending_req_select_v2, pending_req_poll, pending_req_poll_v2, diff --git a/lib/src/wiggle_abi/obj_store_impl.rs b/lib/src/wiggle_abi/obj_store_impl.rs index 1d0cc908..9c5a54d5 100644 --- a/lib/src/wiggle_abi/obj_store_impl.rs +++ b/lib/src/wiggle_abi/obj_store_impl.rs @@ -1,6 +1,6 @@ //! fastly_obj_store` hostcall implementations. -use super::types::PendingKvLookupHandle; +use super::types::{PendingKvInsertHandle, PendingKvLookupHandle}; use crate::session::PeekableTask; use { @@ -101,4 +101,30 @@ impl FastlyObjectStore for Session { Ok(()) } + + async fn insert_async<'a>( + &mut self, + store: ObjectStoreHandle, + key: &GuestPtr, + body_handle: BodyHandle, + opt_pending_body_handle_out: &GuestPtr, + ) -> Result<(), Error> { + let store = self.get_obj_store_key(store).unwrap().clone(); + let key = ObjectKey::new(&*key.as_str()?.ok_or(Error::SharedMemory)?)?; + let bytes = self.take_body(body_handle)?.read_into_vec().await?; + let fut = futures::future::ok(self.obj_insert(store, key, bytes)); + let task = PeekableTask::spawn(fut).await; + opt_pending_body_handle_out.write(self.insert_pending_kv_insert(task))?; + Ok(()) + } + + async fn pending_insert_wait( + &mut self, + pending_insert_handle: PendingKvInsertHandle, + ) -> Result<(), Error> { + Ok((self + .take_pending_kv_insert(pending_insert_handle)? + .recv() + .await?)?) + } }