diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
index 731ca597..02607680 100644
--- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
+++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
@@ -59,7 +59,7 @@ private enum LocalLambda {
             var logger = Logger(label: "LocalLambdaServer")
             logger.logLevel = configuration.general.logLevel
             self.logger = logger
-            self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
+            self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
             self.host = configuration.runtimeEngine.ip
             self.port = configuration.runtimeEngine.port
             self.invocationEndpoint = invocationEndpoint ?? "/invoke"
@@ -88,13 +88,20 @@ private enum LocalLambda {
     }
 
     final class HTTPHandler: ChannelInboundHandler {
+
+        enum InvocationState {
+            case waitingForNextRequest
+            case idle(EventLoopPromise<Pending>)
+            case processing(Pending)
+        }
+
         public typealias InboundIn = HTTPServerRequestPart
         public typealias OutboundOut = HTTPServerResponsePart
 
-        private static let queueLock = Lock()
-        private static var queue = [String: Pending]()
-        private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()
+
+        private static var queue = [Pending]()
+        private static var invocationState: InvocationState = .waitingForNextRequest
 
         private let logger: Logger
         private let invocationEndpoint: String
@@ -137,43 +144,63 @@ private enum LocalLambda {
                             self.writeResponse(context: context, response: .init(status: .internalServerError))
                         }
                     }
-                    Self.queueLock.withLock {
-                        Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise)
+                    let pending = Pending(requestId: requestId, request: work, responsePromise: promise)
+                    switch Self.invocationState {
+                    case .idle(let promise):
+                        promise.succeed(pending)
+                    case .processing(_), .waitingForNextRequest:
+                        Self.queue.append(pending)
                     }
                 }
             } else if request.head.uri.hasSuffix("/next") {
-                switch (Self.queueLock.withLock { Self.queue.popFirst() }) {
+                // check if our server is in the correct state
+                guard case .waitingForNextRequest = Self.invocationState else {
+                    #warning("better error code?!")
+                    self.writeResponse(context: context, response: .init(status: .conflict))
+                    return
+                }
+
+                // pop the first task from the queue
+                switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil {
                 case .none:
-                    self.writeResponse(context: context, response: .init(status: .noContent))
-                case .some(let pending):
-                    var response = Response()
-                    response.body = pending.value.request
-                    // required headers
-                    response.headers = [
-                        (AmazonHeaders.requestID, pending.key),
-                        (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
-                        (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
-                        (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
-                    ]
-                    Self.queueLock.withLock {
-                        Self.queue[pending.key] = pending.value
+                    // if there is nothing in the queue, create a promise that we can succeed
+                    // once we get a new task
+                    let promise = context.eventLoop.makePromise(of: Pending.self)
+                    promise.futureResult.whenComplete { (result) in
+                        switch result {
+                        case .failure(let error):
+                            self.writeResponse(context: context, response: .init(status: .internalServerError))
+                        case .success(let pending):
+                            Self.invocationState = .processing(pending)
+                            self.writeResponse(context: context, response: pending.toResponse())
+                        }
                     }
-                    self.writeResponse(context: context, response: response)
+                    Self.invocationState = .idle(promise)
+                case .some(let pending):
+                    // if there is a pending task, we can respond with it immediately.
+                    Self.invocationState = .processing(pending)
+                    self.writeResponse(context: context, response: pending.toResponse())
                 }
             } else if request.head.uri.hasSuffix("/response") {
                 let parts = request.head.uri.split(separator: "/")
                 guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
+                    // the request is malformed, since we were expecting a requestId in the path
                     return self.writeResponse(context: context, response: .init(status: .badRequest))
                 }
-                switch (Self.queueLock.withLock { Self.queue[requestId] }) {
-                case .none:
-                    self.writeResponse(context: context, response: .init(status: .badRequest))
-                case .some(let pending):
-                    pending.responsePromise.succeed(.init(status: .ok, body: request.body))
-                    self.writeResponse(context: context, response: .init(status: .accepted))
-                    Self.queueLock.withLock { Self.queue[requestId] = nil }
+                guard case .processing(let pending) = Self.invocationState else {
+                    // a response was sent, but we did not expect to receive one
+                    #warning("better error code?!")
+                    return self.writeResponse(context: context, response: .init(status: .conflict))
                 }
+                guard requestId == pending.requestId else {
+                    // the request's requestId does not match the one we are expecting
+                    return self.writeResponse(context: context, response: .init(status: .badRequest))
+                }
+
+                pending.responsePromise.succeed(.init(status: .ok, body: request.body))
+                self.writeResponse(context: context, response: .init(status: .accepted))
+                Self.invocationState = .waitingForNextRequest
             } else {
                 self.writeResponse(context: context, response: .init(status: .notFound))
             }
@@ -211,6 +238,19 @@ private enum LocalLambda {
             let requestId: String
             let request: ByteBuffer
             let responsePromise: EventLoopPromise<Response>
+
+            func toResponse() -> Response {
+                var response = Response()
+                response.body = self.request
+                // required headers
+                response.headers = [
+                    (AmazonHeaders.requestID, self.requestId),
+                    (AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
+                    (AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
+                    (AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
+                ]
+                return response
+            }
         }
     }