From 1de4b1946fe44909550dfc77b8c9f844de3ab179 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 11:23:39 +0200 Subject: [PATCH 01/10] Gently decline subsequent POST /invoke request while the Lambda handler processes a request --- Examples/Streaming/README.md | 2 +- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 47 +++++++++++++++---- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/Examples/Streaming/README.md b/Examples/Streaming/README.md index 2c40df57..663ac14c 100644 --- a/Examples/Streaming/README.md +++ b/Examples/Streaming/README.md @@ -82,7 +82,7 @@ You can test the function locally before deploying: swift run # In another terminal, test with curl: -curl -v \ +curl -v --output response.txt \ --header "Content-Type: application/json" \ --data '"this is not used"' \ http://127.0.0.1:7000/invoke diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index c3fa2e5d..dcbd88b7 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -95,8 +95,8 @@ extension Lambda { internal struct LambdaHTTPServer { private let invocationEndpoint: String - private let invocationPool = Pool() - private let responsePool = Pool() + private let invocationPool = Pool(name: "Invocation Pool") + private let responsePool = Pool(name: "Response Pool") private init( invocationEndpoint: String? @@ -388,8 +388,21 @@ internal struct LambdaHTTPServer { // we always accept the /invoke request and push them to the pool let requestId = "\(DispatchTime.now().uptimeNanoseconds)" logger[metadataKey: "requestId"] = "\(requestId)" + logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") - await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) + // detect concurrent invocations of POST and gently decline the requests while we're processing one. + if await !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { + let response = LocalServerResponse( + id: requestId, + status: .badRequest, + body: ByteBuffer( + string: + "It's illegal to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" + ) + ) + try await self.sendResponse(response, outbound: outbound, logger: logger) + return + } // wait for the lambda function to process the request for try await response in self.responsePool { @@ -410,7 +423,12 @@ internal struct LambdaHTTPServer { "Received response for a different request id", metadata: ["response requestId": "\(response.requestId ?? "")"] ) - // should we return an error here ? Or crash as this is probably a programming error? + let response = LocalServerResponse( + id: requestId, + status: .badRequest, + body: ByteBuffer(string: "The response Id not equal to the request Id.") + ) + try await self.sendResponse(response, outbound: outbound, logger: logger) } } // What todo when there is no more responses to process? @@ -548,6 +566,9 @@ internal struct LambdaHTTPServer { /// This data structure is shared between instances of the HTTPHandler /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { + private let poolName: String + internal init(name: String) { self.poolName = name } + typealias Element = T enum State: ~Copyable { @@ -558,8 +579,11 @@ internal struct LambdaHTTPServer { private let lock = Mutex(.buffer([])) /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - public func push(_ invocation: T) async { - // if the iterator is waiting for an element, give it to it + /// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise + @discardableResult + public func push(_ invocation: T) async -> Bool { + + // if the iterator is waiting for an element on `next()``, give it to it // otherwise, enqueue the element let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in switch consume state { @@ -574,7 +598,12 @@ internal struct LambdaHTTPServer { } } - maybeContinuation?.resume(returning: invocation) + if let maybeContinuation { + maybeContinuation.resume(returning: invocation) + return true + } else { + return false + } } func next() async throws -> T? { @@ -596,8 +625,8 @@ internal struct LambdaHTTPServer { return nil } - case .continuation: - fatalError("Concurrent invocations to next(). This is illegal.") + case .continuation(_): + fatalError("\(self.poolName) : Concurrent invocations to next(). This is illegal.") } } From f35685c05c8090ff81d64f09f197d476cbc7bd46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Tue, 14 Oct 2025 11:33:21 +0200 Subject: [PATCH 02/10] Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index dcbd88b7..46327984 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -397,7 +397,7 @@ internal struct LambdaHTTPServer { status: .badRequest, body: ByteBuffer( string: - "It's illegal to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" + "It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" ) ) try await self.sendResponse(response, outbound: outbound, logger: logger) From 8aa9a46da61fb02c4cdf9161a332b50b7dbaccc3 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 11:36:17 +0200 Subject: [PATCH 03/10] fix typo and language in comments --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 46327984..ec17742b 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -420,13 +420,13 @@ internal struct LambdaHTTPServer { } } else { logger.error( - "Received response for a different request id", + "Received response for a different requestId", metadata: ["response requestId": "\(response.requestId ?? "")"] ) let response = LocalServerResponse( id: requestId, status: .badRequest, - body: ByteBuffer(string: "The response Id not equal to the request Id.") + body: ByteBuffer(string: "The responseId is not equal to the requestId.") ) try await self.sendResponse(response, outbound: outbound, logger: logger) } @@ -626,7 +626,7 @@ internal struct LambdaHTTPServer { } case .continuation(_): - fatalError("\(self.poolName) : Concurrent invocations to next(). This is illegal.") + fatalError("\(self.poolName) : Concurrent invocations to next(). This is not allowed.") } } From 6d028481b333fda952d29d8856d6e46e036dff86 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 11:39:44 +0200 Subject: [PATCH 04/10] fix test --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index ec17742b..e4a36036 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -567,7 +567,7 @@ internal struct LambdaHTTPServer { /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { private let poolName: String - internal init(name: String) { self.poolName = name } + internal init(name: String = "Pool") { self.poolName = name } typealias Element = T From 0566ef5d54253ce430b829652d6bbf15847c072f Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 13:32:42 +0200 Subject: [PATCH 05/10] remove async constraint on `push()` --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 14 +++++++------- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 14 ++++++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index e4a36036..c7687c63 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -272,7 +272,7 @@ internal struct LambdaHTTPServer { // for streaming requests, push a partial head response if self.isStreamingResponse(requestHead) { - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .ok @@ -286,7 +286,7 @@ internal struct LambdaHTTPServer { // if this is a request from a Streaming Lambda Handler, // stream the response instead of buffering it if self.isStreamingResponse(requestHead) { - await self.responsePool.push( + self.responsePool.push( LocalServerResponse(id: requestId, body: body) ) } else { @@ -298,7 +298,7 @@ internal struct LambdaHTTPServer { if self.isStreamingResponse(requestHead) { // for streaming response, send the final response - await self.responsePool.push( + self.responsePool.push( LocalServerResponse(id: requestId, final: true) ) } else { @@ -391,7 +391,7 @@ internal struct LambdaHTTPServer { logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") // detect concurrent invocations of POST and gently decline the requests while we're processing one. - if await !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { + if !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { let response = LocalServerResponse( id: requestId, status: .badRequest, @@ -475,7 +475,7 @@ internal struct LambdaHTTPServer { } // enqueue the lambda function response to be served as response to the client /invoke logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"]) - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .accepted, @@ -506,7 +506,7 @@ internal struct LambdaHTTPServer { } // enqueue the lambda function response to be served as response to the client /invoke logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"]) - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .internalServerError, @@ -581,7 +581,7 @@ internal struct LambdaHTTPServer { /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element /// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise @discardableResult - public func push(_ invocation: T) async -> Bool { + public func push(_ invocation: T) -> Bool { // if the iterator is waiting for an element on `next()``, give it to it // otherwise, enqueue the element diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 8cbe8a2e..84be4178 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -24,8 +24,8 @@ struct PoolTests { let pool = LambdaHTTPServer.Pool() // Push values - await pool.push("first") - await pool.push("second") + pool.push("first") + pool.push("second") // Iterate and verify order var values = [String]() @@ -53,7 +53,9 @@ struct PoolTests { task.cancel() // This should complete without receiving any values - try await task.value + do { + try await task.value + } catch is CancellationError {} // this might happen depending on the order on which the cancellation is handled } @Test @@ -78,7 +80,7 @@ struct PoolTests { try await withThrowingTaskGroup(of: Void.self) { group in for i in 0.. Date: Tue, 14 Oct 2025 13:42:33 +0200 Subject: [PATCH 06/10] swift-format --- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 84be4178..2e042914 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -55,7 +55,9 @@ struct PoolTests { // This should complete without receiving any values do { try await task.value - } catch is CancellationError {} // this might happen depending on the order on which the cancellation is handled + } catch is CancellationError { + // this might happen depending on the order on which the cancellation is handled + } } @Test From d73dce678a8f155ea4f760b49d0eefae7395c805 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 14:25:21 +0200 Subject: [PATCH 07/10] remove the fatal error to make testinge easier --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 117 ++++++++++-------- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 28 +++++ 2 files changed, 94 insertions(+), 51 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index c7687c63..0b661ad7 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -391,7 +391,42 @@ internal struct LambdaHTTPServer { logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") // detect concurrent invocations of POST and gently decline the requests while we're processing one. - if !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { + self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) + + // wait for the lambda function to process the request + // when POST /invoke is called multiple times before a response is process, the + // `for try await ... in` loop will throw an error and we will return a 400 error to the client + do { + for try await response in self.responsePool { + logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" + logger.trace("Received response to return to client") + if response.requestId == requestId { + logger.trace("/invoke requestId is valid, sending the response") + // send the response to the client + // if the response is final, we can send it and return + // if the response is not final, we can send it and wait for the next response + try await self.sendResponse(response, outbound: outbound, logger: logger) + if response.final == true { + logger.trace("/invoke returning") + return // if the response is final, we can return and close the connection + } + } else { + logger.error( + "Received response for a different requestId", + metadata: ["response requestId": "\(response.requestId ?? "")"] + ) + let response = LocalServerResponse( + id: requestId, + status: .badRequest, + body: ByteBuffer(string: "The responseId is not equal to the requestId.") + ) + try await self.sendResponse(response, outbound: outbound, logger: logger) + } + } + // What todo when there is no more responses to process? + // This should not happen as the async iterator blocks until there is a response to process + fatalError("No more responses to process - the async for loop should not return") + } catch is LambdaHTTPServer.Pool.PoolError { let response = LocalServerResponse( id: requestId, status: .badRequest, @@ -401,39 +436,7 @@ internal struct LambdaHTTPServer { ) ) try await self.sendResponse(response, outbound: outbound, logger: logger) - return - } - - // wait for the lambda function to process the request - for try await response in self.responsePool { - logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" - logger.trace("Received response to return to client") - if response.requestId == requestId { - logger.trace("/invoke requestId is valid, sending the response") - // send the response to the client - // if the response is final, we can send it and return - // if the response is not final, we can send it and wait for the next response - try await self.sendResponse(response, outbound: outbound, logger: logger) - if response.final == true { - logger.trace("/invoke returning") - return // if the response is final, we can return and close the connection - } - } else { - logger.error( - "Received response for a different requestId", - metadata: ["response requestId": "\(response.requestId ?? "")"] - ) - let response = LocalServerResponse( - id: requestId, - status: .badRequest, - body: ByteBuffer(string: "The responseId is not equal to the requestId.") - ) - try await self.sendResponse(response, outbound: outbound, logger: logger) - } } - // What todo when there is no more responses to process? - // This should not happen as the async iterator blocks until there is a response to process - fatalError("No more responses to process - the async for loop should not return") // client uses incorrect HTTP method case (_, let url) where url.hasSuffix(self.invocationEndpoint): @@ -579,9 +582,7 @@ internal struct LambdaHTTPServer { private let lock = Mutex(.buffer([])) /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - /// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise - @discardableResult - public func push(_ invocation: T) -> Bool { + public func push(_ invocation: T) { // if the iterator is waiting for an element on `next()``, give it to it // otherwise, enqueue the element @@ -598,12 +599,7 @@ internal struct LambdaHTTPServer { } } - if let maybeContinuation { - maybeContinuation.resume(returning: invocation) - return true - } else { - return false - } + maybeContinuation?.resume(returning: invocation) } func next() async throws -> T? { @@ -614,25 +610,30 @@ internal struct LambdaHTTPServer { return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let nextAction = self.lock.withLock { state -> T? in + let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in switch consume state { case .buffer(var buffer): if let first = buffer.popFirst() { state = .buffer(buffer) - return first + return (first, nil) } else { state = .continuation(continuation) - return nil + return (nil, nil) } - case .continuation(_): - fatalError("\(self.poolName) : Concurrent invocations to next(). This is not allowed.") + case .continuation(let previousContinuation): + state = .buffer([]) + return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation]))) } } - guard let nextAction else { return } - - continuation.resume(returning: nextAction) + if let nextError, + case let .nextCalledTwice(continuations) = nextError.cause + { + for continuation in continuations { continuation?.resume(throwing: nextError) } + } else if let nextAction { + continuation.resume(returning: nextAction) + } } } onCancel: { self.lock.withLock { state in @@ -640,8 +641,8 @@ internal struct LambdaHTTPServer { case .buffer(let buffer): state = .buffer(buffer) case .continuation(let continuation): - continuation?.resume(throwing: CancellationError()) state = .buffer([]) + continuation?.resume(throwing: CancellationError()) } } } @@ -650,6 +651,20 @@ internal struct LambdaHTTPServer { func makeAsyncIterator() -> Pool { self } + + struct PoolError: Error { + let cause: Cause + var message: String { + switch self.cause { + case .nextCalledTwice: + return "Concurrent invocations to next(). This is not allowed." + } + } + + enum Cause { + case nextCalledTwice([CheckedContinuation?]) + } + } } private struct LocalServerResponse: Sendable { diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 2e042914..1e2fff2e 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -158,4 +158,32 @@ struct PoolTests { #expect(receivedValues.count == producerCount * messagesPerProducer) #expect(Set(receivedValues).count == producerCount * messagesPerProducer) } + + @Test + @available(LambdaSwift 2.0, *) + func testConcurrentNext() async throws { + let pool = LambdaHTTPServer.Pool() + + // Create two tasks that will both wait for elements to be available + await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + + // one of the two task will throw a PoolError + + group.addTask { + for try await _ in pool { + } + Issue.record("Loop 1 should not complete") + } + + group.addTask { + for try await _ in pool { + } + Issue.record("Loop 2 should not complete") + } + try await group.waitForAll() + } + } + } + } From 0cd73dad2ad8a5bb4f7a3e8ac62730246605fbb6 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 14:34:42 +0200 Subject: [PATCH 08/10] fix comment --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 0b661ad7..498d3e79 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -390,12 +390,11 @@ internal struct LambdaHTTPServer { logger[metadataKey: "requestId"] = "\(requestId)" logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") - // detect concurrent invocations of POST and gently decline the requests while we're processing one. self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) // wait for the lambda function to process the request - // when POST /invoke is called multiple times before a response is process, the - // `for try await ... in` loop will throw an error and we will return a 400 error to the client + // when POST /invoke is called multiple times before a response is processed, + // the `for try await ... in` loop will throw an error and we will return a 400 error to the client do { for try await response in self.responsePool { logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" @@ -427,6 +426,7 @@ internal struct LambdaHTTPServer { // This should not happen as the async iterator blocks until there is a response to process fatalError("No more responses to process - the async for loop should not return") } catch is LambdaHTTPServer.Pool.PoolError { + // detect concurrent invocations of POST and gently decline the requests while we're processing one. let response = LocalServerResponse( id: requestId, status: .badRequest, From bed1a5d847cf4e23a73ccf30a35a9987763579cb Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 21:31:00 +0200 Subject: [PATCH 09/10] Split file LocalServer and Pool --- .../Lambda+LocalServer+Pool.swift | 123 ++++++++++++++++++ .../AWSLambdaRuntime/Lambda+LocalServer.swift | 106 +-------------- 2 files changed, 124 insertions(+), 105 deletions(-) create mode 100644 Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift new file mode 100644 index 00000000..238530b7 --- /dev/null +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift @@ -0,0 +1,123 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if LocalServerSupport +import DequeModule +import Synchronization + +@available(LambdaSwift 2.0, *) +extension LambdaHTTPServer { + /// A shared data structure to store the current invocation or response requests and the continuation objects. + /// This data structure is shared between instances of the HTTPHandler + /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). + internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { + private let poolName: String + internal init(name: String = "Pool") { self.poolName = name } + + typealias Element = T + + enum State: ~Copyable { + case buffer(Deque) + case continuation(CheckedContinuation?) + } + + private let lock = Mutex(.buffer([])) + + /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element + public func push(_ invocation: T) { + + // if the iterator is waiting for an element on `next()``, give it to it + // otherwise, enqueue the element + let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in + switch consume state { + case .continuation(let continuation): + state = .buffer([]) + return continuation + + case .buffer(var buffer): + buffer.append(invocation) + state = .buffer(buffer) + return nil + } + } + + maybeContinuation?.resume(returning: invocation) + } + + func next() async throws -> T? { + // exit the async for loop if the task is cancelled + guard !Task.isCancelled else { + return nil + } + + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in + switch consume state { + case .buffer(var buffer): + if let first = buffer.popFirst() { + state = .buffer(buffer) + return (first, nil) + } else { + state = .continuation(continuation) + return (nil, nil) + } + + case .continuation(let previousContinuation): + state = .buffer([]) + return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation]))) + } + } + + if let nextError, + case let .nextCalledTwice(continuations) = nextError.cause + { + for continuation in continuations { continuation?.resume(throwing: nextError) } + } else if let nextAction { + continuation.resume(returning: nextAction) + } + } + } onCancel: { + self.lock.withLock { state in + switch consume state { + case .buffer(let buffer): + state = .buffer(buffer) + case .continuation(let continuation): + state = .buffer([]) + continuation?.resume(throwing: CancellationError()) + } + } + } + } + + func makeAsyncIterator() -> Pool { + self + } + + struct PoolError: Error { + let cause: Cause + var message: String { + switch self.cause { + case .nextCalledTwice: + return "Concurrent invocations to next(). This is not allowed." + } + } + + enum Cause { + case nextCalledTwice([CheckedContinuation?]) + } + } + } +} +#endif \ No newline at end of file diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 498d3e79..33fcabed 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -13,13 +13,11 @@ //===----------------------------------------------------------------------===// #if LocalServerSupport -import DequeModule import Dispatch import Logging import NIOCore import NIOHTTP1 import NIOPosix -import Synchronization // This functionality is designed for local testing when the LocalServerSupport trait is enabled. @@ -565,108 +563,6 @@ internal struct LambdaHTTPServer { } } - /// A shared data structure to store the current invocation or response requests and the continuation objects. - /// This data structure is shared between instances of the HTTPHandler - /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). - internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { - private let poolName: String - internal init(name: String = "Pool") { self.poolName = name } - - typealias Element = T - - enum State: ~Copyable { - case buffer(Deque) - case continuation(CheckedContinuation?) - } - - private let lock = Mutex(.buffer([])) - - /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - public func push(_ invocation: T) { - - // if the iterator is waiting for an element on `next()``, give it to it - // otherwise, enqueue the element - let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in - switch consume state { - case .continuation(let continuation): - state = .buffer([]) - return continuation - - case .buffer(var buffer): - buffer.append(invocation) - state = .buffer(buffer) - return nil - } - } - - maybeContinuation?.resume(returning: invocation) - } - - func next() async throws -> T? { - // exit the async for loop if the task is cancelled - guard !Task.isCancelled else { - return nil - } - - return try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in - switch consume state { - case .buffer(var buffer): - if let first = buffer.popFirst() { - state = .buffer(buffer) - return (first, nil) - } else { - state = .continuation(continuation) - return (nil, nil) - } - - case .continuation(let previousContinuation): - state = .buffer([]) - return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation]))) - } - } - - if let nextError, - case let .nextCalledTwice(continuations) = nextError.cause - { - for continuation in continuations { continuation?.resume(throwing: nextError) } - } else if let nextAction { - continuation.resume(returning: nextAction) - } - } - } onCancel: { - self.lock.withLock { state in - switch consume state { - case .buffer(let buffer): - state = .buffer(buffer) - case .continuation(let continuation): - state = .buffer([]) - continuation?.resume(throwing: CancellationError()) - } - } - } - } - - func makeAsyncIterator() -> Pool { - self - } - - struct PoolError: Error { - let cause: Cause - var message: String { - switch self.cause { - case .nextCalledTwice: - return "Concurrent invocations to next(). This is not allowed." - } - } - - enum Cause { - case nextCalledTwice([CheckedContinuation?]) - } - } - } - private struct LocalServerResponse: Sendable { let requestId: String? let status: HTTPResponseStatus? @@ -687,7 +583,7 @@ internal struct LambdaHTTPServer { self.final = final } } - + private struct LocalServerInvocation: Sendable { let requestId: String let request: ByteBuffer From 72b062f854db2f5e5ea005d5aac8598535cc65b6 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 21:54:42 +0200 Subject: [PATCH 10/10] use Result<> as return type --- .../Lambda+LocalServer+Pool.swift | 43 +++++++++++++------ .../AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift index 238530b7..c64a8183 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift @@ -29,7 +29,7 @@ extension LambdaHTTPServer { enum State: ~Copyable { case buffer(Deque) - case continuation(CheckedContinuation?) + case continuation(CheckedContinuation) } private let lock = Mutex(.buffer([])) @@ -55,6 +55,16 @@ extension LambdaHTTPServer { maybeContinuation?.resume(returning: invocation) } + /// AsyncSequence's standard next() function + /// Returns: + /// - nil when the task is cancelled + /// - an element when there is one in the queue + /// + /// When there is no element in the queue, the task will be suspended until an element is pushed to the queue + /// or the task is cancelled + /// + /// - Throws: PoolError if the next() function is called twice concurrently + @Sendable func next() async throws -> T? { // exit the async for loop if the task is cancelled guard !Task.isCancelled else { @@ -63,29 +73,34 @@ extension LambdaHTTPServer { return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in + let nextAction: Result? = self.lock.withLock { state -> Result? in switch consume state { case .buffer(var buffer): if let first = buffer.popFirst() { state = .buffer(buffer) - return (first, nil) + return .success(first) } else { state = .continuation(continuation) - return (nil, nil) + return nil } case .continuation(let previousContinuation): state = .buffer([]) - return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation]))) + return .failure(PoolError(cause: .nextCalledTwice(previousContinuation))) } } - if let nextError, - case let .nextCalledTwice(continuations) = nextError.cause - { - for continuation in continuations { continuation?.resume(throwing: nextError) } - } else if let nextAction { - continuation.resume(returning: nextAction) + switch nextAction { + case .success(let action): + continuation.resume(returning: action) + case .failure(let error): + if case let .nextCalledTwice(continuation) = error.cause { + continuation.resume(throwing: error) + } + continuation.resume(throwing: error) + case .none: + // do nothing + break } } } onCancel: { @@ -95,7 +110,7 @@ extension LambdaHTTPServer { state = .buffer(buffer) case .continuation(let continuation): state = .buffer([]) - continuation?.resume(throwing: CancellationError()) + continuation.resume(throwing: CancellationError()) } } } @@ -115,9 +130,9 @@ extension LambdaHTTPServer { } enum Cause { - case nextCalledTwice([CheckedContinuation?]) + case nextCalledTwice(CheckedContinuation) } } } } -#endif \ No newline at end of file +#endif diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 33fcabed..61cadc94 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -583,7 +583,7 @@ internal struct LambdaHTTPServer { self.final = final } } - + private struct LocalServerInvocation: Sendable { let requestId: String let request: ByteBuffer