Skip to content

Commit

Permalink
Merge pull request #163 from fastly/awick/dynamic-backends
Browse files Browse the repository at this point in the history
Add support for dynamic backends.
  • Loading branch information
acw committed Jul 18, 2022
2 parents 975fdc1 + 4a8061d commit 1f950a7
Show file tree
Hide file tree
Showing 14 changed files with 433 additions and 65 deletions.
6 changes: 5 additions & 1 deletion cli/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ mod opts_tests {
kind: ErrorKind::ValueValidation,
message,
..
}) if message.contains("invalid socket address syntax") => Ok(()),
}) if message.contains("invalid socket address syntax")
| message.contains("invalid IP address syntax") =>
{
Ok(())
}
res => panic!("unexpected result: {:?}", res),
}
}
Expand Down
64 changes: 47 additions & 17 deletions cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,22 @@ impl Test {
}
}

/// Pass the given request through this test.
/// Pass the given requests through this test, returning the associated responses.
///
/// A `Test` can be used repeatedly against different requests. Note, however, that
/// a fresh execution context is set up each time.
pub async fn against(&self, req: Request<impl Into<HyperBody>>) -> Response<Body> {
/// A `Test` can be used repeatedly against different requests, either individually
/// (as with `against()`) or in batches (as with `against_many()`). The difference
/// between calling this function with many requests, rather than calling `against()`
/// multiple times, is that the requests shared in an `against_many()` call will share
/// the same execution context. This can be useful when validating interactions across
/// shared state in the context.
///
/// Subsequent calls to `against_many()` (or `against()`) will use a fresh context.
pub async fn against_many(
&self,
mut reqs: Vec<Request<impl Into<HyperBody>>>,
) -> Vec<Response<Body>> {
let _test_lock_guard = TEST_LOCK.lock().await;
let mut responses = Vec::with_capacity(reqs.len());

// Install a tracing subscriber. We use a human-readable event formatter in tests, using a
// writer that supports input capturing for `cargo test`. This subscribes to all events in
Expand All @@ -178,7 +188,7 @@ impl Test {
// spawn any mock hosts, keeping a handle on each host task for clean termination.
let host_handles: Vec<_> = self.hosts.iter().map(HostSpec::spawn).collect();

let resp = if self.via_hyper {
if self.via_hyper {
let svc = ViceroyService::new(ctx);
// We are going to host the service at port 7878, and so it's vital to make sure
// that we shut down the service after our test request, so that if there are
Expand All @@ -197,28 +207,48 @@ impl Test {
.expect("receiver error while shutting down hyper server")
}),
);
// Pass the request to the server via a Hyper client on the _current_ task:
let resp = hyper::Client::new()
.request(req.map(Into::into))
.await
.expect("hyper client error making test request");
// We're done with this test request, so shut down the server.

for req in reqs.drain(..) {
// Pass the request to the server via a Hyper client on the _current_ task:
let resp = hyper::Client::new()
.request(req.map(Into::into))
.await
.expect("hyper client error making test request");
responses.push(resp.map(Into::into));
}

// We're done with these test requests, so shut down the server.
tx.send(())
.expect("sender error while shutting down hyper server");
// Reap the task handle to ensure that the server did indeed shut down.
let _ = server_handle.await.expect("hyper server yielded an error");
resp.map(Into::into)
} else {
ctx.handle_request(req.map(Into::into), addr.ip())
.await
.expect("failed to handle the request")
};
for req in reqs.drain(..) {
let resp = ctx
.clone()
.handle_request(req.map(Into::into), addr.ip())
.await
.expect("failed to handle the request");
responses.push(resp);
}
}

for host in host_handles {
host.shutdown().await;
}

resp
responses
}

/// Pass the given request through this test.
///
/// A `Test` can be used repeatedly against different requests. Note, however, that
/// a fresh execution context is set up each time.
pub async fn against(&self, req: Request<impl Into<HyperBody>>) -> Response<Body> {
self.against_many(vec![req])
.await
.pop()
.expect("singleton back from against_many")
}

