Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ tracing = { version = "0.1.37", features = ["log"] }
tower = { version = "0.4", features = ["util"] }
tokio-stream = "0.1.2"
lambda_runtime_api_client = { version = "0.8", path = "../lambda-runtime-api-client" }
serde_path_to_error = "0.1.11"
43 changes: 43 additions & 0 deletions lambda-runtime/src/deserializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::{error::Error, fmt};

use serde::Deserialize;

use crate::{Context, LambdaEvent};

const ERROR_CONTEXT: &str = "failed to deserialize the incoming data into the function's payload type";

/// Event payload deserialization error.
/// Returned when the data sent to the function cannot be deserialized
/// into the type that the function receives.
#[derive(Debug)]
pub(crate) struct DeserializeError {
inner: serde_path_to_error::Error<serde_json::Error>,
}

impl fmt::Display for DeserializeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let path = self.inner.path().to_string();
if path == "." {
writeln!(f, "{ERROR_CONTEXT}: {}", self.inner)
} else {
writeln!(f, "{ERROR_CONTEXT}: [{path}] {}", self.inner)
}
}
}

impl Error for DeserializeError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.inner)
}
}

/// Deserialize the data sent to the function into the type that the function receives.
pub(crate) fn deserialize<T>(body: &[u8], context: Context) -> Result<LambdaEvent<T>, DeserializeError>
where
T: for<'de> Deserialize<'de>,
{
let jd = &mut serde_json::Deserializer::from_slice(body);
serde_path_to_error::deserialize(jd)
.map(|payload| LambdaEvent::new(payload, context))
.map_err(|inner| DeserializeError { inner })
}
8 changes: 4 additions & 4 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use tower::{self, service_fn, Service};
use tower::{util::ServiceFn, ServiceExt};
use tracing::{error, trace, Instrument};

mod deserializer;
mod requests;
#[cfg(test)]
mod simulated;
Expand Down Expand Up @@ -149,8 +150,8 @@ where
return Err(parts.status.to_string().into());
}

let body = match serde_json::from_slice(&body) {
Ok(body) => body,
let lambda_event = match deserializer::deserialize(&body, ctx) {
Ok(lambda_event) => lambda_event,
Err(err) => {
let req = build_event_error_request(request_id, err)?;
client.call(req).await.expect("Unable to send response to Runtime APIs");
Expand All @@ -161,8 +162,7 @@ where
let req = match handler.ready().await {
Ok(handler) => {
// Catches panics outside of a `Future`
let task =
panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx))));
let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(lambda_event)));

let task = match task {
// Catches panics inside of the `Future`
Expand Down
11 changes: 5 additions & 6 deletions lambda-runtime/src/streaming.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
build_event_error_request, incoming, type_name_of_val, Config, Context, Error, EventErrorRequest, IntoRequest,
LambdaEvent, Runtime,
build_event_error_request, deserializer, incoming, type_name_of_val, Config, Context, Error, EventErrorRequest,
IntoRequest, LambdaEvent, Runtime,
};
use bytes::Bytes;
use futures::FutureExt;
Expand Down Expand Up @@ -142,8 +142,8 @@ where
return Err(parts.status.to_string().into());
}

let body = match serde_json::from_slice(&body) {
Ok(body) => body,
let lambda_event = match deserializer::deserialize(&body, ctx) {
Ok(lambda_event) => lambda_event,
Err(err) => {
let req = build_event_error_request(request_id, err)?;
client.call(req).await.expect("Unable to send response to Runtime APIs");
Expand All @@ -154,8 +154,7 @@ where
let req = match handler.ready().await {
Ok(handler) => {
// Catches panics outside of a `Future`
let task =
panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx))));
let task = panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(lambda_event)));

let task = match task {
// Catches panics inside of the `Future`
Expand Down