Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 100 additions & 74 deletions crates/fakecloud-sqs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,101 @@ use std::collections::{HashMap, VecDeque};
use fakecloud_aws::arn::Arn;
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};

use crate::state::{MessageAttribute, RedrivePolicy, SharedSqsState, SqsMessage, SqsQueue};
use crate::state::{
MessageAttribute, RedrivePolicy, SharedSqsState, SqsMessage, SqsQueue, SqsState,
};

/// Validate DelaySeconds (0–900) and MaximumMessageSize (1024–1 MiB) if
/// present in the caller-supplied queue attributes. Both match AWS's
/// documented ranges; we return the same error code/message the real
/// service does.
///
/// # Errors
/// Returns an `InvalidAttributeValue` error when either attribute is
/// present but non-numeric or outside its documented range.
fn validate_create_queue_attributes(
    attrs: &HashMap<String, String>,
) -> Result<(), AwsServiceError> {
    if let Some(ds) = attrs.get("DelaySeconds") {
        match ds.parse::<i64>() {
            Ok(d) if (0..=900).contains(&d) => {}
            _ => {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidAttributeValue",
                    "Invalid value for the parameter DelaySeconds.",
                ));
            }
        }
    }

    if let Some(mms) = attrs.get("MaximumMessageSize") {
        // Mirror the DelaySeconds handling above: a non-numeric value is
        // rejected, not silently accepted (the previous `if let Ok(..)`
        // shape let unparseable values pass validation entirely).
        match mms.parse::<u64>() {
            Ok(size) if (1024..=1_048_576).contains(&size) => {}
            _ => {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "InvalidAttributeValue",
                    "Invalid value for the parameter MaximumMessageSize.",
                ));
            }
        }
    }

    Ok(())
}

/// Parse the JSON stored under the `RedrivePolicy` queue attribute into
/// a typed `RedrivePolicy`. AWS accepts both string and integer encodings
/// of `maxReceiveCount`, so we tolerate both. Returns `None` for any
/// parse failure (the caller treats it as "no redrive policy").
fn parse_redrive_policy(attr_str: &str) -> Option<RedrivePolicy> {
let rp: Value = serde_json::from_str(attr_str).ok()?;
let dead_letter_target_arn = rp["deadLetterTargetArn"].as_str()?.to_string();
let max_receive_count = rp["maxReceiveCount"]
.as_u64()
.or_else(|| rp["maxReceiveCount"].as_str()?.parse().ok())?
as u32;
Some(RedrivePolicy {
dead_letter_target_arn,
max_receive_count,
})
}

/// Verify that the DLQ referenced by `rp` actually exists, and — when
/// the source queue is FIFO — that the DLQ is itself a FIFO queue.
/// Mirrors AWS's constraint that FIFO and standard queues cannot be
/// paired across a redrive boundary.
fn validate_redrive_policy_target(
    state: &SqsState,
    rp: &RedrivePolicy,
    is_fifo: bool,
) -> Result<(), AwsServiceError> {
    let target = state
        .queues
        .values()
        .find(|queue| queue.arn == rp.dead_letter_target_arn);

    match target {
        // No queue in the state matches the ARN named by the policy.
        None => Err(AwsServiceError::aws_error_with_headers(
            StatusCode::BAD_REQUEST,
            "QueueDoesNotExist",
            format!(
                "Dead letter target does not exist: {}",
                rp.dead_letter_target_arn
            ),
            vec![(
                "x-amzn-query-error".to_string(),
                "AWS.SimpleQueueService.NonExistentQueue;Sender".to_string(),
            )],
        )),
        // A FIFO source queue may only redrive into a FIFO DLQ.
        Some(dlq) if is_fifo && !dlq.is_fifo => Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "InvalidParameterValue",
            "Dead-letter queue must be the same type of queue as the source.",
        )),
        // Target exists and the queue types are compatible.
        Some(_) => Ok(()),
    }
}

pub struct SqsService {
state: SharedSqsState,
Expand Down Expand Up @@ -508,87 +602,19 @@ impl SqsService {
attributes.insert("FifoThroughputLimit".to_string(), "perQueue".to_string());
}

// Validate DelaySeconds (0–900 inclusive)
if let Some(ds) = new_attributes.get("DelaySeconds") {
match ds.parse::<i64>() {
Ok(d) if (0..=900).contains(&d) => {}
_ => {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidAttributeValue",
"Invalid value for the parameter DelaySeconds.".to_string(),
));
}
}
}

// Validate MaximumMessageSize before inserting
if let Some(mms) = new_attributes.get("MaximumMessageSize") {
if let Ok(size) = mms.parse::<u64>() {
if !(1024..=1_048_576).contains(&size) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidAttributeValue",
"Invalid value for the parameter MaximumMessageSize.",
));
}
}
}
validate_create_queue_attributes(&new_attributes)?;

// Override with provided attributes (trim keys to handle trailing whitespace)
for (k, v) in new_attributes {
attributes.insert(k.trim().to_string(), v);
}

let redrive_policy = attributes.get("RedrivePolicy").and_then(|rp_str| {
let rp: Value = serde_json::from_str(rp_str).ok()?;
let dead_letter_target_arn = rp["deadLetterTargetArn"].as_str()?.to_string();
let max_receive_count = rp["maxReceiveCount"]
.as_u64()
.or_else(|| rp["maxReceiveCount"].as_str()?.parse().ok())?
as u32;
Some(RedrivePolicy {
dead_letter_target_arn,
max_receive_count,
})
});
let redrive_policy = attributes
.get("RedrivePolicy")
.and_then(|s| parse_redrive_policy(s));

// Validate that the DLQ actually exists
if let Some(ref rp) = redrive_policy {
let dlq_exists = state
.queues
.values()
.any(|q| q.arn == rp.dead_letter_target_arn);
if !dlq_exists {
return Err(AwsServiceError::aws_error_with_headers(
StatusCode::BAD_REQUEST,
"QueueDoesNotExist",
format!(
"Dead letter target does not exist: {}",
rp.dead_letter_target_arn
),
vec![(
"x-amzn-query-error".to_string(),
"AWS.SimpleQueueService.NonExistentQueue;Sender".to_string(),
)],
));
}
// Validate FIFO queue can only use FIFO DLQ
if is_fifo {
let dlq_is_fifo = state
.queues
.values()
.find(|q| q.arn == rp.dead_letter_target_arn)
.map(|q| q.is_fifo)
.unwrap_or(false);
if !dlq_is_fifo {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterValue",
"Dead-letter queue must be the same type of queue as the source.",
));
}
}
validate_redrive_policy_target(&state, rp, is_fifo)?;
}

// Normalize RedrivePolicy JSON (convert maxReceiveCount to integer)
Expand Down
Loading