diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index 788b3c89..96506022 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -41,3 +41,4 @@ tracing = { version = "0.1.37", features = ["log"] } tower = { version = "0.4", features = ["util"] } tokio-stream = "0.1.2" lambda_runtime_api_client = { version = "0.8", path = "../lambda-runtime-api-client" } +serde_path_to_error = "0.1.11" diff --git a/lambda-runtime/src/deserializer.rs b/lambda-runtime/src/deserializer.rs new file mode 100644 index 00000000..1841c050 --- /dev/null +++ b/lambda-runtime/src/deserializer.rs @@ -0,0 +1,43 @@ +use std::{error::Error, fmt}; + +use serde::Deserialize; + +use crate::{Context, LambdaEvent}; + +const ERROR_CONTEXT: &str = "failed to deserialize the incoming data into the function's payload type"; + +/// Event payload deserialization error. +/// Returned when the data sent to the function cannot be deserialized +/// into the type that the function receives. +#[derive(Debug)] +pub(crate) struct DeserializeError { + inner: serde_path_to_error::Error, +} + +impl fmt::Display for DeserializeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let path = self.inner.path().to_string(); + if path == "." { + writeln!(f, "{ERROR_CONTEXT}: {}", self.inner) + } else { + writeln!(f, "{ERROR_CONTEXT}: [{path}] {}", self.inner) + } + } +} + +impl Error for DeserializeError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + Some(&self.inner) + } +} + +/// Deserialize the data sent to the function into the type that the function receives. +pub(crate) fn deserialize(body: &[u8], context: Context) -> Result, DeserializeError> +where + T: for<'de> Deserialize<'de>, +{ + let jd = &mut serde_json::Deserializer::from_slice(body); + serde_path_to_error::deserialize(jd) + .map(|payload| LambdaEvent::new(payload, context)) + .map_err(|inner| DeserializeError { inner }) +} diff --git a/lambda-runtime/src/lib.rs b/lambda-runtime/src/lib.rs index 31c9297c..86a0848b 100644 --- a/lambda-runtime/src/lib.rs +++ b/lambda-runtime/src/lib.rs @@ -28,6 +28,7 @@ pub use tower::{self, service_fn, Service}; use tower::{util::ServiceFn, ServiceExt}; use tracing::{error, trace, Instrument}; +mod deserializer; mod requests; #[cfg(test)] mod simulated; @@ -149,8 +150,8 @@ where return Err(parts.status.to_string().into()); } - let body = match serde_json::from_slice(&body) { - Ok(body) => body, + let lambda_event = match deserializer::deserialize(&body, ctx) { + Ok(lambda_event) => lambda_event, Err(err) => { let req = build_event_error_request(request_id, err)?; client.call(req).await.expect("Unable to send response to Runtime APIs"); @@ -161,8 +162,7 @@ where let req = match handler.ready().await { Ok(handler) => { // Catches panics outside of a `Future` - let task = - panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx)))); + let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(lambda_event))); let task = match task { // Catches panics inside of the `Future` diff --git a/lambda-runtime/src/streaming.rs b/lambda-runtime/src/streaming.rs index 15cd210f..b9cf978b 100644 --- a/lambda-runtime/src/streaming.rs +++ b/lambda-runtime/src/streaming.rs @@ -1,6 +1,6 @@ use crate::{ - build_event_error_request, incoming, type_name_of_val, Config, Context, Error, EventErrorRequest, IntoRequest, - LambdaEvent, Runtime, + build_event_error_request, deserializer, incoming, type_name_of_val, Config, Context, Error, EventErrorRequest, + IntoRequest, LambdaEvent, Runtime, }; use bytes::Bytes; use futures::FutureExt; @@ -142,8 +142,8 @@ where return Err(parts.status.to_string().into()); } - let body = match serde_json::from_slice(&body) { - Ok(body) => body, + let lambda_event = match deserializer::deserialize(&body, ctx) { + Ok(lambda_event) => lambda_event, Err(err) => { let req = build_event_error_request(request_id, err)?; client.call(req).await.expect("Unable to send response to Runtime APIs"); @@ -154,8 +154,7 @@ where let req = match handler.ready().await { Ok(handler) => { // Catches panics outside of a `Future` - let task = - panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx)))); + let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(lambda_event))); let task = match task { // Catches panics inside of the `Future`