diff --git a/infrastructure/terraform/components/callbacks/elasticache_delivery_state.tf b/infrastructure/terraform/components/callbacks/elasticache_delivery_state.tf index 58a675aa..6b5d3da1 100644 --- a/infrastructure/terraform/components/callbacks/elasticache_delivery_state.tf +++ b/infrastructure/terraform/components/callbacks/elasticache_delivery_state.tf @@ -1,11 +1,17 @@ +resource "random_password" "elasticache_default_user" { + length = 32 + special = false +} + resource "aws_elasticache_user" "delivery_state_default" { - user_id = "${local.csi}-delivery-state-default" + user_id = "${local.csi}-valkey-default" user_name = "default" engine = "valkey" access_string = "off -@all" authentication_mode { - type = "no-password-required" + type = "password" + passwords = [random_password.elasticache_default_user.result] } tags = local.default_tags diff --git a/infrastructure/terraform/components/callbacks/locals.tf b/infrastructure/terraform/components/callbacks/locals.tf index 64bd622c..d80b5b7e 100644 --- a/infrastructure/terraform/components/callbacks/locals.tf +++ b/infrastructure/terraform/components/callbacks/locals.tf @@ -21,7 +21,7 @@ locals { targets = [ for target in try(client.targets, []) : merge(target, { - invocationEndpoint = try(target.delivery.mtls.enabled, false) ? "https://${aws_lb.mock_webhook_mtls[0].dns_name}/${target.targetId}" : "http://${aws_lb.mock_webhook_mtls[0].dns_name}/${target.targetId}" + invocationEndpoint = "https://${aws_lb.mock_webhook_mtls[0].dns_name}/${target.targetId}" apiKey = merge(target.apiKey, { headerValue = random_password.mock_webhook_api_key[0].result }) }) ] diff --git a/infrastructure/terraform/components/callbacks/module_mock_webhook_alb_mtls.tf b/infrastructure/terraform/components/callbacks/module_mock_webhook_alb_mtls.tf index 7e7badf8..eb8b6776 100644 --- a/infrastructure/terraform/components/callbacks/module_mock_webhook_alb_mtls.tf +++ b/infrastructure/terraform/components/callbacks/module_mock_webhook_alb_mtls.tf @@ -19,18 +19,7 @@ resource "aws_vpc_security_group_ingress_rule" "mock_webhook_alb_https" { from_port = 443 to_port = 443 ip_protocol = "tcp" - description = "Allow HTTPS Client Lambda to reach mock webhook via mTLS" - tags = local.default_tags -} - -resource "aws_vpc_security_group_ingress_rule" "mock_webhook_alb_http" { - count = var.deploy_mock_clients ? 1 : 0 - security_group_id = aws_security_group.mock_webhook_alb[0].id - referenced_security_group_id = aws_security_group.https_client_lambda.id - from_port = 80 - to_port = 80 - ip_protocol = "tcp" - description = "Allow HTTPS Client Lambda to reach mock webhook without mTLS" + description = "Allow HTTPS Client Lambda to reach mock webhook (mTLS and non-mTLS)" tags = local.default_tags } @@ -102,17 +91,3 @@ resource "aws_lb_listener" "mock_webhook_mtls" { tags = local.default_tags } - -resource "aws_lb_listener" "mock_webhook_http" { - count = var.deploy_mock_clients ? 1 : 0 - load_balancer_arn = aws_lb.mock_webhook_mtls[0].arn - port = 80 - protocol = "HTTP" - - default_action { - type = "forward" - target_group_arn = aws_lb_target_group.mock_webhook_mtls[0].arn - } - - tags = local.default_tags -} diff --git a/lambdas/https-client-lambda/src/__tests__/dlq-sender.test.ts b/lambdas/https-client-lambda/src/__tests__/dlq-sender.test.ts index 21ae3700..692e41c9 100644 --- a/lambdas/https-client-lambda/src/__tests__/dlq-sender.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/dlq-sender.test.ts @@ -54,4 +54,81 @@ describe("sendToDlq", () => { process.env.DLQ_URL = saved; }); + + it("includes ERROR_CODE and ERROR_MESSAGE for HTTP error with JSON body", async () => { + mockSend.mockResolvedValue({}); + + await sendToDlq('{"test":"message"}', { + statusCode: 400, + responseBody: JSON.stringify({ message: "Bad request" }), + }); + + const command = mockSend.mock.calls[0][0]; + expect(command).toBeInstanceOf(SendMessageCommand); + expect(command.input.MessageAttributes).toEqual({ + ERROR_CODE: { DataType: "String", StringValue: "HTTP_CLIENT_ERROR" }, + ERROR_MESSAGE: { DataType: "String", StringValue: "Bad request" }, + }); + }); + + it("uses raw response body as ERROR_MESSAGE when not valid JSON", async () => { + mockSend.mockResolvedValue({}); + + await sendToDlq('{"test":"message"}', { + statusCode: 400, + responseBody: "Bad request", + }); + + const command = mockSend.mock.calls[0][0]; + expect(command.input.MessageAttributes).toEqual({ + ERROR_CODE: { DataType: "String", StringValue: "HTTP_CLIENT_ERROR" }, + ERROR_MESSAGE: { DataType: "String", StringValue: "Bad request" }, + }); + }); + + it("uses errorCode as ERROR_CODE when provided", async () => { + mockSend.mockResolvedValue({}); + + await sendToDlq('{"test":"message"}', { + errorCode: "CERT_HAS_EXPIRED", + }); + + const command = mockSend.mock.calls[0][0]; + expect(command.input.MessageAttributes).toEqual({ + ERROR_CODE: { DataType: "String", StringValue: "CERT_HAS_EXPIRED" }, + }); + }); + + it("sends empty MessageAttributes when errorInfo has no relevant fields", async () => { + mockSend.mockResolvedValue({}); + + await sendToDlq('{"test":"message"}', {}); + + const command = mockSend.mock.calls[0][0]; + expect(command.input.MessageAttributes).toEqual({}); + }); + + it("sends no MessageAttributes when errorInfo is omitted", async () => { + mockSend.mockResolvedValue({}); + + await sendToDlq('{"test":"message"}'); + + const command = mockSend.mock.calls[0][0]; + expect(command.input.MessageAttributes).toBeUndefined(); + }); + + it("uses JSON body message field when present in responseBody", async () => { + mockSend.mockResolvedValue({}); + + await sendToDlq('{"test":"message"}', { + statusCode: 422, + responseBody: JSON.stringify({ message: "Validation failed", code: 42 }), + }); + + const command = mockSend.mock.calls[0][0]; + expect(command.input.MessageAttributes?.ERROR_MESSAGE).toEqual({ + DataType: "String", + StringValue: "Validation failed", + }); + }); }); diff --git a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts index 84984c71..efbc6d88 100644 --- a/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/endpoint-gate.test.ts @@ -98,14 +98,6 @@ describe("admit", () => { ); }); - it("propagates non-NOSCRIPT Redis errors", async () => { - mockSendCommand.mockRejectedValueOnce(new Error("Connection refused")); - - await expect( - admit(mockRedis, "target-1", 10, true, defaultConfig), - ).rejects.toThrow("Connection refused"); - }); - it("passes cbProbeIntervalMs=0 when circuit breaker is disabled", async () => { mockSendCommand.mockResolvedValueOnce([1, "allowed", 0, 10]); @@ -123,8 +115,46 @@ describe("admit", () => { await admit(mockRedis, "my-target", 5, true, defaultConfig); const args = mockSendCommand.mock.calls[0]![0] as string[]; - expect(args[3]).toBe("cb:my-target"); - expect(args[4]).toBe("rl:my-target"); + expect(args[3]).toBe("cb:{my-target}"); + expect(args[4]).toBe("rl:{my-target}"); + }); +}); + +describe("evalScript", () => { + it("throws a wrapped error including the original message when EVALSHA fails with a non-NOSCRIPT Error", async () => { + const redisError = new Error("WRONGTYPE Operation against a key"); + mockSendCommand.mockRejectedValueOnce(redisError); + + const thrown = await admit( + mockRedis, + "target-1", + 10, + true, + defaultConfig, + ).catch((error: unknown) => error); + + expect(thrown).toBeInstanceOf(Error); + expect((thrown as Error).message).toContain("Redis error in script"); + expect((thrown as Error).message).toContain( + "WRONGTYPE Operation against a key", + ); + expect((thrown as Error & { cause: unknown }).cause).toBe(redisError); + }); + + it("throws a wrapped error using String() when EVALSHA rejects with a non-Error value", async () => { + mockSendCommand.mockRejectedValueOnce("connection refused"); + + const thrown = await admit( + mockRedis, + "target-1", + 10, + true, + defaultConfig, + ).catch((error: unknown) => error); + + expect(thrown).toBeInstanceOf(Error); + expect((thrown as Error).message).toContain("Redis error in script"); + expect((thrown as Error).message).toContain("connection refused"); }); }); @@ -187,20 +217,12 @@ describe("recordResult", () => { expect(mockSendCommand).toHaveBeenCalledTimes(2); }); - it("propagates non-NOSCRIPT Redis errors", async () => { - mockSendCommand.mockRejectedValueOnce(new Error("Connection refused")); - - await expect( - recordResult(mockRedis, "target-1", false, defaultConfig), - ).rejects.toThrow("Connection refused"); - }); - it("passes correct cb key for target", async () => { mockSendCommand.mockResolvedValueOnce([1, "closed"]); await recordResult(mockRedis, "my-target", true, defaultConfig); const args = mockSendCommand.mock.calls[0]![0] as string[]; - expect(args[3]).toBe("cb:my-target"); + expect(args[3]).toBe("cb:{my-target}"); }); }); diff --git a/lambdas/https-client-lambda/src/__tests__/handler.test.ts b/lambdas/https-client-lambda/src/__tests__/handler.test.ts index a31cc61c..f6cbdb68 100644 --- a/lambdas/https-client-lambda/src/__tests__/handler.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/handler.test.ts @@ -142,7 +142,9 @@ describe("processRecords", () => { const failures = await processRecords([makeRecord()]); expect(failures).toEqual([]); - expect(mockSendToDlq).toHaveBeenCalledWith(makeRecord().body); + expect(mockSendToDlq).toHaveBeenCalledWith(makeRecord().body, { + outcome: "permanent_failure", + }); }); it("returns failure for transient 5xx errors", async () => { @@ -538,4 +540,21 @@ describe("processRecords", () => { 3_600_000, ); }); + + it("returns no failure when handleRateLimitedRecord resolves without throwing", async () => { + mockDeliverPayload.mockResolvedValue({ + outcome: "permanent_failure", + statusCode: 429, + retryAfterHeader: "60", + }); + mockHandleRateLimitedRecord.mockResolvedValueOnce(undefined); + + const failures = await processRecords([makeRecord()]); + + expect(failures).toEqual([]); + expect(mockIsWindowExhausted).toHaveBeenCalledWith( + expect.any(Number), + 7_200_000, + ); + }); }); diff --git a/lambdas/https-client-lambda/src/__tests__/https-client.test.ts b/lambdas/https-client-lambda/src/__tests__/https-client.test.ts index e1850567..a6229c57 100644 --- a/lambdas/https-client-lambda/src/__tests__/https-client.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/https-client.test.ts @@ -36,6 +36,7 @@ type MockResponse = EventEmitter & { function mockHttpsRequest( statusCode: number, headers: Record = {}, + body = "", ) { const mockReq = new EventEmitter() as EventEmitter & { end: jest.Mock; @@ -56,7 +57,13 @@ function mockHttpsRequest( }); if (callback) { - process.nextTick(() => callback(res)); + process.nextTick(() => { + callback(res); + process.nextTick(() => { + if (body) res.emit("data", Buffer.from(body)); + res.emit("end"); + }); + }); } return mockReq as unknown as ReturnType; @@ -125,7 +132,7 @@ describe("deliverPayload", () => { }); it("returns permanent_failure on 4xx non-429", async () => { - mockHttpsRequest(400); + mockHttpsRequest(400, {}, JSON.stringify({ message: "Bad request" })); const result = await deliverPayload( createTarget(), @@ -134,7 +141,11 @@ describe("deliverPayload", () => { createMockAgent(), ); - expect(result).toEqual({ outcome: "permanent_failure" }); + expect(result).toEqual({ + outcome: "permanent_failure", + statusCode: 400, + responseBody: JSON.stringify({ message: "Bad request" }), + }); }); it("returns permanent_failure on TLS error CERT_HAS_EXPIRED", async () => { @@ -147,7 +158,10 @@ describe("deliverPayload", () => { createMockAgent(), ); - expect(result).toEqual({ outcome: "permanent_failure" }); + expect(result).toEqual({ + outcome: "permanent_failure", + errorCode: "CERT_HAS_EXPIRED", + }); }); it("returns permanent_failure on TLS pinning error", async () => { @@ -160,7 +174,10 @@ describe("deliverPayload", () => { createMockAgent(), ); - expect(result).toEqual({ outcome: "permanent_failure" }); + expect(result).toEqual({ + outcome: "permanent_failure", + errorCode: "ERR_CERT_PINNING_FAILED", + }); }); it("returns transient_failure on 5xx", async () => { @@ -189,6 +206,7 @@ describe("deliverPayload", () => { expect(result).toEqual({ outcome: "rate_limited", retryAfterHeader: "60", + statusCode: 429, }); }); @@ -205,6 +223,7 @@ describe("deliverPayload", () => { expect(result).toEqual({ outcome: "rate_limited", retryAfterHeader: undefined, + statusCode: 429, }); }); @@ -287,4 +306,43 @@ describe("deliverPayload", () => { expect(result).toEqual({ outcome: "transient_failure", statusCode: 0 }); }); + + it("treats undefined statusCode as 0", async () => { + const mockReq = new EventEmitter() as EventEmitter & { + end: jest.Mock; + destroy: jest.Mock; + }; + mockReq.end = jest.fn(); + mockReq.destroy = jest.fn(); + + jest.spyOn(https, "request").mockImplementation((...args: unknown[]) => { + const callback = args.find((a) => typeof a === "function") as + | ((res: MockResponse) => void) + | undefined; + + const res = Object.assign(new EventEmitter(), { + statusCode: undefined as unknown as number, + headers: {}, + resume: jest.fn(), + }) as MockResponse; + + if (callback) { + process.nextTick(() => { + callback(res); + process.nextTick(() => (res as EventEmitter).emit("end")); + }); + } + + return mockReq as unknown as ReturnType; + }); + + const result = await deliverPayload( + createTarget(), + '{"test":true}', + "sig-abc", + createMockAgent(), + ); + + expect(result).toEqual({ outcome: "transient_failure", statusCode: 0 }); + }); }); diff --git a/lambdas/https-client-lambda/src/__tests__/tls-agent-factory.test.ts b/lambdas/https-client-lambda/src/__tests__/tls-agent-factory.test.ts index b2ca7877..fae8112f 100644 --- a/lambdas/https-client-lambda/src/__tests__/tls-agent-factory.test.ts +++ b/lambdas/https-client-lambda/src/__tests__/tls-agent-factory.test.ts @@ -129,6 +129,34 @@ describe("tls-agent-factory", () => { expect(mockSecretsManagerSend).not.toHaveBeenCalled(); }); + it("loads test CA for server trust when MTLS_TEST_CA_S3_KEY is set and mtls is disabled", async () => { + process.env.MTLS_TEST_CA_S3_KEY = "test-ca.pem"; + jest.resetModules(); + // @ts-expect-error -- modulePaths resolves at runtime + const mod = await import("services/delivery/tls-agent-factory"); + + const caPem = + "-----BEGIN CERTIFICATE-----\ntest-ca\n-----END CERTIFICATE-----"; + mockS3Send + .mockResolvedValueOnce({ + Body: { + transformToString: jest.fn().mockResolvedValue(COMBINED_PEM), + }, + }) + .mockResolvedValueOnce({ + Body: { transformToString: jest.fn().mockResolvedValue(caPem) }, + }); + + const agent = await mod.buildAgent( + createTarget({ delivery: { mtls: { enabled: false } } }), + ); + + expect(agent).toBeDefined(); + expect(agent.options.ca).toBe(caPem); + expect(agent.options.key).toBeUndefined(); + expect(agent.options.cert).toBeUndefined(); + }); + it("loads test CA when MTLS_TEST_CA_S3_KEY is set", async () => { process.env.MTLS_TEST_CA_S3_KEY = "test-ca.pem"; jest.resetModules(); diff --git a/lambdas/https-client-lambda/src/handler.ts b/lambdas/https-client-lambda/src/handler.ts index f552ea0d..48ee53cf 100644 --- a/lambdas/https-client-lambda/src/handler.ts +++ b/lambdas/https-client-lambda/src/handler.ts @@ -106,8 +106,13 @@ async function handleDeliveryResult( } if (result.outcome === OUTCOME_PERMANENT_FAILURE) { - recordDeliveryPermanentFailure(clientId, targetId); - await sendToDlq(record.body); + recordDeliveryPermanentFailure( + clientId, + targetId, + result.statusCode, + result.errorCode, + ); + await sendToDlq(record.body, result); return; } @@ -219,6 +224,10 @@ export async function processRecords( return null; } catch (error) { if (!(error instanceof VisibilityManagedError)) { + logger.error("Failed to process record", { + messageId: record.messageId, + err: error, + }); const receiveCount = Number( record.attributes.ApproximateReceiveCount, ); diff --git a/lambdas/https-client-lambda/src/services/delivery-observability.ts b/lambdas/https-client-lambda/src/services/delivery-observability.ts index 8fd4cea5..50dbb30e 100644 --- a/lambdas/https-client-lambda/src/services/delivery-observability.ts +++ b/lambdas/https-client-lambda/src/services/delivery-observability.ts @@ -31,11 +31,15 @@ export function recordDeliverySuccess( export function recordDeliveryPermanentFailure( clientId: string, targetId: string, + statusCode?: number, + errorCode?: string, ): void { emitDeliveryPermanentFailure(targetId); logger.warn("Permanent delivery failure — sending to DLQ", { clientId, targetId, + ...(statusCode !== undefined && { statusCode }), + ...(errorCode !== undefined && { errorCode }), }); } diff --git a/lambdas/https-client-lambda/src/services/delivery/https-client.ts b/lambdas/https-client-lambda/src/services/delivery/https-client.ts index 418d7563..dfe142f8 100644 --- a/lambdas/https-client-lambda/src/services/delivery/https-client.ts +++ b/lambdas/https-client-lambda/src/services/delivery/https-client.ts @@ -10,9 +10,15 @@ export const OUTCOME_TRANSIENT_FAILURE = "transient_failure" as const; export type DeliveryResult = | { outcome: typeof OUTCOME_SUCCESS } - | { outcome: typeof OUTCOME_PERMANENT_FAILURE } + | { + outcome: typeof OUTCOME_PERMANENT_FAILURE; + statusCode?: number; + errorCode?: string; + responseBody?: string; + } | { outcome: typeof OUTCOME_RATE_LIMITED; + statusCode: 429; retryAfterHeader: string | undefined; } | { outcome: typeof OUTCOME_TRANSIENT_FAILURE; statusCode: number }; @@ -41,29 +47,40 @@ export function deliverPayload( }, }, (res) => { - res.resume(); - const statusCode = res.statusCode ?? 0; if (statusCode >= 200 && statusCode < 300) { + res.resume(); resolve({ outcome: OUTCOME_SUCCESS }); return; } if (statusCode === 429) { + res.resume(); const retryAfterHeader = res.headers["retry-after"]; resolve({ outcome: OUTCOME_RATE_LIMITED, + statusCode, retryAfterHeader, }); return; } if (statusCode >= 400 && statusCode < 500) { - resolve({ outcome: OUTCOME_PERMANENT_FAILURE }); + const chunks: Buffer[] = []; + res.on("data", (chunk: Buffer) => chunks.push(chunk)); + res.on("end", () => { + const responseBody = Buffer.concat(chunks).toString("utf8"); + resolve({ + outcome: OUTCOME_PERMANENT_FAILURE, + statusCode, + responseBody, + }); + }); return; } + res.resume(); resolve({ outcome: OUTCOME_TRANSIENT_FAILURE, statusCode }); }, ); @@ -74,7 +91,7 @@ export function deliverPayload( req.on("error", (error: NodeJS.ErrnoException) => { if (error.code && PERMANENT_TLS_ERROR_CODES.has(error.code)) { - resolve({ outcome: OUTCOME_PERMANENT_FAILURE }); + resolve({ outcome: OUTCOME_PERMANENT_FAILURE, errorCode: error.code }); return; } diff --git a/lambdas/https-client-lambda/src/services/delivery/tls-agent-factory.ts b/lambdas/https-client-lambda/src/services/delivery/tls-agent-factory.ts index e6c0fcfa..fb1ea136 100644 --- a/lambdas/https-client-lambda/src/services/delivery/tls-agent-factory.ts +++ b/lambdas/https-client-lambda/src/services/delivery/tls-agent-factory.ts @@ -150,14 +150,20 @@ export async function buildAgent(target: CallbackTarget): Promise { ); } - if (target.delivery?.mtls?.enabled) { + // Always load the CA in test environments (MTLS_TEST_CA_S3_KEY set) so that + // targets with mtls.enabled: false can still verify the server's cert chain. + // In production the CA comes from SecretsManager only when mTLS is in use. + if (target.delivery?.mtls?.enabled || MTLS_TEST_CA_S3_KEY) { const material = await getMaterial(); - agentOptions.key = material.key; - agentOptions.cert = material.cert; if (material.ca) { agentOptions.ca = material.ca; } + + if (target.delivery?.mtls?.enabled) { + agentOptions.key = material.key; + agentOptions.cert = material.cert; + } } if (certPinning?.enabled) { diff --git a/lambdas/https-client-lambda/src/services/dlq-sender.ts b/lambdas/https-client-lambda/src/services/dlq-sender.ts index af61a666..56b92405 100644 --- a/lambdas/https-client-lambda/src/services/dlq-sender.ts +++ b/lambdas/https-client-lambda/src/services/dlq-sender.ts @@ -1,17 +1,70 @@ -import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs"; +import { + type MessageAttributeValue, + SQSClient, + SendMessageCommand, +} from "@aws-sdk/client-sqs"; const sqsClient = new SQSClient({}); -export async function sendToDlq(messageBody: string): Promise { +export type DlqErrorInfo = { + statusCode?: number; + errorCode?: string; + responseBody?: string; +}; + +function buildDlqAttributes( + errorInfo: DlqErrorInfo, +): Record { + const attrs: Record = {}; + + if (errorInfo.errorCode) { + attrs.ERROR_CODE = { + DataType: "String", + StringValue: errorInfo.errorCode, + }; + } else if (errorInfo.statusCode !== undefined) { + attrs.ERROR_CODE = { + DataType: "String", + StringValue: "HTTP_CLIENT_ERROR", + }; + } + + if (errorInfo.responseBody) { + let errorMessage = errorInfo.responseBody; + try { + const parsed = JSON.parse(errorInfo.responseBody) as { + message?: string; + }; + if (parsed.message) { + errorMessage = parsed.message; + } + } catch { + // use raw body if not valid JSON + } + attrs.ERROR_MESSAGE = { DataType: "String", StringValue: errorMessage }; + } + + return attrs; +} + +export async function sendToDlq( + messageBody: string, + errorInfo?: DlqErrorInfo, +): Promise { const { DLQ_URL } = process.env; if (!DLQ_URL) { throw new Error("DLQ_URL is required"); } + const messageAttributes = errorInfo + ? buildDlqAttributes(errorInfo) + : undefined; + await sqsClient.send( new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: messageBody, + ...(messageAttributes && { MessageAttributes: messageAttributes }), }), ); } diff --git a/lambdas/https-client-lambda/src/services/endpoint-gate.ts b/lambdas/https-client-lambda/src/services/endpoint-gate.ts index 81a98290..c2d85439 100644 --- a/lambdas/https-client-lambda/src/services/endpoint-gate.ts +++ b/lambdas/https-client-lambda/src/services/endpoint-gate.ts @@ -60,7 +60,12 @@ async function evalScript( const isNoScript = error instanceof Error && error.message.includes("NOSCRIPT"); if (!isNoScript) { - throw error; + throw new Error( + `Redis error in script ${script}: ${ + error instanceof Error ? error.message : String(error) + }`, + { cause: error }, + ); } return client.sendCommand(["EVAL", script, keyCount, ...keys, ...args]); } @@ -73,8 +78,8 @@ export async function admit( cbEnabled: boolean, config: EndpointGateConfig, ): Promise { - const cbKey = `cb:${targetId}`; - const rlKey = `rl:${targetId}`; + const cbKey = `cb:{${targetId}}`; + const rlKey = `rl:{${targetId}}`; const now = Date.now().toString(); const probeIntervalMs = cbEnabled ? config.cbProbeIntervalMs.toString() : "0"; @@ -125,7 +130,7 @@ export async function recordResult( success: boolean, config: EndpointGateConfig, ): Promise { - const cbKey = `cb:${targetId}`; + const cbKey = `cb:{${targetId}}`; const now = Date.now().toString(); const args = [ diff --git a/lambdas/https-client-lambda/src/services/redis-client.ts b/lambdas/https-client-lambda/src/services/redis-client.ts index bfe9e29c..7d8785c8 100644 --- a/lambdas/https-client-lambda/src/services/redis-client.ts +++ b/lambdas/https-client-lambda/src/services/redis-client.ts @@ -34,20 +34,28 @@ async function generateElastiCacheIamToken(): Promise { { protocol: "https:", method: "GET", - hostname: endpoint, + hostname: cacheName, path: "/", query: { Action: "connect", User: username }, - headers: { host: endpoint }, + headers: { host: cacheName }, }, { expiresIn: TOKEN_EXPIRY_SECONDS }, ); tokenExpiry = Date.now() + TOKEN_EXPIRY_SECONDS * 1000; + logger.info("ElastiCache IAM token generated", { + cacheName, + username, + region, + signingAlgorithm: signed.query?.["X-Amz-Algorithm"], + tokenExpiresAt: new Date(tokenExpiry).toISOString(), + }); + const qs = new URLSearchParams( signed.query as Record, ).toString(); - return `https://${signed.hostname}${signed.path}?${qs}`; + return `${cacheName}/?${qs}`; } export async function getRedisClient(): Promise { @@ -55,6 +63,7 @@ export async function getRedisClient(): Promise { tokenExpiry > Date.now() + TOKEN_REFRESH_BUFFER_SECONDS * 1000; if (redisClient?.isOpen && isTokenValid) { + logger.info("Reusing existing Redis client"); return redisClient; } @@ -69,11 +78,14 @@ export async function getRedisClient(): Promise { } if (redisClient?.isOpen) { + logger.info("Disconnecting Redis client for token refresh"); await redisClient.disconnect(); } const token = await generateElastiCacheIamToken(); + logger.info("Connecting to ElastiCache", { endpoint, username }); + redisClient = createClient({ url: `rediss://${endpoint}:6379`, username, diff --git a/scripts/tests/integration-debug.sh b/scripts/tests/integration-debug.sh index ac9fb905..a4ebbd63 100755 --- a/scripts/tests/integration-debug.sh +++ b/scripts/tests/integration-debug.sh @@ -12,16 +12,20 @@ set -euo pipefail # Actions: # queue-status Show SQS queue message counts # queue-peek Peek one message from each SQS queue -# tail-transform Tail client-transform-filter lambda logs -# tail-webhook Tail mock-webhook lambda logs -# tail-pipe Tail EventBridge pipe log group -# pipe-state Show EventBridge pipe state and recent metrics +# tail-transform Tail client-transform-filter lambda logs +# tail-https-client Tail https-client lambda logs (requires CLIENT_ID) +# tail-webhook Tail mock-webhook lambda logs +# tail-pipe Tail EventBridge pipe log group +# pipe-state Show EventBridge pipe state and recent metrics # # Required: # ENVIRONMENT # AWS_PROFILE # ACTION # +# Required for queue-status, queue-peek: +# CLIENT_ID Client ID (e.g. mock-client-1) +# # Optional: # LOG_FILTER CloudWatch Logs filter pattern / text # AWS_REGION (default: eu-west-2) @@ -45,7 +49,7 @@ fi REGION="${AWS_REGION:-eu-west-2}" LOG_FILTER="${LOG_FILTER:-}" -SUBSCRIPTION_FIXTURE_PATH="${SUBSCRIPTION_FIXTURE_PATH:-tests/integration/fixtures/subscriptions/mock-client-1.json}" +CLIENT_ID="${CLIENT_ID:-}" if ! aws sts get-caller-identity --profile "$AWS_PROFILE" >/dev/null 2>&1; then echo "No active AWS SSO session for profile '$AWS_PROFILE'. Running aws sso login..." @@ -69,21 +73,12 @@ queue_url() { echo "https://sqs.${REGION}.amazonaws.com/${ACCOUNT_ID}/${queue_name}" } -target_dlq_queue_name() { - local target_id - - if [ ! -f "$SUBSCRIPTION_FIXTURE_PATH" ]; then - echo "Error: subscription fixture not found: $SUBSCRIPTION_FIXTURE_PATH" >&2 - exit 1 - fi - - target_id="$(jq -r '.targets[0].targetId // empty' "$SUBSCRIPTION_FIXTURE_PATH")" - if [ -z "$target_id" ]; then - echo "Error: unable to read targets[0].targetId from $SUBSCRIPTION_FIXTURE_PATH" >&2 +require_client_id() { + if [ -z "$CLIENT_ID" ]; then + echo "Error: CLIENT_ID must be set for this action." >&2 + echo "Example: CLIENT_ID=mock-client-1 ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=queue-status" >&2 exit 1 fi - - echo "${PREFIX}-${target_id}-dlq-queue" } show_queue_counts() { @@ -101,9 +96,11 @@ show_queue_counts() { } action_queue_status() { - show_queue_counts "Mock Target DLQ - Queue Message Counts" "$(target_dlq_queue_name)" - show_queue_counts "Inbound Event Queue - Queue Message Counts" "${PREFIX}-inbound-event-queue" - show_queue_counts "Inbound Event DLQ - Queue Message Counts" "${PREFIX}-inbound-event-dlq" + require_client_id + show_queue_counts "Client Delivery Queue - Message Counts" "${PREFIX}-${CLIENT_ID}-delivery-queue" + show_queue_counts "Client Delivery DLQ - Message Counts" "${PREFIX}-${CLIENT_ID}-delivery-dlq-queue" + show_queue_counts "Inbound Event Queue - Message Counts" "${PREFIX}-inbound-event-queue" + show_queue_counts "Inbound Event DLQ - Message Counts" "${PREFIX}-inbound-event-dlq" } peek_queue_message() { @@ -128,21 +125,19 @@ peek_queue_message() { } action_queue_peek() { - peek_queue_message "Mock Target DLQ - Message Peek" "$(target_dlq_queue_name)" + require_client_id + peek_queue_message "Client Delivery Queue - Message Peek" "${PREFIX}-${CLIENT_ID}-delivery-queue" + peek_queue_message "Client Delivery DLQ - Message Peek" "${PREFIX}-${CLIENT_ID}-delivery-dlq-queue" peek_queue_message "Inbound Event Queue - Message Peek" "${PREFIX}-inbound-event-queue" peek_queue_message "Inbound Event DLQ - Message Peek" "${PREFIX}-inbound-event-dlq" } log_filter_args() { - local -a args=() - local escaped_log_filter if [[ -n "$LOG_FILTER" ]]; then - escaped_log_filter="${LOG_FILTER//\"/\\\"}" + local escaped_log_filter="${LOG_FILTER//\"/\\\"}" # CloudWatch filter patterns treat quoted strings as exact phrases. - args+=(--filter-pattern "\"$escaped_log_filter\"") + printf '%s\n' --filter-pattern "\"$escaped_log_filter\"" fi - - printf '%s\n' "${args[@]}" } action_tail_transform() { @@ -160,6 +155,22 @@ action_tail_transform() { "${filter_args[@]}" } +action_tail_https_client() { + require_client_id + local -a filter_args=() + mapfile -t filter_args < <(log_filter_args) + + print_section "HTTPS Client Lambda Logs" + aws logs tail \ + "/aws/lambda/${PREFIX}-https-client-${CLIENT_ID}" \ + --region "$REGION" \ + --profile "$AWS_PROFILE" \ + --since 30m \ + --follow \ + --format short \ + "${filter_args[@]}" +} + action_tail_webhook() { local -a filter_args=() mapfile -t filter_args < <(log_filter_args) @@ -266,6 +277,9 @@ case "$ACTION" in tail-transform) action_tail_transform ;; + tail-https-client) + action_tail_https_client + ;; tail-webhook) action_tail_webhook ;; @@ -277,7 +291,7 @@ case "$ACTION" in ;; *) echo "Unknown action: $ACTION" >&2 - echo "Actions: queue-status, queue-peek, tail-transform, tail-webhook, tail-pipe, pipe-state" >&2 + echo "Actions: queue-status, queue-peek, tail-transform, tail-https-client, tail-webhook, tail-pipe, pipe-state" >&2 exit 1 ;; esac diff --git a/scripts/tests/test.mk b/scripts/tests/test.mk index a94a5af0..2bb70740 100644 --- a/scripts/tests/test.mk +++ b/scripts/tests/test.mk @@ -38,7 +38,7 @@ test-integration-local: # Run integration tests locally against a remoptely depl test-integration-debug: # Debug a live environment - inspect queues, tail logs, check pipe state (requires ENVIRONMENT, AWS_PROFILE, ACTION) @Testing make _test name="integration-debug" ACTION="$(or $(ACTION),$(word 2,$(MAKECMDGOALS)))" -queue-status queue-peek tail-transform tail-webhook tail-pipe pipe-state: +queue-status queue-peek tail-transform tail-https-client tail-webhook tail-pipe pipe-state: @: test-load: # Run all your load tests @Testing diff --git a/tests/integration/README.md b/tests/integration/README.md index a58531b8..0a76bf74 100644 --- a/tests/integration/README.md +++ b/tests/integration/README.md @@ -50,30 +50,33 @@ All are run via `make test-integration-debug ACTION=`. - [`queue-status`](#queue-status) – SQS queue message counts - [`queue-peek`](#queue-peek) – Peek at one message from each SQS queue - [`tail-transform`](#tail-transform) – Tail the transform/filter Lambda logs +- [`tail-https-client`](#tail-https-client) – Tail the https-client Lambda logs - [`tail-webhook`](#tail-webhook) – Tail the mock-webhook Lambda logs - [`tail-pipe`](#tail-pipe) – Tail the EventBridge pipe logs - [`pipe-state`](#pipe-state) – Show EventBridge pipe state and recent metrics -All log-tailing actions (`tail-transform`, `tail-webhook`, `tail-pipe`) accept an optional `LOG_FILTER` to narrow output to a specific message ID or pattern. +Some actions require `CLIENT_ID` (e.g. `mock-client-single-target`) — see individual actions below. + +All log-tailing actions (`tail-transform`, `tail-https-client`, `tail-webhook`, `tail-pipe`) accept an optional `LOG_FILTER` to narrow output to a specific message ID or pattern. --- ### `queue-status` -Shows approximate message counts for the inbound event queue, inbound event DLQ, and mock target DLQ. +Shows approximate message counts for the inbound event queue, inbound event DLQ, client delivery queue, and client delivery DLQ. Requires `CLIENT_ID`. ```sh -ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=queue-status +CLIENT_ID= ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=queue-status ``` --- ### `queue-peek` -Reads one message (without deleting it) from each of the same three queues, printing body, attributes, and message attributes. +Reads one message (without deleting it) from each of the same four queues, printing body, attributes, and message attributes. Requires `CLIENT_ID`. ```sh -ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=queue-peek +CLIENT_ID= ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=queue-peek ``` --- @@ -94,6 +97,22 @@ ENVIRONMENT= AWS_PROFILE= LOG_FILTER=SOME-MESSAGE-ID make test-int --- +### `tail-https-client` + +Tails CloudWatch logs for the `https-client` Lambda for the given client, following from the last 30 minutes. Requires `CLIENT_ID`. + +```sh +CLIENT_ID= ENVIRONMENT= AWS_PROFILE= make test-integration-debug ACTION=tail-https-client +``` + +Filter to a specific message ID: + +```sh +CLIENT_ID= ENVIRONMENT= AWS_PROFILE= LOG_FILTER=SOME-MESSAGE-ID make test-integration-debug ACTION=tail-https-client +``` + +--- + ### `tail-webhook` Tails CloudWatch logs for the `mock-webhook` Lambda, following from the last 30 minutes. diff --git a/tests/integration/dlq-alarms.test.ts b/tests/integration/dlq-alarms.test.ts index 1cf3a578..c4f69fa8 100644 --- a/tests/integration/dlq-alarms.test.ts +++ b/tests/integration/dlq-alarms.test.ts @@ -5,14 +5,18 @@ import { } from "@aws-sdk/client-cloudwatch"; import type { DeploymentDetails } from "@nhs-notify-client-callbacks/test-support/helpers"; import { getDeploymentDetails } from "@nhs-notify-client-callbacks/test-support/helpers"; -import { getAllSubscriptionTargetIds } from "./helpers/mock-client-config"; +import { + CLIENT_FIXTURES, + type ClientFixtureKey, + getClientConfig, +} from "./helpers/mock-client-config"; import { buildMockClientDlqQueueUrl } from "./helpers/sqs"; function buildDlqDepthAlarmName( { component, environment, project }: DeploymentDetails, - targetId: string, + clientId: string, ): string { - return `${project}-${environment}-${component}-${targetId}-dlq-depth`; + return `${project}-${environment}-${component}-${clientId}-dlq-depth`; } function getQueueNameFromUrl(queueUrl: string): string { @@ -27,7 +31,7 @@ function getQueueNameFromUrl(queueUrl: string): string { describe("DLQ alarms", () => { let cloudWatchClient: CloudWatchClient; let deploymentDetails: DeploymentDetails; - let targetIds: string[]; + let clientIds: string[]; beforeAll(() => { deploymentDetails = getDeploymentDetails(); @@ -35,22 +39,25 @@ describe("DLQ alarms", () => { region: deploymentDetails.region, }); - targetIds = getAllSubscriptionTargetIds(); + clientIds = (Object.keys(CLIENT_FIXTURES) as ClientFixtureKey[]).map( + (key) => getClientConfig(key).clientId, + ); }); afterAll(() => { cloudWatchClient.destroy(); }); - it("should create a DLQ depth alarm for every target DLQ", async () => { - expect(targetIds.length).toBeGreaterThan(0); + it("should create a DLQ depth alarm for every client DLQ", async () => { + expect(clientIds.length).toBeGreaterThan(0); - for (const targetId of targetIds) { - const alarmName = buildDlqDepthAlarmName(deploymentDetails, targetId); - const targetDlqQueueUrl = buildMockClientDlqQueueUrl(deploymentDetails, [ - { targetId }, - ]); - const targetDlqQueueName = getQueueNameFromUrl(targetDlqQueueUrl); + for (const clientId of clientIds) { + const alarmName = buildDlqDepthAlarmName(deploymentDetails, clientId); + const clientDlqQueueUrl = buildMockClientDlqQueueUrl( + deploymentDetails, + clientId, + ); + const clientDlqQueueName = getQueueNameFromUrl(clientDlqQueueUrl); const response = await cloudWatchClient.send( new DescribeAlarmsCommand({ AlarmNames: [alarmName], @@ -67,7 +74,7 @@ describe("DLQ alarms", () => { expect.arrayContaining([ expect.objectContaining({ Name: "QueueName", - Value: targetDlqQueueName, + Value: clientDlqQueueName, }), ]), ); diff --git a/tests/integration/dlq-redrive.test.ts b/tests/integration/dlq-redrive.test.ts index e88e4920..639eadc4 100644 --- a/tests/integration/dlq-redrive.test.ts +++ b/tests/integration/dlq-redrive.test.ts @@ -19,8 +19,10 @@ import { sendSqsEvent, } from "./helpers/sqs"; import { + CLIENT_FIXTURES, + type ClientFixtureKey, buildMockWebhookTargetPath, - getAllSubscriptionTargetIds, + getClientConfig, getMockItClientConfig, } from "./helpers/mock-client-config"; import { awaitSignedCallbacksFromWebhookLogGroup } from "./helpers/cloudwatch"; @@ -37,20 +39,20 @@ describe("DLQ Redrive", () => { beforeAll(async () => { const deploymentDetails = getDeploymentDetails(); - const mockClient1 = getMockItClientConfig(); - - const allSubscriptionTargetIds = getAllSubscriptionTargetIds(); + const { clientId } = getMockItClientConfig(); sqsClient = createSqsClient(deploymentDetails); cloudWatchClient = createCloudWatchLogsClient(deploymentDetails); inboundQueueUrl = buildInboundEventQueueUrl(deploymentDetails); - dlqQueueUrl = buildMockClientDlqQueueUrl( - deploymentDetails, - mockClient1.targets, - ); - allTargetDlqQueueUrls = allSubscriptionTargetIds.map((targetId) => - buildMockClientDlqQueueUrl(deploymentDetails, [{ targetId }]), + dlqQueueUrl = buildMockClientDlqQueueUrl(deploymentDetails, clientId); + allTargetDlqQueueUrls = ( + Object.keys(CLIENT_FIXTURES) as ClientFixtureKey[] + ).map((key) => + buildMockClientDlqQueueUrl( + deploymentDetails, + getClientConfig(key).clientId, + ), ); webhookLogGroupName = buildLambdaLogGroupName( deploymentDetails, @@ -67,7 +69,7 @@ describe("DLQ Redrive", () => { }); describe("Infrastructure validation", () => { - it("should confirm a target DLQ is accessible for all configured subscription targets", async () => { + it("should confirm a DLQ is accessible for all configured clients", async () => { const responses = await Promise.all( allTargetDlqQueueUrls.map((queueUrl) => sqsClient.send( diff --git a/tests/integration/helpers/mock-client-config.ts b/tests/integration/helpers/mock-client-config.ts index a004b4bc..950e699d 100644 --- a/tests/integration/helpers/mock-client-config.ts +++ b/tests/integration/helpers/mock-client-config.ts @@ -29,14 +29,6 @@ export const CLIENT_FIXTURES = { export type ClientFixtureKey = keyof typeof CLIENT_FIXTURES; -const ALL_CLIENT_FIXTURE_KEYS = Object.keys( - CLIENT_FIXTURES, -) as ClientFixtureKey[]; - -function dedupe(values: string[]): string[] { - return [...new Set(values)]; -} - export function getClientConfig(key: ClientFixtureKey): MockItClientConfig { // eslint-disable-next-line security/detect-object-injection -- key is constrained to ClientFixtureKey, a keyof the hardcoded as-const CLIENT_FIXTURES object const { apiKeyVar, applicationIdVar, fixture } = CLIENT_FIXTURES[key]; @@ -82,18 +74,3 @@ export function buildMockWebhookTargetPaths( ): string[] { return buildWebhookTargetPaths(key); } - -export function getSubscriptionTargetIds( - key: ClientFixtureKey = "client1", -): string[] { - const config = getClientConfig(key); - return dedupe( - config.subscriptions.flatMap((subscription) => subscription.targetIds), - ); -} - -export function getAllSubscriptionTargetIds( - keys: ClientFixtureKey[] = ALL_CLIENT_FIXTURE_KEYS, -): string[] { - return dedupe(keys.flatMap((key) => getSubscriptionTargetIds(key))); -} diff --git a/tests/integration/helpers/sqs.ts b/tests/integration/helpers/sqs.ts index 857fd3a7..747f746b 100644 --- a/tests/integration/helpers/sqs.ts +++ b/tests/integration/helpers/sqs.ts @@ -46,13 +46,16 @@ function buildQueueUrl( export function buildMockClientDlqQueueUrl( deploymentDetails: DeploymentDetails, - targets: { targetId: string }[], + clientId: string, ): string { - const [firstTarget] = targets; - if (!firstTarget) { - throw new Error("At least one target is required to build DLQ URL"); - } - return buildQueueUrl(deploymentDetails, `${firstTarget.targetId}-dlq`); + return buildQueueUrl(deploymentDetails, `${clientId}-delivery-dlq`); +} + +export function buildMockClientDeliveryQueueUrl( + deploymentDetails: DeploymentDetails, + clientId: string, +): string { + return buildQueueUrl(deploymentDetails, `${clientId}-delivery`); } export async function sendSqsEvent( diff --git a/tests/integration/inbound-sqs-to-webhook.test.ts b/tests/integration/inbound-sqs-to-webhook.test.ts index 4305f05e..d75ad402 100644 --- a/tests/integration/inbound-sqs-to-webhook.test.ts +++ b/tests/integration/inbound-sqs-to-webhook.test.ts @@ -28,6 +28,7 @@ import { assertCallbackHeaders } from "./helpers/signature"; import { awaitQueueMessage, awaitQueueMessageByMessageId, + buildMockClientDeliveryQueueUrl, buildMockClientDlqQueueUrl, ensureInboundQueueIsEmpty, purgeQueues, @@ -49,6 +50,7 @@ describe("SQS to Webhook Integration", () => { let cloudWatchClient: CloudWatchLogsClient; let callbackEventQueueUrl: string; let clientDlqQueueUrl: string; + let clientDeliveryQueueUrl: string; let inboundEventDlqQueueUrl: string; let webhookLogGroupName: string; let webhookTargetPath: string; @@ -56,12 +58,16 @@ describe("SQS to Webhook Integration", () => { beforeAll(async () => { const deploymentDetails = getDeploymentDetails(); - const { targets } = getMockItClientConfig(); + const { clientId } = getMockItClientConfig(); sqsClient = createSqsClient(deploymentDetails); cloudWatchClient = createCloudWatchLogsClient(deploymentDetails); callbackEventQueueUrl = buildInboundEventQueueUrl(deploymentDetails); - clientDlqQueueUrl = buildMockClientDlqQueueUrl(deploymentDetails, targets); + clientDlqQueueUrl = buildMockClientDlqQueueUrl(deploymentDetails, clientId); + clientDeliveryQueueUrl = buildMockClientDeliveryQueueUrl( + deploymentDetails, + clientId, + ); inboundEventDlqQueueUrl = buildInboundEventDlqQueueUrl(deploymentDetails); webhookLogGroupName = buildLambdaLogGroupName( deploymentDetails, @@ -72,6 +78,7 @@ describe("SQS to Webhook Integration", () => { await purgeQueues(sqsClient, [ inboundEventDlqQueueUrl, clientDlqQueueUrl, + clientDeliveryQueueUrl, callbackEventQueueUrl, ]); }); @@ -80,6 +87,7 @@ describe("SQS to Webhook Integration", () => { await purgeQueues(sqsClient, [ inboundEventDlqQueueUrl, clientDlqQueueUrl, + clientDeliveryQueueUrl, callbackEventQueueUrl, ]); @@ -195,7 +203,7 @@ describe("SQS to Webhook Integration", () => { }); describe("Client Webhook DLQ", () => { - it("should route a non-retriable (4xx) webhook response to the per-target DLQ", async () => { + it("should route a non-retriable (4xx) webhook response to the per-client DLQ", async () => { const event: StatusPublishEvent = createMessageStatusPublishEvent({ data: { @@ -209,7 +217,7 @@ describe("SQS to Webhook Integration", () => { expect(dlqMessage.Body).toBeDefined(); expect(dlqMessage.MessageAttributes?.ERROR_CODE?.StringValue).toBe( - "INVALID_PARAMETER", + "HTTP_CLIENT_ERROR", ); expect( dlqMessage.MessageAttributes?.ERROR_MESSAGE?.StringValue, diff --git a/tests/integration/metrics.test.ts b/tests/integration/metrics.test.ts index 2f314f85..f40eba69 100644 --- a/tests/integration/metrics.test.ts +++ b/tests/integration/metrics.test.ts @@ -40,12 +40,12 @@ describe("Metrics", () => { beforeAll(async () => { const deploymentDetails = getDeploymentDetails(); - const { targets } = getMockItClientConfig(); + const { clientId } = getMockItClientConfig(); sqsClient = createSqsClient(deploymentDetails); cloudWatchClient = createCloudWatchLogsClient(deploymentDetails); callbackEventQueueUrl = buildInboundEventQueueUrl(deploymentDetails); - clientDlqQueueUrl = buildMockClientDlqQueueUrl(deploymentDetails, targets); + clientDlqQueueUrl = buildMockClientDlqQueueUrl(deploymentDetails, clientId); inboundEventDlqQueueUrl = buildInboundEventDlqQueueUrl(deploymentDetails); logGroupName = buildLambdaLogGroupName( deploymentDetails,