From 2020d6ba6c5b4fefe6a325b530f680220ad8ae47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Fri, 1 Aug 2025 10:48:04 +0400 Subject: [PATCH 01/30] return HTTP accepted on error --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- Sources/MockServer/MockHTTPServer.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index f536e3f4..86069aad 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -452,7 +452,7 @@ internal struct LambdaHTTPServer { await self.responsePool.push( LocalServerResponse( id: requestId, - status: .ok, + status: .accepted, // the local server has no mecanism to collect headers set by the lambda function headers: HTTPHeaders(), body: body, diff --git a/Sources/MockServer/MockHTTPServer.swift b/Sources/MockServer/MockHTTPServer.swift index 78685c52..92fd297f 100644 --- a/Sources/MockServer/MockHTTPServer.swift +++ b/Sources/MockServer/MockHTTPServer.swift @@ -224,7 +224,7 @@ struct HttpServer { } else if requestHead.uri.hasSuffix("/response") { responseStatus = .accepted } else if requestHead.uri.hasSuffix("/error") { - responseStatus = .ok + responseStatus = .accepted } else { responseStatus = .notFound } From 6e01c6ef85b12d38632ae349ed6971f5774c3fa4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Fri, 1 Aug 2025 11:16:54 +0400 Subject: [PATCH 02/30] force exit() when we loose connection to Lambda service --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index a1afb464..1962478f 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -330,6 +330,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { try channel.pipeline.syncOperations.addHTTPClientHandlers() // Lambda quotas... An invocation payload is maximal 6MB in size: // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html + // TODO: should we enforce this here ? What about streaming functions that + // support up to 20Mb responses ? try channel.pipeline.syncOperations.addHandler( NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) @@ -364,6 +366,14 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in runtimeClient.channelClosed(channel) + + // at this stage, we lost the connection to the Lambda Service, + // this is very unlikely to happen when running in a lambda function deployed in the cloud + // however, this happens when performance testing against the MockServer + // shutdown this runtime. + // The Lambda service will create a new runtime environment anyway + runtimeClient.logger.trace("Connection to Lambda API lost, exiting") + exit(-1) } } From 166cd461de8f595da9af1d33be4e2d601ab392a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 3 Aug 2025 18:54:23 +0200 Subject: [PATCH 03/30] propagate the connection closed info through a Future --- Sources/AWSLambdaRuntime/Lambda.swift | 12 ++++++++++++ .../LambdaRuntime+ServiceLifecycle.swift | 10 ++++++++-- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 8 +++++++- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 10 ++++++++-- 4 files changed, 35 insertions(+), 5 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 5412c139..ae8ca78a 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -41,6 +41,17 @@ public enum Lambda { var logger = logger do { while !Task.isCancelled { + + if let runtimeClient = runtimeClient as? LambdaRuntimeClient, + let futureConnectionClosed = await runtimeClient.futureConnectionClosed + { + // Wait for the futureConnectionClosed to complete, + // which will happen when the Lambda HTTP Server (or MockServer) closes the connection + // This allows us to exit the run loop gracefully. + // The futureConnectionClosed is always an error, let it throw to finish the run loop. + let _ = try await futureConnectionClosed.get() + } + let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" @@ -84,6 +95,7 @@ public enum Lambda { } catch is CancellationError { // don't allow cancellation error to propagate further } + } /// The default EventLoop the Lambda is scheduled on. diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index 1b05b1c2..eb3e210e 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -17,8 +17,14 @@ import ServiceLifecycle extension LambdaRuntime: Service { public func run() async throws { - try await cancelWhenGracefulShutdown { - try await self._run() + await cancelWhenGracefulShutdown { + do { + try await self._run() + } catch { + // catch top level error that have not been handled before + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + } } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 5f66df6f..7fc217a6 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -59,7 +59,13 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb #if !ServiceLifecycleSupport @inlinable internal func run() async throws { - try await _run() + do { + try await _run() + } catch { + // catch top level error that have not been handled before + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + } } #endif diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 1962478f..a54c534d 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -92,6 +92,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case closed } + @usableFromInline + var futureConnectionClosed: EventLoopFuture? = nil + private let eventLoop: any EventLoop private let logger: Logger private let configuration: Configuration @@ -372,8 +375,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { // however, this happens when performance testing against the MockServer // shutdown this runtime. // The Lambda service will create a new runtime environment anyway - runtimeClient.logger.trace("Connection to Lambda API lost, exiting") - exit(-1) + runtimeClient.logger.trace("Connection to Lambda API. lost, exiting") + runtimeClient.futureConnectionClosed = runtimeClient.eventLoop.makeFailedFuture( + LambdaRuntimeError(code: .connectionToControlPlaneLost) + ) } } @@ -392,6 +397,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return handler } } catch { + switch self.connectionState { case .disconnected, .connected: fatalError("Unexpected state: \(self.connectionState)") From a69ed54056867d823bc0c7263b17cfc70b8e954d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 3 Aug 2025 19:04:50 +0200 Subject: [PATCH 04/30] fix typos --- Sources/AWSLambdaRuntime/Lambda.swift | 2 +- Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift | 2 +- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index ae8ca78a..9b96f7ce 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -48,7 +48,7 @@ public enum Lambda { // Wait for the futureConnectionClosed to complete, // which will happen when the Lambda HTTP Server (or MockServer) closes the connection // This allows us to exit the run loop gracefully. - // The futureConnectionClosed is always an error, let it throw to finish the run loop. + // The futureConnectionClosed is always an error, let it throw to terminate the Lambda run loop. let _ = try await futureConnectionClosed.get() } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index eb3e210e..90c4b060 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -21,7 +21,7 @@ extension LambdaRuntime: Service { do { try await self._run() } catch { - // catch top level error that have not been handled before + // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 7fc217a6..9d4c3c8b 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -62,7 +62,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb do { try await _run() } catch { - // catch top level error that have not been handled before + // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) } From 04d9fc7cdce9a443533f6b1a178483285df8d1a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Sun, 3 Aug 2025 20:07:09 +0200 Subject: [PATCH 05/30] fix unit tests --- Sources/AWSLambdaRuntime/Lambda.swift | 4 ++++ .../LambdaRuntime+ServiceLifecycle.swift | 9 ++++++++- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 7 +++++++ Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 4 +--- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 9b96f7ce..b24d98fb 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -52,6 +52,7 @@ public enum Lambda { let _ = try await futureConnectionClosed.get() } + logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" @@ -87,10 +88,13 @@ public enum Lambda { logger: logger ) ) + logger.trace("Handler finished processing invocation") } catch { + logger.trace("Handler failed processing invocation", metadata: ["Handler error": "\(error)"]) try await writer.reportError(error) continue } + logger.handler.metadata.removeValue(forKey: "aws-request-id") } } catch is CancellationError { // don't allow cancellation error to propagate further diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index 90c4b060..50616df9 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -17,13 +17,20 @@ import ServiceLifecycle extension LambdaRuntime: Service { public func run() async throws { - await cancelWhenGracefulShutdown { + try await cancelWhenGracefulShutdown { do { try await self._run() } catch { // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 9d4c3c8b..daa8ed5f 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -65,6 +65,13 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb // catch top level errors that have not been handled until now // this avoids the runtime to crash and generate a backtrace self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } } } #endif diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index a54c534d..ece727ef 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -121,10 +121,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } catch { result = .failure(error) } - await runtime.close() - //try? await runtime.close() return try result.get() } @@ -375,7 +373,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { // however, this happens when performance testing against the MockServer // shutdown this runtime. // The Lambda service will create a new runtime environment anyway - runtimeClient.logger.trace("Connection to Lambda API. lost, exiting") + runtimeClient.logger.trace("Connection to Lambda Service HTTP Server lost, exiting") runtimeClient.futureConnectionClosed = runtimeClient.eventLoop.makeFailedFuture( LambdaRuntimeError(code: .connectionToControlPlaneLost) ) From 025a0e59e31df9275d23b4e8ed41122d8c8d527f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 16:00:39 +0200 Subject: [PATCH 06/30] simplify by checking connection state in the `nextInvocation()` call --- Sources/AWSLambdaRuntime/Lambda.swift | 16 +++----- .../LambdaRuntimeClient.swift | 39 ++++++++++++------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 80f862ce..f3edaccf 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -41,17 +41,13 @@ public enum Lambda { var logger = logger do { while !Task.isCancelled { - - if let runtimeClient = runtimeClient as? LambdaRuntimeClient, - let futureConnectionClosed = await runtimeClient.futureConnectionClosed - { - // Wait for the futureConnectionClosed to complete, - // which will happen when the Lambda HTTP Server (or MockServer) closes the connection - // This allows us to exit the run loop gracefully. - // The futureConnectionClosed is always an error, let it throw to terminate the Lambda run loop. - let _ = try await futureConnectionClosed.get() + + guard let runtimeClient = runtimeClient as? LambdaRuntimeClient, + await !runtimeClient.isConnectionStateDisconnected else { + logger.trace("Runtime client not connected, exiting run loop") + throw LambdaRuntimeError.init(code: .connectionToControlPlaneLost) } - + logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index ece727ef..e06d1732 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -67,10 +67,23 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { NIOLoopBound>, any Error > - private enum ConnectionState { + private enum ConnectionState: Equatable { case disconnected case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) + + static func == (lhs: ConnectionState, rhs: ConnectionState) -> Bool { + switch (lhs, rhs) { + case (.disconnected, .disconnected): + return true + case (.connecting, .connecting): + return true + case (.connected, .connected): + return true + default: + return false + } + } } enum LambdaState { @@ -92,14 +105,22 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case closed } - @usableFromInline - var futureConnectionClosed: EventLoopFuture? = nil - private let eventLoop: any EventLoop private let logger: Logger private let configuration: Configuration private var connectionState: ConnectionState = .disconnected + + // adding this dynamic property because I can not give access to `connectionState` directly + // because it is private, depending on multiple private and non-Sendable types + // the only thing we need to know outside of this class is if the connection state is disconnected + @usableFromInline + var isConnectionStateDisconnected: Bool { + get { + self.connectionState == .disconnected + } + } + private var lambdaState: LambdaState = .idle(previousRequestID: nil) private var closingState: ClosingState = .notClosing @@ -367,16 +388,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in runtimeClient.channelClosed(channel) - - // at this stage, we lost the connection to the Lambda Service, - // this is very unlikely to happen when running in a lambda function deployed in the cloud - // however, this happens when performance testing against the MockServer - // shutdown this runtime. - // The Lambda service will create a new runtime environment anyway - runtimeClient.logger.trace("Connection to Lambda Service HTTP Server lost, exiting") - runtimeClient.futureConnectionClosed = runtimeClient.eventLoop.makeFailedFuture( - LambdaRuntimeError(code: .connectionToControlPlaneLost) - ) } } From ce8b5672aa4806a1d6d9f2652fed5452d15a5466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 16:49:26 +0200 Subject: [PATCH 07/30] introducing a new connection state "lostConnection" --- Sources/AWSLambdaRuntime/Lambda.swift | 4 ++-- .../LambdaRuntimeClient.swift | 24 +++++++++++++++---- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index f3edaccf..683e3090 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -43,8 +43,8 @@ public enum Lambda { while !Task.isCancelled { guard let runtimeClient = runtimeClient as? LambdaRuntimeClient, - await !runtimeClient.isConnectionStateDisconnected else { - logger.trace("Runtime client not connected, exiting run loop") + await !runtimeClient.didLooseConnection else { + logger.trace("Runtime client disconnected, exiting run loop") throw LambdaRuntimeError.init(code: .connectionToControlPlaneLost) } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index e06d1732..d1900509 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -69,6 +69,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private enum ConnectionState: Equatable { case disconnected + case lostConnection case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) @@ -80,6 +81,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return true case (.connected, .connected): return true + case (.lostConnection, .lostConnection): + return true default: return false } @@ -115,9 +118,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { // because it is private, depending on multiple private and non-Sendable types // the only thing we need to know outside of this class is if the connection state is disconnected @usableFromInline - var isConnectionStateDisconnected: Bool { + var didLooseConnection: Bool { get { - self.connectionState == .disconnected + self.connectionState == .lostConnection } } @@ -179,12 +182,16 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connected(let channel, _): channel.close(mode: .all, promise: nil) + case .lostConnection: + // this should never happen. + fatalError("Lost connection to Lambda service while closing the runtime client") } } } @usableFromInline func nextInvocation() async throws -> (Invocation, Writer) { + try await withTaskCancellationHandler { switch self.lambdaState { case .idle: @@ -284,7 +291,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private func channelClosed(_ channel: any Channel) { switch (self.connectionState, self.closingState) { - case (_, .closed): + case (_, .closed), (.lostConnection, _): fatalError("Invalid state: \(self.connectionState), \(self.closingState)") case (.disconnected, .notClosing): @@ -344,6 +351,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return loopBound.value case .connected(_, let handler): return handler + + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } let bootstrap = ClientBootstrap(group: self.eventLoop) @@ -392,7 +403,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } switch self.connectionState { - case .disconnected, .connected: + case .disconnected, .connected, .lostConnection: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -408,7 +419,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } catch { switch self.connectionState { - case .disconnected, .connected: + case .disconnected, .connected, .lostConnection: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -456,6 +467,9 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate { isolated.connectionState = .disconnected + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } } } From be4cb20bf1d0fc656a4ffa56258a528e995d3e3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 16:58:10 +0200 Subject: [PATCH 08/30] add state change --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index d1900509..eef8cbcd 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -399,6 +399,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in runtimeClient.channelClosed(channel) + runtimeClient.connectionState = .lostConnection } } From b37ea0ee389fafa16d5ab8181900fd68b27753cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:26:42 +0200 Subject: [PATCH 09/30] fix lost continuation --- Sources/AWSLambdaRuntime/Lambda.swift | 6 --- .../LambdaRuntimeClient.swift | 46 ++++++++++++------- .../AWSLambdaRuntime/LambdaRuntimeError.swift | 1 - 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 683e3090..042d9c6f 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -42,12 +42,6 @@ public enum Lambda { do { while !Task.isCancelled { - guard let runtimeClient = runtimeClient as? LambdaRuntimeClient, - await !runtimeClient.didLooseConnection else { - logger.trace("Runtime client disconnected, exiting run loop") - throw LambdaRuntimeError.init(code: .connectionToControlPlaneLost) - } - logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index eef8cbcd..e0366a77 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -114,16 +114,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private var connectionState: ConnectionState = .disconnected - // adding this dynamic property because I can not give access to `connectionState` directly - // because it is private, depending on multiple private and non-Sendable types - // the only thing we need to know outside of this class is if the connection state is disconnected - @usableFromInline - var didLooseConnection: Bool { - get { - self.connectionState == .lostConnection - } - } - private var lambdaState: LambdaState = .idle(previousRequestID: nil) private var closingState: ClosingState = .notClosing @@ -146,7 +136,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { result = .failure(error) } await runtime.close() - return try result.get() } @@ -182,9 +171,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connected(let channel, _): channel.close(mode: .all, promise: nil) + case .lostConnection: - // this should never happen. - fatalError("Lost connection to Lambda service while closing the runtime client") + continuation.resume() } } } @@ -192,12 +181,17 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { @usableFromInline func nextInvocation() async throws -> (Invocation, Writer) { - try await withTaskCancellationHandler { + if self.connectionState == .lostConnection { + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) + } + + return try await withTaskCancellationHandler { switch self.lambdaState { case .idle: self.lambdaState = .waitingForNextInvocation let handler = try await self.makeOrGetConnection() let invocation = try await handler.nextInvocation() + guard case .waitingForNextInvocation = self.lambdaState else { fatalError("Invalid state: \(self.lambdaState)") } @@ -312,7 +306,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case (.connecting(let array), .notClosing): self.connectionState = .disconnected for continuation in array { - continuation.resume(throwing: LambdaRuntimeError(code: .lostConnectionToControlPlane)) + continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) } case (.connecting(let array), .closing(let continuation)): @@ -326,6 +320,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case (.connected, .notClosing): self.connectionState = .disconnected + case (.connected, .closing(let continuation)): self.connectionState = .disconnected @@ -398,13 +393,24 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { ) channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in + + // resume any pending continuation on the handler + if case .connected(_ , let handler) = runtimeClient.connectionState { + if case .connected(_ , let lambdaState) = handler.state { + if case .waitingForNextInvocation(let continuation) = lambdaState { + continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) + } + } + } + + // close the channel runtimeClient.channelClosed(channel) runtimeClient.connectionState = .lostConnection } } switch self.connectionState { - case .disconnected, .connected, .lostConnection: + case .disconnected, .connected: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -416,11 +422,14 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } } return handler + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } } catch { switch self.connectionState { - case .disconnected, .connected, .lostConnection: + case .disconnected, .connected: fatalError("Unexpected state: \(self.connectionState)") case .connecting(let array): @@ -431,6 +440,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } } throw error + case .lostConnection: + // this should never happen + fatalError("Lost connection to Lambda service") } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift index a9c0cbca..bc4865db 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeError.swift @@ -25,7 +25,6 @@ package struct LambdaRuntimeError: Error { case writeAfterFinishHasBeenSent case finishAfterFinishHasBeenSent - case lostConnectionToControlPlane case unexpectedStatusCodeForRequest case nextInvocationMissingHeaderRequestID From cd0094823fcb54277bd0d682ea756ead78e066b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:32:46 +0200 Subject: [PATCH 10/30] fix compilation error --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index e0366a77..27eb74ea 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -515,7 +515,7 @@ private final class LambdaChannelHandler } } - private var state: State = .disconnected + var state: State = .disconnected private var lastError: Error? private var reusableErrorBuffer: ByteBuffer? private let logger: Logger From 008c54246390ae45ace08ba8b0860649958fc5f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:33:06 +0200 Subject: [PATCH 11/30] DRY: move the error handling to the _run() function --- .../LambdaRuntime+ServiceLifecycle.swift | 15 +--------- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 28 +++++++++---------- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift index 50616df9..1b05b1c2 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime+ServiceLifecycle.swift @@ -18,20 +18,7 @@ import ServiceLifecycle extension LambdaRuntime: Service { public func run() async throws { try await cancelWhenGracefulShutdown { - do { - try await self._run() - } catch { - // catch top level errors that have not been handled until now - // this avoids the runtime to crash and generate a backtrace - self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) - if let error = error as? LambdaRuntimeError, - error.code != .connectionToControlPlaneLost - { - // if the error is a LambdaRuntimeError but not a connection error, - // we rethrow it to preserve existing behaviour - throw error - } - } + try await self._run() } } } diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index daa8ed5f..4d7450e5 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -59,20 +59,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb #if !ServiceLifecycleSupport @inlinable internal func run() async throws { - do { - try await _run() - } catch { - // catch top level errors that have not been handled until now - // this avoids the runtime to crash and generate a backtrace - self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) - if let error = error as? LambdaRuntimeError, - error.code != .connectionToControlPlaneLost - { - // if the error is a LambdaRuntimeError but not a connection error, - // we rethrow it to preserve existing behaviour - throw error - } - } + try await _run() } #endif @@ -107,6 +94,7 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb let ip = String(ipAndPort[0]) guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) } + do { try await LambdaRuntimeClient.withRuntimeClient( configuration: .init(ip: ip, port: port), eventLoop: self.eventLoop, @@ -118,6 +106,18 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb logger: self.logger ) } + } catch { + // catch top level errors that have not been handled until now + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } + } } else { From 9dcb4b33115b0667a2f1a6542e5215c30237dd2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:51:33 +0200 Subject: [PATCH 12/30] fix a case where continuation was resumed twice --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 27eb74ea..ff03e329 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -937,6 +937,7 @@ extension LambdaChannelHandler: ChannelInboundHandler { // fail any pending responses with last error or assume peer disconnected switch self.state { case .connected(_, .waitingForNextInvocation(let continuation)): + self.state = .disconnected continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel) default: break From f2d94a2b5c68f4ad45b1292c7f391071f098a204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:52:45 +0200 Subject: [PATCH 13/30] fix unit test --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index ff03e329..a0898971 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -348,8 +348,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return handler case .lostConnection: - // this should never happen - fatalError("Lost connection to Lambda service") + throw LambdaRuntimeError(code: .connectionToControlPlaneLost) } let bootstrap = ClientBootstrap(group: self.eventLoop) From 852391e51635c23f21bda3705ac2a997c28e935b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:53:11 +0200 Subject: [PATCH 14/30] swift format --- Sources/AWSLambdaRuntime/Lambda.swift | 2 +- Sources/AWSLambdaRuntime/LambdaRuntime.swift | 42 +++++++++---------- .../LambdaRuntimeClient.swift | 11 +++-- 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda.swift b/Sources/AWSLambdaRuntime/Lambda.swift index 042d9c6f..f6223cb5 100644 --- a/Sources/AWSLambdaRuntime/Lambda.swift +++ b/Sources/AWSLambdaRuntime/Lambda.swift @@ -41,7 +41,7 @@ public enum Lambda { var logger = logger do { while !Task.isCancelled { - + logger.trace("Waiting for next invocation") let (invocation, writer) = try await runtimeClient.nextInvocation() logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)" diff --git a/Sources/AWSLambdaRuntime/LambdaRuntime.swift b/Sources/AWSLambdaRuntime/LambdaRuntime.swift index 4d7450e5..a639ac31 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntime.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntime.swift @@ -95,29 +95,29 @@ public final class LambdaRuntime: Sendable where Handler: StreamingLamb guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) } do { - try await LambdaRuntimeClient.withRuntimeClient( - configuration: .init(ip: ip, port: port), - eventLoop: self.eventLoop, - logger: self.logger - ) { runtimeClient in - try await Lambda.runLoop( - runtimeClient: runtimeClient, - handler: handler, + try await LambdaRuntimeClient.withRuntimeClient( + configuration: .init(ip: ip, port: port), + eventLoop: self.eventLoop, logger: self.logger - ) - } - } catch { - // catch top level errors that have not been handled until now - // this avoids the runtime to crash and generate a backtrace - self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) - if let error = error as? LambdaRuntimeError, - error.code != .connectionToControlPlaneLost - { - // if the error is a LambdaRuntimeError but not a connection error, - // we rethrow it to preserve existing behaviour - throw error + ) { runtimeClient in + try await Lambda.runLoop( + runtimeClient: runtimeClient, + handler: handler, + logger: self.logger + ) + } + } catch { + // catch top level errors that have not been handled until now + // this avoids the runtime to crash and generate a backtrace + self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"]) + if let error = error as? LambdaRuntimeError, + error.code != .connectionToControlPlaneLost + { + // if the error is a LambdaRuntimeError but not a connection error, + // we rethrow it to preserve existing behaviour + throw error + } } - } } else { diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index a0898971..40f84f89 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -72,7 +72,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case lostConnection case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) - + static func == (lhs: ConnectionState, rhs: ConnectionState) -> Bool { switch (lhs, rhs) { case (.disconnected, .disconnected): @@ -171,7 +171,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connected(let channel, _): channel.close(mode: .all, promise: nil) - + case .lostConnection: continuation.resume() } @@ -191,7 +191,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { self.lambdaState = .waitingForNextInvocation let handler = try await self.makeOrGetConnection() let invocation = try await handler.nextInvocation() - + guard case .waitingForNextInvocation = self.lambdaState else { fatalError("Invalid state: \(self.lambdaState)") } @@ -320,7 +320,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case (.connected, .notClosing): self.connectionState = .disconnected - case (.connected, .closing(let continuation)): self.connectionState = .disconnected @@ -394,8 +393,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { self.assumeIsolated { runtimeClient in // resume any pending continuation on the handler - if case .connected(_ , let handler) = runtimeClient.connectionState { - if case .connected(_ , let lambdaState) = handler.state { + if case .connected(_, let handler) = runtimeClient.connectionState { + if case .connected(_, let lambdaState) = handler.state { if case .waitingForNextInvocation(let continuation) = lambdaState { continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) } From b13bf5cfa0bf5c28c6eb0a31a3218a48f4a26537 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 7 Aug 2025 23:59:23 +0200 Subject: [PATCH 15/30] remove comment on max payload size --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 40f84f89..008b9b42 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -356,8 +356,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { try channel.pipeline.syncOperations.addHTTPClientHandlers() // Lambda quotas... An invocation payload is maximal 6MB in size: // https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html - // TODO: should we enforce this here ? What about streaming functions that - // support up to 20Mb responses ? try channel.pipeline.syncOperations.addHandler( NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024) ) From 9c283c3f77270599db7f838c4425fbd9fb602bcb Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sun, 24 Aug 2025 13:49:28 +0200 Subject: [PATCH 16/30] further simplify by removing the new state `lostConnection` --- .../LambdaRuntimeClient.swift | 29 ++----------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 008b9b42..91d478a9 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -69,7 +69,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private enum ConnectionState: Equatable { case disconnected - case lostConnection case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) @@ -81,8 +80,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return true case (.connected, .connected): return true - case (.lostConnection, .lostConnection): - return true default: return false } @@ -171,9 +168,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { case .connected(let channel, _): channel.close(mode: .all, promise: nil) - - case .lostConnection: - continuation.resume() } } } @@ -181,11 +175,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { @usableFromInline func nextInvocation() async throws -> (Invocation, Writer) { - if self.connectionState == .lostConnection { - throw LambdaRuntimeError(code: .connectionToControlPlaneLost) - } - - return try await withTaskCancellationHandler { + try await withTaskCancellationHandler { switch self.lambdaState { case .idle: self.lambdaState = .waitingForNextInvocation @@ -285,7 +275,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { private func channelClosed(_ channel: any Channel) { switch (self.connectionState, self.closingState) { - case (_, .closed), (.lostConnection, _): + case (_, .closed): fatalError("Invalid state: \(self.connectionState), \(self.closingState)") case (.disconnected, .notClosing): @@ -345,9 +335,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { return loopBound.value case .connected(_, let handler): return handler - - case .lostConnection: - throw LambdaRuntimeError(code: .connectionToControlPlaneLost) } let bootstrap = ClientBootstrap(group: self.eventLoop) @@ -401,7 +388,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { // close the channel runtimeClient.channelClosed(channel) - runtimeClient.connectionState = .lostConnection + runtimeClient.connectionState = .disconnected } } @@ -418,9 +405,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } } return handler - case .lostConnection: - // this should never happen - fatalError("Lost connection to Lambda service") } } catch { @@ -436,9 +420,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { } } throw error - case .lostConnection: - // this should never happen - fatalError("Lost connection to Lambda service") } } } @@ -475,10 +456,6 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate { } isolated.connectionState = .disconnected - - case .lostConnection: - // this should never happen - fatalError("Lost connection to Lambda service") } } } From a620a2fcf0d097ee0a0b871f3e6642400d58f1db Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Sun, 24 Aug 2025 13:55:15 +0200 Subject: [PATCH 17/30] remove unecessary code --- .../AWSLambdaRuntime/LambdaRuntimeClient.swift | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 91d478a9..492bd903 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -67,23 +67,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { NIOLoopBound>, any Error > - private enum ConnectionState: Equatable { + private enum ConnectionState { case disconnected case connecting([ConnectionContinuation]) case connected(Channel, LambdaChannelHandler) - - static func == (lhs: ConnectionState, rhs: ConnectionState) -> Bool { - switch (lhs, rhs) { - case (.disconnected, .disconnected): - return true - case (.connecting, .connecting): - return true - case (.connected, .connected): - return true - default: - return false - } - } } enum LambdaState { From f6712286c709933072f076034652e8a45a41d504 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Mon, 25 Aug 2025 08:18:16 +0200 Subject: [PATCH 18/30] restrict access to channel handler state variable --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 492bd903..ad37218a 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -475,7 +475,7 @@ private final class LambdaChannelHandler } } - var state: State = .disconnected + private(set) var state: State = .disconnected private var lastError: Error? private var reusableErrorBuffer: ByteBuffer? private let logger: Logger From b0234f5c3f576464c52caf9c1fe12702cc8e6a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 27 Aug 2025 16:50:58 +0200 Subject: [PATCH 19/30] add a unit test to verify that an error is thrown when the connection is closed, whatever is the state of the LambdaChannelHandler --- .../LambdaRuntimeClientTests.swift | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 35e89c3a..b990d0d6 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -238,4 +238,54 @@ struct LambdaRuntimeClientTests { } } } + + @Test("Server closing the connection when waiting for next invocation throws an error") + func testChannelCloseFutureWithWaitingForNextInvocation() async throws { + struct DisconnectBehavior: LambdaServerBehavior { + func getInvocation() -> GetInvocationResult { + // Return "disconnect" to trigger server closing the connection + .success(("disconnect", "0")) + } + + func processResponse(requestId: String, response: String?) -> Result { + Issue.record("should not process response") + return .failure(.internalServerError) + } + + func processError(requestId: String, error: ErrorResponse) -> Result { + Issue.record("should not report error") + return .failure(.internalServerError) + } + + func processInitError(error: ErrorResponse) -> Result { + Issue.record("should not report init error") + return .failure(.internalServerError) + } + } + + try await withMockServer(behaviour: DisconnectBehavior()) { port in + let configuration = LambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: port) + + try await LambdaRuntimeClient.withRuntimeClient( + configuration: configuration, + eventLoop: NIOSingletons.posixEventLoopGroup.next(), + logger: self.logger + ) { runtimeClient in + do { + // This should fail when server closes connection + let _ = try await runtimeClient.nextInvocation() + Issue.record("Expected connection error but got successful invocation") + + // Verify we get an error when the connection is closed. + // the error is either a ChannelError or a LambdaRuntimeError + } catch let error as LambdaRuntimeError { + #expect(error.code == .connectionToControlPlaneLost) + } catch let error as ChannelError { + #expect(error == .ioOnClosedChannel) + } catch { + Issue.record("Unexpected error type: \(error)") + } + } + } + } } From 482e09e994286243c503b6684bfffbbaaea53f89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 27 Aug 2025 17:20:15 +0200 Subject: [PATCH 20/30] swift-format --- Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index b990d0d6..83646559 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -276,11 +276,11 @@ struct LambdaRuntimeClientTests { let _ = try await runtimeClient.nextInvocation() Issue.record("Expected connection error but got successful invocation") - // Verify we get an error when the connection is closed. - // the error is either a ChannelError or a LambdaRuntimeError } catch let error as LambdaRuntimeError { + // Verify we get an error when the connection is closed. #expect(error.code == .connectionToControlPlaneLost) } catch let error as ChannelError { + // the error is either a ChannelError or a LambdaRuntimeError #expect(error == .ioOnClosedChannel) } catch { Issue.record("Unexpected error type: \(error)") From 156e6c7265346b5e1177293ffaf8b8130685a88d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 27 Aug 2025 22:14:10 +0200 Subject: [PATCH 21/30] make sure connectionToControlPlaneLost error is triggered by the test --- .../LambdaRuntimeClientTests.swift | 82 +++++++++++++------ .../MockLambdaServer.swift | 28 +++++-- 2 files changed, 78 insertions(+), 32 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 83646559..387a6e5b 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -42,10 +42,10 @@ struct LambdaRuntimeClientTests { .success((self.requestId, self.event)) } - func processResponse(requestId: String, response: String?) -> Result { + func processResponse(requestId: String, response: String?) -> Result { #expect(self.requestId == requestId) #expect(self.event == response) - return .success(()) + return .success(nil) } func processError(requestId: String, error: ErrorResponse) -> Result { @@ -102,9 +102,9 @@ struct LambdaRuntimeClientTests { .success((self.requestId, self.event)) } - func processResponse(requestId: String, response: String?) -> Result { + func processResponse(requestId: String, response: String?) -> Result { #expect(self.requestId == requestId) - return .success(()) + return .success(nil) } mutating func captureHeaders(_ headers: HTTPHeaders) { @@ -197,10 +197,10 @@ struct LambdaRuntimeClientTests { .success((self.requestId, self.event)) } - func processResponse(requestId: String, response: String?) -> Result { + func processResponse(requestId: String, response: String?) -> Result { #expect(self.requestId == requestId) #expect(self.event == response) - return .success(()) + return .success(nil) } func processError(requestId: String, error: ErrorResponse) -> Result { @@ -239,31 +239,56 @@ struct LambdaRuntimeClientTests { } } - @Test("Server closing the connection when waiting for next invocation throws an error") - func testChannelCloseFutureWithWaitingForNextInvocation() async throws { - struct DisconnectBehavior: LambdaServerBehavior { - func getInvocation() -> GetInvocationResult { - // Return "disconnect" to trigger server closing the connection - .success(("disconnect", "0")) - } + struct DisconnectAfterSendingResponseBehavior: LambdaServerBehavior { + func getInvocation() -> GetInvocationResult { + .success((UUID().uuidString, "hello")) + } - func processResponse(requestId: String, response: String?) -> Result { - Issue.record("should not process response") - return .failure(.internalServerError) - } + func processResponse(requestId: String, response: String?) -> Result { + // Return "disconnect" to trigger server closing the connection + // after having accepted a response + .success("delayed-disconnect") + } - func processError(requestId: String, error: ErrorResponse) -> Result { - Issue.record("should not report error") - return .failure(.internalServerError) - } + func processError(requestId: String, error: ErrorResponse) -> Result { + Issue.record("should not report error") + return .failure(.internalServerError) + } - func processInitError(error: ErrorResponse) -> Result { - Issue.record("should not report init error") - return .failure(.internalServerError) - } + func processInitError(error: ErrorResponse) -> Result { + Issue.record("should not report init error") + return .failure(.internalServerError) + } + } + + struct DisconnectBehavior: LambdaServerBehavior { + func getInvocation() -> GetInvocationResult { + // Return "disconnect" to trigger server closing the connection + .success(("disconnect", "0")) } - try await withMockServer(behaviour: DisconnectBehavior()) { port in + func processResponse(requestId: String, response: String?) -> Result { + Issue.record("should not process response") + return .failure(.internalServerError) + } + + func processError(requestId: String, error: ErrorResponse) -> Result { + Issue.record("should not report error") + return .failure(.internalServerError) + } + + func processInitError(error: ErrorResponse) -> Result { + Issue.record("should not report init error") + return .failure(.internalServerError) + } + } + + @Test( + "Server closing the connection when waiting for next invocation throws an error", + arguments: [DisconnectAfterSendingResponseBehavior(), DisconnectBehavior()] as [any LambdaServerBehavior] + ) + func testChannelCloseFutureWithWaitingForNextInvocation(behavior: LambdaServerBehavior) async throws { + try await withMockServer(behaviour: behavior) { port in let configuration = LambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: port) try await LambdaRuntimeClient.withRuntimeClient( @@ -273,7 +298,12 @@ struct LambdaRuntimeClientTests { ) { runtimeClient in do { // This should fail when server closes connection + let (_, writer) = try await runtimeClient.nextInvocation() + let response = ByteBuffer(string: "hello") + try await writer.writeAndFinish(response) + let _ = try await runtimeClient.nextInvocation() + Issue.record("Expected connection error but got successful invocation") } catch let error as LambdaRuntimeError { diff --git a/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift b/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift index 5d307ce2..d5ad8876 100644 --- a/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift +++ b/Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift @@ -160,6 +160,7 @@ final class HTTPHandler: ChannelInboundHandler { var responseStatus: HTTPResponseStatus var responseBody: String? var responseHeaders: [(String, String)]? + var disconnectAfterSend = false // Handle post-init-error first to avoid matching the less specific post-error suffix. if request.head.uri.hasSuffix(Consts.postInitErrorURL) { @@ -202,8 +203,11 @@ final class HTTPHandler: ChannelInboundHandler { behavior.captureHeaders(request.head.headers) switch behavior.processResponse(requestId: String(requestId), response: requestBody) { - case .success: + case .success(let next): responseStatus = .accepted + if next == "delayed-disconnect" { + disconnectAfterSend = true + } case .failure(let error): responseStatus = .init(statusCode: error.rawValue) } @@ -223,14 +227,21 @@ final class HTTPHandler: ChannelInboundHandler { } else { responseStatus = .notFound } - self.writeResponse(context: context, status: responseStatus, headers: responseHeaders, body: responseBody) + self.writeResponse( + context: context, + status: responseStatus, + headers: responseHeaders, + body: responseBody, + closeConnection: disconnectAfterSend + ) } func writeResponse( context: ChannelHandlerContext, status: HTTPResponseStatus, headers: [(String, String)]? = nil, - body: String? = nil + body: String? = nil, + closeConnection: Bool = false ) { var headers = HTTPHeaders(headers ?? []) headers.add(name: "Content-Length", value: "\(body?.utf8.count ?? 0)") @@ -253,14 +264,19 @@ final class HTTPHandler: ChannelInboundHandler { } let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop) - let keepAlive = self.keepAlive context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in + let context = loopBoundContext.value + if closeConnection { + context.close(promise: nil) + return + } + if case .failure(let error) = result { logger.error("write error \(error)") } + if !keepAlive { - let context = loopBoundContext.value context.close().whenFailure { error in logger.error("close error \(error)") } @@ -271,7 +287,7 @@ final class HTTPHandler: ChannelInboundHandler { protocol LambdaServerBehavior: Sendable { func getInvocation() -> GetInvocationResult - func processResponse(requestId: String, response: String?) -> Result + func processResponse(requestId: String, response: String?) -> Result func processError(requestId: String, error: ErrorResponse) -> Result func processInitError(error: ErrorResponse) -> Result From a0b1d572eaa11e5700d1603d44e78f7ff259145c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 27 Aug 2025 22:53:06 +0200 Subject: [PATCH 22/30] give more time to the server to close the connection --- .../LambdaRuntimeClientTests.swift | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 387a6e5b..22196c93 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -245,8 +245,8 @@ struct LambdaRuntimeClientTests { } func processResponse(requestId: String, response: String?) -> Result { - // Return "disconnect" to trigger server closing the connection - // after having accepted a response + // Return "delayed-disconnect" to trigger server closing the connection + // after having accepted the first response .success("delayed-disconnect") } @@ -261,15 +261,13 @@ struct LambdaRuntimeClientTests { } } - struct DisconnectBehavior: LambdaServerBehavior { + struct DisconnectBehavior: LambdaServerBehavior { func getInvocation() -> GetInvocationResult { - // Return "disconnect" to trigger server closing the connection .success(("disconnect", "0")) } func processResponse(requestId: String, response: String?) -> Result { - Issue.record("should not process response") - return .failure(.internalServerError) + .success(nil) } func processError(requestId: String, error: ErrorResponse) -> Result { @@ -285,7 +283,7 @@ struct LambdaRuntimeClientTests { @Test( "Server closing the connection when waiting for next invocation throws an error", - arguments: [DisconnectAfterSendingResponseBehavior(), DisconnectBehavior()] as [any LambdaServerBehavior] + arguments: [DisconnectBehavior(), DisconnectAfterSendingResponseBehavior()] as [any LambdaServerBehavior] ) func testChannelCloseFutureWithWaitingForNextInvocation(behavior: LambdaServerBehavior) async throws { try await withMockServer(behaviour: behavior) { port in @@ -297,12 +295,14 @@ struct LambdaRuntimeClientTests { logger: self.logger ) { runtimeClient in do { - // This should fail when server closes connection let (_, writer) = try await runtimeClient.nextInvocation() - let response = ByteBuffer(string: "hello") - try await writer.writeAndFinish(response) + try await writer.writeAndFinish(ByteBuffer(string: "hello")) - let _ = try await runtimeClient.nextInvocation() + // continue to simulate traffic until the server reports it has closed the connection + for i in 1...100 { + let (_, writer2) = try await runtimeClient.nextInvocation() + try await writer2.writeAndFinish(ByteBuffer(string: "hello")) + } Issue.record("Expected connection error but got successful invocation") From 54459e05eb708b4b5435f05112220e13a2f72e27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 27 Aug 2025 23:11:00 +0200 Subject: [PATCH 23/30] add catch for IOError --- Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 22196c93..8da3e374 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -307,11 +307,11 @@ struct LambdaRuntimeClientTests { Issue.record("Expected connection error but got successful invocation") } catch let error as LambdaRuntimeError { - // Verify we get an error when the connection is closed. #expect(error.code == .connectionToControlPlaneLost) } catch let error as ChannelError { - // the error is either a ChannelError or a LambdaRuntimeError #expect(error == .ioOnClosedChannel) + } catch let error as IOError { + #expect(error.errnoCode == ECONNRESET || error.errnoCode == EPIPE) } catch { Issue.record("Unexpected error type: \(error)") } From 89cd2599fdeb22987f061a4276bde505cdbbb6c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 27 Aug 2025 23:21:21 +0200 Subject: [PATCH 24/30] swift-format --- Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 8da3e374..9a64d8f2 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -261,7 +261,7 @@ struct LambdaRuntimeClientTests { } } - struct DisconnectBehavior: LambdaServerBehavior { + struct DisconnectBehavior: LambdaServerBehavior { func getInvocation() -> GetInvocationResult { .success(("disconnect", "0")) } From 6b0795582df96e9b9714ac86db2e73972c16f713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Wed, 27 Aug 2025 23:51:06 +0200 Subject: [PATCH 25/30] remove compilation warning --- Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 9a64d8f2..94c0f178 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -299,7 +299,7 @@ struct LambdaRuntimeClientTests { try await writer.writeAndFinish(ByteBuffer(string: "hello")) // continue to simulate traffic until the server reports it has closed the connection - for i in 1...100 { + for _ in 1...1000 { let (_, writer2) = try await runtimeClient.nextInvocation() try await writer2.writeAndFinish(ByteBuffer(string: "hello")) } From 86ccbf476381f8daf9f27a922b00556a88dacde5 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 28 Aug 2025 08:10:37 +0200 Subject: [PATCH 26/30] improve test with a timeout --- .../LambdaRuntimeClient.swift | 4 +- .../LambdaRuntimeClientTests.swift | 24 ++++--- Tests/AWSLambdaRuntimeTests/Timeout.swift | 67 +++++++++++++++++++ 3 files changed, 86 insertions(+), 9 deletions(-) create mode 100644 Tests/AWSLambdaRuntimeTests/Timeout.swift diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index ad37218a..61d49ad9 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -162,7 +162,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { @usableFromInline func nextInvocation() async throws -> (Invocation, Writer) { - try await withTaskCancellationHandler { + try Task.checkCancellation() + + return try await withTaskCancellationHandler { switch self.lambdaState { case .idle: self.lambdaState = .waitingForNextInvocation diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 94c0f178..d6aa3cb1 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -295,22 +295,30 @@ struct LambdaRuntimeClientTests { logger: self.logger ) { runtimeClient in do { - let (_, writer) = try await runtimeClient.nextInvocation() - try await writer.writeAndFinish(ByteBuffer(string: "hello")) - // continue to simulate traffic until the server reports it has closed the connection - for _ in 1...1000 { - let (_, writer2) = try await runtimeClient.nextInvocation() - try await writer2.writeAndFinish(ByteBuffer(string: "hello")) + // simulate traffic until the server reports it has closed the connection + // or a timeout, whichever comes first + // result is ignored here, either there is a connection error or a timeout + let _ = try await timeout(deadline: .seconds(1)) { + while true { + let (_, writer) = try await runtimeClient.nextInvocation() + try await writer.writeAndFinish(ByteBuffer(string: "hello")) + } } + // result is ignored here, we should never reach this line + Issue.record("Connection reset test did not throw an error") - Issue.record("Expected connection error but got successful invocation") - + } catch is CancellationError { + print("++CancellationError") + Issue.record("Runtime client did not send connection closed error") } catch let error as LambdaRuntimeError { + print("++LambdaRuntimeError") #expect(error.code == .connectionToControlPlaneLost) } catch let error as ChannelError { + print("++ChannelError") #expect(error == .ioOnClosedChannel) } catch let error as IOError { + print("++IOError") #expect(error.errnoCode == ECONNRESET || error.errnoCode == EPIPE) } catch { Issue.record("Unexpected error type: \(error)") diff --git a/Tests/AWSLambdaRuntimeTests/Timeout.swift b/Tests/AWSLambdaRuntimeTests/Timeout.swift new file mode 100644 index 00000000..6a8dc5dc --- /dev/null +++ b/Tests/AWSLambdaRuntimeTests/Timeout.swift @@ -0,0 +1,67 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// as suggested by https://github.com/vapor/postgres-nio/issues/489#issuecomment-2186509773 +func timeout( + deadline: Duration, + _ closure: @escaping @Sendable () async throws -> Success +) async throws -> Success { + + let clock = ContinuousClock() + + let result = await withTaskGroup(of: TimeoutResult.self, returning: Result.self) { + taskGroup in + taskGroup.addTask { + do { + try await clock.sleep(until: clock.now + deadline, tolerance: nil) + return .deadlineHit + } catch { + return .deadlineCancelled + } + } + + taskGroup.addTask { + do { + let success = try await closure() + return .workFinished(.success(success)) + } catch let error { + return .workFinished(.failure(error)) + } + } + + var r: Swift.Result? + while let taskResult = await taskGroup.next() { + switch taskResult { + case .deadlineCancelled: + continue // loop + + case .deadlineHit: + taskGroup.cancelAll() + + case .workFinished(let result): + taskGroup.cancelAll() + r = result + } + } + return r! + } + + return try result.get() +} + +enum TimeoutResult { + case deadlineHit + case deadlineCancelled + case workFinished(Result) +} From cbf9c5e19d10e3ed5502817333908b6b28d8b83b Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 28 Aug 2025 08:13:48 +0200 Subject: [PATCH 27/30] remove debugging print statements --- Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index d6aa3cb1..12a2b762 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -309,16 +309,12 @@ struct LambdaRuntimeClientTests { Issue.record("Connection reset test did not throw an error") } catch is CancellationError { - print("++CancellationError") Issue.record("Runtime client did not send connection closed error") } catch let error as LambdaRuntimeError { - print("++LambdaRuntimeError") #expect(error.code == .connectionToControlPlaneLost) } catch let error as ChannelError { - print("++ChannelError") #expect(error == .ioOnClosedChannel) } catch let error as IOError { - print("++IOError") #expect(error.errnoCode == ECONNRESET || error.errnoCode == EPIPE) } catch { Issue.record("Unexpected error type: \(error)") From 9a709151a6575951ebed51955a2c4e30b769952e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 28 Aug 2025 09:34:24 +0200 Subject: [PATCH 28/30] add logger trace --- Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift index 12a2b762..fe85f973 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift @@ -311,10 +311,13 @@ struct LambdaRuntimeClientTests { } catch is CancellationError { Issue.record("Runtime client did not send connection closed error") } catch let error as LambdaRuntimeError { + logger.trace("LambdaRuntimeError - expected") #expect(error.code == .connectionToControlPlaneLost) } catch let error as ChannelError { + logger.trace("ChannelError - expected") #expect(error == .ioOnClosedChannel) } catch let error as IOError { + logger.trace("IOError - expected") #expect(error.errnoCode == ECONNRESET || error.errnoCode == EPIPE) } catch { Issue.record("Unexpected error type: \(error)") From c54cb2f62bb6317bd26d5c1aae275e923ea5d75f Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Fri, 29 Aug 2025 21:18:52 +0200 Subject: [PATCH 29/30] fulfill continuation in channelInactive() rather than channel.closeFuture.whenComplete --- .../LambdaRuntimeClient.swift | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index 61d49ad9..ee6f1032 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -365,16 +365,6 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol { ) channel.closeFuture.whenComplete { result in self.assumeIsolated { runtimeClient in - - // resume any pending continuation on the handler - if case .connected(_, let handler) = runtimeClient.connectionState { - if case .connected(_, let lambdaState) = handler.state { - if case .waitingForNextInvocation(let continuation) = lambdaState { - continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost)) - } - } - } - // close the channel runtimeClient.channelClosed(channel) runtimeClient.connectionState = .disconnected @@ -898,9 +888,16 @@ extension LambdaChannelHandler: ChannelInboundHandler { func channelInactive(context: ChannelHandlerContext) { // fail any pending responses with last error or assume peer disconnected switch self.state { - case .connected(_, .waitingForNextInvocation(let continuation)): + case .connected(_, let lambdaState): + switch lambdaState { + case .waitingForNextInvocation(let continuation): + continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel) + case .sentResponse(let continuation): + continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel) + case .idle, .sendingResponse, .waitingForResponse: + break + } self.state = .disconnected - continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel) default: break } From ccb3d1455c1192f51c17a283e7a21907c39ec4d8 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Fri, 29 Aug 2025 21:28:59 +0200 Subject: [PATCH 30/30] remove private(set) on LambdaChannelHandler.state --- Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift index ee6f1032..1d90e2c9 100644 --- a/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift @@ -467,7 +467,7 @@ private final class LambdaChannelHandler } } - private(set) var state: State = .disconnected + private var state: State = .disconnected private var lastError: Error? private var reusableErrorBuffer: ByteBuffer? private let logger: Logger