diff --git a/internal/admin/principal.go b/internal/admin/principal.go index 883bac79..ba8c1db3 100644 --- a/internal/admin/principal.go +++ b/internal/admin/principal.go @@ -15,6 +15,17 @@ func (r Role) AllowsWrite() bool { return r == RoleFull } +// AllowsRead reports whether the role may execute sensitive read +// operations that surface payload content (e.g. SQS AdminPeekQueue, +// which exposes message bodies and attributes). Both RoleReadOnly and +// RoleFull satisfy the gate; the zero value (unauthenticated / +// role-less principal) does not. List / Describe endpoints use the +// looser session-auth gate because their output is metadata; peek +// diverges because the payload is the stored message itself. +func (r Role) AllowsRead() bool { + return r == RoleReadOnly || r == RoleFull +} + // AuthPrincipal is the authenticated caller derived from a session cookie or, // in the future, a follower→leader forwarded request. The admin handler and // adapter internal entrypoints pass it around instead of a raw HTTP request diff --git a/internal/admin/sqs_handler.go b/internal/admin/sqs_handler.go index f3087762..1c090d87 100644 --- a/internal/admin/sqs_handler.go +++ b/internal/admin/sqs_handler.go @@ -5,6 +5,7 @@ import ( "errors" "log/slog" "net/http" + "path" "strconv" "strings" "time" @@ -39,6 +40,17 @@ type QueueSummary struct { CreatedAt *time.Time `json:"created_at,omitempty"` Attributes map[string]string `json:"attributes,omitempty"` Counters QueueCounters `json:"counters"` + // IsDLQ is true when at least one other queue's RedrivePolicy + // resolves its deadLetterTargetArn to this queue. The SPA uses + // the flag to switch the Messages-tab framing and the Purge + // button label between "Purge messages" and "Purge DLQ". + IsDLQ bool `json:"is_dlq"` + // DLQSources lists the names of queues whose RedrivePolicy + // points at this queue, sorted lexicographically. Empty when + // IsDLQ is false; the SPA renders these as chips on the queue + // detail page so the operator confirms what feeds the DLQ + // before purging. + DLQSources []string `json:"dlq_sources,omitempty"` } // QueueCounters mirrors the three Approximate* counters AWS exposes @@ -56,13 +68,80 @@ type QueueCounters struct { // // AdminDescribeQueue returns (nil, false, nil) for a missing queue so // callers can distinguish "not found" from a storage error without -// sniffing sentinels. AdminDeleteQueue returns the structured -// sentinels below so the handler can map them to HTTP statuses -// without leaking the adapter's error vocabulary. +// sniffing sentinels. AdminDeleteQueue / AdminPeekQueue / AdminPurgeQueue +// return the structured sentinels below so the handler can map them +// to HTTP statuses without leaking the adapter's error vocabulary. type QueuesSource interface { AdminListQueues(ctx context.Context) ([]string, error) AdminDescribeQueue(ctx context.Context, name string) (*QueueSummary, bool, error) AdminDeleteQueue(ctx context.Context, principal AuthPrincipal, name string) error + // AdminPeekQueue returns a non-destructive sample of currently- + // visible messages on the queue (read role required). Wired only + // when the source supports it; the bridge in main_admin.go + // translates between adapter and admin types so the admin package + // stays free of the adapter dependency tree. + AdminPeekQueue(ctx context.Context, principal AuthPrincipal, name string, opts PeekMessageOptions) (PeekResult, error) + // AdminPurgeQueue is the SigV4-bypass purge counterpart to + // AdminDeleteQueue: bumps the queue's generation so every + // message becomes unreachable, leaving the queue itself in place. + // Returns the committed generation pair so the audit log records + // the value that actually landed. + AdminPurgeQueue(ctx context.Context, principal AuthPrincipal, name string) (PurgeResult, error) +} + +// PeekMessageOptions controls a peek call. Field defaults match the +// adapter's documented bounds (Limit clamped to [1, 100] with 0 +// meaning "default of 20"; BodyMaxBytes clamped to [256, 262144] +// with 0 meaning "default of 4096"; Cursor empty means "start from +// the front"). +type PeekMessageOptions struct { + Limit int + Cursor string + BodyMaxBytes int +} + +// PeekResult is the admin-package projection of the adapter's +// AdminPeekQueue 3-tuple return (rows, nextCursor, error) bundled +// into one value so QueuesSource's method signatures stay regular. +// The handler renders this directly as the wire JSON. +type PeekResult struct { + Messages []PeekedMessage `json:"messages"` + NextCursor string `json:"next_cursor,omitempty"` +} + +// PeekedMessage is one row in the peek response. JSON tags pin the +// snake_case wire shape the design doc §3.5 specifies. +type PeekedMessage struct { + MessageID string `json:"message_id"` + Body string `json:"body"` + BodyTruncated bool `json:"body_truncated"` + BodyOriginalSize int64 `json:"body_original_size"` + SentTimestamp time.Time `json:"sent_timestamp"` + ReceiveCount int32 `json:"receive_count"` + GroupID string `json:"group_id,omitempty"` + DeduplicationID string `json:"deduplication_id,omitempty"` + Attributes map[string]PeekedAttribute `json:"attributes,omitempty"` +} + +// PeekedAttribute mirrors the typed SQS MessageAttribute shape so +// binary payloads and the DataType discriminator survive the round +// trip to the SPA. +type PeekedAttribute struct { + DataType string `json:"data_type"` + StringValue string `json:"string_value,omitempty"` + BinaryValue []byte `json:"binary_value,omitempty"` +} + +// PurgeResult carries the committed-OCC generation pair so the +// admin handler's audit line records the value that actually landed +// (separate before/after meta reads would race a concurrent purge). +// JSON tags are pinned even though the current Phase 4 handler +// returns 204 with no body — the Phase 5 audit log will record the +// generation pair and a future wire encoder needs the snake_case +// shape (Claude r1 on PR #797 flagged the pre-emptive fix). +type PurgeResult struct { + GenerationBefore uint64 `json:"generation_before"` + GenerationAfter uint64 `json:"generation_after"` } // Errors the QueuesSource may return for the handler to map onto a @@ -82,8 +161,34 @@ var ( ErrQueuesNotFound = errors.New("admin sqs: queue not found") // ErrQueuesValidation — request shape is bad (400). ErrQueuesValidation = errors.New("admin sqs: validation failed") + // ErrQueuesPurgeInProgress — the queue's 60-second + // PurgeQueue cooldown is active (429). The handler matches + // against this sentinel and pulls RetryAfter from a typed + // PurgeInProgressError wrapping it, so callers can branch via + // errors.Is while extracting the duration via errors.As. + ErrQueuesPurgeInProgress = errors.New("admin sqs: purge in progress") ) +// PurgeInProgressError is the typed admin error returned when the +// queue's 60-second PurgeQueue rate limit is active. RetryAfter +// carries the remaining wall-clock duration the caller should wait, +// derived from the same LastPurgedAtMillis value the adapter's +// rate-limit check observed inside its OCC read. +type PurgeInProgressError struct { + RetryAfter time.Duration +} + +func (e *PurgeInProgressError) Error() string { + return "admin sqs: purge already in progress; retry after " + e.RetryAfter.String() +} + +// Is lets errors.Is(err, ErrQueuesPurgeInProgress) match any +// *PurgeInProgressError so the handler can branch on the sentinel +// while errors.As pulls the typed duration. +func (e *PurgeInProgressError) Is(target error) bool { + return target == ErrQueuesPurgeInProgress +} + // SqsHandler serves /admin/api/v1/sqs/queues and // /admin/api/v1/sqs/queues/{name}. Reads (list, describe) accept GET; // delete accepts DELETE and goes through the same protected @@ -125,34 +230,181 @@ func (h *SqsHandler) WithRoleStore(r RoleStore) *SqsHandler { return h } -// ServeHTTP routes /queues and /queues/{name}. Method handling -// mirrors DynamoHandler — keep the two parallel so an operator -// reading one understands the other for free. +// ServeHTTP routes the SQS admin paths: +// +// GET /admin/api/v1/sqs/queues — list +// GET /admin/api/v1/sqs/queues/{name} — describe +// DELETE /admin/api/v1/sqs/queues/{name} — delete +// GET /admin/api/v1/sqs/queues/{name}/messages — peek +// DELETE /admin/api/v1/sqs/queues/{name}/messages — purge +// +// Routing follows the 6-step ordered procedure documented in +// docs/design/2026_05_16_..._admin_purge_queue.md §3.4. The procedure +// closes confused-deputy classes (empty / %2F / %252F / dot-segment) +// that would otherwise let a crafted URL route to the wrong queue or +// the wrong sub-resource. +// sqsSubResourceMessages is the recognised sub-resource segment on +// /queues/{name}/{sub}. Anything else is a 404. Lifted to a constant +// so the dispatcher and any future routing layer stay aligned. +const sqsSubResourceMessages = "messages" + +// sqsRouteSegmentsQueue / sqsRouteSegmentsMessages are the segment +// counts the dispatcher recognises after the prefix trim. Named to +// keep the dispatch switch self-documenting. +const ( + sqsRouteSegmentsQueue = 1 + sqsRouteSegmentsMessages = 2 +) + func (h *SqsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - switch { - case r.URL.Path == pathSqsQueues: - switch r.Method { - case http.MethodGet: - h.handleList(w, r) - default: - writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") + escaped := r.URL.EscapedPath() + if h.serveCollectionRoot(w, r, escaped) { + return + } + if !strings.HasPrefix(escaped, pathPrefixSqsQueues) { + writeJSONError(w, http.StatusNotFound, "unknown_endpoint", + "no admin SQS handler is registered for this path") + return + } + segments, ok := h.parseSqsRouteSegments(w, escaped) + if !ok { + return + } + h.dispatchSqsRoute(w, r, segments) +} + +// serveCollectionRoot handles the .../queues and .../queues/ paths +// before any splitting (Step 2 of the routing procedure). Returns +// true when the request was handled (the caller short-circuits). +// strings.TrimPrefix returns its input unchanged when the prefix +// does not match and strings.Split("", "/") returns a one-element +// slice — handling the collection root pre-split sidesteps both +// corner cases. +func (h *SqsHandler) serveCollectionRoot(w http.ResponseWriter, r *http.Request, escaped string) bool { + if escaped != pathSqsQueues && escaped != pathPrefixSqsQueues { + return false + } + switch r.Method { + case http.MethodGet: + h.handleList(w, r) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET") + } + return true +} + +// parseSqsRouteSegments runs Steps 3 to 5 of the routing procedure +// — trim the prefix, split, drop the single trailing empty, validate +// each segment (rejecting empty / % / dot-segments), then run +// path.Clean for trailing-slash normalisation. Returns the cleaned +// segment slice or false after writing a 400 invalid_path response. +func (h *SqsHandler) parseSqsRouteSegments(w http.ResponseWriter, escaped string) ([]string, bool) { + rest := strings.TrimPrefix(escaped, pathPrefixSqsQueues) + segments := dropSingleTrailingEmpty(strings.Split(rest, "/")) + for _, seg := range segments { + if !isValidSqsPathSegment(seg) { + writeJSONError(w, http.StatusBadRequest, "invalid_path", + "path segment is empty, contains %, or is a dot-segment") + return nil, false } - case strings.HasPrefix(r.URL.Path, pathPrefixSqsQueues): - name := strings.TrimPrefix(r.URL.Path, pathPrefixSqsQueues) - switch r.Method { - case http.MethodGet: - h.handleDescribe(w, r, name) - case http.MethodDelete: - h.handleDelete(w, r, name) - default: - writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or DELETE") + } + cleaned := strings.TrimPrefix(path.Clean(pathPrefixSqsQueues+rest), pathPrefixSqsQueues) + return dropSingleTrailingEmpty(strings.Split(cleaned, "/")), true +} + +// dropSingleTrailingEmpty removes one trailing empty segment so a +// legal trailing slash (/queues/orders/messages/) does not collide +// with the empty-segment rejection in step 4. Never drops an +// interior empty: /queues/orders//messages still fails validation. +func dropSingleTrailingEmpty(segments []string) []string { + if len(segments) > 1 && segments[len(segments)-1] == "" { + return segments[:len(segments)-1] + } + return segments +} + +// dispatchSqsRoute is the post-validation router. Segment count tells +// us which resource family the request targets; the per-resource +// dispatcher checks the HTTP method. +func (h *SqsHandler) dispatchSqsRoute(w http.ResponseWriter, r *http.Request, segments []string) { + name, err := decodeSqsPathSegment(segments[0]) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_path", + "queue name segment is not valid percent-encoding") + return + } + switch len(segments) { + case sqsRouteSegmentsQueue: + h.dispatchQueueResource(w, r, name) + case sqsRouteSegmentsMessages: + sub, err := decodeSqsPathSegment(segments[1]) + if err != nil || sub != sqsSubResourceMessages { + writeJSONError(w, http.StatusNotFound, "unknown_endpoint", + "unknown sub-resource on this queue") + return } + h.dispatchMessagesResource(w, r, name) default: writeJSONError(w, http.StatusNotFound, "unknown_endpoint", - "no admin SQS handler is registered for this path") + "too many path segments") } } +// dispatchQueueResource handles the .../queues/{name} endpoint: +// GET = describe, DELETE = delete. +func (h *SqsHandler) dispatchQueueResource(w http.ResponseWriter, r *http.Request, name string) { + switch r.Method { + case http.MethodGet: + h.handleDescribe(w, r, name) + case http.MethodDelete: + h.handleDelete(w, r, name) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or DELETE") + } +} + +// dispatchMessagesResource handles the .../queues/{name}/messages +// endpoint: GET = peek, DELETE = purge. +func (h *SqsHandler) dispatchMessagesResource(w http.ResponseWriter, r *http.Request, name string) { + switch r.Method { + case http.MethodGet: + h.handlePeek(w, r, name) + case http.MethodDelete: + h.handlePurge(w, r, name) + default: + writeJSONError(w, http.StatusMethodNotAllowed, "method_not_allowed", "only GET or DELETE") + } +} + +// isValidSqsPathSegment enforces the step-4 rules. Every segment is +// rejected if it is empty, contains a percent sign (closes the +// %2F / %252F / %2e / %2E / %2E%2E percent-encoded slash and +// dot-segment classes structurally without enumerating depths), or +// is one of the literal dot-segment strings . / .. (closes the raw +// dot-segment traversal class — path.Clean would otherwise collapse +// .. against the preceding segment and dispatch on the wrong path). +func isValidSqsPathSegment(seg string) bool { + if seg == "" { + return false + } + if strings.ContainsRune(seg, '%') { + return false + } + if seg == "." || seg == ".." { + return false + } + return true +} + +// decodeSqsPathSegment is a small wrapper over url.PathUnescape. The +// step-4 validation rejects every % in raw segments, so by the time +// this runs the segment is guaranteed to be percent-free and the +// decoder is a no-op. Kept as a function so a future relaxation can +// turn it back on without touching the routing logic. +func decodeSqsPathSegment(seg string) (string, error) { + return seg, nil +} + type listQueuesResponse struct { Queues []string `json:"queues"` } @@ -210,7 +462,7 @@ func (h *SqsHandler) handleDescribe(w http.ResponseWriter, r *http.Request, name } func (h *SqsHandler) handleDelete(w http.ResponseWriter, r *http.Request, name string) { - principal, ok := h.principalForWrite(w, r) + principal, ok := h.principalForWrite(w, r, "delete queues") if !ok { return } @@ -226,6 +478,132 @@ func (h *SqsHandler) handleDelete(w http.ResponseWriter, r *http.Request, name s w.WriteHeader(http.StatusNoContent) } +// handlePeek serves GET /admin/api/v1/sqs/queues/{name}/messages. +// Read role required (RoleReadOnly or RoleFull). Query parameters +// follow snake_case (limit / cursor / body_max_bytes); the SPA's +// TypeScript client adapter does the case translation at the +// request boundary. +func (h *SqsHandler) handlePeek(w http.ResponseWriter, r *http.Request, name string) { + principal, ok := h.principalForReadSensitive(w, r) + if !ok { + return + } + opts, ok := parsePeekQueryParams(w, r) + if !ok { + return + } + result, err := h.source.AdminPeekQueue(r.Context(), principal, name, opts) + if err != nil { + writeQueuesError(w, err, h.logger, r) + return + } + if result.Messages == nil { + // Force the empty-result case to render as `{"messages": []}` + // rather than `{"messages": null}`. The SPA iterates the + // array directly and would crash on null. + result.Messages = []PeekedMessage{} + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(result); err != nil { + h.logger.LogAttrs(r.Context(), slog.LevelWarn, "admin sqs peek response encode failed", + slog.String("error", err.Error()), + ) + } +} + +// parsePeekQueryParams folds the snake_case query string into a +// typed PeekMessageOptions. Non-numeric limit / body_max_bytes +// surface as ErrQueuesValidation-shaped 400s; the adapter does the +// clamp itself, so we forward any in-range integer untouched. +func parsePeekQueryParams(w http.ResponseWriter, r *http.Request) (PeekMessageOptions, bool) { + opts := PeekMessageOptions{Cursor: r.URL.Query().Get("cursor")} + if raw := r.URL.Query().Get("limit"); raw != "" { + n, err := strconv.Atoi(raw) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_request", "limit must be an integer") + return PeekMessageOptions{}, false + } + opts.Limit = n + } + if raw := r.URL.Query().Get("body_max_bytes"); raw != "" { + n, err := strconv.Atoi(raw) + if err != nil { + writeJSONError(w, http.StatusBadRequest, "invalid_request", "body_max_bytes must be an integer") + return PeekMessageOptions{}, false + } + opts.BodyMaxBytes = n + } + return opts, true +} + +// handlePurge serves DELETE /admin/api/v1/sqs/queues/{name}/messages. +// Write role required (RoleFull only). Success returns 204; the +// 60-second rate limit surfaces as 429 with the Retry-After header +// and a retry_after_seconds JSON body via writeQueuesError. +func (h *SqsHandler) handlePurge(w http.ResponseWriter, r *http.Request, name string) { + principal, ok := h.principalForWriteOnPurge(w, r) + if !ok { + return + } + if strings.TrimSpace(name) == "" { + writeJSONError(w, http.StatusBadRequest, "invalid_queue_name", "queue name is required") + return + } + if _, err := h.source.AdminPurgeQueue(r.Context(), principal, name); err != nil { + writeQueuesError(w, err, h.logger, r) + return + } + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusNoContent) +} + +// principalForReadSensitive gates peek (and any future read endpoint +// that surfaces stored payload content). Mirrors principalForWrite's +// live-role re-check pattern but accepts the lower RoleReadOnly +// tier. List / Describe themselves stay on the looser session-auth +// gate because their output is queue metadata already shown on the +// SPA's queue list page; peek diverges because the payload is the +// stored message bodies themselves (Codex r9 P1 on the design doc +// flagged the security-class distinction). +func (h *SqsHandler) principalForReadSensitive(w http.ResponseWriter, r *http.Request) (AuthPrincipal, bool) { + principal, ok := PrincipalFromContext(r.Context()) + if !ok { + writeJSONError(w, http.StatusInternalServerError, "internal", "missing session principal") + return AuthPrincipal{}, false + } + if h.roles != nil { + live, exists := h.roles.LookupRole(principal.AccessKey) + if !exists { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this access key is not authorised to read queue contents") + return AuthPrincipal{}, false + } + if !live.AllowsRead() { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this access key is not authorised to read queue contents") + return AuthPrincipal{}, false + } + principal.Role = live + } else if !principal.Role.AllowsRead() { + writeJSONError(w, http.StatusForbidden, "forbidden", + "this access key is not authorised to read queue contents") + return AuthPrincipal{}, false + } + return principal, true +} + +// principalForWriteOnPurge wraps principalForWrite with the verb the +// purge handler wants on rejection messages. Without this, an +// operator clicking Purge sees a 403 body saying "not authorised to +// delete queues" (the SqsHandler's principalForWrite was authored +// before the purge handler existed). Claude r1 caught the misleading +// wording. +func (h *SqsHandler) principalForWriteOnPurge(w http.ResponseWriter, r *http.Request) (AuthPrincipal, bool) { + return h.principalForWrite(w, r, "purge messages") +} + // principalForWrite resolves the live role from the RoleStore (when // configured), gates the request, and returns the principal with the // **live** role overridden in place — so the role that flows downstream @@ -236,12 +614,19 @@ func (h *SqsHandler) handleDelete(w http.ResponseWriter, r *http.Request, name s // ErrAdminForbidden, forcing the user to log out and back in for a // delete to work. // +// The action verb (e.g. "delete queues", "purge messages") is woven +// into the 403 rejection body so an operator sees the right +// operation name. Without this, a read-only principal clicking +// Purge would be told they "lack the role to delete queues" — a +// confusing message about the wrong operation (Claude r1 caught +// this on PR #797). +// // Failure paths write the response and return ok=false; callers // short-circuit on the bool. Logged-out / wrong-role callers never // reach the source layer, so the leader's identity is not leaked // by indirection (forbidden response is the same shape regardless // of leadership state). -func (h *SqsHandler) principalForWrite(w http.ResponseWriter, r *http.Request) (AuthPrincipal, bool) { +func (h *SqsHandler) principalForWrite(w http.ResponseWriter, r *http.Request, action string) (AuthPrincipal, bool) { principal, ok := PrincipalFromContext(r.Context()) if !ok { // SessionAuth runs before this handler, so a missing @@ -257,12 +642,12 @@ func (h *SqsHandler) principalForWrite(w http.ResponseWriter, r *http.Request) ( // login. Treat it as no-access regardless of what // the JWT claimed. writeJSONError(w, http.StatusForbidden, "forbidden", - "this access key is not authorised to delete queues") + "this access key is not authorised to "+action) return AuthPrincipal{}, false } if !live.AllowsWrite() { writeJSONError(w, http.StatusForbidden, "forbidden", - "this access key is not authorised to delete queues") + "this access key is not authorised to "+action) return AuthPrincipal{}, false } // Forward the live role downstream so the adapter @@ -273,7 +658,7 @@ func (h *SqsHandler) principalForWrite(w http.ResponseWriter, r *http.Request) ( principal.Role = live } else if !principal.Role.AllowsWrite() { writeJSONError(w, http.StatusForbidden, "forbidden", - "this access key is not authorised to delete queues") + "this access key is not authorised to "+action) return AuthPrincipal{}, false } return principal, true @@ -284,7 +669,16 @@ func (h *SqsHandler) principalForWrite(w http.ResponseWriter, r *http.Request) ( // — the raw err.Error() may include adapter internals (Pebble paths, // raft peer ids) that should not flow to the SPA. func writeQueuesError(w http.ResponseWriter, err error, logger *slog.Logger, r *http.Request) { + // errors.As lets us pull the typed RetryAfter duration from a + // PurgeInProgressError without losing the errors.Is sentinel + // match below. The typed-error path is the only race-free way + // to surface the duration: a second loadQueueMetaAt call would + // race a concurrent purge resetting LastPurgedAtMillis in the + // 60-second window. + var purgeErr *PurgeInProgressError switch { + case errors.As(err, &purgeErr): + writePurgeInProgress(w, purgeErr) case errors.Is(err, ErrQueuesForbidden): writeJSONError(w, http.StatusForbidden, "forbidden", "principal lacks required role") case errors.Is(err, ErrQueuesNotLeader): @@ -303,3 +697,26 @@ func writeQueuesError(w http.ResponseWriter, err error, logger *slog.Logger, r * "queue operation failed; see server logs") } } + +// writePurgeInProgress emits the 429 response shape the design doc +// §3.4 specifies: Retry-After header (rounded up to whole seconds so +// a client retrying exactly at the deadline is guaranteed to clear) +// + JSON body { code, message, retry_after_seconds }. +func writePurgeInProgress(w http.ResponseWriter, err *PurgeInProgressError) { + secs := int64(err.RetryAfter / time.Second) + if err.RetryAfter%time.Second != 0 { + secs++ + } + if secs < 1 { + secs = 1 + } + w.Header().Set("Retry-After", strconv.FormatInt(secs, 10)) + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Header().Set("Cache-Control", "no-store") + w.WriteHeader(http.StatusTooManyRequests) + _ = json.NewEncoder(w).Encode(map[string]any{ + "code": "PurgeQueueInProgress", + "message": "only one PurgeQueue operation on each queue is allowed every 60 seconds", + "retry_after_seconds": secs, + }) +} diff --git a/internal/admin/sqs_handler_test.go b/internal/admin/sqs_handler_test.go index 30ebe23e..f39fd01a 100644 --- a/internal/admin/sqs_handler_test.go +++ b/internal/admin/sqs_handler_test.go @@ -2,10 +2,12 @@ package admin import ( "context" + "encoding/json" "errors" "net/http" "net/http/httptest" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -18,8 +20,17 @@ type stubQueuesSource struct { queues []string describeErr error deleteErr error + peekErr error + peekResult PeekResult + purgeErr error + purgeResult PurgeResult lastDeleteName string lastDeletePrincipal AuthPrincipal + lastPeekName string + lastPeekPrincipal AuthPrincipal + lastPeekOpts PeekMessageOptions + lastPurgeName string + lastPurgePrincipal AuthPrincipal } func (s *stubQueuesSource) AdminListQueues(_ context.Context) ([]string, error) { @@ -53,6 +64,25 @@ func (s *stubQueuesSource) AdminDeleteQueue(_ context.Context, principal AuthPri return ErrQueuesNotFound } +func (s *stubQueuesSource) AdminPeekQueue(_ context.Context, principal AuthPrincipal, name string, opts PeekMessageOptions) (PeekResult, error) { + s.lastPeekName = name + s.lastPeekPrincipal = principal + s.lastPeekOpts = opts + if s.peekErr != nil { + return PeekResult{}, s.peekErr + } + return s.peekResult, nil +} + +func (s *stubQueuesSource) AdminPurgeQueue(_ context.Context, principal AuthPrincipal, name string) (PurgeResult, error) { + s.lastPurgeName = name + s.lastPurgePrincipal = principal + if s.purgeErr != nil { + return PurgeResult{}, s.purgeErr + } + return s.purgeResult, nil +} + // TestSqsHandler_DeleteQueue_LivePromotion pins the live-role // forwarding contract: a JWT minted while the access key was // read_only must, after the operator promotes the key to full in @@ -198,6 +228,274 @@ func TestSqsHandler_DescribeQueue_ZeroCreatedAtIsOmittedOnTheWire(t *testing.T) "created_at must be omitted entirely when the queue has no wall-clock timestamp so the SPA renders the placeholder; body=%s", body) } -// helper to silence the unused-import warning when errors is only -// referenced inside one of the test functions. -var _ = errors.New +// TestSqsHandler_PeekMessages_HappyPath confirms a GET on +// /queues/{name}/messages dispatches to AdminPeekQueue, forwards +// the parsed limit / cursor / body_max_bytes query params, and +// renders the response as snake_case JSON with messages and +// next_cursor. +func TestSqsHandler_PeekMessages_HappyPath(t *testing.T) { + src := &stubQueuesSource{ + queues: []string{"orders"}, + peekResult: PeekResult{ + Messages: []PeekedMessage{ + {MessageID: "m1", Body: "hello", BodyOriginalSize: 5, SentTimestamp: time.UnixMilli(1700000000000).UTC()}, + }, + NextCursor: "cursor-page-2", + }, + } + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders/messages?limit=5&cursor=abc&body_max_bytes=1024", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code, "body=%s", rec.Body.String()) + require.Equal(t, "orders", src.lastPeekName) + require.Equal(t, PeekMessageOptions{Limit: 5, Cursor: "abc", BodyMaxBytes: 1024}, src.lastPeekOpts) + var body map[string]any + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &body)) + require.Contains(t, body, "messages") + require.Equal(t, "cursor-page-2", body["next_cursor"]) +} + +// TestSqsHandler_PeekMessages_EmptyArrayNotNull pins the nil to +// empty-slice normalisation: a queue with no visible messages must +// render as `{"messages":[]}` not `{"messages":null}` so the SPA's +// `messages.length` iteration does not crash. +func TestSqsHandler_PeekMessages_EmptyArrayNotNull(t *testing.T) { + src := &stubQueuesSource{ + queues: []string{"orders"}, + peekResult: PeekResult{Messages: nil, NextCursor: ""}, + } + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders/messages", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code) + require.Contains(t, rec.Body.String(), `"messages":[]`) +} + +// TestSqsHandler_PeekMessages_NoRoleForbidden confirms a role-less +// principal (the zero AuthPrincipal) is rejected with 403 even +// without a RoleStore wired — peek's payload is sensitive enough +// that the lower-bound gate fires. +func TestSqsHandler_PeekMessages_NoRoleForbidden(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders/messages", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_NOROLE", Role: ""})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastPeekName, "source must not be reached for role-less principal") +} + +// TestSqsHandler_PeekMessages_BadLimitQueryParam pins the query +// parsing: a non-numeric limit returns a 400 without dispatching. +func TestSqsHandler_PeekMessages_BadLimitQueryParam(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders/messages?limit=NaN", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Empty(t, src.lastPeekName) +} + +// TestSqsHandler_PurgeQueue_HappyPath confirms a DELETE on +// /queues/{name}/messages dispatches to AdminPurgeQueue (write role +// required) and returns 204 on success. +func TestSqsHandler_PurgeQueue_HappyPath(t *testing.T) { + src := &stubQueuesSource{ + queues: []string{"orders"}, + purgeResult: PurgeResult{GenerationBefore: 1, GenerationAfter: 2}, + } + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders/messages", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNoContent, rec.Code, "body=%s", rec.Body.String()) + require.Equal(t, "orders", src.lastPurgeName) + require.Equal(t, RoleFull, src.lastPurgePrincipal.Role) +} + +// TestSqsHandler_PurgeQueue_RateLimited429 pins the 60-second +// cooldown response shape: 429 status, Retry-After header rounded +// up to whole seconds, and retry_after_seconds JSON field. errors.As +// on the *PurgeInProgressError chain feeds the duration in. +func TestSqsHandler_PurgeQueue_RateLimited429(t *testing.T) { + src := &stubQueuesSource{ + queues: []string{"orders"}, + purgeErr: &PurgeInProgressError{RetryAfter: 42*time.Second + 500*time.Millisecond}, + } + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders/messages", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusTooManyRequests, rec.Code) + // 42.5s rounds up to 43. + require.Equal(t, "43", rec.Header().Get("Retry-After")) + var body map[string]any + require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &body)) + require.Equal(t, "PurgeQueueInProgress", body["code"]) + require.EqualValues(t, 43, body["retry_after_seconds"]) +} + +// TestSqsHandler_PurgeQueue_ReadOnlyForbidden pins the write gate: +// a read-only principal cannot purge. The 403 body carries the +// purge-specific action verb ("purge messages"), not the +// delete-specific one — Claude r1 caught the misleading wording +// when principalForWrite was shared between handleDelete and +// handlePurge. +func TestSqsHandler_PurgeQueue_ReadOnlyForbidden(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders/messages", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusForbidden, rec.Code) + require.Empty(t, src.lastPurgeName, "read-only principal must not reach the purge source") + require.Contains(t, rec.Body.String(), "purge messages", + "purge rejection body must use the purge verb (not 'delete queues'); got=%s", rec.Body.String()) + require.NotContains(t, rec.Body.String(), "delete queues", + "purge rejection body must not say 'delete queues' (Claude r1 fix); got=%s", rec.Body.String()) +} + +// TestSqsHandler_Routing_PathValidation_BadInputs is the table-driven +// pin for the 6-step routing's confused-deputy classes (design doc +// §3.4): every input must be rejected with 400 invalid_path before +// dispatch. +func TestSqsHandler_Routing_PathValidation_BadInputs(t *testing.T) { + cases := []struct { + name string + url string + }{ + {"empty queue name segment", pathSqsQueues + "//messages"}, + {"percent-encoded slash", pathSqsQueues + "/%2F/messages"}, + {"double-encoded slash", pathSqsQueues + "/%252F/messages"}, + {"dot-segment escape", pathSqsQueues + "/orders/../messages"}, + {"leading dot", pathSqsQueues + "/./orders/messages"}, + {"empty interior segment", pathSqsQueues + "/orders//messages"}, + {"percent-encoded dot-dot", pathSqsQueues + "/orders/%2E%2E/messages"}, + {"percent-encoded dot", pathSqsQueues + "/orders/%2e/messages"}, + } + src := &stubQueuesSource{queues: []string{"orders", "messages"}} + h := NewSqsHandler(src) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, tc.url, nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusBadRequest, rec.Code, + "%s: want 400 got %d body=%s", tc.name, rec.Code, rec.Body.String()) + }) + } +} + +// TestSqsHandler_Routing_TrailingSlashAccepted confirms path.Clean +// normalises /queues/orders/messages/ (legal trailing slash) into +// /queues/orders/messages — it dispatches to peek (GET) and purge +// (DELETE) just like the un-suffixed form. +func TestSqsHandler_Routing_TrailingSlashAccepted(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}, peekResult: PeekResult{}} + h := NewSqsHandler(src) + + t.Run("GET with trailing slash dispatches peek", func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders/messages/", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_RO", Role: RoleReadOnly})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code, "body=%s", rec.Body.String()) + require.Equal(t, "orders", src.lastPeekName) + }) + + t.Run("DELETE with trailing slash dispatches purge", func(t *testing.T) { + src.lastPurgeName = "" + req := httptest.NewRequest(http.MethodDelete, pathPrefixSqsQueues+"orders/messages/", nil) + req = req.WithContext(context.WithValue(req.Context(), ctxKeyPrincipal, + AuthPrincipal{AccessKey: "AKIA_FULL", Role: RoleFull})) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNoContent, rec.Code) + require.Equal(t, "orders", src.lastPurgeName) + }) +} + +// TestSqsHandler_Routing_QueueNamedMessages confirms a queue +// literally named "messages" still routes to describe / delete on +// the /queues/messages path (no /messages sub-resource segment). +// Distinguishes "queue named messages" from "missing queue name". +func TestSqsHandler_Routing_QueueNamedMessages(t *testing.T) { + src := &stubQueuesSource{queues: []string{"messages"}} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"messages", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code, "body=%s", rec.Body.String()) + require.Contains(t, rec.Body.String(), `"name":"messages"`) +} + +// TestSqsHandler_Routing_UnknownSubResource pins the 404 fallback +// for sub-resources we don't recognise (e.g. /queues/orders/foo +// instead of /queues/orders/messages). +func TestSqsHandler_Routing_UnknownSubResource(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) + req := httptest.NewRequest(http.MethodGet, pathPrefixSqsQueues+"orders/notathing", nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusNotFound, rec.Code) +} + +// TestSqsHandler_Routing_CollectionRoot pins the step-2 pre-split +// dispatch: bare /queues and /queues/ both route to handleList, +// not through the queue-name validator that would reject them as +// "empty queue name segment". +func TestSqsHandler_Routing_CollectionRoot(t *testing.T) { + src := &stubQueuesSource{queues: []string{"orders"}} + h := NewSqsHandler(src) + for _, path := range []string{pathSqsQueues, pathPrefixSqsQueues} { + t.Run(path, func(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, path, nil) + rec := httptest.NewRecorder() + h.ServeHTTP(rec, req) + require.Equal(t, http.StatusOK, rec.Code, "body=%s", rec.Body.String()) + require.Contains(t, rec.Body.String(), `"queues":["orders"]`) + }) + } +} + +// TestPurgeInProgressError_IsAndAs pins the typed-error contract: +// errors.Is matches ErrQueuesPurgeInProgress and errors.As pulls the +// typed RetryAfter duration. Without these the handler's branching +// in writeQueuesError silently falls through to 500. +func TestPurgeInProgressError_IsAndAs(t *testing.T) { + err := &PurgeInProgressError{RetryAfter: 30 * time.Second} + require.True(t, errors.Is(err, ErrQueuesPurgeInProgress)) + var typed *PurgeInProgressError + require.True(t, errors.As(err, &typed)) + require.Equal(t, 30*time.Second, typed.RetryAfter) +} + +// TestRole_AllowsRead pins the new gate. +func TestRole_AllowsRead(t *testing.T) { + require.True(t, RoleFull.AllowsRead()) + require.True(t, RoleReadOnly.AllowsRead()) + require.False(t, Role("").AllowsRead()) + require.False(t, Role("bogus").AllowsRead()) +} diff --git a/main_admin.go b/main_admin.go index 93ecd21e..71358c7f 100644 --- a/main_admin.go +++ b/main_admin.go @@ -214,7 +214,11 @@ func (b *sqsQueuesBridge) AdminListQueues(ctx context.Context) ([]string, error) } func (b *sqsQueuesBridge) AdminDescribeQueue(ctx context.Context, name string) (*admin.QueueSummary, bool, error) { - summary, exists, err := b.server.AdminDescribeQueue(ctx, name) + // Phase 4 wires IsDLQ / DLQSources: the SPA's Messages tab and + // Purge button need this flag to switch their framing, so the + // bridge opts into the catalog reverse-scan that Phase 2 gated + // behind AdminDescribeQueueOptions{IncludeDLQSources}. + summary, exists, err := b.server.AdminDescribeQueue(ctx, name, adapter.AdminDescribeQueueOptions{IncludeDLQSources: true}) if err != nil { return nil, false, translateAdminQueuesError(err) } @@ -231,6 +235,78 @@ func (b *sqsQueuesBridge) AdminDeleteQueue(ctx context.Context, principal admin. return nil } +func (b *sqsQueuesBridge) AdminPeekQueue(ctx context.Context, principal admin.AuthPrincipal, name string, opts admin.PeekMessageOptions) (admin.PeekResult, error) { + rows, nextCursor, err := b.server.AdminPeekQueue(ctx, convertAdminPrincipal(principal), name, adapter.AdminPeekMessageOptions{ + Limit: opts.Limit, + Cursor: opts.Cursor, + BodyMaxBytes: opts.BodyMaxBytes, + }) + if err != nil { + return admin.PeekResult{}, translateAdminQueuesError(err) + } + return admin.PeekResult{ + Messages: convertPeekedMessages(rows), + NextCursor: nextCursor, + }, nil +} + +func (b *sqsQueuesBridge) AdminPurgeQueue(ctx context.Context, principal admin.AuthPrincipal, name string) (admin.PurgeResult, error) { + res, err := b.server.AdminPurgeQueue(ctx, convertAdminPrincipal(principal), name) + if err != nil { + return admin.PurgeResult{}, translateAdminQueuesError(err) + } + return admin.PurgeResult{ + GenerationBefore: res.GenerationBefore, + GenerationAfter: res.GenerationAfter, + }, nil +} + +// convertPeekedMessages re-shapes the adapter's []AdminPeekedMessage +// into the admin package's []PeekedMessage. The two structs are +// deliberately isomorphic so this translation is a per-field copy; +// if either side grows a field, the build breaks here, which is the +// drift signal we want. +func convertPeekedMessages(in []adapter.AdminPeekedMessage) []admin.PeekedMessage { + if len(in) == 0 { + return nil + } + out := make([]admin.PeekedMessage, len(in)) + for i, m := range in { + out[i] = admin.PeekedMessage{ + MessageID: m.MessageID, + Body: m.Body, + BodyTruncated: m.BodyTruncated, + BodyOriginalSize: m.BodyOriginalSize, + SentTimestamp: m.SentTimestamp, + ReceiveCount: m.ReceiveCount, + GroupID: m.GroupID, + DeduplicationID: m.DeduplicationID, + Attributes: convertPeekedAttributes(m.Attributes), + } + } + return out +} + +// convertPeekedAttributes re-shapes the typed attribute map; both +// structs share the same field layout so a direct type conversion +// would suffice if the JSON tags matched, but they differ (adapter +// vs admin packages) so an explicit copy keeps the wire shape stable +// per package. +func convertPeekedAttributes(in map[string]adapter.AdminPeekedAttribute) map[string]admin.PeekedAttribute { + if len(in) == 0 { + return nil + } + out := make(map[string]admin.PeekedAttribute, len(in)) + for k, v := range in { + out[k] = admin.PeekedAttribute{ + DataType: v.DataType, + StringValue: v.StringValue, + BinaryValue: v.BinaryValue, + } + } + return out +} + // convertAdminQueueSummary mirrors adapter.AdminQueueSummary into // admin.QueueSummary. Counter fields are int64 on both sides; if // either side grows a field, this function should be extended in the @@ -261,6 +337,8 @@ func convertAdminQueueSummary(in *adapter.AdminQueueSummary) *admin.QueueSummary NotVisible: in.Counters.NotVisible, Delayed: in.Counters.Delayed, }, + IsDLQ: in.IsDLQ, + DLQSources: in.DLQSources, } } @@ -280,6 +358,19 @@ func translateAdminQueuesError(err error) error { return admin.ErrQueuesNotFound case errors.Is(err, adapter.ErrAdminSQSValidation): return admin.ErrQueuesValidation + case errors.Is(err, adapter.ErrAdminSQSPurgeInProgress): + // Pull the typed RetryAfter duration from the adapter's + // *PurgeInProgressError and rewrap as admin's typed error. + // Both errors.Is(err, ErrQueuesPurgeInProgress) and + // errors.As(err, &*admin.PurgeInProgressError) match on the + // admin side, so the handler can branch on the sentinel + // while still extracting RetryAfter for the Retry-After + // header. + var adapterErr *adapter.PurgeInProgressError + if errors.As(err, &adapterErr) { + return &admin.PurgeInProgressError{RetryAfter: adapterErr.RetryAfter} + } + return &admin.PurgeInProgressError{} case isLeaderChurnError(err): // Leadership can be lost between AdminDeleteQueue's upfront // isVerifiedSQSLeader check and the coordinator dispatch