/// Pass an empty `GET 127.0.0.1:7878` request through this test.
Expand Down
1 change: 1 addition & 0 deletions cli/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ mod sending_response;
mod sleep;
mod upstream;
mod upstream_async;
mod upstream_dynamic;
mod upstream_streaming;
128 changes: 128 additions & 0 deletions cli/tests/integration/upstream_dynamic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use {
crate::common::{Test, TestResult},
hyper::{
header::{self, HeaderValue},
Request, Response, StatusCode,
},
};

#[tokio::test(flavor = "multi_thread")]
async fn upstream_sync() -> TestResult {
////////////////////////////////////////////////////////////////////////////////////
// Setup
////////////////////////////////////////////////////////////////////////////////////

// Set up the test harness
let test = Test::using_fixture("upstream-dynamic.wasm")
.backend("origin", "http://127.0.0.1:9000/", None)
// The "origin" backend simply echos the request body
.host(9000, |req| {
let body = req.into_body();
Response::new(body)
});

////////////////////////////////////////////////////////////////////////////////////
// A simple round-trip echo test to "origin", but with a dynamic backend
////////////////////////////////////////////////////////////////////////////////////

let resp = test
.against(
Request::post("http://localhost/")
.header("Dynamic-Backend", "127.0.0.1:9000")
.body("Hello, Viceroy!")
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.into_body().read_into_string().await?,
"Hello, Viceroy!"
);

////////////////////////////////////////////////////////////////////////////////////
// Test that you can still use standard backends without a problem
////////////////////////////////////////////////////////////////////////////////////

let resp = test
.against(
Request::post("http://localhost/")
.header("Static-Backend", "origin")
.body("Hello, Viceroy!")
.unwrap(),
)
.await;
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.into_body().read_into_string().await?,
"Hello, Viceroy!"
);

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn override_host_works() -> TestResult {
// Set up the test harness
let test = Test::using_fixture("upstream-dynamic.wasm")
.backend(
"override-host",
"http://127.0.0.1:9000/",
None, // Some("otherhost.com"),
)
.host(9000, |req| {
assert_eq!(
req.headers().get(header::HOST),
Some(&HeaderValue::from_static("otherhost.com"))
);
Response::new(vec![])
});

let resp = test
.via_hyper()
.against(
Request::get("http://localhost:7878/override")
.header("Dynamic-Backend", "127.0.0.1:9000")
.header("With-Override", "otherhost.com")
.body("")
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::OK);

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn duplication_errors_right() -> TestResult {
// Set up the test harness
let test = Test::using_fixture("upstream-dynamic.wasm")
.backend("static", "http://127.0.0.1:9000/", None)
.host(9000, |_| Response::new(vec![]));

let resp = test
.against(
Request::get("http://localhost:7878/override")
.header("Dynamic-Backend", "127.0.0.1:9000")
.header("Supplementary-Backend", "dynamic-backend")
.body("")
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::CONFLICT);

let resp = test
.against(
Request::get("http://localhost:7878/override")
.header("Dynamic-Backend", "127.0.0.1:9000")
.header("Supplementary-Backend", "static")
.body("")
.unwrap(),
)
.await;

assert_eq!(resp.status(), StatusCode::CONFLICT);

Ok(())
}
9 changes: 9 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@
(param $mode $framing_headers_mode)
(result $err (expected (error $fastly_status)))
)

;;; Create a backend for later use
(@interface func (export "register_dynamic_backend")
(param $name_prefix string)
(param $target string)
(param $backend_config_mask $backend_config_options)
(param $backend_configuration (@witx pointer $dynamic_backend_config))
(result $err (expected (error $fastly_status)))
)
)

(module $fastly_http_resp
Expand Down
41 changes: 41 additions & 0 deletions lib/compute-at-edge-abi/typenames.witx
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,44 @@
(enum (@witx tag u32)
$automatic
$manually_from_headers))

(typename $tls_version
(flags (@witx repr u32)
$tls_1
$tls_1_1
$tls_1_2
$tls_1_3))

(typename $backend_config_options
(flags (@witx repr u32)
$reserved
$host_override
$connect_timeout
$first_byte_timeout
$between_bytes_timeout
$use_ssl
$ssl_min_version
$ssl_max_version
$cert_hostname
$ca_cert
$ciphers
$sni_hostname))

