Skip to content

Commit

Permalink
Add async inserts for kv store (fastly#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
computermouth authored and cmckendry committed Feb 8, 2024
1 parent 743f8a2 commit 0ba8b0e
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 15 deletions.
13 changes: 13 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,19 @@
(param $body_handle $body_handle)
(result $err (expected (error $fastly_status)))
)

(@interface func (export "insert_async")
(param $store $object_store_handle)
(param $key string)
(param $body_handle $body_handle)
(param $pending_handle_out (@witx pointer $pending_kv_insert_handle))
(result $err (expected (error $fastly_status)))
)

(@interface func (export "pending_insert_wait")
(param $pending_objstr_handle $pending_kv_insert_handle)
(result $err (expected (error $fastly_status)))
)
)

(module $fastly_secret_store
Expand Down
4 changes: 3 additions & 1 deletion lib/compute-at-edge-abi/typenames.witx
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@
(typename $dictionary_handle (handle))
;;; A handle to an Object Store.
(typename $object_store_handle (handle))
;;; A handle to a pending KV request.
;;; A handle to a pending KV lookup request.
(typename $pending_kv_lookup_handle (handle))
;;; A handle to a pending KV insert request.
(typename $pending_kv_insert_handle (handle))
;;; A handle to a Secret Store.
(typename $secret_store_handle (handle))
;;; A handle to an individual secret.
Expand Down
4 changes: 4 additions & 0 deletions lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ pub enum HandleError {
#[error("Invalid pending KV lookup handle: {0}")]
InvalidPendingKvLookupHandle(crate::wiggle_abi::types::PendingKvLookupHandle),

/// A insert handle was not valid.
#[error("Invalid pending KV insert handle: {0}")]
InvalidPendingKvInsertHandle(crate::wiggle_abi::types::PendingKvInsertHandle),

/// A dictionary handle was not valid.
#[error("Invalid dictionary handle: {0}")]
InvalidDictionaryHandle(crate::wiggle_abi::types::DictionaryHandle),
Expand Down
74 changes: 68 additions & 6 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod async_item;
mod downstream;

pub use async_item::{AsyncItem, PeekableTask, PendingKvTask};
pub use async_item::{AsyncItem, PeekableTask, PendingKvInsertTask, PendingKvLookupTask};

use {
self::downstream::DownstreamResponse,
Expand All @@ -18,8 +18,8 @@ use {
upstream::{SelectTarget, TlsConfig},
wiggle_abi::types::{
self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle,
ObjectStoreHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle,
ResponseHandle, SecretHandle, SecretStoreHandle,
ObjectStoreHandle, PendingKvInsertHandle, PendingKvLookupHandle, PendingRequestHandle,
RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle,
},
},
cranelift_entity::{entity_impl, PrimaryMap},
Expand Down Expand Up @@ -631,6 +631,53 @@ impl Session {
) -> Result<(), ObjectStoreError> {
self.object_store.insert(obj_store_key, obj_key, obj)
}

/// Insert a [`PendingKvInsert`] into the session.
///
/// This method returns a new [`PendingKvInsertHandle`], which can then be used to access
/// and mutate the pending insert.
pub fn insert_pending_kv_insert(
&mut self,
pending: PendingKvInsertTask,
) -> PendingKvInsertHandle {
self.async_items
.push(Some(AsyncItem::PendingKvInsert(pending)))
.into()
}

/// Take ownership of a [`PendingKvInsert`], given its [`PendingKvInsertHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a pending insert in the
/// session.
pub fn take_pending_kv_insert(
&mut self,
handle: PendingKvInsertHandle,
) -> Result<PendingKvInsertTask, HandleError> {
// check that this is a pending request before removing it
let _ = self.pending_kv_insert(handle)?;

self.async_items
.get_mut(handle.into())
.and_then(Option::take)
.and_then(AsyncItem::into_pending_kv_insert)
.ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
}

/// Get a reference to a [`PendingInsert`], given its [`PendingKvInsertHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a insert in the
/// session.
pub fn pending_kv_insert(
&self,
handle: PendingKvInsertHandle,
) -> Result<&PendingKvInsertTask, HandleError> {
self.async_items
.get(handle.into())
.and_then(Option::as_ref)
.and_then(AsyncItem::as_pending_kv_insert)
.ok_or(HandleError::InvalidPendingKvInsertHandle(handle))
}

pub fn obj_lookup(
&self,
obj_store_key: &ObjectStoreKey,
Expand All @@ -643,7 +690,10 @@ impl Session {
///
/// This method returns a new [`PendingKvLookupHandle`], which can then be used to access
/// and mutate the pending lookup.
pub fn insert_pending_kv_lookup(&mut self, pending: PendingKvTask) -> PendingKvLookupHandle {
pub fn insert_pending_kv_lookup(
&mut self,
pending: PendingKvLookupTask,
) -> PendingKvLookupHandle {
self.async_items
.push(Some(AsyncItem::PendingKvLookup(pending)))
.into()
Expand All @@ -656,7 +706,7 @@ impl Session {
pub fn take_pending_kv_lookup(
&mut self,
handle: PendingKvLookupHandle,
) -> Result<PendingKvTask, HandleError> {
) -> Result<PendingKvLookupTask, HandleError> {
// check that this is a pending request before removing it
let _ = self.pending_kv_lookup(handle)?;

Expand All @@ -674,7 +724,7 @@ impl Session {
pub fn pending_kv_lookup(
&self,
handle: PendingKvLookupHandle,
) -> Result<&PendingKvTask, HandleError> {
) -> Result<&PendingKvLookupTask, HandleError> {
self.async_items
.get(handle.into())
.and_then(Option::as_ref)
Expand Down Expand Up @@ -972,3 +1022,15 @@ impl From<AsyncItemHandle> for PendingKvLookupHandle {
PendingKvLookupHandle::from(h.as_u32())
}
}

impl From<PendingKvInsertHandle> for AsyncItemHandle {
fn from(h: PendingKvInsertHandle) -> AsyncItemHandle {
AsyncItemHandle::from_u32(h.into())
}
}

impl From<AsyncItemHandle> for PendingKvInsertHandle {
fn from(h: AsyncItemHandle) -> PendingKvInsertHandle {
PendingKvInsertHandle::from(h.as_u32())
}
}
35 changes: 29 additions & 6 deletions lib/src/session/async_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use futures::FutureExt;
use http::Response;
use tokio::sync::oneshot;

pub type PendingKvTask = PeekableTask<Result<Vec<u8>, ObjectStoreError>>;
pub type PendingKvLookupTask = PeekableTask<Result<Vec<u8>, ObjectStoreError>>;
pub type PendingKvInsertTask = PeekableTask<Result<(), ObjectStoreError>>;

/// Represents either a full body, or the write end of a streaming body.
///
Expand All @@ -17,7 +18,8 @@ pub enum AsyncItem {
Body(Body),
StreamingBody(StreamingBody),
PendingReq(PeekableTask<Response<Body>>),
PendingKvLookup(PendingKvTask),
PendingKvLookup(PendingKvLookupTask),
PendingKvInsert(PendingKvInsertTask),
}

impl AsyncItem {
Expand Down Expand Up @@ -74,20 +76,34 @@ impl AsyncItem {
}
}

pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvTask> {
pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvLookupTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}

pub fn into_pending_kv_lookup(self) -> Option<PendingKvTask> {
pub fn into_pending_kv_lookup(self) -> Option<PendingKvLookupTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}

pub fn as_pending_kv_insert(&self) -> Option<&PendingKvInsertTask> {
match self {
Self::PendingKvInsert(req) => Some(req),
_ => None,
}
}

pub fn into_pending_kv_insert(self) -> Option<PendingKvInsertTask> {
match self {
Self::PendingKvInsert(req) => Some(req),
_ => None,
}
}

pub fn as_pending_req(&self) -> Option<&PeekableTask<Response<Body>>> {
match self {
Self::PendingReq(req) => Some(req),
Expand Down Expand Up @@ -115,6 +131,7 @@ impl AsyncItem {
Self::Body(body) => body.await_ready().await,
Self::PendingReq(req) => req.await_ready().await,
Self::PendingKvLookup(obj) => obj.await_ready().await,
Self::PendingKvInsert(obj) => obj.await_ready().await,
}
}

Expand All @@ -129,12 +146,18 @@ impl From<PeekableTask<Response<Body>>> for AsyncItem {
}
}

impl From<PendingKvTask> for AsyncItem {
fn from(task: PendingKvTask) -> Self {
impl From<PendingKvLookupTask> for AsyncItem {
fn from(task: PendingKvLookupTask) -> Self {
Self::PendingKvLookup(task)
}
}

impl From<PendingKvInsertTask> for AsyncItem {
fn from(task: PendingKvInsertTask) -> Self {
Self::PendingKvInsert(task)
}
}

#[derive(Debug)]
pub enum PeekableTask<T> {
Waiting(oneshot::Receiver<Result<T, Error>>),
Expand Down
2 changes: 1 addition & 1 deletion lib/src/wiggle_abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ wiggle::from_witx!({
errors: { fastly_status => Error },
async: {
fastly_async_io::{select},
fastly_object_store::{insert, lookup_async, pending_lookup_wait},
fastly_object_store::{insert, insert_async, pending_insert_wait, lookup_async, pending_lookup_wait},
fastly_http_body::{append, read, write},
fastly_http_req::{
pending_req_select, pending_req_select_v2, pending_req_poll, pending_req_poll_v2,
Expand Down
28 changes: 27 additions & 1 deletion lib/src/wiggle_abi/obj_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! fastly_obj_store` hostcall implementations.

use super::types::PendingKvLookupHandle;
use super::types::{PendingKvInsertHandle, PendingKvLookupHandle};
use crate::session::PeekableTask;

use {
Expand Down Expand Up @@ -101,4 +101,30 @@ impl FastlyObjectStore for Session {

Ok(())
}

async fn insert_async<'a>(
&mut self,
store: ObjectStoreHandle,
key: &GuestPtr<str>,
body_handle: BodyHandle,
opt_pending_body_handle_out: &GuestPtr<PendingKvInsertHandle>,
) -> Result<(), Error> {
let store = self.get_obj_store_key(store).unwrap().clone();
let key = ObjectKey::new(&*key.as_str()?.ok_or(Error::SharedMemory)?)?;
let bytes = self.take_body(body_handle)?.read_into_vec().await?;
let fut = futures::future::ok(self.obj_insert(store, key, bytes));
let task = PeekableTask::spawn(fut).await;
opt_pending_body_handle_out.write(self.insert_pending_kv_insert(task))?;
Ok(())
}

async fn pending_insert_wait(
&mut self,
pending_insert_handle: PendingKvInsertHandle,
) -> Result<(), Error> {
Ok((self
.take_pending_kv_insert(pending_insert_handle)?
.recv()
.await?)?)
}
}

0 comments on commit 0ba8b0e

Please sign in to comment.