Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dynamic backends. #163

Merged
merged 5 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cli/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ mod opts_tests {
kind: ErrorKind::ValueValidation,
message,
..
}) if message.contains("invalid socket address syntax") => Ok(()),
}) if message.contains("invalid socket address syntax")
| message.contains("invalid IP address syntax") =>
{
Ok(())
}
res => panic!("unexpected result: {:?}", res),
}
}
Expand Down
64 changes: 47 additions & 17 deletions cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,22 @@ impl Test {
}
}

/// Pass the given request through this test.
/// Pass the given requests through this test, returning the associated responses.
///
/// A `Test` can be used repeatedly against different requests. Note, however, that
/// a fresh execution context is set up each time.
pub async fn against(&self, req: Request<impl Into<HyperBody>>) -> Response<Body> {
/// A `Test` can be used repeatedly against different requests, either individually
/// (as with `against()`) or in batches (as with `against_many()`). The difference
/// between calling this function with many requests, rather than calling `against()`
/// multiple times, is that the requests shared in an `against_many()` call will share
/// the same execution context. This can be useful when validating interactions across
/// shared state in the context.
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
///
/// Subsequent calls to `against_many()` (or `against()`) will use a fresh context.
pub async fn against_many(
&self,
mut reqs: Vec<Request<impl Into<HyperBody>>>,
) -> Vec<Response<Body>> {
let _test_lock_guard = TEST_LOCK.lock().await;
let mut responses = Vec::with_capacity(reqs.len());

// Install a tracing subscriber. We use a human-readable event formatter in tests, using a
// writer that supports input capturing for `cargo test`. This subscribes to all events in
Expand All @@ -178,7 +188,7 @@ impl Test {
// spawn any mock hosts, keeping a handle on each host task for clean termination.
let host_handles: Vec<_> = self.hosts.iter().map(HostSpec::spawn).collect();

let resp = if self.via_hyper {
if self.via_hyper {
let svc = ViceroyService::new(ctx);
// We are going to host the service at port 7878, and so it's vital to make sure
// that we shut down the service after our test request, so that if there are
Expand All @@ -197,28 +207,48 @@ impl Test {
.expect("receiver error while shutting down hyper server")
}),
);
// Pass the request to the server via a Hyper client on the _current_ task:
let resp = hyper::Client::new()
.request(req.map(Into::into))
.await
.expect("hyper client error making test request");
// We're done with this test request, so shut down the server.

for req in reqs.drain(..) {
// Pass the request to the server via a Hyper client on the _current_ task:
let resp = hyper::Client::new()
.request(req.map(Into::into))
.await
.expect("hyper client error making test request");
responses.push(resp.map(Into::into));
}

// We're done with these test requests, so shut down the server.
tx.send(())
.expect("sender error while shutting down hyper server");
// Reap the task handle to ensure that the server did indeed shut down.
let _ = server_handle.await.expect("hyper server yielded an error");
resp.map(Into::into)
} else {
ctx.handle_request(req.map(Into::into), addr.ip())
.await
.expect("failed to handle the request")
};
for req in reqs.drain(..) {
let resp = ctx
.clone()
.handle_request(req.map(Into::into), addr.ip())
.await
.expect("failed to handle the request");
responses.push(resp);
}
}

for host in host_handles {
host.shutdown().await;
}

resp
responses
}

/// Pass the given request through this test.
///
/// A `Test` can be used repeatedly against different requests. Note, however, that
/// a fresh execution context is set up each time.
pub async fn against(&self, req: Request<impl Into<HyperBody>>) -> Response<Body> {
self.against_many(vec![req])
.await
.pop()
.expect("singleton back from against_many")
}

/// Pass an empty `GET 127.0.0.1:7878` request through this test.
Expand Down
1 change: 1 addition & 0 deletions cli/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ mod sending_response;
mod sleep;
mod upstream;
mod upstream_async;
mod upstream_dynamic;
mod upstream_streaming;
128 changes: 128 additions & 0 deletions cli/tests/integration/upstream_dynamic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use {
crate::common::{Test, TestResult},
hyper::{
header::{self, HeaderValue},
Request, Response, StatusCode,
},
};

