diff --git a/Makefile b/Makefile index 0890173a..d1eb2c99 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ INTEG_STACK_NAME ?= rust-lambda-integration-tests INTEG_FUNCTIONS_BUILD := runtime-fn runtime-trait http-fn http-trait INTEG_FUNCTIONS_INVOKE := RuntimeFn RuntimeFnAl2 RuntimeTrait RuntimeTraitAl2 Python PythonAl2 INTEG_API_INVOKE := RestApiUrl HttpApiUrl -INTEG_EXTENSIONS := extension-fn extension-trait +INTEG_EXTENSIONS := extension-fn extension-trait logs-trait # Using musl to run extensions on both AL1 and AL2 INTEG_ARCH := x86_64-unknown-linux-musl diff --git a/lambda-extension/Cargo.toml b/lambda-extension/Cargo.toml index 7ad1b491..69fa7745 100644 --- a/lambda-extension/Cargo.toml +++ b/lambda-extension/Cargo.toml @@ -11,17 +11,18 @@ keywords = ["AWS", "Lambda", "API"] readme = "README.md" [dependencies] -tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] } +async-stream = "0.3" +bytes = "1.0" +chrono = { version = "0.4", features = ["serde"] } +http = "0.2" hyper = { version = "0.14", features = ["http1", "client", "server", "stream", "runtime"] } +lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" } serde = { version = "1", features = ["derive"] } serde_json = "^1" -bytes = "1.0" -http = "0.2" -async-stream = "0.3" tracing = { version = "0.1", features = ["log"] } -tower = { version = "0.4", features = ["util"] } +tokio = { version = "1.0", features = ["macros", "io-util", "sync", "rt-multi-thread"] } tokio-stream = "0.1.2" -lambda_runtime_api_client = { version = "0.4", path = "../lambda-runtime-api-client" } +tower = { version = "0.4", features = ["make", "util"] } [dev-dependencies] simple-error = "0.2" diff --git a/lambda-extension/README.md b/lambda-extension/README.md index 79296608..95b08ccd 100644 --- a/lambda-extension/README.md +++ b/lambda-extension/README.md @@ -2,9 +2,11 @@ [![Docs](https://docs.rs/lambda_extension/badge.svg)](https://docs.rs/lambda_extension) -**`lambda-extension`** is a library that makes it easy to write [AWS Lambda Runtime Extensions](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html) in Rust. +**`lambda-extension`** is a library that makes it easy to write [AWS Lambda Runtime Extensions](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html) in Rust. It also helps with using [Lambda Logs API](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html). -## Example extension +## Example extensions + +### Simple extension The code below creates a simple extension that's registered to every `INVOKE` and `SHUTDOWN` events, and logs them in CloudWatch. @@ -37,6 +39,40 @@ async fn main() -> Result<(), Error> { ``` +### Log processor extension + +```rust,no_run +use lambda_extension::{service_fn, Error, Extension, LambdaLog, LambdaLogRecord, SharedService}; +use tracing::info; + +async fn handler(logs: Vec) -> Result<(), Error> { + for log in logs { + match log.record { + LambdaLogRecord::Function(_record) => { + // do something with the function log record + }, + LambdaLogRecord::Extension(_record) => { + // do something with the extension log record + }, + }, + _ => (), + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let logs_processor = SharedService::new(service_fn(handler)); + + Extension::new().with_logs_processor(logs_processor).run().await?; + + Ok(()) +} + +``` + ## Deployment Lambda extensions can be added to your functions either using [Lambda layers](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#using-extensions-config), or adding them to [containers images](https://docs.aws.amazon.com/lambda/latest/dg/using-extensions.html#invocation-extensions-images). diff --git a/lambda-extension/examples/combined.rs b/lambda-extension/examples/combined.rs new file mode 100644 index 00000000..eede270f --- /dev/null +++ b/lambda-extension/examples/combined.rs @@ -0,0 +1,51 @@ +use lambda_extension::{ + service_fn, Error, Extension, LambdaEvent, LambdaLog, LambdaLogRecord, NextEvent, SharedService, +}; +use tracing::info; + +async fn my_extension(event: LambdaEvent) -> Result<(), Error> { + match event.next { + NextEvent::Shutdown(_e) => { + // do something with the shutdown event + } + NextEvent::Invoke(_e) => { + // do something with the invoke event + } + } + Ok(()) +} + +async fn my_log_processor(logs: Vec) -> Result<(), Error> { + for log in logs { + match log.record { + LambdaLogRecord::Function(record) => info!("[logs] [function] {}", record), + LambdaLogRecord::Extension(record) => info!("[logs] [extension] {}", record), + _ => (), + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + // The runtime logging can be enabled here by initializing `tracing` with `tracing-subscriber` + // While `tracing` is used internally, `log` can be used as well if preferred. + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // this needs to be set to false, otherwise ANSI color codes will + // show up in a confusing manner in CloudWatch logs. + .with_ansi(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + let func = service_fn(my_extension); + let logs_processor = SharedService::new(service_fn(my_log_processor)); + + Extension::new() + .with_events_processor(func) + .with_logs_processor(logs_processor) + .run() + .await +} diff --git a/lambda-extension/examples/custom_events.rs b/lambda-extension/examples/custom_events.rs index d2756c23..f796ca31 100644 --- a/lambda-extension/examples/custom_events.rs +++ b/lambda-extension/examples/custom_events.rs @@ -1,4 +1,4 @@ -use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent, Runtime}; +use lambda_extension::{service_fn, Error, Extension, LambdaEvent, NextEvent}; async fn my_extension(event: LambdaEvent) -> Result<(), Error> { match event.next { @@ -27,9 +27,9 @@ async fn main() -> Result<(), Error> { .without_time() .init(); - let func = service_fn(my_extension); - - let runtime = Runtime::builder().with_events(&["SHUTDOWN"]).register().await?; - - runtime.run(func).await + Extension::new() + .with_events(&["SHUTDOWN"]) + .with_events_processor(service_fn(my_extension)) + .run() + .await } diff --git a/lambda-extension/examples/custom_logs_service.rs b/lambda-extension/examples/custom_logs_service.rs new file mode 100644 index 00000000..aace5f6b --- /dev/null +++ b/lambda-extension/examples/custom_logs_service.rs @@ -0,0 +1,64 @@ +use lambda_extension::{Error, Extension, LambdaLog, LambdaLogRecord, Service, SharedService}; +use std::{ + future::{ready, Future}, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }, + task::Poll, +}; +use tracing::info; + +/// Custom log processor that increments a counter for each log record. +/// +/// This is a simple example of a custom log processor that can be used to +/// count the number of log records that are processed. +/// +/// This needs to derive Clone (and store the counter in an Arc) as the runtime +/// could need multiple `Service`s to process the logs. +#[derive(Clone, Default)] +struct MyLogsProcessor { + counter: Arc, +} + +impl MyLogsProcessor { + pub fn new() -> Self { + Self::default() + } +} + +/// Implementation of the actual log processor +/// +/// This receives a `Vec` whenever there are new log entries available. +impl Service> for MyLogsProcessor { + type Response = (); + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, logs: Vec) -> Self::Future { + let counter = self.counter.fetch_add(1, SeqCst); + for log in logs { + match log.record { + LambdaLogRecord::Function(record) => info!("[logs] [function] {}: {}", counter, record), + LambdaLogRecord::Extension(record) => info!("[logs] [extension] {}: {}", counter, record), + _ => (), + } + } + + Box::pin(ready(Ok(()))) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let logs_processor = SharedService::new(MyLogsProcessor::new()); + + Extension::new().with_logs_processor(logs_processor).run().await?; + + Ok(()) +} diff --git a/lambda-extension/examples/custom_trait_implementation.rs b/lambda-extension/examples/custom_service.rs similarity index 100% rename from lambda-extension/examples/custom_trait_implementation.rs rename to lambda-extension/examples/custom_service.rs diff --git a/lambda-extension/examples/logs.rs b/lambda-extension/examples/logs.rs new file mode 100644 index 00000000..79973a46 --- /dev/null +++ b/lambda-extension/examples/logs.rs @@ -0,0 +1,23 @@ +use lambda_extension::{service_fn, Error, Extension, LambdaLog, LambdaLogRecord, SharedService}; +use tracing::info; + +async fn handler(logs: Vec) -> Result<(), Error> { + for log in logs { + match log.record { + LambdaLogRecord::Function(record) => info!("[logs] [function] {}", record), + LambdaLogRecord::Extension(record) => info!("[logs] [extension] {}", record), + _ => (), + } + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + let logs_processor = SharedService::new(service_fn(handler)); + + Extension::new().with_logs_processor(logs_processor).run().await?; + + Ok(()) +} diff --git a/lambda-extension/src/error.rs b/lambda-extension/src/error.rs new file mode 100644 index 00000000..2c3e23b3 --- /dev/null +++ b/lambda-extension/src/error.rs @@ -0,0 +1,23 @@ +/// Error type that extensions may result in +pub type Error = lambda_runtime_api_client::Error; + +/// Simple error that encapsulates human readable descriptions +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ExtensionError { + err: String, +} + +impl ExtensionError { + pub(crate) fn boxed>(str: T) -> Box { + Box::new(ExtensionError { err: str.into() }) + } +} + +impl std::fmt::Display for ExtensionError { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.err.fmt(f) + } +} + +impl std::error::Error for ExtensionError {} diff --git a/lambda-extension/src/events.rs b/lambda-extension/src/events.rs new file mode 100644 index 00000000..87fd62a4 --- /dev/null +++ b/lambda-extension/src/events.rs @@ -0,0 +1,71 @@ +use serde::Deserialize; + +/// Request tracing information +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Tracing { + /// The type of tracing exposed to the extension + pub r#type: String, + /// The span value + pub value: String, +} +/// Event received when there is a new Lambda invocation. +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InvokeEvent { + /// The time that the function times out + pub deadline_ms: u64, + /// The ID assigned to the Lambda request + pub request_id: String, + /// The function's Amazon Resource Name + pub invoked_function_arn: String, + /// The request tracing information + pub tracing: Tracing, +} + +/// Event received when a Lambda function shuts down. +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ShutdownEvent { + /// The reason why the function terminates + /// It can be SPINDOWN, TIMEOUT, or FAILURE + pub shutdown_reason: String, + /// The time that the function times out + pub deadline_ms: u64, +} + +/// Event that the extension receives in +/// either the INVOKE or SHUTDOWN phase +#[derive(Debug, Deserialize)] +#[serde(rename_all = "UPPERCASE", tag = "eventType")] +pub enum NextEvent { + /// Payload when the event happens in the INVOKE phase + Invoke(InvokeEvent), + /// Payload when the event happens in the SHUTDOWN phase + Shutdown(ShutdownEvent), +} + +impl NextEvent { + /// Return whether the event is a [`NextEvent::Invoke`] event or not + pub fn is_invoke(&self) -> bool { + matches!(self, NextEvent::Invoke(_)) + } +} + +/// Wrapper with information about the next +/// event that the Lambda Runtime is going to process +pub struct LambdaEvent { + /// ID assigned to this extension by the Lambda Runtime + pub extension_id: String, + /// Next incoming event + pub next: NextEvent, +} + +impl LambdaEvent { + pub(crate) fn new(ex_id: &str, next: NextEvent) -> LambdaEvent { + LambdaEvent { + extension_id: ex_id.into(), + next, + } + } +} diff --git a/lambda-extension/src/extension.rs b/lambda-extension/src/extension.rs new file mode 100644 index 00000000..71e5bcdf --- /dev/null +++ b/lambda-extension/src/extension.rs @@ -0,0 +1,300 @@ +use crate::{logs::*, requests, Error, ExtensionError, LambdaEvent, NextEvent}; +use hyper::{server::conn::AddrStream, Server}; +use lambda_runtime_api_client::Client; +use std::{fmt, future::ready, future::Future, net::SocketAddr, path::PathBuf, pin::Pin, sync::Arc}; +use tokio::sync::Mutex; +use tokio_stream::StreamExt; +use tower::{service_fn, MakeService, Service}; +use tracing::{error, trace}; + +const DEFAULT_LOG_PORT_NUMBER: u16 = 9002; + +/// An Extension that runs event and log processors +pub struct Extension<'a, E, L> { + extension_name: Option<&'a str>, + events: Option<&'a [&'a str]>, + events_processor: E, + log_types: Option<&'a [&'a str]>, + logs_processor: Option, + log_buffering: Option, + log_port_number: u16, +} + +impl<'a> Extension<'a, Identity, MakeIdentity>> { + /// Create a new base [`Extension`] with a no-op events processor + pub fn new() -> Self { + Extension { + extension_name: None, + events: None, + events_processor: Identity::new(), + log_types: None, + log_buffering: None, + logs_processor: None, + log_port_number: DEFAULT_LOG_PORT_NUMBER, + } + } +} + +impl<'a> Default for Extension<'a, Identity, MakeIdentity>> { + fn default() -> Self { + Self::new() + } +} + +impl<'a, E, L> Extension<'a, E, L> +where + E: Service, + E::Future: Future>, + E::Error: Into> + fmt::Display, + + // Fixme: 'static bound might be too restrictive + L: MakeService<(), Vec, Response = ()> + Send + Sync + 'static, + L::Service: Service, Response = ()> + Send + Sync, + >>::Future: Send + 'a, + L::Error: Into>, + L::MakeError: Into>, + L::Future: Send, +{ + /// Create a new [`Extension`] with a given extension name + pub fn with_extension_name(self, extension_name: &'a str) -> Self { + Extension { + extension_name: Some(extension_name), + ..self + } + } + + /// Create a new [`Extension`] with a list of given events. + /// The only accepted events are `INVOKE` and `SHUTDOWN`. + pub fn with_events(self, events: &'a [&'a str]) -> Self { + Extension { + events: Some(events), + ..self + } + } + + /// Create a new [`Extension`] with a service that receives Lambda events. + pub fn with_events_processor(self, ep: N) -> Extension<'a, N, L> + where + N: Service, + N::Future: Future>, + N::Error: Into> + fmt::Display, + { + Extension { + events_processor: ep, + extension_name: self.extension_name, + events: self.events, + log_types: self.log_types, + log_buffering: self.log_buffering, + logs_processor: self.logs_processor, + log_port_number: self.log_port_number, + } + } + + /// Create a new [`Extension`] with a service that receives Lambda logs. + pub fn with_logs_processor(self, lp: N) -> Extension<'a, E, N> + where + N: Service<()>, + N::Future: Future>, + N::Error: Into> + fmt::Display, + { + Extension { + logs_processor: Some(lp), + events_processor: self.events_processor, + extension_name: self.extension_name, + events: self.events, + log_types: self.log_types, + log_buffering: self.log_buffering, + log_port_number: self.log_port_number, + } + } + + /// Create a new [`Extension`] with a list of logs types to subscribe. + /// The only accepted log types are `function`, `platform`, and `extension`. + pub fn with_log_types(self, log_types: &'a [&'a str]) -> Self { + Extension { + log_types: Some(log_types), + ..self + } + } + + /// Create a new [`Extension`] with specific configuration to buffer logs. + pub fn with_log_buffering(self, lb: LogBuffering) -> Self { + Extension { + log_buffering: Some(lb), + ..self + } + } + + /// Create a new [`Extension`] with a different port number to listen to logs. + pub fn with_log_port_number(self, port_number: u16) -> Self { + Extension { + log_port_number: port_number, + ..self + } + } + + /// Execute the given extension + pub async fn run(self) -> Result<(), Error> { + let client = &Client::builder().build()?; + + let extension_id = register(client, self.extension_name, self.events).await?; + let extension_id = extension_id.to_str()?; + let mut ep = self.events_processor; + + if let Some(mut log_processor) = self.logs_processor { + trace!("Log processor found"); + // Spawn task to run processor + let addr = SocketAddr::from(([0, 0, 0, 0], self.log_port_number)); + let make_service = service_fn(move |_socket: &AddrStream| { + trace!("Creating new log processor Service"); + let service = log_processor.make_service(()); + async move { + let service = Arc::new(Mutex::new(service.await?)); + Ok::<_, L::MakeError>(service_fn(move |req| log_wrapper(service.clone(), req))) + } + }); + let server = Server::bind(&addr).serve(make_service); + tokio::spawn(async move { + if let Err(e) = server.await { + error!("Error while running log processor: {}", e); + } + }); + trace!("Log processor started"); + + // Call Logs API to start receiving events + let req = requests::subscribe_logs_request( + extension_id, + self.log_types, + self.log_buffering, + self.log_port_number, + )?; + let res = client.call(req).await?; + if res.status() != http::StatusCode::OK { + return Err(ExtensionError::boxed("unable to initialize the logs api")); + } + trace!("Registered extension with Logs API"); + } + + let incoming = async_stream::stream! { + loop { + trace!("Waiting for next event (incoming loop)"); + let req = requests::next_event_request(extension_id)?; + let res = client.call(req).await; + yield res; + } + }; + + tokio::pin!(incoming); + while let Some(event) = incoming.next().await { + trace!("New event arrived (run loop)"); + let event = event?; + let (_parts, body) = event.into_parts(); + + let body = hyper::body::to_bytes(body).await?; + trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose + let event: NextEvent = serde_json::from_slice(&body)?; + let is_invoke = event.is_invoke(); + + let event = LambdaEvent::new(extension_id, event); + + let res = ep.call(event).await; + if let Err(error) = res { + let req = if is_invoke { + requests::init_error(extension_id, &error.to_string(), None)? + } else { + requests::exit_error(extension_id, &error.to_string(), None)? + }; + + client.call(req).await?; + return Err(error.into()); + } + } + Ok(()) + } +} + +/// A no-op generic processor +#[derive(Clone)] +pub struct Identity { + _phantom: std::marker::PhantomData, +} + +impl Identity { + fn new() -> Self { + Self { + _phantom: std::marker::PhantomData, + } + } +} + +impl Service for Identity { + type Error = Error; + type Future = Pin> + Send>>; + type Response = (); + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _event: T) -> Self::Future { + Box::pin(ready(Ok(()))) + } +} + +/// Service factory to generate no-op generic processors +#[derive(Clone)] +pub struct MakeIdentity { + _phantom: std::marker::PhantomData, +} + +impl Service<()> for MakeIdentity +where + T: Send + Sync + 'static, +{ + type Error = Error; + type Response = Identity; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + core::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _: ()) -> Self::Future { + Box::pin(ready(Ok(Identity::new()))) + } +} + +/// Initialize and register the extension in the Extensions API +async fn register<'a>( + client: &'a Client, + extension_name: Option<&'a str>, + events: Option<&'a [&'a str]>, +) -> Result { + let name = match extension_name { + Some(name) => name.into(), + None => { + let args: Vec = std::env::args().collect(); + PathBuf::from(args[0].clone()) + .file_name() + .expect("unexpected executable name") + .to_str() + .expect("unexpect executable name") + .to_string() + } + }; + + let events = events.unwrap_or(&["INVOKE", "SHUTDOWN"]); + + let req = requests::register_request(&name, events)?; + let res = client.call(req).await?; + if res.status() != http::StatusCode::OK { + return Err(ExtensionError::boxed("unable to register the extension")); + } + + let header = res + .headers() + .get(requests::EXTENSION_ID_HEADER) + .ok_or_else(|| ExtensionError::boxed("missing extension id header")) + .map_err(|e| ExtensionError::boxed(e.to_string()))?; + Ok(header.clone()) +} diff --git a/lambda-extension/src/lib.rs b/lambda-extension/src/lib.rs index 130aae50..9bb44b50 100644 --- a/lambda-extension/src/lib.rs +++ b/lambda-extension/src/lib.rs @@ -1,244 +1,33 @@ #![deny(clippy::all, clippy::cargo)] -#![allow(clippy::multiple_crate_versions)] +#![allow(clippy::multiple_crate_versions, clippy::type_complexity)] #![warn(missing_docs, nonstandard_style, rust_2018_idioms)] //! This module includes utilities to create Lambda Runtime Extensions. //! //! Create a type that conforms to the [`Extension`] trait. This type can then be passed //! to the the `lambda_extension::run` function, which launches and runs the Lambda runtime extension. -use hyper::client::{connect::Connection, HttpConnector}; -use lambda_runtime_api_client::Client; -use serde::Deserialize; -use std::{fmt, future::Future, path::PathBuf}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_stream::StreamExt; -pub use tower::{self, service_fn, Service}; -use tracing::trace; +use std::{fmt, future::Future}; +pub use tower::{self, make::Shared as SharedService, service_fn, Service}; + +mod error; +pub use error::*; +mod extension; +pub use extension::*; +mod events; +pub use events::*; +mod logs; +pub use logs::*; /// Include several request builders to interact with the Extension API. pub mod requests; -/// Error type that extensions may result in -pub type Error = lambda_runtime_api_client::Error; - -/// Simple error that encapsulates human readable descriptions -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct ExtensionError { - err: String, -} - -impl ExtensionError { - fn boxed>(str: T) -> Box { - Box::new(ExtensionError { err: str.into() }) - } -} - -impl std::fmt::Display for ExtensionError { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.err.fmt(f) - } -} - -impl std::error::Error for ExtensionError {} - -/// Request tracing information -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Tracing { - /// The type of tracing exposed to the extension - pub r#type: String, - /// The span value - pub value: String, -} - -/// Event received when there is a new Lambda invocation. -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct InvokeEvent { - /// The time that the function times out - pub deadline_ms: u64, - /// The ID assigned to the Lambda request - pub request_id: String, - /// The function's Amazon Resource Name - pub invoked_function_arn: String, - /// The request tracing information - pub tracing: Tracing, -} - -/// Event received when a Lambda function shuts down. -#[derive(Debug, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ShutdownEvent { - /// The reason why the function terminates - /// It can be SPINDOWN, TIMEOUT, or FAILURE - pub shutdown_reason: String, - /// The time that the function times out - pub deadline_ms: u64, -} - -/// Event that the extension receives in -/// either the INVOKE or SHUTDOWN phase -#[derive(Debug, Deserialize)] -#[serde(rename_all = "UPPERCASE", tag = "eventType")] -pub enum NextEvent { - /// Payload when the event happens in the INVOKE phase - Invoke(InvokeEvent), - /// Payload when the event happens in the SHUTDOWN phase - Shutdown(ShutdownEvent), -} - -impl NextEvent { - fn is_invoke(&self) -> bool { - matches!(self, NextEvent::Invoke(_)) - } -} - -/// Wrapper with information about the next -/// event that the Lambda Runtime is going to process -pub struct LambdaEvent { - /// ID assigned to this extension by the Lambda Runtime - pub extension_id: String, - /// Next incoming event - pub next: NextEvent, -} - -/// The Runtime handles all the incoming extension requests -pub struct Runtime = HttpConnector> { - extension_id: String, - client: Client, -} - -impl Runtime { - /// Create a [`RuntimeBuilder`] to initialize the [`Runtime`] - pub fn builder<'a>() -> RuntimeBuilder<'a> { - RuntimeBuilder::default() - } -} - -impl Runtime -where - C: Service + Clone + Send + Sync + Unpin + 'static, - C::Future: Unpin + Send, - C::Error: Into>, - C::Response: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, -{ - /// Execute the given extension. - /// Register the extension with the Extensions API and wait for incoming events. - pub async fn run(&self, mut extension: E) -> Result<(), Error> - where - E: Service, - E::Future: Future>, - E::Error: Into> + fmt::Display, - { - let client = &self.client; - - let incoming = async_stream::stream! { - loop { - trace!("Waiting for next event (incoming loop)"); - let req = requests::next_event_request(&self.extension_id)?; - let res = client.call(req).await; - yield res; - } - }; - - tokio::pin!(incoming); - while let Some(event) = incoming.next().await { - trace!("New event arrived (run loop)"); - let event = event?; - let (_parts, body) = event.into_parts(); - - let body = hyper::body::to_bytes(body).await?; - trace!("{}", std::str::from_utf8(&body)?); // this may be very verbose - let event: NextEvent = serde_json::from_slice(&body)?; - let is_invoke = event.is_invoke(); - - let event = LambdaEvent { - extension_id: self.extension_id.clone(), - next: event, - }; - - let res = extension.call(event).await; - if let Err(error) = res { - let req = if is_invoke { - requests::init_error(&self.extension_id, &error.to_string(), None)? - } else { - requests::exit_error(&self.extension_id, &error.to_string(), None)? - }; - - self.client.call(req).await?; - return Err(error.into()); - } - } - - Ok(()) - } -} - -/// Builder to construct a new extension [`Runtime`] -#[derive(Default)] -pub struct RuntimeBuilder<'a> { - extension_name: Option<&'a str>, - events: Option<&'a [&'a str]>, -} - -impl<'a> RuntimeBuilder<'a> { - /// Create a new [`RuntimeBuilder`] with a given extension name - pub fn with_extension_name(self, extension_name: &'a str) -> Self { - RuntimeBuilder { - extension_name: Some(extension_name), - ..self - } - } - - /// Create a new [`RuntimeBuilder`] with a list of given events. - /// The only accepted events are `INVOKE` and `SHUTDOWN`. - pub fn with_events(self, events: &'a [&'a str]) -> Self { - RuntimeBuilder { - events: Some(events), - ..self - } - } - - /// Initialize and register the extension in the Extensions API - pub async fn register(&self) -> Result { - let name = match self.extension_name { - Some(name) => name.into(), - None => { - let args: Vec = std::env::args().collect(); - PathBuf::from(args[0].clone()) - .file_name() - .expect("unexpected executable name") - .to_str() - .expect("unexpect executable name") - .to_string() - } - }; - - let events = self.events.unwrap_or(&["INVOKE", "SHUTDOWN"]); - - let client = Client::builder().build()?; - - let req = requests::register_request(&name, events)?; - let res = client.call(req).await?; - if res.status() != http::StatusCode::OK { - return Err(ExtensionError::boxed("unable to register the extension")); - } - - let extension_id = res.headers().get(requests::EXTENSION_ID_HEADER).unwrap().to_str()?; - Ok(Runtime { - extension_id: extension_id.into(), - client, - }) - } -} - -/// Execute the given extension -pub async fn run(extension: E) -> Result<(), Error> +/// Execute the given events processor +pub async fn run(events_processor: E) -> Result<(), Error> where E: Service, E::Future: Future>, E::Error: Into> + fmt::Display, { - Runtime::builder().register().await?.run(extension).await + let ext = Extension::new().with_events_processor(events_processor); + ext.run().await } diff --git a/lambda-extension/src/logs.rs b/lambda-extension/src/logs.rs new file mode 100644 index 00000000..66f47ee1 --- /dev/null +++ b/lambda-extension/src/logs.rs @@ -0,0 +1,299 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::{boxed::Box, sync::Arc}; +use tokio::sync::Mutex; +use tower::Service; +use tracing::{error, trace}; + +/// Payload received from the Lambda Logs API +/// See: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-msg +#[derive(Clone, Debug, Deserialize, PartialEq)] +pub struct LambdaLog { + /// Time when the log was generated + pub time: DateTime, + /// Log record entry + #[serde(flatten)] + pub record: LambdaLogRecord, +} + +/// Record in a LambdaLog entry +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(tag = "type", content = "record", rename_all = "lowercase")] +pub enum LambdaLogRecord { + /// Function log records + Function(String), + + /// Extension log records + Extension(String), + + /// Platform start record + #[serde(rename = "platform.start", rename_all = "camelCase")] + PlatformStart { + /// Request identifier + request_id: String, + }, + /// Platform stop record + #[serde(rename = "platform.end", rename_all = "camelCase")] + PlatformEnd { + /// Request identifier + request_id: String, + }, + /// Platform report record + #[serde(rename = "platform.report", rename_all = "camelCase")] + PlatformReport { + /// Request identifier + request_id: String, + /// Request metrics + metrics: LogPlatformReportMetrics, + }, + /// Runtime or execution environment error record + #[serde(rename = "platform.fault")] + PlatformFault(String), + /// Extension-specific record + #[serde(rename = "platform.extension", rename_all = "camelCase")] + PlatformExtension { + /// Name of the extension + name: String, + /// State of the extension + state: String, + /// Events sent to the extension + events: Vec, + }, + /// Log processor-specific record + #[serde(rename = "platform.logsSubscription", rename_all = "camelCase")] + PlatformLogsSubscription { + /// Name of the extension + name: String, + /// State of the extensions + state: String, + /// Types of records sent to the extension + types: Vec, + }, + /// Record generated when the log processor is falling behind + #[serde(rename = "platform.logsDropped", rename_all = "camelCase")] + PlatformLogsDropped { + /// Reason for dropping the logs + reason: String, + /// Number of records dropped + dropped_records: u64, + /// Total size of the dropped records + dropped_bytes: u64, + }, + /// Record marking the completion of an invocation + #[serde(rename = "platform.runtimeDone", rename_all = "camelCase")] + PlatformRuntimeDone { + /// Request identifier + request_id: String, + /// Status of the invocation + status: String, + }, +} + +/// Platform report metrics +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct LogPlatformReportMetrics { + /// Duration in milliseconds + pub duration_ms: f64, + /// Billed duration in milliseconds + pub billed_duration_ms: u64, + /// Memory allocated in megabytes + #[serde(rename = "memorySizeMB")] + pub memory_size_mb: u64, + /// Maximum memory used for the invoke in megabytes + #[serde(rename = "maxMemoryUsedMB")] + pub max_memory_used_mb: u64, + /// Init duration in case of a cold start + #[serde(default = "Option::default")] + pub init_duration_ms: Option, +} + +/// Log buffering configuration. +/// Allows Lambda to buffer logs before deliverying them to a subscriber. +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct LogBuffering { + /// The maximum time (in milliseconds) to buffer a batch. + /// Default: 1,000. Minimum: 25. Maximum: 30,000 + pub timeout_ms: usize, + /// The maximum size (in bytes) of the logs to buffer in memory. + /// Default: 262,144. Minimum: 262,144. Maximum: 1,048,576 + pub max_bytes: usize, + /// The maximum number of events to buffer in memory. + /// Default: 10,000. Minimum: 1,000. Maximum: 10,000 + pub max_items: usize, +} + +impl Default for LogBuffering { + fn default() -> Self { + LogBuffering { + timeout_ms: 1_000, + max_bytes: 262_144, + max_items: 10_000, + } + } +} + +/// Wrapper function that sends logs to the subscriber Service +/// +/// This takes an `hyper::Request` and transforms it into `Vec` for the +/// underlying `Service` to process. +pub(crate) async fn log_wrapper( + service: Arc>, + req: hyper::Request, +) -> Result, Box> +where + S: Service, Response = ()>, + S::Error: Into>, + S::Future: Send, +{ + trace!("Received logs request"); + // Parse the request body as a Vec + let body = match hyper::body::to_bytes(req.into_body()).await { + Ok(body) => body, + Err(e) => { + error!("Error reading logs request body: {}", e); + return Ok(hyper::Response::builder() + .status(hyper::StatusCode::BAD_REQUEST) + .body(hyper::Body::empty()) + .unwrap()); + } + }; + let logs: Vec = match serde_json::from_slice(&body) { + Ok(logs) => logs, + Err(e) => { + error!("Error parsing logs: {}", e); + return Ok(hyper::Response::builder() + .status(hyper::StatusCode::BAD_REQUEST) + .body(hyper::Body::empty()) + .unwrap()); + } + }; + + { + let mut service = service.lock().await; + let _ = service.call(logs).await; + } + + Ok(hyper::Response::new(hyper::Body::empty())) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + #[test] + fn deserialize_full() { + let data = r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#; + let expected = LambdaLog { + time: Utc.ymd(2020, 8, 20).and_hms_milli(12, 31, 32, 123), + record: LambdaLogRecord::Function("hello world".to_string()), + }; + + let actual = serde_json::from_str::(data).unwrap(); + + assert_eq!(expected, actual); + } + + macro_rules! deserialize_tests { + ($($name:ident: $value:expr,)*) => { + $( + #[test] + fn $name() { + let (input, expected) = $value; + let actual = serde_json::from_str::(&input).expect("unable to deserialize"); + + assert!(actual.record == expected); + } + )* + } + } + + deserialize_tests! { + // function + function: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "function", "record": "hello world"}"#, + LambdaLogRecord::Function("hello world".to_string()), + ), + + // extension + extension: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "extension", "record": "hello world"}"#, + LambdaLogRecord::Extension("hello world".to_string()), + ), + + // platform.start + platform_start: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.start","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#, + LambdaLogRecord::PlatformStart { + request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(), + }, + ), + // platform.end + platform_end: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.end","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56"}}"#, + LambdaLogRecord::PlatformEnd { + request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(), + }, + ), + // platform.report + platform_report: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.report","record": {"requestId": "6f7f0961f83442118a7af6fe80b88d56","metrics": {"durationMs": 1.23,"billedDurationMs": 123,"memorySizeMB": 123,"maxMemoryUsedMB": 123,"initDurationMs": 1.23}}}"#, + LambdaLogRecord::PlatformReport { + request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(), + metrics: LogPlatformReportMetrics { + duration_ms: 1.23, + billed_duration_ms: 123, + memory_size_mb: 123, + max_memory_used_mb: 123, + init_duration_ms: Some(1.23), + }, + }, + ), + // platform.fault + platform_fault: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.fault","record": "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request"}"#, + LambdaLogRecord::PlatformFault( + "RequestId: d783b35e-a91d-4251-af17-035953428a2c Process exited before completing request" + .to_string(), + ), + ), + // platform.extension + platform_extension: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.extension","record": {"name": "Foo.bar","state": "Ready","events": ["INVOKE", "SHUTDOWN"]}}"#, + LambdaLogRecord::PlatformExtension { + name: "Foo.bar".to_string(), + state: "Ready".to_string(), + events: vec!["INVOKE".to_string(), "SHUTDOWN".to_string()], + }, + ), + // platform.logsSubscription + platform_logssubscription: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsSubscription","record": {"name": "test","state": "active","types": ["test"]}}"#, + LambdaLogRecord::PlatformLogsSubscription { + name: "test".to_string(), + state: "active".to_string(), + types: vec!["test".to_string()], + }, + ), + // platform.logsDropped + platform_logsdropped: ( + r#"{"time": "2020-08-20T12:31:32.123Z","type": "platform.logsDropped","record": {"reason": "Consumer seems to have fallen behind as it has not acknowledged receipt of logs.","droppedRecords": 123,"droppedBytes": 12345}}"#, + LambdaLogRecord::PlatformLogsDropped { + reason: "Consumer seems to have fallen behind as it has not acknowledged receipt of logs." + .to_string(), + dropped_records: 123, + dropped_bytes: 12345, + }, + ), + // platform.runtimeDone + platform_runtimedone: ( + r#"{"time": "2021-02-04T20:00:05.123Z","type": "platform.runtimeDone","record": {"requestId":"6f7f0961f83442118a7af6fe80b88d56","status": "success"}}"#, + LambdaLogRecord::PlatformRuntimeDone { + request_id: "6f7f0961f83442118a7af6fe80b88d56".to_string(), + status: "success".to_string(), + }, + ), + } +} diff --git a/lambda-extension/src/requests.rs b/lambda-extension/src/requests.rs index 2fdbf2a6..6cff70b6 100644 --- a/lambda-extension/src/requests.rs +++ b/lambda-extension/src/requests.rs @@ -1,4 +1,4 @@ -use crate::Error; +use crate::{Error, LogBuffering}; use http::{Method, Request}; use hyper::Body; use lambda_runtime_api_client::build_request; @@ -29,6 +29,33 @@ pub(crate) fn register_request(extension_name: &str, events: &[&str]) -> Result< Ok(req) } +pub(crate) fn subscribe_logs_request( + extension_id: &str, + types: Option<&[&str]>, + buffering: Option, + port_number: u16, +) -> Result, Error> { + let types = types.unwrap_or(&["platform", "function"]); + + let data = serde_json::json!({ + "schemaVersion": "2021-03-18", + "types": types, + "buffering": buffering.unwrap_or_default(), + "destination": { + "protocol": "HTTP", + "URI": format!("http://sandbox.localdomain:{}", port_number), + } + }); + + let req = build_request() + .method(Method::PUT) + .uri("/2020-08-15/logs") + .header(EXTENSION_ID_HEADER, extension_id) + .body(Body::from(serde_json::to_string(&data)?))?; + + Ok(req) +} + /// Payload to send error information to the Extensions API. #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] diff --git a/lambda-integration-tests/src/bin/logs-trait.rs b/lambda-integration-tests/src/bin/logs-trait.rs new file mode 100644 index 00000000..871a5019 --- /dev/null +++ b/lambda-integration-tests/src/bin/logs-trait.rs @@ -0,0 +1,76 @@ +use lambda_extension::{Error, Extension, LambdaLog, LambdaLogRecord, Service, SharedService}; +use std::{ + future::{ready, Future}, + pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }, + task::Poll, +}; +use tracing::info; + +/// Custom log processor that increments a counter for each log record. +/// +/// This is a simple example of a custom log processor that can be used to +/// count the number of log records that are processed. +/// +/// This needs to derive Clone (and store the counter in an Arc) as the runtime +/// could need multiple `Service`s to process the logs. +#[derive(Clone, Default)] +struct MyLogsProcessor { + counter: Arc, +} + +impl MyLogsProcessor { + pub fn new() -> Self { + Self::default() + } +} + +/// Implementation of the actual log processor +/// +/// This receives a `Vec` whenever there are new log entries available. +impl Service> for MyLogsProcessor { + type Response = (); + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut core::task::Context<'_>) -> core::task::Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, logs: Vec) -> Self::Future { + let counter = self.counter.fetch_add(1, SeqCst); + for log in logs { + match log.record { + LambdaLogRecord::Function(record) => { + info!("[logs] {} [function] {}: {}", log.time, counter, record.trim()) + } + LambdaLogRecord::Extension(record) => { + info!("[logs] {} [extension] {}: {}", log.time, counter, record.trim()) + } + _ => (), + } + } + + Box::pin(ready(Ok(()))) + } +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // this needs to be set to false, otherwise ANSI color codes will + // show up in a confusing manner in CloudWatch logs. + .with_ansi(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + let logs_processor = SharedService::new(MyLogsProcessor::new()); + Extension::new().with_logs_processor(logs_processor).run().await?; + + Ok(()) +} diff --git a/lambda-integration-tests/template.yaml b/lambda-integration-tests/template.yaml index 848f8be5..d3716d7c 100644 --- a/lambda-integration-tests/template.yaml +++ b/lambda-integration-tests/template.yaml @@ -15,6 +15,7 @@ Resources: CodeUri: ../build/runtime-fn/ Runtime: provided.al2 Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -25,6 +26,7 @@ Resources: CodeUri: ../build/runtime-fn/ Runtime: provided Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -35,6 +37,7 @@ Resources: CodeUri: ../build/runtime-trait/ Runtime: provided.al2 Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -45,6 +48,7 @@ Resources: CodeUri: ../build/runtime-trait/ Runtime: provided Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -76,6 +80,7 @@ Resources: Method: POST Path: /al2/post Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -107,6 +112,7 @@ Resources: Method: POST Path: /al2-trait/post Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -138,6 +144,7 @@ Resources: Method: POST Path: /post Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -169,6 +176,7 @@ Resources: Method: POST Path: /trait/post Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -180,6 +188,7 @@ Resources: Handler: main.handler Runtime: python3.9 Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait @@ -191,9 +200,15 @@ Resources: Handler: main.handler Runtime: python3.7 Layers: + - !Ref LogsTrait - !Ref ExtensionFn - !Ref ExtensionTrait + LogsTrait: + Type: AWS::Serverless::LayerVersion + Properties: + ContentUri: ../build/logs-trait/ + ExtensionFn: Type: AWS::Serverless::LayerVersion Properties: