From 2f7eb744229ed7025a702c3312206806f2b2de5c Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sat, 5 Jan 2019 20:00:23 +0000 Subject: [PATCH 01/12] RuntimeClient::next_event returns a Future --- lambda-runtime-client/src/client.rs | 39 +++++++++++++---------------- lambda-runtime-client/src/lib.rs | 4 ++- lambda-runtime/src/runtime.rs | 3 ++- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index f87a2d2c..6e710901 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -8,6 +8,7 @@ use hyper::{ }; use serde_derive::Deserialize; use serde_json; +use tokio::prelude::future::IntoFuture; use tokio::runtime::Runtime; use crate::error::{ApiError, ErrorResponse, RuntimeApiError}; @@ -149,20 +150,20 @@ impl RuntimeClient { impl RuntimeClient { /// Polls for new events to the Runtime APIs. - pub fn next_event(&self) -> Result<(Vec, EventContext), ApiError> { + pub fn next_event(&self) -> impl Future, EventContext), Error=ApiError> { let uri = format!( "http://{}/{}/runtime/invocation/next", self.endpoint, RUNTIME_API_VERSION - ) - .parse()?; + ).parse(); trace!("Polling for next event"); - - // We wait instead of processing the future asynchronously because AWS Lambda - // itself enforces only one event per container at a time. No point in taking on - // the additional complexity. - let out = self.http_client.get(uri).wait(); - match out { - Ok(resp) => { + let http_client = self.http_client.clone(); + uri.into_future() + .map_err(ApiError::from) + .and_then(move |uri| http_client.get(uri).map_err(|e| { + error!("Error when fetching next event from Runtime API: {}", e); + ApiError::from(e) + })) + .and_then(|resp| { if resp.status().is_client_error() { error!( "Runtime API returned client error when polling for new events: {}", @@ -182,22 +183,18 @@ impl RuntimeClient { .unrecoverable() .clone()); } - let ctx = self.get_event_context(&resp.headers())?; - let out = resp.into_body().concat2().wait()?; - let buf: Vec = out.into_bytes().to_vec(); + return Ok((Self::get_event_context(&resp.headers())?, resp)); + }).and_then(|(ctx, resp)| Ok(ctx).into_future().join(resp.into_body().concat2().map_err(Into::into))) + .map(|(ctx, body)| { + let buf = body.into_bytes().to_vec(); trace!( "Received new event for request id {}. Event length {} bytes", ctx.aws_request_id, buf.len() ); - Ok((buf, ctx)) - } - Err(e) => { - error!("Error when fetching next event from Runtime API: {}", e); - Err(ApiError::from(e)) - } - } + (buf, ctx) + }) } /// Calls the Lambda Runtime APIs to submit a response to an event. In this function we treat @@ -378,7 +375,7 @@ impl RuntimeClient { /// A `Result` containing the populated `EventContext` or an `ApiError` if the required headers /// were not present or the client context and cognito identity could not be parsed from the /// JSON string. - fn get_event_context(&self, headers: &HeaderMap) -> Result { + fn get_event_context(headers: &HeaderMap) -> Result { // let headers = resp.headers(); let aws_request_id = match headers.get(LambdaHeaders::RequestId.as_str()) { diff --git a/lambda-runtime-client/src/lib.rs b/lambda-runtime-client/src/lib.rs index efd336bd..69d0f470 100644 --- a/lambda-runtime-client/src/lib.rs +++ b/lambda-runtime-client/src/lib.rs @@ -14,11 +14,13 @@ //! //! ```rust,no_run //! extern crate lambda_runtime_client; +//! extern crate tokio; //! #[macro_use] //! extern crate serde_derive; //! extern crate serde_json; //! //! use lambda_runtime_client::{RuntimeClient, EventContext}; +//! use tokio::prelude::future::Future; //! //! #[derive(Serialize, Deserialize, Debug)] //! struct CustomEvent { @@ -35,7 +37,7 @@ //! let client = RuntimeClient::new(runtime_endpoint, None) //! .expect("Could not initialize client"); //! -//! let (event_data, event_context) = client.next_event() +//! let (event_data, event_context) = client.next_event().wait() //! .expect("Could not retrieve next event"); //! let custom_event: CustomEvent = serde_json::from_slice(&event_data) //! .expect("Could not turn Vec into CustomEvent object"); diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index caac0983..879b36ce 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -3,6 +3,7 @@ use std::{error::Error, marker::PhantomData, result}; use lambda_runtime_client::RuntimeClient; use serde; use serde_json; +use tokio::prelude::future::Future; use tokio::runtime::Runtime as TokioRuntime; use crate::{ @@ -294,7 +295,7 @@ where } } - match self.runtime_client.next_event() { + match self.runtime_client.next_event().wait() { Ok((ev_data, invocation_ctx)) => { let parse_result = serde_json::from_slice(&ev_data); match parse_result { From a5072a01919d19a2d79db7cb466c0a7f4b4c856a Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sat, 5 Jan 2019 20:45:51 +0000 Subject: [PATCH 02/12] RuntimeClient::event_response returns a Future --- lambda-runtime-client/src/client.rs | 22 ++++++++++++---------- lambda-runtime-client/src/lib.rs | 2 +- lambda-runtime/src/runtime.rs | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index 6e710901..91eb126e 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -208,21 +208,23 @@ impl RuntimeClient { /// * `output` The object be sent back to the Runtime APIs as a response. /// /// # Returns - /// A `Result` object containing a bool return value for the call or an `error::ApiError` instance. - pub fn event_response(&self, request_id: &str, output: Vec) -> Result<(), ApiError> { - let uri: Uri = format!( + /// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. + pub fn event_response(&self, request_id: String, output: Vec) -> impl Future { + let uri = format!( "http://{}/{}/runtime/invocation/{}/response", self.endpoint, RUNTIME_API_VERSION, request_id - ) - .parse()?; + ).parse(); trace!( "Posting response for request {} to Runtime API. Response length {} bytes", request_id, output.len() ); - let req = self.get_runtime_post_request(&uri, output); - - match self.http_client.request(req).wait() { + let http_client = self.http_client.clone(); + uri.into_future() + .map_err(ApiError::from) + .map(move |uri| Self::get_runtime_post_request(&uri, output)) + .and_then(move |req| http_client.request(req).map_err(ApiError::from)) + .then(move |result| match result { Ok(resp) => { if !resp.status().is_success() { error!( @@ -242,7 +244,7 @@ impl RuntimeClient { error!("Error when calling runtime API for request {}: {}", request_id, e); Err(ApiError::from(e)) } - } + }) } /// Calls Lambda's Runtime APIs to send an error generated by the `Handler`. Because it's rust, @@ -341,7 +343,7 @@ impl RuntimeClient { /// /// # Returns /// A Populated Hyper `Request` object. - fn get_runtime_post_request(&self, uri: &Uri, body: Vec) -> Request { + fn get_runtime_post_request(uri: &Uri, body: Vec) -> Request { Request::builder() .method(Method::POST) .uri(uri.clone()) diff --git a/lambda-runtime-client/src/lib.rs b/lambda-runtime-client/src/lib.rs index 69d0f470..b028e663 100644 --- a/lambda-runtime-client/src/lib.rs +++ b/lambda-runtime-client/src/lib.rs @@ -47,7 +47,7 @@ //! let resp_object = CustomResponse{ surname: String::from("Doe")}; //! let resp_vec = serde_json::to_vec(&resp_object) //! .expect("Could not serialize CustomResponse to Vec"); -//! client.event_response(&event_context.aws_request_id, resp_vec) +//! client.event_response(event_context.aws_request_id.clone(), resp_vec).wait() //! .expect("Response sent successfully"); //! } else { //! // return a custom error by implementing the RuntimeApiError trait. diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 879b36ce..2a139d28 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -213,7 +213,7 @@ where ); match serde_json::to_vec(&response) { Ok(response_bytes) => { - match self.runtime_client.event_response(&request_id, response_bytes) { + match self.runtime_client.event_response(request_id.clone(), response_bytes).wait() { Ok(_) => info!("Response for {} accepted by Runtime API", request_id), // unrecoverable error while trying to communicate with the endpoint. // we let the Lambda Runtime API know that we have died From 4301ffd2b115031aa04d88369af8c2eb3214fbb2 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sat, 5 Jan 2019 22:25:10 +0000 Subject: [PATCH 03/12] RuntimeClient::event_error returns Future --- lambda-runtime-client/src/client.rs | 42 ++++++++++++++++------------- lambda-runtime/src/runtime.rs | 4 +-- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index 91eb126e..5adb3857 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -258,26 +258,32 @@ impl RuntimeClient { /// object. /// /// # Returns - /// A `Result` object containing a bool return value for the call or an `error::ApiError` instance. - pub fn event_error(&self, request_id: &str, e: &dyn RuntimeApiError) -> Result<(), ApiError> { - let uri: Uri = format!( + /// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. + pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future { + let uri = format!( "http://{}/{}/runtime/invocation/{}/error", self.endpoint, RUNTIME_API_VERSION, request_id ) - .parse()?; - trace!( - "Posting error to runtime API for request {}: {}", - request_id, - e.to_response().error_message - ); - let req = self.get_runtime_error_request(&uri, &e.to_response()); - - match self.http_client.request(req).wait() { + .parse(); + let http_client = self.http_client.clone(); + let response = e.to_response(); + let request_id2 = request_id.clone(); + uri.into_future() + .map_err(ApiError::from) + .map(move |uri| (Self::get_runtime_error_request(&uri, &response), response)) + .and_then(move |(req, error_response)| { + trace!( + "Posting error to runtime API for request {}: {}", + request_id, + error_response.error_message + ); + http_client.request(req).map_err(ApiError::from) + }).then(move |result| match result { Ok(resp) => { if !resp.status().is_success() { error!( "Error from Runtime API when posting error response for request {}: {}", - request_id, + request_id2, resp.status() ); return Err(ApiError::new(&format!( @@ -285,14 +291,14 @@ impl RuntimeClient { resp.status() ))); } - trace!("Posted error response for request id {}", request_id); + trace!("Posted error response for request id {}", request_id2); Ok(()) } Err(e) => { - error!("Error when calling runtime API for request {}: {}", request_id, e); + error!("Error when calling runtime API for request {}: {}", request_id2, e); Err(ApiError::from(e)) } - } + }) } /// Calls the Runtime APIs to report a failure during the init process. @@ -311,7 +317,7 @@ impl RuntimeClient { .parse() .expect("Could not generate Runtime URI"); error!("Calling fail_init Runtime API: {}", e.to_response().error_message); - let req = self.get_runtime_error_request(&uri, &e.to_response()); + let req = Self::get_runtime_error_request(&uri, &e.to_response()); self.http_client .request(req) @@ -352,7 +358,7 @@ impl RuntimeClient { .unwrap() } - fn get_runtime_error_request(&self, uri: &Uri, e: &ErrorResponse) -> Request { + fn get_runtime_error_request(uri: &Uri, e: &ErrorResponse) -> Request { let body = serde_json::to_vec(e).expect("Could not turn error object into response JSON"); Request::builder() .method(Method::POST) diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 2a139d28..4f9fccb8 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -244,7 +244,7 @@ where Err(e) => { debug!("Handler returned an error for {}: {}", request_id, e); debug!("Attempting to send error response to Runtime API for {}", request_id); - match self.runtime_client.event_error(&request_id, &e) { + match self.runtime_client.event_error(request_id.clone(), &e).wait() { Ok(_) => info!("Error response for {} accepted by Runtime API", request_id), Err(e) => { error!("Unable to send error response for {} to Runtime API: {}", request_id, e); @@ -281,7 +281,7 @@ where match err.request_id.clone() { Some(req_id) => { self.runtime_client - .event_error(&req_id, &err) + .event_error(req_id, &err).wait() .expect("Could not send event error response"); } None => { From 5adedefb4bf5a122222b8c5ff8cfe88ce7fd9eda Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sat, 5 Jan 2019 22:52:08 +0000 Subject: [PATCH 04/12] Drive execution from a tokio Runtime --- lambda-http/src/lib.rs | 4 ++-- lambda-runtime-client/src/client.rs | 12 +++-------- lambda-runtime-client/src/lib.rs | 4 +++- lambda-runtime/src/runtime.rs | 31 ++++++++++++++++++----------- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 56402a1a..0c01d45a 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -92,14 +92,14 @@ where /// /// # Panics /// The function panics if the Lambda environment variables are not set. -pub fn start(f: impl Handler, runtime: Option) +pub fn start(f: impl Handler + 'static, runtime: Option) where R: IntoResponse, { // handler requires a mutable ref let mut func = f; lambda::start( - |req: LambdaRequest<'_>, ctx: Context| { + move |req: LambdaRequest<'_>, ctx: Context| { let is_alb = req.request_context.is_alb(); func.run(req.into(), ctx) .map(|resp| LambdaResponse::from_response(is_alb, resp.into_response())) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index 5adb3857..cdced843 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -9,7 +9,7 @@ use hyper::{ use serde_derive::Deserialize; use serde_json; use tokio::prelude::future::IntoFuture; -use tokio::runtime::Runtime; +use tokio::runtime::TaskExecutor; use crate::error::{ApiError, ErrorResponse, RuntimeApiError}; @@ -122,7 +122,6 @@ pub struct EventContext { /// Used by the Runtime to communicate with the internal endpoint. pub struct RuntimeClient { - _runtime: Runtime, http_client: Client, endpoint: String, } @@ -130,18 +129,13 @@ pub struct RuntimeClient { impl RuntimeClient { /// Creates a new instance of the Runtime APIclient SDK. The http client has timeouts disabled and /// will always send a `Connection: keep-alive` header. - pub fn new(endpoint: String, runtime: Option) -> Result { + pub fn new(endpoint: String, task_executor: TaskExecutor) -> Result { debug!("Starting new HttpRuntimeClient for {}", endpoint); // start a tokio core main event loop for hyper - let runtime = match runtime { - Some(r) => r, - None => Runtime::new()?, - }; - let http_client = Client::builder().executor(runtime.executor()).build_http(); + let http_client = Client::builder().executor(task_executor).build_http(); Ok(RuntimeClient { - _runtime: runtime, http_client, endpoint, }) diff --git a/lambda-runtime-client/src/lib.rs b/lambda-runtime-client/src/lib.rs index b028e663..9f7ba59a 100644 --- a/lambda-runtime-client/src/lib.rs +++ b/lambda-runtime-client/src/lib.rs @@ -21,6 +21,7 @@ //! //! use lambda_runtime_client::{RuntimeClient, EventContext}; //! use tokio::prelude::future::Future; +//! use tokio::runtime::Runtime as TokioRuntime; //! //! #[derive(Serialize, Deserialize, Debug)] //! struct CustomEvent { @@ -33,8 +34,9 @@ //! } //! //! fn main() { +//! let tokio_runtime = TokioRuntime::new().expect("Could not make tokio runtime"); //! let runtime_endpoint = String::from("http://localhost:8080"); -//! let client = RuntimeClient::new(runtime_endpoint, None) +//! let client = RuntimeClient::new(runtime_endpoint, tokio_runtime.executor()) //! .expect("Could not initialize client"); //! //! let (event_data, event_context) = client.next_event().wait() diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 4f9fccb8..e1f5d1a2 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -3,7 +3,7 @@ use std::{error::Error, marker::PhantomData, result}; use lambda_runtime_client::RuntimeClient; use serde; use serde_json; -use tokio::prelude::future::Future; +use tokio::prelude::future::{Future, IntoFuture}; use tokio::runtime::Runtime as TokioRuntime; use crate::{ @@ -11,6 +11,7 @@ use crate::{ env::{ConfigProvider, EnvConfigProvider, FunctionSettings}, error::{HandlerError, RuntimeError}, }; +use tokio::runtime::TaskExecutor; const MAX_RETRIES: i8 = 3; @@ -37,12 +38,14 @@ where /// /// # Panics /// The function panics if the Lambda environment variables are not set. -pub fn start(f: impl Handler, runtime: Option) +pub fn start(f: impl Handler + 'static, runtime: Option) where - E: serde::de::DeserializeOwned, - O: serde::Serialize, + E: serde::de::DeserializeOwned + 'static, + O: serde::Serialize + 'static, { - start_with_config(f, &EnvConfigProvider::new(), runtime) + let mut runtime = runtime.unwrap_or_else(|| TokioRuntime::new().expect("Failed to start tokio runtime")); + let task_executor = runtime.executor(); + runtime.block_on(start_with_config(f, &EnvConfigProvider::new(), task_executor)).unwrap(); } #[macro_export] @@ -74,7 +77,7 @@ macro_rules! lambda { /// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()` /// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment /// and spin up a new one for the next invocation. -pub(crate) fn start_with_config(f: impl Handler, config: &C, runtime: Option) +pub(crate) fn start_with_config(f: impl Handler, config: &C, task_executor: TaskExecutor) -> impl Future where E: serde::de::DeserializeOwned, O: serde::Serialize, @@ -100,9 +103,9 @@ where } } - match RuntimeClient::new(endpoint, runtime) { + match RuntimeClient::new(endpoint, task_executor) { Ok(client) => { - start_with_runtime_client(f, function_config, client); + start_with_runtime_client(f, function_config, client) } Err(e) => { panic!("Could not create runtime client SDK: {}", e); @@ -125,7 +128,7 @@ pub(crate) fn start_with_runtime_client( f: impl Handler, func_settings: FunctionSettings, client: RuntimeClient, -) where +) -> impl Future where E: serde::de::DeserializeOwned, O: serde::Serialize, { @@ -138,7 +141,7 @@ pub(crate) fn start_with_runtime_client( } // start the infinite loop - lambda_runtime.start(); + lambda_runtime.start() } /// Internal representation of the runtime object that polls for events and communicates @@ -198,7 +201,7 @@ where /// Starts the main event loop and begin polling or new events. If one of the /// Runtime APIs returns an unrecoverable error this method calls the init failed /// API and then panics. - fn start(&mut self) { + fn start(&mut self) -> impl Future { debug!("Beginning main event loop"); loop { let (event, ctx) = self.get_next_event(0, None); @@ -261,6 +264,9 @@ where } } } + + #[allow(unreachable_code)] + Ok(()).into_future() } /// Invoke the handler function. This method is split out of the main loop to @@ -332,11 +338,12 @@ pub(crate) mod tests { #[test] fn runtime_invokes_handler() { let config: &dyn env::ConfigProvider = &env::tests::MockConfigProvider { error: false }; + let runtime = TokioRuntime::new().expect("Could not create tokio runtime"); let client = RuntimeClient::new( config .get_runtime_api_endpoint() .expect("Could not get runtime endpoint"), - None, + runtime.executor(), ) .expect("Could not initialize client"); let handler = |_e: String, _c: context::Context| -> Result { Ok("hello".to_string()) }; From 695333c296d176354ad25a068e8b3b8f2c97edec Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sat, 5 Jan 2019 23:19:46 +0000 Subject: [PATCH 05/12] get_next_event uses a future loop_fn --- lambda-runtime-client/src/client.rs | 1 + lambda-runtime/src/runtime.rs | 98 ++++++++++++++++------------- 2 files changed, 54 insertions(+), 45 deletions(-) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index cdced843..f6381b2f 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -121,6 +121,7 @@ pub struct EventContext { } /// Used by the Runtime to communicate with the internal endpoint. +#[derive(Clone)] pub struct RuntimeClient { http_client: Client, endpoint: String, diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index e1f5d1a2..0cf479d8 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -3,7 +3,7 @@ use std::{error::Error, marker::PhantomData, result}; use lambda_runtime_client::RuntimeClient; use serde; use serde_json; -use tokio::prelude::future::{Future, IntoFuture}; +use tokio::prelude::future::{Future, IntoFuture, loop_fn, Loop}; use tokio::runtime::Runtime as TokioRuntime; use crate::{ @@ -79,7 +79,7 @@ macro_rules! lambda { /// and spin up a new one for the next invocation. pub(crate) fn start_with_config(f: impl Handler, config: &C, task_executor: TaskExecutor) -> impl Future where - E: serde::de::DeserializeOwned, + E: serde::de::DeserializeOwned + 'static, O: serde::Serialize, C: ConfigProvider, { @@ -129,7 +129,7 @@ pub(crate) fn start_with_runtime_client( func_settings: FunctionSettings, client: RuntimeClient, ) -> impl Future where - E: serde::de::DeserializeOwned, + E: serde::de::DeserializeOwned + 'static, O: serde::Serialize, { let mut lambda_runtime: Runtime<_, E, O>; @@ -195,7 +195,7 @@ impl Runtime { impl Runtime where F: Handler, - E: serde::de::DeserializeOwned, + E: serde::de::DeserializeOwned + 'static, O: serde::Serialize, { /// Starts the main event loop and begin polling or new events. If one of the @@ -204,7 +204,7 @@ where fn start(&mut self) -> impl Future { debug!("Beginning main event loop"); loop { - let (event, ctx) = self.get_next_event(0, None); + let (event, ctx) = self.get_next_event().wait().unwrap(); let request_id = ctx.aws_request_id.clone(); info!("Received new event with AWS request id: {}", request_id); let function_outcome = self.invoke(event, ctx); @@ -279,53 +279,61 @@ where /// unless the error throws is not recoverable. /// /// # Return - /// The next `Event` object to be processed. - pub(super) fn get_next_event(&self, retries: i8, e: Option) -> (E, Context) { - if let Some(err) = e { - if retries > self.max_retries { - error!("Unrecoverable error while fetching next event: {}", err); - match err.request_id.clone() { - Some(req_id) => { - self.runtime_client - .event_error(req_id, &err).wait() - .expect("Could not send event error response"); - } - None => { - self.runtime_client.fail_init(&err); + /// A `Future` resolving to the next `Event` object to be processed. + pub(super) fn get_next_event(&self) -> impl Future { + let max_retries = self.max_retries; + let runtime_client = self.runtime_client.clone(); + let settings = self.settings.clone(); + loop_fn((0, None), move |(iteration, maybe_error): (i8, Option)| { + if let Some(err) = maybe_error { + if iteration > max_retries { + error!("Unrecoverable error while fetching next event: {}", err); + match err.request_id.clone() { + Some(req_id) => { + return Box::new(runtime_client + .event_error(req_id, &err) + .map_err(|e| format!("Could not send event error response: {}", e)) + // these errors are not recoverable. Either we can't communicate with the runtime APIs + // or we cannot parse the event. panic to restart the environment. + .then(|_| Err("Could not retrieve next event".to_owned()))) as Box> + } + None => { + runtime_client.fail_init(&err); + unreachable!(); + } } } - - // these errors are not recoverable. Either we can't communicate with the runtie APIs - // or we cannot parse the event. panic to restart the environment. - panic!("Could not retrieve next event"); } - } - match self.runtime_client.next_event().wait() { - Ok((ev_data, invocation_ctx)) => { - let parse_result = serde_json::from_slice(&ev_data); - match parse_result { - Ok(ev) => { - let mut handler_ctx = Context::new(self.settings.clone()); - handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn; - handler_ctx.aws_request_id = invocation_ctx.aws_request_id; - handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id; - handler_ctx.client_context = invocation_ctx.client_context; - handler_ctx.identity = invocation_ctx.identity; - handler_ctx.deadline = invocation_ctx.deadline; + let settings = settings.clone(); + Box::new(runtime_client.next_event().then(move |result| { + match result { + Ok((ev_data, invocation_ctx)) => { + let parse_result = serde_json::from_slice(&ev_data); + match parse_result { + Ok(ev) => { + let mut handler_ctx = Context::new(settings.clone()); + handler_ctx.invoked_function_arn = invocation_ctx.invoked_function_arn; + handler_ctx.aws_request_id = invocation_ctx.aws_request_id; + handler_ctx.xray_trace_id = invocation_ctx.xray_trace_id; + handler_ctx.client_context = invocation_ctx.client_context; + handler_ctx.identity = invocation_ctx.identity; + handler_ctx.deadline = invocation_ctx.deadline; - (ev, handler_ctx) - } - Err(e) => { - error!("Could not parse event to type: {}", e); - let mut runtime_err = RuntimeError::from(e); - runtime_err.request_id = Option::from(invocation_ctx.aws_request_id); - self.get_next_event(retries + 1, Option::from(runtime_err)) + Ok(Loop::Break((ev, handler_ctx))) + } + Err(e) => { + error!("Could not parse event to type: {}", e); + let mut runtime_err = RuntimeError::from(e); + runtime_err.request_id = Some(invocation_ctx.aws_request_id); + Ok(Loop::Continue((iteration + 1, Some(runtime_err)))) + } + } } + Err(e) => Ok(Loop::Continue((iteration + 1, Some(RuntimeError::from(e))))), } - } - Err(e) => self.get_next_event(retries + 1, Option::from(RuntimeError::from(e))), - } + })) as Box> + }) } } From e96a01303640dfad07f5dfe3c8da41a2c178a4fd Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sun, 6 Jan 2019 10:48:36 +0000 Subject: [PATCH 06/12] Runtime::start is fully async --- lambda-http/src/lib.rs | 4 +- lambda-runtime/src/runtime.rs | 176 +++++++++++++++++++--------------- 2 files changed, 99 insertions(+), 81 deletions(-) diff --git a/lambda-http/src/lib.rs b/lambda-http/src/lib.rs index 0c01d45a..9b6e66f4 100644 --- a/lambda-http/src/lib.rs +++ b/lambda-http/src/lib.rs @@ -70,14 +70,14 @@ use crate::{request::LambdaRequest, response::LambdaResponse}; pub type Request = http::Request; /// Functions serving as ALB and API Gateway handlers must conform to this type. -pub trait Handler { +pub trait Handler: Send { /// Run the handler. fn run(&mut self, event: Request, ctx: Context) -> Result; } impl Handler for F where - F: FnMut(Request, Context) -> Result, + F: FnMut(Request, Context) -> Result + Send, { fn run(&mut self, event: Request, ctx: Context) -> Result { (*self)(event, ctx) diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 0cf479d8..602a3dfc 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -12,18 +12,20 @@ use crate::{ error::{HandlerError, RuntimeError}, }; use tokio::runtime::TaskExecutor; +use std::sync::Arc; +use std::sync::Mutex; const MAX_RETRIES: i8 = 3; /// Functions acting as a handler must conform to this type. -pub trait Handler { +pub trait Handler: Send { /// Run the handler. fn run(&mut self, event: E, ctx: Context) -> Result; } impl Handler for F where - F: FnMut(E, Context) -> Result, + F: FnMut(E, Context) -> Result + Send, { fn run(&mut self, event: E, ctx: Context) -> Result { (*self)(event, ctx) @@ -40,8 +42,8 @@ where /// The function panics if the Lambda environment variables are not set. pub fn start(f: impl Handler + 'static, runtime: Option) where - E: serde::de::DeserializeOwned + 'static, - O: serde::Serialize + 'static, + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send + 'static, { let mut runtime = runtime.unwrap_or_else(|| TokioRuntime::new().expect("Failed to start tokio runtime")); let task_executor = runtime.executor(); @@ -77,10 +79,10 @@ macro_rules! lambda { /// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()` /// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment /// and spin up a new one for the next invocation. -pub(crate) fn start_with_config(f: impl Handler, config: &C, task_executor: TaskExecutor) -> impl Future +pub(crate) fn start_with_config(f: impl Handler, config: &C, task_executor: TaskExecutor) -> impl Future + Send where - E: serde::de::DeserializeOwned + 'static, - O: serde::Serialize, + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send, C: ConfigProvider, { // if we cannot find the endpoint we panic, nothing else we can do. @@ -128,11 +130,11 @@ pub(crate) fn start_with_runtime_client( f: impl Handler, func_settings: FunctionSettings, client: RuntimeClient, -) -> impl Future where - E: serde::de::DeserializeOwned + 'static, - O: serde::Serialize, +) -> impl Future + Send where + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send, { - let mut lambda_runtime: Runtime<_, E, O>; + let lambda_runtime: Runtime<_, E, O>; match Runtime::new(f, func_settings, MAX_RETRIES, client) { Ok(r) => lambda_runtime = r, Err(e) => { @@ -148,7 +150,7 @@ pub(crate) fn start_with_runtime_client( /// with the Runtime APIs pub(super) struct Runtime { runtime_client: RuntimeClient, - handler: F, + handler: Arc>, max_retries: i8, settings: FunctionSettings, _phan: PhantomData<(E, O)>, @@ -183,7 +185,7 @@ impl Runtime { Ok(Runtime { runtime_client: client, settings: config, - handler: f, + handler: Arc::new(Mutex::new(f)), max_retries: retries, _phan: PhantomData, }) @@ -195,84 +197,103 @@ impl Runtime { impl Runtime where F: Handler, - E: serde::de::DeserializeOwned + 'static, - O: serde::Serialize, + E: serde::de::DeserializeOwned + Send + 'static, + O: serde::Serialize + Send, { /// Starts the main event loop and begin polling or new events. If one of the /// Runtime APIs returns an unrecoverable error this method calls the init failed /// API and then panics. - fn start(&mut self) -> impl Future { + fn start(&self) -> impl Future + Send { debug!("Beginning main event loop"); - loop { - let (event, ctx) = self.get_next_event().wait().unwrap(); - let request_id = ctx.aws_request_id.clone(); - info!("Received new event with AWS request id: {}", request_id); - let function_outcome = self.invoke(event, ctx); - match function_outcome { - Ok(response) => { - debug!( - "Function executed succesfully for {}, pushing response to Runtime API", - request_id - ); - match serde_json::to_vec(&response) { - Ok(response_bytes) => { - match self.runtime_client.event_response(request_id.clone(), response_bytes).wait() { - Ok(_) => info!("Response for {} accepted by Runtime API", request_id), - // unrecoverable error while trying to communicate with the endpoint. - // we let the Lambda Runtime API know that we have died - Err(e) => { - error!("Could not send response for {} to Runtime API: {}", request_id, e); - if !e.recoverable { - error!( - "Error for {} is not recoverable, sending fail_init signal and panicking.", - request_id - ); - self.runtime_client.fail_init(&e); - panic!("Could not send response"); + + let max_retries = self.max_retries; + let runtime_client = self.runtime_client.clone(); + let settings = self.settings.clone(); + let handler = self.handler.clone(); + + loop_fn((), move |()| { + let runtime_client = runtime_client.clone(); + let handler = handler.clone(); + Self::get_next_event(max_retries, runtime_client.clone(), settings.clone()).and_then(move |(event, ctx)| { + let runtime_client = runtime_client.clone(); + let request_id = ctx.aws_request_id.clone(); + info!("Received new event with AWS request id: {}", request_id); + let handler = handler.clone(); + let mut handler_function = handler.lock().unwrap(); + let function_outcome = handler_function.run(event, ctx); + match function_outcome { + Ok(response) => { + debug!( + "Function executed succesfully for {}, pushing response to Runtime API", + request_id + ); + match serde_json::to_vec(&response) { + Ok(response_bytes) => { + Box::new(runtime_client.event_response(request_id.clone(), response_bytes).then(move |result| match result { + Ok(_) => { + info!("Response for {} accepted by Runtime API", request_id); + Ok(()) + }, + // unrecoverable error while trying to communicate with the endpoint. + // we let the Lambda Runtime API know that we have died + Err(e) => { + error!("Could not send response for {} to Runtime API: {}", request_id, e); + if !e.recoverable { + error!( + "Error for {} is not recoverable, sending fail_init signal and panicking.", + request_id + ); + runtime_client.fail_init(&e); + panic!("Could not send response"); + } + Ok(()) } - } + }).map(|()| Loop::Continue(()))) as Box + Send> } - } - Err(e) => { - error!( - "Could not marshal output object to Vec JSON represnetation for request {}: {}", - request_id, e - ); - self.runtime_client - .fail_init(&RuntimeError::unrecoverable(e.description())); - panic!("Failed to marshal handler output, panic"); - } - } - } - Err(e) => { - debug!("Handler returned an error for {}: {}", request_id, e); - debug!("Attempting to send error response to Runtime API for {}", request_id); - match self.runtime_client.event_error(request_id.clone(), &e).wait() { - Ok(_) => info!("Error response for {} accepted by Runtime API", request_id), - Err(e) => { - error!("Unable to send error response for {} to Runtime API: {}", request_id, e); - if !e.recoverable { + Err(e) => { error!( - "Error for {} is not recoverable, sending fail_init signal and panicking", - request_id + "Could not marshal output object to Vec JSON represnetation for request {}: {}", + request_id, e ); - self.runtime_client.fail_init(&e); - panic!("Could not send error response"); + runtime_client + .fail_init(&RuntimeError::unrecoverable(e.description())); + Box::new(Err("Failed to marshal handler output, panic".to_owned()).into_future()) as Box + Send> } } } + Err(e) => { + debug!("Handler returned an error for {}: {}", request_id, e); + debug!("Attempting to send error response to Runtime API for {}", request_id); + Box::new(runtime_client.event_error(request_id.clone(), &e).then(move |result| match result { + Ok(_) => { + info!("Error response for {} accepted by Runtime API", request_id); + Ok(()) + }, + Err(e) => { + error!("Unable to send error response for {} to Runtime API: {}", request_id, e); + if !e.recoverable { + error!( + "Error for {} is not recoverable, sending fail_init signal and panicking", + request_id + ); + runtime_client.fail_init(&e); + panic!("Could not send error response"); + } + Ok(()) + } + }).map(|()| Loop::Continue(()))) as Box + Send> + } } - } - } - - #[allow(unreachable_code)] - Ok(()).into_future() + }) + }) } /// Invoke the handler function. This method is split out of the main loop to /// make it testable. + #[cfg(test)] pub(super) fn invoke(&mut self, e: E, ctx: Context) -> Result { - (&mut self.handler).run(e, ctx) + let mut handler = self.handler.lock().unwrap(); + (&mut handler).run(e, ctx) } /// Attempts to get the next event from the Runtime APIs and keeps retrying @@ -280,10 +301,7 @@ where /// /// # Return /// A `Future` resolving to the next `Event` object to be processed. - pub(super) fn get_next_event(&self) -> impl Future { - let max_retries = self.max_retries; - let runtime_client = self.runtime_client.clone(); - let settings = self.settings.clone(); + pub(super) fn get_next_event(max_retries: i8, runtime_client: RuntimeClient, settings: FunctionSettings) -> impl Future { loop_fn((0, None), move |(iteration, maybe_error): (i8, Option)| { if let Some(err) = maybe_error { if iteration > max_retries { @@ -295,7 +313,7 @@ where .map_err(|e| format!("Could not send event error response: {}", e)) // these errors are not recoverable. Either we can't communicate with the runtime APIs // or we cannot parse the event. panic to restart the environment. - .then(|_| Err("Could not retrieve next event".to_owned()))) as Box> + .then(|_| Err("Could not retrieve next event".to_owned()))) as Box + Send> } None => { runtime_client.fail_init(&err); @@ -332,7 +350,7 @@ where } Err(e) => Ok(Loop::Continue((iteration + 1, Some(RuntimeError::from(e))))), } - })) as Box> + })) as Box + Send> }) } } From c65d0e592380f325b52575cc2c1f03936beeb67b Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sun, 6 Jan 2019 00:24:28 +0000 Subject: [PATCH 07/12] Handler returns IntoFuture not Result Result implements IntoFuture, so this is backwards compatible, assuming Handler (and its Event type) are Send and 'static, which they almost certainly always are. --- lambda-runtime/src/runtime.rs | 135 ++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 63 deletions(-) diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 602a3dfc..0dfb39b4 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -19,15 +19,23 @@ const MAX_RETRIES: i8 = 3; /// Functions acting as a handler must conform to this type. pub trait Handler: Send { + /// Future of return value returned by handler. + type Future: Future + Send; + /// IntoFuture of return value returned by handler. + type IntoFuture: IntoFuture + Send; + /// Run the handler. - fn run(&mut self, event: E, ctx: Context) -> Result; + fn run(&mut self, event: E, ctx: Context) -> Self::IntoFuture; } -impl Handler for F +impl + Send, IntoFut: IntoFuture + Send> Handler for F where - F: FnMut(E, Context) -> Result + Send, + F: FnMut(E, Context) -> IntoFut + Send, { - fn run(&mut self, event: E, ctx: Context) -> Result { + type Future = Fut; + type IntoFuture = IntoFut; + + fn run(&mut self, event: E, ctx: Context) -> IntoFut { (*self)(event, ctx) } } @@ -220,70 +228,71 @@ where info!("Received new event with AWS request id: {}", request_id); let handler = handler.clone(); let mut handler_function = handler.lock().unwrap(); - let function_outcome = handler_function.run(event, ctx); - match function_outcome { - Ok(response) => { - debug!( - "Function executed succesfully for {}, pushing response to Runtime API", - request_id - ); - match serde_json::to_vec(&response) { - Ok(response_bytes) => { - Box::new(runtime_client.event_response(request_id.clone(), response_bytes).then(move |result| match result { - Ok(_) => { - info!("Response for {} accepted by Runtime API", request_id); - Ok(()) - }, - // unrecoverable error while trying to communicate with the endpoint. - // we let the Lambda Runtime API know that we have died - Err(e) => { - error!("Could not send response for {} to Runtime API: {}", request_id, e); - if !e.recoverable { - error!( - "Error for {} is not recoverable, sending fail_init signal and panicking.", - request_id - ); - runtime_client.fail_init(&e); - panic!("Could not send response"); + handler_function.run(event, ctx).into_future().then(|function_outcome| { + match function_outcome { + Ok(response) => { + debug!( + "Function executed succesfully for {}, pushing response to Runtime API", + request_id + ); + match serde_json::to_vec(&response) { + Ok(response_bytes) => { + Box::new(runtime_client.event_response(request_id.clone(), response_bytes).then(move |result| match result { + Ok(_) => { + info!("Response for {} accepted by Runtime API", request_id); + Ok(()) + }, + // unrecoverable error while trying to communicate with the endpoint. + // we let the Lambda Runtime API know that we have died + Err(e) => { + error!("Could not send response for {} to Runtime API: {}", request_id, e); + if !e.recoverable { + error!( + "Error for {} is not recoverable, sending fail_init signal and panicking.", + request_id + ); + runtime_client.fail_init(&e); + panic!("Could not send response"); + } + Ok(()) } - Ok(()) - } - }).map(|()| Loop::Continue(()))) as Box + Send> - } - Err(e) => { - error!( - "Could not marshal output object to Vec JSON represnetation for request {}: {}", - request_id, e - ); - runtime_client - .fail_init(&RuntimeError::unrecoverable(e.description())); - Box::new(Err("Failed to marshal handler output, panic".to_owned()).into_future()) as Box + Send> - } - } - } - Err(e) => { - debug!("Handler returned an error for {}: {}", request_id, e); - debug!("Attempting to send error response to Runtime API for {}", request_id); - Box::new(runtime_client.event_error(request_id.clone(), &e).then(move |result| match result { - Ok(_) => { - info!("Error response for {} accepted by Runtime API", request_id); - Ok(()) - }, - Err(e) => { - error!("Unable to send error response for {} to Runtime API: {}", request_id, e); - if !e.recoverable { + }).map(|()| Loop::Continue(()))) as Box + Send> + } + Err(e) => { error!( - "Error for {} is not recoverable, sending fail_init signal and panicking", - request_id + "Could not marshal output object to Vec JSON represnetation for request {}: {}", + request_id, e ); - runtime_client.fail_init(&e); - panic!("Could not send error response"); + runtime_client + .fail_init(&RuntimeError::unrecoverable(e.description())); + Box::new(Err("Failed to marshal handler output, panic".to_owned()).into_future()) as Box + Send> } - Ok(()) } - }).map(|()| Loop::Continue(()))) as Box + Send> + } + Err(e) => { + debug!("Handler returned an error for {}: {}", request_id, e); + debug!("Attempting to send error response to Runtime API for {}", request_id); + Box::new(runtime_client.event_error(request_id.clone(), &e).then(move |result| match result { + Ok(_) => { + info!("Error response for {} accepted by Runtime API", request_id); + Ok(()) + }, + Err(e) => { + error!("Unable to send error response for {} to Runtime API: {}", request_id, e); + if !e.recoverable { + error!( + "Error for {} is not recoverable, sending fail_init signal and panicking", + request_id + ); + runtime_client.fail_init(&e); + panic!("Could not send error response"); + } + Ok(()) + } + }).map(|()| Loop::Continue(()))) as Box + Send> + } } - } + }) }) }) } @@ -291,7 +300,7 @@ where /// Invoke the handler function. This method is split out of the main loop to /// make it testable. #[cfg(test)] - pub(super) fn invoke(&mut self, e: E, ctx: Context) -> Result { + pub(super) fn invoke(&mut self, e: E, ctx: Context) -> F::IntoFuture { let mut handler = self.handler.lock().unwrap(); (&mut handler).run(e, ctx) } From 2f6e599e269ec99be0d98df0a4e88a5d8f18eb11 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sun, 6 Jan 2019 10:54:21 +0000 Subject: [PATCH 08/12] Add contrived async Handler example --- lambda-runtime/examples/async.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 lambda-runtime/examples/async.rs diff --git a/lambda-runtime/examples/async.rs b/lambda-runtime/examples/async.rs new file mode 100644 index 00000000..d32c10d6 --- /dev/null +++ b/lambda-runtime/examples/async.rs @@ -0,0 +1,30 @@ +use std::error::Error; + +use lambda_runtime::{error::HandlerError, lambda, Context}; +use serde_derive::{Deserialize, Serialize}; +use simple_logger; +use tokio::prelude::future::{Future, ok}; + +#[derive(Deserialize)] +struct CustomEvent { + #[serde(rename = "firstName")] + first_name: String, +} + +#[derive(Serialize)] +struct CustomOutput { + message: String, +} + +fn main() -> Result<(), Box> { + simple_logger::init_with_level(log::Level::Debug).unwrap(); + lambda!(my_handler); + + Ok(()) +} + +fn my_handler(e: CustomEvent, _c: Context) -> impl Future { + ok(format!("Hello, {}!", e.first_name)) + .map(|message| format!("{} (modified in a Future)", message)) + .map(|message| CustomOutput { message }) +} From 3aa5a853e47569af0168057d87cbc36bffd90117 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sun, 6 Jan 2019 10:57:31 +0000 Subject: [PATCH 09/12] Update README --- README.md | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 028f9da7..0c074345 100644 --- a/README.md +++ b/README.md @@ -174,18 +174,21 @@ For error reporting to the runtime APIs the library defines the `RuntimeApiError This library makes it easy to create Rust executables for AWS lambda. The library defines a `lambda!()` macro. Call the `lambda!()` macro from your main method with an implementation the `Handler` type: ```rust -pub trait Handler { +pub trait Handler: Send { + /// Future of return value returned by handler. + type Future: Future + Send; + /// IntoFuture of return value returned by handler. + type IntoFuture: IntoFuture + Send; + /// Run the handler. - fn run( - &mut self, - event: E, - ctx: Context - ) -> Result; + fn run(&mut self, event: E, ctx: Context) -> Self::IntoFuture; } ``` `Handler` provides a default implementation that enables you to provide a Rust closure or function pointer to the `lambda!()` macro. +If your handler is synchronous, you can just return a `Result` from it; if your handler is asynchronous, you can return a `Future` from it. + Optionally, you can pass your own instance of Tokio runtime to the `lambda!()` macro. See our [`with_custom_runtime.rs` example](https://github.com/awslabs/aws-lambda-rust-runtime/tree/master/lambda-runtime/examples/with_custom_runtime.rs) ## AWS event objects From f2ae75f408d9761af270a29a04d3291ca830c518 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sun, 6 Jan 2019 12:45:28 +0000 Subject: [PATCH 10/12] cargo fmt --- lambda-runtime-client/src/client.rs | 125 +++++++++++++++------------- lambda-runtime/examples/async.rs | 4 +- lambda-runtime/src/runtime.rs | 90 ++++++++++++-------- 3 files changed, 122 insertions(+), 97 deletions(-) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index f6381b2f..c358b0fb 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -136,28 +136,28 @@ impl RuntimeClient { let http_client = Client::builder().executor(task_executor).build_http(); - Ok(RuntimeClient { - http_client, - endpoint, - }) + Ok(RuntimeClient { http_client, endpoint }) } } impl RuntimeClient { /// Polls for new events to the Runtime APIs. - pub fn next_event(&self) -> impl Future, EventContext), Error=ApiError> { + pub fn next_event(&self) -> impl Future, EventContext), Error = ApiError> { let uri = format!( "http://{}/{}/runtime/invocation/next", self.endpoint, RUNTIME_API_VERSION - ).parse(); + ) + .parse(); trace!("Polling for next event"); let http_client = self.http_client.clone(); uri.into_future() .map_err(ApiError::from) - .and_then(move |uri| http_client.get(uri).map_err(|e| { - error!("Error when fetching next event from Runtime API: {}", e); - ApiError::from(e) - })) + .and_then(move |uri| { + http_client.get(uri).map_err(|e| { + error!("Error when fetching next event from Runtime API: {}", e); + ApiError::from(e) + }) + }) .and_then(|resp| { if resp.status().is_client_error() { error!( @@ -179,7 +179,12 @@ impl RuntimeClient { .clone()); } return Ok((Self::get_event_context(&resp.headers())?, resp)); - }).and_then(|(ctx, resp)| Ok(ctx).into_future().join(resp.into_body().concat2().map_err(Into::into))) + }) + .and_then(|(ctx, resp)| { + Ok(ctx) + .into_future() + .join(resp.into_body().concat2().map_err(Into::into)) + }) .map(|(ctx, body)| { let buf = body.into_bytes().to_vec(); @@ -204,11 +209,12 @@ impl RuntimeClient { /// /// # Returns /// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. - pub fn event_response(&self, request_id: String, output: Vec) -> impl Future { + pub fn event_response(&self, request_id: String, output: Vec) -> impl Future { let uri = format!( "http://{}/{}/runtime/invocation/{}/response", self.endpoint, RUNTIME_API_VERSION, request_id - ).parse(); + ) + .parse(); trace!( "Posting response for request {} to Runtime API. Response length {} bytes", request_id, @@ -220,26 +226,26 @@ impl RuntimeClient { .map(move |uri| Self::get_runtime_post_request(&uri, output)) .and_then(move |req| http_client.request(req).map_err(ApiError::from)) .then(move |result| match result { - Ok(resp) => { - if !resp.status().is_success() { - error!( - "Error from Runtime API when posting response for request {}: {}", - request_id, - resp.status() - ); - return Err(ApiError::new(&format!( - "Error {} while sending response", - resp.status() - ))); + Ok(resp) => { + if !resp.status().is_success() { + error!( + "Error from Runtime API when posting response for request {}: {}", + request_id, + resp.status() + ); + return Err(ApiError::new(&format!( + "Error {} while sending response", + resp.status() + ))); + } + trace!("Posted response to Runtime API for request {}", request_id); + Ok(()) } - trace!("Posted response to Runtime API for request {}", request_id); - Ok(()) - } - Err(e) => { - error!("Error when calling runtime API for request {}: {}", request_id, e); - Err(ApiError::from(e)) - } - }) + Err(e) => { + error!("Error when calling runtime API for request {}: {}", request_id, e); + Err(ApiError::from(e)) + } + }) } /// Calls Lambda's Runtime APIs to send an error generated by the `Handler`. Because it's rust, @@ -254,7 +260,7 @@ impl RuntimeClient { /// /// # Returns /// A `Future` object containing a either resolving () for success or an `error::ApiError` instance. - pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future { + pub fn event_error(&self, request_id: String, e: &dyn RuntimeApiError) -> impl Future { let uri = format!( "http://{}/{}/runtime/invocation/{}/error", self.endpoint, RUNTIME_API_VERSION, request_id @@ -267,33 +273,34 @@ impl RuntimeClient { .map_err(ApiError::from) .map(move |uri| (Self::get_runtime_error_request(&uri, &response), response)) .and_then(move |(req, error_response)| { - trace!( - "Posting error to runtime API for request {}: {}", - request_id, - error_response.error_message - ); - http_client.request(req).map_err(ApiError::from) - }).then(move |result| match result { - Ok(resp) => { - if !resp.status().is_success() { - error!( - "Error from Runtime API when posting error response for request {}: {}", - request_id2, - resp.status() - ); - return Err(ApiError::new(&format!( - "Error {} while sending response", - resp.status() - ))); + trace!( + "Posting error to runtime API for request {}: {}", + request_id, + error_response.error_message + ); + http_client.request(req).map_err(ApiError::from) + }) + .then(move |result| match result { + Ok(resp) => { + if !resp.status().is_success() { + error!( + "Error from Runtime API when posting error response for request {}: {}", + request_id2, + resp.status() + ); + return Err(ApiError::new(&format!( + "Error {} while sending response", + resp.status() + ))); + } + trace!("Posted error response for request id {}", request_id2); + Ok(()) } - trace!("Posted error response for request id {}", request_id2); - Ok(()) - } - Err(e) => { - error!("Error when calling runtime API for request {}: {}", request_id2, e); - Err(ApiError::from(e)) - } - }) + Err(e) => { + error!("Error when calling runtime API for request {}: {}", request_id2, e); + Err(ApiError::from(e)) + } + }) } /// Calls the Runtime APIs to report a failure during the init process. diff --git a/lambda-runtime/examples/async.rs b/lambda-runtime/examples/async.rs index d32c10d6..56451ad3 100644 --- a/lambda-runtime/examples/async.rs +++ b/lambda-runtime/examples/async.rs @@ -3,7 +3,7 @@ use std::error::Error; use lambda_runtime::{error::HandlerError, lambda, Context}; use serde_derive::{Deserialize, Serialize}; use simple_logger; -use tokio::prelude::future::{Future, ok}; +use tokio::prelude::future::{ok, Future}; #[derive(Deserialize)] struct CustomEvent { @@ -23,7 +23,7 @@ fn main() -> Result<(), Box> { Ok(()) } -fn my_handler(e: CustomEvent, _c: Context) -> impl Future { +fn my_handler(e: CustomEvent, _c: Context) -> impl Future { ok(format!("Hello, {}!", e.first_name)) .map(|message| format!("{} (modified in a Future)", message)) .map(|message| CustomOutput { message }) diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 0dfb39b4..dff29bc3 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -3,7 +3,7 @@ use std::{error::Error, marker::PhantomData, result}; use lambda_runtime_client::RuntimeClient; use serde; use serde_json; -use tokio::prelude::future::{Future, IntoFuture, loop_fn, Loop}; +use tokio::prelude::future::{loop_fn, Future, IntoFuture, Loop}; use tokio::runtime::Runtime as TokioRuntime; use crate::{ @@ -11,24 +11,30 @@ use crate::{ env::{ConfigProvider, EnvConfigProvider, FunctionSettings}, error::{HandlerError, RuntimeError}, }; -use tokio::runtime::TaskExecutor; use std::sync::Arc; use std::sync::Mutex; +use tokio::runtime::TaskExecutor; const MAX_RETRIES: i8 = 3; /// Functions acting as a handler must conform to this type. pub trait Handler: Send { /// Future of return value returned by handler. - type Future: Future + Send; + type Future: Future + Send; /// IntoFuture of return value returned by handler. - type IntoFuture: IntoFuture + Send; + type IntoFuture: IntoFuture + Send; /// Run the handler. fn run(&mut self, event: E, ctx: Context) -> Self::IntoFuture; } -impl + Send, IntoFut: IntoFuture + Send> Handler for F +impl< + F, + E, + O: Send, + Fut: Future + Send, + IntoFut: IntoFuture + Send, + > Handler for F where F: FnMut(E, Context) -> IntoFut + Send, { @@ -55,7 +61,9 @@ where { let mut runtime = runtime.unwrap_or_else(|| TokioRuntime::new().expect("Failed to start tokio runtime")); let task_executor = runtime.executor(); - runtime.block_on(start_with_config(f, &EnvConfigProvider::new(), task_executor)).unwrap(); + runtime + .block_on(start_with_config(f, &EnvConfigProvider::new(), task_executor)) + .unwrap(); } #[macro_export] @@ -87,7 +95,11 @@ macro_rules! lambda { /// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()` /// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment /// and spin up a new one for the next invocation. -pub(crate) fn start_with_config(f: impl Handler, config: &C, task_executor: TaskExecutor) -> impl Future + Send +pub(crate) fn start_with_config( + f: impl Handler, + config: &C, + task_executor: TaskExecutor, +) -> impl Future + Send where E: serde::de::DeserializeOwned + Send + 'static, O: serde::Serialize + Send, @@ -114,9 +126,7 @@ where } match RuntimeClient::new(endpoint, task_executor) { - Ok(client) => { - start_with_runtime_client(f, function_config, client) - } + Ok(client) => start_with_runtime_client(f, function_config, client), Err(e) => { panic!("Could not create runtime client SDK: {}", e); } @@ -138,7 +148,8 @@ pub(crate) fn start_with_runtime_client( f: impl Handler, func_settings: FunctionSettings, client: RuntimeClient, -) -> impl Future + Send where +) -> impl Future + Send +where E: serde::de::DeserializeOwned + Send + 'static, O: serde::Serialize + Send, { @@ -211,7 +222,7 @@ where /// Starts the main event loop and begin polling or new events. If one of the /// Runtime APIs returns an unrecoverable error this method calls the init failed /// API and then panics. - fn start(&self) -> impl Future + Send { + fn start(&self) -> impl Future + Send { debug!("Beginning main event loop"); let max_retries = self.max_retries; @@ -310,31 +321,38 @@ where /// /// # Return /// A `Future` resolving to the next `Event` object to be processed. - pub(super) fn get_next_event(max_retries: i8, runtime_client: RuntimeClient, settings: FunctionSettings) -> impl Future { - loop_fn((0, None), move |(iteration, maybe_error): (i8, Option)| { - if let Some(err) = maybe_error { - if iteration > max_retries { - error!("Unrecoverable error while fetching next event: {}", err); - match err.request_id.clone() { - Some(req_id) => { - return Box::new(runtime_client - .event_error(req_id, &err) - .map_err(|e| format!("Could not send event error response: {}", e)) - // these errors are not recoverable. Either we can't communicate with the runtime APIs - // or we cannot parse the event. panic to restart the environment. - .then(|_| Err("Could not retrieve next event".to_owned()))) as Box + Send> - } - None => { - runtime_client.fail_init(&err); - unreachable!(); + pub(super) fn get_next_event( + max_retries: i8, + runtime_client: RuntimeClient, + settings: FunctionSettings, + ) -> impl Future { + loop_fn( + (0, None), + move |(iteration, maybe_error): (i8, Option)| { + if let Some(err) = maybe_error { + if iteration > max_retries { + error!("Unrecoverable error while fetching next event: {}", err); + match err.request_id.clone() { + Some(req_id) => { + return Box::new( + runtime_client + .event_error(req_id, &err) + .map_err(|e| format!("Could not send event error response: {}", e)) + // these errors are not recoverable. Either we can't communicate with the runtime APIs + // or we cannot parse the event. panic to restart the environment. + .then(|_| Err("Could not retrieve next event".to_owned())), + ) as Box + Send>; + } + None => { + runtime_client.fail_init(&err); + unreachable!(); + } } } } - } - let settings = settings.clone(); - Box::new(runtime_client.next_event().then(move |result| { - match result { + let settings = settings.clone(); + Box::new(runtime_client.next_event().then(move |result| match result { Ok((ev_data, invocation_ctx)) => { let parse_result = serde_json::from_slice(&ev_data); match parse_result { @@ -358,9 +376,9 @@ where } } Err(e) => Ok(Loop::Continue((iteration + 1, Some(RuntimeError::from(e))))), - } - })) as Box + Send> - }) + })) as Box + Send> + }, + ) } } From 0f253c6e3036ed7027ba67f2e95a22a5e3362643 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sun, 6 Jan 2019 13:15:02 +0000 Subject: [PATCH 11/12] cargo fmt with nightly --- lambda-runtime-client/src/client.rs | 3 +-- lambda-runtime/src/runtime.rs | 9 +++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index c358b0fb..a838b035 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -8,8 +8,7 @@ use hyper::{ }; use serde_derive::Deserialize; use serde_json; -use tokio::prelude::future::IntoFuture; -use tokio::runtime::TaskExecutor; +use tokio::{prelude::future::IntoFuture, runtime::TaskExecutor}; use crate::error::{ApiError, ErrorResponse, RuntimeApiError}; diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index dff29bc3..944d000f 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -3,16 +3,17 @@ use std::{error::Error, marker::PhantomData, result}; use lambda_runtime_client::RuntimeClient; use serde; use serde_json; -use tokio::prelude::future::{loop_fn, Future, IntoFuture, Loop}; -use tokio::runtime::Runtime as TokioRuntime; +use tokio::{ + prelude::future::{loop_fn, Future, IntoFuture, Loop}, + runtime::Runtime as TokioRuntime, +}; use crate::{ context::Context, env::{ConfigProvider, EnvConfigProvider, FunctionSettings}, error::{HandlerError, RuntimeError}, }; -use std::sync::Arc; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use tokio::runtime::TaskExecutor; const MAX_RETRIES: i8 = 3; From 5d1de6df98fa064bba6cab995a30df6ca9dd0fe2 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Sun, 13 Jan 2019 10:19:30 +0000 Subject: [PATCH 12/12] Review comments --- README.md | 2 +- lambda-runtime-client/src/client.rs | 4 ++-- lambda-runtime-client/src/lib.rs | 3 +-- lambda-runtime/src/runtime.rs | 10 ++-------- 4 files changed, 6 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 0c074345..6a9393cb 100644 --- a/README.md +++ b/README.md @@ -187,7 +187,7 @@ pub trait Handler: Send { `Handler` provides a default implementation that enables you to provide a Rust closure or function pointer to the `lambda!()` macro. -If your handler is synchronous, you can just return a `Result` from it; if your handler is asynchronous, you can return a `Future` from it. +Your `Handler` needs to return something which implements `IntoFuture`; `Result` implements `IntoIterator`, so most synchronous `Handler`s will return a `Result`. Optionally, you can pass your own instance of Tokio runtime to the `lambda!()` macro. See our [`with_custom_runtime.rs` example](https://github.com/awslabs/aws-lambda-rust-runtime/tree/master/lambda-runtime/examples/with_custom_runtime.rs) diff --git a/lambda-runtime-client/src/client.rs b/lambda-runtime-client/src/client.rs index a838b035..f9268318 100644 --- a/lambda-runtime-client/src/client.rs +++ b/lambda-runtime-client/src/client.rs @@ -129,13 +129,13 @@ pub struct RuntimeClient { impl RuntimeClient { /// Creates a new instance of the Runtime APIclient SDK. The http client has timeouts disabled and /// will always send a `Connection: keep-alive` header. - pub fn new(endpoint: String, task_executor: TaskExecutor) -> Result { + pub fn new(endpoint: String, task_executor: TaskExecutor) -> Self { debug!("Starting new HttpRuntimeClient for {}", endpoint); // start a tokio core main event loop for hyper let http_client = Client::builder().executor(task_executor).build_http(); - Ok(RuntimeClient { http_client, endpoint }) + RuntimeClient { http_client, endpoint } } } diff --git a/lambda-runtime-client/src/lib.rs b/lambda-runtime-client/src/lib.rs index 9f7ba59a..f64ce6b1 100644 --- a/lambda-runtime-client/src/lib.rs +++ b/lambda-runtime-client/src/lib.rs @@ -36,8 +36,7 @@ //! fn main() { //! let tokio_runtime = TokioRuntime::new().expect("Could not make tokio runtime"); //! let runtime_endpoint = String::from("http://localhost:8080"); -//! let client = RuntimeClient::new(runtime_endpoint, tokio_runtime.executor()) -//! .expect("Could not initialize client"); +//! let client = RuntimeClient::new(runtime_endpoint, tokio_runtime.executor()); //! //! let (event_data, event_context) = client.next_event().wait() //! .expect("Could not retrieve next event"); diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 944d000f..7bcbfc0a 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -126,12 +126,7 @@ where } } - match RuntimeClient::new(endpoint, task_executor) { - Ok(client) => start_with_runtime_client(f, function_config, client), - Err(e) => { - panic!("Could not create runtime client SDK: {}", e); - } - } + start_with_runtime_client(f, function_config, RuntimeClient::new(endpoint, task_executor)) } /// Starts the rust runtime with the given Runtime API client. @@ -398,8 +393,7 @@ pub(crate) mod tests { .get_runtime_api_endpoint() .expect("Could not get runtime endpoint"), runtime.executor(), - ) - .expect("Could not initialize client"); + ); let handler = |_e: String, _c: context::Context| -> Result { Ok("hello".to_string()) }; let retries: i8 = 3; let runtime = Runtime::new(