#[tokio::test(flavor = "multi_thread")]
async fn upstream_sync() -> TestResult {
////////////////////////////////////////////////////////////////////////////////////
// Setup
////////////////////////////////////////////////////////////////////////////////////

// Set up the test harness
let test = Test::using_fixture("upstream-dynamic.wasm")
.backend("origin", "http://127.0.0.1:9000/", None)
// The "origin" backend simply echos the request body
.host(9000, |req| {
let body = req.into_body();
Response::new(body)
});

////////////////////////////////////////////////////////////////////////////////////
// A simple round-trip echo test to "origin", but with a dynamic backend
////////////////////////////////////////////////////////////////////////////////////

let resp = test
.against(
Request::post("http://localhost/")
.header("Dynamic-Backend", "127.0.0.1:9000")
.body("Hello, Viceroy!")
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.into_body().read_into_string().await?,
"Hello, Viceroy!"
);

////////////////////////////////////////////////////////////////////////////////////
// Test that you can still use standard backends without a problem
////////////////////////////////////////////////////////////////////////////////////

let resp = test
.against(
Request::post("http://localhost/")
.header("Static-Backend", "origin")
.body("Hello, Viceroy!")
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.into_body().read_into_string().await?,
"Hello, Viceroy!"
);

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn override_host_works() -> TestResult {
// Set up the test harness
let test = Test::using_fixture("upstream-dynamic.wasm")
.backend(
"override-host",
"http://127.0.0.1:9000/",
None, // Some("otherhost.com"),
)
.host(9000, |req| {
assert_eq!(
req.headers().get(header::HOST),
Some(&HeaderValue::from_static("otherhost.com"))
);
Response::new(vec![])
});

let resp = test
.via_hyper()
.against(
Request::get("http://localhost:7878/override")
.header("Dynamic-Backend", "127.0.0.1:9000")
.header("With-Override", "otherhost.com")
.body("")
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::OK);

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn duplication_errors_right() -> TestResult {
// Set up the test harness
let test = Test::using_fixture("upstream-dynamic.wasm")
.backend("static", "http://127.0.0.1:9000/", None)
.host(9000, |_| Response::new(vec![]));

let resp = test
.against(
Request::get("http://localhost:7878/override")
.header("Dynamic-Backend", "127.0.0.1:9000")
.header("Supplementary-Backend", "dynamic-backend")
.body("")
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::CONFLICT);

let resp = test
.against(
Request::get("http://localhost:7878/override")
.header("Dynamic-Backend", "127.0.0.1:9000")
.header("Supplementary-Backend", "static")
.body("")
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::CONFLICT);

Ok(())
}
9 changes: 9 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@
(param $mode $framing_headers_mode)
(result $err (expected (error $fastly_status)))
)

;;; Create a backend for later use
(@interface func (export "register_dynamic_backend")
(param $name_prefix string)
(param $target string)
(param $backend_config_mask $backend_config_options)
(param $backend_configuration (@witx pointer $dynamic_backend_config))
(result $err (expected (error $fastly_status)))
)
)

(module $fastly_http_resp
Expand Down
41 changes: 41 additions & 0 deletions lib/compute-at-edge-abi/typenames.witx
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,44 @@
(enum (@witx tag u32)
$automatic
$manually_from_headers))

(typename $tls_version
(flags (@witx repr u32)
$tls_1
$tls_1_1
$tls_1_2
$tls_1_3))

(typename $backend_config_options
(flags (@witx repr u32)
$reserved
$host_override
$connect_timeout
$first_byte_timeout
$between_bytes_timeout
$use_ssl
$ssl_min_version
$ssl_max_version
$cert_hostname
$ca_cert
$ciphers
$sni_hostname))

