diff --git a/crates/fakecloud-sqs/src/service.rs b/crates/fakecloud-sqs/src/service.rs index 1807e354..2374bd6f 100644 --- a/crates/fakecloud-sqs/src/service.rs +++ b/crates/fakecloud-sqs/src/service.rs @@ -9,7 +9,101 @@ use std::collections::{HashMap, VecDeque}; use fakecloud_aws::arn::Arn; use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError}; -use crate::state::{MessageAttribute, RedrivePolicy, SharedSqsState, SqsMessage, SqsQueue}; +use crate::state::{ + MessageAttribute, RedrivePolicy, SharedSqsState, SqsMessage, SqsQueue, SqsState, +}; + +/// Validate DelaySeconds (0–900) and MaximumMessageSize (1024–1 MiB) if +/// present in the caller-supplied queue attributes. Both match AWS's +/// documented ranges; we return the same error code/message the real +/// service does. +fn validate_create_queue_attributes( + attrs: &HashMap, +) -> Result<(), AwsServiceError> { + if let Some(ds) = attrs.get("DelaySeconds") { + match ds.parse::() { + Ok(d) if (0..=900).contains(&d) => {} + _ => { + return Err(AwsServiceError::aws_error( + StatusCode::BAD_REQUEST, + "InvalidAttributeValue", + "Invalid value for the parameter DelaySeconds.".to_string(), + )); + } + } + } + + if let Some(mms) = attrs.get("MaximumMessageSize") { + if let Ok(size) = mms.parse::() { + if !(1024..=1_048_576).contains(&size) { + return Err(AwsServiceError::aws_error( + StatusCode::BAD_REQUEST, + "InvalidAttributeValue", + "Invalid value for the parameter MaximumMessageSize.", + )); + } + } + } + + Ok(()) +} + +/// Parse the JSON stored under the `RedrivePolicy` queue attribute into +/// a typed `RedrivePolicy`. AWS accepts both string and integer encodings +/// of `maxReceiveCount`, so we tolerate both. Returns `None` for any +/// parse failure (the caller treats it as "no redrive policy"). +fn parse_redrive_policy(attr_str: &str) -> Option { + let rp: Value = serde_json::from_str(attr_str).ok()?; + let dead_letter_target_arn = rp["deadLetterTargetArn"].as_str()?.to_string(); + let max_receive_count = rp["maxReceiveCount"] + .as_u64() + .or_else(|| rp["maxReceiveCount"].as_str()?.parse().ok())? + as u32; + Some(RedrivePolicy { + dead_letter_target_arn, + max_receive_count, + }) +} + +/// Verify that the DLQ referenced by `rp` actually exists, and — when +/// the source queue is FIFO — that the DLQ is itself a FIFO queue. +/// Mirrors AWS's constraint that FIFO and standard queues cannot be +/// paired across a redrive boundary. +fn validate_redrive_policy_target( + state: &SqsState, + rp: &RedrivePolicy, + is_fifo: bool, +) -> Result<(), AwsServiceError> { + let dlq = state + .queues + .values() + .find(|q| q.arn == rp.dead_letter_target_arn); + + let Some(dlq) = dlq else { + return Err(AwsServiceError::aws_error_with_headers( + StatusCode::BAD_REQUEST, + "QueueDoesNotExist", + format!( + "Dead letter target does not exist: {}", + rp.dead_letter_target_arn + ), + vec![( + "x-amzn-query-error".to_string(), + "AWS.SimpleQueueService.NonExistentQueue;Sender".to_string(), + )], + )); + }; + + if is_fifo && !dlq.is_fifo { + return Err(AwsServiceError::aws_error( + StatusCode::BAD_REQUEST, + "InvalidParameterValue", + "Dead-letter queue must be the same type of queue as the source.", + )); + } + + Ok(()) +} pub struct SqsService { state: SharedSqsState, @@ -508,87 +602,19 @@ impl SqsService { attributes.insert("FifoThroughputLimit".to_string(), "perQueue".to_string()); } - // Validate DelaySeconds (0–900 inclusive) - if let Some(ds) = new_attributes.get("DelaySeconds") { - match ds.parse::() { - Ok(d) if (0..=900).contains(&d) => {} - _ => { - return Err(AwsServiceError::aws_error( - StatusCode::BAD_REQUEST, - "InvalidAttributeValue", - "Invalid value for the parameter DelaySeconds.".to_string(), - )); - } - } - } - - // Validate MaximumMessageSize before inserting - if let Some(mms) = new_attributes.get("MaximumMessageSize") { - if let Ok(size) = mms.parse::() { - if !(1024..=1_048_576).contains(&size) { - return Err(AwsServiceError::aws_error( - StatusCode::BAD_REQUEST, - "InvalidAttributeValue", - "Invalid value for the parameter MaximumMessageSize.", - )); - } - } - } + validate_create_queue_attributes(&new_attributes)?; // Override with provided attributes (trim keys to handle trailing whitespace) for (k, v) in new_attributes { attributes.insert(k.trim().to_string(), v); } - let redrive_policy = attributes.get("RedrivePolicy").and_then(|rp_str| { - let rp: Value = serde_json::from_str(rp_str).ok()?; - let dead_letter_target_arn = rp["deadLetterTargetArn"].as_str()?.to_string(); - let max_receive_count = rp["maxReceiveCount"] - .as_u64() - .or_else(|| rp["maxReceiveCount"].as_str()?.parse().ok())? - as u32; - Some(RedrivePolicy { - dead_letter_target_arn, - max_receive_count, - }) - }); + let redrive_policy = attributes + .get("RedrivePolicy") + .and_then(|s| parse_redrive_policy(s)); - // Validate that the DLQ actually exists if let Some(ref rp) = redrive_policy { - let dlq_exists = state - .queues - .values() - .any(|q| q.arn == rp.dead_letter_target_arn); - if !dlq_exists { - return Err(AwsServiceError::aws_error_with_headers( - StatusCode::BAD_REQUEST, - "QueueDoesNotExist", - format!( - "Dead letter target does not exist: {}", - rp.dead_letter_target_arn - ), - vec![( - "x-amzn-query-error".to_string(), - "AWS.SimpleQueueService.NonExistentQueue;Sender".to_string(), - )], - )); - } - // Validate FIFO queue can only use FIFO DLQ - if is_fifo { - let dlq_is_fifo = state - .queues - .values() - .find(|q| q.arn == rp.dead_letter_target_arn) - .map(|q| q.is_fifo) - .unwrap_or(false); - if !dlq_is_fifo { - return Err(AwsServiceError::aws_error( - StatusCode::BAD_REQUEST, - "InvalidParameterValue", - "Dead-letter queue must be the same type of queue as the source.", - )); - } - } + validate_redrive_policy_target(&state, rp, is_fifo)?; } // Normalize RedrivePolicy JSON (convert maxReceiveCount to integer)