From daa65d6553afc2cc1d86ed86a627ebf31d8a4d2a Mon Sep 17 00:00:00 2001 From: sundaresanr Date: Mon, 18 May 2026 14:20:44 -0700 Subject: [PATCH] debug(aws_s3 source): include body sample in SQS parse-error context When an SQS message cannot be deserialized as an `SqsEvent`, the existing error only reports the message_id. Extend `ProcessingError::InvalidSqsMessage` to include the message body (truncated to 1024 chars). --- src/sources/aws_s3/sqs.rs | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 9ad5b47be61cc..c8a922f7b69e0 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -211,13 +211,19 @@ pub(super) enum IngestorNewError { #[derive(Debug, Snafu)] pub enum ProcessingError { #[snafu(display( - "Could not parse SQS message with id {} as S3 notification: {}", + "Could not parse SQS message with id {} as S3 notification: {} (body_len={}, was_sns_unwrapped={}, body_sample={:?})", message_id, - source + source, + body_len, + was_sns_unwrapped, + body_sample ))] InvalidSqsMessage { source: serde_json::Error, message_id: String, + body_sample: String, + body_len: usize, + was_sns_unwrapped: bool, }, #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))] GetObject { @@ -598,16 +604,22 @@ impl IngestorProcess { } async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> { - let sqs_body = message.body.unwrap_or_default(); - let sqs_body = serde_json::from_str::(sqs_body.as_ref()) + const MAX_BODY_SAMPLE: usize = 1024; + + let raw_body = message.body.unwrap_or_default(); + let sqs_body = serde_json::from_str::(raw_body.as_ref()) .map(|notification| notification.message) - .unwrap_or(sqs_body); + .unwrap_or_else(|_| raw_body.clone()); + let was_sns_unwrapped = sqs_body != raw_body; let s3_event: SqsEvent = serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu { message_id: message .message_id .clone() .unwrap_or_else(|| "".to_owned()), + body_sample: sqs_body.chars().take(MAX_BODY_SAMPLE).collect::(), + body_len: sqs_body.len(), + was_sns_unwrapped, })?; match s3_event {