Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Async IO Select Hostcalls #188

Merged
merged 5 commits into from Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
362 changes: 208 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

230 changes: 230 additions & 0 deletions cli/tests/integration/async_io.rs
@@ -0,0 +1,230 @@
use crate::common::Test;
use crate::common::TestResult;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Barrier;

// On Windows, streaming body backpressure doesn't seem to work as expected, either
// due to the Hyper client or server too eagerly clearing the chunk buffer. This issue does
// not appear related to async I/O hostcalls; the behavior is seen within the streaming body
// implementation in general. For the time being, this test is unix-only.
//
// https://github.com/fastly/Viceroy/issues/207 tracks the broader issue.
#[cfg(target_family = "unix")]
#[tokio::test(flavor = "multi_thread")]
async fn async_io_methods() -> TestResult {
let request_count = Arc::new(AtomicUsize::new(0));
let req_count_1 = request_count.clone();
let req_count_2 = request_count.clone();
let req_count_3 = request_count.clone();

let barrier = Arc::new(Barrier::new(3));
let barrier_1 = barrier.clone();
let barrier_2 = barrier.clone();

let test = Test::using_fixture("async_io.wasm")
.backend("Simple", "http://127.0.0.1:9000/", None)
.async_host(9000, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "simple.org");
let req_count_1 = req_count_1.clone();
let barrier_1 = barrier_1.clone();
Box::new(async move {
match req_count_1.load(Ordering::Relaxed) {
0 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
3 => {
barrier_1.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
_ => unreachable!(),
}
})
})
.backend("ReadBody", "http://127.0.0.1:9001/", None)
.async_host(9001, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "readbody.org");
let req_count_2 = req_count_2.clone();
Box::new(async move {
match req_count_2.load(Ordering::Relaxed) {
0 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
1 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
2 => Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
3 => Response::builder()
.header("Transfer-Encoding", "chunked")
.status(StatusCode::OK)
.body(Body::empty())
.unwrap(),
_ => unreachable!(),
}
})
})
.backend("WriteBody", "http://127.0.0.1:9002/", None)
.async_host(9002, move |req: Request<Body>| {
assert_eq!(req.headers()["Host"], "writebody.org");
let req_count_3 = req_count_3.clone();
let barrier_2 = barrier_2.clone();
Box::new(async move {
match req_count_3.load(Ordering::Relaxed) {
0 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
1 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
2 => {
barrier_2.wait().await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
3 => {
let _body = hyper::body::to_bytes(req.into_body()).await;
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}
_ => unreachable!(),
}
})
});

// request_count is 0 here
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "timeout");

barrier.wait().await;

request_count.store(1, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "true");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "0");
let temp_barrier = barrier.clone();
let _task = tokio::task::spawn(async move { temp_barrier.wait().await });
barrier.wait().await;

request_count.store(2, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "true");
assert_eq!(resp.headers()["Write-Ready"], "false");
assert_eq!(resp.headers()["Ready-Index"], "1");
barrier.wait().await;

request_count.store(3, Ordering::Relaxed);
let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Simple-Ready"], "false");
assert_eq!(resp.headers()["Read-Ready"], "false");
assert_eq!(resp.headers()["Write-Ready"], "true");
assert_eq!(resp.headers()["Ready-Index"], "2");
let temp_barrier = barrier.clone();
let _task = tokio::task::spawn(async move { temp_barrier.wait().await });
barrier.wait().await;

let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.header("Empty-Select-Timeout", "0")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);

let resp = test
.against(
Request::builder()
.header("Host", "example.org")
.header("Empty-Select-Timeout", "1")
.body(Body::empty())
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.headers()["Ready-Index"], "timeout");

Ok(())
}
59 changes: 41 additions & 18 deletions cli/tests/integration/common.rs
Expand Up @@ -3,7 +3,7 @@

use futures::stream::StreamExt;
use hyper::{service, Body as HyperBody, Request, Response, Server};
use std::{convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{convert::Infallible, future::Future, net::SocketAddr, path::PathBuf, sync::Arc};
use tokio::sync::Mutex;
use tracing_subscriber::filter::EnvFilter;
use viceroy_lib::{
Expand Down Expand Up @@ -126,7 +126,17 @@ impl Test {
HostFn: Fn(Request<Vec<u8>>) -> Response<Vec<u8>>,
HostFn: Send + Sync + 'static,
{
let service = Arc::new(service);
let service = Arc::new(TestService::Sync(Arc::new(service)));
self.hosts.push(HostSpec { port, service });
self
}

pub fn async_host<HostFn>(mut self, port: u16, service: HostFn) -> Self
where
HostFn: Fn(Request<HyperBody>) -> AsyncResp,
HostFn: Send + Sync + 'static,
{
let service = Arc::new(TestService::Async(Arc::new(service)));
self.hosts.push(HostSpec { port, service });
self
}
Expand Down Expand Up @@ -273,7 +283,7 @@ impl Test {
/// The specification of a mock host, as part of a `Test` builder.
struct HostSpec {
port: u16,
service: Arc<dyn Fn(Request<Vec<u8>>) -> Response<Vec<u8>> + Send + Sync>,
service: Arc<TestService>,
}

/// A handle to a running mock host, used to gracefully shut down the host on test completion.
Expand All @@ -292,21 +302,26 @@ impl HostSpec {
// we transform `service` into an async function that consumes Hyper bodies. that requires a bit
// of `Arc` and `move` operations because each invocation needs to produce a distinct `Future`
let async_service = Arc::new(move |req: Request<HyperBody>| {
let (parts, body) = req.into_parts();
let mut body = Box::new(body); // for pinning
let service = service.clone();

async move {
// read out all of the bytes from the body into a vector, then re-assemble the request
let mut body_bytes = Vec::new();
while let Some(chunk) = body.next().await {
body_bytes.extend_from_slice(&chunk.unwrap());
}
let req = Request::from_parts(parts, body_bytes);

// pass the request through the host function, then convert its body into the form
// that Hyper wants
let resp = service(req).map(HyperBody::from);
let resp = match &*service {
TestService::Sync(s) => {
let (parts, body) = req.into_parts();
let mut body = Box::new(body); // for pinning
// read out all of the bytes from the body into a vector, then re-assemble the request
let mut body_bytes = Vec::new();
while let Some(chunk) = body.next().await {
body_bytes.extend_from_slice(&chunk.unwrap());
}
let req = Request::from_parts(parts, body_bytes);

// pass the request through the host function, then convert its body into the form
// that Hyper wants
s(req).map(HyperBody::from)
}
TestService::Async(s) => Box::into_pin(s(req)).await.map(HyperBody::from),
};

let res: Result<_, hyper::Error> = Ok(resp);
res
Expand Down Expand Up @@ -345,8 +360,16 @@ impl HostHandle {
self.terminate_signal
.send(())
.expect("could not send terminate signal to mock host");
self.task_handle
.await
.expect("mock host did not terminate cleanly")
if !self.task_handle.is_finished() {
self.task_handle.abort();
}
}
}

#[derive(Clone)]
pub enum TestService {
Sync(Arc<dyn Fn(Request<Vec<u8>>) -> Response<Vec<u8>> + Send + Sync>),
Async(Arc<dyn Fn(Request<HyperBody>) -> AsyncResp + Send + Sync>),
}

type AsyncResp = Box<dyn Future<Output = Response<HyperBody>> + Send + Sync>;
1 change: 1 addition & 0 deletions cli/tests/integration/main.rs
@@ -1,3 +1,4 @@
mod async_io;
mod body;
mod common;
mod dictionary_lookup;
Expand Down