From 046d3804dabc352373a7a3ce92d558914f84b2ec Mon Sep 17 00:00:00 2001 From: David Calavera Date: Mon, 29 Nov 2021 11:19:04 -0800 Subject: [PATCH 01/12] Create new Runtime Client crate Extract the HTTP client logic into a new crate. This allows other crates to share the same logic to interact with the Runtime API. Signed-off-by: David Calavera --- Cargo.toml | 3 +- lambda-runtime-client/Cargo.toml | 22 ++ lambda-runtime-client/src/lib.rs | 112 ++++++++++ lambda-runtime/Cargo.toml | 1 + lambda-runtime/src/client.rs | 308 --------------------------- lambda-runtime/src/lib.rs | 348 +++++++++++++++++++++++-------- lambda-runtime/src/requests.rs | 18 +- 7 files changed, 406 insertions(+), 406 deletions(-) create mode 100644 lambda-runtime-client/Cargo.toml create mode 100644 lambda-runtime-client/src/lib.rs delete mode 100644 lambda-runtime/src/client.rs diff --git a/Cargo.toml b/Cargo.toml index 291345b5..660d9458 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ "lambda-http", - "lambda-runtime" + "lambda-runtime-client", + "lambda-runtime", ] \ No newline at end of file diff --git a/lambda-runtime-client/Cargo.toml b/lambda-runtime-client/Cargo.toml new file mode 100644 index 00000000..33bc6566 --- /dev/null +++ b/lambda-runtime-client/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "lambda_runtime_client" +version = "0.1.0" +edition = "2021" +authors = ["David Calavera "] +description = "AWS Lambda Runtime interaction API" +license = "Apache-2.0" +repository = "https://github.com/awslabs/aws-lambda-rust-runtime" +categories = ["web-programming::http-server"] +keywords = ["AWS", "Lambda", "API"] +readme = "../README.md" + +[dependencies] +http = "0.2" +hyper = { version = "0.14", features = ["http1", "client", "server", "stream", "runtime"] } +tower-service = "0.3" +tokio = { version = "1.0", features = ["io-util"] } + +[dev-dependencies] +serde_json = "^1" +async-stream = "0.3" +tokio-stream = "0.1.2" \ No newline at end of file diff --git a/lambda-runtime-client/src/lib.rs b/lambda-runtime-client/src/lib.rs new file mode 100644 index 00000000..989b2eeb --- /dev/null +++ b/lambda-runtime-client/src/lib.rs @@ -0,0 +1,112 @@ +use http::{uri::Scheme, Request, Response, Uri}; +use hyper::client::{connect::Connection, HttpConnector}; +use hyper::Body; +use std::fmt::Debug; +use tokio::io::{AsyncRead, AsyncWrite}; +use tower_service::Service; + +const USER_AGENT_HEADER: &str = "User-Agent"; +const USER_AGENT: &str = concat!("aws-lambda-rust/", env!("CARGO_PKG_VERSION")); + +/// Error type that lambdas may result in +pub type Error = Box; + +#[derive(Debug)] +pub struct Client { + pub base: Uri, + pub client: hyper::Client, +} + +impl Client { + pub fn builder() -> ClientBuilder { + ClientBuilder { + connector: HttpConnector::new(), + uri: None, + } + } +} + +impl Client +where + C: hyper::client::connect::Connect + Sync + Send + Clone + 'static, +{ + pub async fn call(&self, req: Request) -> Result, Error> { + let req = self.set_origin(req)?; + let response = self.client.request(req).await?; + Ok(response) + } + + pub fn with(base: Uri, connector: C) -> Self { + let client = hyper::Client::builder().build(connector); + Self { base, client } + } + + fn set_origin(&self, req: Request) -> Result, Error> { + let (mut parts, body) = req.into_parts(); + let (scheme, authority) = { + let scheme = self.base.scheme().unwrap_or(&Scheme::HTTP); + let authority = self.base.authority().expect("Authority not found"); + (scheme, authority) + }; + let path = parts.uri.path_and_query().expect("PathAndQuery not found"); + + let uri = Uri::builder() + .scheme(scheme.clone()) + .authority(authority.clone()) + .path_and_query(path.clone()) + .build(); + + match uri { + Ok(u) => { + parts.uri = u; + Ok(Request::from_parts(parts, body)) + } + Err(e) => Err(Box::new(e)), + } + } +} + +pub struct ClientBuilder = hyper::client::HttpConnector> { + connector: C, + uri: Option, +} + +impl ClientBuilder +where + C: Service + Clone + Send + Sync + Unpin + 'static, + >::Future: Unpin + Send, + >::Error: Into>, + >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, +{ + pub fn with_connector(self, connector: C2) -> ClientBuilder + where + C2: Service + Clone + Send + Sync + Unpin + 'static, + >::Future: Unpin + Send, + >::Error: Into>, + >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + { + ClientBuilder { + connector, + uri: self.uri, + } + } + + pub fn with_endpoint(self, uri: http::Uri) -> Self { + Self { uri: Some(uri), ..self } + } + + pub fn build(self) -> Result, Error> { + let uri = match self.uri { + Some(uri) => uri, + None => { + let uri = std::env::var("AWS_LAMBDA_RUNTIME_API").expect("Missing AWS_LAMBDA_RUNTIME_API env var"); + uri.clone().try_into().expect("Unable to convert to URL") + } + }; + Ok(Client::with(uri, self.connector)) + } +} + +pub fn build_request() -> http::request::Builder { + http::Request::builder().header(USER_AGENT_HEADER, USER_AGENT) +} diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 4d0de675..e6939144 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -27,6 +27,7 @@ tracing-error = "0.2" tracing = { version = "0.1", features = ["log"] } tower-service = "0.3" tokio-stream = "0.1.2" +lambda_runtime_client = { version = "*", path = "../lambda-runtime-client" } [dev-dependencies] tracing-subscriber = "0.3" diff --git a/lambda-runtime/src/client.rs b/lambda-runtime/src/client.rs deleted file mode 100644 index 5e39e300..00000000 --- a/lambda-runtime/src/client.rs +++ /dev/null @@ -1,308 +0,0 @@ -use crate::Error; -use http::{uri::Scheme, Request, Response, Uri}; -use hyper::{client::HttpConnector, Body}; -use std::fmt::Debug; - -#[derive(Debug)] -pub(crate) struct Client { - pub(crate) base: Uri, - pub(crate) client: hyper::Client, -} - -impl Client -where - C: hyper::client::connect::Connect + Sync + Send + Clone + 'static, -{ - pub fn with(base: Uri, connector: C) -> Self { - let client = hyper::Client::builder().build(connector); - Self { base, client } - } - - fn set_origin(&self, req: Request) -> Result, Error> { - let (mut parts, body) = req.into_parts(); - let (scheme, authority) = { - let scheme = self.base.scheme().unwrap_or(&Scheme::HTTP); - let authority = self.base.authority().expect("Authority not found"); - (scheme, authority) - }; - let path = parts.uri.path_and_query().expect("PathAndQuery not found"); - - let uri = Uri::builder() - .scheme(scheme.clone()) - .authority(authority.clone()) - .path_and_query(path.clone()) - .build(); - - match uri { - Ok(u) => { - parts.uri = u; - Ok(Request::from_parts(parts, body)) - } - Err(e) => Err(Box::new(e)), - } - } - - pub(crate) async fn call(&self, req: Request) -> Result, Error> { - let req = self.set_origin(req)?; - let response = self.client.request(req).await?; - Ok(response) - } -} - -#[cfg(test)] -mod endpoint_tests { - use crate::{ - client::Client, - incoming, - requests::{ - EventCompletionRequest, EventErrorRequest, IntoRequest, IntoResponse, NextEventRequest, NextEventResponse, - }, - simulated, - types::Diagnostic, - Error, Runtime, - }; - use http::{uri::PathAndQuery, HeaderValue, Method, Request, Response, StatusCode, Uri}; - use hyper::{server::conn::Http, service::service_fn, Body}; - use serde_json::json; - use simulated::DuplexStreamWrapper; - use std::{convert::TryFrom, env}; - use tokio::{ - io::{self, AsyncRead, AsyncWrite}, - select, - sync::{self, oneshot}, - }; - use tokio_stream::StreamExt; - - #[cfg(test)] - async fn next_event(req: &Request) -> Result, Error> { - let path = "/2018-06-01/runtime/invocation/next"; - assert_eq!(req.method(), Method::GET); - assert_eq!(req.uri().path_and_query().unwrap(), &PathAndQuery::from_static(path)); - let body = json!({"message": "hello"}); - - let rsp = NextEventResponse { - request_id: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", - deadline: 1_542_409_706_888, - arn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", - trace_id: "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419", - body: serde_json::to_vec(&body)?, - }; - rsp.into_rsp() - } - - #[cfg(test)] - async fn complete_event(req: &Request, id: &str) -> Result, Error> { - assert_eq!(Method::POST, req.method()); - let rsp = Response::builder() - .status(StatusCode::ACCEPTED) - .body(Body::empty()) - .expect("Unable to construct response"); - - let expected = format!("/2018-06-01/runtime/invocation/{}/response", id); - assert_eq!(expected, req.uri().path()); - - Ok(rsp) - } - - #[cfg(test)] - async fn event_err(req: &Request, id: &str) -> Result, Error> { - let expected = format!("/2018-06-01/runtime/invocation/{}/error", id); - assert_eq!(expected, req.uri().path()); - - assert_eq!(req.method(), Method::POST); - let header = "lambda-runtime-function-error-type"; - let expected = "unhandled"; - assert_eq!(req.headers()[header], HeaderValue::try_from(expected)?); - - let rsp = Response::builder().status(StatusCode::ACCEPTED).body(Body::empty())?; - Ok(rsp) - } - - #[cfg(test)] - async fn handle_incoming(req: Request) -> Result, Error> { - let path: Vec<&str> = req - .uri() - .path_and_query() - .expect("PathAndQuery not found") - .as_str() - .split('/') - .collect::>(); - match path[1..] { - ["2018-06-01", "runtime", "invocation", "next"] => next_event(&req).await, - ["2018-06-01", "runtime", "invocation", id, "response"] => complete_event(&req, id).await, - ["2018-06-01", "runtime", "invocation", id, "error"] => event_err(&req, id).await, - ["2018-06-01", "runtime", "init", "error"] => unimplemented!(), - _ => unimplemented!(), - } - } - - #[cfg(test)] - async fn handle(io: I, rx: oneshot::Receiver<()>) -> Result<(), hyper::Error> - where - I: AsyncRead + AsyncWrite + Unpin + 'static, - { - let conn = Http::new().serve_connection(io, service_fn(handle_incoming)); - select! { - _ = rx => { - Ok(()) - } - res = conn => { - match res { - Ok(()) => Ok(()), - Err(e) => { - Err(e) - } - } - } - } - } - - #[tokio::test] - async fn test_next_event() -> Result<(), Error> { - let base = Uri::from_static("http://localhost:9001"); - let (client, server) = io::duplex(64); - - let (tx, rx) = sync::oneshot::channel(); - let server = tokio::spawn(async { - handle(server, rx).await.expect("Unable to handle request"); - }); - - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::with(base, conn); - - let req = NextEventRequest.into_req()?; - let rsp = client.call(req).await.expect("Unable to send request"); - - assert_eq!(rsp.status(), StatusCode::OK); - let header = "lambda-runtime-deadline-ms"; - assert_eq!(rsp.headers()[header], &HeaderValue::try_from("1542409706888")?); - - // shutdown server... - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } - } - - #[tokio::test] - async fn test_ok_response() -> Result<(), Error> { - let (client, server) = io::duplex(64); - let (tx, rx) = sync::oneshot::channel(); - let base = Uri::from_static("http://localhost:9001"); - - let server = tokio::spawn(async { - handle(server, rx).await.expect("Unable to handle request"); - }); - - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::with(base, conn); - - let req = EventCompletionRequest { - request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9", - body: "done", - }; - let req = req.into_req()?; - - let rsp = client.call(req).await?; - assert_eq!(rsp.status(), StatusCode::ACCEPTED); - - // shutdown server - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } - } - - #[tokio::test] - async fn test_error_response() -> Result<(), Error> { - let (client, server) = io::duplex(200); - let (tx, rx) = sync::oneshot::channel(); - let base = Uri::from_static("http://localhost:9001"); - - let server = tokio::spawn(async { - handle(server, rx).await.expect("Unable to handle request"); - }); - - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - let client = Client::with(base, conn); - - let req = EventErrorRequest { - request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9", - diagnostic: Diagnostic { - error_type: "InvalidEventDataError".to_string(), - error_message: "Error parsing event data".to_string(), - }, - }; - let req = req.into_req()?; - let rsp = client.call(req).await?; - assert_eq!(rsp.status(), StatusCode::ACCEPTED); - - // shutdown server - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } - } - - #[tokio::test] - async fn successful_end_to_end_run() -> Result<(), Error> { - let (client, server) = io::duplex(64); - let (tx, rx) = sync::oneshot::channel(); - let base = Uri::from_static("http://localhost:9001"); - - let server = tokio::spawn(async { - handle(server, rx).await.expect("Unable to handle request"); - }); - let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; - - let runtime = Runtime::builder() - .with_endpoint(base) - .with_connector(conn) - .build() - .expect("Unable to build runtime"); - - async fn func(event: serde_json::Value, _: crate::Context) -> Result { - Ok(event) - } - let f = crate::handler_fn(func); - - // set env vars needed to init Config if they are not already set in the environment - if env::var("AWS_LAMBDA_RUNTIME_API").is_err() { - env::set_var("AWS_LAMBDA_RUNTIME_API", "http://localhost:9001"); - } - if env::var("AWS_LAMBDA_FUNCTION_NAME").is_err() { - env::set_var("AWS_LAMBDA_FUNCTION_NAME", "test_fn"); - } - if env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE").is_err() { - env::set_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128"); - } - if env::var("AWS_LAMBDA_FUNCTION_VERSION").is_err() { - env::set_var("AWS_LAMBDA_FUNCTION_VERSION", "1"); - } - if env::var("AWS_LAMBDA_LOG_STREAM_NAME").is_err() { - env::set_var("AWS_LAMBDA_LOG_STREAM_NAME", "test_stream"); - } - if env::var("AWS_LAMBDA_LOG_GROUP_NAME").is_err() { - env::set_var("AWS_LAMBDA_LOG_GROUP_NAME", "test_log"); - } - let config = crate::Config::from_env().expect("Failed to read env vars"); - - let client = &runtime.client; - let incoming = incoming(client).take(1); - runtime.run(incoming, f, &config).await?; - - // shutdown server - tx.send(()).expect("Receiver has been dropped"); - match server.await { - Ok(_) => Ok(()), - Err(e) if e.is_panic() => Err::<(), Error>(e.into()), - Err(_) => unreachable!("This branch shouldn't be reachable"), - } - } -} diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 30220e45..61de9baf 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -6,21 +6,15 @@ //! Create a type that conforms to the [`Handler`] trait. This type can then be passed //! to the the `lambda_runtime::run` function, which launches and runs the Lambda runtime. pub use crate::types::Context; -use client::Client; use hyper::client::{connect::Connection, HttpConnector}; +use lambda_runtime_client::Client; use serde::{Deserialize, Serialize}; -use std::{ - convert::{TryFrom, TryInto}, - env, fmt, - future::Future, - panic, -}; +use std::{convert::TryFrom, env, fmt, future::Future, panic}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{Stream, StreamExt}; use tower_service::Service; use tracing::{error, trace}; -mod client; mod requests; #[cfg(test)] mod simulated; @@ -30,14 +24,11 @@ mod types; use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}; use types::Diagnostic; -/// Error type that lambdas may result in -pub type Error = Box; +pub type Error = lambda_runtime_client::Error; /// Configuration derived from environment variables. #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] pub struct Config { - /// The host and port of the [runtime API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html). - pub endpoint: String, /// The name of the function. pub function_name: String, /// The amount of memory available to the function in MB. @@ -54,7 +45,6 @@ impl Config { /// Attempts to read configuration from environment variables. pub fn from_env() -> Result { let conf = Config { - endpoint: env::var("AWS_LAMBDA_RUNTIME_API").expect("Missing AWS_LAMBDA_RUNTIME_API env var"), function_name: env::var("AWS_LAMBDA_FUNCTION_NAME").expect("Missing AWS_LAMBDA_FUNCTION_NAME env var"), memory: env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") .expect("Missing AWS_LAMBDA_FUNCTION_MEMORY_SIZE env var") @@ -106,25 +96,10 @@ where } } -#[non_exhaustive] -#[derive(Debug, PartialEq)] -enum BuilderError { - UnsetUri, -} - struct Runtime = HttpConnector> { client: Client, } -impl Runtime { - pub fn builder() -> RuntimeBuilder { - RuntimeBuilder { - connector: HttpConnector::new(), - uri: None, - } - } -} - impl Runtime where C: Service + Clone + Send + Sync + Unpin + 'static, @@ -209,56 +184,6 @@ where } } -struct RuntimeBuilder = hyper::client::HttpConnector> { - connector: C, - uri: Option, -} - -impl RuntimeBuilder -where - C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, -{ - pub fn with_connector(self, connector: C2) -> RuntimeBuilder - where - C2: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, - { - RuntimeBuilder { - connector, - uri: self.uri, - } - } - - pub fn with_endpoint(self, uri: http::Uri) -> Self { - Self { uri: Some(uri), ..self } - } - - pub fn build(self) -> Result, BuilderError> { - let uri = match self.uri { - Some(uri) => uri, - None => return Err(BuilderError::UnsetUri), - }; - let client = Client::with(uri, self.connector); - - Ok(Runtime { client }) - } -} - -#[test] -fn test_builder() { - let runtime = Runtime::builder() - .with_connector(HttpConnector::new()) - .with_endpoint(http::Uri::from_static("http://nomatter.com")) - .build(); - - runtime.unwrap(); -} - fn incoming(client: &Client) -> impl Stream, Error>> + Send + '_ where C: Service + Clone + Send + Sync + Unpin + 'static, @@ -305,12 +230,8 @@ where { trace!("Loading config from env"); let config = Config::from_env()?; - let uri = config.endpoint.clone().try_into().expect("Unable to convert to URL"); - let runtime = Runtime::builder() - .with_connector(HttpConnector::new()) - .with_endpoint(uri) - .build() - .expect("Unable to create a runtime"); + let client = Client::builder().build().expect("Unable to create a runtime client"); + let runtime = Runtime { client }; let client = &runtime.client; let incoming = incoming(client); @@ -320,3 +241,262 @@ where fn type_name_of_val(_: T) -> &'static str { std::any::type_name::() } + +#[cfg(test)] +mod endpoint_tests { + use lambda_runtime_client::Client; + use crate::{ + incoming, + requests::{ + EventCompletionRequest, EventErrorRequest, IntoRequest, IntoResponse, NextEventRequest, NextEventResponse, + }, + simulated, + types::Diagnostic, + Error, Runtime, + }; + use http::{uri::PathAndQuery, HeaderValue, Method, Request, Response, StatusCode, Uri}; + use hyper::{server::conn::Http, service::service_fn, Body}; + use serde_json::json; + use simulated::DuplexStreamWrapper; + use std::{convert::TryFrom, env}; + use tokio::{ + io::{self, AsyncRead, AsyncWrite}, + select, + sync::{self, oneshot}, + }; + use tokio_stream::StreamExt; + + #[cfg(test)] + async fn next_event(req: &Request) -> Result, Error> { + let path = "/2018-06-01/runtime/invocation/next"; + assert_eq!(req.method(), Method::GET); + assert_eq!(req.uri().path_and_query().unwrap(), &PathAndQuery::from_static(path)); + let body = json!({"message": "hello"}); + + let rsp = NextEventResponse { + request_id: "8476a536-e9f4-11e8-9739-2dfe598c3fcd", + deadline: 1_542_409_706_888, + arn: "arn:aws:lambda:us-east-2:123456789012:function:custom-runtime", + trace_id: "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419", + body: serde_json::to_vec(&body)?, + }; + rsp.into_rsp() + } + + #[cfg(test)] + async fn complete_event(req: &Request, id: &str) -> Result, Error> { + assert_eq!(Method::POST, req.method()); + let rsp = Response::builder() + .status(StatusCode::ACCEPTED) + .body(Body::empty()) + .expect("Unable to construct response"); + + let expected = format!("/2018-06-01/runtime/invocation/{}/response", id); + assert_eq!(expected, req.uri().path()); + + Ok(rsp) + } + + #[cfg(test)] + async fn event_err(req: &Request, id: &str) -> Result, Error> { + let expected = format!("/2018-06-01/runtime/invocation/{}/error", id); + assert_eq!(expected, req.uri().path()); + + assert_eq!(req.method(), Method::POST); + let header = "lambda-runtime-function-error-type"; + let expected = "unhandled"; + assert_eq!(req.headers()[header], HeaderValue::try_from(expected)?); + + let rsp = Response::builder().status(StatusCode::ACCEPTED).body(Body::empty())?; + Ok(rsp) + } + + #[cfg(test)] + async fn handle_incoming(req: Request) -> Result, Error> { + let path: Vec<&str> = req + .uri() + .path_and_query() + .expect("PathAndQuery not found") + .as_str() + .split('/') + .collect::>(); + match path[1..] { + ["2018-06-01", "runtime", "invocation", "next"] => next_event(&req).await, + ["2018-06-01", "runtime", "invocation", id, "response"] => complete_event(&req, id).await, + ["2018-06-01", "runtime", "invocation", id, "error"] => event_err(&req, id).await, + ["2018-06-01", "runtime", "init", "error"] => unimplemented!(), + _ => unimplemented!(), + } + } + + #[cfg(test)] + async fn handle(io: I, rx: oneshot::Receiver<()>) -> Result<(), hyper::Error> + where + I: AsyncRead + AsyncWrite + Unpin + 'static, + { + let conn = Http::new().serve_connection(io, service_fn(handle_incoming)); + select! { + _ = rx => { + Ok(()) + } + res = conn => { + match res { + Ok(()) => Ok(()), + Err(e) => { + Err(e) + } + } + } + } + } + + #[tokio::test] + async fn test_next_event() -> Result<(), Error> { + let base = Uri::from_static("http://localhost:9001"); + let (client, server) = io::duplex(64); + + let (tx, rx) = sync::oneshot::channel(); + let server = tokio::spawn(async { + handle(server, rx).await.expect("Unable to handle request"); + }); + + let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; + let client = Client::with(base, conn); + + let req = NextEventRequest.into_req()?; + let rsp = client.call(req).await.expect("Unable to send request"); + + assert_eq!(rsp.status(), StatusCode::OK); + let header = "lambda-runtime-deadline-ms"; + assert_eq!(rsp.headers()[header], &HeaderValue::try_from("1542409706888")?); + + // shutdown server... + tx.send(()).expect("Receiver has been dropped"); + match server.await { + Ok(_) => Ok(()), + Err(e) if e.is_panic() => Err::<(), Error>(e.into()), + Err(_) => unreachable!("This branch shouldn't be reachable"), + } + } + + #[tokio::test] + async fn test_ok_response() -> Result<(), Error> { + let (client, server) = io::duplex(64); + let (tx, rx) = sync::oneshot::channel(); + let base = Uri::from_static("http://localhost:9001"); + + let server = tokio::spawn(async { + handle(server, rx).await.expect("Unable to handle request"); + }); + + let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; + let client = Client::with(base, conn); + + let req = EventCompletionRequest { + request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9", + body: "done", + }; + let req = req.into_req()?; + + let rsp = client.call(req).await?; + assert_eq!(rsp.status(), StatusCode::ACCEPTED); + + // shutdown server + tx.send(()).expect("Receiver has been dropped"); + match server.await { + Ok(_) => Ok(()), + Err(e) if e.is_panic() => Err::<(), Error>(e.into()), + Err(_) => unreachable!("This branch shouldn't be reachable"), + } + } + + #[tokio::test] + async fn test_error_response() -> Result<(), Error> { + let (client, server) = io::duplex(200); + let (tx, rx) = sync::oneshot::channel(); + let base = Uri::from_static("http://localhost:9001"); + + let server = tokio::spawn(async { + handle(server, rx).await.expect("Unable to handle request"); + }); + + let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; + let client = Client::with(base, conn); + + let req = EventErrorRequest { + request_id: "156cb537-e2d4-11e8-9b34-d36013741fb9", + diagnostic: Diagnostic { + error_type: "InvalidEventDataError".to_string(), + error_message: "Error parsing event data".to_string(), + }, + }; + let req = req.into_req()?; + let rsp = client.call(req).await?; + assert_eq!(rsp.status(), StatusCode::ACCEPTED); + + // shutdown server + tx.send(()).expect("Receiver has been dropped"); + match server.await { + Ok(_) => Ok(()), + Err(e) if e.is_panic() => Err::<(), Error>(e.into()), + Err(_) => unreachable!("This branch shouldn't be reachable"), + } + } + + #[tokio::test] + async fn successful_end_to_end_run() -> Result<(), Error> { + let (client, server) = io::duplex(64); + let (tx, rx) = sync::oneshot::channel(); + let base = Uri::from_static("http://localhost:9001"); + + let server = tokio::spawn(async { + handle(server, rx).await.expect("Unable to handle request"); + }); + let conn = simulated::Connector::with(base.clone(), DuplexStreamWrapper::new(client))?; + + let client = Client::builder() + .with_endpoint(base) + .with_connector(conn) + .build() + .expect("Unable to build client"); + + async fn func(event: serde_json::Value, _: crate::Context) -> Result { + Ok(event) + } + let f = crate::handler_fn(func); + + // set env vars needed to init Config if they are not already set in the environment + if env::var("AWS_LAMBDA_RUNTIME_API").is_err() { + env::set_var("AWS_LAMBDA_RUNTIME_API", "http://localhost:9001"); + } + if env::var("AWS_LAMBDA_FUNCTION_NAME").is_err() { + env::set_var("AWS_LAMBDA_FUNCTION_NAME", "test_fn"); + } + if env::var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE").is_err() { + env::set_var("AWS_LAMBDA_FUNCTION_MEMORY_SIZE", "128"); + } + if env::var("AWS_LAMBDA_FUNCTION_VERSION").is_err() { + env::set_var("AWS_LAMBDA_FUNCTION_VERSION", "1"); + } + if env::var("AWS_LAMBDA_LOG_STREAM_NAME").is_err() { + env::set_var("AWS_LAMBDA_LOG_STREAM_NAME", "test_stream"); + } + if env::var("AWS_LAMBDA_LOG_GROUP_NAME").is_err() { + env::set_var("AWS_LAMBDA_LOG_GROUP_NAME", "test_log"); + } + let config = crate::Config::from_env().expect("Failed to read env vars"); + + let runtime = Runtime { client }; + let client = &runtime.client; + let incoming = incoming(client).take(1); + runtime.run(incoming, f, &config).await?; + + // shutdown server + tx.send(()).expect("Receiver has been dropped"); + match server.await { + Ok(_) => Ok(()), + Err(e) if e.is_panic() => Err::<(), Error>(e.into()), + Err(_) => unreachable!("This branch shouldn't be reachable"), + } + } +} \ No newline at end of file diff --git a/lambda-runtime/src/requests.rs b/lambda-runtime/src/requests.rs index 8aa2edbe..dd08fc33 100644 --- a/lambda-runtime/src/requests.rs +++ b/lambda-runtime/src/requests.rs @@ -1,11 +1,10 @@ use crate::{types::Diagnostic, Error}; use http::{Method, Request, Response, Uri}; use hyper::Body; +use lambda_runtime_client::build_request; use serde::Serialize; use std::str::FromStr; -const USER_AGENT: &str = concat!("aws-lambda-rust/", env!("CARGO_PKG_VERSION")); - pub(crate) trait IntoRequest { fn into_req(self) -> Result, Error>; } @@ -20,9 +19,8 @@ pub(crate) struct NextEventRequest; impl IntoRequest for NextEventRequest { fn into_req(self) -> Result, Error> { - let req = Request::builder() + let req = build_request() .method(Method::GET) - .header("User-Agent", USER_AGENT) .uri(Uri::from_static("/2018-06-01/runtime/invocation/next")) .body(Body::empty())?; Ok(req) @@ -82,11 +80,7 @@ where let body = serde_json::to_vec(&self.body)?; let body = Body::from(body); - let req = Request::builder() - .header("User-Agent", USER_AGENT) - .method(Method::POST) - .uri(uri) - .body(body)?; + let req = build_request().method(Method::POST).uri(uri).body(body)?; Ok(req) } } @@ -120,10 +114,9 @@ impl<'a> IntoRequest for EventErrorRequest<'a> { let body = serde_json::to_vec(&self.diagnostic)?; let body = Body::from(body); - let req = Request::builder() + let req = build_request() .method(Method::POST) .uri(uri) - .header("User-Agent", USER_AGENT) .header("lambda-runtime-function-error-type", "unhandled") .body(body)?; Ok(req) @@ -157,10 +150,9 @@ impl IntoRequest for InitErrorRequest { let uri = "/2018-06-01/runtime/init/error".to_string(); let uri = Uri::from_str(&uri)?; - let req = Request::builder() + let req = build_request() .method(Method::POST) .uri(uri) - .header("User-Agent", USER_AGENT) .header("lambda-runtime-function-error-type", "unhandled") .body(Body::empty())?; Ok(req) From 19e1ceba6475edc97628a4745d348436b1836696 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Mon, 29 Nov 2021 11:20:37 -0800 Subject: [PATCH 02/12] Add Lambda Extension crate This new crate encapsulates the logic to create Lambda Extensions. It includes reference examples. Signed-off-by: David Calavera --- Cargo.toml | 1 + lambda-extension/Cargo.toml | 35 ++++++ lambda-extension/examples/basic.rs | 26 ++++ lambda-extension/examples/error_handling.rs | 55 +++++++++ lambda-extension/src/lib.rs | 129 ++++++++++++++++++++ lambda-extension/src/requests.rs | 78 ++++++++++++ 6 files changed, 324 insertions(+) create mode 100644 lambda-extension/Cargo.toml create mode 100644 lambda-extension/examples/basic.rs create mode 100644 lambda-extension/examples/error_handling.rs create mode 100644 lambda-extension/src/lib.rs create mode 100644 lambda-extension/src/requests.rs diff --git a/Cargo.toml b/Cargo.toml index 660d9458..b6280c11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,5 @@ members = [ "lambda-http", "lambda-runtime-client", "lambda-runtime", + "lambda-extension" ] \ No newline at end of file diff --git a/lambda-extension/Cargo.toml b/lambda-extension/Cargo.toml new file mode 100644 index 00000000..90b46413 --- /dev/null +++ b/lambda-extension/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "lambda_extension" +version = "0.1.0" +edition = "2021" +authors = ["David Calavera "] +description = "AWS Lambda Extension API" +license = "Apache-2.0" +repository = "https://github.com/awslabs/aws-lambda-rust-runtime" +categories = ["web-programming::http-server"] +keywords = ["AWS", "Lambda", "API"] +readme = "../README.md" + +[dependencies] +anyhow = "1.0.48" +async-trait = "0.1.51" +tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] } +hyper = { version = "0.14", features = ["http1", "client", "server", "stream", "runtime"] } +serde = { version = "1", features = ["derive"] } +serde_json = "^1" +bytes = "1.0" +http = "0.2" +async-stream = "0.3" +futures = "0.3" +tracing-error = "0.2" +tracing = { version = "0.1", features = ["log"] } +tower-service = "0.3" +tokio-stream = "0.1.2" +lambda_runtime_client = { version = "*", path = "../lambda-runtime-client" } + +[dev-dependencies] +tracing-subscriber = "0.3" +once_cell = "1.4.0" +simple_logger = "1.6.0" +log = "^0.4" +simple-error = "0.2" diff --git a/lambda-extension/examples/basic.rs b/lambda-extension/examples/basic.rs new file mode 100644 index 00000000..35952d08 --- /dev/null +++ b/lambda-extension/examples/basic.rs @@ -0,0 +1,26 @@ +use lambda_extension::{run, Error, Extension, InvokeEvent, ShutdownEvent}; +use log::LevelFilter; +use simple_logger::SimpleLogger; + +struct BasicExtension {} + +#[async_trait::async_trait] +impl Extension for BasicExtension { + async fn on_invoke(&self, _extension_id: &str, _event: InvokeEvent) -> Result<(), Error> { + Ok(()) + } + + async fn on_shutdown(&self, _extension_id: &str, _event: ShutdownEvent) -> Result<(), Error> { + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // required to enable CloudWatch error logging by the runtime + // can be replaced with any other method of initializing `log` + SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); + + run(BasicExtension {}).await?; + Ok(()) +} diff --git a/lambda-extension/examples/error_handling.rs b/lambda-extension/examples/error_handling.rs new file mode 100644 index 00000000..3496cc60 --- /dev/null +++ b/lambda-extension/examples/error_handling.rs @@ -0,0 +1,55 @@ +use lambda_extension::{run, Error, Extension, InvokeEvent, ShutdownEvent, requests::exit_error, requests::init_error}; +use lambda_runtime_client::Client; +use log::LevelFilter; +use simple_logger::SimpleLogger; + +#[derive(Debug)] +enum ErrorExample { + OnInvokeError, + OnShutdownError, +} + +impl std::error::Error for ErrorExample {} + +impl std::fmt::Display for ErrorExample { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + ErrorExample::OnInvokeError => write!(f, "error processing invocation call"), + ErrorExample::OnShutdownError => write!(f, "error processing shutdown call"), + } + } +} + +struct ErrorHandlingExtension { + client: Client +} + +#[async_trait::async_trait] +impl Extension for ErrorHandlingExtension { + async fn on_invoke(&self, extension_id: &str, _event: InvokeEvent) -> Result<(), Error> { + let err = ErrorExample::OnInvokeError; + let req = init_error(extension_id, &format!("{}", err), None)?; + self.client.call(req).await?; + Err(Box::new(err)) + } + + async fn on_shutdown(&self, extension_id: &str, _event: ShutdownEvent) -> Result<(), Error> { + let err = ErrorExample::OnShutdownError; + let req = exit_error(extension_id, &format!("{}", err), None)?; + self.client.call(req).await?; + Err(Box::new(err)) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // required to enable CloudWatch error logging by the runtime + // can be replaced with any other method of initializing `log` + SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); + + let client = Client::builder().build()?; + let extension = ErrorHandlingExtension { client }; + + run(extension).await?; + Ok(()) +} diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs new file mode 100644 index 00000000..49a33914 --- /dev/null +++ b/lambda-extension/src/lib.rs @@ -0,0 +1,129 @@ +// #![deny(clippy::all, clippy::cargo)] +// #![warn(missing_docs,? nonstandard_style, rust_2018_idioms)] + +use async_trait::async_trait; +use hyper::client::{connect::Connection, HttpConnector}; +use lambda_runtime_client::Client; +use serde::Deserialize; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_stream::{StreamExt}; +use tower_service::Service; +use tracing::trace; + +pub mod requests; + +pub type Error = lambda_runtime_client::Error; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Tracing { + pub r#type: String, + pub value: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InvokeEvent { + deadline_ms: u64, + request_id: String, + invoked_function_arn: String, + tracing: Tracing, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ShutdownEvent { + shutdown_reason: String, + deadline_ms: u64, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "UPPERCASE", tag = "eventType")] +pub enum NextEvent { + Invoke(InvokeEvent), + Shutdown(ShutdownEvent), +} + +/// A trait describing an asynchronous extension. +#[async_trait] +pub trait Extension { + async fn on_invoke(&self, extension_id: &str, event: InvokeEvent) -> Result<(), Error>; + async fn on_shutdown(&self, extension_id: &str, event: ShutdownEvent) -> Result<(), Error>; +} + +struct Runtime<'a, C: Service = HttpConnector> { + extension_id: &'a str, + client: Client, +} + +impl<'a, C> Runtime<'a, C> +where + C: Service + Clone + Send + Sync + Unpin + 'static, + >::Future: Unpin + Send, + >::Error: Into>, + >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, +{ + pub async fn run(&self, extension: impl Extension) -> Result<(), Error> { + let client = &self.client; + let extension_id = self.extension_id; + + let incoming = async_stream::stream! { + loop { + trace!("Waiting for next event (incoming loop)"); + let req = requests::next_event_request(extension_id)?; + let res = client.call(req).await; + yield res; + } + }; + + tokio::pin!(incoming); + while let Some(event) = incoming.next().await { + trace!("New event arrived (run loop)"); + let event = event?; + let (_parts, body) = event.into_parts(); + + let body = hyper::body::to_bytes(body).await?; + trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose + let event: NextEvent = serde_json::from_slice(&body)?; + + match event { + NextEvent::Invoke(event) => { + extension.on_invoke(extension_id, event).await?; + } + NextEvent::Shutdown(event) => { + extension.on_shutdown(extension_id, event).await?; + } + }; + } + + Ok(()) + } +} + +async fn register(client: &Client, extension_name: &str) -> Result +where + C: Service + Clone + Send + Sync + Unpin + 'static, + >::Future: Unpin + Send, + >::Error: Into>, + >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, +{ + let req = requests::register_request(extension_name)?; + let res = client.call(req).await?; + // ensure!(res.status() == http::StatusCode::OK, "Unable to register extension",); + + let ext_id = res.headers().get(requests::EXTENSION_ID_HEADER).unwrap().to_str()?; + Ok(ext_id.into()) +} + +pub async fn run(extension: impl Extension) -> Result<(), Error> { + let args: Vec = std::env::args().collect(); + + let client = Client::builder().build().expect("Unable to create a runtime client"); + let extension_id = register(&client, &args[0]).await?; + let runtime = Runtime { + extension_id: &extension_id, + client, + }; + + runtime.run(extension).await +} diff --git a/lambda-extension/src/requests.rs b/lambda-extension/src/requests.rs new file mode 100644 index 00000000..713af697 --- /dev/null +++ b/lambda-extension/src/requests.rs @@ -0,0 +1,78 @@ +use crate::Error; +use http::{Method, Request}; +use hyper::Body; +use lambda_runtime_client::build_request; +use serde::Serialize; + +const EXTENSION_NAME_HEADER: &str = "Lambda-Extension-Name"; +pub(crate) const EXTENSION_ID_HEADER: &str = "Lambda-Extension-Identifier"; +const EXTENSION_ERROR_TYPE_HEADER: &str = "Lambda-Extension-Function-Error-Type"; + +pub(crate) fn next_event_request(extension_id: &str) -> Result, Error> { + let req = build_request() + .method(Method::GET) + .header(EXTENSION_ID_HEADER, extension_id) + .uri("/2020-01-01/extension/event/next") + .body(Body::empty())?; + Ok(req) +} + +pub(crate) fn register_request(extension_name: &str) -> Result, Error> { + let events = serde_json::json!({ + "events": ["INVOKE", "SHUTDOWN"] + }); + + let req = build_request() + .method(Method::POST) + .uri("/2020-01-01/extension/register") + .header(EXTENSION_NAME_HEADER, extension_name) + .body(Body::from(serde_json::to_string(&events)?))?; + + Ok(req) +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ErrorRequest<'a> { + error_message: &'a str, + error_type: &'a str, + stack_trace: Vec<&'a str>, +} + +pub fn init_error<'a>( + extension_id: &str, + error_type: &str, + request: Option>, +) -> Result, Error> { + error_request("init", extension_id, error_type, request) +} + +pub fn exit_error<'a>( + extension_id: &str, + error_type: &str, + request: Option>, +) -> Result, Error> { + error_request("exit", extension_id, error_type, request) +} + +fn error_request<'a>( + error_type: &str, + extension_id: &str, + error_str: &str, + request: Option>, +) -> Result, Error> { + let uri = format!("/2020-01-01/extension/{}/error", error_type); + + let body = match request { + None => Body::empty(), + Some(err) => Body::from(serde_json::to_string(&err)?), + }; + + let req = build_request() + .method(Method::POST) + .uri(uri) + .header(EXTENSION_ID_HEADER, extension_id) + .header(EXTENSION_ERROR_TYPE_HEADER, error_str) + .body(body)?; + Ok(req) +} From 9d46537ff701217b8fcdab92e3c60d598149ad15 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Wed, 1 Dec 2021 10:40:14 -0800 Subject: [PATCH 03/12] Rename client crate to avoid confusion. Once upon a time, there was a crate called `lambda-runtime-client`. We don't want people to mistake this new crate with the old one. Signed-off-by: David Calavera --- Cargo.toml | 2 +- lambda-extension/Cargo.toml | 3 +-- lambda-extension/examples/error_handling.rs | 2 +- lambda-extension/src/lib.rs | 4 ++-- lambda-extension/src/requests.rs | 2 +- .../Cargo.toml | 4 ++-- .../src/lib.rs | 0 lambda-runtime/Cargo.toml | 2 +- lambda-runtime/src/lib.rs | 6 +++--- lambda-runtime/src/requests.rs | 2 +- 10 files changed, 13 insertions(+), 14 deletions(-) rename {lambda-runtime-client => lambda-runtime-api-client}/Cargo.toml (91%) rename {lambda-runtime-client => lambda-runtime-api-client}/src/lib.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index b6280c11..01bc46b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] members = [ "lambda-http", - "lambda-runtime-client", + "lambda-runtime-api-client", "lambda-runtime", "lambda-extension" ] \ No newline at end of file diff --git a/lambda-extension/Cargo.toml b/lambda-extension/Cargo.toml index 90b46413..e8ed3fe3 100644 --- a/lambda-extension/Cargo.toml +++ b/lambda-extension/Cargo.toml @@ -11,7 +11,6 @@ keywords = ["AWS", "Lambda", "API"] readme = "../README.md" [dependencies] -anyhow = "1.0.48" async-trait = "0.1.51" tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] } hyper = { version = "0.14", features = ["http1", "client", "server", "stream", "runtime"] } @@ -25,7 +24,7 @@ tracing-error = "0.2" tracing = { version = "0.1", features = ["log"] } tower-service = "0.3" tokio-stream = "0.1.2" -lambda_runtime_client = { version = "*", path = "../lambda-runtime-client" } +lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" } [dev-dependencies] tracing-subscriber = "0.3" diff --git a/lambda-extension/examples/error_handling.rs b/lambda-extension/examples/error_handling.rs index 3496cc60..1a68cb63 100644 --- a/lambda-extension/examples/error_handling.rs +++ b/lambda-extension/examples/error_handling.rs @@ -1,5 +1,5 @@ use lambda_extension::{run, Error, Extension, InvokeEvent, ShutdownEvent, requests::exit_error, requests::init_error}; -use lambda_runtime_client::Client; +use lambda_runtime_api_client::Client; use log::LevelFilter; use simple_logger::SimpleLogger; diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index 49a33914..dcebecb1 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use hyper::client::{connect::Connection, HttpConnector}; -use lambda_runtime_client::Client; +use lambda_runtime_api_client::Client; use serde::Deserialize; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::{StreamExt}; @@ -12,7 +12,7 @@ use tracing::trace; pub mod requests; -pub type Error = lambda_runtime_client::Error; +pub type Error = lambda_runtime_api_client::Error; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/lambda-extension/src/requests.rs b/lambda-extension/src/requests.rs index 713af697..69d98200 100644 --- a/lambda-extension/src/requests.rs +++ b/lambda-extension/src/requests.rs @@ -1,7 +1,7 @@ use crate::Error; use http::{Method, Request}; use hyper::Body; -use lambda_runtime_client::build_request; +use lambda_runtime_api_client::build_request; use serde::Serialize; const EXTENSION_NAME_HEADER: &str = "Lambda-Extension-Name"; diff --git a/lambda-runtime-client/Cargo.toml b/lambda-runtime-api-client/Cargo.toml similarity index 91% rename from lambda-runtime-client/Cargo.toml rename to lambda-runtime-api-client/Cargo.toml index 33bc6566..34ee7476 100644 --- a/lambda-runtime-client/Cargo.toml +++ b/lambda-runtime-api-client/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "lambda_runtime_client" -version = "0.1.0" +name = "lambda_runtime_api_client" +version = "0.4.1" edition = "2021" authors = ["David Calavera "] description = "AWS Lambda Runtime interaction API" diff --git a/lambda-runtime-client/src/lib.rs b/lambda-runtime-api-client/src/lib.rs similarity index 100% rename from lambda-runtime-client/src/lib.rs rename to lambda-runtime-api-client/src/lib.rs diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index e6939144..25bc26ec 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -27,7 +27,7 @@ tracing-error = "0.2" tracing = { version = "0.1", features = ["log"] } tower-service = "0.3" tokio-stream = "0.1.2" -lambda_runtime_client = { version = "*", path = "../lambda-runtime-client" } +lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" } [dev-dependencies] tracing-subscriber = "0.3" diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 61de9baf..b5df15a9 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -7,7 +7,7 @@ //! to the the `lambda_runtime::run` function, which launches and runs the Lambda runtime. pub use crate::types::Context; use hyper::client::{connect::Connection, HttpConnector}; -use lambda_runtime_client::Client; +use lambda_runtime_api_client::Client; use serde::{Deserialize, Serialize}; use std::{convert::TryFrom, env, fmt, future::Future, panic}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -24,7 +24,7 @@ mod types; use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}; use types::Diagnostic; -pub type Error = lambda_runtime_client::Error; +pub type Error = lambda_runtime_api_client::Error; /// Configuration derived from environment variables. #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] @@ -244,7 +244,7 @@ fn type_name_of_val(_: T) -> &'static str { #[cfg(test)] mod endpoint_tests { - use lambda_runtime_client::Client; + use lambda_runtime_api_client::Client; use crate::{ incoming, requests::{ diff --git a/lambda-runtime/src/requests.rs b/lambda-runtime/src/requests.rs index dd08fc33..4d033614 100644 --- a/lambda-runtime/src/requests.rs +++ b/lambda-runtime/src/requests.rs @@ -1,7 +1,7 @@ use crate::{types::Diagnostic, Error}; use http::{Method, Request, Response, Uri}; use hyper::Body; -use lambda_runtime_client::build_request; +use lambda_runtime_api_client::build_request; use serde::Serialize; use std::str::FromStr; From a7161beb65faf18918e44728430f79b54c8c293e Mon Sep 17 00:00:00 2001 From: David Calavera Date: Thu, 2 Dec 2021 15:54:55 -0800 Subject: [PATCH 04/12] Modify user API. - Remove async_trait dependency. - Use a similar handler api than the runtime. - Allow to register the extension for only certain events. Signed-off-by: David Calavera --- lambda-extension/examples/basic.rs | 25 ++-- lambda-extension/examples/custom_events.rs | 30 ++++ lambda-extension/examples/error_handling.rs | 55 ------- lambda-extension/src/lib.rs | 156 +++++++++++++++----- lambda-extension/src/requests.rs | 6 +- lambda-runtime/src/lib.rs | 4 +- 6 files changed, 161 insertions(+), 115 deletions(-) create mode 100644 lambda-extension/examples/custom_events.rs delete mode 100644 lambda-extension/examples/error_handling.rs diff --git a/lambda-extension/examples/basic.rs b/lambda-extension/examples/basic.rs index 35952d08..2a17a6c0 100644 --- a/lambda-extension/examples/basic.rs +++ b/lambda-extension/examples/basic.rs @@ -1,18 +1,17 @@ -use lambda_extension::{run, Error, Extension, InvokeEvent, ShutdownEvent}; +use lambda_extension::{extension_fn, Error, ExtensionId, NextEvent}; use log::LevelFilter; use simple_logger::SimpleLogger; -struct BasicExtension {} - -#[async_trait::async_trait] -impl Extension for BasicExtension { - async fn on_invoke(&self, _extension_id: &str, _event: InvokeEvent) -> Result<(), Error> { - Ok(()) - } - - async fn on_shutdown(&self, _extension_id: &str, _event: ShutdownEvent) -> Result<(), Error> { - Ok(()) +async fn my_extension(_extension_id: ExtensionId, event: NextEvent) -> Result<(), Error> { + match event { + NextEvent::Shutdown(_e) => { + // do something with the shutdown event + } + NextEvent::Invoke(_e) => { + // do something with the invoke event + } } + Ok(()) } #[tokio::main] @@ -21,6 +20,6 @@ async fn main() -> Result<(), Error> { // can be replaced with any other method of initializing `log` SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); - run(BasicExtension {}).await?; - Ok(()) + let func = extension_fn(my_extension); + lambda_extension::run(func).await } diff --git a/lambda-extension/examples/custom_events.rs b/lambda-extension/examples/custom_events.rs new file mode 100644 index 00000000..7fc84b67 --- /dev/null +++ b/lambda-extension/examples/custom_events.rs @@ -0,0 +1,30 @@ +use lambda_extension::{extension_fn, Error, ExtensionId, NextEvent, Runtime}; +use log::LevelFilter; +use simple_logger::SimpleLogger; + +async fn my_extension(_extension_id: ExtensionId, event: NextEvent) -> Result<(), Error> { + match event { + NextEvent::Shutdown(_e) => { + // do something with the shutdown event + } + _ => { + // ignore any other event + // because we've registered the extension + // only to receive SHUTDOWN events + } + } + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // required to enable CloudWatch error logging by the runtime + // can be replaced with any other method of initializing `log` + SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); + + let func = extension_fn(my_extension); + + let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?; + + runtime.run(func).await +} diff --git a/lambda-extension/examples/error_handling.rs b/lambda-extension/examples/error_handling.rs deleted file mode 100644 index 1a68cb63..00000000 --- a/lambda-extension/examples/error_handling.rs +++ /dev/null @@ -1,55 +0,0 @@ -use lambda_extension::{run, Error, Extension, InvokeEvent, ShutdownEvent, requests::exit_error, requests::init_error}; -use lambda_runtime_api_client::Client; -use log::LevelFilter; -use simple_logger::SimpleLogger; - -#[derive(Debug)] -enum ErrorExample { - OnInvokeError, - OnShutdownError, -} - -impl std::error::Error for ErrorExample {} - -impl std::fmt::Display for ErrorExample { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - ErrorExample::OnInvokeError => write!(f, "error processing invocation call"), - ErrorExample::OnShutdownError => write!(f, "error processing shutdown call"), - } - } -} - -struct ErrorHandlingExtension { - client: Client -} - -#[async_trait::async_trait] -impl Extension for ErrorHandlingExtension { - async fn on_invoke(&self, extension_id: &str, _event: InvokeEvent) -> Result<(), Error> { - let err = ErrorExample::OnInvokeError; - let req = init_error(extension_id, &format!("{}", err), None)?; - self.client.call(req).await?; - Err(Box::new(err)) - } - - async fn on_shutdown(&self, extension_id: &str, _event: ShutdownEvent) -> Result<(), Error> { - let err = ErrorExample::OnShutdownError; - let req = exit_error(extension_id, &format!("{}", err), None)?; - self.client.call(req).await?; - Err(Box::new(err)) - } -} - -#[tokio::main] -async fn main() -> Result<(), Error> { - // required to enable CloudWatch error logging by the runtime - // can be replaced with any other method of initializing `log` - SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); - - let client = Client::builder().build()?; - let extension = ErrorHandlingExtension { client }; - - run(extension).await?; - Ok(()) -} diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index dcebecb1..f6b95bd1 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -1,18 +1,19 @@ // #![deny(clippy::all, clippy::cargo)] // #![warn(missing_docs,? nonstandard_style, rust_2018_idioms)] -use async_trait::async_trait; use hyper::client::{connect::Connection, HttpConnector}; use lambda_runtime_api_client::Client; use serde::Deserialize; +use std::future::Future; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_stream::{StreamExt}; +use tokio_stream::StreamExt; use tower_service::Service; use tracing::trace; pub mod requests; pub type Error = lambda_runtime_api_client::Error; +pub type ExtensionId = String; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] @@ -44,19 +45,61 @@ pub enum NextEvent { Shutdown(ShutdownEvent), } +impl NextEvent { + fn is_invoke(&self) -> bool { + match self { + NextEvent::Invoke(_) => true, + _ => false, + } + } +} + /// A trait describing an asynchronous extension. -#[async_trait] pub trait Extension { - async fn on_invoke(&self, extension_id: &str, event: InvokeEvent) -> Result<(), Error>; - async fn on_shutdown(&self, extension_id: &str, event: ShutdownEvent) -> Result<(), Error>; + /// Response of this Extension. + type Fut: Future>; + /// Handle the incoming event. + fn call(&self, extension_id: ExtensionId, event: NextEvent) -> Self::Fut; +} + +/// Returns a new [`ExtensionFn`] with the given closure. +/// +/// [`ExtensionFn`]: struct.ExtensionFn.html +pub fn extension_fn(f: F) -> ExtensionFn { + ExtensionFn { f } } -struct Runtime<'a, C: Service = HttpConnector> { - extension_id: &'a str, +/// An [`Extension`] implemented by a closure. +/// +/// [`Extension`]: trait.Extension.html +#[derive(Clone, Debug)] +pub struct ExtensionFn { + f: F, +} + +impl Extension for ExtensionFn +where + F: Fn(ExtensionId, NextEvent) -> Fut, + Fut: Future>, +{ + type Fut = Fut; + fn call(&self, extension_id: ExtensionId, event: NextEvent) -> Self::Fut { + (self.f)(extension_id, event) + } +} + +pub struct Runtime = HttpConnector> { + extension_id: ExtensionId, client: Client, } -impl<'a, C> Runtime<'a, C> +impl Runtime { + pub fn builder<'a>() -> RuntimeBuilder<'a> { + RuntimeBuilder::default() + } +} + +impl Runtime where C: Service + Clone + Send + Sync + Unpin + 'static, >::Future: Unpin + Send, @@ -65,12 +108,11 @@ where { pub async fn run(&self, extension: impl Extension) -> Result<(), Error> { let client = &self.client; - let extension_id = self.extension_id; let incoming = async_stream::stream! { loop { trace!("Waiting for next event (incoming loop)"); - let req = requests::next_event_request(extension_id)?; + let req = requests::next_event_request(&self.extension_id)?; let res = client.call(req).await; yield res; } @@ -85,45 +127,77 @@ where let body = hyper::body::to_bytes(body).await?; trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose let event: NextEvent = serde_json::from_slice(&body)?; - - match event { - NextEvent::Invoke(event) => { - extension.on_invoke(extension_id, event).await?; - } - NextEvent::Shutdown(event) => { - extension.on_shutdown(extension_id, event).await?; - } - }; + let is_invoke = event.is_invoke(); + + let res = extension.call(self.extension_id.clone(), event).await; + if let Err(error) = res { + let req = if is_invoke { + requests::init_error(&self.extension_id, &error.to_string(), None)? + } else { + requests::exit_error(&self.extension_id, &error.to_string(), None)? + }; + + self.client.call(req).await?; + return Err(error); + } } Ok(()) } } -async fn register(client: &Client, extension_name: &str) -> Result -where - C: Service + Clone + Send + Sync + Unpin + 'static, - >::Future: Unpin + Send, - >::Error: Into>, - >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, -{ - let req = requests::register_request(extension_name)?; - let res = client.call(req).await?; - // ensure!(res.status() == http::StatusCode::OK, "Unable to register extension",); - - let ext_id = res.headers().get(requests::EXTENSION_ID_HEADER).unwrap().to_str()?; - Ok(ext_id.into()) +#[derive(Default)] +pub struct RuntimeBuilder<'a> { + extension_name: Option<&'a str>, + events: Option<&'a [&'a str]>, } -pub async fn run(extension: impl Extension) -> Result<(), Error> { - let args: Vec = std::env::args().collect(); +impl<'a> RuntimeBuilder<'a> { + pub fn with_extension_name(self, extension_name: &'a str) -> Self { + RuntimeBuilder { + extension_name: Some(extension_name), + ..self + } + } + + pub fn with_events(self, events: &'a [&'a str]) -> Self { + RuntimeBuilder { + events: Some(events), + ..self + } + } + + pub async fn register(&self) -> Result { + let name = match self.extension_name { + Some(name) => name.into(), + None => { + let args: Vec = std::env::args().collect(); + args[0].clone() + } + }; + + let events = match self.events { + Some(events) => events, + None => &["INVOKE", "SHUTDOWN"], + }; + + let client = Client::builder().build()?; - let client = Client::builder().build().expect("Unable to create a runtime client"); - let extension_id = register(&client, &args[0]).await?; - let runtime = Runtime { - extension_id: &extension_id, - client, - }; + let req = requests::register_request(&name, events)?; + let res = client.call(req).await?; + // ensure!(res.status() == http::StatusCode::OK, "Unable to register extension",); - runtime.run(extension).await + let extension_id = res.headers().get(requests::EXTENSION_ID_HEADER).unwrap().to_str()?; + Ok(Runtime { + extension_id: extension_id.into(), + client: client, + }) + } +} + +pub async fn run(extension: Ex) -> Result<(), Error> +where + Ex: Extension, +{ + Runtime::builder().register().await?.run(extension).await } diff --git a/lambda-extension/src/requests.rs b/lambda-extension/src/requests.rs index 69d98200..25906cd7 100644 --- a/lambda-extension/src/requests.rs +++ b/lambda-extension/src/requests.rs @@ -17,10 +17,8 @@ pub(crate) fn next_event_request(extension_id: &str) -> Result, Er Ok(req) } -pub(crate) fn register_request(extension_name: &str) -> Result, Error> { - let events = serde_json::json!({ - "events": ["INVOKE", "SHUTDOWN"] - }); +pub(crate) fn register_request(extension_name: &str, events: &[&str]) -> Result, Error> { + let events = serde_json::json!({ "events": events }); let req = build_request() .method(Method::POST) diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index b5df15a9..9ffaa275 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -244,7 +244,6 @@ fn type_name_of_val(_: T) -> &'static str { #[cfg(test)] mod endpoint_tests { - use lambda_runtime_api_client::Client; use crate::{ incoming, requests::{ @@ -256,6 +255,7 @@ mod endpoint_tests { }; use http::{uri::PathAndQuery, HeaderValue, Method, Request, Response, StatusCode, Uri}; use hyper::{server::conn::Http, service::service_fn, Body}; + use lambda_runtime_api_client::Client; use serde_json::json; use simulated::DuplexStreamWrapper; use std::{convert::TryFrom, env}; @@ -499,4 +499,4 @@ mod endpoint_tests { Err(_) => unreachable!("This branch shouldn't be reachable"), } } -} \ No newline at end of file +} From af95f363070646c914d7a1c38620c2e591062ca4 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Fri, 3 Dec 2021 11:20:38 -0800 Subject: [PATCH 05/12] Cleanup user API. - Remove extension ID from call signature. - Make extension mutable. - Add example showing how to implement an extension with a struct. Signed-off-by: David Calavera --- lambda-extension/examples/basic.rs | 4 +-- lambda-extension/examples/custom_events.rs | 4 +-- .../examples/custom_trait_implementation.rs | 34 +++++++++++++++++++ lambda-extension/src/lib.rs | 15 ++++---- 4 files changed, 45 insertions(+), 12 deletions(-) create mode 100644 lambda-extension/examples/custom_trait_implementation.rs diff --git a/lambda-extension/examples/basic.rs b/lambda-extension/examples/basic.rs index 2a17a6c0..573b3281 100644 --- a/lambda-extension/examples/basic.rs +++ b/lambda-extension/examples/basic.rs @@ -1,8 +1,8 @@ -use lambda_extension::{extension_fn, Error, ExtensionId, NextEvent}; +use lambda_extension::{extension_fn, Error, NextEvent}; use log::LevelFilter; use simple_logger::SimpleLogger; -async fn my_extension(_extension_id: ExtensionId, event: NextEvent) -> Result<(), Error> { +async fn my_extension(event: NextEvent) -> Result<(), Error> { match event { NextEvent::Shutdown(_e) => { // do something with the shutdown event diff --git a/lambda-extension/examples/custom_events.rs b/lambda-extension/examples/custom_events.rs index 7fc84b67..88f040aa 100644 --- a/lambda-extension/examples/custom_events.rs +++ b/lambda-extension/examples/custom_events.rs @@ -1,8 +1,8 @@ -use lambda_extension::{extension_fn, Error, ExtensionId, NextEvent, Runtime}; +use lambda_extension::{extension_fn, Error, NextEvent, Runtime}; use log::LevelFilter; use simple_logger::SimpleLogger; -async fn my_extension(_extension_id: ExtensionId, event: NextEvent) -> Result<(), Error> { +async fn my_extension(event: NextEvent) -> Result<(), Error> { match event { NextEvent::Shutdown(_e) => { // do something with the shutdown event diff --git a/lambda-extension/examples/custom_trait_implementation.rs b/lambda-extension/examples/custom_trait_implementation.rs new file mode 100644 index 00000000..42c1db81 --- /dev/null +++ b/lambda-extension/examples/custom_trait_implementation.rs @@ -0,0 +1,34 @@ +use lambda_extension::{run, Error, NextEvent, Extension}; +use log::LevelFilter; +use simple_logger::SimpleLogger; +use std::future::{Future, ready}; +use std::pin::Pin; + +struct MyExtension {} + +impl Extension for MyExtension +{ + type Fut = Pin>>>; + fn call(&mut self, event: NextEvent) -> Self::Fut { + match event { + NextEvent::Shutdown(_e) => { + // do something with the shutdown event + } + _ => { + // ignore any other event + // because we've registered the extension + // only to receive SHUTDOWN events + } + } + Box::pin(ready(Ok(()))) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // required to enable CloudWatch error logging by the runtime + // can be replaced with any other method of initializing `log` + SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); + + run(MyExtension {}).await +} diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index f6b95bd1..0248ce2a 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -13,7 +13,6 @@ use tracing::trace; pub mod requests; pub type Error = lambda_runtime_api_client::Error; -pub type ExtensionId = String; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] @@ -59,7 +58,7 @@ pub trait Extension { /// Response of this Extension. type Fut: Future>; /// Handle the incoming event. - fn call(&self, extension_id: ExtensionId, event: NextEvent) -> Self::Fut; + fn call(&mut self, event: NextEvent) -> Self::Fut; } /// Returns a new [`ExtensionFn`] with the given closure. @@ -79,17 +78,17 @@ pub struct ExtensionFn { impl Extension for ExtensionFn where - F: Fn(ExtensionId, NextEvent) -> Fut, + F: Fn(NextEvent) -> Fut, Fut: Future>, { type Fut = Fut; - fn call(&self, extension_id: ExtensionId, event: NextEvent) -> Self::Fut { - (self.f)(extension_id, event) + fn call(&mut self, event: NextEvent) -> Self::Fut { + (self.f)(event) } } pub struct Runtime = HttpConnector> { - extension_id: ExtensionId, + extension_id: String, client: Client, } @@ -106,7 +105,7 @@ where >::Error: Into>, >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { - pub async fn run(&self, extension: impl Extension) -> Result<(), Error> { + pub async fn run(&self, mut extension: impl Extension) -> Result<(), Error> { let client = &self.client; let incoming = async_stream::stream! { @@ -129,7 +128,7 @@ where let event: NextEvent = serde_json::from_slice(&body)?; let is_invoke = event.is_invoke(); - let res = extension.call(self.extension_id.clone(), event).await; + let res = extension.call(event).await; if let Err(error) = res { let req = if is_invoke { requests::init_error(&self.extension_id, &error.to_string(), None)? From c5b423ae2f3a813d1ecce6cc518b865c742c4ffb Mon Sep 17 00:00:00 2001 From: David Calavera Date: Fri, 3 Dec 2021 17:34:20 -0800 Subject: [PATCH 06/12] Add documentation and cleanup code. Signed-off-by: David Calavera --- .../examples/custom_trait_implementation.rs | 7 +- lambda-extension/src/lib.rs | 85 +++++++++++++++---- lambda-extension/src/requests.rs | 12 ++- lambda-runtime-api-client/src/lib.rs | 22 ++++- lambda-runtime/src/lib.rs | 1 + 5 files changed, 101 insertions(+), 26 deletions(-) diff --git a/lambda-extension/examples/custom_trait_implementation.rs b/lambda-extension/examples/custom_trait_implementation.rs index 42c1db81..23f4c1a0 100644 --- a/lambda-extension/examples/custom_trait_implementation.rs +++ b/lambda-extension/examples/custom_trait_implementation.rs @@ -1,13 +1,12 @@ -use lambda_extension::{run, Error, NextEvent, Extension}; +use lambda_extension::{run, Error, Extension, NextEvent}; use log::LevelFilter; use simple_logger::SimpleLogger; -use std::future::{Future, ready}; +use std::future::{ready, Future}; use std::pin::Pin; struct MyExtension {} -impl Extension for MyExtension -{ +impl Extension for MyExtension { type Fut = Pin>>>; fn call(&mut self, event: NextEvent) -> Self::Fut { match event { diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index 0248ce2a..71b0d893 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -1,6 +1,10 @@ -// #![deny(clippy::all, clippy::cargo)] -// #![warn(missing_docs,? nonstandard_style, rust_2018_idioms)] +#![deny(clippy::all, clippy::cargo)] +#![warn(missing_docs, nonstandard_style, rust_2018_idioms)] +//! This module includes utilities to create Lambda Runtime Extensions. +//! +//! Create a type that conforms to the [`Extension`] trait. This type can then be passed +//! to the the `lambda_extension::run` function, which launches and runs the Lambda runtime extension. use hyper::client::{connect::Connection, HttpConnector}; use lambda_runtime_api_client::Client; use serde::Deserialize; @@ -10,46 +14,82 @@ use tokio_stream::StreamExt; use tower_service::Service; use tracing::trace; +/// Include several request builders to interact with the Extension API. pub mod requests; +/// Error type that extensions may result in pub type Error = lambda_runtime_api_client::Error; +/// Simple error that encapsulates human readable descriptions +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ExtensionError { + err: String, +} + +impl ExtensionError { + fn boxed>(str: T) -> Box { + Box::new(ExtensionError { err: str.into() }) + } +} + +impl std::fmt::Display for ExtensionError { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.err.fmt(f) + } +} + +impl std::error::Error for ExtensionError {} + +/// Request tracing information #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Tracing { + /// The type of tracing exposed to the extension pub r#type: String, + /// The span value pub value: String, } +/// Event received when there is a new Lambda invocation. #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct InvokeEvent { - deadline_ms: u64, - request_id: String, - invoked_function_arn: String, - tracing: Tracing, + /// The time that the function times out + pub deadline_ms: u64, + /// The ID assigned to the Lambda request + pub request_id: String, + /// The function's Amazon Resource Name + pub invoked_function_arn: String, + /// The request tracing information + pub tracing: Tracing, } +/// Event received when a Lambda function shuts down. #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ShutdownEvent { - shutdown_reason: String, - deadline_ms: u64, + /// The reason why the function terminates + /// It can be SPINDOWN, TIMEOUT, or FAILURE + pub shutdown_reason: String, + /// The time that the function times out + pub deadline_ms: u64, } +/// Event that the extension receives in +/// either the INVOKE or SHUTDOWN phase #[derive(Debug, Deserialize)] #[serde(rename_all = "UPPERCASE", tag = "eventType")] pub enum NextEvent { + /// Payload when the event happens in the INVOKE phase Invoke(InvokeEvent), + /// Payload when the event happens in the SHUTDOWN phase Shutdown(ShutdownEvent), } impl NextEvent { fn is_invoke(&self) -> bool { - match self { - NextEvent::Invoke(_) => true, - _ => false, - } + matches!(self, NextEvent::Invoke(_)) } } @@ -87,12 +127,14 @@ where } } +/// The Runtime handles all the incoming extension requests pub struct Runtime = HttpConnector> { extension_id: String, client: Client, } impl Runtime { + /// Create a [`RuntimeBuilder`] to initialize the [`Runtime`] pub fn builder<'a>() -> RuntimeBuilder<'a> { RuntimeBuilder::default() } @@ -105,6 +147,8 @@ where >::Error: Into>, >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { + /// Execute the given extension. + /// Register the extension with the Extensions API and wait for incoming events. pub async fn run(&self, mut extension: impl Extension) -> Result<(), Error> { let client = &self.client; @@ -145,6 +189,7 @@ where } } +/// Builder to construct a new extension [`Runtime`] #[derive(Default)] pub struct RuntimeBuilder<'a> { extension_name: Option<&'a str>, @@ -152,6 +197,7 @@ pub struct RuntimeBuilder<'a> { } impl<'a> RuntimeBuilder<'a> { + /// Create a new [`RuntimeBuilder`] with a given extension name pub fn with_extension_name(self, extension_name: &'a str) -> Self { RuntimeBuilder { extension_name: Some(extension_name), @@ -159,6 +205,8 @@ impl<'a> RuntimeBuilder<'a> { } } + /// Create a new [`RuntimeBuilder`] with a list of given events. + /// The only accepted events are `INVOKE` and `SHUTDOWN`. pub fn with_events(self, events: &'a [&'a str]) -> Self { RuntimeBuilder { events: Some(events), @@ -166,6 +214,7 @@ impl<'a> RuntimeBuilder<'a> { } } + /// Initialize and register the extension in the Extensions API pub async fn register(&self) -> Result { let name = match self.extension_name { Some(name) => name.into(), @@ -175,25 +224,25 @@ impl<'a> RuntimeBuilder<'a> { } }; - let events = match self.events { - Some(events) => events, - None => &["INVOKE", "SHUTDOWN"], - }; + let events = self.events.unwrap_or(&["INVOKE", "SHUTDOWN"]); let client = Client::builder().build()?; let req = requests::register_request(&name, events)?; let res = client.call(req).await?; - // ensure!(res.status() == http::StatusCode::OK, "Unable to register extension",); + if res.status() != http::StatusCode::OK { + return Err(ExtensionError::boxed("unable to register the extension")); + } let extension_id = res.headers().get(requests::EXTENSION_ID_HEADER).unwrap().to_str()?; Ok(Runtime { extension_id: extension_id.into(), - client: client, + client, }) } } +/// Execute the given extension pub async fn run(extension: Ex) -> Result<(), Error> where Ex: Extension, diff --git a/lambda-extension/src/requests.rs b/lambda-extension/src/requests.rs index 25906cd7..2fdbf2a6 100644 --- a/lambda-extension/src/requests.rs +++ b/lambda-extension/src/requests.rs @@ -29,14 +29,19 @@ pub(crate) fn register_request(extension_name: &str, events: &[&str]) -> Result< Ok(req) } +/// Payload to send error information to the Extensions API. #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct ErrorRequest<'a> { - error_message: &'a str, - error_type: &'a str, - stack_trace: Vec<&'a str>, + /// Human readable error description + pub error_message: &'a str, + /// The type of error to categorize + pub error_type: &'a str, + /// The error backtrace + pub stack_trace: Vec<&'a str>, } +/// Create a new init error request to send to the Extensions API pub fn init_error<'a>( extension_id: &str, error_type: &str, @@ -45,6 +50,7 @@ pub fn init_error<'a>( error_request("init", extension_id, error_type, request) } +/// Create a new exit error request to send to the Extensions API pub fn exit_error<'a>( extension_id: &str, error_type: &str, diff --git a/lambda-runtime-api-client/src/lib.rs b/lambda-runtime-api-client/src/lib.rs index 989b2eeb..6fca23c6 100644 --- a/lambda-runtime-api-client/src/lib.rs +++ b/lambda-runtime-api-client/src/lib.rs @@ -1,3 +1,8 @@ +#![deny(clippy::all, clippy::cargo)] +#![warn(missing_docs, nonstandard_style, rust_2018_idioms)] + +//! This crate includes a base HTTP client to interact with +//! the AWS Lambda Runtime API. use http::{uri::Scheme, Request, Response, Uri}; use hyper::client::{connect::Connection, HttpConnector}; use hyper::Body; @@ -11,13 +16,17 @@ const USER_AGENT: &str = concat!("aws-lambda-rust/", env!("CARGO_PKG_VERSION")); /// Error type that lambdas may result in pub type Error = Box; +/// API client to interact with the AWS Lambda Runtime API. #[derive(Debug)] pub struct Client { + /// The runtime API URI pub base: Uri, + /// The client that manages the API connections pub client: hyper::Client, } impl Client { + /// Create a builder struct to configure the client. pub fn builder() -> ClientBuilder { ClientBuilder { connector: HttpConnector::new(), @@ -30,12 +39,15 @@ impl Client where C: hyper::client::connect::Connect + Sync + Send + Clone + 'static, { + /// Send a given request to the Runtime API. + /// Use the client's base URI to ensure the API endpoint is correct. pub async fn call(&self, req: Request) -> Result, Error> { let req = self.set_origin(req)?; let response = self.client.request(req).await?; Ok(response) } + /// Create a new client with a given base URI and HTTP connector. pub fn with(base: Uri, connector: C) -> Self { let client = hyper::Client::builder().build(connector); Self { base, client } @@ -66,6 +78,7 @@ where } } +/// Builder implementation to construct any Runtime API clients. pub struct ClientBuilder = hyper::client::HttpConnector> { connector: C, uri: Option, @@ -78,6 +91,7 @@ where >::Error: Into>, >::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, { + /// Create a new builder with a given HTTP connector. pub fn with_connector(self, connector: C2) -> ClientBuilder where C2: Service + Clone + Send + Sync + Unpin + 'static, @@ -91,22 +105,28 @@ where } } + /// Create a new builder with a given base URI. + /// Inherits all other attributes from the existent builder. pub fn with_endpoint(self, uri: http::Uri) -> Self { Self { uri: Some(uri), ..self } } + /// Create the new client to interact with the Runtime API. pub fn build(self) -> Result, Error> { let uri = match self.uri { Some(uri) => uri, None => { let uri = std::env::var("AWS_LAMBDA_RUNTIME_API").expect("Missing AWS_LAMBDA_RUNTIME_API env var"); - uri.clone().try_into().expect("Unable to convert to URL") + uri.try_into().expect("Unable to convert to URL") } }; Ok(Client::with(uri, self.connector)) } } +/// Create a request builder. +/// This builder uses `aws-lambda-rust/CRATE_VERSION` as +/// the default User-Agent. pub fn build_request() -> http::request::Builder { http::Request::builder().header(USER_AGENT_HEADER, USER_AGENT) } diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 9ffaa275..85403ea3 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -24,6 +24,7 @@ mod types; use requests::{EventCompletionRequest, EventErrorRequest, IntoRequest, NextEventRequest}; use types::Diagnostic; +/// Error type that lambdas may result in pub type Error = lambda_runtime_api_client::Error; /// Configuration derived from environment variables. From 23b363155831da95b438b9d89fae228fcd57a440 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Fri, 3 Dec 2021 17:41:27 -0800 Subject: [PATCH 07/12] Make custom trait example more useful. Signed-off-by: David Calavera --- .../examples/custom_trait_implementation.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/lambda-extension/examples/custom_trait_implementation.rs b/lambda-extension/examples/custom_trait_implementation.rs index 23f4c1a0..87567220 100644 --- a/lambda-extension/examples/custom_trait_implementation.rs +++ b/lambda-extension/examples/custom_trait_implementation.rs @@ -1,22 +1,23 @@ -use lambda_extension::{run, Error, Extension, NextEvent}; +use lambda_extension::{run, Error, Extension, InvokeEvent, NextEvent}; use log::LevelFilter; use simple_logger::SimpleLogger; use std::future::{ready, Future}; use std::pin::Pin; -struct MyExtension {} +#[derive(Default)] +struct MyExtension { + data: Vec, +} impl Extension for MyExtension { type Fut = Pin>>>; fn call(&mut self, event: NextEvent) -> Self::Fut { match event { NextEvent::Shutdown(_e) => { - // do something with the shutdown event + self.data.clear(); } - _ => { - // ignore any other event - // because we've registered the extension - // only to receive SHUTDOWN events + NextEvent::Invoke(e) => { + self.data.push(e); } } Box::pin(ready(Ok(()))) @@ -29,5 +30,5 @@ async fn main() -> Result<(), Error> { // can be replaced with any other method of initializing `log` SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); - run(MyExtension {}).await + run(MyExtension::default()).await } From 36cdbd857d35d0c05c645f4b70fb10a8f6c83687 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Sat, 4 Dec 2021 15:01:28 -0800 Subject: [PATCH 08/12] Fix formatting. Signed-off-by: David Calavera --- lambda-extension/examples/custom_trait_implementation.rs | 6 ++++-- lambda-runtime-api-client/src/lib.rs | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lambda-extension/examples/custom_trait_implementation.rs b/lambda-extension/examples/custom_trait_implementation.rs index 87567220..caef7730 100644 --- a/lambda-extension/examples/custom_trait_implementation.rs +++ b/lambda-extension/examples/custom_trait_implementation.rs @@ -1,8 +1,10 @@ use lambda_extension::{run, Error, Extension, InvokeEvent, NextEvent}; use log::LevelFilter; use simple_logger::SimpleLogger; -use std::future::{ready, Future}; -use std::pin::Pin; +use std::{ + future::{ready, Future}, + pin::Pin, +}; #[derive(Default)] struct MyExtension { diff --git a/lambda-runtime-api-client/src/lib.rs b/lambda-runtime-api-client/src/lib.rs index 6fca23c6..e585944e 100644 --- a/lambda-runtime-api-client/src/lib.rs +++ b/lambda-runtime-api-client/src/lib.rs @@ -4,8 +4,10 @@ //! This crate includes a base HTTP client to interact with //! the AWS Lambda Runtime API. use http::{uri::Scheme, Request, Response, Uri}; -use hyper::client::{connect::Connection, HttpConnector}; -use hyper::Body; +use hyper::{ + client::{connect::Connection, HttpConnector}, + Body, +}; use std::fmt::Debug; use tokio::io::{AsyncRead, AsyncWrite}; use tower_service::Service; From 61109266fb8760a2a4394b2ef14b151d56a94f80 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Mon, 6 Dec 2021 15:33:01 -0800 Subject: [PATCH 09/12] Remove unused dependencies. Signed-off-by: David Calavera --- lambda-extension/Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/lambda-extension/Cargo.toml b/lambda-extension/Cargo.toml index e8ed3fe3..e78e2bf0 100644 --- a/lambda-extension/Cargo.toml +++ b/lambda-extension/Cargo.toml @@ -11,7 +11,6 @@ keywords = ["AWS", "Lambda", "API"] readme = "../README.md" [dependencies] -async-trait = "0.1.51" tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] } hyper = { version = "0.14", features = ["http1", "client", "server", "stream", "runtime"] } serde = { version = "1", features = ["derive"] } @@ -19,8 +18,6 @@ serde_json = "^1" bytes = "1.0" http = "0.2" async-stream = "0.3" -futures = "0.3" -tracing-error = "0.2" tracing = { version = "0.1", features = ["log"] } tower-service = "0.3" tokio-stream = "0.1.2" From f607efae9945723ee94c6d8932f2df892b679154 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Mon, 6 Dec 2021 16:07:41 -0800 Subject: [PATCH 10/12] Add README files for the new crates. Signed-off-by: David Calavera --- README.md | 3 ++ lambda-extension/Cargo.toml | 2 +- lambda-extension/README.md | 56 ++++++++++++++++++++++++++++ lambda-runtime-api-client/Cargo.toml | 2 +- lambda-runtime-api-client/README.md | 35 +++++++++++++++++ 5 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 lambda-extension/README.md create mode 100644 lambda-runtime-api-client/README.md diff --git a/README.md b/README.md index a9616f1a..96f24476 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,9 @@ This package makes it easy to run AWS Lambda Functions written in Rust. This wor - [![Docs](https://docs.rs/lambda_runtime/badge.svg)](https://docs.rs/lambda_runtime) **`lambda-runtime`** is a library that provides a Lambda runtime for applications written in Rust. - [![Docs](https://docs.rs/lambda_http/badge.svg)](https://docs.rs/lambda_http) **`lambda-http`** is a library that makes it easy to write API Gateway proxy event focused Lambda functions in Rust. +- [![Docs](https://docs.rs/lambda_extension/badge.svg)](https://docs.rs/lambda_extension) **`lambda-extension`** is a library that makes it easy to write Lambda Runtime Extensions in Rust. +- [![Docs](https://docs.rs/lambda_runtime_api_client/badge.svg)](https://docs.rs/lambda_runtime_api_client) **`lambda-runtime-api-client`** is a shared library between the lambda runtime and lambda extension libraries that includes a common API client to talk with the AWS Lambda Runtime API. + ## Example function diff --git a/lambda-extension/Cargo.toml b/lambda-extension/Cargo.toml index e78e2bf0..05f1d6f7 100644 --- a/lambda-extension/Cargo.toml +++ b/lambda-extension/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" repository = "https://github.com/awslabs/aws-lambda-rust-runtime" categories = ["web-programming::http-server"] keywords = ["AWS", "Lambda", "API"] -readme = "../README.md" +readme = "README.md" [dependencies] tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] } diff --git a/lambda-extension/README.md b/lambda-extension/README.md new file mode 100644 index 00000000..2f28b8e3 --- /dev/null +++ b/lambda-extension/README.md @@ -0,0 +1,56 @@ +# Runtime Extensions for AWS Lambda in Rust + +[![Docs](https://docs.rs/lambda_extension/badge.svg)](https://docs.rs/lambda_extension) + +**`lambda-extension`** is a library that makes it easy to write AWS Lambda Runtime Extensions in Rust. + +## Example extension + +The code below creates a simple extension that's registered to every `INVOKE` and `SHUTDOWN` events, and logs them in CloudWatch. + +```rust,no_run +use lambda_extension::{extension_fn, Error, NextEvent}; +use log::LevelFilter; +use simple_logger::SimpleLogger; +use tracing::info; + +async fn log_extension(event: NextEvent) -> Result<(), Error> { + match event { + NextEvent::Shutdown(event) => { + info!("{}", event); + } + NextEvent::Invoke(event) => { + info!("{}", event); + } + } + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + SimpleLogger::new().with_level(LevelFilter::Info).init().unwrap(); + + let func = extension_fn(log_extension); + lambda_extension::run(func).await +} +``` + +## Deployment + +Lambda extensions can be added to your functions either using [Lambda layers](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#using-extensions-config), or adding them to [containers images](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#invocation-extensions-images). Regardless of how you deploy them, the extensions MUST be compiled against the same architecture that your lambda functions runs on. The only two architectures that AWS Lambda supports are `aarch64-unknown-linux-gnu` for ARM functions, and `x86_64-unknown-linux-gnu` for x86 functions. + +### Building extensions + +Once you've decided which target you'll use, you can install it by running the next `rustup` command: + +```bash +$ rustup target add x86_64-unknown-linux-gnu +``` + +Then, you can compile the extension against that target: + +```bash +$ cargo build -p lambda_extension --example basic --release --target x86_64-unknown-linux-gnu +``` + +This previous command will generate a binary file in `target/x86_64-unknown-linux-gnu/release/examples` called `basic`. When the extension is registered with the [Runtime Extensions API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-reg), that's the name that the extension will be registered with. If you want to register the extension with a different name, you only have to rename this binary file and deploy it with the new name. \ No newline at end of file diff --git a/lambda-runtime-api-client/Cargo.toml b/lambda-runtime-api-client/Cargo.toml index 34ee7476..48188a91 100644 --- a/lambda-runtime-api-client/Cargo.toml +++ b/lambda-runtime-api-client/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" repository = "https://github.com/awslabs/aws-lambda-rust-runtime" categories = ["web-programming::http-server"] keywords = ["AWS", "Lambda", "API"] -readme = "../README.md" +readme = "README.md" [dependencies] http = "0.2" diff --git a/lambda-runtime-api-client/README.md b/lambda-runtime-api-client/README.md new file mode 100644 index 00000000..25cfbc03 --- /dev/null +++ b/lambda-runtime-api-client/README.md @@ -0,0 +1,35 @@ +# AWS Lambda Runtime API Client + +[![Docs](https://docs.rs/lambda_runtime_api_client/badge.svg)](https://docs.rs/lambda_runtime_api_client) + +**`lambda-runtime-api-client`** is a library to interact with the AWS Lambda Runtime API. + +This crate provides simple building blocks to send REST request to this API. + +## Example + +```rust,no_run +use http::{Method, Request}; +use hyper::Body; +use lambda_runtime_api_client::{build_request, Client, Error}; + +fn register_request(extension_name: &str, events: &[&str]) -> Result, Error> { + let events = serde_json::json!({ "events": events }); + + let req = build_request() + .method(Method::POST) + .uri("/2020-01-01/extension/register") + .header("Lambda-Extension-Name", extension_name) + .body(Body::from(serde_json::to_string(&events)?))?; + + Ok(req) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let client = Client::builder().build()?; + let request = register_request("my_extension", &["INVOKE"])?; + + client.call(request).await +} +``` From bbde54137ee2a33b1c3a7b5cb114e57972a0e2c1 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Tue, 7 Dec 2021 10:14:27 -0800 Subject: [PATCH 11/12] Update readme files. Signed-off-by: David Calavera --- lambda-extension/README.md | 12 +++++++----- lambda-runtime-api-client/README.md | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/lambda-extension/README.md b/lambda-extension/README.md index 2f28b8e3..4982779f 100644 --- a/lambda-extension/README.md +++ b/lambda-extension/README.md @@ -2,7 +2,7 @@ [![Docs](https://docs.rs/lambda_extension/badge.svg)](https://docs.rs/lambda_extension) -**`lambda-extension`** is a library that makes it easy to write AWS Lambda Runtime Extensions in Rust. +**`lambda-extension`** is a library that makes it easy to write [AWS Lambda Runtime Extensions](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html) in Rust. ## Example extension @@ -37,20 +37,22 @@ async fn main() -> Result<(), Error> { ## Deployment -Lambda extensions can be added to your functions either using [Lambda layers](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#using-extensions-config), or adding them to [containers images](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#invocation-extensions-images). Regardless of how you deploy them, the extensions MUST be compiled against the same architecture that your lambda functions runs on. The only two architectures that AWS Lambda supports are `aarch64-unknown-linux-gnu` for ARM functions, and `x86_64-unknown-linux-gnu` for x86 functions. +Lambda extensions can be added to your functions either using [Lambda layers](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#using-extensions-config), or adding them to [containers images](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#invocation-extensions-images). + +Regardless of how you deploy them, the extensions MUST be compiled against the same architecture that your lambda functions runs on. ### Building extensions Once you've decided which target you'll use, you can install it by running the next `rustup` command: ```bash -$ rustup target add x86_64-unknown-linux-gnu +$ rustup target add x86_64-unknown-linux-musl ``` Then, you can compile the extension against that target: ```bash -$ cargo build -p lambda_extension --example basic --release --target x86_64-unknown-linux-gnu +$ cargo build -p lambda_extension --example basic --release --target x86_64-unknown-linux-musl ``` -This previous command will generate a binary file in `target/x86_64-unknown-linux-gnu/release/examples` called `basic`. When the extension is registered with the [Runtime Extensions API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-reg), that's the name that the extension will be registered with. If you want to register the extension with a different name, you only have to rename this binary file and deploy it with the new name. \ No newline at end of file +This previous command will generate a binary file in `target/x86_64-unknown-linux-musl/release/examples` called `basic`. When the extension is registered with the [Runtime Extensions API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html#runtimes-extensions-api-reg), that's the name that the extension will be registered with. If you want to register the extension with a different name, you only have to rename this binary file and deploy it with the new name. \ No newline at end of file diff --git a/lambda-runtime-api-client/README.md b/lambda-runtime-api-client/README.md index 25cfbc03..530fefdd 100644 --- a/lambda-runtime-api-client/README.md +++ b/lambda-runtime-api-client/README.md @@ -4,7 +4,7 @@ **`lambda-runtime-api-client`** is a library to interact with the AWS Lambda Runtime API. -This crate provides simple building blocks to send REST request to this API. +This crate provides simple building blocks to send REST request to this API. You probably don't need to use this crate directly, look at [lambda_runtime](https://docs.rs/lambda_runtime) and [lambda_extension](https://docs.rs/lambda_extension) instead. ## Example From 981b7cea7a99f5e7c7faf36a1774068a48cb8c68 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Fri, 10 Dec 2021 18:30:58 -0800 Subject: [PATCH 12/12] Fix extension name Cleanup the path from the executable when it takes the name from arg[0]. Signed-off-by: David Calavera --- lambda-extension/src/lib.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index 71b0d893..41f56890 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -9,6 +9,7 @@ use hyper::client::{connect::Connection, HttpConnector}; use lambda_runtime_api_client::Client; use serde::Deserialize; use std::future::Future; +use std::path::PathBuf; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_stream::StreamExt; use tower_service::Service; @@ -220,7 +221,12 @@ impl<'a> RuntimeBuilder<'a> { Some(name) => name.into(), None => { let args: Vec = std::env::args().collect(); - args[0].clone() + PathBuf::from(args[0].clone()) + .file_name() + .expect("unexpected executable name") + .to_str() + .expect("unexpect executable name") + .to_string() } };