Skip to content

Commit

Permalink
Implement Async IO Select Hostcalls in Viceroy
Browse files Browse the repository at this point in the history
This commit adds the fastly_async_io hostcalls `select` and `is_ready`
to Viceroy. Up to this point we only would do `select` for things like
`PendingRequest`. Now we abstract over an `AsyncItem` which could be a
`PendingRequest`, `StreamingBody`, or `Body` and you can call `select`
over any of these async io operations. Most of this code is to
accomodate these changes while maintaining the behavior of `select` for
`PendingRequest` utilizing these new calls and creating the `AsyncItem`
enum to hold all of these differing types in the same `async_item` map.

We also add an integration test similar to the C@E platform's to make
sure that we implement the behavior correctly. With this we can be
confident that select works the way we intend it too! As a bonus side
effect of these changes we also add async support for hosts in our
testing code in Viceroy which gives us some more flexibility in what we
can accomplish or use!
  • Loading branch information
mgattozzi committed Nov 15, 2022
1 parent 48ea9ef commit 9fa9420
Show file tree
Hide file tree
Showing 22 changed files with 1,172 additions and 445 deletions.
362 changes: 208 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

223 changes: 223 additions & 0 deletions cli/tests/integration/async_io.rs
@@ -0,0 +1,223 @@
use crate::common::Test;
use crate::common::TestResult;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Barrier;

#[tokio::test(flavor = "multi_thread")]
async fn async_io_methods() -> TestResult {
let request_count = Arc::new(AtomicUsize::new(0));
let req_count_1 = request_count.clone();
let req_count_2 = request_count.clone();
let req_count_3 = request_count.clone();

let barrier = Arc::new(Barrier::new(3));
let barrier_1 = barrier.clone();
let barrier_2 = barrier.clone();

let test = Test::using_fixture("async_io.wasm")
.backend("Simple", "http://127.0.0.1:9000/", None)
.async_host(9000, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "simple.org");
let req_count_1 = req_count_1.clone();
let barrier_1 = barrier_1.clone();
Box::new(async move {
match req_count_1.load(Ordering::Relaxed) {
0 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
3 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
_ => unreachable!(),
}
})
})
.backend("ReadBody", "http://127.0.0.1:9001/", None)
.async_host(9001, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "readbody.org");
let req_count_2 = req_count_2.clone();
Box::new(async move {
match req_count_2.load(Ordering::Relaxed) {
0 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
1 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
3 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
_ => unreachable!(),
}
})
})
.backend("WriteBody", "http://127.0.0.1:9002/", None)
.async_host(9002, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "writebody.org");
let req_count_3 = req_count_3.clone();
let barrier_2 = barrier_2.clone();
Box::new(async move {
match req_count_3.load(Ordering::Relaxed) {
0 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
2 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
3 => {
let _body = hyper::body::to_bytes(req.into_body()).await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
_ => unreachable!(),
}
})
});

// request_count is 0 here
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "timeout");

barrier.wait().await;

request_count.store(1, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "true");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "0");
let temp_barrier = barrier.clone();
let _task = tokio::task::spawn(async move { temp_barrier.wait().await });
barrier.wait().await;

request_count.store(2, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "true");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "1");
barrier.wait().await;

request_count.store(3, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "true");
assert_eq!(resp.headers()["Ready-Index"], "2");
let temp_barrier = barrier.clone();
let _task = tokio::task::spawn(async move { temp_barrier.wait().await });
barrier.wait().await;

let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.header("Empty-Select-Timeout", "0")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);

let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.header("Empty-Select-Timeout", "1")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Ready-Index"], "timeout");

Ok(())
}
59 changes: 41 additions & 18 deletions cli/tests/integration/common.rs
Expand Up @@ -3,7 +3,7 @@

use futures::stream::StreamExt;
use hyper::{service, Body as HyperBody, Request, Response, Server};
use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{convert::Infallible, future::Future, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::sync::Mutex;
use tracing_subscriber::filter::EnvFilter;
use viceroy_lib::{
Expand Down Expand Up @@ -126,7 +126,17 @@ impl Test {
HostFn: Fn(Request<Vec<u8>>) -> Response<Vec<u8>>,
HostFn: Send + Sync + 'static,
{
let service = Arc::new(service);
let service = Arc::new(TestService::Sync(Arc::new(service)));
self.hosts.push(HostSpec { port, service });
self
}

pub fn async_host<HostFn>(mut self, port: u16, service: HostFn) -> Self
where
HostFn: Fn(Request<HyperBody>) -> AsyncResp,
HostFn: Send + Sync + 'static,
{
let service = Arc::new(TestService::Async(Arc::new(service)));
self.hosts.push(HostSpec { port, service });
self
}
Expand Down Expand Up @@ -273,7 +283,7 @@ impl Test {
/// The specification of a mock host, as part of a `Test` builder.
struct HostSpec {
port: u16,
service: Arc<dyn Fn(Request<Vec<u8>>) -> Response<Vec<u8>> + Send + Sync>,
service: Arc<TestService>,
}

/// A handle to a running mock host, used to gracefully shut down the host on test completion.
Expand All @@ -292,21 +302,26 @@ impl HostSpec {
// we transform `service` into an async function that consumes Hyper bodies. that requires a bit
// of `Arc` and `move` operations because each invocation needs to produce a distinct `Future`
let async_service = Arc::new(move |req: Request<HyperBody>| {
let (parts, body) = req.into_parts();
let mut body = Box::new(body); // for pinning
let service = service.clone();

async move {
// read out all of the bytes from the body into a vector, then re-assemble the request
let mut body_bytes = Vec::new();
while let Some(chunk) = body.next().await {
body_bytes.extend_from_slice(&chunk.unwrap());
}
let req = Request::from_parts(parts, body_bytes);

// pass the request through the host function, then convert its body into the form
// that Hyper wants
let resp = service(req).map(HyperBody::from);
let resp = match &*service {
TestService::Sync(s) => {
let (parts, body) = req.into_parts();
let mut body = Box::new(body); // for pinning
// read out all of the bytes from the body into a vector, then re-assemble the request
let mut body_bytes = Vec::new();
while let Some(chunk) = body.next().await {
body_bytes.extend_from_slice(&chunk.unwrap());
}
let req = Request::from_parts(parts, body_bytes);

// pass the request through the host function, then convert its body into the form
// that Hyper wants
s(req).map(HyperBody::from)
}
TestService::Async(s) => Box::into_pin(s(req)).await.map(HyperBody::from),
};

let res: Result<_, hyper::Error> = Ok(resp);
res
Expand Down Expand Up @@ -345,8 +360,16 @@ impl HostHandle {
self.terminate_signal
.send(())
.expect("could not send terminate signal to mock host");
self.task_handle
.await
.expect("mock host did not terminate cleanly")
if !self.task_handle.is_finished() {
self.task_handle.abort();
}
}
}

#[derive(Clone)]
pub enum TestService {
Sync(Arc<dyn Fn(Request<Vec<u8>>) -> Response<Vec<u8>> + Send + Sync>),
Async(Arc<dyn Fn(Request<HyperBody>) -> AsyncResp + Send + Sync>),
}

type AsyncResp = Box<dyn Future<Output = Response<HyperBody>> + Send + Sync>;
1 change: 1 addition & 0 deletions cli/tests/integration/main.rs
@@ -1,3 +1,4 @@
mod async_io;
mod body;
mod common;
mod dictionary_lookup;
Expand Down

0 comments on commit 9fa9420

Please sign in to comment.