(typename $dynamic_backend_config
(record
(field $host_override (@witx pointer (@witx char8)))
(field $host_override_len u32)
(field $connect_timeout_ms u32)
(field $first_byte_timeout_ms u32)
(field $between_bytes_timeout_ms u32)
(field $ssl_min_version $tls_version)
(field $ssl_max_version $tls_version)
(field $cert_hostname (@witx pointer (@witx char8)))
(field $cert_hostname_len u32)
(field $ca_cert (@witx pointer (@witx char8)))
(field $ca_cert_len u32)
(field $ciphers (@witx pointer (@witx char8)))
(field $ciphers_len u32)
(field $sni_hostname (@witx pointer (@witx char8)))
(field $sni_hostname_len u32)
))
10 changes: 9 additions & 1 deletion lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ pub enum Error {

#[error("Could not load native certificates: {0}")]
BadCerts(std::io::Error),

#[error("Could not generate new backend name from '{0}'")]
BackendNameRegistryError(String),

#[error(transparent)]
HttpError(#[from] http::Error),
}

impl Error {
Expand Down Expand Up @@ -141,7 +147,9 @@ impl Error {
| Error::Other(_)
| Error::StreamingChunkSend
| Error::UnknownBackend(_)
| Error::Utf8Expected(_) => FastlyStatus::Error,
| Error::Utf8Expected(_)
| Error::BackendNameRegistryError(_)
| Error::HttpError(_) => FastlyStatus::Error,
}
}

Expand Down
35 changes: 30 additions & 5 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub struct Session {
///
/// Populated prior to guest execution, and never modified.
backends: Arc<Backends>,
/// The backends dynamically added by the program. This is separated from
/// `backends` because we do not want one session to effect the backends
/// available to any other session.
dynamic_backends: Backends,
/// The TLS configuration for this execution.
///
/// Populated prior to guest execution, and never modified.
Expand Down Expand Up @@ -119,6 +123,7 @@ impl Session {
log_endpoints: PrimaryMap::new(),
log_endpoints_by_name: HashMap::new(),
backends,
dynamic_backends: Backends::default(),
tls_config,
dictionaries,
dictionaries_by_name: PrimaryMap::new(),
Expand Down Expand Up @@ -517,12 +522,32 @@ impl Session {

/// Look up a backend by name.
pub fn backend(&self, name: &str) -> Option<&Backend> {
self.backends.get(name).map(std::ops::Deref::deref)
}
// it doesn't actually matter what order we do this search, because
// the namespaces should be unique.
self.backends
.get(name)
.or_else(|| self.dynamic_backends.get(name))
.map(std::ops::Deref::deref)
}

/// Return the full list of static and dynamic backend names as an [`Iterator`].
pub fn backend_names(&self) -> impl Iterator<Item = &String> {
self.backends.keys().chain(self.dynamic_backends.keys())
}

/// Try to add a backend with the given name prefix to our set of current backends.
/// Upon success, return true. If the name already exists somewhere, return false;
/// the caller should signal an appropriate error.
pub fn add_backend(&mut self, name: &str, info: Backend) -> bool {
// if this name already exists, either as a built in or dynamic backend, say no
if self.backends.contains_key(name) || self.dynamic_backends.contains_key(name) {
return false;
}

self.dynamic_backends
.insert(name.to_string(), Arc::new(info));

/// Access the backend map.
pub fn backends(&self) -> &Arc<Backends> {
&self.backends
true
}

// ----- TLS config -----
Expand Down
4 changes: 2 additions & 2 deletions lib/src/wiggle_abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ impl UserErrorConversion for Session {
match e {
Error::UnknownBackend(ref backend) => {
let config_path = &self.config_path();
let backends_buffer = itertools::join(self.backends().keys(), ",");
let backends_len = self.backends().len();
let backends_buffer = itertools::join(self.backend_names(), ",");
let backends_len = self.backend_names().count();

match (backends_len, (**config_path).as_ref()) {
(_, None) => event!(
Expand Down
Loading

0 comments on commit 1f950a7

Please sign in to comment.