(typename $dynamic_backend_config
(record
(field $host_override (@witx pointer (@witx char8)))
(field $host_override_len u32)
(field $connect_timeout_ms u32)
(field $first_byte_timeout_ms u32)
(field $between_bytes_timeout_ms u32)
(field $ssl_min_version $tls_version)
(field $ssl_max_version $tls_version)
(field $cert_hostname (@witx pointer (@witx char8)))
(field $cert_hostname_len u32)
(field $ca_cert (@witx pointer (@witx char8)))
(field $ca_cert_len u32)
(field $ciphers (@witx pointer (@witx char8)))
(field $ciphers_len u32)
(field $sni_hostname (@witx pointer (@witx char8)))
(field $sni_hostname_len u32)
))
10 changes: 9 additions & 1 deletion lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ pub enum Error {

#[error("Could not load native certificates: {0}")]
BadCerts(std::io::Error),

#[error("Could not generate new backend name from '{0}'")]
BackendNameRegistryError(String),

#[error(transparent)]
HttpError(#[from] http::Error),
}

impl Error {
Expand Down Expand Up @@ -141,7 +147,9 @@ impl Error {
| Error::Other(_)
| Error::StreamingChunkSend
| Error::UnknownBackend(_)
| Error::Utf8Expected(_) => FastlyStatus::Error,
| Error::Utf8Expected(_)
| Error::BackendNameRegistryError(_)
| Error::HttpError(_) => FastlyStatus::Error,
}
}

Expand Down
35 changes: 30 additions & 5 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub struct Session {
///
/// Populated prior to guest execution, and never modified.
backends: Arc<Backends>,
/// The backends dynamically added by the program. This is separated from
/// `backends` because we do not want one session to effect the backends
/// available to any other session.
dynamic_backends: Backends,
/// The TLS configuration for this execution.
///
/// Populated prior to guest execution, and never modified.
Expand Down Expand Up @@ -119,6 +123,7 @@ impl Session {
log_endpoints: PrimaryMap::new(),
log_endpoints_by_name: HashMap::new(),
backends,
dynamic_backends: Backends::default(),
tls_config,
dictionaries,
dictionaries_by_name: PrimaryMap::new(),
Expand Down Expand Up @@ -517,12 +522,32 @@ impl Session {

/// Look up a backend by name.
pub fn backend(&self, name: &str) -> Option<&Backend> {
self.backends.get(name).map(std::ops::Deref::deref)
}
// it doesn't actually matter what order we do this search, because
// the namespaces should be unique.
self.backends
.get(name)
.or_else(|| self.dynamic_backends.get(name))
.map(std::ops::Deref::deref)
}

/// Return the full list of static and dynamic backend names as an [`Iterator`].
pub fn backend_names(&self) -> impl Iterator<Item = &String> {
self.backends.keys().chain(self.dynamic_backends.keys())
}

/// Try to add a backend with the given name prefix to our set of current backends.
/// Upon success, return true. If the name already exists somewhere, return false;
/// the caller should signal an appropriate error.
pub fn add_backend(&mut self, name: &str, info: Backend) -> bool {
// if this name already exists, either as a built in or dynamic backend, say no
if self.backends.contains_key(name) || self.dynamic_backends.contains_key(name) {
return false;
}

self.dynamic_backends
.insert(name.to_string(), Arc::new(info));

/// Access the backend map.
pub fn backends(&self) -> &Arc<Backends> {
&self.backends
true
}

// ----- TLS config -----
Expand Down
4 changes: 2 additions & 2 deletions lib/src/wiggle_abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ impl UserErrorConversion for Session {
match e {
Error::UnknownBackend(ref backend) => {
let config_path = &self.config_path();
let backends_buffer = itertools::join(self.backends().keys(), ",");
let backends_len = self.backends().len();
let backends_buffer = itertools::join(self.backend_names(), ",");
let backends_len = self.backend_names().count();

match (backends_len, (**config_path).as_ref()) {
(_, None) => event!(
Expand Down
Loading